8000 observability: annotate Session+SessionPool events · googleapis/python-spanner@0d5bf26 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0d5bf26

Browse files
committed
observability: annotate Session+SessionPool events
This change adds annotations for session and session pool events to aid customers in debugging latency issues with session pool malevolence and also for maintainers to figure out which session pool type is the most appropriate. Updates #1170
1 parent cb8a2b7 commit 0d5bf26

File tree

11 files changed

+198
-5
lines changed

11 files changed

+198
-5
lines changed

google/cloud/spanner_v1/_opentelemetry_tracing.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,17 @@ def trace_call(name, session, extra_attributes=None):
7878
try:
7979
yield span
8080
except Exception as error:
81-
span.set_status(Status(StatusCode.ERROR, str(error)))
82-
span.record_exception(error)
81+
set_span_error_and_record_exception(span, error)
8382
raise
8483
else:
8584
span.set_status(Status(StatusCode.OK))
85+
86+
87+
def set_span_error_and_record_exception(span, exc):
88+
if exc and span:
89+
span.set_status(Status(StatusCode.ERROR, str(exc)))
90+
span.record_exception(exc)
91+
92+
93+
def get_current_span():
94+
return trace.get_current_span()

google/cloud/spanner_v1/pool.py

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import datetime
1818
import queue
19+
import time
1920

2021
from google.cloud.exceptions import NotFound
2122
from google.cloud.spanner_v1 import BatchCreateSessionsRequest
@@ -24,6 +25,9 @@
2425
_metadata_with_prefix,
2526
_metadata_with_leader_aware_routing,
2627
)
28+
from google.cloud.spanner_v1._opentelemetry_tracing import (
29+
get_current_span,
30+
)
2731
from warnings import warn
2832

2933
_NOW = datetime.datetime.utcnow # unit tests may replace
@@ -199,13 +203,32 @@ def bind(self, database):
199203
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
200204
)
201205
self._database_role = self._database_role or self._database.database_role
206+
requested_session_count = self.size - self._sessions.qsize()
202207
request = BatchCreateSessionsRequest(
203208
database=database.name,
204-
session_count=self.size - self._sessions.qsize(),
209+
session_count=requested_session_count,
205210
session_template=Session(creator_role=self.database_role),
206211
)
207212

213+
current_span = get_current_span()
214+
if requested_session_count > 0:
215+
current_span.add_event(
216+
f"Requesting {requested_session_count} sessions",
217+
{"kind": "fixed_size_pool"},
218+
)
219+
220+
if self._sessions.full():
221+
current_span.add_event(
222+
"Session pool is already full", {"kind": "fixed_size_pool"}
223+
)
224+
return
225+
226+
returned_session_count = 0
208227
while not self._sessions.full():
228+
current_span.add_event(
229+
f"Creating {request.session_count} sessions",
230+
{"kind": "fixed_size_pool"},
231+
)
209232
resp = api.batch_create_sessions(
210233
request=request,
211234
metadata=metadata,
@@ -214,6 +237,12 @@ def bind(self, database):
214237
session = self._new_session()
215238
session._session_id = session_pb.name.split("/")[-1]
216239
self._sessions.put(session)
240+
returned_session_count += 1
241+
242+
current_span.add_event(
243+
f"Requested for {requested_session_count}, returned {returned_session_count}",
244+
{"kind": "fixed_size_pool"},
245+
)
217246

218247
def get(self, timeout=None):
219248
"""Check a session out from the pool.
@@ -229,12 +258,23 @@ def get(self, timeout=None):
229258
if timeout is None:
230259
timeout = self.default_timeout
231260

261+
start_time = time.time()
262+
current_span = get_current_span()
263+
current_span.add_event("Acquiring session", {"kind": type(self).__name__})
232264
session = self._sessions.get(block=True, timeout=timeout)
233265

234266
if not session.exists():
235267
session = self._database.session()
236268
session.create()
237269

270+
current_span.add_event(
271+
"Acquired session",
272+
{
273+
"time.elapsed": time.time() - start_time,
274+
"session.id": session.session_id,
275+
"kind": type(self).__name__,
276+
},
277+
)
238278
return session
239279

240280
def put(self, session):
@@ -307,6 +347,10 @@ def get(self):
307347
:returns: an existing session from the pool, or a newly-created
308348
session.
309349
"""
350+
start_time = time.time()
351+
current_span = get_current_span()
352+
current_span.add_event("Acquiring session", {"kind": type(self).__name__})
353+
310354
try:
311355
session = self._sessions.get_nowait()
312356
except queue.Empty:
@@ -316,6 +360,15 @@ def get(self):
316360
if not session.exists():
317361
session = self._new_session()
318362
session.create()
363+
else:
364+
current_span.add_event(
365+
"Cache hit: has usable session",
366+
{
367+
"id": session.session_id,
368+
"kind": type(self).__name__,
369+
},
370+
)
371+
319372
return session
320373

321374
def put(self, session):
@@ -422,6 +475,18 @@ def bind(self, database):
422475
session_template=Session(creator_role=self.database_role),
423476
)
424477

478+
requested_session_count = request.session_count
479+
current_span = get_current_span()
480+
current_span.add_event(f"Requesting {requested_session_count} sessions")
481+
482+
if created_session_count >= self.size:
483+
current_span.add_event(
484+
"Created no new sessions as sessionPool is full",
485+
{"kind": type(self).__name__},
486+
)
487+
return
488+
489+
returned_session_count = 0
425490
while created_session_count < self.size:
426491
resp = api.batch_create_sessions(
427492
request=request,
@@ -431,8 +496,17 @@ def bind(self, database):
431496
session = self._new_session()
432497
session._session_id = session_pb.name.split("/")[-1]
433498
self.put(session)
499+
returned_session_count += 1
500+
434501
created_session_count += len(resp.session)
435502

503+
current_span.add_event(
504+
"Requested for {requested_session_count} sessions, return {returned_session_count}",
505+
{
506+
"kind": "pinging_pool",
507+
},
508+
)
509+
436510
def get(self, timeout=None):
437511
"""Check a session out from the pool.
438512
@@ -447,6 +521,12 @@ def get(self, timeout=None):
447521
if timeout is None:
448522
timeout = self.default_timeout
449523

524+
start_time = time.time()
525+
current_span = get_current_span()
526+
current_span.add_event(
527+
"Waiting for a session to become available", {"kind": "pinging_pool"}
528+
)
529+
450530
ping_after, session = self._sessions.get(block=True, timeout=timeout)
451531

452532
if _NOW() > ping_after:
@@ -457,6 +537,14 @@ def get(self, timeout=None):
457537
session = self._new_session()
458538
session.create()
459539

540+
current_span.add_event(
541+
"Acquired session",
542+
{
543+
"time.elapsed": time.time() - start_time,
544+
"session.id": session.session_id,
545+
"kind": "pinging_pool",
546+
},
547+
)
460548
return session
461549

462550
def put(self, session):

google/cloud/spanner_v1/session.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@
3030
_metadata_with_prefix,
3131
_metadata_with_leader_aware_routing,
3232
)
33-
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
33+
from google.cloud.spanner_v1._opentelemetry_tracing import (
34+
get_current_span,
35+
set_span_error_and_record_exception,
36+
trace_call,
37+
)
3438
from google.cloud.spanner_v1.batch import Batch
3539
from google.cloud.spanner_v1.snapshot import Snapshot
3640
from google.cloud.spanner_v1.transaction import Transaction
@@ -113,6 +117,10 @@ def name(self):
113117
:raises ValueError: if session is not yet created
114118
"""
115119
if self._session_id is None:
120+
err = "No session available"
121+
current_span = get_current_span()
122+
current_span.add_event(err)
123+
set_span_error_and_record_exception(current_span, err)
116124
raise ValueError("No session ID set by back-end")
117125
return self._database.name + "/sessions/" + self._session_id
118126

@@ -124,8 +132,14 @@ def create(self):
124132
125133
:raises ValueError: if :attr:`session_id` is already set.
126134
"""
135+
current_span = get_current_span()
136+
current_span.add_event("Creating Session")
137+
127138
if self._session_id is not None:
128-
raise ValueError("Session ID already set by back-end")
139+
err = "Session ID already set by back-end"
140+
current_span.add_event(err)
141+
set_span_error_and_record_exception(current_span, err)
142+
raise ValueError(err)
129143
api = self._database.spanner_api
130144
metadata = _metadata_with_prefix(self._database.name)
131145
if self._database._route_to_leader_enabled:
@@ -148,6 +162,7 @@ def create(self):
148162
metadata=metadata,
149163
)
150164
self._session_id = session_pb.name.split("/")[-1]
165+
current_span.add_event("Using Session", {"id": self._session_id})
151166

152167
def exists(self):
153168
"""Test for the existence of this session.

tests/_helpers.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,14 @@ def assertSpanAttributes(
9292
self.assertEqual(span.name, name)
9393
self.assertEqual(span.status.status_code, status)
9494
self.assertEqual(dict(span.attributes), attributes)
95+
96+
def assertSpanEvents(self, name, wantEventNames=[], span=None):
97+
if HAS_OPENTELEMETRY_INSTALLED:
98+
if not span:
99+
span_list = self.ot_exporter.get_finished_spans()
100+
self.assertEqual(len(span_list) > 0, true)
101+
span = span_list[0]
102+
103+
print("\033[31massertSpanEvent\033[00m")
104+
self.assertEqual(span.name, name)
105+
self.assertEqual(len(span.events), len(wantEventNames))

tests/unit/test_batch.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,10 @@ def __init__(self, database=None, name=TestBatch.SESSION_NAME):
611611
self._database = database
612612
self.name = name
613613

614+
@property
615+
def session_id(self):
616+
return self.name
617+
614618

615619
class _Database(object):
616620
name = "testing"

tests/unit/test_database.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3188,6 +3188,10 @@ def run_in_transaction(self, func, *args, **kw):
31883188
self._retried = (func, args, kw)
31893189
return self._committed
31903190

3191+
@property
3192+
def session_id(self):
3193+
return self.name
3194+
31913195

31923196
class _MockIterator(object):
31933197
def __init__(self, *values, **kw):

tests/unit/test_pool.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515

1616
from functools import total_ordering
17+
import time
1718
import unittest
1819

1920
import mock
@@ -923,6 +924,8 @@ def __init__(self, database, exists=True, transaction=None):
923924
self.create = mock.Mock()
924925
self._deleted = False
925926
self._transaction = transaction
927+
# Generate a faux id.
928+
self._session_id = f"time.time()"
926929

927930
def __lt__(self, other):
928931
return id(self) < id(other)
@@ -949,6 +952,10 @@ def transaction(self):
949952
txn = self._transaction = _make_transaction(self)
950953
return txn
951954

955+
@property
956+
def session_id(self):
957+
return self._session_id
958+
952959

953960
class _Database(object):
954961
def __init__(self, name):

tests/unit/test_session.py

Lines changed: 43 additions & 0 deletions
< 10000 /div>
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import google.api_core.gapic_v1.method
1717
from google.cloud.spanner_v1 import RequestOptions
18+
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
1819
import mock
1920
from tests._helpers import (
2021
OpenTelemetryBase,
@@ -174,6 +175,48 @@ def test_create_w_database_role(self):
174175
"CloudSpanner.CreateSession", attributes=TestSession.BASE_ATTRIBUTES
175176
)
176177

178+
def test_create_session_span_annotations(self):
179+
from google.cloud.spanner_v1 import CreateSessionRequest
180+
from google.cloud.spanner_v1 import Session as SessionRequestProto
181+
182+
session_pb = self._make_session_pb(
183+
self.SESSION_NAME, database_role=self.DATABASE_ROLE
184+
)
185+
186+
gax_api = self._make_spanner_api()
187+
gax_api.create_session.return_value = session_pb
188+
database = self._make_database(database_role=self.DATABASE_ROLE)
189+
database.spanner_api = gax_api
190+
session = self._make_one(database, database_role=self.DATABASE_ROLE)
191+
192+
with trace_call("TestSessionSpan", session):
193+
session.create()
194+
195+
self.assertEqual(session.session_id, self.SESSION_ID)
196+
self.assertEqual(session.database_role, self.DATABASE_ROLE)
197+
session_template = SessionRequestProto(creator_role=self.DATABASE_ROLE)
198+
199+
request = CreateSessionRequest(
200+
database=database.name,
201+
session=session_template,
202+
)
203+
204+
gax_api.create_session.assert_called_once_with(
205+
request=request,
206+
metadata=[
207+
("google-cloud-resource-prefix", database.name),
208+
("x-goog-spanner-route-to-leader", "true"),
209+
],
210+
)
211+
212+
# Firstly there should not be any spans in the Session
213+
# creation routine in this with statement, but afterwards
214+
# there should be spans created.
215+
self.assertNoSpans()
216+
217+
wantEventNames = ["Acquering session", "Creating Session", "Using Session"]
218+
self.assertSpanEvents("TestSessionSpan", wantEventNames)
219+
177220
def test_create_wo_database_role(self):
178221
from google.cloud.spanner_v1 import CreateSessionRequest
179222

tests/unit/test_snapshot.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1822,6 +1822,10 @@ def __init__(self, database=None, name=TestSnapshot.SESSION_NAME):
18221822
self._database = database
18231823
self.name = name
18241824

1825+
@property
1826+
def session_id(self):
1827+
return self.name
1828+
18251829

18261830
class _MockIterator(object):
18271831
def __init__(self, *values, **kw):

tests/unit/test_spanner.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,10 @@ def __init__(self, database=None, name=TestTransaction.SESSION_NAME):
10821082
self._database = database
10831083
self.name = name
10841084

1085+
@property
1086+
def session_id(self):
1087+
return self.name
1088+
10851089

10861090
class _MockIterator(object):
10871091
def __init__(self, *values, **kw):

0 commit comments

Comments
 (0)
0