From 148b0abfb2879715e06e25a8cc6f081777a72db1 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Fri, 8 May 2026 17:39:44 -0700 Subject: [PATCH] Add Bugsnag error grouping with stable normalized keys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces error_normalization.py which strips instance-specific values (pod names, IDs, memory addresses, byte offsets) from exceptions so structurally identical errors collapse to one group in Bugsnag. TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY controls the metadata key name — no-op when unset, allowing Shopify deployments to set it without touching OSS code. System errors reported via record_system_error_exception are prefixed with "SYSTEM_ERROR: " for easy filtering. Co-Authored-By: Claude Sonnet 4.6 --- .../bugsnag_instrumentation.py | 22 ++- .../instrumentation/error_normalization.py | 100 ++++++++++ cloud_pipelines_backend/orchestrator_sql.py | 6 +- .../test_error_normalization.py | 178 ++++++++++++++++++ 4 files changed, 299 insertions(+), 7 deletions(-) create mode 100644 cloud_pipelines_backend/instrumentation/error_normalization.py create mode 100644 tests/instrumentation/test_error_normalization.py diff --git a/cloud_pipelines_backend/instrumentation/bugsnag_instrumentation.py b/cloud_pipelines_backend/instrumentation/bugsnag_instrumentation.py index e3a877b..d604a01 100644 --- a/cloud_pipelines_backend/instrumentation/bugsnag_instrumentation.py +++ b/cloud_pipelines_backend/instrumentation/bugsnag_instrumentation.py @@ -6,11 +6,12 @@ No-op if TANGLE_BUGSNAG_API_KEY or TANGLE_ENV are not set. Environment variables: - TANGLE_BUGSNAG_API_KEY Required to enable Bugsnag reporting. - TANGLE_ENV Release stage (e.g. "staging", "production"). - TANGLE_SERVICE_VERSION App version tag (e.g. git SHA). Optional. - TANGLE_BUGSNAG_NOTIFY_ENDPOINT Custom notify URL. Optional. - TANGLE_BUGSNAG_SESSIONS_ENDPOINT Custom sessions URL. Optional. + TANGLE_BUGSNAG_API_KEY Required to enable Bugsnag reporting. + TANGLE_ENV Release stage (e.g. "staging", "production"). + TANGLE_SERVICE_VERSION App version tag (e.g. git SHA). Optional. + TANGLE_BUGSNAG_NOTIFY_ENDPOINT Custom notify URL. Optional. + TANGLE_BUGSNAG_SESSIONS_ENDPOINT Custom sessions URL. Optional. + TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY Metadata key for normalized error grouping. Optional. """ import logging @@ -21,7 +22,8 @@ import bugsnag as bugsnag_sdk import bugsnag.event as bugsnag_event -from cloud_pipelines_backend.instrumentation import contextual_logging +from . import contextual_logging +from . import error_normalization _logger = logging.getLogger(__name__) @@ -30,6 +32,7 @@ _SERVICE_VERSION = os.environ.get("TANGLE_SERVICE_VERSION", "") _NOTIFY_ENDPOINT = os.environ.get("TANGLE_BUGSNAG_NOTIFY_ENDPOINT", "") _SESSIONS_ENDPOINT = os.environ.get("TANGLE_BUGSNAG_SESSIONS_ENDPOINT", "") +_CUSTOM_GROUPING_KEY = os.environ.get("TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY", "") IS_BUGSNAG_ENABLED: bool = bool(_BUGSNAG_API_KEY and _TANGLE_ENV) @@ -51,6 +54,13 @@ def _before_notify(event: bugsnag_event.Event) -> None: context = contextual_logging.get_all_context_metadata() if context: event.add_tab("tangle_context", context) + if _CUSTOM_GROUPING_KEY and event.exception: + normalized = error_normalization.normalize_error_message( + exception=event.exception + ) + prefix = (event.metadata.get("extra") or {}).get("grouping_prefix") + key_value = f"{prefix}: {normalized}" if prefix else normalized + event.add_tab("custom", {_CUSTOM_GROUPING_KEY: key_value}) def setup(*, service_name: str | None = None) -> None: diff --git a/cloud_pipelines_backend/instrumentation/error_normalization.py b/cloud_pipelines_backend/instrumentation/error_normalization.py new file mode 100644 index 0000000..21fbf76 --- /dev/null +++ b/cloud_pipelines_backend/instrumentation/error_normalization.py @@ -0,0 +1,100 @@ +""" +Normalizes exception messages into stable strings for error grouping. + +Strips instance-specific values (pod names, IDs, memory addresses, byte +offsets) so that structurally identical errors produce the same key +regardless of which specific resource was involved. +""" + +import json +import re + +_POD_NAME_PATTERN = re.compile(r"task-[a-zA-Z0-9]+-[a-zA-Z0-9]+") +_OBJECT_REPR_PATTERN = re.compile(r"<[^>]+ object at 0x[0-9a-fA-F]+>") +_HEX_ADDRESS_PATTERN = re.compile(r"\b0x[0-9a-fA-F]+\b") +_UUID_PATTERN = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.IGNORECASE +) +_LONG_ALNUM_ID_PATTERN = re.compile(r"\b[a-zA-Z0-9]{16,}\b") + + +def _strip_generic(*, message: str) -> str: + message = _OBJECT_REPR_PATTERN.sub("{object}", message) + message = _HEX_ADDRESS_PATTERN.sub("{addr}", message) + message = _UUID_PATTERN.sub("{uuid}", message) + message = _LONG_ALNUM_ID_PATTERN.sub("{id}", message) + return message.strip() + + +def _normalize_k8s_api_exception(*, exception: BaseException) -> str | None: + try: + from kubernetes.client import exceptions as k8s_exceptions + + if not isinstance(exception, k8s_exceptions.ApiException): + return None + except ImportError: + return None + + status_code = exception.status + try: + body = json.loads(exception.body) + reason = body.get("reason", "") + message = body.get("message", "") + except (json.JSONDecodeError, TypeError): + reason = "" + message = str(exception) + + message = _POD_NAME_PATTERN.sub("{pod}", message) + parts = [f"kubernetes ApiException ({status_code})"] + if reason: + parts.append(reason) + if message: + parts.append(message) + return ": ".join(parts) + + +def _normalize_max_retry_error(*, exception: BaseException) -> str | None: + try: + from urllib3.exceptions import MaxRetryError + + if not isinstance(exception, MaxRetryError): + return None + except ImportError: + return None + + cause = type(exception.reason).__name__ if exception.reason else "unknown" + return f"MaxRetryError: k8s connection pool max retries exceeded ({cause})" + + +def _normalize_unicode_decode_error(*, exception: BaseException) -> str | None: + if not isinstance(exception, UnicodeDecodeError): + return None + return f"UnicodeDecodeError: '{exception.encoding}' codec can't decode byte at position {{n}}" + + +def _normalize_orchestrator_error(*, exception: BaseException) -> str | None: + try: + from ..orchestrator_sql import OrchestratorError + + if not isinstance(exception, OrchestratorError): + return None + except ImportError: + return None + + message = _OBJECT_REPR_PATTERN.sub("{object}", str(exception)) + return f"OrchestratorError: {message}" + + +def normalize_error_message(*, exception: BaseException) -> str: + """Return a stable normalized string for error grouping.""" + for normalizer in ( + _normalize_k8s_api_exception, + _normalize_max_retry_error, + _normalize_unicode_decode_error, + _normalize_orchestrator_error, + ): + result = normalizer(exception=exception) + if result is not None: + return result + + return f"{type(exception).__name__}: {_strip_generic(message=str(exception))}" diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index ed402ec..89ac3e2 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -1056,7 +1056,11 @@ def _retry( def record_system_error_exception(execution: bts.ExecutionNode, exception: Exception): app_metrics.execution_system_errors.add(1) - bugsnag_instrumentation.notify(exception=exception, execution_id=str(execution.id)) + bugsnag_instrumentation.notify( + exception=exception, + execution_id=str(execution.id), + grouping_prefix="SYSTEM_ERROR", + ) if execution.extra_data is None: execution.extra_data = {} diff --git a/tests/instrumentation/test_error_normalization.py b/tests/instrumentation/test_error_normalization.py new file mode 100644 index 0000000..ba25a4d --- /dev/null +++ b/tests/instrumentation/test_error_normalization.py @@ -0,0 +1,178 @@ +"""Tests for error_normalization module.""" + +import json +import unittest.mock as mock + +import pytest + +from cloud_pipelines_backend.instrumentation import error_normalization + + +class TestNormalizeK8sApiException: + def _make_exception(self, status: int, body: dict) -> Exception: + try: + from kubernetes.client.exceptions import ApiException + except ImportError: + pytest.skip("kubernetes not installed") + exc = ApiException(status=status, reason=body.get("reason", "")) + exc.body = json.dumps(body) + return exc + + def test_pod_not_found(self): + exc = self._make_exception( + 404, + { + "reason": "NotFound", + "message": 'pods "task-abc123-xyz789" not found', + }, + ) + result = error_normalization.normalize_error_message(exception=exc) + assert ( + result == 'kubernetes ApiException (404): NotFound: pods "{pod}" not found' + ) + + def test_container_terminated(self): + exc = self._make_exception( + 400, + { + "reason": "BadRequest", + "message": 'container "main" in pod task-abc123-xyz789 is terminated', + }, + ) + result = error_normalization.normalize_error_message(exception=exc) + assert ( + result + == 'kubernetes ApiException (400): BadRequest: container "main" in pod {pod} is terminated' + ) + + def test_pod_initializing(self): + exc = self._make_exception( + 400, + { + "reason": "BadRequest", + "message": 'container "main" in pod task-abc123-xyz789 is waiting to start: PodInitializing', + }, + ) + result = error_normalization.normalize_error_message(exception=exc) + assert result == ( + 'kubernetes ApiException (400): BadRequest: container "main" in pod {pod} is waiting to start: PodInitializing' + ) + + def test_container_not_available(self): + exc = self._make_exception( + 400, + { + "reason": "BadRequest", + "message": 'container "main" in pod task-abc123-xyz789 is not available', + }, + ) + result = error_normalization.normalize_error_message(exception=exc) + assert ( + result + == 'kubernetes ApiException (400): BadRequest: container "main" in pod {pod} is not available' + ) + + def test_webhook_timeout(self): + exc = self._make_exception( + 500, + { + "reason": "InternalError", + "message": 'failed calling webhook "validate.kyverno.svc-fail": context deadline exceeded', + }, + ) + result = error_normalization.normalize_error_message(exception=exc) + assert result == ( + 'kubernetes ApiException (500): InternalError: failed calling webhook "validate.kyverno.svc-fail": context deadline exceeded' + ) + + def test_invalid_body_falls_back_to_str(self): + try: + from kubernetes.client.exceptions import ApiException + except ImportError: + pytest.skip("kubernetes not installed") + exc = ApiException(status=503, reason="ServiceUnavailable") + exc.body = "not json" + result = error_normalization.normalize_error_message(exception=exc) + assert result.startswith("kubernetes ApiException (503)") + + +class TestNormalizeMaxRetryError: + def test_read_timeout(self): + try: + from urllib3.exceptions import MaxRetryError, ReadTimeoutError + except ImportError: + pytest.skip("urllib3 not installed") + cause = ReadTimeoutError(pool=None, url="/api/v1/pods", message="timed out") + exc = MaxRetryError(pool=None, url="http://10.0.0.1/api/v1/pods", reason=cause) + result = error_normalization.normalize_error_message(exception=exc) + assert ( + result + == "MaxRetryError: k8s connection pool max retries exceeded (ReadTimeoutError)" + ) + + def test_no_cause(self): + try: + from urllib3.exceptions import MaxRetryError + except ImportError: + pytest.skip("urllib3 not installed") + exc = MaxRetryError(pool=None, url="http://10.0.0.1/api/v1/pods", reason=None) + result = error_normalization.normalize_error_message(exception=exc) + assert ( + result + == "MaxRetryError: k8s connection pool max retries exceeded (unknown)" + ) + + +class TestNormalizeUnicodeDecodeError: + def test_strips_offset(self): + exc = UnicodeDecodeError("utf-8", b"\xff\xfe", 42, 43, "invalid start byte") + result = error_normalization.normalize_error_message(exception=exc) + assert ( + result + == "UnicodeDecodeError: 'utf-8' codec can't decode byte at position {n}" + ) + + def test_different_encoding(self): + exc = UnicodeDecodeError("ascii", b"\x80", 0, 1, "ordinal not in range") + result = error_normalization.normalize_error_message(exception=exc) + assert ( + result + == "UnicodeDecodeError: 'ascii' codec can't decode byte at position {n}" + ) + + +class TestNormalizeOrchestratorError: + def test_strips_object_repr(self): + try: + from cloud_pipelines_backend.orchestrator_sql import OrchestratorError # absolute OK in tests + except ImportError: + pytest.skip("OrchestratorError not importable") + exc = OrchestratorError( + "Unexpected running container status: " + ) + result = error_normalization.normalize_error_message(exception=exc) + assert ( + result == "OrchestratorError: Unexpected running container status: {object}" + ) + + +class TestFallback: + def test_strips_hex_address(self): + exc = ValueError("object at 0xdeadbeef failed") + result = error_normalization.normalize_error_message(exception=exc) + assert result == "ValueError: object at {addr} failed" + + def test_strips_uuid(self): + exc = RuntimeError("resource 550e8400-e29b-41d4-a716-446655440000 not found") + result = error_normalization.normalize_error_message(exception=exc) + assert result == "RuntimeError: resource {uuid} not found" + + def test_strips_long_alnum_id(self): + exc = KeyError("abcdefghijklmnopq") # 17 chars + result = error_normalization.normalize_error_message(exception=exc) + assert "abcdefghijklmnopq" not in result + + def test_stable_message_unchanged(self): + exc = AttributeError("'NoneType' object has no attribute 'encode'") + result = error_normalization.normalize_error_message(exception=exc) + assert result == "AttributeError: 'NoneType' object has no attribute 'encode'"