From a13962689b0ae0d65ab92b34c904aeb4cb10ab1e Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 10:36:06 +0200 Subject: [PATCH 01/15] feat(sqlalchemy): Support span streaming --- sentry_sdk/integrations/sqlalchemy.py | 73 +- tests/integrations/django/asgi/test_asgi.py | 3 +- .../sqlalchemy/test_sqlalchemy.py | 1184 ++++++++++++----- .../integrations/threading/test_threading.py | 12 +- 4 files changed, 915 insertions(+), 357 deletions(-) diff --git a/sentry_sdk/integrations/sqlalchemy.py b/sentry_sdk/integrations/sqlalchemy.py index a4354f4228..2706b2c595 100644 --- a/sentry_sdk/integrations/sqlalchemy.py +++ b/sentry_sdk/integrations/sqlalchemy.py @@ -6,6 +6,7 @@ ensure_integration_enabled, parse_version, ) +from sentry_sdk.traces import StreamedSpan, SpanStatus try: from sqlalchemy.engine import Engine # type: ignore @@ -20,6 +21,7 @@ from typing import Any from typing import ContextManager from typing import Optional + from typing import Union from sentry_sdk.tracing import Span @@ -96,7 +98,10 @@ def _handle_error(context: "Any", *args: "Any") -> None: span: "Optional[Span]" = getattr(execution_context, "_sentry_sql_span", None) if span is not None: - span.set_status(SPANSTATUS.INTERNAL_ERROR) + if isinstance(span, StreamedSpan): + span.status = SpanStatus.ERROR + else: + span.set_status(SPANSTATUS.INTERNAL_ERROR) # _after_cursor_execute does not get called for crashing SQL stmts. Judging # from SQLAlchemy codebase it does seem like any error coming into this @@ -132,29 +137,53 @@ def _get_db_system(name: str) -> "Optional[str]": return None -def _set_db_data(span: "Span", conn: "Any") -> None: +def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None: db_system = _get_db_system(conn.engine.name) - if db_system is not None: - span.set_data(SPANDATA.DB_SYSTEM, db_system) - - try: - driver = conn.dialect.driver - if driver: - span.set_data(SPANDATA.DB_DRIVER_NAME, driver) - except Exception: - pass + if isinstance(span, StreamedSpan): + if db_system is not None: + span.set_attribute(SPANDATA.DB_SYSTEM, db_system) + + try: + driver = conn.dialect.driver + if driver: + span.set_attribute(SPANDATA.DB_DRIVER_NAME, driver) + except Exception: + pass + else: + if db_system is not None: + span.set_data(SPANDATA.DB_SYSTEM, db_system) + + try: + driver = conn.dialect.driver + if driver: + span.set_data(SPANDATA.DB_DRIVER_NAME, driver) + except Exception: + pass if conn.engine.url is None: return - db_name = conn.engine.url.database - if db_name is not None: - span.set_data(SPANDATA.DB_NAME, db_name) - - server_address = conn.engine.url.host - if server_address is not None: - span.set_data(SPANDATA.SERVER_ADDRESS, server_address) - - server_port = conn.engine.url.port - if server_port is not None: - span.set_data(SPANDATA.SERVER_PORT, server_port) + if isinstance(span, StreamedSpan): + db_name = conn.engine.url.database + if db_name is not None: + span.set_attribute(SPANDATA.DB_NAME, db_name) + + server_address = conn.engine.url.host + if server_address is not None: + span.set_attribute(SPANDATA.SERVER_ADDRESS, server_address) + + server_port = conn.engine.url.port + if server_port is not None: + span.set_attribute(SPANDATA.SERVER_PORT, server_port) + else: + db_name = conn.engine.url.database + if db_name is not None: + span.set_data(SPANDATA.DB_NAME, db_name) + + server_address = conn.engine.url.host + if server_address is not None: + span.set_data(SPANDATA.SERVER_ADDRESS, server_address) + + server_port = conn.engine.url.port + if server_port is not None: + span.set_data(SPANDATA.SERVER_PORT, server_port) diff --git a/tests/integrations/django/asgi/test_asgi.py b/tests/integrations/django/asgi/test_asgi.py index f956d12f82..174ceb77bb 100644 --- a/tests/integrations/django/asgi/test_asgi.py +++ b/tests/integrations/django/asgi/test_asgi.py @@ -292,8 +292,9 @@ async def test_async_middleware_spans( (transaction,) = events + assert transaction["type"] == "transaction" assert ( - render_span_tree(transaction) + render_span_tree(transaction["spans"], transaction["contexts"]["trace"]) == """\ - op="http.server": description=null - op="event.django": description="django.db.reset_queries" diff --git a/tests/integrations/sqlalchemy/test_sqlalchemy.py b/tests/integrations/sqlalchemy/test_sqlalchemy.py index 7c7ce3d845..55eb781acb 100644 --- a/tests/integrations/sqlalchemy/test_sqlalchemy.py +++ b/tests/integrations/sqlalchemy/test_sqlalchemy.py @@ -18,11 +18,20 @@ from sentry_sdk.utils import json_dumps -def test_orm_queries(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_orm_queries( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( - integrations=[SqlalchemyIntegration()], _experiments={"record_sql_params": True} + integrations=[SqlalchemyIntegration()], + _experiments={ + "record_sql_params": True, + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() Base = declarative_base() # noqa: N806 @@ -53,9 +62,14 @@ class Address(Base): assert session.query(Person).first() == bob - capture_message("hi") - - (event,) = events + if span_streaming: + items = capture_items("event", "transaction", "span") + capture_message("hi") + (event,) = (item.payload for item in items if item.type == "event") + else: + events = capture_events() + capture_message("hi") + (event,) = events for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -78,13 +92,22 @@ class Address(Base): ] -def test_transactions(sentry_init, capture_events, render_span_tree): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_transactions( + sentry_init, + capture_events, + capture_items, + render_span_tree, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], - _experiments={"record_sql_params": True}, + _experiments={ + "record_sql_params": True, + "trace_lifecycle": "stream" if span_streaming else "static", + }, traces_sample_rate=1.0, ) - events = capture_events() Base = declarative_base() # noqa: N806 @@ -110,31 +133,79 @@ class Address(Base): Session = sessionmaker(bind=engine) # noqa: N806 session = Session() - with start_transaction(name="test_transaction", sampled=True): - with session.begin_nested(): - session.query(Person).first() - - for _ in range(2): - with pytest.raises(IntegrityError): - with session.begin_nested(): - session.add(Person(id=1, name="bob")) - session.add(Person(id=1, name="bob")) - - with session.begin_nested(): - session.query(Person).first() - - (event,) = events - - for span in event["spans"]: - assert span["data"][SPANDATA.DB_SYSTEM] == "sqlite" - assert span["data"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" - assert span["data"][SPANDATA.DB_NAME] == ":memory:" - assert SPANDATA.SERVER_ADDRESS not in span["data"] - assert SPANDATA.SERVER_PORT not in span["data"] - - assert ( - render_span_tree(event) - == """\ + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent"): + with session.begin_nested(): + session.query(Person).first() + + for _ in range(2): + with pytest.raises(IntegrityError): + with session.begin_nested(): + session.add(Person(id=1, name="bob")) + session.add(Person(id=1, name="bob")) + + with session.begin_nested(): + session.query(Person).first() + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + sqlalchemy_spans = [ + span + for span in spans + if span["attributes"]["sentry.origin"] == "auto.db.sqlalchemy" + ] + for span in sqlalchemy_spans: + assert span["attributes"][SPANDATA.DB_SYSTEM] == "sqlite" + assert span["attributes"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" + assert span["attributes"][SPANDATA.DB_NAME] == ":memory:" + assert SPANDATA.SERVER_PORT not in span["attributes"] + + assert ( + render_span_tree(spans) + == """\ +- sentry.op=null: name="custom parent" + - sentry.op="db": name="SAVEPOINT sa_savepoint_1" + - sentry.op="db": name="SELECT person.id AS person_id, person.name AS person_name \\nFROM person\\n LIMIT ? OFFSET ?" + - sentry.op="db": name="RELEASE SAVEPOINT sa_savepoint_1" + - sentry.op="db": name="SAVEPOINT sa_savepoint_2" + - sentry.op="db": name="INSERT INTO person (id, name) VALUES (?, ?)" + - sentry.op="db": name="ROLLBACK TO SAVEPOINT sa_savepoint_2" + - sentry.op="db": name="SAVEPOINT sa_savepoint_3" + - sentry.op="db": name="INSERT INTO person (id, name) VALUES (?, ?)" + - sentry.op="db": name="ROLLBACK TO SAVEPOINT sa_savepoint_3" + - sentry.op="db": name="SAVEPOINT sa_savepoint_4" + - sentry.op="db": name="SELECT person.id AS person_id, person.name AS person_name \\nFROM person\\n LIMIT ? OFFSET ?" + - sentry.op="db": name="RELEASE SAVEPOINT sa_savepoint_4"\ +""" + ) + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + with session.begin_nested(): + session.query(Person).first() + + for _ in range(2): + with pytest.raises(IntegrityError): + with session.begin_nested(): + session.add(Person(id=1, name="bob")) + session.add(Person(id=1, name="bob")) + + with session.begin_nested(): + session.query(Person).first() + + (event,) = events + + for span in event["spans"]: + assert span["data"][SPANDATA.DB_SYSTEM] == "sqlite" + assert span["data"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" + assert span["data"][SPANDATA.DB_NAME] == ":memory:" + assert SPANDATA.SERVER_ADDRESS not in span["data"] + assert SPANDATA.SERVER_PORT not in span["data"] + + assert ( + render_span_tree(event["spans"], event["contexts"]["trace"]) + == """\ - op=null: description=null - op="db": description="SAVEPOINT sa_savepoint_1" - op="db": description="SELECT person.id AS person_id, person.name AS person_name \\nFROM person\\n LIMIT ? OFFSET ?" @@ -149,16 +220,24 @@ class Address(Base): - op="db": description="SELECT person.id AS person_id, person.name AS person_name \\nFROM person\\n LIMIT ? OFFSET ?" - op="db": description="RELEASE SAVEPOINT sa_savepoint_4"\ """ - ) + ) -def test_transactions_no_engine_url(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_transactions_no_engine_url( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], - _experiments={"record_sql_params": True}, + _experiments={ + "record_sql_params": True, + "trace_lifecycle": "stream" if span_streaming else "static", + }, traces_sample_rate=1.0, ) - events = capture_events() Base = declarative_base() # noqa: N806 @@ -185,47 +264,98 @@ class Address(Base): Session = sessionmaker(bind=engine) # noqa: N806 session = Session() - with start_transaction(name="test_transaction", sampled=True): - with session.begin_nested(): - session.query(Person).first() - - for _ in range(2): - with pytest.raises(IntegrityError): - with session.begin_nested(): - session.add(Person(id=1, name="bob")) - session.add(Person(id=1, name="bob")) - - with session.begin_nested(): - session.query(Person).first() - - (event,) = events - - for span in event["spans"]: - assert span["data"][SPANDATA.DB_SYSTEM] == "sqlite" - assert span["data"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" - assert SPANDATA.DB_NAME not in span["data"] - assert SPANDATA.SERVER_ADDRESS not in span["data"] - assert SPANDATA.SERVER_PORT not in span["data"] - - -def test_long_sql_query_preserved(sentry_init, capture_events): + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent"): + with session.begin_nested(): + session.query(Person).first() + + for _ in range(2): + with pytest.raises(IntegrityError): + with session.begin_nested(): + session.add(Person(id=1, name="bob")) + session.add(Person(id=1, name="bob")) + + with session.begin_nested(): + session.query(Person).first() + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + sqlalchemy_spans = [ + span + for span in spans + if span["attributes"]["sentry.origin"] == "auto.db.sqlalchemy" + ] + for span in sqlalchemy_spans: + assert span["attributes"][SPANDATA.DB_SYSTEM] == "sqlite" + assert span["attributes"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" + assert SPANDATA.DB_NAME not in span["attributes"] + assert SPANDATA.SERVER_PORT not in span["attributes"] + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + with session.begin_nested(): + session.query(Person).first() + + for _ in range(2): + with pytest.raises(IntegrityError): + with session.begin_nested(): + session.add(Person(id=1, name="bob")) + session.add(Person(id=1, name="bob")) + + with session.begin_nested(): + session.query(Person).first() + + (event,) = events + for span in event["spans"]: + assert span["data"][SPANDATA.DB_SYSTEM] == "sqlite" + assert span["data"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" + assert SPANDATA.DB_NAME not in span["data"] + assert SPANDATA.SERVER_ADDRESS not in span["data"] + assert SPANDATA.SERVER_PORT not in span["data"] + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_long_sql_query_preserved( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( traces_sample_rate=1, integrations=[SqlalchemyIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False} ) - with start_transaction(name="test"): - with engine.connect() as con: - con.execute(text(" UNION ".join("SELECT {}".format(i) for i in range(100)))) + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent"): + with engine.connect() as con: + con.execute( + text(" UNION ".join("SELECT {}".format(i) for i in range(100))) + ) - (event,) = events - description = event["spans"][0]["description"] - assert description.startswith("SELECT 0 UNION SELECT 1") - assert description.endswith("SELECT 98 UNION SELECT 99") + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + name = spans[0]["name"] + assert name.startswith("SELECT 0 UNION SELECT 1") + assert name.endswith("SELECT 98 UNION SELECT 99") + else: + events = capture_events() + with start_transaction(name="test"): + with engine.connect() as con: + con.execute( + text(" UNION ".join("SELECT {}".format(i) for i in range(100))) + ) + + (event,) = events + description = event["spans"][0]["description"] + assert description.startswith("SELECT 0 UNION SELECT 1") + assert description.endswith("SELECT 98 UNION SELECT 99") def test_large_event_not_truncated(sentry_init, capture_events): @@ -286,9 +416,14 @@ def processor(event, hint): } -def test_engine_name_not_string(sentry_init): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_engine_name_not_string( + sentry_init, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) engine = create_engine( @@ -300,171 +435,331 @@ def test_engine_name_not_string(sentry_init): con.execute(text("SELECT 0")) -def test_query_source_disabled(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source_disabled( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_options = { "integrations": [SqlalchemyIntegration()], "traces_sample_rate": 1.0, "enable_db_query_source": False, "db_query_source_threshold_ms": 0, + "_experiments": {"trace_lifecycle": "stream" if span_streaming else "static"}, } sentry_init(**sentry_options) - events = capture_events() + if span_streaming: + items = capture_items("span") - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - bob = Person(name="Bob") - session.add(bob) + bob = Person(name="Bob") + session.add(bob) - assert session.query(Person).first() == bob + assert session.query(Person).first() == bob - (event,) = events + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER not in attributes + assert SPANDATA.CODE_FILE_PATH not in attributes + assert SPANDATA.CODE_FUNCTION_NAME not in attributes + break + else: + raise AssertionError("No db span found") - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO not in data - assert SPANDATA.CODE_NAMESPACE not in data - assert SPANDATA.CODE_FILEPATH not in data - assert SPANDATA.CODE_FUNCTION not in data - break else: - raise AssertionError("No db span found") + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 + + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) + + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + assert session.query(Person).first() == bob + + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO not in data + assert SPANDATA.CODE_NAMESPACE not in data + assert SPANDATA.CODE_FILEPATH not in data + assert SPANDATA.CODE_FUNCTION not in data + break + else: + raise AssertionError("No db span found") @pytest.mark.parametrize("enable_db_query_source", [None, True]) -def test_query_source_enabled(sentry_init, capture_events, enable_db_query_source): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source_enabled( + sentry_init, + capture_events, + capture_items, + enable_db_query_source, + span_streaming, +): sentry_options = { "integrations": [SqlalchemyIntegration()], "traces_sample_rate": 1.0, "db_query_source_threshold_ms": 0, + "_experiments": {"trace_lifecycle": "stream" if span_streaming else "static"}, } + if enable_db_query_source is not None: sentry_options["enable_db_query_source"] = enable_db_query_source sentry_init(**sentry_options) - events = capture_events() + if span_streaming: + items = capture_items("span") - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) - - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - bob = Person(name="Bob") - session.add(bob) + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - assert session.query(Person).first() == bob + bob = Person(name="Bob") + session.add(bob) - (event,) = events + assert session.query(Person).first() == bob - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data - break + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER in attributes + assert SPANDATA.CODE_FILE_PATH in attributes + assert SPANDATA.CODE_FUNCTION_NAME in attributes + break + else: + raise AssertionError("No db span found") else: - raise AssertionError("No db span found") + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 -def test_query_source(sentry_init, capture_events): + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) + + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + assert session.query(Person).first() == bob + + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data + break + else: + raise AssertionError("No db span found") + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], traces_sample_rate=1.0, enable_db_query_source=True, db_query_source_threshold_ms=0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() + if span_streaming: + items = capture_items("span") - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + bob = Person(name="Bob") + session.add(bob) - bob = Person(name="Bob") - session.add(bob) + assert session.query(Person).first() == bob - assert session.query(Person).first() == bob + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER in attributes + assert SPANDATA.CODE_FILE_PATH in attributes + assert SPANDATA.CODE_FUNCTION_NAME in attributes + + assert type(attributes.get(SPANDATA.CODE_LINE_NUMBER)) == int + assert attributes.get(SPANDATA.CODE_LINE_NUMBER) > 0 + assert attributes.get(SPANDATA.CODE_FILE_PATH).endswith( + "tests/integrations/sqlalchemy/test_sqlalchemy.py" + ) - (event,) = events + is_relative_path = attributes.get(SPANDATA.CODE_FILE_PATH)[0] != os.sep + assert is_relative_path - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data - - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 - assert ( - data.get(SPANDATA.CODE_NAMESPACE) - == "tests.integrations.sqlalchemy.test_sqlalchemy" - ) - assert data.get(SPANDATA.CODE_FILEPATH).endswith( - "tests/integrations/sqlalchemy/test_sqlalchemy.py" + assert ( + attributes.get(SPANDATA.CODE_FUNCTION_NAME) == "test_query_source" + ) + break + else: + raise AssertionError("No db span found") + else: + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 + + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) + + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} ) + Base.metadata.create_all(engine) - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep - assert is_relative_path + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - assert data.get(SPANDATA.CODE_FUNCTION) == "test_query_source" - break - else: - raise AssertionError("No db span found") + bob = Person(name="Bob") + session.add(bob) + + assert session.query(Person).first() == bob + + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data + + assert type(data.get(SPANDATA.CODE_LINENO)) == int + assert data.get(SPANDATA.CODE_LINENO) > 0 + assert ( + data.get(SPANDATA.CODE_NAMESPACE) + == "tests.integrations.sqlalchemy.test_sqlalchemy" + ) + assert data.get(SPANDATA.CODE_FILEPATH).endswith( + "tests/integrations/sqlalchemy/test_sqlalchemy.py" + ) + + is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + assert is_relative_path + + assert data.get(SPANDATA.CODE_FUNCTION) == "test_query_source" + break + else: + raise AssertionError("No db span found") -def test_query_source_with_module_in_search_path(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source_with_module_in_search_path( + sentry_init, + capture_events, + capture_items, + span_streaming, +): """ Test that query source is relative to the path of the module it ran in """ @@ -473,227 +768,456 @@ def test_query_source_with_module_in_search_path(sentry_init, capture_events): traces_sample_rate=1.0, enable_db_query_source=True, db_query_source_threshold_ms=0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() from sqlalchemy_helpers.helpers import ( add_model_to_session, query_first_model_from_session, ) - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 - - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - bob = Person(name="Bob") + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - add_model_to_session(bob, session) + bob = Person(name="Bob") - assert query_first_model_from_session(Person, session) == bob + add_model_to_session(bob, session) - (event,) = events + assert query_first_model_from_session(Person, session) == bob - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data + assert SPANDATA.CODE_LINE_NUMBER in attributes + assert SPANDATA.CODE_FILE_PATH in attributes + assert SPANDATA.CODE_FUNCTION_NAME in attributes - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 - assert data.get(SPANDATA.CODE_NAMESPACE) == "sqlalchemy_helpers.helpers" - assert data.get(SPANDATA.CODE_FILEPATH) == "sqlalchemy_helpers/helpers.py" + assert type(attributes.get(SPANDATA.CODE_LINE_NUMBER)) == int + assert attributes.get(SPANDATA.CODE_LINE_NUMBER) > 0 + assert ( + attributes.get(SPANDATA.CODE_FILE_PATH) + == "sqlalchemy_helpers/helpers.py" + ) - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep - assert is_relative_path + is_relative_path = attributes.get(SPANDATA.CODE_FILE_PATH)[0] != os.sep + assert is_relative_path - assert data.get(SPANDATA.CODE_FUNCTION) == "query_first_model_from_session" - break + assert ( + attributes.get(SPANDATA.CODE_FUNCTION_NAME) + == "query_first_model_from_session" + ) + break + else: + raise AssertionError("No db span found") else: - raise AssertionError("No db span found") + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) -def test_no_query_source_if_duration_too_short(sentry_init, capture_events): - sentry_init( - integrations=[SqlalchemyIntegration()], - traces_sample_rate=1.0, - enable_db_query_source=True, - db_query_source_threshold_ms=100, - ) - events = capture_events() + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) + bob = Person(name="Bob") - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + add_model_to_session(bob, session) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + assert query_first_model_from_session(Person, session) == bob - bob = Person(name="Bob") - session.add(bob) + (event,) = events - class fake_record_sql_queries: # noqa: N801 - def __init__(self, *args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: - self.span = span + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) - self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) - self.span.timestamp = datetime(2024, 1, 1, microsecond=99999) + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data - def __enter__(self): - return self.span + assert type(data.get(SPANDATA.CODE_LINENO)) == int + assert data.get(SPANDATA.CODE_LINENO) > 0 + assert data.get(SPANDATA.CODE_NAMESPACE) == "sqlalchemy_helpers.helpers" + assert ( + data.get(SPANDATA.CODE_FILEPATH) == "sqlalchemy_helpers/helpers.py" + ) - def __exit__(self, type, value, traceback): - pass + is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + assert is_relative_path - with mock.patch( - "sentry_sdk.integrations.sqlalchemy.record_sql_queries", - fake_record_sql_queries, - ): - assert session.query(Person).first() == bob + assert ( + data.get(SPANDATA.CODE_FUNCTION) == "query_first_model_from_session" + ) + break + else: + raise AssertionError("No db span found") + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_no_query_source_if_duration_too_short( + sentry_init, + capture_events, + capture_items, + span_streaming, +): + sentry_init( + integrations=[SqlalchemyIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=100, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) + if span_streaming: + items = capture_items("span") - (event,) = events + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - assert SPANDATA.CODE_LINENO not in data - assert SPANDATA.CODE_NAMESPACE not in data - assert SPANDATA.CODE_FILEPATH not in data - assert SPANDATA.CODE_FUNCTION not in data + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + class fake_record_sql_queries: # noqa: N801 + def __init__(self, *args, **kwargs): + with record_sql_queries(*args, **kwargs) as span: + self.span = span + + if span_streaming: + self.span._start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span._timestamp = datetime(2024, 1, 1, microsecond=99999) + else: + self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span.timestamp = datetime(2024, 1, 1, microsecond=99999) + + def __enter__(self): + return self.span + + def __exit__(self, type, value, traceback): + pass + + with mock.patch( + "sentry_sdk.integrations.sqlalchemy.record_sql_queries", + fake_record_sql_queries, + ): + assert session.query(Person).first() == bob + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER not in attributes + assert SPANDATA.CODE_FILE_PATH not in attributes + assert SPANDATA.CODE_FUNCTION_NAME not in attributes + break + else: + raise AssertionError("No db span found") - break else: - raise AssertionError("No db span found") + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 -def test_query_source_if_duration_over_threshold(sentry_init, capture_events): + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) + + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + class fake_record_sql_queries: # noqa: N801 + def __init__(self, *args, **kwargs): + with record_sql_queries(*args, **kwargs) as span: + self.span = span + + if span_streaming: + self.span._start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span._timestamp = datetime(2024, 1, 1, microsecond=99999) + else: + self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span.timestamp = datetime(2024, 1, 1, microsecond=99999) + + def __enter__(self): + return self.span + + def __exit__(self, type, value, traceback): + pass + + with mock.patch( + "sentry_sdk.integrations.sqlalchemy.record_sql_queries", + fake_record_sql_queries, + ): + assert session.query(Person).first() == bob + + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO not in data + assert SPANDATA.CODE_NAMESPACE not in data + assert SPANDATA.CODE_FILEPATH not in data + assert SPANDATA.CODE_FUNCTION not in data + break + else: + raise AssertionError("No db span found") + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source_if_duration_over_threshold( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], traces_sample_rate=1.0, enable_db_query_source=True, db_query_source_threshold_ms=100, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + if span_streaming: + items = capture_items("span") - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + class fake_record_sql_queries: # noqa: N801 + def __init__(self, *args, **kwargs): + with record_sql_queries(*args, **kwargs) as span: + self.span = span + + self.span._start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span._timestamp = datetime(2024, 1, 1, microsecond=101000) + + def __enter__(self): + return self.span + + def __exit__(self, type, value, traceback): + pass + + with mock.patch( + "sentry_sdk.integrations.sqlalchemy.record_sql_queries", + fake_record_sql_queries, + ): + assert session.query(Person).first() == bob + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER in attributes + assert SPANDATA.CODE_FILE_PATH in attributes + assert SPANDATA.CODE_FUNCTION_NAME in attributes + + assert type(attributes.get(SPANDATA.CODE_LINE_NUMBER)) == int + assert attributes.get(SPANDATA.CODE_LINE_NUMBER) > 0 + assert attributes.get(SPANDATA.CODE_FILE_PATH).endswith( + "tests/integrations/sqlalchemy/test_sqlalchemy.py" + ) - bob = Person(name="Bob") - session.add(bob) + is_relative_path = attributes.get(SPANDATA.CODE_FILE_PATH)[0] != os.sep + assert is_relative_path - class fake_record_sql_queries: # noqa: N801 - def __init__(self, *args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: - self.span = span + assert ( + attributes.get(SPANDATA.CODE_FUNCTION_NAME) + == "test_query_source_if_duration_over_threshold" + ) + break + else: + raise AssertionError("No db span found") + else: + events = capture_events() - self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) - self.span.timestamp = datetime(2024, 1, 1, microsecond=101000) + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 - def __enter__(self): - return self.span + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - def __exit__(self, type, value, traceback): - pass + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - with mock.patch( - "sentry_sdk.integrations.sqlalchemy.record_sql_queries", - fake_record_sql_queries, - ): - assert session.query(Person).first() == bob + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - (event,) = events + bob = Person(name="Bob") + session.add(bob) - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data - - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 - assert ( - data.get(SPANDATA.CODE_NAMESPACE) - == "tests.integrations.sqlalchemy.test_sqlalchemy" - ) - assert data.get(SPANDATA.CODE_FILEPATH).endswith( - "tests/integrations/sqlalchemy/test_sqlalchemy.py" - ) + class fake_record_sql_queries: # noqa: N801 + def __init__(self, *args, **kwargs): + with record_sql_queries(*args, **kwargs) as span: + self.span = span - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep - assert is_relative_path + self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span.timestamp = datetime(2024, 1, 1, microsecond=101000) - assert ( - data.get(SPANDATA.CODE_FUNCTION) - == "test_query_source_if_duration_over_threshold" - ) - break - else: - raise AssertionError("No db span found") + def __enter__(self): + return self.span + + def __exit__(self, type, value, traceback): + pass + with mock.patch( + "sentry_sdk.integrations.sqlalchemy.record_sql_queries", + fake_record_sql_queries, + ): + assert session.query(Person).first() == bob -def test_span_origin(sentry_init, capture_events): + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data + + assert type(data.get(SPANDATA.CODE_LINENO)) == int + assert data.get(SPANDATA.CODE_LINENO) > 0 + assert ( + data.get(SPANDATA.CODE_NAMESPACE) + == "tests.integrations.sqlalchemy.test_sqlalchemy" + ) + assert data.get(SPANDATA.CODE_FILEPATH).endswith( + "tests/integrations/sqlalchemy/test_sqlalchemy.py" + ) + + is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + assert is_relative_path + + assert ( + data.get(SPANDATA.CODE_FUNCTION) + == "test_query_source_if_duration_over_threshold" + ) + break + else: + raise AssertionError("No db span found") + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_span_origin( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False} ) - with start_transaction(name="foo"): - with engine.connect() as con: - con.execute(text("SELECT 0")) + if span_streaming: + items = capture_items("span", "transaction") + with sentry_sdk.traces.start_span(name="custom parent"): + with engine.connect() as con: + con.execute(text("SELECT 0")) - (event,) = events + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + + assert spans[0]["attributes"]["sentry.origin"] == "auto.db.sqlalchemy" + assert spans[1]["attributes"]["sentry.origin"] == "manual" + else: + events = capture_events() + with start_transaction(name="foo"): + with engine.connect() as con: + con.execute(text("SELECT 0")) + + (event,) = events - assert event["contexts"]["trace"]["origin"] == "manual" - assert event["spans"][0]["origin"] == "auto.db.sqlalchemy" + assert event["contexts"]["trace"]["origin"] == "manual" + assert event["spans"][0]["origin"] == "auto.db.sqlalchemy" diff --git a/tests/integrations/threading/test_threading.py b/tests/integrations/threading/test_threading.py index 788fb24fdf..9c06ce24a7 100644 --- a/tests/integrations/threading/test_threading.py +++ b/tests/integrations/threading/test_threading.py @@ -254,7 +254,8 @@ def do_some_work(number): # Free-threaded builds set thread_inherit_context to True, otherwise thread_inherit_context is False if propagate_scope or getattr(sys.flags, "thread_inherit_context", None): - assert render_span_tree(event) == dedent( + assert event["type"] == "transaction" + assert render_span_tree(event["spans"], event["contexts"]["trace"]) == dedent( """\ - op="outer-trx": description=null - op="outer-submit-0": description="Thread: main" @@ -271,7 +272,8 @@ def do_some_work(number): ) elif not propagate_scope: - assert render_span_tree(event) == dedent( + assert event["type"] == "transaction" + assert render_span_tree(event["spans"], event["contexts"]["trace"]) == dedent( """\ - op="outer-trx": description=null - op="outer-submit-0": description="Thread: main" @@ -316,7 +318,8 @@ def do_some_work(number): # Free-threaded builds set thread_inherit_context to True, otherwise thread_inherit_context is False if propagate_scope or getattr(sys.flags, "thread_inherit_context", None): - assert render_span_tree(event) == dedent( + assert event["type"] == "transaction" + assert render_span_tree(event["spans"], event["contexts"]["trace"]) == dedent( """\ - op="outer-trx": description=null - op="outer-submit-0": description="Thread: main" @@ -333,7 +336,8 @@ def do_some_work(number): ) elif not propagate_scope: - assert render_span_tree(event) == dedent( + assert event["type"] == "transaction" + assert render_span_tree(event["spans"], event["contexts"]["trace"]) == dedent( """\ - op="outer-trx": description=null - op="outer-submit-0": description="Thread: main" From 38b5933749e40f01dc0ae7a2b5126ec0c2a20664 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 10:46:51 +0200 Subject: [PATCH 02/15] . --- tests/conftest.py | 29 ++++++++++++++----- .../sqlalchemy/test_sqlalchemy.py | 4 +-- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index ba28e4991c..c92180aefd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -476,23 +476,36 @@ def maybe_monkeypatched_threading(request): @pytest.fixture def render_span_tree(): - def inner(event): - assert event["type"] == "transaction" + def inner(spans, root_span=None): + streamed_spans = False + if root_span is None: + streamed_spans = True by_parent = {} - for span in event["spans"]: + print("spans are", spans) + for span in spans: + print(span) + if "parent_span_id" not in span: + root_span = span + continue + by_parent.setdefault(span["parent_span_id"], []).append(span) def render_span(span): - yield "- op={}: description={}".format( - json.dumps(span.get("op")), json.dumps(span.get("description")) - ) + if streamed_spans: + yield "- sentry.op={}: name={}".format( + json.dumps(span["attributes"].get("sentry.op")), + json.dumps(span["name"]), + ) + else: + yield "- op={}: description={}".format( + json.dumps(span.get("op")), json.dumps(span.get("description")) + ) + for subspan in by_parent.get(span["span_id"]) or (): for line in render_span(subspan): yield " {}".format(line) - root_span = event["contexts"]["trace"] - return "\n".join(render_span(root_span)) return inner diff --git a/tests/integrations/sqlalchemy/test_sqlalchemy.py b/tests/integrations/sqlalchemy/test_sqlalchemy.py index 55eb781acb..1d40b9baad 100644 --- a/tests/integrations/sqlalchemy/test_sqlalchemy.py +++ b/tests/integrations/sqlalchemy/test_sqlalchemy.py @@ -63,7 +63,7 @@ class Address(Base): assert session.query(Person).first() == bob if span_streaming: - items = capture_items("event", "transaction", "span") + items = capture_items("event") capture_message("hi") (event,) = (item.payload for item in items if item.type == "event") else: @@ -1201,7 +1201,7 @@ def test_span_origin( "sqlite:///:memory:", connect_args={"check_same_thread": False} ) if span_streaming: - items = capture_items("span", "transaction") + items = capture_items("span") with sentry_sdk.traces.start_span(name="custom parent"): with engine.connect() as con: con.execute(text("SELECT 0")) From 3d89b9e9db3153d639228b43ff1e5bee69a4e2fe Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 11:02:56 +0200 Subject: [PATCH 03/15] . --- tests/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index c92180aefd..6cd4f10c4a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -482,7 +482,6 @@ def inner(spans, root_span=None): streamed_spans = True by_parent = {} - print("spans are", spans) for span in spans: print(span) if "parent_span_id" not in span: From b1fa3b548f8b239534b4a54111fa069633a21188 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 11:03:47 +0200 Subject: [PATCH 04/15] . --- sentry_sdk/tracing_utils.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index 18ab832ad7..aa428961c7 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -135,7 +135,8 @@ def record_sql_queries( span_origin: str = "manual", ) -> "Generator[sentry_sdk.tracing.Span, None, None]": # TODO: Bring back capturing of params by default - if sentry_sdk.get_client().options["_experiments"].get("record_sql_params", False): + client = sentry_sdk.get_client() + if client.options["_experiments"].get("record_sql_params", False): if not params_list or params_list == [None]: params_list = None @@ -160,14 +161,26 @@ def record_sql_queries( with capture_internal_exceptions(): sentry_sdk.add_breadcrumb(message=query, category="query", data=data) - with sentry_sdk.start_span( - op=OP.DB, - name=query, - origin=span_origin, - ) as span: - for k, v in data.items(): - span.set_data(k, v) - yield span + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=query, + attributes={ + "sentry.origin": span_origin, + "sentry.op": OP.DB, + }, + ) as span: + for k, v in data.items(): + span.set_attribute(k, v) + yield span + else: + with sentry_sdk.start_span( + op=OP.DB, + name=query, + origin=span_origin, + ) as span: + for k, v in data.items(): + span.set_data(k, v) + yield span def maybe_create_breadcrumbs_from_span( From 9929f556e09acf927d9bb85982e6fc1e13492d34 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 11:14:45 +0200 Subject: [PATCH 05/15] . --- tests/integrations/django/test_basic.py | 31 +++++++++++++++++++------ 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/tests/integrations/django/test_basic.py b/tests/integrations/django/test_basic.py index 1c6bb141bd..28c4dc54a2 100644 --- a/tests/integrations/django/test_basic.py +++ b/tests/integrations/django/test_basic.py @@ -596,7 +596,9 @@ def test_django_connect_trace(sentry_init, client, capture_events, render_span_t data = span.get("data") assert data.get(SPANDATA.DB_SYSTEM) == "postgresql" - assert '- op="db": description="connect"' in render_span_tree(event) + assert '- op="db": description="connect"' in render_span_tree( + event["spans"], event["contexts"]["trace"] + ) @pytest.mark.forked @@ -954,7 +956,9 @@ def test_render_spans(sentry_init, client, capture_events, render_span_tree): events = capture_events() client.get(url) transaction = events[0] - assert expected_line in render_span_tree(transaction) + assert expected_line in render_span_tree( + transaction["spans"], transaction["contexts"]["trace"] + ) @pytest.mark.skipif(DJANGO_VERSION < (1, 9), reason="Requires Django >= 1.9") @@ -1034,7 +1038,10 @@ def test_middleware_spans(sentry_init, client, capture_events, render_span_tree) message, transaction = events assert message["message"] == "hi" - assert render_span_tree(transaction) == EXPECTED_MIDDLEWARE_SPANS + assert ( + render_span_tree(transaction["spans"], transaction["contexts"]["trace"]) + == EXPECTED_MIDDLEWARE_SPANS + ) def test_middleware_spans_disabled(sentry_init, client, capture_events): @@ -1075,7 +1082,10 @@ def test_signals_spans(sentry_init, client, capture_events, render_span_tree): message, transaction = events assert message["message"] == "hi" - assert render_span_tree(transaction) == EXPECTED_SIGNALS_SPANS + assert ( + render_span_tree(transaction["spans"], transaction["contexts"]["trace"]) + == EXPECTED_SIGNALS_SPANS + ) assert transaction["spans"][0]["op"] == "event.django" assert transaction["spans"][0]["description"] == "django.db.reset_queries" @@ -1127,7 +1137,10 @@ def test_signals_spans_filtering(sentry_init, client, capture_events, render_spa (transaction,) = events - assert render_span_tree(transaction) == EXPECTED_SIGNALS_SPANS_FILTERED + assert ( + render_span_tree(transaction["spans"], transaction["contexts"]["trace"]) + == EXPECTED_SIGNALS_SPANS_FILTERED + ) assert transaction["spans"][0]["op"] == "event.django" assert transaction["spans"][0]["description"] == "django.db.reset_queries" @@ -1206,7 +1219,9 @@ def test_custom_urlconf_middleware( event = events.pop(0) assert event["transaction"] == "/custom/ok" if middleware_spans: - assert "custom_urlconf_middleware" in render_span_tree(event) + assert "custom_urlconf_middleware" in render_span_tree( + event["spans"], event["contexts"]["trace"] + ) _content, status, _headers = unpack_werkzeug_response(client.get("/custom/exc")) assert status.lower() == "500 internal server error" @@ -1216,7 +1231,9 @@ def test_custom_urlconf_middleware( assert error_event["exception"]["values"][-1]["mechanism"]["type"] == "django" assert transaction_event["transaction"] == "/custom/exc" if middleware_spans: - assert "custom_urlconf_middleware" in render_span_tree(transaction_event) + assert "custom_urlconf_middleware" in render_span_tree( + transaction_event["spans"], transaction_event["contexts"]["trace"] + ) settings.MIDDLEWARE.pop(0) From 75c0cc116aaa07f573d94021197478edf8600946 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 11:19:43 +0200 Subject: [PATCH 06/15] . --- sentry_sdk/tracing_utils.py | 4 ++-- tests/integrations/django/test_basic.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index aa428961c7..a71f2a4a39 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -133,7 +133,7 @@ def record_sql_queries( executemany: bool, record_cursor_repr: bool = False, span_origin: str = "manual", -) -> "Generator[sentry_sdk.tracing.Span, None, None]": +) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]": # TODO: Bring back capturing of params by default client = sentry_sdk.get_client() if client.options["_experiments"].get("record_sql_params", False): @@ -163,7 +163,7 @@ def record_sql_queries( if has_span_streaming_enabled(client.options): with sentry_sdk.traces.start_span( - name=query, + name="" if query is None else query, attributes={ "sentry.origin": span_origin, "sentry.op": OP.DB, diff --git a/tests/integrations/django/test_basic.py b/tests/integrations/django/test_basic.py index 28c4dc54a2..f14be92ceb 100644 --- a/tests/integrations/django/test_basic.py +++ b/tests/integrations/django/test_basic.py @@ -457,7 +457,7 @@ def test_response_trace(sentry_init, client, capture_events, render_span_tree): assert ( '- op="view.response.render": description="serialize response"' - in render_span_tree(events[0]) + in render_span_tree(events[0]["spans"], events[0]["contexts"]["trace"]) ) From ad4e14ded4a7bc5e707b472bd8a623606f0ae177 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 11:29:23 +0200 Subject: [PATCH 07/15] fix mypy --- sentry_sdk/tracing_utils.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index a71f2a4a39..1926e5488c 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -326,22 +326,36 @@ def add_source( span.set_attribute("code.function.name", frame.f_code.co_name) -def add_query_source(span: "sentry_sdk.tracing.Span") -> None: +def add_query_source( + span: "Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan]", +) -> None: """ Adds OTel compatible source code information to a database query span """ client = sentry_sdk.get_client() - if not client.is_active(): - return - if span.timestamp is None or span.start_timestamp is None: + if isinstance(span, LegacySpan): + if not client.is_active(): + return + + # In the StreamedSpan case, we need to add the extra span information before + # the span finishes, so it's expected that this will be None. In the LegacySpan case, + # it should already be finished. + if span.timestamp is None: + return + + if span.start_timestamp is None: return should_add_query_source = client.options.get("enable_db_query_source", True) if not should_add_query_source: return - duration = span.timestamp - span.start_timestamp + end_timestamp = ( + datetime.now(timezone.utc) if span.timestamp is None else span.timestamp + ) + + duration = end_timestamp - span.start_timestamp threshold = client.options.get("db_query_source_threshold_ms", 0) slow_query = duration / timedelta(milliseconds=1) > threshold From 2bf7c778dc9d247832315f73b8731fdc886683a0 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 11:40:00 +0200 Subject: [PATCH 08/15] . --- sentry_sdk/integrations/asyncpg.py | 24 +++++---- sentry_sdk/integrations/django/__init__.py | 22 +++++--- sentry_sdk/integrations/sqlalchemy.py | 61 ++++++++-------------- 3 files changed, 51 insertions(+), 56 deletions(-) diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index 29f3bad152..4f15c1e872 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -1,7 +1,7 @@ from __future__ import annotations import contextlib import re -from typing import Any, TypeVar, Callable, Awaitable, Iterator +from typing import Any, TypeVar, Callable, Awaitable, Iterator, Union import sentry_sdk from sentry_sdk.consts import OP, SPANDATA @@ -13,6 +13,7 @@ parse_version, capture_internal_exceptions, ) +from sentry_sdk.traces import StreamedSpan try: import asyncpg # type: ignore[import-not-found] @@ -101,7 +102,7 @@ def _record( params_list: "tuple[Any, ...] | None", *, executemany: bool = False, -) -> "Iterator[Span]": +) -> "Iterator[Union[Span, StreamedSpan]]": integration = sentry_sdk.get_client().get_integration(AsyncPGIntegration) if integration is not None and not integration._record_params: params_list = None @@ -197,22 +198,27 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": return _inner -def _set_db_data(span: "Span", conn: "Any") -> None: - span.set_data(SPANDATA.DB_SYSTEM, "postgresql") - span.set_data(SPANDATA.DB_DRIVER_NAME, "asyncpg") +def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None: + if isinstance(span, StreamedSpan): + set_on_span = span.set_attribute + else: + set_on_span = span.set_data + + set_on_span(SPANDATA.DB_SYSTEM, "postgresql") + set_on_span(SPANDATA.DB_DRIVER_NAME, "asyncpg") addr = conn._addr if addr: try: - span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) - span.set_data(SPANDATA.SERVER_PORT, addr[1]) + set_on_span(SPANDATA.SERVER_ADDRESS, addr[0]) + set_on_span(SPANDATA.SERVER_PORT, addr[1]) except IndexError: pass database = conn._params.database if database: - span.set_data(SPANDATA.DB_NAME, database) + set_on_span(SPANDATA.DB_NAME, database) user = conn._params.user if user: - span.set_data(SPANDATA.DB_USER, user) + set_on_span(SPANDATA.DB_USER, user) diff --git a/sentry_sdk/integrations/django/__init__.py b/sentry_sdk/integrations/django/__init__.py index 2595c33ea8..43412c9e29 100644 --- a/sentry_sdk/integrations/django/__init__.py +++ b/sentry_sdk/integrations/django/__init__.py @@ -9,6 +9,7 @@ from sentry_sdk.scope import add_global_event_processor, should_send_default_pii from sentry_sdk.serializer import add_global_repr_processor, add_repr_sequence_type from sentry_sdk.tracing import SOURCE_FOR_STYLE, TransactionSource +from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing_utils import add_query_source, record_sql_queries from sentry_sdk.utils import ( AnnotatedValue, @@ -720,14 +721,21 @@ def _rollback(self: "BaseDatabaseWrapper") -> None: def _set_db_data( - span: "Span", cursor_or_db: "Any", db_operation: "Optional[str]" = None + span: "Union[Span, StreamedSpan]", + cursor_or_db: "Any", + db_operation: "Optional[str]" = None, ) -> None: + if isinstance(span, StreamedSpan): + set_on_span = span.set_attribute + else: + set_on_span = span.set_data + db = cursor_or_db.db if hasattr(cursor_or_db, "db") else cursor_or_db vendor = db.vendor - span.set_data(SPANDATA.DB_SYSTEM, vendor) + set_on_span(SPANDATA.DB_SYSTEM, vendor) if db_operation is not None: - span.set_data(SPANDATA.DB_OPERATION, db_operation) + set_on_span(SPANDATA.DB_OPERATION, db_operation) # Some custom backends override `__getattr__`, making it look like `cursor_or_db` # actually has a `connection` and the `connection` has a `get_dsn_parameters` @@ -760,19 +768,19 @@ def _set_db_data( db_name = connection_params.get("dbname") or connection_params.get("database") if db_name is not None: - span.set_data(SPANDATA.DB_NAME, db_name) + set_on_span(SPANDATA.DB_NAME, db_name) server_address = connection_params.get("host") if server_address is not None: - span.set_data(SPANDATA.SERVER_ADDRESS, server_address) + set_on_span(SPANDATA.SERVER_ADDRESS, server_address) server_port = connection_params.get("port") if server_port is not None: - span.set_data(SPANDATA.SERVER_PORT, str(server_port)) + set_on_span(SPANDATA.SERVER_PORT, str(server_port)) server_socket_address = connection_params.get("unix_socket") if server_socket_address is not None: - span.set_data(SPANDATA.SERVER_SOCKET_ADDRESS, server_socket_address) + set_on_span(SPANDATA.SERVER_SOCKET_ADDRESS, server_socket_address) def add_template_context_repr_sequence() -> None: diff --git a/sentry_sdk/integrations/sqlalchemy.py b/sentry_sdk/integrations/sqlalchemy.py index 2706b2c595..e8619ea453 100644 --- a/sentry_sdk/integrations/sqlalchemy.py +++ b/sentry_sdk/integrations/sqlalchemy.py @@ -138,52 +138,33 @@ def _get_db_system(name: str) -> "Optional[str]": def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None: - db_system = _get_db_system(conn.engine.name) if isinstance(span, StreamedSpan): - if db_system is not None: - span.set_attribute(SPANDATA.DB_SYSTEM, db_system) - - try: - driver = conn.dialect.driver - if driver: - span.set_attribute(SPANDATA.DB_DRIVER_NAME, driver) - except Exception: - pass + set_on_span = span.set_attribute else: - if db_system is not None: - span.set_data(SPANDATA.DB_SYSTEM, db_system) + set_on_span = span.set_data + + db_system = _get_db_system(conn.engine.name) + if db_system is not None: + set_on_span(SPANDATA.DB_SYSTEM, db_system) - try: - driver = conn.dialect.driver - if driver: - span.set_data(SPANDATA.DB_DRIVER_NAME, driver) - except Exception: - pass + try: + driver = conn.dialect.driver + if driver: + set_on_span(SPANDATA.DB_DRIVER_NAME, driver) + except Exception: + pass if conn.engine.url is None: return - if isinstance(span, StreamedSpan): - db_name = conn.engine.url.database - if db_name is not None: - span.set_attribute(SPANDATA.DB_NAME, db_name) - - server_address = conn.engine.url.host - if server_address is not None: - span.set_attribute(SPANDATA.SERVER_ADDRESS, server_address) - - server_port = conn.engine.url.port - if server_port is not None: - span.set_attribute(SPANDATA.SERVER_PORT, server_port) - else: - db_name = conn.engine.url.database - if db_name is not None: - span.set_data(SPANDATA.DB_NAME, db_name) + db_name = conn.engine.url.database + if db_name is not None: + set_on_span(SPANDATA.DB_NAME, db_name) - server_address = conn.engine.url.host - if server_address is not None: - span.set_data(SPANDATA.SERVER_ADDRESS, server_address) + server_address = conn.engine.url.host + if server_address is not None: + set_on_span(SPANDATA.SERVER_ADDRESS, server_address) - server_port = conn.engine.url.port - if server_port is not None: - span.set_data(SPANDATA.SERVER_PORT, server_port) + server_port = conn.engine.url.port + if server_port is not None: + set_on_span(SPANDATA.SERVER_PORT, server_port) From f873d09fe3d1e6821a1a628c313c84d36aba7139 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 11:44:55 +0200 Subject: [PATCH 09/15] . --- sentry_sdk/integrations/asyncpg.py | 24 ++++++++-------------- sentry_sdk/integrations/django/__init__.py | 22 +++++++------------- 2 files changed, 16 insertions(+), 30 deletions(-) diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index 4f15c1e872..29f3bad152 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -1,7 +1,7 @@ from __future__ import annotations import contextlib import re -from typing import Any, TypeVar, Callable, Awaitable, Iterator, Union +from typing import Any, TypeVar, Callable, Awaitable, Iterator import sentry_sdk from sentry_sdk.consts import OP, SPANDATA @@ -13,7 +13,6 @@ parse_version, capture_internal_exceptions, ) -from sentry_sdk.traces import StreamedSpan try: import asyncpg # type: ignore[import-not-found] @@ -102,7 +101,7 @@ def _record( params_list: "tuple[Any, ...] | None", *, executemany: bool = False, -) -> "Iterator[Union[Span, StreamedSpan]]": +) -> "Iterator[Span]": integration = sentry_sdk.get_client().get_integration(AsyncPGIntegration) if integration is not None and not integration._record_params: params_list = None @@ -198,27 +197,22 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": return _inner -def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None: - if isinstance(span, StreamedSpan): - set_on_span = span.set_attribute - else: - set_on_span = span.set_data - - set_on_span(SPANDATA.DB_SYSTEM, "postgresql") - set_on_span(SPANDATA.DB_DRIVER_NAME, "asyncpg") +def _set_db_data(span: "Span", conn: "Any") -> None: + span.set_data(SPANDATA.DB_SYSTEM, "postgresql") + span.set_data(SPANDATA.DB_DRIVER_NAME, "asyncpg") addr = conn._addr if addr: try: - set_on_span(SPANDATA.SERVER_ADDRESS, addr[0]) - set_on_span(SPANDATA.SERVER_PORT, addr[1]) + span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) + span.set_data(SPANDATA.SERVER_PORT, addr[1]) except IndexError: pass database = conn._params.database if database: - set_on_span(SPANDATA.DB_NAME, database) + span.set_data(SPANDATA.DB_NAME, database) user = conn._params.user if user: - set_on_span(SPANDATA.DB_USER, user) + span.set_data(SPANDATA.DB_USER, user) diff --git a/sentry_sdk/integrations/django/__init__.py b/sentry_sdk/integrations/django/__init__.py index 43412c9e29..2595c33ea8 100644 --- a/sentry_sdk/integrations/django/__init__.py +++ b/sentry_sdk/integrations/django/__init__.py @@ -9,7 +9,6 @@ from sentry_sdk.scope import add_global_event_processor, should_send_default_pii from sentry_sdk.serializer import add_global_repr_processor, add_repr_sequence_type from sentry_sdk.tracing import SOURCE_FOR_STYLE, TransactionSource -from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing_utils import add_query_source, record_sql_queries from sentry_sdk.utils import ( AnnotatedValue, @@ -721,21 +720,14 @@ def _rollback(self: "BaseDatabaseWrapper") -> None: def _set_db_data( - span: "Union[Span, StreamedSpan]", - cursor_or_db: "Any", - db_operation: "Optional[str]" = None, + span: "Span", cursor_or_db: "Any", db_operation: "Optional[str]" = None ) -> None: - if isinstance(span, StreamedSpan): - set_on_span = span.set_attribute - else: - set_on_span = span.set_data - db = cursor_or_db.db if hasattr(cursor_or_db, "db") else cursor_or_db vendor = db.vendor - set_on_span(SPANDATA.DB_SYSTEM, vendor) + span.set_data(SPANDATA.DB_SYSTEM, vendor) if db_operation is not None: - set_on_span(SPANDATA.DB_OPERATION, db_operation) + span.set_data(SPANDATA.DB_OPERATION, db_operation) # Some custom backends override `__getattr__`, making it look like `cursor_or_db` # actually has a `connection` and the `connection` has a `get_dsn_parameters` @@ -768,19 +760,19 @@ def _set_db_data( db_name = connection_params.get("dbname") or connection_params.get("database") if db_name is not None: - set_on_span(SPANDATA.DB_NAME, db_name) + span.set_data(SPANDATA.DB_NAME, db_name) server_address = connection_params.get("host") if server_address is not None: - set_on_span(SPANDATA.SERVER_ADDRESS, server_address) + span.set_data(SPANDATA.SERVER_ADDRESS, server_address) server_port = connection_params.get("port") if server_port is not None: - set_on_span(SPANDATA.SERVER_PORT, str(server_port)) + span.set_data(SPANDATA.SERVER_PORT, str(server_port)) server_socket_address = connection_params.get("unix_socket") if server_socket_address is not None: - set_on_span(SPANDATA.SERVER_SOCKET_ADDRESS, server_socket_address) + span.set_data(SPANDATA.SERVER_SOCKET_ADDRESS, server_socket_address) def add_template_context_repr_sequence() -> None: From 56ee084cc8f6e7120e844e2d38992c828c93b1d1 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 11:48:56 +0200 Subject: [PATCH 10/15] add type ignores --- sentry_sdk/integrations/asyncpg.py | 2 +- sentry_sdk/integrations/django/__init__.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index 29f3bad152..3ca9702f0a 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -118,7 +118,7 @@ def _record( record_cursor_repr=cursor is not None, span_origin=AsyncPGIntegration.origin, ) as span: - yield span + yield span # type: ignore def _wrap_connection_method( diff --git a/sentry_sdk/integrations/django/__init__.py b/sentry_sdk/integrations/django/__init__.py index 2595c33ea8..6bc9abf018 100644 --- a/sentry_sdk/integrations/django/__init__.py +++ b/sentry_sdk/integrations/django/__init__.py @@ -641,7 +641,7 @@ def execute( executemany=False, span_origin=DjangoIntegration.origin_db, ) as span: - _set_db_data(span, self) + _set_db_data(span, self) # type: ignore result = real_execute(self, sql, params) with capture_internal_exceptions(): @@ -661,7 +661,7 @@ def executemany( executemany=True, span_origin=DjangoIntegration.origin_db, ) as span: - _set_db_data(span, self) + _set_db_data(span, self) # type: ignore result = real_executemany(self, sql, param_list) From 2c22c16969ab9574ee7925335d0fe6adc04ec6fd Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 12:05:02 +0200 Subject: [PATCH 11/15] move query source before exit --- sentry_sdk/integrations/sqlalchemy.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/sqlalchemy.py b/sentry_sdk/integrations/sqlalchemy.py index e8619ea453..532352ab89 100644 --- a/sentry_sdk/integrations/sqlalchemy.py +++ b/sentry_sdk/integrations/sqlalchemy.py @@ -80,15 +80,16 @@ def _after_cursor_execute( context, "_sentry_sql_span_manager", None ) - if ctx_mgr is not None: - context._sentry_sql_span_manager = None - ctx_mgr.__exit__(None, None, None) - + # Record query source immediately before span is finished: accurate end timestamp and before the span is flushed. span: "Optional[Span]" = getattr(context, "_sentry_sql_span", None) if span is not None: with capture_internal_exceptions(): add_query_source(span) + if ctx_mgr is not None: + context._sentry_sql_span_manager = None + ctx_mgr.__exit__(None, None, None) + def _handle_error(context: "Any", *args: "Any") -> None: execution_context = context.execution_context From f9faa2fa3eb2dcd7df5a72830744b96890b6449a Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 12:12:34 +0200 Subject: [PATCH 12/15] . --- sentry_sdk/integrations/sqlalchemy.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/sqlalchemy.py b/sentry_sdk/integrations/sqlalchemy.py index 532352ab89..07411309fe 100644 --- a/sentry_sdk/integrations/sqlalchemy.py +++ b/sentry_sdk/integrations/sqlalchemy.py @@ -7,6 +7,7 @@ parse_version, ) from sentry_sdk.traces import StreamedSpan, SpanStatus +from sentry_sdk.tracing import Span try: from sqlalchemy.engine import Engine # type: ignore @@ -23,8 +24,6 @@ from typing import Optional from typing import Union - from sentry_sdk.tracing import Span - class SqlalchemyIntegration(Integration): identifier = "sqlalchemy" @@ -81,8 +80,10 @@ def _after_cursor_execute( ) # Record query source immediately before span is finished: accurate end timestamp and before the span is flushed. - span: "Optional[Span]" = getattr(context, "_sentry_sql_span", None) - if span is not None: + span: "Optional[Union[Span, StreamedSpan]]" = getattr( + context, "_sentry_sql_span", None + ) + if isinstance(span, StreamedSpan): with capture_internal_exceptions(): add_query_source(span) @@ -90,6 +91,10 @@ def _after_cursor_execute( context._sentry_sql_span_manager = None ctx_mgr.__exit__(None, None, None) + if isinstance(span, Span): + with capture_internal_exceptions(): + add_query_source(span) + def _handle_error(context: "Any", *args: "Any") -> None: execution_context = context.execution_context From 064dd83df3405332dfdebfa7947c5ad17060d9be Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 13:35:03 +0200 Subject: [PATCH 13/15] use separate path for streaming --- sentry_sdk/integrations/sqlalchemy.py | 7 ++- sentry_sdk/tracing_utils.py | 46 +++++++++++++++++++ .../sqlalchemy/test_sqlalchemy.py | 26 +++++++---- 3 files changed, 68 insertions(+), 11 deletions(-) diff --git a/sentry_sdk/integrations/sqlalchemy.py b/sentry_sdk/integrations/sqlalchemy.py index 07411309fe..e9eed16149 100644 --- a/sentry_sdk/integrations/sqlalchemy.py +++ b/sentry_sdk/integrations/sqlalchemy.py @@ -1,6 +1,9 @@ from sentry_sdk.consts import SPANSTATUS, SPANDATA from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable -from sentry_sdk.tracing_utils import add_query_source, record_sql_queries +from sentry_sdk.tracing_utils import ( + add_query_source, + record_sql_queries_supporting_streaming, +) from sentry_sdk.utils import ( capture_internal_exceptions, ensure_integration_enabled, @@ -49,7 +52,7 @@ def _before_cursor_execute( executemany: bool, *args: "Any", ) -> None: - ctx_mgr = record_sql_queries( + ctx_mgr = record_sql_queries_supporting_streaming( cursor, statement, parameters, diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index 1926e5488c..ecfeee155b 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -133,6 +133,52 @@ def record_sql_queries( executemany: bool, record_cursor_repr: bool = False, span_origin: str = "manual", +) -> "Generator[sentry_sdk.tracing.Span, None, None]": + # TODO: Bring back capturing of params by default + if sentry_sdk.get_client().options["_experiments"].get("record_sql_params", False): + if not params_list or params_list == [None]: + params_list = None + + if paramstyle == "pyformat": + paramstyle = "format" + else: + params_list = None + paramstyle = None + + query = _format_sql(cursor, query) + + data = {} + if params_list is not None: + data["db.params"] = params_list + if paramstyle is not None: + data["db.paramstyle"] = paramstyle + if executemany: + data["db.executemany"] = True + if record_cursor_repr and cursor is not None: + data["db.cursor"] = cursor + + with capture_internal_exceptions(): + sentry_sdk.add_breadcrumb(message=query, category="query", data=data) + + with sentry_sdk.start_span( + op=OP.DB, + name=query, + origin=span_origin, + ) as span: + for k, v in data.items(): + span.set_data(k, v) + yield span + + +@contextlib.contextmanager +def record_sql_queries_supporting_streaming( + cursor: "Any", + query: "Any", + params_list: "Any", + paramstyle: "Optional[str]", + executemany: bool, + record_cursor_repr: bool = False, + span_origin: str = "manual", ) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]": # TODO: Bring back capturing of params by default client = sentry_sdk.get_client() diff --git a/tests/integrations/sqlalchemy/test_sqlalchemy.py b/tests/integrations/sqlalchemy/test_sqlalchemy.py index 1d40b9baad..48fc354a34 100644 --- a/tests/integrations/sqlalchemy/test_sqlalchemy.py +++ b/tests/integrations/sqlalchemy/test_sqlalchemy.py @@ -14,7 +14,7 @@ from sentry_sdk.consts import DEFAULT_MAX_VALUE_LENGTH, SPANDATA from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration from sentry_sdk.serializer import MAX_EVENT_BYTES -from sentry_sdk.tracing_utils import record_sql_queries +from sentry_sdk.tracing_utils import record_sql_queries_supporting_streaming from sentry_sdk.utils import json_dumps @@ -922,7 +922,9 @@ class Person(Base): class fake_record_sql_queries: # noqa: N801 def __init__(self, *args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: + with record_sql_queries_supporting_streaming( + *args, **kwargs + ) as span: self.span = span if span_streaming: @@ -939,7 +941,7 @@ def __exit__(self, type, value, traceback): pass with mock.patch( - "sentry_sdk.integrations.sqlalchemy.record_sql_queries", + "sentry_sdk.integrations.sqlalchemy.record_sql_queries_supporting_streaming", fake_record_sql_queries, ): assert session.query(Person).first() == bob @@ -983,7 +985,9 @@ class Person(Base): class fake_record_sql_queries: # noqa: N801 def __init__(self, *args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: + with record_sql_queries_supporting_streaming( + *args, **kwargs + ) as span: self.span = span if span_streaming: @@ -1000,7 +1004,7 @@ def __exit__(self, type, value, traceback): pass with mock.patch( - "sentry_sdk.integrations.sqlalchemy.record_sql_queries", + "sentry_sdk.integrations.sqlalchemy.record_sql_queries_supporting_streaming", fake_record_sql_queries, ): assert session.query(Person).first() == bob @@ -1061,7 +1065,9 @@ class Person(Base): class fake_record_sql_queries: # noqa: N801 def __init__(self, *args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: + with record_sql_queries_supporting_streaming( + *args, **kwargs + ) as span: self.span = span self.span._start_timestamp = datetime(2024, 1, 1, microsecond=0) @@ -1074,7 +1080,7 @@ def __exit__(self, type, value, traceback): pass with mock.patch( - "sentry_sdk.integrations.sqlalchemy.record_sql_queries", + "sentry_sdk.integrations.sqlalchemy.record_sql_queries_supporting_streaming", fake_record_sql_queries, ): assert session.query(Person).first() == bob @@ -1131,7 +1137,9 @@ class Person(Base): class fake_record_sql_queries: # noqa: N801 def __init__(self, *args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: + with record_sql_queries_supporting_streaming( + *args, **kwargs + ) as span: self.span = span self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) @@ -1144,7 +1152,7 @@ def __exit__(self, type, value, traceback): pass with mock.patch( - "sentry_sdk.integrations.sqlalchemy.record_sql_queries", + "sentry_sdk.integrations.sqlalchemy.record_sql_queries_supporting_streaming", fake_record_sql_queries, ): assert session.query(Person).first() == bob From 4b965c4a4b8f3a48f07e5c9a9d31046cf89b5857 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 13:35:33 +0200 Subject: [PATCH 14/15] use separate path for streaming --- sentry_sdk/integrations/asyncpg.py | 2 +- sentry_sdk/integrations/django/__init__.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index 3ca9702f0a..29f3bad152 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -118,7 +118,7 @@ def _record( record_cursor_repr=cursor is not None, span_origin=AsyncPGIntegration.origin, ) as span: - yield span # type: ignore + yield span def _wrap_connection_method( diff --git a/sentry_sdk/integrations/django/__init__.py b/sentry_sdk/integrations/django/__init__.py index 6bc9abf018..2595c33ea8 100644 --- a/sentry_sdk/integrations/django/__init__.py +++ b/sentry_sdk/integrations/django/__init__.py @@ -641,7 +641,7 @@ def execute( executemany=False, span_origin=DjangoIntegration.origin_db, ) as span: - _set_db_data(span, self) # type: ignore + _set_db_data(span, self) result = real_execute(self, sql, params) with capture_internal_exceptions(): @@ -661,7 +661,7 @@ def executemany( executemany=True, span_origin=DjangoIntegration.origin_db, ) as span: - _set_db_data(span, self) # type: ignore + _set_db_data(span, self) result = real_executemany(self, sql, param_list) From 98c67f0e5e4b9edf38509d70fd0e50bac9ab6b38 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 24 Apr 2026 13:46:54 +0200 Subject: [PATCH 15/15] remove print --- tests/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 6cd4f10c4a..8e7a2ffec6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -483,7 +483,6 @@ def inner(spans, root_span=None): by_parent = {} for span in spans: - print(span) if "parent_span_id" not in span: root_span = span continue