Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions sentry_sdk/integrations/asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
)
from sentry_sdk.utils import (
capture_internal_exceptions,
ensure_integration_enabled,
parse_version,
)

Expand Down Expand Up @@ -52,8 +51,12 @@ def setup_once() -> None:
asyncpg.Connection._executemany = _wrap_connection_method(
asyncpg.Connection._executemany, executemany=True
)
asyncpg.Connection.cursor = _wrap_cursor_creation(asyncpg.Connection.cursor)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is safe to remove because this patches the function that led to the creation/return of a CursorFactory. When that cursor is awaited, the CursorFactory calls _bind through the underlying cursor's init method, essentially capturing the creation and invocation of the cursor (as opposed to just capturing the creation of the factory which is what was happening previously)

asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare)

BaseCursor._bind_exec = _wrap_cursor_method(BaseCursor._bind_exec)
BaseCursor._bind = _wrap_cursor_method(BaseCursor._bind)
BaseCursor._exec = _wrap_cursor_method(BaseCursor._exec)

asyncpg.connect_utils._connect_addr = _wrap_connect_addr(
asyncpg.connect_utils._connect_addr
)
Expand Down Expand Up @@ -150,20 +153,26 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
return _inner


def _wrap_cursor_creation(f: "Callable[..., T]") -> "Callable[..., T]":
@ensure_integration_enabled(AsyncPGIntegration, f)
def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807
query = args[1]
params_list = args[2] if len(args) > 2 else None
def _wrap_cursor_method(
f: "Callable[..., Awaitable[T]]",
) -> "Callable[..., Awaitable[T]]":
async def _inner(*args: "Any", **kwargs: "Any") -> "T":
if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
return await f(*args, **kwargs)

with _record(
None,
query,
params_list,
cursor = args[0]
query = _normalize_query(cursor._query)
with record_sql_queries_supporting_streaming(
cursor=cursor,
query=query,
params_list=None,
paramstyle=None,
executemany=False,
record_cursor_repr=True,
span_origin=AsyncPGIntegration.origin,
) as span:
_set_db_data(span, args[0])
res = f(*args, **kwargs)
_set_db_data(span, cursor._connection)
res = await f(*args, **kwargs)

return res

Expand Down
201 changes: 199 additions & 2 deletions tests/integrations/asyncpg/test_asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ async def test_cursor(sentry_init, capture_events) -> None:
{"category": "query", "data": {}, "message": "BEGIN;", "type": "default"},
{
"category": "query",
"data": {},
"data": {"db.cursor": mock.ANY},
"message": "SELECT * FROM users WHERE dob > $1",
"type": "default",
},
Expand Down Expand Up @@ -400,7 +400,19 @@ async def test_cursor_manual(sentry_init, capture_events) -> None:
{"category": "query", "data": {}, "message": "BEGIN;", "type": "default"},
{
"category": "query",
"data": {},
"data": {"db.cursor": mock.ANY},
"message": "SELECT * FROM users WHERE dob > $1",
"type": "default",
},
{
"category": "query",
"data": {"db.cursor": mock.ANY},
"message": "SELECT * FROM users WHERE dob > $1",
"type": "default",
},
{
"category": "query",
"data": {"db.cursor": mock.ANY},
"message": "SELECT * FROM users WHERE dob > $1",
"type": "default",
},
Expand Down Expand Up @@ -1102,3 +1114,188 @@ def before_send_transaction(event, hint):

assert len(spans) == 1
assert spans[0]["description"] == "filtered"


@pytest.mark.asyncio
@pytest.mark.parametrize("span_streaming", [True, False])
async def test_cursor__bind_exec_creates_spans(
sentry_init, capture_events, capture_items, span_streaming
) -> None:
"""
Exercises the bind_exec patch through the iterator that's created in asyncpg when "for record in conn.cursor" is called.
See https://github.com/MagicStack/asyncpg/blob/db8ecc2a38e16fb0c090aef6f5506547c2831c24/asyncpg/cursor.py#L234
"""
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
_experiments={
"trace_lifecycle": "stream" if span_streaming else "static",
},
)

if span_streaming:
items = capture_items("span")
with sentry_sdk.traces.start_span(name="test_transaction"):
conn: Connection = await connect(PG_CONNECTION_URI)

await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
("Alice", "pw", datetime.date(1990, 12, 25)),
],
)

async with conn.transaction():
async for record in conn.cursor(
"SELECT * FROM users WHERE dob > $1",
datetime.date(1970, 1, 1),
):
pass

await conn.close()
sentry_sdk.flush()

spans = [item.payload for item in items]

assert len(spans) == 6

connect_span = spans[0]
executemany_span = spans[1]
begin_span = spans[2]
bind_exec_span = spans[3]
commit_span = spans[4]
segment = spans[5]

assert connect_span["name"] == "connect"
assert (
executemany_span["name"]
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
)
assert begin_span["name"] == "BEGIN;"
assert bind_exec_span["name"] == "SELECT * FROM users WHERE dob > $1"
assert commit_span["name"] == "COMMIT;"
assert segment["name"] == "test_transaction"

assert bind_exec_span["attributes"]["sentry.origin"] == "auto.db.asyncpg"
assert bind_exec_span["attributes"]["sentry.op"] == "db"
assert bind_exec_span["attributes"]["db.system"] == "postgresql"
assert bind_exec_span["attributes"]["db.driver.name"] == "asyncpg"
assert bind_exec_span["attributes"]["server.address"] == PG_HOST
assert bind_exec_span["attributes"]["server.port"] == PG_PORT
assert bind_exec_span["attributes"]["db.name"] == PG_NAME
assert bind_exec_span["attributes"]["db.user"] == PG_USER
else:
events = capture_events()

with start_transaction(name="test_transaction", sampled=True):
conn: Connection = await connect(PG_CONNECTION_URI)

await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
("Alice", "pw", datetime.date(1990, 12, 25)),
],
)

async with conn.transaction():
async for record in conn.cursor(
"SELECT * FROM users WHERE dob > $1",
datetime.date(1970, 1, 1),
):
pass

await conn.close()

(event,) = events

assert len(event["spans"]) == 5

connect_span = event["spans"][0]
executemany_span = event["spans"][1]
begin_span = event["spans"][2]
bind_exec_span = event["spans"][3]
commit_span = event["spans"][4]

assert connect_span["description"] == "connect"
assert (
executemany_span["description"]
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
)
assert begin_span["description"] == "BEGIN;"
assert bind_exec_span["description"] == "SELECT * FROM users WHERE dob > $1"
assert commit_span["description"] == "COMMIT;"

assert bind_exec_span["origin"] == "auto.db.asyncpg"
assert bind_exec_span["data"]["db.system"] == "postgresql"
assert bind_exec_span["data"]["db.driver.name"] == "asyncpg"
assert bind_exec_span["data"]["server.address"] == PG_HOST
assert bind_exec_span["data"]["server.port"] == PG_PORT
assert bind_exec_span["data"]["db.name"] == PG_NAME
assert bind_exec_span["data"]["db.user"] == PG_USER


@pytest.mark.asyncio
async def test_cursor__bind_and__exec_methods_create_spans(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did my best to create some distinction between this and test_cursor__bind_exec_creates_spans but in case this naming causes confusion:

This test tests the _bind and _exec patches, whereas the other test above checks the _bind_exec patch.

sentry_init, capture_events
) -> None:
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
)
events = capture_events()

with start_transaction(name="test_transaction"):
conn: Connection = await connect(PG_CONNECTION_URI)

await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
("Alice", "pw", datetime.date(1990, 12, 25)),
],
)

async with conn.transaction():
# This exercises the `_bind` patch and the `cursor` patch
cur = await conn.cursor(
"SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1)
)
# These exercise the `_exec` patch
await cur.fetchrow()
await cur.fetchrow()

await conn.close()

(event,) = events

assert len(event["spans"]) == 7

connect_span = event["spans"][0]
executemany_span = event["spans"][1]
begin_span = event["spans"][2]
cursor_creation_and_bind_span = event["spans"][3]
fetchrow_span_1 = event["spans"][4]
fetchrow_span_2 = event["spans"][5]
commit_span = event["spans"][6]

assert connect_span["description"] == "connect"
assert (
executemany_span["description"]
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
)
assert begin_span["description"] == "BEGIN;"
assert fetchrow_span_1["description"] == "SELECT * FROM users WHERE dob > $1"
assert (
cursor_creation_and_bind_span["description"]
== "SELECT * FROM users WHERE dob > $1"
)
assert fetchrow_span_2["description"] == "SELECT * FROM users WHERE dob > $1"
assert commit_span["description"] == "COMMIT;"

for span in (cursor_creation_and_bind_span, fetchrow_span_1, fetchrow_span_2):
assert span["data"]["db.cursor"] is not None
assert span["data"]["db.system"] == "postgresql"
assert span["data"]["db.driver.name"] == "asyncpg"
assert span["origin"] == "auto.db.asyncpg"
Loading