# -*- coding: utf-8 -*-
"""
JSON Object bucket module
"""
import json
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode
from .service import Service
[docs]class ObjectBucket(object):
"""
JSON Object Storage Bucket.
Args:
service (Service): Service
bucket_name (str): Bucket name
Attributes:
service (Service): Service
bucket_name (str): Bucket name
"""
_MAX_QUERY_SIZE = 1500
# type: int
def __init__(self, service, bucket_name):
# type: (Service, str) -> None
self.service = service
self.bucket_name = bucket_name
[docs] def query(self, where=None, order=None, skip=0, limit=None, projection=None, delete_mark=False):
# type: (dict, str, int, int, dict, bool) -> list
"""
Query objects in this bucket.
Examples:
::
results = bucket.query(where={"product_name": "orange"}, order="-updatedAt", limit=100)
Args:
where (dict): Query conditions (JSON) (optional)
order (str): Sort conditions (optional)
skip (int): Skip count (optional, default=0)
limit (int): Limit count (optional)
projection (dict): Projection (JSON) (optional)
delete_mark (bool): Include soft deleted data (optional, default=False)
Returns:
list: List of JSON objects
"""
res = self._query(where=where, order=order, skip=skip, limit=limit, projection=projection,
delete_mark=delete_mark)
return res["results"]
[docs] def query_with_count(self, where=None, order=None, skip=0, limit=None, projection=None, delete_mark=False):
# type: (dict, str, int, int, dict, bool) -> (list, int)
"""
Query objects in this bucket (with count query).
Examples:
::
(results, count) = bucket.query_with_count(where={"product_name": "orange"}, order="-updatedAt", limit=100)
Args:
where (dict): Query conditions (JSON) (optional)
order (str): Sort conditions (optional)
skip (int): Skip count (optional, default=0)
limit (int): Limit count (optional)
projection (dict): Projection (JSON) (optional)
delete_mark (bool): Include soft deleted data (optional, default=False)
Returns:
(list, count): Tuple of list of JSON objects and total count of query.
"""
res = self._query(where=where, order=order, skip=skip, limit=limit, projection=projection,
delete_mark=delete_mark, count=True)
return res["results"], res["count"]
def _query(self, where=None, order=None, skip=0, limit=None, projection=None, delete_mark=False, count=False):
"""
Query objects (internal).
Args:
where (dict): Query conditions (JSON) (optional)
order (str): Sort conditions (optional)
skip (int): Skip count (optional, default=0)
limit (int): Limit count (optional)
projection (dict): Projection (JSON) (optional)
delete_mark (bool): Include delete marked data (optional, default=False)
count (bool): Get total count (optional, default=False)
Returns:
dict: Response in JSON
"""
query_params = {}
if where is not None:
query_params["where"] = json.dumps(where)
if order is not None:
query_params["order"] = order
if skip > 0:
query_params["skip"] = skip
if limit is not None:
query_params["limit"] = limit
if projection is not None:
query_params["projection"] = json.dumps(projection)
if delete_mark:
query_params["deleteMark"] = 1
if count:
query_params["count"] = 1
query_string = urlencode(query_params)
if len(query_string) < ObjectBucket._MAX_QUERY_SIZE:
r = self.service.execute_rest("GET", "/objects/{}".format(self.bucket_name), query=query_params)
else:
r = self.service.execute_rest("POST", "/objects/{}/_query".format(self.bucket_name), json=query_params)
return r.json()
[docs] def insert(self, data):
# type: (dict) -> dict
"""
Insert JSON Object
Examples:
::
acl = {"r": ["g:authenticated"], "w": ["g:authenticated"]}
result = bucket.insert({"name": "foo", "score": 70, "ACL": acl})
Args:
data (dict): Data (JSON)
Returns:
dict: Response JSON
"""
r = self.service.execute_rest("POST", "/objects/{}".format(self.bucket_name), json=data)
res = r.json()
return res
[docs] def update(self, oid, data, etag=None):
# type: (str, dict, str) -> dict
"""
Update JSON Object
Args:
oid (str): ID of Object
data (dict): Data (JSON)
etag (str): ETag (optional)
Returns:
dict: Response JSON
"""
query_params = {}
if etag is not None:
query_params["etag"] = etag
r = self.service.execute_rest("PUT", "/objects/{}/{}".format(self.bucket_name, oid),
query=query_params, json=data)
res = r.json()
return res
[docs] def remove(self, oid, soft_delete=False):
# type: (str, bool) -> dict
"""
Remove one JSON Object
Args:
oid (str): Object ID
soft_delete (bool): Soft delete (optional, default=False)
Returns:
dict: Response JSON
"""
if not oid: # fail-safe: check not None nor empty to avoid remove all objects.
raise ValueError("No oid")
r = self.service.execute_rest("DELETE", "/objects/{}/{}".format(self.bucket_name, oid),
query={"deleteMark": 1 if soft_delete else 0})
res = r.json()
return res
[docs] def remove_with_query(self, where=None, soft_delete=False):
# type: (dict, bool) -> dict
"""
Remove multiple JSON Objects
Args:
where (dict): Query condition (optional)
soft_delete (bool): Soft delete (optional, default=False)
Returns:
dict: Response JSON
"""
if where is None:
where = {}
query_params = {
"where": json.dumps(where),
"deleteMark": 1 if soft_delete else 0
}
r = self.service.execute_rest("DELETE", "/objects/{}".format(self.bucket_name), query=query_params)
res = r.json()
return res
[docs] def batch(self, requests, soft_delete=False):
# type: (list, bool) -> list
"""
Batch operation
Examples:
::
requests = [
{"op": "insert", "data": {"name": "foo", "score": 70}},
{"op": "insert", "data": {"name": "bar", "score": 80}}
]
results = bucket.batch(requests)
Args:
requests (list): List of batch requests
soft_delete (bool): Soft delete (optional, default=False)
Returns:
list: List of batch results
"""
query = {"deleteMark": 1 if soft_delete else 0}
body_json = {
"requests": requests
}
r = self.service.execute_rest("POST", "/objects/{}/_batch".format(self.bucket_name),
json=body_json, query=query)
res = r.json()
return res["results"]
[docs] def aggregate(self, pipeline, options=None):
# type: (list, dict) -> list
"""
Aggregation operation
Examples:
::
pipeline = [
{"$lookup": { ... }},
...
]
options = {"allowDiskUse": True}
results = bucket.aggregate(pipeline, options)
Args:
pipeline (list): List of aggregation pipeline stage
options (dist): Aggregation option parameter (optional)
Returns:
list: List of aggregation results
"""
body = {"pipeline": pipeline}
if options is not None:
body["options"] = options
r = self.service.execute_rest("POST", "/objects/{}/_aggregate".format(self.bucket_name), json=body)
res = r.json()
return res["results"]