Skip to content

fix(metrics): correct GFE metrics extraction and enable by default#17561

Draft
sinhasubham wants to merge 1 commit into
mainfrom
gfe-metrics-fix
Draft

fix(metrics): correct GFE metrics extraction and enable by default#17561
sinhasubham wants to merge 1 commit into
mainfrom
gfe-metrics-fix

Conversation

@sinhasubham

Copy link
Copy Markdown
Contributor

Description

This PR resolves a critical issue where Spanner GFE (Google Front End) latency metrics were not being properly captured, and ensures these metrics are always enabled by default.

Key Changes:

  • Fix Metadata Extraction Logic: Fixed a truthiness bug in MetricsInterceptor where initial_metadata() returning standard headers (e.g., content-type) masked trailing_metadata(). The server-timing header is now properly extracted from both initial and trailing metadata.
  • Support for Streaming RPCs: Added _StreamingResponseWrapper and _AsyncStreamingResponseWrapper to the interceptor. This correctly defers metrics recording until the streaming iterators finish, ensuring trailing_metadata is fully populated and available before attempting extraction.
  • Enable GFE Metrics by Default: Removed the gfe_enabled toggle in SpannerMetricsTracerFactory. GFE metrics capture is now always-on whenever OpenTelemetry tracing is enabled.
  • Testing: Added unit tests and a new mockserver test to ensure GFE metrics are correctly published end-to-end.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces deferred metrics recording for streaming and async streaming RPC responses in Google Cloud Spanner by wrapping responses in specialized wrappers. It also implements GFE latency extraction from response metadata and adds corresponding tests. The review feedback highlights several critical improvements for robustness: refining streaming response detection to check for iterator methods (next and anext) rather than iterable methods to prevent incorrect wrapping of standard iterables; wrapping telemetry and metrics recording blocks in try-except blocks to ensure telemetry failures do not disrupt the main application flow; defensively validating metadata elements before unpacking to avoid unpacking errors; and properly decoding bytes metadata values before regex matching.

Comment on lines +131 to +134
if hasattr(response, "__anext__") or hasattr(response, "__aiter__"):
return _AsyncStreamingResponseWrapper(response, tracer)
elif hasattr(response, "__next__") or hasattr(response, "__iter__"):
return _StreamingResponseWrapper(response, tracer)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Checking hasattr(response, "__iter__") or hasattr(response, "__aiter__") is too broad because many standard unary response types (such as lists, dicts, tuples, or custom iterables) are iterable but are not streaming responses. If a unary RPC returns an iterable, it will be incorrectly wrapped in _StreamingResponseWrapper, which will break the caller's expectations.

Additionally, in unit tests, MagicMock objects have __iter__ by default, which causes them to be incorrectly wrapped.

To correctly identify streaming responses, we should only check for the iterator protocol methods __next__ and __anext__. Any gRPC streaming response is an iterator and must implement these methods.

Suggested change
if hasattr(response, "__anext__") or hasattr(response, "__aiter__"):
return _AsyncStreamingResponseWrapper(response, tracer)
elif hasattr(response, "__next__") or hasattr(response, "__iter__"):
return _StreamingResponseWrapper(response, tracer)
if hasattr(response, "__anext__"):
return _AsyncStreamingResponseWrapper(response, tracer)
elif hasattr(response, "__next__"):
return _StreamingResponseWrapper(response, tracer)

Comment on lines +135 to 150
else:
# Unary call: execute completion and record metrics immediately
tracer.record_attempt_completion()
metadata = []
if hasattr(response, "initial_metadata"):
try:
metadata.extend(response.initial_metadata() or [])
except Exception:
pass
if hasattr(response, "trailing_metadata"):
try:
metadata.extend(response.trailing_metadata() or [])
except Exception:
pass
tracer.record_gfe_metrics(metadata)
return response

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The metrics recording block for unary calls is not wrapped in a try-except block. If tracer.record_attempt_completion() or tracer.record_gfe_metrics(metadata) raises an exception (e.g., due to OpenTelemetry configuration issues or unexpected metadata formats), it will crash the entire unary RPC call and prevent the response from being returned. Telemetry and metrics collection should be non-blocking and fail-safe, meaning they should never disrupt the main application flow. Avoid broad except Exception: blocks that silently pass; instead, log the exception to aid in debugging.

Suggested change
else:
# Unary call: execute completion and record metrics immediately
tracer.record_attempt_completion()
metadata = []
if hasattr(response, "initial_metadata"):
try:
metadata.extend(response.initial_metadata() or [])
except Exception:
pass
if hasattr(response, "trailing_metadata"):
try:
metadata.extend(response.trailing_metadata() or [])
except Exception:
pass
tracer.record_gfe_metrics(metadata)
return response
else:
# Unary call: execute completion and record metrics immediately
try:
tracer.record_attempt_completion()
metadata = []
if hasattr(response, "initial_metadata"):
try:
metadata.extend(response.initial_metadata() or [])
except Exception as e:
logger.warning(f"Failed to retrieve initial metadata: {e}")
if hasattr(response, "trailing_metadata"):
try:
metadata.extend(response.trailing_metadata() or [])
except Exception as e:
logger.warning(f"Failed to retrieve trailing metadata: {e}")
tracer.record_gfe_metrics(metadata)
except Exception as e:
logger.warning(f"Failed to record metrics: {e}")
return response
References
  1. Avoid broad except Exception: blocks that silently return None. Instead, log the exception (e.g., using logger.warning) to aid in debugging and prevent masking underlying issues.

Comment on lines +174 to +190
def _record_metrics(self):
if self._metrics_recorded:
return
self._metrics_recorded = True
self._tracer.record_attempt_completion()
metadata = []
if hasattr(self._response, "initial_metadata"):
try:
metadata.extend(self._response.initial_metadata() or [])
except Exception:
pass
if hasattr(self._response, "trailing_metadata"):
try:
metadata.extend(self._response.trailing_metadata() or [])
except Exception:
pass
self._tracer.record_gfe_metrics(metadata)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the unary call metrics recording, if any exception is raised during _record_metrics (e.g., in record_attempt_completion or record_gfe_metrics), it will propagate to the caller of __next__. This can mask the original StopIteration or other exceptions, or crash the application during stream consumption. We should wrap the entire metrics recording logic in a try-except block to ensure telemetry failures are safe, and log any exceptions to avoid silent failures.

Suggested change
def _record_metrics(self):
if self._metrics_recorded:
return
self._metrics_recorded = True
self._tracer.record_attempt_completion()
metadata = []
if hasattr(self._response, "initial_metadata"):
try:
metadata.extend(self._response.initial_metadata() or [])
except Exception:
pass
if hasattr(self._response, "trailing_metadata"):
try:
metadata.extend(self._response.trailing_metadata() or [])
except Exception:
pass
self._tracer.record_gfe_metrics(metadata)
def _record_metrics(self):
if self._metrics_recorded:
return
self._metrics_recorded = True
try:
self._tracer.record_attempt_completion()
metadata = []
if hasattr(self._response, "initial_metadata"):
try:
metadata.extend(self._response.initial_metadata() or [])
except Exception as e:
logger.warning(f"Failed to retrieve initial metadata: {e}")
if hasattr(self._response, "trailing_metadata"):
try:
metadata.extend(self._response.trailing_metadata() or [])
except Exception as e:
logger.warning(f"Failed to retrieve trailing metadata: {e}")
self._tracer.record_gfe_metrics(metadata)
except Exception as e:
logger.warning(f"Failed to record metrics: {e}")
References
  1. Avoid broad except Exception: blocks that silently return None. Instead, log the exception (e.g., using logger.warning) to aid in debugging and prevent masking underlying issues.

Comment on lines +217 to +233
def _record_metrics(self):
if self._metrics_recorded:
return
self._metrics_recorded = True
self._tracer.record_attempt_completion()
metadata = []
if hasattr(self._response, "initial_metadata"):
try:
metadata.extend(self._response.initial_metadata() or [])
except Exception:
pass
if hasattr(self._response, "trailing_metadata"):
try:
metadata.extend(self._response.trailing_metadata() or [])
except Exception:
pass
self._tracer.record_gfe_metrics(metadata)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

For the asynchronous streaming wrapper, we should also wrap the metrics recording logic in a try-except block to prevent telemetry failures from crashing the async stream or masking StopAsyncIteration. Ensure exceptions are logged rather than silently ignored.

Suggested change
def _record_metrics(self):
if self._metrics_recorded:
return
self._metrics_recorded = True
self._tracer.record_attempt_completion()
metadata = []
if hasattr(self._response, "initial_metadata"):
try:
metadata.extend(self._response.initial_metadata() or [])
except Exception:
pass
if hasattr(self._response, "trailing_metadata"):
try:
metadata.extend(self._response.trailing_metadata() or [])
except Exception:
pass
self._tracer.record_gfe_metrics(metadata)
def _record_metrics(self):
if self._metrics_recorded:
return
self._metrics_recorded = True
try:
self._tracer.record_attempt_completion()
metadata = []
if hasattr(self._response, "initial_metadata"):
try:
metadata.extend(self._response.initial_metadata() or [])
except Exception as e:
logger.warning(f"Failed to retrieve initial metadata: {e}")
if hasattr(self._response, "trailing_metadata"):
try:
metadata.extend(self._response.trailing_metadata() or [])
except Exception as e:
logger.warning(f"Failed to retrieve trailing metadata: {e}")
self._tracer.record_gfe_metrics(metadata)
except Exception as e:
logger.warning(f"Failed to record metrics: {e}")
References
  1. Avoid broad except Exception: blocks that silently return None. Instead, log the exception (e.g., using logger.warning) to aid in debugging and prevent masking underlying issues.

Comment on lines +449 to +455
elif isinstance(metadata, (list, tuple)):
for key, val in metadata:
if key and str(key).lower() in ("server-timing", "server_timing"):
if isinstance(val, (list, tuple)):
header_vals.extend(val)
else:
header_vals.append(val)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When iterating over metadata as a list or tuple, unpacking for key, val in metadata assumes that every element in metadata is a sequence of exactly two elements. If metadata contains any malformed elements (e.g., a 1-tuple, a string, or None), this will raise a ValueError and crash the metrics extraction. We should defensively verify that each item is a sequence of length 2 before unpacking.

Suggested change
elif isinstance(metadata, (list, tuple)):
for key, val in metadata:
if key and str(key).lower() in ("server-timing", "server_timing"):
if isinstance(val, (list, tuple)):
header_vals.extend(val)
else:
header_vals.append(val)
elif isinstance(metadata, (list, tuple)):
for item in metadata:
if isinstance(item, (list, tuple)) and len(item) == 2:
key, val = item
if key and str(key).lower() in ("server-timing", "server_timing"):
if isinstance(val, (list, tuple)):
header_vals.extend(val)
else:
header_vals.append(val)

Comment on lines +460 to +461
if not isinstance(header_val, str):
header_val = str(header_val)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If header_val is of type bytes (which is common for gRPC metadata values), calling str(header_val) in Python 3 will produce the string representation b'...' (including the literal b and quotes). This will prevent the regex from matching correctly or cause unexpected behavior. We should decode bytes to str using .decode("utf-8") first.

            if isinstance(header_val, bytes):
                try:
                    header_val = header_val.decode("utf-8")
                except Exception:
                    header_val = str(header_val)
            elif not isinstance(header_val, str):
                header_val = str(header_val)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants