From 6406ae78e955f477de286147b7e2bee388edb78b Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Mon, 11 May 2026 13:36:42 -0400 Subject: [PATCH 1/2] feat(asyncpg): Add cursor span support via BaseCursor method patching Patch BaseCursor._bind_exec, _bind, and _exec instead of wrapping Connection.cursor creation. This creates spans for each cursor operation (iteration via _bind_exec, and explicit cursor usage via _bind/_exec), and supports both static and streaming span lifecycles. Fixes PY-2408 Fixes #6240 --- sentry_sdk/integrations/asyncpg.py | 69 ++++--- tests/integrations/asyncpg/test_asyncpg.py | 198 ++++++++++++++++++++- 2 files changed, 237 insertions(+), 30 deletions(-) diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index 29f3bad152..68a26540b8 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -1,17 +1,22 @@ from __future__ import annotations + import contextlib import re -from typing import Any, TypeVar, Callable, Awaitable, Iterator +from typing import Any, Awaitable, Callable, Iterator, TypeVar, Union import sentry_sdk from sentry_sdk.consts import OP, SPANDATA -from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable +from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version +from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing import Span -from sentry_sdk.tracing_utils import add_query_source, record_sql_queries +from sentry_sdk.tracing_utils import ( + add_query_source, + record_sql_queries, + record_sql_queries_supporting_streaming, +) from sentry_sdk.utils import ( - ensure_integration_enabled, - parse_version, capture_internal_exceptions, + parse_version, ) try: @@ -46,8 +51,12 @@ def setup_once() -> None: asyncpg.Connection._executemany = _wrap_connection_method( asyncpg.Connection._executemany, executemany=True ) - asyncpg.Connection.cursor = _wrap_cursor_creation(asyncpg.Connection.cursor) asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare) + + BaseCursor._bind_exec = _wrap_cursor_method(BaseCursor._bind_exec) + BaseCursor._bind = _wrap_cursor_method(BaseCursor._bind) + BaseCursor._exec = _wrap_cursor_method(BaseCursor._exec) + asyncpg.connect_utils._connect_addr = _wrap_connect_addr( asyncpg.connect_utils._connect_addr ) @@ -138,21 +147,26 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": return _inner -def _wrap_cursor_creation(f: "Callable[..., T]") -> "Callable[..., T]": - @ensure_integration_enabled(AsyncPGIntegration, f) - def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807 - query = args[1] - params_list = args[2] if len(args) > 2 else None +def _wrap_cursor_method( + f: "Callable[..., Awaitable[T]]", +) -> "Callable[..., Awaitable[T]]": + async def _inner(*args: "Any", **kwargs: "Any") -> "T": + if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None: + return await f(*args, **kwargs) - with _record( - None, - query, - params_list, + cursor = args[0] + query = _normalize_query(cursor._query) + with record_sql_queries_supporting_streaming( + cursor=cursor, + query=query, + params_list=None, + paramstyle=None, executemany=False, + record_cursor_repr=True, + span_origin=AsyncPGIntegration.origin, ) as span: - _set_db_data(span, args[0]) - res = f(*args, **kwargs) - span.set_data("db.cursor", res) + _set_db_data(span, cursor._connection) + res = await f(*args, **kwargs) return res @@ -197,22 +211,27 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": return _inner -def _set_db_data(span: "Span", conn: "Any") -> None: - span.set_data(SPANDATA.DB_SYSTEM, "postgresql") - span.set_data(SPANDATA.DB_DRIVER_NAME, "asyncpg") +def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None: + if isinstance(span, StreamedSpan): + set_on_span = span.set_attribute + else: + set_on_span = span.set_data + + set_on_span(SPANDATA.DB_SYSTEM, "postgresql") + set_on_span(SPANDATA.DB_DRIVER_NAME, "asyncpg") addr = conn._addr if addr: try: - span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) - span.set_data(SPANDATA.SERVER_PORT, addr[1]) + set_on_span(SPANDATA.SERVER_ADDRESS, addr[0]) + set_on_span(SPANDATA.SERVER_PORT, addr[1]) except IndexError: pass database = conn._params.database if database: - span.set_data(SPANDATA.DB_NAME, database) + set_on_span(SPANDATA.DB_NAME, database) user = conn._params.user if user: - span.set_data(SPANDATA.DB_USER, user) + set_on_span(SPANDATA.DB_USER, user) diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index 2dcce52070..cd983403ba 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -9,19 +9,20 @@ The tests use the following credentials to establish a database connection. """ -import os import datetime +import os from contextlib import contextmanager from unittest import mock import asyncpg import pytest import pytest_asyncio -from asyncpg import connect, Connection +from asyncpg import Connection, connect +import sentry_sdk from sentry_sdk import capture_message, start_transaction -from sentry_sdk.integrations.asyncpg import AsyncPGIntegration from sentry_sdk.consts import SPANDATA +from sentry_sdk.integrations.asyncpg import AsyncPGIntegration from sentry_sdk.tracing_utils import record_sql_queries from tests.conftest import ApproxDict @@ -302,7 +303,7 @@ async def test_cursor(sentry_init, capture_events) -> None: {"category": "query", "data": {}, "message": "BEGIN;", "type": "default"}, { "category": "query", - "data": {}, + "data": {"db.cursor": mock.ANY}, "message": "SELECT * FROM users WHERE dob > $1", "type": "default", }, @@ -360,7 +361,19 @@ async def test_cursor_manual(sentry_init, capture_events) -> None: {"category": "query", "data": {}, "message": "BEGIN;", "type": "default"}, { "category": "query", - "data": {}, + "data": {"db.cursor": mock.ANY}, + "message": "SELECT * FROM users WHERE dob > $1", + "type": "default", + }, + { + "category": "query", + "data": {"db.cursor": mock.ANY}, + "message": "SELECT * FROM users WHERE dob > $1", + "type": "default", + }, + { + "category": "query", + "data": {"db.cursor": mock.ANY}, "message": "SELECT * FROM users WHERE dob > $1", "type": "default", }, @@ -857,3 +870,178 @@ def before_send_transaction(event, hint): assert len(spans) == 1 assert spans[0]["description"] == "filtered" + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_cursor__bind_exec_creates_spans( + sentry_init, capture_events, capture_items, span_streaming +) -> None: + """ + Exercises the bind_exec patch through the iterator that's created in asyncpg when "for record in conn.cursor" is called. + See https://github.com/MagicStack/asyncpg/blob/db8ecc2a38e16fb0c090aef6f5506547c2831c24/asyncpg/cursor.py#L234 + """ + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [ + ("Bob", "secret_pw", datetime.date(1984, 3, 1)), + ("Alice", "pw", datetime.date(1990, 12, 25)), + ], + ) + + async with conn.transaction(): + async for record in conn.cursor( + "SELECT * FROM users WHERE dob > $1", + datetime.date(1970, 1, 1), + ): + pass + + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + + # _bind_exec span + segment + assert len(spans) == 2 + + bind_exec_span = spans[0] + segment = spans[1] + + assert segment["name"] == "test_transaction" + assert bind_exec_span["name"] == "SELECT * FROM users WHERE dob > $1" + + assert bind_exec_span["attributes"]["sentry.origin"] == "auto.db.asyncpg" + assert bind_exec_span["attributes"]["sentry.op"] == "db" + assert bind_exec_span["attributes"]["db.system"] == "postgresql" + assert bind_exec_span["attributes"]["db.driver.name"] == "asyncpg" + assert bind_exec_span["attributes"]["server.address"] == PG_HOST + assert bind_exec_span["attributes"]["server.port"] == PG_PORT + assert bind_exec_span["attributes"]["db.name"] == PG_NAME + assert bind_exec_span["attributes"]["db.user"] == PG_USER + else: + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [ + ("Bob", "secret_pw", datetime.date(1984, 3, 1)), + ("Alice", "pw", datetime.date(1990, 12, 25)), + ], + ) + + async with conn.transaction(): + async for record in conn.cursor( + "SELECT * FROM users WHERE dob > $1", + datetime.date(1970, 1, 1), + ): + pass + + await conn.close() + + (event,) = events + + assert len(event["spans"]) == 5 + + connect_span = event["spans"][0] + executemany_span = event["spans"][1] + begin_span = event["spans"][2] + bind_exec_span = event["spans"][3] + commit_span = event["spans"][4] + + assert connect_span["description"] == "connect" + assert ( + executemany_span["description"] + == "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)" + ) + assert begin_span["description"] == "BEGIN;" + assert bind_exec_span["description"] == "SELECT * FROM users WHERE dob > $1" + assert commit_span["description"] == "COMMIT;" + + assert bind_exec_span["origin"] == "auto.db.asyncpg" + assert bind_exec_span["data"]["db.system"] == "postgresql" + assert bind_exec_span["data"]["db.driver.name"] == "asyncpg" + assert bind_exec_span["data"]["server.address"] == PG_HOST + assert bind_exec_span["data"]["server.port"] == PG_PORT + assert bind_exec_span["data"]["db.name"] == PG_NAME + assert bind_exec_span["data"]["db.user"] == PG_USER + + +@pytest.mark.asyncio +async def test_cursor__bind_and__exec_creates_spans( + sentry_init, capture_events +) -> None: + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + ) + events = capture_events() + + with start_transaction(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [ + ("Bob", "secret_pw", datetime.date(1984, 3, 1)), + ("Alice", "pw", datetime.date(1990, 12, 25)), + ], + ) + + async with conn.transaction(): + # This exercises the `_bind` patch and the `cursor` patch + cur = await conn.cursor( + "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) + ) + # These exercises the `_exec` patch + await cur.fetchrow() + await cur.fetchrow() + + await conn.close() + + (event,) = events + + assert len(event["spans"]) == 7 + + connect_span = event["spans"][0] + executemany_span = event["spans"][1] + begin_span = event["spans"][2] + cursor_creation_and_bind_span = event["spans"][3] + fetchrow_span_1 = event["spans"][4] + fetchrow_span_2 = event["spans"][5] + commit_span = event["spans"][6] + + assert connect_span["description"] == "connect" + assert ( + executemany_span["description"] + == "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)" + ) + assert begin_span["description"] == "BEGIN;" + assert fetchrow_span_1["description"] == "SELECT * FROM users WHERE dob > $1" + assert ( + cursor_creation_and_bind_span["description"] + == "SELECT * FROM users WHERE dob > $1" + ) + assert fetchrow_span_2["description"] == "SELECT * FROM users WHERE dob > $1" + assert commit_span["description"] == "COMMIT;" + + for span in (cursor_creation_and_bind_span, fetchrow_span_1, fetchrow_span_2): + assert span["data"]["db.cursor"] is not None + assert span["data"]["db.system"] == "postgresql" + assert span["data"]["db.driver.name"] == "asyncpg" + assert span["origin"] == "auto.db.asyncpg" From 72c0455f9529f17be03be0211c3f4c403c47fb79 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Tue, 12 May 2026 07:51:08 -0400 Subject: [PATCH 2/2] Update tests/integrations/asyncpg/test_asyncpg.py Co-authored-by: Ivana Kellyer --- tests/integrations/asyncpg/test_asyncpg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index d2fa493748..7b048013a3 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -1262,7 +1262,7 @@ async def test_cursor__bind_and__exec_methods_create_spans( cur = await conn.cursor( "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) ) - # These exercises the `_exec` patch + # These exercise the `_exec` patch await cur.fetchrow() await cur.fetchrow()