Skip to content

Commit

Permalink
Merge pull request #2684 from cdonati/persistent-group-locks
Browse files: browse the repository at this point in the history.
Allow group locks to persist when commits fail
  • Loading branch information
tmarballi authored Mar 5, 2018
2 parents 493606e + 919aa8e commit 68e1584
Show file tree
Hide file tree
Showing 13 changed files with 289 additions and 160 deletions.
7 changes: 3 additions & 4 deletions AppDB/appscale/datastore/cassandra_env/cassandra_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from cassandra.query import ConsistencyLevel
from cassandra.query import SimpleStatement
from cassandra.query import ValueSequence
from .constants import CURRENT_VERSION
from .large_batch import (FailedBatch,
LargeBatch)
from .retry_policies import (BASIC_RETRIES,
Expand Down Expand Up @@ -53,9 +54,6 @@
# The number of times to retry connecting to Cassandra.
INITIAL_CONNECT_RETRIES = 20

# The data layout version that the datastore expects.
EXPECTED_DATA_VERSION = 1.0

# The metadata key for the data layout version.
VERSION_INFO_KEY = 'version'

Expand Down Expand Up @@ -96,6 +94,7 @@ class ThriftColumn(object):
class IndexStates(object):
  """ Possible states for datastore indexes. """
  # State that schema.prime_cassandra records for newly created indexes.
  CLEAN = 'clean'
  # State that schema.prime_cassandra records when it finds existing entities
  # at data layout version 1.0, which instructs the groomer to reclean the
  # indexes.
  DIRTY = 'dirty'
  # NOTE(review): presumably recorded while an index cleanup is running; the
  # code that sets it is not visible here -- confirm.
  SCRUB_IN_PROGRESS = 'scrub_in_progress'


Expand Down Expand Up @@ -780,7 +779,7 @@ def valid_data_version(self):
except cassandra.InvalidRequest:
return False

return version is not None and float(version) == EXPECTED_DATA_VERSION
return version is not None and float(version) == CURRENT_VERSION

def group_updates(self, groups):
""" Fetch the latest transaction IDs for each group.
Expand Down
4 changes: 4 additions & 0 deletions AppDB/appscale/datastore/cassandra_env/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
""" Datastore constants specific to the Cassandra implementation. """

# The current data layout version.
CURRENT_VERSION = 2.0
53 changes: 46 additions & 7 deletions AppDB/appscale/datastore/cassandra_env/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
from .cassandra_interface import INITIAL_CONNECT_RETRIES
from .cassandra_interface import KEYSPACE
from .cassandra_interface import ThriftColumn
from .constants import CURRENT_VERSION
from .. import dbconstants

# The data layout version to set after removing the journal table.
POST_JOURNAL_VERSION = 1.0

# A policy that does not retry statements.
NO_RETRIES = FallthroughRetryPolicy()

Expand Down Expand Up @@ -110,6 +108,7 @@ def create_batch_tables(cluster, session):
time.sleep(SCHEMA_CHANGE_TIMEOUT)
raise


def create_groups_table(session):
create_table = """
CREATE TABLE IF NOT EXISTS group_updates (
Expand Down Expand Up @@ -180,6 +179,32 @@ def create_entity_ids_table(session):
raise


def current_datastore_version(session):
  """ Looks up the data layout version recorded in the metadata table.

  Args:
    session: A cassandra-driver session.
  Returns:
    A float specifying the recorded datastore version, or None if the query
    returns no rows.
  """
  version_key = cassandra_interface.VERSION_INFO_KEY

  # Table and column identifiers are substituted into the statement text;
  # the two %s markers are left for the driver to bind at execution time.
  identifiers = {
    'value': ThriftColumn.VALUE,
    'table': dbconstants.DATASTORE_METADATA_TABLE,
    'key': ThriftColumn.KEY,
    'column': ThriftColumn.COLUMN_NAME,
  }
  select_version = """
    SELECT {value} FROM "{table}"
    WHERE {key} = %s
    AND {column} = %s
  """.format(**identifiers)

  # The same identifier is used for both bound values: as a bytearray for the
  # key and as a plain string for the column name.
  bound_values = (bytearray(version_key), version_key)
  rows = session.execute(select_version, bound_values)

  try:
    version_row = rows[0]
  except IndexError:
    # The query matched nothing, so no version has been recorded.
    return None

  return float(version_row.value)


def prime_cassandra(replication):
""" Create Cassandra keyspace and initial tables.
Expand Down Expand Up @@ -268,10 +293,23 @@ def prime_cassandra(replication):
value=ThriftColumn.VALUE
)

if not existing_entities:
if existing_entities:
current_version = current_datastore_version(session)
if current_version == 1.0:
# Instruct the groomer to reclean the indexes.
parameters = {'key': bytearray(cassandra_interface.INDEX_STATE_KEY),
'column': cassandra_interface.INDEX_STATE_KEY,
'value': bytearray(str(IndexStates.DIRTY))}
session.execute(metadata_insert, parameters)

parameters = {'key': bytearray(cassandra_interface.VERSION_INFO_KEY),
'column': cassandra_interface.VERSION_INFO_KEY,
'value': bytearray(str(CURRENT_VERSION))}
session.execute(metadata_insert, parameters)
else:
parameters = {'key': bytearray(cassandra_interface.VERSION_INFO_KEY),
'column': cassandra_interface.VERSION_INFO_KEY,
'value': bytearray(str(POST_JOURNAL_VERSION))}
'value': bytearray(str(CURRENT_VERSION))}
session.execute(metadata_insert, parameters)

# Mark the newly created indexes as clean.
Expand All @@ -283,7 +321,7 @@ def prime_cassandra(replication):
# Indicate that the database has been successfully primed.
parameters = {'key': bytearray(cassandra_interface.PRIMED_KEY),
'column': cassandra_interface.PRIMED_KEY,
'value': bytearray('true')}
'value': bytearray(str(CURRENT_VERSION))}
session.execute(metadata_insert, parameters)
logging.info('Cassandra is primed.')

Expand All @@ -300,6 +338,7 @@ def primed():
return False

try:
return db_access.get_metadata(cassandra_interface.PRIMED_KEY) == 'true'
primed_version = db_access.get_metadata(cassandra_interface.PRIMED_KEY)
return primed_version == str(CURRENT_VERSION)
finally:
db_access.close()
Loading

0 comments on commit 68e1584

Please sign in to comment.