diff --git a/sentry_sdk/integrations/sqlalchemy.py b/sentry_sdk/integrations/sqlalchemy.py index a4354f4228..e9eed16149 100644 --- a/sentry_sdk/integrations/sqlalchemy.py +++ b/sentry_sdk/integrations/sqlalchemy.py @@ -1,11 +1,16 @@ from sentry_sdk.consts import SPANSTATUS, SPANDATA from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable -from sentry_sdk.tracing_utils import add_query_source, record_sql_queries +from sentry_sdk.tracing_utils import ( + add_query_source, + record_sql_queries_supporting_streaming, +) from sentry_sdk.utils import ( capture_internal_exceptions, ensure_integration_enabled, parse_version, ) +from sentry_sdk.traces import StreamedSpan, SpanStatus +from sentry_sdk.tracing import Span try: from sqlalchemy.engine import Engine # type: ignore @@ -20,8 +25,7 @@ from typing import Any from typing import ContextManager from typing import Optional - - from sentry_sdk.tracing import Span + from typing import Union class SqlalchemyIntegration(Integration): @@ -48,7 +52,7 @@ def _before_cursor_execute( executemany: bool, *args: "Any", ) -> None: - ctx_mgr = record_sql_queries( + ctx_mgr = record_sql_queries_supporting_streaming( cursor, statement, parameters, @@ -78,12 +82,19 @@ def _after_cursor_execute( context, "_sentry_sql_span_manager", None ) + # Record query source immediately before span is finished: accurate end timestamp and before the span is flushed. + span: "Optional[Union[Span, StreamedSpan]]" = getattr( + context, "_sentry_sql_span", None + ) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + if ctx_mgr is not None: context._sentry_sql_span_manager = None ctx_mgr.__exit__(None, None, None) - span: "Optional[Span]" = getattr(context, "_sentry_sql_span", None) - if span is not None: + if isinstance(span, Span): with capture_internal_exceptions(): add_query_source(span) @@ -96,7 +107,10 @@ def _handle_error(context: "Any", *args: "Any") -> None: span: "Optional[Span]" = getattr(execution_context, "_sentry_sql_span", None) if span is not None: - span.set_status(SPANSTATUS.INTERNAL_ERROR) + if isinstance(span, StreamedSpan): + span.status = SpanStatus.ERROR + else: + span.set_status(SPANSTATUS.INTERNAL_ERROR) # _after_cursor_execute does not get called for crashing SQL stmts. Judging # from SQLAlchemy codebase it does seem like any error coming into this @@ -132,15 +146,20 @@ def _get_db_system(name: str) -> "Optional[str]": return None -def _set_db_data(span: "Span", conn: "Any") -> None: +def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None: + if isinstance(span, StreamedSpan): + set_on_span = span.set_attribute + else: + set_on_span = span.set_data + db_system = _get_db_system(conn.engine.name) if db_system is not None: - span.set_data(SPANDATA.DB_SYSTEM, db_system) + set_on_span(SPANDATA.DB_SYSTEM, db_system) try: driver = conn.dialect.driver if driver: - span.set_data(SPANDATA.DB_DRIVER_NAME, driver) + set_on_span(SPANDATA.DB_DRIVER_NAME, driver) except Exception: pass @@ -149,12 +168,12 @@ def _set_db_data(span: "Span", conn: "Any") -> None: db_name = conn.engine.url.database if db_name is not None: - span.set_data(SPANDATA.DB_NAME, db_name) + set_on_span(SPANDATA.DB_NAME, db_name) server_address = conn.engine.url.host if server_address is not None: - span.set_data(SPANDATA.SERVER_ADDRESS, server_address) + set_on_span(SPANDATA.SERVER_ADDRESS, server_address) server_port = conn.engine.url.port if server_port is not None: - span.set_data(SPANDATA.SERVER_PORT, server_port) + set_on_span(SPANDATA.SERVER_PORT, server_port) diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index 18ab832ad7..ecfeee155b 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -170,6 +170,65 @@ def record_sql_queries( yield span +@contextlib.contextmanager +def record_sql_queries_supporting_streaming( + cursor: "Any", + query: "Any", + params_list: "Any", + paramstyle: "Optional[str]", + executemany: bool, + record_cursor_repr: bool = False, + span_origin: str = "manual", +) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]": + # TODO: Bring back capturing of params by default + client = sentry_sdk.get_client() + if client.options["_experiments"].get("record_sql_params", False): + if not params_list or params_list == [None]: + params_list = None + + if paramstyle == "pyformat": + paramstyle = "format" + else: + params_list = None + paramstyle = None + + query = _format_sql(cursor, query) + + data = {} + if params_list is not None: + data["db.params"] = params_list + if paramstyle is not None: + data["db.paramstyle"] = paramstyle + if executemany: + data["db.executemany"] = True + if record_cursor_repr and cursor is not None: + data["db.cursor"] = cursor + + with capture_internal_exceptions(): + sentry_sdk.add_breadcrumb(message=query, category="query", data=data) + + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name="" if query is None else query, + attributes={ + "sentry.origin": span_origin, + "sentry.op": OP.DB, + }, + ) as span: + for k, v in data.items(): + span.set_attribute(k, v) + yield span + else: + with sentry_sdk.start_span( + op=OP.DB, + name=query, + origin=span_origin, + ) as span: + for k, v in data.items(): + span.set_data(k, v) + yield span + + def maybe_create_breadcrumbs_from_span( scope: "sentry_sdk.Scope", span: "sentry_sdk.tracing.Span" ) -> None: @@ -313,22 +372,36 @@ def add_source( span.set_attribute("code.function.name", frame.f_code.co_name) -def add_query_source(span: "sentry_sdk.tracing.Span") -> None: +def add_query_source( + span: "Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan]", +) -> None: """ Adds OTel compatible source code information to a database query span """ client = sentry_sdk.get_client() - if not client.is_active(): - return - if span.timestamp is None or span.start_timestamp is None: + if isinstance(span, LegacySpan): + if not client.is_active(): + return + + # In the StreamedSpan case, we need to add the extra span information before + # the span finishes, so it's expected that this will be None. In the LegacySpan case, + # it should already be finished. + if span.timestamp is None: + return + + if span.start_timestamp is None: return should_add_query_source = client.options.get("enable_db_query_source", True) if not should_add_query_source: return - duration = span.timestamp - span.start_timestamp + end_timestamp = ( + datetime.now(timezone.utc) if span.timestamp is None else span.timestamp + ) + + duration = end_timestamp - span.start_timestamp threshold = client.options.get("db_query_source_threshold_ms", 0) slow_query = duration / timedelta(milliseconds=1) > threshold diff --git a/tests/conftest.py b/tests/conftest.py index ba28e4991c..8e7a2ffec6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -476,23 +476,34 @@ def maybe_monkeypatched_threading(request): @pytest.fixture def render_span_tree(): - def inner(event): - assert event["type"] == "transaction" + def inner(spans, root_span=None): + streamed_spans = False + if root_span is None: + streamed_spans = True by_parent = {} - for span in event["spans"]: + for span in spans: + if "parent_span_id" not in span: + root_span = span + continue + by_parent.setdefault(span["parent_span_id"], []).append(span) def render_span(span): - yield "- op={}: description={}".format( - json.dumps(span.get("op")), json.dumps(span.get("description")) - ) + if streamed_spans: + yield "- sentry.op={}: name={}".format( + json.dumps(span["attributes"].get("sentry.op")), + json.dumps(span["name"]), + ) + else: + yield "- op={}: description={}".format( + json.dumps(span.get("op")), json.dumps(span.get("description")) + ) + for subspan in by_parent.get(span["span_id"]) or (): for line in render_span(subspan): yield " {}".format(line) - root_span = event["contexts"]["trace"] - return "\n".join(render_span(root_span)) return inner diff --git a/tests/integrations/django/asgi/test_asgi.py b/tests/integrations/django/asgi/test_asgi.py index f956d12f82..174ceb77bb 100644 --- a/tests/integrations/django/asgi/test_asgi.py +++ b/tests/integrations/django/asgi/test_asgi.py @@ -292,8 +292,9 @@ async def test_async_middleware_spans( (transaction,) = events + assert transaction["type"] == "transaction" assert ( - render_span_tree(transaction) + render_span_tree(transaction["spans"], transaction["contexts"]["trace"]) == """\ - op="http.server": description=null - op="event.django": description="django.db.reset_queries" diff --git a/tests/integrations/django/test_basic.py b/tests/integrations/django/test_basic.py index 1c6bb141bd..f14be92ceb 100644 --- a/tests/integrations/django/test_basic.py +++ b/tests/integrations/django/test_basic.py @@ -457,7 +457,7 @@ def test_response_trace(sentry_init, client, capture_events, render_span_tree): assert ( '- op="view.response.render": description="serialize response"' - in render_span_tree(events[0]) + in render_span_tree(events[0]["spans"], events[0]["contexts"]["trace"]) ) @@ -596,7 +596,9 @@ def test_django_connect_trace(sentry_init, client, capture_events, render_span_t data = span.get("data") assert data.get(SPANDATA.DB_SYSTEM) == "postgresql" - assert '- op="db": description="connect"' in render_span_tree(event) + assert '- op="db": description="connect"' in render_span_tree( + event["spans"], event["contexts"]["trace"] + ) @pytest.mark.forked @@ -954,7 +956,9 @@ def test_render_spans(sentry_init, client, capture_events, render_span_tree): events = capture_events() client.get(url) transaction = events[0] - assert expected_line in render_span_tree(transaction) + assert expected_line in render_span_tree( + transaction["spans"], transaction["contexts"]["trace"] + ) @pytest.mark.skipif(DJANGO_VERSION < (1, 9), reason="Requires Django >= 1.9") @@ -1034,7 +1038,10 @@ def test_middleware_spans(sentry_init, client, capture_events, render_span_tree) message, transaction = events assert message["message"] == "hi" - assert render_span_tree(transaction) == EXPECTED_MIDDLEWARE_SPANS + assert ( + render_span_tree(transaction["spans"], transaction["contexts"]["trace"]) + == EXPECTED_MIDDLEWARE_SPANS + ) def test_middleware_spans_disabled(sentry_init, client, capture_events): @@ -1075,7 +1082,10 @@ def test_signals_spans(sentry_init, client, capture_events, render_span_tree): message, transaction = events assert message["message"] == "hi" - assert render_span_tree(transaction) == EXPECTED_SIGNALS_SPANS + assert ( + render_span_tree(transaction["spans"], transaction["contexts"]["trace"]) + == EXPECTED_SIGNALS_SPANS + ) assert transaction["spans"][0]["op"] == "event.django" assert transaction["spans"][0]["description"] == "django.db.reset_queries" @@ -1127,7 +1137,10 @@ def test_signals_spans_filtering(sentry_init, client, capture_events, render_spa (transaction,) = events - assert render_span_tree(transaction) == EXPECTED_SIGNALS_SPANS_FILTERED + assert ( + render_span_tree(transaction["spans"], transaction["contexts"]["trace"]) + == EXPECTED_SIGNALS_SPANS_FILTERED + ) assert transaction["spans"][0]["op"] == "event.django" assert transaction["spans"][0]["description"] == "django.db.reset_queries" @@ -1206,7 +1219,9 @@ def test_custom_urlconf_middleware( event = events.pop(0) assert event["transaction"] == "/custom/ok" if middleware_spans: - assert "custom_urlconf_middleware" in render_span_tree(event) + assert "custom_urlconf_middleware" in render_span_tree( + event["spans"], event["contexts"]["trace"] + ) _content, status, _headers = unpack_werkzeug_response(client.get("/custom/exc")) assert status.lower() == "500 internal server error" @@ -1216,7 +1231,9 @@ def test_custom_urlconf_middleware( assert error_event["exception"]["values"][-1]["mechanism"]["type"] == "django" assert transaction_event["transaction"] == "/custom/exc" if middleware_spans: - assert "custom_urlconf_middleware" in render_span_tree(transaction_event) + assert "custom_urlconf_middleware" in render_span_tree( + transaction_event["spans"], transaction_event["contexts"]["trace"] + ) settings.MIDDLEWARE.pop(0) diff --git a/tests/integrations/sqlalchemy/test_sqlalchemy.py b/tests/integrations/sqlalchemy/test_sqlalchemy.py index 7c7ce3d845..48fc354a34 100644 --- a/tests/integrations/sqlalchemy/test_sqlalchemy.py +++ b/tests/integrations/sqlalchemy/test_sqlalchemy.py @@ -14,15 +14,24 @@ from sentry_sdk.consts import DEFAULT_MAX_VALUE_LENGTH, SPANDATA from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration from sentry_sdk.serializer import MAX_EVENT_BYTES -from sentry_sdk.tracing_utils import record_sql_queries +from sentry_sdk.tracing_utils import record_sql_queries_supporting_streaming from sentry_sdk.utils import json_dumps -def test_orm_queries(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_orm_queries( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( - integrations=[SqlalchemyIntegration()], _experiments={"record_sql_params": True} + integrations=[SqlalchemyIntegration()], + _experiments={ + "record_sql_params": True, + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() Base = declarative_base() # noqa: N806 @@ -53,9 +62,14 @@ class Address(Base): assert session.query(Person).first() == bob - capture_message("hi") - - (event,) = events + if span_streaming: + items = capture_items("event") + capture_message("hi") + (event,) = (item.payload for item in items if item.type == "event") + else: + events = capture_events() + capture_message("hi") + (event,) = events for crumb in event["breadcrumbs"]["values"]: del crumb["timestamp"] @@ -78,13 +92,22 @@ class Address(Base): ] -def test_transactions(sentry_init, capture_events, render_span_tree): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_transactions( + sentry_init, + capture_events, + capture_items, + render_span_tree, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], - _experiments={"record_sql_params": True}, + _experiments={ + "record_sql_params": True, + "trace_lifecycle": "stream" if span_streaming else "static", + }, traces_sample_rate=1.0, ) - events = capture_events() Base = declarative_base() # noqa: N806 @@ -110,31 +133,79 @@ class Address(Base): Session = sessionmaker(bind=engine) # noqa: N806 session = Session() - with start_transaction(name="test_transaction", sampled=True): - with session.begin_nested(): - session.query(Person).first() - - for _ in range(2): - with pytest.raises(IntegrityError): - with session.begin_nested(): - session.add(Person(id=1, name="bob")) - session.add(Person(id=1, name="bob")) - - with session.begin_nested(): - session.query(Person).first() - - (event,) = events - - for span in event["spans"]: - assert span["data"][SPANDATA.DB_SYSTEM] == "sqlite" - assert span["data"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" - assert span["data"][SPANDATA.DB_NAME] == ":memory:" - assert SPANDATA.SERVER_ADDRESS not in span["data"] - assert SPANDATA.SERVER_PORT not in span["data"] - - assert ( - render_span_tree(event) - == """\ + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent"): + with session.begin_nested(): + session.query(Person).first() + + for _ in range(2): + with pytest.raises(IntegrityError): + with session.begin_nested(): + session.add(Person(id=1, name="bob")) + session.add(Person(id=1, name="bob")) + + with session.begin_nested(): + session.query(Person).first() + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + sqlalchemy_spans = [ + span + for span in spans + if span["attributes"]["sentry.origin"] == "auto.db.sqlalchemy" + ] + for span in sqlalchemy_spans: + assert span["attributes"][SPANDATA.DB_SYSTEM] == "sqlite" + assert span["attributes"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" + assert span["attributes"][SPANDATA.DB_NAME] == ":memory:" + assert SPANDATA.SERVER_PORT not in span["attributes"] + + assert ( + render_span_tree(spans) + == """\ +- sentry.op=null: name="custom parent" + - sentry.op="db": name="SAVEPOINT sa_savepoint_1" + - sentry.op="db": name="SELECT person.id AS person_id, person.name AS person_name \\nFROM person\\n LIMIT ? OFFSET ?" + - sentry.op="db": name="RELEASE SAVEPOINT sa_savepoint_1" + - sentry.op="db": name="SAVEPOINT sa_savepoint_2" + - sentry.op="db": name="INSERT INTO person (id, name) VALUES (?, ?)" + - sentry.op="db": name="ROLLBACK TO SAVEPOINT sa_savepoint_2" + - sentry.op="db": name="SAVEPOINT sa_savepoint_3" + - sentry.op="db": name="INSERT INTO person (id, name) VALUES (?, ?)" + - sentry.op="db": name="ROLLBACK TO SAVEPOINT sa_savepoint_3" + - sentry.op="db": name="SAVEPOINT sa_savepoint_4" + - sentry.op="db": name="SELECT person.id AS person_id, person.name AS person_name \\nFROM person\\n LIMIT ? OFFSET ?" + - sentry.op="db": name="RELEASE SAVEPOINT sa_savepoint_4"\ +""" + ) + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + with session.begin_nested(): + session.query(Person).first() + + for _ in range(2): + with pytest.raises(IntegrityError): + with session.begin_nested(): + session.add(Person(id=1, name="bob")) + session.add(Person(id=1, name="bob")) + + with session.begin_nested(): + session.query(Person).first() + + (event,) = events + + for span in event["spans"]: + assert span["data"][SPANDATA.DB_SYSTEM] == "sqlite" + assert span["data"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" + assert span["data"][SPANDATA.DB_NAME] == ":memory:" + assert SPANDATA.SERVER_ADDRESS not in span["data"] + assert SPANDATA.SERVER_PORT not in span["data"] + + assert ( + render_span_tree(event["spans"], event["contexts"]["trace"]) + == """\ - op=null: description=null - op="db": description="SAVEPOINT sa_savepoint_1" - op="db": description="SELECT person.id AS person_id, person.name AS person_name \\nFROM person\\n LIMIT ? OFFSET ?" @@ -149,16 +220,24 @@ class Address(Base): - op="db": description="SELECT person.id AS person_id, person.name AS person_name \\nFROM person\\n LIMIT ? OFFSET ?" - op="db": description="RELEASE SAVEPOINT sa_savepoint_4"\ """ - ) + ) -def test_transactions_no_engine_url(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_transactions_no_engine_url( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], - _experiments={"record_sql_params": True}, + _experiments={ + "record_sql_params": True, + "trace_lifecycle": "stream" if span_streaming else "static", + }, traces_sample_rate=1.0, ) - events = capture_events() Base = declarative_base() # noqa: N806 @@ -185,47 +264,98 @@ class Address(Base): Session = sessionmaker(bind=engine) # noqa: N806 session = Session() - with start_transaction(name="test_transaction", sampled=True): - with session.begin_nested(): - session.query(Person).first() - - for _ in range(2): - with pytest.raises(IntegrityError): - with session.begin_nested(): - session.add(Person(id=1, name="bob")) - session.add(Person(id=1, name="bob")) - - with session.begin_nested(): - session.query(Person).first() - - (event,) = events - - for span in event["spans"]: - assert span["data"][SPANDATA.DB_SYSTEM] == "sqlite" - assert span["data"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" - assert SPANDATA.DB_NAME not in span["data"] - assert SPANDATA.SERVER_ADDRESS not in span["data"] - assert SPANDATA.SERVER_PORT not in span["data"] - - -def test_long_sql_query_preserved(sentry_init, capture_events): + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent"): + with session.begin_nested(): + session.query(Person).first() + + for _ in range(2): + with pytest.raises(IntegrityError): + with session.begin_nested(): + session.add(Person(id=1, name="bob")) + session.add(Person(id=1, name="bob")) + + with session.begin_nested(): + session.query(Person).first() + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + sqlalchemy_spans = [ + span + for span in spans + if span["attributes"]["sentry.origin"] == "auto.db.sqlalchemy" + ] + for span in sqlalchemy_spans: + assert span["attributes"][SPANDATA.DB_SYSTEM] == "sqlite" + assert span["attributes"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" + assert SPANDATA.DB_NAME not in span["attributes"] + assert SPANDATA.SERVER_PORT not in span["attributes"] + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + with session.begin_nested(): + session.query(Person).first() + + for _ in range(2): + with pytest.raises(IntegrityError): + with session.begin_nested(): + session.add(Person(id=1, name="bob")) + session.add(Person(id=1, name="bob")) + + with session.begin_nested(): + session.query(Person).first() + + (event,) = events + for span in event["spans"]: + assert span["data"][SPANDATA.DB_SYSTEM] == "sqlite" + assert span["data"][SPANDATA.DB_DRIVER_NAME] == "pysqlite" + assert SPANDATA.DB_NAME not in span["data"] + assert SPANDATA.SERVER_ADDRESS not in span["data"] + assert SPANDATA.SERVER_PORT not in span["data"] + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_long_sql_query_preserved( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( traces_sample_rate=1, integrations=[SqlalchemyIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False} ) - with start_transaction(name="test"): - with engine.connect() as con: - con.execute(text(" UNION ".join("SELECT {}".format(i) for i in range(100)))) + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent"): + with engine.connect() as con: + con.execute( + text(" UNION ".join("SELECT {}".format(i) for i in range(100))) + ) - (event,) = events - description = event["spans"][0]["description"] - assert description.startswith("SELECT 0 UNION SELECT 1") - assert description.endswith("SELECT 98 UNION SELECT 99") + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + name = spans[0]["name"] + assert name.startswith("SELECT 0 UNION SELECT 1") + assert name.endswith("SELECT 98 UNION SELECT 99") + else: + events = capture_events() + with start_transaction(name="test"): + with engine.connect() as con: + con.execute( + text(" UNION ".join("SELECT {}".format(i) for i in range(100))) + ) + + (event,) = events + description = event["spans"][0]["description"] + assert description.startswith("SELECT 0 UNION SELECT 1") + assert description.endswith("SELECT 98 UNION SELECT 99") def test_large_event_not_truncated(sentry_init, capture_events): @@ -286,9 +416,14 @@ def processor(event, hint): } -def test_engine_name_not_string(sentry_init): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_engine_name_not_string( + sentry_init, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) engine = create_engine( @@ -300,171 +435,331 @@ def test_engine_name_not_string(sentry_init): con.execute(text("SELECT 0")) -def test_query_source_disabled(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source_disabled( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_options = { "integrations": [SqlalchemyIntegration()], "traces_sample_rate": 1.0, "enable_db_query_source": False, "db_query_source_threshold_ms": 0, + "_experiments": {"trace_lifecycle": "stream" if span_streaming else "static"}, } sentry_init(**sentry_options) - events = capture_events() + if span_streaming: + items = capture_items("span") - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - bob = Person(name="Bob") - session.add(bob) + bob = Person(name="Bob") + session.add(bob) - assert session.query(Person).first() == bob + assert session.query(Person).first() == bob - (event,) = events + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER not in attributes + assert SPANDATA.CODE_FILE_PATH not in attributes + assert SPANDATA.CODE_FUNCTION_NAME not in attributes + break + else: + raise AssertionError("No db span found") - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO not in data - assert SPANDATA.CODE_NAMESPACE not in data - assert SPANDATA.CODE_FILEPATH not in data - assert SPANDATA.CODE_FUNCTION not in data - break else: - raise AssertionError("No db span found") + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 + + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) + + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + assert session.query(Person).first() == bob + + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO not in data + assert SPANDATA.CODE_NAMESPACE not in data + assert SPANDATA.CODE_FILEPATH not in data + assert SPANDATA.CODE_FUNCTION not in data + break + else: + raise AssertionError("No db span found") @pytest.mark.parametrize("enable_db_query_source", [None, True]) -def test_query_source_enabled(sentry_init, capture_events, enable_db_query_source): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source_enabled( + sentry_init, + capture_events, + capture_items, + enable_db_query_source, + span_streaming, +): sentry_options = { "integrations": [SqlalchemyIntegration()], "traces_sample_rate": 1.0, "db_query_source_threshold_ms": 0, + "_experiments": {"trace_lifecycle": "stream" if span_streaming else "static"}, } + if enable_db_query_source is not None: sentry_options["enable_db_query_source"] = enable_db_query_source sentry_init(**sentry_options) - events = capture_events() - - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + if span_streaming: + items = capture_items("span") - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - bob = Person(name="Bob") - session.add(bob) + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - assert session.query(Person).first() == bob + bob = Person(name="Bob") + session.add(bob) - (event,) = events + assert session.query(Person).first() == bob - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data - break + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER in attributes + assert SPANDATA.CODE_FILE_PATH in attributes + assert SPANDATA.CODE_FUNCTION_NAME in attributes + break + else: + raise AssertionError("No db span found") else: - raise AssertionError("No db span found") + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) -def test_query_source(sentry_init, capture_events): + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + assert session.query(Person).first() == bob + + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data + break + else: + raise AssertionError("No db span found") + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], traces_sample_rate=1.0, enable_db_query_source=True, db_query_source_threshold_ms=0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() + if span_streaming: + items = capture_items("span") - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - bob = Person(name="Bob") - session.add(bob) + bob = Person(name="Bob") + session.add(bob) - assert session.query(Person).first() == bob + assert session.query(Person).first() == bob - (event,) = events + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER in attributes + assert SPANDATA.CODE_FILE_PATH in attributes + assert SPANDATA.CODE_FUNCTION_NAME in attributes + + assert type(attributes.get(SPANDATA.CODE_LINE_NUMBER)) == int + assert attributes.get(SPANDATA.CODE_LINE_NUMBER) > 0 + assert attributes.get(SPANDATA.CODE_FILE_PATH).endswith( + "tests/integrations/sqlalchemy/test_sqlalchemy.py" + ) - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data - - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 - assert ( - data.get(SPANDATA.CODE_NAMESPACE) - == "tests.integrations.sqlalchemy.test_sqlalchemy" - ) - assert data.get(SPANDATA.CODE_FILEPATH).endswith( - "tests/integrations/sqlalchemy/test_sqlalchemy.py" + is_relative_path = attributes.get(SPANDATA.CODE_FILE_PATH)[0] != os.sep + assert is_relative_path + + assert ( + attributes.get(SPANDATA.CODE_FUNCTION_NAME) == "test_query_source" + ) + break + else: + raise AssertionError("No db span found") + else: + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 + + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) + + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} ) + Base.metadata.create_all(engine) - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep - assert is_relative_path + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - assert data.get(SPANDATA.CODE_FUNCTION) == "test_query_source" - break - else: - raise AssertionError("No db span found") + bob = Person(name="Bob") + session.add(bob) + + assert session.query(Person).first() == bob + + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data + + assert type(data.get(SPANDATA.CODE_LINENO)) == int + assert data.get(SPANDATA.CODE_LINENO) > 0 + assert ( + data.get(SPANDATA.CODE_NAMESPACE) + == "tests.integrations.sqlalchemy.test_sqlalchemy" + ) + assert data.get(SPANDATA.CODE_FILEPATH).endswith( + "tests/integrations/sqlalchemy/test_sqlalchemy.py" + ) + is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + assert is_relative_path -def test_query_source_with_module_in_search_path(sentry_init, capture_events): + assert data.get(SPANDATA.CODE_FUNCTION) == "test_query_source" + break + else: + raise AssertionError("No db span found") + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source_with_module_in_search_path( + sentry_init, + capture_events, + capture_items, + span_streaming, +): """ Test that query source is relative to the path of the module it ran in """ @@ -473,227 +768,464 @@ def test_query_source_with_module_in_search_path(sentry_init, capture_events): traces_sample_rate=1.0, enable_db_query_source=True, db_query_source_threshold_ms=0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() from sqlalchemy_helpers.helpers import ( add_model_to_session, query_first_model_from_session, ) - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) - - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - bob = Person(name="Bob") + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - add_model_to_session(bob, session) + bob = Person(name="Bob") - assert query_first_model_from_session(Person, session) == bob + add_model_to_session(bob, session) - (event,) = events + assert query_first_model_from_session(Person, session) == bob - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data + assert SPANDATA.CODE_LINE_NUMBER in attributes + assert SPANDATA.CODE_FILE_PATH in attributes + assert SPANDATA.CODE_FUNCTION_NAME in attributes - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 - assert data.get(SPANDATA.CODE_NAMESPACE) == "sqlalchemy_helpers.helpers" - assert data.get(SPANDATA.CODE_FILEPATH) == "sqlalchemy_helpers/helpers.py" + assert type(attributes.get(SPANDATA.CODE_LINE_NUMBER)) == int + assert attributes.get(SPANDATA.CODE_LINE_NUMBER) > 0 + assert ( + attributes.get(SPANDATA.CODE_FILE_PATH) + == "sqlalchemy_helpers/helpers.py" + ) - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep - assert is_relative_path + is_relative_path = attributes.get(SPANDATA.CODE_FILE_PATH)[0] != os.sep + assert is_relative_path - assert data.get(SPANDATA.CODE_FUNCTION) == "query_first_model_from_session" - break + assert ( + attributes.get(SPANDATA.CODE_FUNCTION_NAME) + == "query_first_model_from_session" + ) + break + else: + raise AssertionError("No db span found") else: - raise AssertionError("No db span found") + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) -def test_no_query_source_if_duration_too_short(sentry_init, capture_events): - sentry_init( - integrations=[SqlalchemyIntegration()], - traces_sample_rate=1.0, - enable_db_query_source=True, - db_query_source_threshold_ms=100, - ) - events = capture_events() + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) + bob = Person(name="Bob") - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) + add_model_to_session(bob, session) - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() + assert query_first_model_from_session(Person, session) == bob - bob = Person(name="Bob") - session.add(bob) + (event,) = events - class fake_record_sql_queries: # noqa: N801 - def __init__(self, *args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: - self.span = span + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) - self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) - self.span.timestamp = datetime(2024, 1, 1, microsecond=99999) + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data - def __enter__(self): - return self.span + assert type(data.get(SPANDATA.CODE_LINENO)) == int + assert data.get(SPANDATA.CODE_LINENO) > 0 + assert data.get(SPANDATA.CODE_NAMESPACE) == "sqlalchemy_helpers.helpers" + assert ( + data.get(SPANDATA.CODE_FILEPATH) == "sqlalchemy_helpers/helpers.py" + ) - def __exit__(self, type, value, traceback): - pass + is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + assert is_relative_path - with mock.patch( - "sentry_sdk.integrations.sqlalchemy.record_sql_queries", - fake_record_sql_queries, - ): - assert session.query(Person).first() == bob + assert ( + data.get(SPANDATA.CODE_FUNCTION) == "query_first_model_from_session" + ) + break + else: + raise AssertionError("No db span found") + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_no_query_source_if_duration_too_short( + sentry_init, + capture_events, + capture_items, + span_streaming, +): + sentry_init( + integrations=[SqlalchemyIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=100, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) + if span_streaming: + items = capture_items("span") - (event,) = events + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - assert SPANDATA.CODE_LINENO not in data - assert SPANDATA.CODE_NAMESPACE not in data - assert SPANDATA.CODE_FILEPATH not in data - assert SPANDATA.CODE_FUNCTION not in data + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + class fake_record_sql_queries: # noqa: N801 + def __init__(self, *args, **kwargs): + with record_sql_queries_supporting_streaming( + *args, **kwargs + ) as span: + self.span = span + + if span_streaming: + self.span._start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span._timestamp = datetime(2024, 1, 1, microsecond=99999) + else: + self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span.timestamp = datetime(2024, 1, 1, microsecond=99999) + + def __enter__(self): + return self.span + + def __exit__(self, type, value, traceback): + pass + + with mock.patch( + "sentry_sdk.integrations.sqlalchemy.record_sql_queries_supporting_streaming", + fake_record_sql_queries, + ): + assert session.query(Person).first() == bob + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER not in attributes + assert SPANDATA.CODE_FILE_PATH not in attributes + assert SPANDATA.CODE_FUNCTION_NAME not in attributes + break + else: + raise AssertionError("No db span found") - break else: - raise AssertionError("No db span found") + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 -def test_query_source_if_duration_over_threshold(sentry_init, capture_events): + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) + + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + class fake_record_sql_queries: # noqa: N801 + def __init__(self, *args, **kwargs): + with record_sql_queries_supporting_streaming( + *args, **kwargs + ) as span: + self.span = span + + if span_streaming: + self.span._start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span._timestamp = datetime(2024, 1, 1, microsecond=99999) + else: + self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span.timestamp = datetime(2024, 1, 1, microsecond=99999) + + def __enter__(self): + return self.span + + def __exit__(self, type, value, traceback): + pass + + with mock.patch( + "sentry_sdk.integrations.sqlalchemy.record_sql_queries_supporting_streaming", + fake_record_sql_queries, + ): + assert session.query(Person).first() == bob + + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO not in data + assert SPANDATA.CODE_NAMESPACE not in data + assert SPANDATA.CODE_FILEPATH not in data + assert SPANDATA.CODE_FUNCTION not in data + break + else: + raise AssertionError("No db span found") + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_query_source_if_duration_over_threshold( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], traces_sample_rate=1.0, enable_db_query_source=True, db_query_source_threshold_ms=100, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() - - with start_transaction(name="test_transaction", sampled=True): - Base = declarative_base() # noqa: N806 - - class Person(Base): - __tablename__ = "person" - id = Column(Integer, primary_key=True) - name = Column(String(250), nullable=False) - - engine = create_engine( - "sqlite:///:memory:", connect_args={"check_same_thread": False} - ) - Base.metadata.create_all(engine) - - Session = sessionmaker(bind=engine) # noqa: N806 - session = Session() - - bob = Person(name="Bob") - session.add(bob) - class fake_record_sql_queries: # noqa: N801 - def __init__(self, *args, **kwargs): - with record_sql_queries(*args, **kwargs) as span: - self.span = span + if span_streaming: + items = capture_items("span") - self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) - self.span.timestamp = datetime(2024, 1, 1, microsecond=101000) + with sentry_sdk.traces.start_span(name="custom parent"): + Base = declarative_base() # noqa: N806 - def __enter__(self): - return self.span + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - def __exit__(self, type, value, traceback): - pass + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + class fake_record_sql_queries: # noqa: N801 + def __init__(self, *args, **kwargs): + with record_sql_queries_supporting_streaming( + *args, **kwargs + ) as span: + self.span = span + + self.span._start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span._timestamp = datetime(2024, 1, 1, microsecond=101000) + + def __enter__(self): + return self.span + + def __exit__(self, type, value, traceback): + pass + + with mock.patch( + "sentry_sdk.integrations.sqlalchemy.record_sql_queries_supporting_streaming", + fake_record_sql_queries, + ): + assert session.query(Person).first() == bob + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + for span in spans: + if span["attributes"].get("sentry.op") == "db" and span["name"].startswith( + "SELECT person" + ): + attributes = span["attributes"] + + assert SPANDATA.CODE_LINE_NUMBER in attributes + assert SPANDATA.CODE_FILE_PATH in attributes + assert SPANDATA.CODE_FUNCTION_NAME in attributes + + assert type(attributes.get(SPANDATA.CODE_LINE_NUMBER)) == int + assert attributes.get(SPANDATA.CODE_LINE_NUMBER) > 0 + assert attributes.get(SPANDATA.CODE_FILE_PATH).endswith( + "tests/integrations/sqlalchemy/test_sqlalchemy.py" + ) - with mock.patch( - "sentry_sdk.integrations.sqlalchemy.record_sql_queries", - fake_record_sql_queries, - ): - assert session.query(Person).first() == bob + is_relative_path = attributes.get(SPANDATA.CODE_FILE_PATH)[0] != os.sep + assert is_relative_path - (event,) = events + assert ( + attributes.get(SPANDATA.CODE_FUNCTION_NAME) + == "test_query_source_if_duration_over_threshold" + ) + break + else: + raise AssertionError("No db span found") + else: + events = capture_events() - for span in event["spans"]: - if span.get("op") == "db" and span.get("description").startswith( - "SELECT person" - ): - data = span.get("data", {}) - - assert SPANDATA.CODE_LINENO in data - assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data - assert SPANDATA.CODE_FUNCTION in data - - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 - assert ( - data.get(SPANDATA.CODE_NAMESPACE) - == "tests.integrations.sqlalchemy.test_sqlalchemy" - ) - assert data.get(SPANDATA.CODE_FILEPATH).endswith( - "tests/integrations/sqlalchemy/test_sqlalchemy.py" - ) + with start_transaction(name="test_transaction", sampled=True): + Base = declarative_base() # noqa: N806 - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep - assert is_relative_path + class Person(Base): + __tablename__ = "person" + id = Column(Integer, primary_key=True) + name = Column(String(250), nullable=False) - assert ( - data.get(SPANDATA.CODE_FUNCTION) - == "test_query_source_if_duration_over_threshold" + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} ) - break - else: - raise AssertionError("No db span found") + Base.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) # noqa: N806 + session = Session() + + bob = Person(name="Bob") + session.add(bob) + + class fake_record_sql_queries: # noqa: N801 + def __init__(self, *args, **kwargs): + with record_sql_queries_supporting_streaming( + *args, **kwargs + ) as span: + self.span = span + + self.span.start_timestamp = datetime(2024, 1, 1, microsecond=0) + self.span.timestamp = datetime(2024, 1, 1, microsecond=101000) + + def __enter__(self): + return self.span + + def __exit__(self, type, value, traceback): + pass + + with mock.patch( + "sentry_sdk.integrations.sqlalchemy.record_sql_queries_supporting_streaming", + fake_record_sql_queries, + ): + assert session.query(Person).first() == bob + + (event,) = events + + for span in event["spans"]: + if span.get("op") == "db" and span.get("description").startswith( + "SELECT person" + ): + data = span.get("data", {}) + + assert SPANDATA.CODE_LINENO in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FILEPATH in data + assert SPANDATA.CODE_FUNCTION in data + + assert type(data.get(SPANDATA.CODE_LINENO)) == int + assert data.get(SPANDATA.CODE_LINENO) > 0 + assert ( + data.get(SPANDATA.CODE_NAMESPACE) + == "tests.integrations.sqlalchemy.test_sqlalchemy" + ) + assert data.get(SPANDATA.CODE_FILEPATH).endswith( + "tests/integrations/sqlalchemy/test_sqlalchemy.py" + ) + is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + assert is_relative_path -def test_span_origin(sentry_init, capture_events): + assert ( + data.get(SPANDATA.CODE_FUNCTION) + == "test_query_source_if_duration_over_threshold" + ) + break + else: + raise AssertionError("No db span found") + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_span_origin( + sentry_init, + capture_events, + capture_items, + span_streaming, +): sentry_init( integrations=[SqlalchemyIntegration()], traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False} ) - with start_transaction(name="foo"): - with engine.connect() as con: - con.execute(text("SELECT 0")) + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="custom parent"): + with engine.connect() as con: + con.execute(text("SELECT 0")) - (event,) = events + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + + assert spans[0]["attributes"]["sentry.origin"] == "auto.db.sqlalchemy" + assert spans[1]["attributes"]["sentry.origin"] == "manual" + else: + events = capture_events() + with start_transaction(name="foo"): + with engine.connect() as con: + con.execute(text("SELECT 0")) + + (event,) = events - assert event["contexts"]["trace"]["origin"] == "manual" - assert event["spans"][0]["origin"] == "auto.db.sqlalchemy" + assert event["contexts"]["trace"]["origin"] == "manual" + assert event["spans"][0]["origin"] == "auto.db.sqlalchemy" diff --git a/tests/integrations/threading/test_threading.py b/tests/integrations/threading/test_threading.py index 788fb24fdf..9c06ce24a7 100644 --- a/tests/integrations/threading/test_threading.py +++ b/tests/integrations/threading/test_threading.py @@ -254,7 +254,8 @@ def do_some_work(number): # Free-threaded builds set thread_inherit_context to True, otherwise thread_inherit_context is False if propagate_scope or getattr(sys.flags, "thread_inherit_context", None): - assert render_span_tree(event) == dedent( + assert event["type"] == "transaction" + assert render_span_tree(event["spans"], event["contexts"]["trace"]) == dedent( """\ - op="outer-trx": description=null - op="outer-submit-0": description="Thread: main" @@ -271,7 +272,8 @@ def do_some_work(number): ) elif not propagate_scope: - assert render_span_tree(event) == dedent( + assert event["type"] == "transaction" + assert render_span_tree(event["spans"], event["contexts"]["trace"]) == dedent( """\ - op="outer-trx": description=null - op="outer-submit-0": description="Thread: main" @@ -316,7 +318,8 @@ def do_some_work(number): # Free-threaded builds set thread_inherit_context to True, otherwise thread_inherit_context is False if propagate_scope or getattr(sys.flags, "thread_inherit_context", None): - assert render_span_tree(event) == dedent( + assert event["type"] == "transaction" + assert render_span_tree(event["spans"], event["contexts"]["trace"]) == dedent( """\ - op="outer-trx": description=null - op="outer-submit-0": description="Thread: main" @@ -333,7 +336,8 @@ def do_some_work(number): ) elif not propagate_scope: - assert render_span_tree(event) == dedent( + assert event["type"] == "transaction" + assert render_span_tree(event["spans"], event["contexts"]["trace"]) == dedent( """\ - op="outer-trx": description=null - op="outer-submit-0": description="Thread: main"