diff --git a/sentry_sdk/integrations/pymongo.py b/sentry_sdk/integrations/pymongo.py index 59daab34da..e24332a816 100644 --- a/sentry_sdk/integrations/pymongo.py +++ b/sentry_sdk/integrations/pymongo.py @@ -2,10 +2,12 @@ import json import sentry_sdk -from sentry_sdk.consts import SPANSTATUS, SPANDATA, OP +from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.traces import SpanStatus, StreamedSpan from sentry_sdk.tracing import Span +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import capture_internal_exceptions try: @@ -87,12 +89,11 @@ def _strip_pii(command: "Dict[str, Any]") -> "Dict[str, Any]": def _get_db_data(event: "Any") -> "Dict[str, Any]": data = {} - data[SPANDATA.DB_SYSTEM] = "mongodb" - data[SPANDATA.DB_DRIVER_NAME] = "pymongo" + client = sentry_sdk.get_client() + is_span_streaming_enabled = has_span_streaming_enabled(client.options) + data[SPANDATA.DB_DRIVER_NAME] = "pymongo" db_name = event.database_name - if db_name is not None: - data[SPANDATA.DB_NAME] = db_name server_address = event.connection_id[0] if server_address is not None: @@ -102,12 +103,23 @@ def _get_db_data(event: "Any") -> "Dict[str, Any]": if server_port is not None: data[SPANDATA.SERVER_PORT] = server_port + if is_span_streaming_enabled: + data["db.system.name"] = "mongodb" + + if db_name is not None: + data["db.namespace"] = db_name + else: + data[SPANDATA.DB_SYSTEM] = "mongodb" + + if db_name is not None: + data[SPANDATA.DB_NAME] = db_name + return data class CommandTracer(monitoring.CommandListener): def __init__(self) -> None: - self._ongoing_operations: "Dict[int, Span]" = {} + self._ongoing_operations: "Dict[int, Union[Span, StreamedSpan]]" = {} def _operation_key( self, @@ -116,7 +128,8 @@ def _operation_key( return event.request_id def started(self, event: "CommandStartedEvent") -> None: - if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None: + client = sentry_sdk.get_client() + if client.get_integration(PyMongoIntegration) is None: return with capture_internal_exceptions(): @@ -126,56 +139,88 @@ def started(self, event: "CommandStartedEvent") -> None: command.pop("$clusterTime", None) command.pop("$signature", None) - tags = { - "db.name": event.database_name, - SPANDATA.DB_SYSTEM: "mongodb", - SPANDATA.DB_DRIVER_NAME: "pymongo", - SPANDATA.DB_OPERATION: event.command_name, - SPANDATA.DB_MONGODB_COLLECTION: command.get(event.command_name), - } - - try: - tags["net.peer.name"] = event.connection_id[0] - tags["net.peer.port"] = str(event.connection_id[1]) - except TypeError: - pass + db_data = _get_db_data(event) - data: "Dict[str, Any]" = {"operation_ids": {}} - data["operation_ids"]["operation"] = event.operation_id - data["operation_ids"]["request"] = event.request_id - - data.update(_get_db_data(event)) - - try: - lsid = command.pop("lsid")["id"] - data["operation_ids"]["session"] = str(lsid) - except KeyError: - pass + collection_name = command.get(event.command_name) + operation_name = event.command_name + db_name = event.database_name + lsid = command.pop("lsid", None) if not should_send_default_pii(): command = _strip_pii(command) query = json.dumps(command, default=str) - span = sentry_sdk.start_span( - op=OP.DB, - name=query, - origin=PyMongoIntegration.origin, - ) - for tag, value in tags.items(): - # set the tag for backwards-compatibility. - # TODO: remove the set_tag call in the next major release! - span.set_tag(tag, value) + if has_span_streaming_enabled(client.options): + span_first_data = { + "db.operation.name": operation_name, + "db.collection.name": collection_name, + "sentry.op": OP.DB, + "sentry.origin": PyMongoIntegration.origin, + **db_data, + } + + span = sentry_sdk.traces.start_span( + name=query, attributes=span_first_data + ) + + with capture_internal_exceptions(): + sentry_sdk.add_breadcrumb( + message=query, + category="query", + type=OP.DB, + data=span_first_data, + ) + + else: + tags = { + "db.name": db_name, + SPANDATA.DB_SYSTEM: "mongodb", + SPANDATA.DB_DRIVER_NAME: "pymongo", + SPANDATA.DB_OPERATION: operation_name, + # The below is a deprecated field, but leaving for legacy reasons. + # The v2 spans will use `db.collection.name` instead. + SPANDATA.DB_MONGODB_COLLECTION: collection_name, + } + + try: + tags["net.peer.name"] = event.connection_id[0] + tags["net.peer.port"] = str(event.connection_id[1]) + except TypeError: + pass + + data: "Dict[str, Any]" = {"operation_ids": {}} + data["operation_ids"]["operation"] = event.operation_id + data["operation_ids"]["request"] = event.request_id + + data.update(db_data) + + try: + if lsid: + lsid_id = lsid["id"] + data["operation_ids"]["session"] = str(lsid_id) + except KeyError: + pass + + span = sentry_sdk.start_span( + op=OP.DB, + name=query, + origin=PyMongoIntegration.origin, + ) - span.set_data(tag, value) + for tag, value in tags.items(): + # set the tag for backwards-compatibility. + # TODO: remove the set_tag call in the next major release! + span.set_tag(tag, value) + span.set_data(tag, value) - for key, value in data.items(): - span.set_data(key, value) + for key, value in data.items(): + span.set_data(key, value) - with capture_internal_exceptions(): - sentry_sdk.add_breadcrumb( - message=query, category="query", type=OP.DB, data=tags - ) + with capture_internal_exceptions(): + sentry_sdk.add_breadcrumb( + message=query, category="query", type=OP.DB, data=tags + ) self._ongoing_operations[self._operation_key(event)] = span.__enter__() @@ -185,7 +230,11 @@ def failed(self, event: "CommandFailedEvent") -> None: try: span = self._ongoing_operations.pop(self._operation_key(event)) - span.set_status(SPANSTATUS.INTERNAL_ERROR) + # Ignoring NoOpStreamedSpan as it will always have a status of "ok" + if type(span) is StreamedSpan: + span.status = SpanStatus.ERROR + elif type(span) is Span: + span.set_status(SPANSTATUS.INTERNAL_ERROR) span.__exit__(None, None, None) except KeyError: return @@ -196,7 +245,8 @@ def succeeded(self, event: "CommandSucceededEvent") -> None: try: span = self._ongoing_operations.pop(self._operation_key(event)) - span.set_status(SPANSTATUS.OK) + if type(span) is Span: + span.set_status(SPANSTATUS.OK) span.__exit__(None, None, None) except KeyError: pass diff --git a/tests/integrations/pymongo/test_pymongo.py b/tests/integrations/pymongo/test_pymongo.py index b57061b0a0..4f350c8aea 100644 --- a/tests/integrations/pymongo/test_pymongo.py +++ b/tests/integrations/pymongo/test_pymongo.py @@ -1,11 +1,12 @@ +import pytest +from mockupdb import MockupDB, OpQuery +from pymongo import MongoClient + +import sentry_sdk from sentry_sdk import capture_message, start_transaction from sentry_sdk.consts import SPANDATA from sentry_sdk.integrations.pymongo import PyMongoIntegration, _strip_pii -from mockupdb import MockupDB, OpQuery -from pymongo import MongoClient -import pytest - @pytest.fixture(scope="session") def mongo_server(): @@ -109,6 +110,72 @@ def test_transactions(sentry_init, capture_events, mongo_server, with_pii): assert insert_fail["tags"]["status"] == "internal_error" +@pytest.mark.parametrize("with_pii", [False, True]) +def test_segment_span_streaming(sentry_init, capture_items, mongo_server, with_pii): + sentry_init( + integrations=[PyMongoIntegration()], + traces_sample_rate=1.0, + send_default_pii=with_pii, + _experiments={"trace_lifecycle": "stream"}, + ) + items = capture_items("span") + + connection = MongoClient(mongo_server.uri) + + with sentry_sdk.traces.start_span(name="test_segment"): + list( + connection["test_db"]["test_collection"].find({"foobar": 1}) + ) # force query execution + connection["test_db"]["test_collection"].insert_one({"foo": 2}) + try: + connection["test_db"]["erroneous"].insert_many([{"bar": 3}, {"baz": 4}]) + pytest.fail("Request should raise") + except Exception: + pass + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 4 + + (find, insert_success, insert_fail, segment) = spans + assert segment["name"] == "test_segment" + + for span in find, insert_success, insert_fail: + attrs = span["attributes"] + assert attrs["db.system.name"] == "mongodb" + assert attrs[SPANDATA.DB_DRIVER_NAME] == "pymongo" + assert attrs["db.namespace"] == "test_db" + assert attrs[SPANDATA.SERVER_ADDRESS] == "localhost" + assert attrs[SPANDATA.SERVER_PORT] == mongo_server.port + assert attrs["sentry.op"] == "db" + assert attrs["sentry.origin"] == "auto.db.pymongo" + + assert find["attributes"]["db.operation.name"] == "find" + assert insert_success["attributes"]["db.operation.name"] == "insert" + assert insert_fail["attributes"]["db.operation.name"] == "insert" + + assert find["name"].startswith('{"find') + assert insert_success["name"].startswith('{"insert') + assert insert_fail["name"].startswith('{"insert') + + assert find["attributes"]["db.collection.name"] == "test_collection" + assert insert_success["attributes"]["db.collection.name"] == "test_collection" + assert insert_fail["attributes"]["db.collection.name"] == "erroneous" + + if with_pii: + assert "1" in find["name"] + assert "2" in insert_success["name"] + assert "3" in insert_fail["name"] and "4" in insert_fail["name"] + else: + assert "1" not in find["name"] + assert "2" not in insert_success["name"] + assert "3" not in insert_fail["name"] and "4" not in insert_fail["name"] + + assert find["status"] == "ok" + assert insert_success["status"] == "ok" + assert insert_fail["status"] == "error" + + @pytest.mark.parametrize("with_pii", [False, True]) def test_breadcrumbs(sentry_init, capture_events, mongo_server, with_pii): sentry_init( @@ -146,6 +213,46 @@ def test_breadcrumbs(sentry_init, capture_events, mongo_server, with_pii): } +@pytest.mark.parametrize("with_pii", [False, True]) +def test_breadcrumbs_span_streaming(sentry_init, capture_items, mongo_server, with_pii): + sentry_init( + integrations=[PyMongoIntegration()], + traces_sample_rate=1.0, + send_default_pii=with_pii, + _experiments={"trace_lifecycle": "stream"}, + ) + items = capture_items("event") + + connection = MongoClient(mongo_server.uri) + + list( + connection["test_db"]["test_collection"].find({"foobar": 1}) + ) # force query execution + capture_message("hi") + + event = items[0].payload + (crumb,) = event["breadcrumbs"]["values"] + + assert crumb["category"] == "query" + assert crumb["message"].startswith('{"find') + if with_pii: + assert "1" in crumb["message"] + else: + assert "1" not in crumb["message"] + assert crumb["type"] == "db" + + data = crumb["data"] + assert data["db.namespace"] == "test_db" + assert data["db.system.name"] == "mongodb" + assert data["db.driver.name"] == "pymongo" + assert data["db.operation.name"] == "find" + assert data["db.collection.name"] == "test_collection" + assert data["sentry.op"] == "db" + assert data["sentry.origin"] == "auto.db.pymongo" + assert data[SPANDATA.SERVER_ADDRESS] == "localhost" + assert data[SPANDATA.SERVER_PORT] == mongo_server.port + + @pytest.mark.parametrize( "testcase", [ @@ -460,3 +567,74 @@ def test_span_origin(sentry_init, capture_events, mongo_server): assert event["contexts"]["trace"]["origin"] == "manual" assert event["spans"][0]["origin"] == "auto.db.pymongo" + + +def test_span_origin_span_streaming(sentry_init, capture_items, mongo_server): + sentry_init( + integrations=[PyMongoIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + items = capture_items("span") + + connection = MongoClient(mongo_server.uri) + + with sentry_sdk.traces.start_span(name="test_segment"): + list( + connection["test_db"]["test_collection"].find({"foobar": 1}) + ) # force query execution + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 2 + (db_span, segment) = spans + assert segment["name"] == "test_segment" + assert db_span["attributes"]["sentry.origin"] == "auto.db.pymongo" + + +def test_span_streaming_status_on_success(sentry_init, capture_items, mongo_server): + sentry_init( + integrations=[PyMongoIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + items = capture_items("span") + + connection = MongoClient(mongo_server.uri) + + with sentry_sdk.traces.start_span(name="test_segment"): + connection["test_db"]["test_collection"].insert_one({"foo": 1}) + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 2 + (db_span, segment) = spans + assert segment["name"] == "test_segment" + assert db_span["status"] == "ok" + + +def test_span_streaming_status_on_failure(sentry_init, capture_items, mongo_server): + sentry_init( + integrations=[PyMongoIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream"}, + ) + items = capture_items("span") + + connection = MongoClient(mongo_server.uri) + + with sentry_sdk.traces.start_span(name="test_segment"): + try: + connection["test_db"]["erroneous"].insert_many([{"bar": 3}]) + pytest.fail("Request should raise") + except Exception: + pass + sentry_sdk.flush() + + spans = [item.payload for item in items] + + assert len(spans) == 2 + (db_span, segment) = spans + + assert segment["name"] == "test_segment" + assert db_span["status"] == "error"