diff --git a/AppDB/datastore_server.py b/AppDB/datastore_server.py index ed64660034..00610744d5 100644 --- a/AppDB/datastore_server.py +++ b/AppDB/datastore_server.py @@ -107,7 +107,7 @@ class DatastoreDistributed(): a distributed datastore instead of a flat file. """ # Max number of results for a query - _MAXIMUM_RESULTS = 1000000 + _MAXIMUM_RESULTS = 10000 # The number of entries looked at when doing a composite query # It will keep looking at this size window when getting the result diff --git a/AppDB/zkappscale/zktransaction.py b/AppDB/zkappscale/zktransaction.py index def4f48df8..bbf0af3389 100644 --- a/AppDB/zkappscale/zktransaction.py +++ b/AppDB/zkappscale/zktransaction.py @@ -440,8 +440,8 @@ def create_node(self, path, value): self.reestablish_connection() try: - self.run_with_retry(self.handle.create, path, str(value), ZOO_ACL_OPEN, - False, False, True) + self.handle.create(path, value=str(value), acl=ZOO_ACL_OPEN, + ephemeral=False, sequence=False, makepath=True) logging.debug("Created path {0} with value {1}".format(path, value)) except kazoo.exceptions.KazooException as kazoo_exception: logging.exception(kazoo_exception) @@ -469,16 +469,17 @@ def create_sequence_node(self, path, value): self.reestablish_connection() try: - txn_id_path = self.run_with_retry(self.handle.create, path, str(value), - ZOO_ACL_OPEN, False, True, True) + txn_id_path = self.handle.create(path, value=str(value), + acl=ZOO_ACL_OPEN, ephemeral=False, sequence=True, makepath=True) if txn_id_path: txn_id = long(txn_id_path.split(PATH_SEPARATOR)[-1].lstrip( APP_TX_PREFIX)) if txn_id == 0: logging.warning("Created sequence ID 0 - deleting it.") self.run_with_retry(self.handle.delete_async, txn_id_path) - txn_id_path = self.run_with_retry(self.handle.create, path, - str(value), ZOO_ACL_OPEN, False, True, True) + txn_id_path = self.handle.create(path, + value=str(value), acl=ZOO_ACL_OPEN, ephemeral=False, + sequence=True, makepath=True) return long(txn_id_path.split(PATH_SEPARATOR)[-1].lstrip( APP_TX_PREFIX)) else: @@ -636,9 +637,10 @@ def acquire_additional_lock(self, app_id, txid, entity_key, create): try: logging.debug("Trying to create path {0} with value {1}".format( - lockrootpath, txpath)) - lockpath = self.run_with_retry(self.handle.create, lockrootpath, - str(txpath), ZOO_ACL_OPEN, False, False, True) + lockrootpath, txpath)) + lockpath = self.handle.create(lockrootpath, + value=str(txpath), acl=ZOO_ACL_OPEN, ephemeral=False, + sequence=False, makepath=True) except kazoo.exceptions.NodeExistsError: # fail to get lock try: @@ -665,24 +667,27 @@ def acquire_additional_lock(self, app_id, txid, entity_key, create): try: if create: - self.run_with_retry(self.handle.create_async, transaction_lock_path, - str(lockpath), ZOO_ACL_OPEN, False, False) + self.handle.create_async(transaction_lock_path, + value=str(lockpath), acl=ZOO_ACL_OPEN, ephemeral=False, + makepath=False) logging.debug("Created lock list path {0} with value {1}".format( transaction_lock_path, lockpath)) else: tx_lockpath = self.run_with_retry(self.handle.get, transaction_lock_path)[0] lock_list = tx_lockpath.split(LOCK_LIST_SEPARATOR) - if len(lock_list) >= MAX_GROUPS_FOR_XG: - raise ZKTransactionException("acquire_additional_lock: Too many " \ - "groups for this XG transaction.") - lock_list.append(lockpath) lock_list_str = LOCK_LIST_SEPARATOR.join(lock_list) self.run_with_retry(self.handle.set_async, transaction_lock_path, str(lock_list_str)) logging.debug("Set lock list path {0} to value {1}".format( transaction_lock_path, lock_list_str)) + # We do this check last, otherwise we may have left over locks to + # to a lack of a lock path reference. + if len(lock_list) >= MAX_GROUPS_FOR_XG: + raise ZKTransactionException("acquire_additional_lock: Too many " \ + "groups for this XG transaction.") + except kazoo.exceptions.KazooException as kazoo_exception: logging.exception(kazoo_exception) self.reestablish_connection() @@ -841,7 +846,8 @@ def release_lock(self, app_id, txid): transaction_lock_path)[0] lock_list = lock_list_str.split(LOCK_LIST_SEPARATOR) for lock_path in lock_list: - self.run_with_retry(self.handle.delete_async, lock_path) + logging.debug("Lock released: {0}".format(lock_path)) + self.run_with_retry(self.handle.delete, lock_path) self.run_with_retry(self.handle.delete, transaction_lock_path) except kazoo.exceptions.NoNodeError: try: @@ -866,14 +872,17 @@ def release_lock(self, app_id, txid): try: if self.is_xg(app_id, txid): xg_path = self.get_xg_path(app_id, txid) - self.run_with_retry(self.handle.delete_async, xg_path) + self.run_with_retry(self.handle.delete, xg_path) for child in self.run_with_retry(self.handle.get_children, txpath): - self.run_with_retry(self.handle.delete_async, PATH_SEPARATOR.join( + logging.debug("Removing lock: {0}".format(PATH_SEPARATOR.join( + [txpath, child]))) + self.run_with_retry(self.handle.delete, PATH_SEPARATOR.join( [txpath, child])) # This deletes the transaction root path. - self.run_with_retry(self.handle.delete_async, txpath) + self.run_with_retry(self.handle.delete, txpath) + except ZKInternalException as zk_exception: # Although there was a failure doing the async deletes, since we've # already released the locks above, we can safely return True here. @@ -908,8 +917,8 @@ def is_blacklisted(self, app_id, txid): blacklist_root = self.get_blacklist_root_path(app_id) try: if not self.run_with_retry(self.handle.exists, blacklist_root): - self.run_with_retry(self.handle.create, blacklist_root, DEFAULT_VAL, - ZOO_ACL_OPEN, False, False, True) + self.handle.create(blacklist_root, value=DEFAULT_VAL, + acl=ZOO_ACL_OPEN, ephemeral=False, sequence=False, makepath=True) except kazoo.exceptions.KazooException as kazoo_exception: logging.exception(kazoo_exception) self.reestablish_connection() @@ -992,9 +1001,9 @@ def register_updated_key(self, app_id, current_txid, target_txid, entity_key): txpath = self.get_transaction_path(app_id, current_txid) if self.run_with_retry(self.handle.exists, txpath): - self.run_with_retry(self.handle.create_async, - PATH_SEPARATOR.join([txpath, - TX_UPDATEDKEY_PREFIX]), str(value), ZOO_ACL_OPEN, False, True) + self.handle.create_async(PATH_SEPARATOR.join([txpath, + TX_UPDATEDKEY_PREFIX]), value=str(value), acl=ZOO_ACL_OPEN, + ephemeral=False, makepath=True) else: raise ZKTransactionException("Transaction {0} is not valid.".format( current_txid)) @@ -1049,11 +1058,11 @@ def notify_failed_transaction(self, app_id, txid): blacklist_root = self.get_blacklist_root_path(app_id) if not self.run_with_retry(self.handle.exists, blacklist_root): - self.run_with_retry(self.handle.create, blacklist_root, DEFAULT_VAL, - ZOO_ACL_OPEN, False, False, True) + self.handle.create(blacklist_root, value=DEFAULT_VAL, + acl=ZOO_ACL_OPEN, ephemeral=False, sequence=False, makepath=True) - self.run_with_retry(self.handle.create_async, - PATH_SEPARATOR.join([blacklist_root, str(txid)]), now, ZOO_ACL_OPEN) + self.handle.create_async(PATH_SEPARATOR.join([blacklist_root, str(txid)]), + value=now, acl=ZOO_ACL_OPEN) children = [] try: @@ -1080,17 +1089,29 @@ def notify_failed_transaction(self, app_id, txid): # Release the locks. for lock in lock_list: - self.run_with_retry(self.handle.delete_async, lock) + try: + self.run_with_retry(self.handle.delete, lock) + except kazoo.exceptions.NoNodeError: + # Try to delete all nodes, so skip any failure to release a lock. + pass if self.is_xg(app_id, txid): - self.run_with_retry(self.handle.delete_async, self.get_xg_path(app_id, - txid)) - + try: + self.run_with_retry(self.handle.delete, self.get_xg_path(app_id, + txid)) + except kazoo.exceptions.NoNodeError: + logging.error("No node error when trying to remove {0}".format(txid)) + # Remove the transaction paths. for item in self.run_with_retry(self.handle.get_children, txpath): - self.run_with_retry(self.handle.delete_async, - PATH_SEPARATOR.join([txpath, item])) - self.run_with_retry(self.handle.delete_async, txpath) + try: + self.run_with_retry(self.handle.delete, + PATH_SEPARATOR.join([txpath, item])) + except kazoo.exceptions.NoNodeError: + logging.error("No node error when trying to remove {0}".format(txid)) + + logging.debug("Removing lock: {0}".format(txpath)) + self.run_with_retry(self.handle.delete_async, txpath) except ZKInternalException as zk_exception: logging.exception(zk_exception) @@ -1150,7 +1171,7 @@ def gc_runner(self): Note: This must be running as separate thread. """ - logging.info("Starting GC thread.") + logging.debug("Starting GC thread.") while self.gc_running: # Scan each application's last GC time. @@ -1181,7 +1202,7 @@ def gc_runner(self): with self.gc_cv: self.gc_cv.wait(GC_INTERVAL) - logging.info("Stopping GC thread.") + logging.debug("Stopping GC thread.") def try_garbage_collection(self, app_id, app_path): """ Try to garbage collect timed out transactions. @@ -1214,8 +1235,9 @@ def try_garbage_collection(self, app_id, app_path): gc_path = PATH_SEPARATOR.join([app_path, GC_LOCK_PATH]) try: now = str(time.time()) - self.run_with_retry(self.handle.create, gc_path, now, ZOO_ACL_OPEN, - True) + # Get the global GC lock. + self.handle.create(gc_path, value=now, acl=ZOO_ACL_OPEN, + ephemeral=True) try: self.execute_garbage_collection(app_id, app_path) # Update the last time when the GC was successful. @@ -1223,7 +1245,8 @@ def try_garbage_collection(self, app_id, app_path): self.update_node(PATH_SEPARATOR.join([app_path, GC_TIME_PATH]), now) except Exception as exception: logging.exception(exception) - self.run_with_retry(self.handle.delete, gc_path) + # Release the lock. + self.run_with_retry(self.handle.delete, gc_path) except kazoo.exceptions.NodeExistsError: # Failed to obtain the GC lock. Try again later. pass @@ -1246,8 +1269,8 @@ def get_datastore_groomer_lock(self): """ try: now = str(time.time()) - self.run_with_retry(self.handle.create, DS_GROOM_LOCK_PATH, now, - ZOO_ACL_OPEN, True) + self.handle.create(DS_GROOM_LOCK_PATH, value=now, + acl=ZOO_ACL_OPEN, ephemeral=True) except kazoo.exceptions.NoNodeError: logging.debug("Couldn't create path {0}".format(DS_GROOM_LOCK_PATH)) return False @@ -1358,4 +1381,4 @@ def execute_garbage_collection(self, app_id, app_path): logging.exception(kazoo_exception) self.reestablish_connection() return - logging.info("Lock GC took {0} seconds.".format(str(time.time() - start))) + logging.debug("Lock GC took {0} seconds.".format(str(time.time() - start))) diff --git a/AppDB/zkappscale/zktransaction_stub.py b/AppDB/zkappscale/zktransaction_stub.py index 42361d9fed..37bcab008d 100644 --- a/AppDB/zkappscale/zktransaction_stub.py +++ b/AppDB/zkappscale/zktransaction_stub.py @@ -168,3 +168,10 @@ def get_datastore_groomer_lock(self): """ return True + def release_datastore_groomer_lock(self): + """ Stub for releasing the datastore groomer lock. + + Returns: + Always returns True. + """ + return True