diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 51501a07a3..3a50b890d7 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -78,8 +78,17 @@ def trace_call(name, session, extra_attributes=None): try: yield span except Exception as error: - span.set_status(Status(StatusCode.ERROR, str(error))) - span.record_exception(error) + set_span_error_and_record_exception(span, error) raise else: span.set_status(Status(StatusCode.OK)) + + +def set_span_error_and_record_exception(span, exc): + if exc and span: + span.set_status(Status(StatusCode.ERROR, str(exc))) + span.record_exception(exc) + + +def get_current_span(): + return trace.get_current_span() diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 56837bfc0b..85461324ce 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -16,6 +16,7 @@ import datetime import queue +import time from google.cloud.exceptions import NotFound from google.cloud.spanner_v1 import BatchCreateSessionsRequest @@ -24,6 +25,9 @@ _metadata_with_prefix, _metadata_with_leader_aware_routing, ) +from google.cloud.spanner_v1._opentelemetry_tracing import ( + get_current_span, +) from warnings import warn _NOW = datetime.datetime.utcnow # unit tests may replace @@ -199,13 +203,32 @@ def bind(self, database): _metadata_with_leader_aware_routing(database._route_to_leader_enabled) ) self._database_role = self._database_role or self._database.database_role + requested_session_count = self.size - self._sessions.qsize() request = BatchCreateSessionsRequest( database=database.name, - session_count=self.size - self._sessions.qsize(), + session_count=requested_session_count, session_template=Session(creator_role=self.database_role), ) + current_span = get_current_span() + if requested_session_count > 0: + current_span.add_event( + f"Requesting {requested_session_count} sessions", + {"kind": "fixed_size_pool"}, + ) + + if self._sessions.full(): + current_span.add_event( + "Session pool is already full", {"kind": "fixed_size_pool"} + ) + return + + returned_session_count = 0 while not self._sessions.full(): + current_span.add_event( + f"Creating {request.session_count} sessions", + {"kind": "fixed_size_pool"}, + ) resp = api.batch_create_sessions( request=request, metadata=metadata, @@ -214,6 +237,12 @@ def bind(self, database): session = self._new_session() session._session_id = session_pb.name.split("/")[-1] self._sessions.put(session) + returned_session_count += 1 + + current_span.add_event( + f"Requested for {requested_session_count}, returned {returned_session_count}", + {"kind": "fixed_size_pool"}, + ) def get(self, timeout=None): """Check a session out from the pool. @@ -229,12 +258,23 @@ def get(self, timeout=None): if timeout is None: timeout = self.default_timeout + start_time = time.time() + current_span = get_current_span() + current_span.add_event("Acquiring session", {"kind": type(self).__name__}) session = self._sessions.get(block=True, timeout=timeout) if not session.exists(): session = self._database.session() session.create() + current_span.add_event( + "Acquired session", + { + "time.elapsed": time.time() - start_time, + "session.id": session.session_id, + "kind": type(self).__name__, + }, + ) return session def put(self, session): @@ -307,6 +347,10 @@ def get(self): :returns: an existing session from the pool, or a newly-created session. """ + start_time = time.time() + current_span = get_current_span() + current_span.add_event("Acquiring session", {"kind": type(self).__name__}) + try: session = self._sessions.get_nowait() except queue.Empty: @@ -316,6 +360,15 @@ def get(self): if not session.exists(): session = self._new_session() session.create() + else: + current_span.add_event( + "Cache hit: has usable session", + { + "id": session.session_id, + "kind": type(self).__name__, + }, + ) + return session def put(self, session): @@ -422,6 +475,18 @@ def bind(self, database): session_template=Session(creator_role=self.database_role), ) + requested_session_count = request.session_count + current_span = get_current_span() + current_span.add_event(f"Requesting {requested_session_count} sessions") + + if created_session_count >= self.size: + current_span.add_event( + "Created no new sessions as sessionPool is full", + {"kind": type(self).__name__}, + ) + return + + returned_session_count = 0 while created_session_count < self.size: resp = api.batch_create_sessions( request=request, @@ -431,8 +496,17 @@ def bind(self, database): session = self._new_session() session._session_id = session_pb.name.split("/")[-1] self.put(session) + returned_session_count += 1 + created_session_count += len(resp.session) + current_span.add_event( + "Requested for {requested_session_count} sessions, return {returned_session_count}", + { + "kind": "pinging_pool", + }, + ) + def get(self, timeout=None): """Check a session out from the pool. @@ -447,6 +521,12 @@ def get(self, timeout=None): if timeout is None: timeout = self.default_timeout + start_time = time.time() + current_span = get_current_span() + current_span.add_event( + "Waiting for a session to become available", {"kind": "pinging_pool"} + ) + ping_after, session = self._sessions.get(block=True, timeout=timeout) if _NOW() > ping_after: @@ -457,6 +537,14 @@ def get(self, timeout=None): session = self._new_session() session.create() + current_span.add_event( + "Acquired session", + { + "time.elapsed": time.time() - start_time, + "session.id": session.session_id, + "kind": "pinging_pool", + }, + ) return session def put(self, session): diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 28280282f4..fc1553e068 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -30,7 +30,11 @@ _metadata_with_prefix, _metadata_with_leader_aware_routing, ) -from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +from google.cloud.spanner_v1._opentelemetry_tracing import ( + get_current_span, + set_span_error_and_record_exception, + trace_call, +) from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.snapshot import Snapshot from google.cloud.spanner_v1.transaction import Transaction @@ -113,6 +117,10 @@ def name(self): :raises ValueError: if session is not yet created """ if self._session_id is None: + err = "No session available" + current_span = get_current_span() + current_span.add_event(err) + set_span_error_and_record_exception(current_span, err) raise ValueError("No session ID set by back-end") return self._database.name + "/sessions/" + self._session_id @@ -124,8 +132,14 @@ def create(self): :raises ValueError: if :attr:`session_id` is already set. """ + current_span = get_current_span() + current_span.add_event("Creating Session") + if self._session_id is not None: - raise ValueError("Session ID already set by back-end") + err = "Session ID already set by back-end" + current_span.add_event(err) + set_span_error_and_record_exception(current_span, err) + raise ValueError(err) api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) if self._database._route_to_leader_enabled: @@ -148,6 +162,7 @@ def create(self): metadata=metadata, ) self._session_id = session_pb.name.split("/")[-1] + current_span.add_event("Using Session", {"id": self._session_id}) def exists(self): """Test for the existence of this session. diff --git a/tests/_helpers.py b/tests/_helpers.py index 5e514f2586..b0a045bf50 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -92,3 +92,13 @@ def assertSpanAttributes( self.assertEqual(span.name, name) self.assertEqual(span.status.status_code, status) self.assertEqual(dict(span.attributes), attributes) + + def assertSpanEvents(self, name, wantEventNames=[], span=None): + if HAS_OPENTELEMETRY_INSTALLED: + if not span: + span_list = self.ot_exporter.get_finished_spans() + self.assertEqual(len(span_list) > 0, True) + span = span_list[0] + + self.assertEqual(span.name, name) + self.assertEqual(len(span.events), len(wantEventNames)) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 2f6b5e4ae9..a7f7a6f970 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -611,6 +611,10 @@ def __init__(self, database=None, name=TestBatch.SESSION_NAME): self._database = database self.name = name + @property + def session_id(self): + return self.name + class _Database(object): name = "testing" diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 90fa0c269f..6e29255fb7 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -3188,6 +3188,10 @@ def run_in_transaction(self, func, *args, **kw): self._retried = (func, args, kw) return self._committed + @property + def session_id(self): + return self.name + class _MockIterator(object): def __init__(self, *values, **kw): diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index 23ed3e7251..589e89545a 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -14,6 +14,7 @@ from functools import total_ordering +import time import unittest import mock @@ -923,6 +924,8 @@ def __init__(self, database, exists=True, transaction=None): self.create = mock.Mock() self._deleted = False self._transaction = transaction + # Generate a faux id. + self._session_id = f"time.time()" def __lt__(self, other): return id(self) < id(other) @@ -949,6 +952,10 @@ def transaction(self): txn = self._transaction = _make_transaction(self) return txn + @property + def session_id(self): + return self._session_id + class _Database(object): def __init__(self, name): diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 2ae0cb94b8..385b34dda9 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -15,6 +15,7 @@ import google.api_core.gapic_v1.method from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call import mock from tests._helpers import ( OpenTelemetryBase, @@ -174,6 +175,43 @@ def test_create_w_database_role(self): "CloudSpanner.CreateSession", attributes=TestSession.BASE_ATTRIBUTES ) + def test_create_session_span_annotations(self): + from google.cloud.spanner_v1 import CreateSessionRequest + from google.cloud.spanner_v1 import Session as SessionRequestProto + + session_pb = self._make_session_pb( + self.SESSION_NAME, database_role=self.DATABASE_ROLE + ) + + gax_api = self._make_spanner_api() + gax_api.create_session.return_value = session_pb + database = self._make_database(database_role=self.DATABASE_ROLE) + database.spanner_api = gax_api + session = self._make_one(database, database_role=self.DATABASE_ROLE) + + with trace_call("TestSessionSpan", session): + session.create() + + self.assertEqual(session.session_id, self.SESSION_ID) + self.assertEqual(session.database_role, self.DATABASE_ROLE) + session_template = SessionRequestProto(creator_role=self.DATABASE_ROLE) + + request = CreateSessionRequest( + database=database.name, + session=session_template, + ) + + gax_api.create_session.assert_called_once_with( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) + + wantEventNames = ["Acquiring session", "Creating Session", "Using Session"] + self.assertSpanEvents("CloudSpanner.CreateSession", wantEventNames) + def test_create_wo_database_role(self): from google.cloud.spanner_v1 import CreateSessionRequest diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index bf7363fef2..479a0d62e9 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -1822,6 +1822,10 @@ def __init__(self, database=None, name=TestSnapshot.SESSION_NAME): self._database = database self.name = name + @property + def session_id(self): + return self.name + class _MockIterator(object): def __init__(self, *values, **kw): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index ab5479eb3c..ff34a109af 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -1082,6 +1082,10 @@ def __init__(self, database=None, name=TestTransaction.SESSION_NAME): self._database = database self.name = name + @property + def session_id(self): + return self.name + class _MockIterator(object): def __init__(self, *values, **kw): diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index d52fb61db1..e426f912b2 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -939,6 +939,10 @@ def __init__(self, database=None, name=TestTransaction.SESSION_NAME): self._database = database self.name = name + @property + def session_id(self): + return self.name + class _FauxSpannerAPI(object): _committed = None