Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,8 @@ class OP:
COHERE_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.cohere"
COHERE_EMBEDDINGS_CREATE = "ai.embeddings.create.cohere"
DB = "db"
DB_CURSOR_ITERATOR = "db.cursor.iter"
DB_CURSOR_FETCH = "db.cursor.fetch"
DB_REDIS = "db.redis"
EVENT_DJANGO = "event.django"
FUNCTION = "function"
Expand Down
58 changes: 57 additions & 1 deletion sentry_sdk/integrations/asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,29 @@
record_sql_queries,
)
from sentry_sdk.utils import (
ContextVar,
capture_internal_exceptions,
parse_version,
)

try:
import asyncpg # type: ignore[import-not-found]
from asyncpg.cursor import BaseCursor # type: ignore
from asyncpg.cursor import ( # type: ignore
BaseCursor,
Cursor,
CursorIterator,
)

except ImportError:
raise DidNotEnable("asyncpg not installed.")

_asyncpg_cursor_iterator_is_invoked = ContextVar(
"asyncpg_cursor_iterator_is_invoked", default=False
)
_asyncpg_cursor_fetch_is_invoked = ContextVar(
"asyncpg_cursor_fetch_is_invoked", default=False
)


class AsyncPGIntegration(Integration):
identifier = "asyncpg"
Expand Down Expand Up @@ -53,6 +65,10 @@ def setup_once() -> None:
)
asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare)

CursorIterator.__anext__ = _wrap_cursor_iterator_method(
CursorIterator.__anext__
)
Cursor.fetch = _wrap_cursor_fetch_method(Cursor.fetch)
BaseCursor._bind_exec = _wrap_cursor_method(BaseCursor._bind_exec)
BaseCursor._exec = _wrap_cursor_method(BaseCursor._exec)

Expand Down Expand Up @@ -161,13 +177,52 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
return _inner


def _wrap_cursor_fetch_method(
f: "Callable[..., Awaitable[T]]",
) -> "Callable[..., Awaitable[T]]":
async def _inner(*args: "Any", **kwargs: "Any") -> "T":
if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
return await f(*args, **kwargs)

_asyncpg_cursor_fetch_is_invoked.set(True)
try:
return await f(*args, **kwargs)
finally:
_asyncpg_cursor_fetch_is_invoked.set(False)

return _inner


def _wrap_cursor_iterator_method(
f: "Callable[..., Awaitable[T]]",
) -> "Callable[..., Awaitable[T]]":
async def _inner(*args: "Any", **kwargs: "Any") -> "T":
if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
return await f(*args, **kwargs)

_asyncpg_cursor_iterator_is_invoked.set(True)
try:
return await f(*args, **kwargs)
finally:
_asyncpg_cursor_iterator_is_invoked.set(False)

return _inner


def _wrap_cursor_method(
f: "Callable[..., Awaitable[T]]",
) -> "Callable[..., Awaitable[T]]":
async def _inner(*args: "Any", **kwargs: "Any") -> "T":
if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
return await f(*args, **kwargs)

if _asyncpg_cursor_iterator_is_invoked.get():
span_op_override_value = OP.DB_CURSOR_ITERATOR
elif _asyncpg_cursor_fetch_is_invoked.get():
span_op_override_value = OP.DB_CURSOR_FETCH
else:
span_op_override_value = None

cursor = args[0]
query = _normalize_query(cursor._query)
with record_sql_queries(
Expand All @@ -178,6 +233,7 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
executemany=False,
record_cursor_repr=True,
span_origin=AsyncPGIntegration.origin,
span_op_override_value=span_op_override_value,
) as span:
_set_db_data(span, cursor._connection)
res = await f(*args, **kwargs)
Expand Down
7 changes: 5 additions & 2 deletions sentry_sdk/tracing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def record_sql_queries(
executemany: bool,
record_cursor_repr: bool = False,
span_origin: str = "manual",
span_op_override_value: "Optional[str]" = None,
) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]":
# TODO: Bring back capturing of params by default
client = sentry_sdk.get_client()
Expand Down Expand Up @@ -167,13 +168,15 @@ def record_sql_queries(
name="<unknown SQL query>" if query is None else query,
attributes={
"sentry.origin": span_origin,
"sentry.op": OP.DB,
"sentry.op": span_op_override_value
if span_op_override_value
else OP.DB,
},
) as span:
yield span
else:
with sentry_sdk.start_span(
op=OP.DB,
op=span_op_override_value if span_op_override_value is not None else OP.DB,
name=query,
origin=span_origin,
) as span:
Expand Down
172 changes: 170 additions & 2 deletions tests/integrations/asyncpg/test_asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import sentry_sdk
from sentry_sdk import capture_message, start_transaction
from sentry_sdk.consts import SPANDATA
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations.asyncpg import AsyncPGIntegration
from sentry_sdk.tracing_utils import record_sql_queries
from tests.conftest import ApproxDict
Expand Down Expand Up @@ -1423,7 +1423,7 @@ async def test_cursor__bind_exec_creates_spans(
assert segment["name"] == "test_segment"

assert bind_exec_span["attributes"]["sentry.origin"] == "auto.db.asyncpg"
assert bind_exec_span["attributes"]["sentry.op"] == "db"
assert bind_exec_span["attributes"]["sentry.op"] == OP.DB_CURSOR_ITERATOR
assert bind_exec_span["attributes"]["db.system.name"] == "postgresql"
assert bind_exec_span["attributes"]["db.driver.name"] == "asyncpg"
assert bind_exec_span["attributes"]["server.address"] == PG_HOST
Expand Down Expand Up @@ -1487,6 +1487,173 @@ async def test_cursor__bind_exec_creates_spans(
)


@pytest.mark.asyncio
@pytest.mark.parametrize("span_streaming", [True, False])
async def test_cursor_iteration_creates_db_cursor_iter_spans(
sentry_init, capture_events, capture_items, span_streaming
) -> None:
"""
Regression test for https://github.com/getsentry/sentry-python/issues/6576

When iterating a server-side cursor with a small prefetch, asyncpg fetches
rows in batches. Each batch triggers BaseCursor._bind_exec (on first query) and
BaseCursor._exec (second query onwards) through CursorIterator.__anext__, which creates a
span with the same query description. The resulting burst of identical spans
causes Sentry's N+1 query detector to raise a false positive.

To mitigate, we set the "op"/"sentry.op" to `db.cursor.iter` instead of `db`
so that the sentry backend can exclude these spans from n+1 detection.
"""
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={
"trace_lifecycle": "stream" if span_streaming else "static",
},
)

if span_streaming:
items = capture_items("span")

with sentry_sdk.traces.start_span(name="test_segment"):
conn: Connection = await connect(PG_CONNECTION_URI)

await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)],
)

async with conn.transaction():
async for _record in conn.cursor("SELECT * FROM users", prefetch=5):
pass

await conn.close()

sentry_sdk.flush()

cursor_iter_spans = [
item.payload
for item in items
if item.payload.get("name") == "SELECT * FROM users"
]

assert len(cursor_iter_spans) == 5
for span in cursor_iter_spans:
assert span["attributes"]["sentry.op"] == OP.DB_CURSOR_ITERATOR
else:
events = capture_events()

with start_transaction(name="test_transaction", sampled=True):
conn: Connection = await connect(PG_CONNECTION_URI)

await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)],
)

async with conn.transaction():
async for _record in conn.cursor("SELECT * FROM users", prefetch=5):
pass

await conn.close()

(event,) = events

cursor_iter_spans = [
s for s in event["spans"] if s.get("description") == "SELECT * FROM users"
]

assert len(cursor_iter_spans) == 5
for span in cursor_iter_spans:
assert span["op"] == OP.DB_CURSOR_ITERATOR


@pytest.mark.asyncio
@pytest.mark.parametrize("span_streaming", [True, False])
async def test_cursor_fetch_creates_db_cursor_fetch_spans(
sentry_init, capture_events, capture_items, span_streaming
) -> None:
"""
Regression test for https://github.com/getsentry/sentry-python/issues/6576

When a user invokes "connection.stream" within SQLAlchemy, SQLAlchemy's dialect
for asyncpg uses asyncpg's "Cursor.fetch" method instead of the "CursorIterator.__anext__"
method.

Because the "fetch" methods use `_exec` (and our patch for it) under the hood, it makes it appear that
the same query is being executed many times when it is in fact iterating over a result set.

This results in an accidental trigger of our n+1 detection.

To mitigate, we set the "op"/"sentry.op" to `db.cursor.fetch` instead of `db`
so that the sentry backend can exclude these spans from n+1 detection.
"""
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={
"trace_lifecycle": "stream" if span_streaming else "static",
},
)

if span_streaming:
items = capture_items("span")

with sentry_sdk.traces.start_span(name="test_segment"):
conn: Connection = await connect(PG_CONNECTION_URI)

await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)],
)

async with conn.transaction():
cur = await conn.cursor("SELECT * FROM users")
await cur.fetch(10)
await cur.fetch(10)

await conn.close()

sentry_sdk.flush()

cursor_fetch_spans = [
item.payload
for item in items
if item.payload.get("name") == "SELECT * FROM users"
]

assert len(cursor_fetch_spans) == 2
for span in cursor_fetch_spans:
assert span["attributes"]["sentry.op"] == OP.DB_CURSOR_FETCH
else:
events = capture_events()

with start_transaction(name="test_transaction", sampled=True):
conn: Connection = await connect(PG_CONNECTION_URI)

await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)],
)

async with conn.transaction():
cur = await conn.cursor("SELECT * FROM users")
await cur.fetch(10)
await cur.fetch(10)

await conn.close()

(event,) = events

cursor_fetch_spans = [
s for s in event["spans"] if s.get("description") == "SELECT * FROM users"
]

assert len(cursor_fetch_spans) == 2
for span in cursor_fetch_spans:
assert span["op"] == OP.DB_CURSOR_FETCH


@pytest.mark.asyncio
async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) -> None:
sentry_init(
Expand Down Expand Up @@ -1543,6 +1710,7 @@ async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) ->
assert span["data"]["db.cursor"] is not None
assert span["data"]["db.system"] == "postgresql"
assert span["data"]["db.driver.name"] == "asyncpg"
assert span["op"] == OP.DB
assert span["origin"] == "auto.db.asyncpg"
_assert_query_source(
span,
Expand Down
Loading