Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit e83b4af

Browse files
committed
fix(tracing): ensure nesting of Transaction.begin under commit + fix suggestions from feature review
This change ensures that: * If a transaction was not yet begin, that if .commit() is invoked the resulting span hierarchy has .begin nested under .commit * We use "CloudSpanner.Transaction.execute_sql" instead of "CloudSpanner.Transaction.execute_streaming_sql" * If we have a tracer_provider that produces non-recordings spans, that it won't crash due to lacking `span._status` Fixes #1286
1 parent 04a11a6 commit e83b4af

9 files changed

Lines changed: 405 additions & 52 deletions

File tree

google/cloud/spanner_v1/_opentelemetry_tracing.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
117117
# invoke .record_exception on our own else we shall have 2 exceptions.
118118
raise
119119
else:
120-
if (not span._status) or span._status.status_code == StatusCode.UNSET:
120+
# All spans still have set_status available even if for example
121+
# NonRecordingSpan doesn't have "_status".
122+
absent_span_status = getattr(span, "_status", None) is None
123+
if absent_span_status or span._status.status_code == StatusCode.UNSET:
121124
# OpenTelemetry-Python only allows a status change
122125
# if the current code is UNSET or ERROR. At the end
123126
# of the generator's consumption, only set it to OK

google/cloud/spanner_v1/snapshot.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ def _get_streamed_result_set(
583583
iterator = _restart_on_unavailable(
584584
restart,
585585
request,
586-
f"CloudSpanner.{type(self).__name__}.execute_streaming_sql",
586+
f"CloudSpanner.{type(self).__name__}.execute_sql",
587587
self._session,
588588
trace_attributes,
589589
transaction=self,

google/cloud/spanner_v1/transaction.py

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -242,39 +242,7 @@ def commit(
242242
:returns: timestamp of the committed changes.
243243
:raises ValueError: if there are no mutations to commit.
244244
"""
245-
self._check_state()
246-
if self._transaction_id is None and len(self._mutations) > 0:
247-
self.begin()
248-
elif self._transaction_id is None and len(self._mutations) == 0:
249-
raise ValueError("Transaction is not begun")
250-
251245
database = self._session._database
252-
api = database.spanner_api
253-
metadata = _metadata_with_prefix(database.name)
254-
if database._route_to_leader_enabled:
255-
metadata.append(
256-
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
257-
)
258-
259-
if request_options is None:
260-
request_options = RequestOptions()
261-
elif type(request_options) is dict:
262-
request_options = RequestOptions(request_options)
263-
if self.transaction_tag is not None:
264-
request_options.transaction_tag = self.transaction_tag
265-
266-
# Request tags are not supported for commit requests.
267-
request_options.request_tag = None
268-
269-
request = CommitRequest(
270-
session=self._session.name,
271-
mutations=self._mutations,
272-
transaction_id=self._transaction_id,
273-
return_commit_stats=return_commit_stats,
274-
max_commit_delay=max_commit_delay,
275-
request_options=request_options,
276-
)
277-
278246
trace_attributes = {"num_mutations": len(self._mutations)}
279247
observability_options = getattr(database, "observability_options", None)
280248
with trace_call(
@@ -283,6 +251,40 @@ def commit(
283251
trace_attributes,
284252
observability_options,
285253
) as span:
254+
self._check_state()
255+
if self._transaction_id is None and len(self._mutations) > 0:
256+
self.begin()
257+
elif self._transaction_id is None and len(self._mutations) == 0:
258+
raise ValueError("Transaction is not begun")
259+
260+
api = database.spanner_api
261+
metadata = _metadata_with_prefix(database.name)
262+
if database._route_to_leader_enabled:
263+
metadata.append(
264+
_metadata_with_leader_aware_routing(
265+
database._route_to_leader_enabled
266+
)
267+
)
268+
269+
if request_options is None:
270+
request_options = RequestOptions()
271+
elif type(request_options) is dict:
272+
request_options = RequestOptions(request_options)
273+
if self.transaction_tag is not None:
274+
request_options.transaction_tag = self.transaction_tag
275+
276+
# Request tags are not supported for commit requests.
277+
request_options.request_tag = None
278+
279+
request = CommitRequest(
280+
session=self._session.name,
281+
mutations=self._mutations,
282+
transaction_id=self._transaction_id,
283+
return_commit_stats=return_commit_stats,
284+
max_commit_delay=max_commit_delay,
285+
request_options=request_options,
286+
)
287+
286288
add_span_event(span, "Starting Commit")
287289

288290
method = functools.partial(

tests/_helpers.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,20 @@ def get_finished_spans(self):
132132

133133
def reset(self):
134134
self.tearDown()
135+
136+
def finished_spans_events_statuses(self):
137+
span_list = self.get_finished_spans()
138+
# Some event attributes are noisy/highly ephemeral
139+
# and can't be directly compared against.
140+
got_all_events = []
141+
imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"]
142+
for span in span_list:
143+
for event in span.events:
144+
evt_attributes = event.attributes.copy()
145+
for attr_name in imprecise_event_attributes:
146+
if attr_name in evt_attributes:
147+
evt_attributes[attr_name] = "EPHEMERAL"
148+
149+
got_all_events.append((event.name, evt_attributes))
150+
151+
return got_all_events

tests/system/test_observability_options.py

Lines changed: 199 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def test_propagation(enable_extended_tracing):
107107
gotNames = [span.name for span in from_inject_spans]
108108
wantNames = [
109109
"CloudSpanner.CreateSession",
110-
"CloudSpanner.Snapshot.execute_streaming_sql",
110+
"CloudSpanner.Snapshot.execute_sql",
111111
]
112112
assert gotNames == wantNames
113113

@@ -216,8 +216,8 @@ def select_in_txn(txn):
216216
"CloudSpanner.Database.run_in_transaction",
217217
"CloudSpanner.CreateSession",
218218
"CloudSpanner.Session.run_in_transaction",
219-
"CloudSpanner.Transaction.execute_streaming_sql",
220-
"CloudSpanner.Transaction.execute_streaming_sql",
219+
"CloudSpanner.Transaction.execute_sql",
220+
"CloudSpanner.Transaction.execute_sql",
221221
"CloudSpanner.Transaction.commit",
222222
]
223223

@@ -262,13 +262,207 @@ def select_in_txn(txn):
262262
("CloudSpanner.Database.run_in_transaction", codes.OK, None),
263263
("CloudSpanner.CreateSession", codes.OK, None),
264264
("CloudSpanner.Session.run_in_transaction", codes.OK, None),
265-
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
266-
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
265+
("CloudSpanner.Transaction.execute_sql", codes.OK, None),
266+
("CloudSpanner.Transaction.execute_sql", codes.OK, None),
267267
("CloudSpanner.Transaction.commit", codes.OK, None),
268268
]
269269
assert got_statuses == want_statuses
270270

271271

272+
@pytest.mark.skipif(
273+
not _helpers.USE_EMULATOR,
274+
reason="Emulator needed to run this tests",
275+
)
276+
@pytest.mark.skipif(
277+
not HAS_OTEL_INSTALLED,
278+
reason="Tracing requires OpenTelemetry",
279+
)
280+
def test_transaction_update_implicit_begin_nested_inside_commit():
281+
# Tests to ensure that transaction.commit() without a began transaction
282+
# has transaction.begin() inlined and nested under the commit span.
283+
from google.auth.credentials import AnonymousCredentials
284+
from google.api_core.exceptions import Aborted
285+
from google.rpc import code_pb2
286+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
287+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
288+
InMemorySpanExporter,
289+
)
290+
from opentelemetry.trace.status import StatusCode
291+
from opentelemetry.sdk.trace import TracerProvider
292+
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
293+
294+
PROJECT = _helpers.EMULATOR_PROJECT
295+
CONFIGURATION_NAME = "config-name"
296+
INSTANCE_ID = _helpers.INSTANCE_ID
297+
DISPLAY_NAME = "display-name"
298+
DATABASE_ID = _helpers.unique_id("temp_db")
299+
NODE_COUNT = 5
300+
LABELS = {"test": "true"}
301+
302+
counters = dict(aborted=0)
303+
304+
def tx_update(txn):
305+
txn.update(
306+
"Singers",
307+
columns=["SingerId", "FirstName"],
308+
values=[["1", "Bryan"], ["2", "Slash"]],
309+
)
310+
311+
tracer_provider = TracerProvider(sampler=ALWAYS_ON)
312+
trace_exporter = InMemorySpanExporter()
313+
tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
314+
observability_options = dict(
315+
tracer_provider=tracer_provider,
316+
enable_extended_tracing=True,
317+
)
318+
319+
client = Client(
320+
project=PROJECT,
321+
observability_options=observability_options,
322+
credentials=AnonymousCredentials(),
323+
)
324+
325+
instance = client.instance(
326+
INSTANCE_ID,
327+
CONFIGURATION_NAME,
328+
display_name=DISPLAY_NAME,
329+
node_count=NODE_COUNT,
330+
labels=LABELS,
331+
)
332+
333+
try:
334+
instance.create()
335+
except Exception:
336+
pass
337+
338+
db = instance.database(DATABASE_ID)
339+
try:
340+
db._ddl_statements = [
341+
"""CREATE TABLE Singers (
342+
SingerId INT64 NOT NULL,
343+
FirstName STRING(1024),
344+
LastName STRING(1024),
345+
SingerInfo BYTES(MAX),
346+
FullName STRING(2048) AS (
347+
ARRAY_TO_STRING([FirstName, LastName], " ")
348+
) STORED
349+
) PRIMARY KEY (SingerId)""",
350+
"""CREATE TABLE Albums (
351+
SingerId INT64 NOT NULL,
352+
AlbumId INT64 NOT NULL,
353+
AlbumTitle STRING(MAX),
354+
MarketingBudget INT64,
355+
) PRIMARY KEY (SingerId, AlbumId),
356+
INTERLEAVE IN PARENT Singers ON DELETE CASCADE""",
357+
]
358+
db.create()
359+
except Exception:
360+
pass
361+
362+
try:
363+
db.run_in_transaction(tx_update)
364+
except:
365+
pass
366+
367+
span_list = trace_exporter.get_finished_spans()
368+
# Sort the spans by their start time in the hierarchy.
369+
span_list = sorted(span_list, key=lambda span: span.start_time)
370+
got_span_names = [span.name for span in span_list]
371+
want_span_names = [
372+
"CloudSpanner.Database.run_in_transaction",
373+
"CloudSpanner.CreateSession",
374+
"CloudSpanner.Session.run_in_transaction",
375+
"CloudSpanner.Transaction.commit",
376+
"CloudSpanner.Transaction.begin",
377+
]
378+
379+
assert got_span_names == want_span_names
380+
span_tx_begin = span_list[-1]
381+
span_tx_commit = span_list[-2]
382+
383+
got_events = []
384+
got_statuses = []
385+
386+
# Some event attributes are noisy/highly ephemeral
387+
# and can't be directly compared against.
388+
imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"]
389+
for span in span_list:
390+
got_statuses.append(
391+
(span.name, span.status.status_code, span.status.description)
392+
)
393+
for event in span.events:
394+
evt_attributes = event.attributes.copy()
395+
for attr_name in imprecise_event_attributes:
396+
if attr_name in evt_attributes:
397+
evt_attributes[attr_name] = "EPHEMERAL"
398+
399+
got_events.append((event.name, evt_attributes))
400+
401+
# Check for the series of events
402+
want_events = [
403+
("Acquiring session", {"kind": "BurstyPool"}),
404+
("Waiting for a session to become available", {"kind": "BurstyPool"}),
405+
("No sessions available in pool. Creating session", {"kind": "BurstyPool"}),
406+
("Creating Session", {}),
407+
(
408+
"exception",
409+
{
410+
"exception.type": "google.api_core.exceptions.NotFound",
411+
"exception.message": "404 Table Singers: Row {Int64(1)} not found.",
412+
"exception.stacktrace": "EPHEMERAL",
413+
"exception.escaped": "False",
414+
},
415+
),
416+
(
417+
"Transaction.commit failed due to GoogleAPICallError, not retrying",
418+
{"attempt": 1},
419+
),
420+
(
421+
"exception",
422+
{
423+
"exception.type": "google.api_core.exceptions.NotFound",
424+
"exception.message": "404 Table Singers: Row {Int64(1)} not found.",
425+
"exception.stacktrace": "EPHEMERAL",
426+
"exception.escaped": "False",
427+
},
428+
),
429+
("Starting Commit", {}),
430+
(
431+
"exception",
432+
{
433+
"exception.type": "google.api_core.exceptions.NotFound",
434+
"exception.message": "404 Table Singers: Row {Int64(1)} not found.",
435+
"exception.stacktrace": "EPHEMERAL",
436+
"exception.escaped": "False",
437+
},
438+
),
439+
]
440+
assert got_events == want_events
441+
442+
# Check for the statues.
443+
codes = StatusCode
444+
want_statuses = [
445+
(
446+
"CloudSpanner.Database.run_in_transaction",
447+
codes.ERROR,
448+
"NotFound: 404 Table Singers: Row {Int64(1)} not found.",
449+
),
450+
("CloudSpanner.CreateSession", codes.OK, None),
451+
(
452+
"CloudSpanner.Session.run_in_transaction",
453+
codes.ERROR,
454+
"NotFound: 404 Table Singers: Row {Int64(1)} not found.",
455+
),
456+
(
457+
"CloudSpanner.Transaction.commit",
458+
codes.ERROR,
459+
"NotFound: 404 Table Singers: Row {Int64(1)} not found.",
460+
),
461+
("CloudSpanner.Transaction.begin", codes.OK, None),
462+
]
463+
assert got_statuses == want_statuses
464+
465+
272466
def _make_credentials():
273467
from google.auth.credentials import AnonymousCredentials
274468

tests/unit/test__opentelemetry_tracing.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def test_trace_codeless_error(self):
159159
span = span_list[0]
160160
self.assertEqual(span.status.status_code, StatusCode.ERROR)
161161

162-
def test_trace_call_terminal_span_status(self):
162+
def test_trace_call_terminal_span_status_ALWAYS_ON_sampler(self):
163163
# Verify that we don't unconditionally set the terminal span status to
164164
# SpanStatus.OK per https://github.com/googleapis/python-spanner/issues/1246
165165
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
@@ -195,3 +195,33 @@ def test_trace_call_terminal_span_status(self):
195195
("VerifyTerminalSpanStatus", StatusCode.ERROR, "Our error exhibit"),
196196
]
197197
assert got_statuses == want_statuses
198+
199+
def test_trace_call_terminal_span_status_ALWAYS_OFF_sampler(self):
200+
# Verify that we get the correct status even when using the ALWAYS_OFF
201+
# sampler which produces the NonRecordingSpan per
202+
# https://github.com/googleapis/python-spanner/issues/1286
203+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
204+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
205+
InMemorySpanExporter,
206+
)
207+
from opentelemetry.trace.status import Status, StatusCode
208+
from opentelemetry.sdk.trace import TracerProvider
209+
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF
210+
211+
tracer_provider = TracerProvider(sampler=ALWAYS_OFF)
212+
trace_exporter = InMemorySpanExporter()
213+
tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
214+
observability_options = dict(tracer_provider=tracer_provider)
215+
216+
session = _make_session()
217+
used_span = None
218+
with _opentelemetry_tracing.trace_call(
219+
"VerifyWithNonRecordingSpan",
220+
session,
221+
observability_options=observability_options,
222+
) as span:
223+
used_span = span
224+
225+
assert type(used_span).__name__ == "NonRecordingSpan"
226+
span_list = list(trace_exporter.get_finished_spans())
227+
assert span_list == []

0 commit comments

Comments
 (0)