-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implementation of client side statements that return #1046
Changes from 2 commits
3f3b8cb
8b63b9c
2361dfb
72f6221
27594e5
9108192
f688d54
03f42a2
e603dc3
500b513
00df2ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,10 +15,24 @@ | |
|
||
if TYPE_CHECKING: | ||
from google.cloud.spanner_dbapi import Connection | ||
from google.cloud.spanner_dbapi import ProgrammingError | ||
|
||
from google.cloud.spanner_dbapi.parsed_statement import ( | ||
ParsedStatement, | ||
ClientSideStatementType, | ||
) | ||
from google.cloud.spanner_v1 import ( | ||
Type, | ||
StructType, | ||
TypeCode, | ||
ResultSetMetadata, | ||
PartialResultSet, | ||
) | ||
|
||
from google.cloud.spanner_v1._helpers import _make_value_pb | ||
from google.cloud.spanner_v1.streamed import StreamedResultSet | ||
|
||
CONNECTION_CLOSED_ERROR = "This connection is closed" | ||
|
||
|
||
def execute(connection: "Connection", parsed_statement: ParsedStatement): | ||
|
@@ -33,8 +47,43 @@ def execute(connection: "Connection", parsed_statement: ParsedStatement): | |
:param parsed_statement: parsed_statement based on the sql query | ||
""" | ||
if parsed_statement.client_side_statement_type == ClientSideStatementType.COMMIT: | ||
return connection.commit() | ||
connection.commit() | ||
return None | ||
if parsed_statement.client_side_statement_type == ClientSideStatementType.BEGIN: | ||
return connection.begin() | ||
connection.begin() | ||
return None | ||
if parsed_statement.client_side_statement_type == ClientSideStatementType.ROLLBACK: | ||
return connection.rollback() | ||
connection.rollback() | ||
return None | ||
if ( | ||
parsed_statement.client_side_statement_type | ||
== ClientSideStatementType.SHOW_COMMIT_TIMESTAMP | ||
): | ||
if connection.is_closed: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe move this to the start of this function. I think that we want to do this for any type of statement. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
raise ProgrammingError(CONNECTION_CLOSED_ERROR) | ||
return _get_streamed_result_set( | ||
ClientSideStatementType.SHOW_COMMIT_TIMESTAMP.name, | ||
TypeCode.TIMESTAMP, | ||
connection._transaction.committed, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. made the change to return an empty result set for both cases |
||
) | ||
if ( | ||
parsed_statement.client_side_statement_type | ||
== ClientSideStatementType.SHOW_READ_TIMESTAMP | ||
): | ||
if connection.is_closed: | ||
raise ProgrammingError(CONNECTION_CLOSED_ERROR) | ||
return _get_streamed_result_set( | ||
ClientSideStatementType.SHOW_READ_TIMESTAMP.name, | ||
TypeCode.TIMESTAMP, | ||
connection._snapshot._transaction_read_timestamp, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here as above? What happens is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same response as above |
||
) | ||
|
||
|
||
def _get_streamed_result_set(column_name, type_code, column_value): | ||
struct_type_pb = StructType( | ||
fields=[StructType.Field(name=column_name, type_=Type(code=type_code))] | ||
) | ||
|
||
result_set = PartialResultSet(metadata=ResultSetMetadata(row_type=struct_type_pb)) | ||
result_set.values.extend([_make_value_pb(column_value)]) | ||
return StreamedResultSet(iter([result_set])) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,12 @@ | |
RE_BEGIN = re.compile(r"^\s*(BEGIN|START)(TRANSACTION)?", re.IGNORECASE) | ||
RE_COMMIT = re.compile(r"^\s*(COMMIT)(TRANSACTION)?", re.IGNORECASE) | ||
RE_ROLLBACK = re.compile(r"^\s*(ROLLBACK)(TRANSACTION)?", re.IGNORECASE) | ||
RE_SHOW_COMMIT_TIMESTAMP = re.compile( | ||
r"^\s*(SHOW VARIABLE COMMIT_TIMESTAMP)", re.IGNORECASE | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this regex accept There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hopefully not because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed regex for spacing |
||
) | ||
RE_SHOW_READ_TIMESTAMP = re.compile( | ||
r"^\s*(SHOW VARIABLE READ_TIMESTAMP)", re.IGNORECASE | ||
) | ||
|
||
|
||
def parse_stmt(query): | ||
|
@@ -37,16 +43,19 @@ def parse_stmt(query): | |
:rtype: ParsedStatement | ||
:returns: ParsedStatement object. | ||
""" | ||
client_side_statement_type = None | ||
if RE_COMMIT.match(query): | ||
return ParsedStatement( | ||
StatementType.CLIENT_SIDE, query, ClientSideStatementType.COMMIT | ||
) | ||
client_side_statement_type = ClientSideStatementType.COMMIT | ||
if RE_BEGIN.match(query): | ||
return ParsedStatement( | ||
StatementType.CLIENT_SIDE, query, ClientSideStatementType.BEGIN | ||
) | ||
client_side_statement_type = ClientSideStatementType.BEGIN | ||
if RE_ROLLBACK.match(query): | ||
client_side_statement_type = ClientSideStatementType.ROLLBACK | ||
if RE_SHOW_COMMIT_TIMESTAMP.match(query): | ||
client_side_statement_type = ClientSideStatementType.SHOW_COMMIT_TIMESTAMP | ||
if RE_SHOW_READ_TIMESTAMP.match(query): | ||
client_side_statement_type = ClientSideStatementType.SHOW_READ_TIMESTAMP | ||
if client_side_statement_type is not None: | ||
return ParsedStatement( | ||
StatementType.CLIENT_SIDE, query, ClientSideStatementType.ROLLBACK | ||
StatementType.CLIENT_SIDE, query, client_side_statement_type | ||
) | ||
return None |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -107,6 +107,9 @@ def __init__(self, instance, database=None, read_only=False): | |||||
self._staleness = None | ||||||
self.request_priority = None | ||||||
self._transaction_begin_marked = False | ||||||
# whether transaction started at Spanner. This means that we had | ||||||
# made atleast one call to Spanner. | ||||||
self._spanner_transaction_started = False | ||||||
|
||||||
@property | ||||||
def autocommit(self): | ||||||
|
@@ -140,26 +143,14 @@ def database(self): | |||||
return self._database | ||||||
|
||||||
@property | ||||||
def _spanner_transaction_started(self): | ||||||
"""Flag: whether transaction started at Spanner. This means that we had | ||||||
made atleast one call to Spanner. Property client_transaction_started | ||||||
would always be true if this is true as transaction has to start first | ||||||
at clientside than at Spanner | ||||||
|
||||||
Returns: | ||||||
bool: True if Spanner transaction started, False otherwise. | ||||||
""" | ||||||
def inside_transaction(self): | ||||||
"""Deprecated property which won't be supported in future versions. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is deprecated, should it be tagged (Python does support stacking decorators. Order matters; they are applied in the reverse of the order in which they are listed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, Added |
||||||
Please use spanner_transaction_started property instead.""" | ||||||
return ( | ||||||
self._transaction | ||||||
and not self._transaction.committed | ||||||
and not self._transaction.rolled_back | ||||||
) or (self._snapshot is not None) | ||||||
olavloite marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
@property | ||||||
def inside_transaction(self): | ||||||
"""Deprecated property which won't be supported in future versions. | ||||||
Please use spanner_transaction_started property instead.""" | ||||||
return self._spanner_transaction_started | ||||||
) | ||||||
|
||||||
@property | ||||||
def _client_transaction_started(self): | ||||||
|
@@ -293,7 +284,7 @@ def retry_transaction(self): | |||||
""" | ||||||
attempt = 0 | ||||||
while True: | ||||||
self._transaction = None | ||||||
self._spanner_transaction_started = False | ||||||
attempt += 1 | ||||||
if attempt > MAX_INTERNAL_RETRIES: | ||||||
raise | ||||||
|
@@ -319,7 +310,7 @@ def _rerun_previous_statements(self): | |||||
status, res = transaction.batch_update(statements) | ||||||
|
||||||
if status.code == ABORTED: | ||||||
self.connection._transaction = None | ||||||
self._spanner_transaction_started = False | ||||||
raise Aborted(status.details) | ||||||
|
||||||
retried_checksum = ResultsChecksum() | ||||||
|
@@ -363,6 +354,7 @@ def transaction_checkout(self): | |||||
if not self.read_only and self._client_transaction_started: | ||||||
if not self._spanner_transaction_started: | ||||||
self._transaction = self._session_checkout().transaction() | ||||||
self._spanner_transaction_started = True | ||||||
self._transaction.begin() | ||||||
|
||||||
return self._transaction | ||||||
|
@@ -377,11 +369,11 @@ def snapshot_checkout(self): | |||||
:returns: A Cloud Spanner snapshot object, ready to use. | ||||||
""" | ||||||
if self.read_only and self._client_transaction_started: | ||||||
if not self._snapshot: | ||||||
if not self._spanner_transaction_started: | ||||||
self._snapshot = Snapshot( | ||||||
self._session_checkout(), multi_use=True, **self.staleness | ||||||
) | ||||||
self._snapshot.begin() | ||||||
olavloite marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
self._spanner_transaction_started = True | ||||||
|
||||||
return self._snapshot | ||||||
|
||||||
|
@@ -391,7 +383,7 @@ def close(self): | |||||
The connection will be unusable from this point forward. If the | ||||||
connection has an active transaction, it will be rolled back. | ||||||
""" | ||||||
if self._spanner_transaction_started and not self.read_only: | ||||||
if self._spanner_transaction_started and not self._read_only: | ||||||
self._transaction.rollback() | ||||||
|
||||||
if self._own_pool and self.database: | ||||||
|
@@ -405,13 +397,15 @@ def begin(self): | |||||
Marks the transaction as started. | ||||||
|
||||||
:raises: :class:`InterfaceError`: if this connection is closed. | ||||||
:raises: :class:`OperationalError`: if there is an existing transaction that has begin or is running | ||||||
:raises: :class:`OperationalError`: if there is an existing transaction | ||||||
that has begin or is running | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (I know that it's not a change in this PR, but I just noticed it now):
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||
""" | ||||||
if self._transaction_begin_marked: | ||||||
raise OperationalError("A transaction has already started") | ||||||
if self._spanner_transaction_started: | ||||||
raise OperationalError( | ||||||
"Beginning a new transaction is not allowed when a transaction is already running" | ||||||
"Beginning a new transaction is not allowed when a transaction " | ||||||
"is already running" | ||||||
) | ||||||
self._transaction_begin_marked = True | ||||||
|
||||||
|
@@ -432,14 +426,13 @@ def commit(self): | |||||
self.run_prior_DDL_statements() | ||||||
if self._spanner_transaction_started: | ||||||
try: | ||||||
if self.read_only: | ||||||
self._snapshot = None | ||||||
else: | ||||||
if not self._read_only: | ||||||
self._transaction.commit() | ||||||
|
||||||
self._release_session() | ||||||
self._statements = [] | ||||||
self._transaction_begin_marked = False | ||||||
self._spanner_transaction_started = False | ||||||
except Aborted: | ||||||
self.retry_transaction() | ||||||
self.commit() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens with the above flags if we reach this? And what happens with them if we reach this block and the retry fails (i.e. an error is raised here as well)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Flag There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated the code to always set these 2 flags to False post commit and rollback even if there is an uncaught exception |
||||||
|
@@ -457,14 +450,13 @@ def rollback(self): | |||||
return | ||||||
|
||||||
if self._spanner_transaction_started: | ||||||
if self.read_only: | ||||||
self._snapshot = None | ||||||
else: | ||||||
if not self._read_only: | ||||||
self._transaction.rollback() | ||||||
|
||||||
self._release_session() | ||||||
self._statements = [] | ||||||
self._transaction_begin_marked = False | ||||||
self._spanner_transaction_started = False | ||||||
|
||||||
@check_not_closed | ||||||
def cursor(self): | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -179,6 +179,7 @@ def close(self): | |
self._is_closed = True | ||
|
||
def _do_execute_update(self, transaction, sql, params): | ||
self.connection._transaction = transaction | ||
olavloite marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._result_set = transaction.execute_sql( | ||
sql, params=params, param_types=get_param_types(params) | ||
) | ||
|
@@ -239,15 +240,18 @@ def execute(self, sql, args=None): | |
self._row_count = _UNSET_COUNT | ||
|
||
try: | ||
if self.connection.read_only: | ||
self._handle_DQL(sql, args or None) | ||
return | ||
|
||
parsed_statement = parse_utils.classify_statement(sql) | ||
if parsed_statement.statement_type == StatementType.CLIENT_SIDE: | ||
return client_side_statement_executor.execute( | ||
self._result_set = client_side_statement_executor.execute( | ||
self.connection, parsed_statement | ||
) | ||
if self._result_set is not None: | ||
self._itr = PeekIterator(self._result_set) | ||
return | ||
|
||
if self.connection.read_only: | ||
self._handle_DQL(sql, args or None) | ||
return | ||
if parsed_statement.statement_type == StatementType.DDL: | ||
self._batch_DDLs(sql) | ||
if not self.connection._client_transaction_started: | ||
|
@@ -293,10 +297,16 @@ def execute(self, sql, args=None): | |
args or None, | ||
) | ||
except (AlreadyExists, FailedPrecondition, OutOfRange) as e: | ||
self.close() | ||
self.connection.close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated the code to not close the connection and update the flag if needed in finally block |
||
raise IntegrityError(getattr(e, "details", e)) from e | ||
except InvalidArgument as e: | ||
self.close() | ||
self.connection.close() | ||
raise ProgrammingError(getattr(e, "details", e)) from e | ||
except InternalServerError as e: | ||
self.close() | ||
self.connection.close() | ||
raise OperationalError(getattr(e, "details", e)) from e | ||
|
||
@check_not_closed | ||
|
@@ -492,6 +502,7 @@ def _handle_DQL(self, sql, params): | |
with self.connection.database.snapshot( | ||
**self.connection.staleness | ||
) as snapshot: | ||
self.connection._snapshot = snapshot | ||
self._handle_DQL_with_snapshot(snapshot, sql, params) | ||
|
||
def __enter__(self): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are going to be lots of these
if
s and they'll all be long due to the long variable name, you might consider doingstatement_type = parsed_statement.client_side_statement_type
at the top of this block ofif
s. Then you can use the shorter name throughout here.You could also do something like
Type = ClientSideStatementType
. Aliasing a class is a little less common, though. And, I think, probably not necessary to get the line length consistently under the length limit?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related, very minor thing:
In Python, everything is interpreted and there's no JIT compiler doing type inference, so every time you do
a.b
, the Python interpreter has to actually interpret that logic at runtime. Specifically -- go look up the type ofa
, look up and run code associated with any overrides ona
's.
operator (for example properties are implemented under the hood as an overload to the dot operator that checks whether the name of the field that you're looking up is the same as the name of a property on the class and, if so, replaces that lookup with a call to the underlying getter(/setter(/deleter)) function), then if still applicable go look upb
ina
's member dictionary.As a result:
is actually slightly-but-measurably slower than
In most other languages, unless a pointer dereference is required (which is still cheap), nested member-variable access is zero cost at runtime; all of this resolution is sorted out at compile time or by a JIT compiler.
The performance difference is not usually enough (and Python code is usually not performance-sensitive enough) that it matters. But it gently nudges Python to tend to use simple variables rather than complex nested structures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defined a variable
statement_type
. Thanks for the explanation, was not aware of it. Will try to remember and take care of this in future code