From 060924eee7ce238e9830914e54ca188703bbeae0 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 8 May 2026 13:10:31 -0400 Subject: [PATCH 1/2] feat(asyncpg): Add query source to methods that were missing it Add add_query_source() calls to execute(), executemany(), and cursor() methods. Previously only fetch() had query source tracking. Handle StreamedSpan and regular span cases separately since StreamedSpan requires the call while still inside the context manager. Add comprehensive test coverage for query source tracking across all wrapped methods. --- sentry_sdk/integrations/asyncpg.py | 18 ++ tests/integrations/asyncpg/test_asyncpg.py | 312 +++++++++++++++++---- 2 files changed, 274 insertions(+), 56 deletions(-) diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index 9b37119564..5b0d75ee1a 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -143,8 +143,17 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": params_list = args[2] if len(args) > 2 else None with _record(None, query, params_list, executemany=executemany) as span: _set_db_data(span, args[0]) + res = await f(*args, **kwargs) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + if not isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + return res return _inner @@ -163,8 +172,17 @@ def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807 executemany=False, ) as span: _set_db_data(span, args[0]) + res = f(*args, **kwargs) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + if not isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + return res return _inner diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index 35195e757b..bd8f729ee3 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -657,7 +657,12 @@ async def test_query_source_enabled( @pytest.mark.asyncio @pytest.mark.parametrize("span_streaming", [True, False]) -async def test_query_source(sentry_init, capture_events, capture_items, span_streaming): +async def test_query_source_with_module_in_search_path( + sentry_init, capture_events, capture_items, span_streaming +): + """ + Test that query source is relative to the path of the module it ran in + """ sentry_init( integrations=[AsyncPGIntegration()], traces_sample_rate=1.0, @@ -668,13 +673,16 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str }, ) + from asyncpg_helpers.helpers import execute_query_in_connection + if span_streaming: items = capture_items("span") with sentry_sdk.traces.start_span(name="test_transaction"): conn: Connection = await connect(PG_CONNECTION_URI) - await conn.execute( + await execute_query_in_connection( "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + conn, ) await conn.close() @@ -698,13 +706,15 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str with start_transaction(name="test_transaction", sampled=True): conn: Connection = await connect(PG_CONNECTION_URI) - await conn.execute( + await execute_query_in_connection( "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + conn, ) await conn.close() (event,) = events + span = event["spans"][-1] assert span["description"].startswith("INSERT INTO") data = span.get("data", {}) @@ -719,61 +729,10 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str assert type(data.get(lineno_key)) == int assert data.get(lineno_key) > 0 - assert ( - data.get(SPANDATA.CODE_NAMESPACE) == "tests.integrations.asyncpg.test_asyncpg" - ) - assert data.get(filepath_key).endswith("tests/integrations/asyncpg/test_asyncpg.py") - - is_relative_path = data.get(filepath_key)[0] != os.sep - assert is_relative_path - - assert data.get(SPANDATA.CODE_FUNCTION) == "test_query_source" - - -@pytest.mark.asyncio -async def test_query_source_with_module_in_search_path(sentry_init, capture_events): - """ - Test that query source is relative to the path of the module it ran in - """ - sentry_init( - integrations=[AsyncPGIntegration()], - traces_sample_rate=1.0, - enable_db_query_source=True, - db_query_source_threshold_ms=0, - ) - - events = capture_events() - - from asyncpg_helpers.helpers import execute_query_in_connection - - with start_transaction(name="test_transaction", sampled=True): - conn: Connection = await connect(PG_CONNECTION_URI) - - await execute_query_in_connection( - "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", - conn, - ) - - await conn.close() - - (event,) = events - - span = event["spans"][-1] - assert span["description"].startswith("INSERT INTO") - - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data - - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 + assert data.get(filepath_key) == "asyncpg_helpers/helpers.py" assert data.get(SPANDATA.CODE_NAMESPACE) == "asyncpg_helpers.helpers" - assert data.get(SPANDATA.CODE_FILEPATH) == "asyncpg_helpers/helpers.py" - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + is_relative_path = data.get(filepath_key)[0] != os.sep assert is_relative_path assert data.get(SPANDATA.CODE_FUNCTION) == "execute_query_in_connection" @@ -1102,3 +1061,244 @@ def before_send_transaction(event, hint): assert len(spans) == 1 assert spans[0]["description"] == "filtered" + + +def _assert_query_source(span, span_streaming, expected_function): + if span_streaming: + data = span.get("attributes", {}) + lineno_key = "code.line.number" + filepath_key = "code.file.path" + else: + data = span.get("data", {}) + lineno_key = SPANDATA.CODE_LINENO + filepath_key = SPANDATA.CODE_FILEPATH + + assert lineno_key in data + assert filepath_key in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FUNCTION in data + + assert type(data.get(lineno_key)) == int + assert data.get(lineno_key) > 0 + assert data[SPANDATA.CODE_NAMESPACE] == "tests.integrations.asyncpg.test_asyncpg" + assert data.get(filepath_key).endswith("tests/integrations/asyncpg/test_asyncpg.py") + assert data.get(filepath_key)[0] != os.sep + assert data[SPANDATA.CODE_FUNCTION] == expected_function + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_execute( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + "Alice", + "pw", + datetime.date(1990, 12, 25), + ) + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 3 + + connect_span = spans[0] + insert_span = spans[1] + segment = spans[2] + + assert connect_span["name"] == "connect" + assert insert_span["name"].startswith("INSERT INTO") + assert segment["name"] == "test_transaction" + assert segment["is_segment"] is True + query_span = spans[1] + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + "Alice", + "pw", + datetime.date(1990, 12, 25), + ) + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 2 + assert spans[0]["description"] == "connect" + assert spans[1]["description"].startswith("INSERT INTO") + query_span = spans[1] + + _assert_query_source(query_span, span_streaming, "test_query_source_execute") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_executemany( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [("Bob", "secret_pw", datetime.date(1984, 3, 1))], + ) + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 3 + assert spans[0]["name"] == "connect" + assert spans[1]["name"].startswith("INSERT INTO") + assert spans[2]["name"] == "test_transaction" + query_span = spans[1] + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [("Bob", "secret_pw", datetime.date(1984, 3, 1))], + ) + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 2 + assert spans[0]["description"] == "connect" + assert spans[1]["description"].startswith("INSERT INTO") + query_span = spans[1] + + _assert_query_source(query_span, span_streaming, "test_query_source_executemany") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_prepare( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.prepare("SELECT * FROM users WHERE name = $1") + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 3 + assert spans[0]["name"] == "connect" + assert spans[1]["name"] == "SELECT * FROM users WHERE name = $1" + assert spans[2]["name"] == "test_transaction" + query_span = spans[1] + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.prepare("SELECT * FROM users WHERE name = $1") + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 2 + assert spans[0]["description"] == "connect" + assert spans[1]["description"] == "SELECT * FROM users WHERE name = $1" + query_span = spans[1] + + _assert_query_source(query_span, span_streaming, "test_query_source_prepare") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_cursor( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + async with conn.transaction(): + async for _ in conn.cursor( + "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) + ): + pass + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 5 + assert spans[0]["name"] == "connect" + assert spans[1]["name"] == "BEGIN;" + assert spans[2]["name"] == "SELECT * FROM users WHERE dob > $1" + assert spans[3]["name"] == "COMMIT;" + assert spans[4]["name"] == "test_transaction" + query_span = spans[2] + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + async with conn.transaction(): + async for _ in conn.cursor( + "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) + ): + pass + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 4 + assert spans[0]["description"] == "connect" + assert spans[1]["description"] == "BEGIN;" + assert spans[2]["description"] == "SELECT * FROM users WHERE dob > $1" + assert spans[3]["description"] == "COMMIT;" + query_span = spans[2] + + _assert_query_source(query_span, span_streaming, "test_query_source_cursor") From 310a2e3044fe39209c8c3f0c6c098ae2ed0054e4 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 8 May 2026 14:32:51 -0400 Subject: [PATCH 2/2] Clean ups --- tests/integrations/asyncpg/test_asyncpg.py | 52 ++++++++++++++-------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index bd8f729ee3..4c4fa7061b 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -1118,14 +1118,13 @@ async def test_query_source_execute( assert len(spans) == 3 connect_span = spans[0] - insert_span = spans[1] + query_span = spans[1] segment = spans[2] assert connect_span["name"] == "connect" - assert insert_span["name"].startswith("INSERT INTO") + assert query_span["name"].startswith("INSERT INTO") assert segment["name"] == "test_transaction" assert segment["is_segment"] is True - query_span = spans[1] else: events = capture_events() with start_transaction(name="test_transaction", sampled=True): @@ -1176,10 +1175,14 @@ async def test_query_source_executemany( spans = [item.payload for item in items] assert len(spans) == 3 - assert spans[0]["name"] == "connect" - assert spans[1]["name"].startswith("INSERT INTO") - assert spans[2]["name"] == "test_transaction" + + connect_span = spans[0] query_span = spans[1] + segment = spans[2] + + assert connect_span["name"] == "connect" + assert query_span["name"].startswith("INSERT INTO") + assert segment["name"] == "test_transaction" else: events = capture_events() with start_transaction(name="test_transaction", sampled=True): @@ -1225,10 +1228,13 @@ async def test_query_source_prepare( spans = [item.payload for item in items] assert len(spans) == 3 - assert spans[0]["name"] == "connect" - assert spans[1]["name"] == "SELECT * FROM users WHERE name = $1" - assert spans[2]["name"] == "test_transaction" + connect_span = spans[0] query_span = spans[1] + segment = spans[2] + + assert connect_span["name"] == "connect" + assert query_span["name"] == "SELECT * FROM users WHERE name = $1" + assert segment["name"] == "test_transaction" else: events = capture_events() with start_transaction(name="test_transaction", sampled=True): @@ -1275,12 +1281,17 @@ async def test_query_source_cursor( spans = [item.payload for item in items] assert len(spans) == 5 - assert spans[0]["name"] == "connect" - assert spans[1]["name"] == "BEGIN;" - assert spans[2]["name"] == "SELECT * FROM users WHERE dob > $1" - assert spans[3]["name"] == "COMMIT;" - assert spans[4]["name"] == "test_transaction" + connect_span = spans[0] + begin_span = spans[1] query_span = spans[2] + commit_span = spans[3] + segment = spans[4] + + assert connect_span["name"] == "connect" + assert begin_span["name"] == "BEGIN;" + assert query_span["name"] == "SELECT * FROM users WHERE dob > $1" + assert commit_span["name"] == "COMMIT;" + assert segment["name"] == "test_transaction" else: events = capture_events() with start_transaction(name="test_transaction", sampled=True): @@ -1295,10 +1306,15 @@ async def test_query_source_cursor( (event,) = events spans = event["spans"] assert len(spans) == 4 - assert spans[0]["description"] == "connect" - assert spans[1]["description"] == "BEGIN;" - assert spans[2]["description"] == "SELECT * FROM users WHERE dob > $1" - assert spans[3]["description"] == "COMMIT;" + + connect_span = spans[0] + begin_span = spans[1] query_span = spans[2] + commit_span = spans[3] + + assert connect_span["description"] == "connect" + assert begin_span["description"] == "BEGIN;" + assert query_span["description"] == "SELECT * FROM users WHERE dob > $1" + assert commit_span["description"] == "COMMIT;" _assert_query_source(query_span, span_streaming, "test_query_source_cursor")