diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index 9b37119564..5b0d75ee1a 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -143,8 +143,17 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": params_list = args[2] if len(args) > 2 else None with _record(None, query, params_list, executemany=executemany) as span: _set_db_data(span, args[0]) + res = await f(*args, **kwargs) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + if not isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + return res return _inner @@ -163,8 +172,17 @@ def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807 executemany=False, ) as span: _set_db_data(span, args[0]) + res = f(*args, **kwargs) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + if not isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + return res return _inner diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index 35195e757b..4c4fa7061b 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -657,7 +657,12 @@ async def test_query_source_enabled( @pytest.mark.asyncio @pytest.mark.parametrize("span_streaming", [True, False]) -async def test_query_source(sentry_init, capture_events, capture_items, span_streaming): +async def test_query_source_with_module_in_search_path( + sentry_init, capture_events, capture_items, span_streaming +): + """ + Test that query source is relative to the path of the module it ran in + """ sentry_init( integrations=[AsyncPGIntegration()], traces_sample_rate=1.0, @@ -668,13 +673,16 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str }, ) + from asyncpg_helpers.helpers import execute_query_in_connection + if span_streaming: items = capture_items("span") with sentry_sdk.traces.start_span(name="test_transaction"): conn: Connection = await connect(PG_CONNECTION_URI) - await conn.execute( + await execute_query_in_connection( "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + conn, ) await conn.close() @@ -698,13 +706,15 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str with start_transaction(name="test_transaction", sampled=True): conn: Connection = await connect(PG_CONNECTION_URI) - await conn.execute( + await execute_query_in_connection( "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + conn, ) await conn.close() (event,) = events + span = event["spans"][-1] assert span["description"].startswith("INSERT INTO") data = span.get("data", {}) @@ -719,61 +729,10 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str assert type(data.get(lineno_key)) == int assert data.get(lineno_key) > 0 - assert ( - data.get(SPANDATA.CODE_NAMESPACE) == "tests.integrations.asyncpg.test_asyncpg" - ) - assert data.get(filepath_key).endswith("tests/integrations/asyncpg/test_asyncpg.py") - - is_relative_path = data.get(filepath_key)[0] != os.sep - assert is_relative_path - - assert data.get(SPANDATA.CODE_FUNCTION) == "test_query_source" - - -@pytest.mark.asyncio -async def test_query_source_with_module_in_search_path(sentry_init, capture_events): - """ - Test that query source is relative to the path of the module it ran in - """ - sentry_init( - integrations=[AsyncPGIntegration()], - traces_sample_rate=1.0, - enable_db_query_source=True, - db_query_source_threshold_ms=0, - ) - - events = capture_events() - - from asyncpg_helpers.helpers import execute_query_in_connection - - with start_transaction(name="test_transaction", sampled=True): - conn: Connection = await connect(PG_CONNECTION_URI) - - await execute_query_in_connection( - "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", - conn, - ) - - await conn.close() - - (event,) = events - - span = event["spans"][-1] - assert span["description"].startswith("INSERT INTO") - - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data - - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 + assert data.get(filepath_key) == "asyncpg_helpers/helpers.py" assert data.get(SPANDATA.CODE_NAMESPACE) == "asyncpg_helpers.helpers" - assert data.get(SPANDATA.CODE_FILEPATH) == "asyncpg_helpers/helpers.py" - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + is_relative_path = data.get(filepath_key)[0] != os.sep assert is_relative_path assert data.get(SPANDATA.CODE_FUNCTION) == "execute_query_in_connection" @@ -1102,3 +1061,260 @@ def before_send_transaction(event, hint): assert len(spans) == 1 assert spans[0]["description"] == "filtered" + + +def _assert_query_source(span, span_streaming, expected_function): + if span_streaming: + data = span.get("attributes", {}) + lineno_key = "code.line.number" + filepath_key = "code.file.path" + else: + data = span.get("data", {}) + lineno_key = SPANDATA.CODE_LINENO + filepath_key = SPANDATA.CODE_FILEPATH + + assert lineno_key in data + assert filepath_key in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FUNCTION in data + + assert type(data.get(lineno_key)) == int + assert data.get(lineno_key) > 0 + assert data[SPANDATA.CODE_NAMESPACE] == "tests.integrations.asyncpg.test_asyncpg" + assert data.get(filepath_key).endswith("tests/integrations/asyncpg/test_asyncpg.py") + assert data.get(filepath_key)[0] != os.sep + assert data[SPANDATA.CODE_FUNCTION] == expected_function + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_execute( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + "Alice", + "pw", + datetime.date(1990, 12, 25), + ) + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 3 + + connect_span = spans[0] + query_span = spans[1] + segment = spans[2] + + assert connect_span["name"] == "connect" + assert query_span["name"].startswith("INSERT INTO") + assert segment["name"] == "test_transaction" + assert segment["is_segment"] is True + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + "Alice", + "pw", + datetime.date(1990, 12, 25), + ) + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 2 + assert spans[0]["description"] == "connect" + assert spans[1]["description"].startswith("INSERT INTO") + query_span = spans[1] + + _assert_query_source(query_span, span_streaming, "test_query_source_execute") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_executemany( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [("Bob", "secret_pw", datetime.date(1984, 3, 1))], + ) + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 3 + + connect_span = spans[0] + query_span = spans[1] + segment = spans[2] + + assert connect_span["name"] == "connect" + assert query_span["name"].startswith("INSERT INTO") + assert segment["name"] == "test_transaction" + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [("Bob", "secret_pw", datetime.date(1984, 3, 1))], + ) + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 2 + assert spans[0]["description"] == "connect" + assert spans[1]["description"].startswith("INSERT INTO") + query_span = spans[1] + + _assert_query_source(query_span, span_streaming, "test_query_source_executemany") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_prepare( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.prepare("SELECT * FROM users WHERE name = $1") + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 3 + connect_span = spans[0] + query_span = spans[1] + segment = spans[2] + + assert connect_span["name"] == "connect" + assert query_span["name"] == "SELECT * FROM users WHERE name = $1" + assert segment["name"] == "test_transaction" + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.prepare("SELECT * FROM users WHERE name = $1") + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 2 + assert spans[0]["description"] == "connect" + assert spans[1]["description"] == "SELECT * FROM users WHERE name = $1" + query_span = spans[1] + + _assert_query_source(query_span, span_streaming, "test_query_source_prepare") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_cursor( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + async with conn.transaction(): + async for _ in conn.cursor( + "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) + ): + pass + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 5 + connect_span = spans[0] + begin_span = spans[1] + query_span = spans[2] + commit_span = spans[3] + segment = spans[4] + + assert connect_span["name"] == "connect" + assert begin_span["name"] == "BEGIN;" + assert query_span["name"] == "SELECT * FROM users WHERE dob > $1" + assert commit_span["name"] == "COMMIT;" + assert segment["name"] == "test_transaction" + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + async with conn.transaction(): + async for _ in conn.cursor( + "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) + ): + pass + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 4 + + connect_span = spans[0] + begin_span = spans[1] + query_span = spans[2] + commit_span = spans[3] + + assert connect_span["description"] == "connect" + assert begin_span["description"] == "BEGIN;" + assert query_span["description"] == "SELECT * FROM users WHERE dob > $1" + assert commit_span["description"] == "COMMIT;" + + _assert_query_source(query_span, span_streaming, "test_query_source_cursor")