Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set max limit for queries to 10k #857

Merged
merged 2 commits into from
Jan 21, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AppDB/datastore_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class DatastoreDistributed():
a distributed datastore instead of a flat file.
"""
# Max number of results for a query
_MAXIMUM_RESULTS = 1000000
_MAXIMUM_RESULTS = 10000

# The number of entries looked at when doing a composite query
# It will keep looking at this size window when getting the result
Expand Down
109 changes: 66 additions & 43 deletions AppDB/zkappscale/zktransaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,8 @@ def create_node(self, path, value):
self.reestablish_connection()

try:
self.run_with_retry(self.handle.create, path, str(value), ZOO_ACL_OPEN,
False, False, True)
self.handle.create(path, value=str(value), acl=ZOO_ACL_OPEN,
ephemeral=False, sequence=False, makepath=True)
logging.debug("Created path {0} with value {1}".format(path, value))
except kazoo.exceptions.KazooException as kazoo_exception:
logging.exception(kazoo_exception)
Expand Down Expand Up @@ -469,16 +469,17 @@ def create_sequence_node(self, path, value):
self.reestablish_connection()

try:
txn_id_path = self.run_with_retry(self.handle.create, path, str(value),
ZOO_ACL_OPEN, False, True, True)
txn_id_path = self.handle.create(path, value=str(value),
acl=ZOO_ACL_OPEN, ephemeral=False, sequence=True, makepath=True)
if txn_id_path:
txn_id = long(txn_id_path.split(PATH_SEPARATOR)[-1].lstrip(
APP_TX_PREFIX))
if txn_id == 0:
logging.warning("Created sequence ID 0 - deleting it.")
self.run_with_retry(self.handle.delete_async, txn_id_path)
txn_id_path = self.run_with_retry(self.handle.create, path,
str(value), ZOO_ACL_OPEN, False, True, True)
txn_id_path = self.handle.create(path,
value=str(value), acl=ZOO_ACL_OPEN, ephemeral=False,
sequence=True, makepath=True)
return long(txn_id_path.split(PATH_SEPARATOR)[-1].lstrip(
APP_TX_PREFIX))
else:
Expand Down Expand Up @@ -636,9 +637,10 @@ def acquire_additional_lock(self, app_id, txid, entity_key, create):

try:
logging.debug("Trying to create path {0} with value {1}".format(
lockrootpath, txpath))
lockpath = self.run_with_retry(self.handle.create, lockrootpath,
str(txpath), ZOO_ACL_OPEN, False, False, True)
lockrootpath, txpath))
lockpath = self.handle.create(lockrootpath,
value=str(txpath), acl=ZOO_ACL_OPEN, ephemeral=False,
sequence=False, makepath=True)
except kazoo.exceptions.NodeExistsError:
# fail to get lock
try:
Expand All @@ -665,24 +667,27 @@ def acquire_additional_lock(self, app_id, txid, entity_key, create):

try:
if create:
self.run_with_retry(self.handle.create_async, transaction_lock_path,
str(lockpath), ZOO_ACL_OPEN, False, False)
self.handle.create_async(transaction_lock_path,
value=str(lockpath), acl=ZOO_ACL_OPEN, ephemeral=False,
makepath=False)
logging.debug("Created lock list path {0} with value {1}".format(
transaction_lock_path, lockpath))
else:
tx_lockpath = self.run_with_retry(self.handle.get,
transaction_lock_path)[0]
lock_list = tx_lockpath.split(LOCK_LIST_SEPARATOR)
if len(lock_list) >= MAX_GROUPS_FOR_XG:
raise ZKTransactionException("acquire_additional_lock: Too many " \
"groups for this XG transaction.")

lock_list.append(lockpath)
lock_list_str = LOCK_LIST_SEPARATOR.join(lock_list)
self.run_with_retry(self.handle.set_async, transaction_lock_path,
str(lock_list_str))
logging.debug("Set lock list path {0} to value {1}".format(
transaction_lock_path, lock_list_str))
# We do this check last, otherwise we may have left over locks to
# to a lack of a lock path reference.
if len(lock_list) >= MAX_GROUPS_FOR_XG:
raise ZKTransactionException("acquire_additional_lock: Too many " \
"groups for this XG transaction.")

except kazoo.exceptions.KazooException as kazoo_exception:
logging.exception(kazoo_exception)
self.reestablish_connection()
Expand Down Expand Up @@ -841,7 +846,8 @@ def release_lock(self, app_id, txid):
transaction_lock_path)[0]
lock_list = lock_list_str.split(LOCK_LIST_SEPARATOR)
for lock_path in lock_list:
self.run_with_retry(self.handle.delete_async, lock_path)
logging.debug("Lock released: {0}".format(lock_path))
self.run_with_retry(self.handle.delete, lock_path)
self.run_with_retry(self.handle.delete, transaction_lock_path)
except kazoo.exceptions.NoNodeError:
try:
Expand All @@ -866,14 +872,17 @@ def release_lock(self, app_id, txid):
try:
if self.is_xg(app_id, txid):
xg_path = self.get_xg_path(app_id, txid)
self.run_with_retry(self.handle.delete_async, xg_path)
self.run_with_retry(self.handle.delete, xg_path)

for child in self.run_with_retry(self.handle.get_children, txpath):
self.run_with_retry(self.handle.delete_async, PATH_SEPARATOR.join(
logging.debug("Removing lock: {0}".format(PATH_SEPARATOR.join(
[txpath, child])))
self.run_with_retry(self.handle.delete, PATH_SEPARATOR.join(
[txpath, child]))

# This deletes the transaction root path.
self.run_with_retry(self.handle.delete_async, txpath)
self.run_with_retry(self.handle.delete, txpath)

except ZKInternalException as zk_exception:
# Although there was a failure doing the async deletes, since we've
# already released the locks above, we can safely return True here.
Expand Down Expand Up @@ -908,8 +917,8 @@ def is_blacklisted(self, app_id, txid):
blacklist_root = self.get_blacklist_root_path(app_id)
try:
if not self.run_with_retry(self.handle.exists, blacklist_root):
self.run_with_retry(self.handle.create, blacklist_root, DEFAULT_VAL,
ZOO_ACL_OPEN, False, False, True)
self.handle.create(blacklist_root, value=DEFAULT_VAL,
acl=ZOO_ACL_OPEN, ephemeral=False, sequence=False, makepath=True)
except kazoo.exceptions.KazooException as kazoo_exception:
logging.exception(kazoo_exception)
self.reestablish_connection()
Expand Down Expand Up @@ -992,9 +1001,9 @@ def register_updated_key(self, app_id, current_txid, target_txid, entity_key):
txpath = self.get_transaction_path(app_id, current_txid)

if self.run_with_retry(self.handle.exists, txpath):
self.run_with_retry(self.handle.create_async,
PATH_SEPARATOR.join([txpath,
TX_UPDATEDKEY_PREFIX]), str(value), ZOO_ACL_OPEN, False, True)
self.handle.create_async(PATH_SEPARATOR.join([txpath,
TX_UPDATEDKEY_PREFIX]), value=str(value), acl=ZOO_ACL_OPEN,
ephemeral=False, makepath=True)
else:
raise ZKTransactionException("Transaction {0} is not valid.".format(
current_txid))
Expand Down Expand Up @@ -1049,11 +1058,11 @@ def notify_failed_transaction(self, app_id, txid):
blacklist_root = self.get_blacklist_root_path(app_id)

if not self.run_with_retry(self.handle.exists, blacklist_root):
self.run_with_retry(self.handle.create, blacklist_root, DEFAULT_VAL,
ZOO_ACL_OPEN, False, False, True)
self.handle.create(blacklist_root, value=DEFAULT_VAL,
acl=ZOO_ACL_OPEN, ephemeral=False, sequence=False, makepath=True)

self.run_with_retry(self.handle.create_async,
PATH_SEPARATOR.join([blacklist_root, str(txid)]), now, ZOO_ACL_OPEN)
self.handle.create_async(PATH_SEPARATOR.join([blacklist_root, str(txid)]),
value=now, acl=ZOO_ACL_OPEN)

children = []
try:
Expand All @@ -1080,17 +1089,29 @@ def notify_failed_transaction(self, app_id, txid):

# Release the locks.
for lock in lock_list:
self.run_with_retry(self.handle.delete_async, lock)
try:
self.run_with_retry(self.handle.delete, lock)
except kazoo.exceptions.NoNodeError:
# Try to delete all nodes, so skip any failure to release a lock.
pass

if self.is_xg(app_id, txid):
self.run_with_retry(self.handle.delete_async, self.get_xg_path(app_id,
txid))

try:
self.run_with_retry(self.handle.delete, self.get_xg_path(app_id,
txid))
except kazoo.exceptions.NoNodeError:
logging.error("No node error when trying to remove {0}".format(txid))

# Remove the transaction paths.
for item in self.run_with_retry(self.handle.get_children, txpath):
self.run_with_retry(self.handle.delete_async,
PATH_SEPARATOR.join([txpath, item]))
self.run_with_retry(self.handle.delete_async, txpath)
try:
self.run_with_retry(self.handle.delete,
PATH_SEPARATOR.join([txpath, item]))
except kazoo.exceptions.NoNodeError:
logging.error("No node error when trying to remove {0}".format(txid))

logging.debug("Removing lock: {0}".format(txpath))
self.run_with_retry(self.handle.delete_async, txpath)

except ZKInternalException as zk_exception:
logging.exception(zk_exception)
Expand Down Expand Up @@ -1150,7 +1171,7 @@ def gc_runner(self):

Note: This must be running as separate thread.
"""
logging.info("Starting GC thread.")
logging.debug("Starting GC thread.")

while self.gc_running:
# Scan each application's last GC time.
Expand Down Expand Up @@ -1181,7 +1202,7 @@ def gc_runner(self):
with self.gc_cv:
self.gc_cv.wait(GC_INTERVAL)

logging.info("Stopping GC thread.")
logging.debug("Stopping GC thread.")

def try_garbage_collection(self, app_id, app_path):
""" Try to garbage collect timed out transactions.
Expand Down Expand Up @@ -1214,16 +1235,18 @@ def try_garbage_collection(self, app_id, app_path):
gc_path = PATH_SEPARATOR.join([app_path, GC_LOCK_PATH])
try:
now = str(time.time())
self.run_with_retry(self.handle.create, gc_path, now, ZOO_ACL_OPEN,
True)
# Get the global GC lock.
self.handle.create(gc_path, value=now, acl=ZOO_ACL_OPEN,
ephemeral=True)
try:
self.execute_garbage_collection(app_id, app_path)
# Update the last time when the GC was successful.
now = str(time.time())
self.update_node(PATH_SEPARATOR.join([app_path, GC_TIME_PATH]), now)
except Exception as exception:
logging.exception(exception)
self.run_with_retry(self.handle.delete, gc_path)
# Release the lock.
self.run_with_retry(self.handle.delete, gc_path)
except kazoo.exceptions.NodeExistsError:
# Failed to obtain the GC lock. Try again later.
pass
Expand All @@ -1246,8 +1269,8 @@ def get_datastore_groomer_lock(self):
"""
try:
now = str(time.time())
self.run_with_retry(self.handle.create, DS_GROOM_LOCK_PATH, now,
ZOO_ACL_OPEN, True)
self.handle.create(DS_GROOM_LOCK_PATH, value=now,
acl=ZOO_ACL_OPEN, ephemeral=True)
except kazoo.exceptions.NoNodeError:
logging.debug("Couldn't create path {0}".format(DS_GROOM_LOCK_PATH))
return False
Expand Down Expand Up @@ -1358,4 +1381,4 @@ def execute_garbage_collection(self, app_id, app_path):
logging.exception(kazoo_exception)
self.reestablish_connection()
return
logging.info("Lock GC took {0} seconds.".format(str(time.time() - start)))
logging.debug("Lock GC took {0} seconds.".format(str(time.time() - start)))
7 changes: 7 additions & 0 deletions AppDB/zkappscale/zktransaction_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,10 @@ def get_datastore_groomer_lock(self):
"""
return True

def release_datastore_groomer_lock(self):
""" Stub for releasing the datastore groomer lock.

Returns:
Always returns True.
"""
return True