Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
No-op if TANGLE_BUGSNAG_API_KEY or TANGLE_ENV are not set.

Environment variables:
TANGLE_BUGSNAG_API_KEY Required to enable Bugsnag reporting.
TANGLE_ENV Release stage (e.g. "staging", "production").
TANGLE_SERVICE_VERSION App version tag (e.g. git SHA). Optional.
TANGLE_BUGSNAG_NOTIFY_ENDPOINT Custom notify URL. Optional.
TANGLE_BUGSNAG_SESSIONS_ENDPOINT Custom sessions URL. Optional.
TANGLE_BUGSNAG_API_KEY Required to enable Bugsnag reporting.
TANGLE_ENV Release stage (e.g. "staging", "production").
TANGLE_SERVICE_VERSION App version tag (e.g. git SHA). Optional.
TANGLE_BUGSNAG_NOTIFY_ENDPOINT Custom notify URL. Optional.
TANGLE_BUGSNAG_SESSIONS_ENDPOINT Custom sessions URL. Optional.
TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY Metadata key for normalized error grouping. Optional.
"""

import logging
Expand All @@ -21,7 +22,8 @@
import bugsnag as bugsnag_sdk
import bugsnag.event as bugsnag_event

from cloud_pipelines_backend.instrumentation import contextual_logging
from . import contextual_logging
from . import error_normalization

_logger = logging.getLogger(__name__)

Expand All @@ -30,6 +32,7 @@
_SERVICE_VERSION = os.environ.get("TANGLE_SERVICE_VERSION", "")
_NOTIFY_ENDPOINT = os.environ.get("TANGLE_BUGSNAG_NOTIFY_ENDPOINT", "")
_SESSIONS_ENDPOINT = os.environ.get("TANGLE_BUGSNAG_SESSIONS_ENDPOINT", "")
_CUSTOM_GROUPING_KEY = os.environ.get("TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY", "")

IS_BUGSNAG_ENABLED: bool = bool(_BUGSNAG_API_KEY and _TANGLE_ENV)

Expand All @@ -51,6 +54,13 @@ def _before_notify(event: bugsnag_event.Event) -> None:
context = contextual_logging.get_all_context_metadata()
if context:
event.add_tab("tangle_context", context)
if _CUSTOM_GROUPING_KEY and event.exception:
normalized = error_normalization.normalize_error_message(
exception=event.exception
)
prefix = (event.metadata.get("extra") or {}).get("grouping_prefix")
key_value = f"{prefix}: {normalized}" if prefix else normalized
event.add_tab("custom", {_CUSTOM_GROUPING_KEY: key_value})


def setup(*, service_name: str | None = None) -> None:
Expand Down
100 changes: 100 additions & 0 deletions cloud_pipelines_backend/instrumentation/error_normalization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Normalizes exception messages into stable strings for error grouping.

Strips instance-specific values (pod names, IDs, memory addresses, byte
offsets) so that structurally identical errors produce the same key
regardless of which specific resource was involved.
"""

import json
import re

_POD_NAME_PATTERN = re.compile(r"task-[a-zA-Z0-9]+-[a-zA-Z0-9]+")
_OBJECT_REPR_PATTERN = re.compile(r"<[^>]+ object at 0x[0-9a-fA-F]+>")
_HEX_ADDRESS_PATTERN = re.compile(r"\b0x[0-9a-fA-F]+\b")
_UUID_PATTERN = re.compile(
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.IGNORECASE
)
_LONG_ALNUM_ID_PATTERN = re.compile(r"\b[a-zA-Z0-9]{16,}\b")


def _strip_generic(*, message: str) -> str:
message = _OBJECT_REPR_PATTERN.sub("{object}", message)
message = _HEX_ADDRESS_PATTERN.sub("{addr}", message)
message = _UUID_PATTERN.sub("{uuid}", message)
message = _LONG_ALNUM_ID_PATTERN.sub("{id}", message)
return message.strip()


def _normalize_k8s_api_exception(*, exception: BaseException) -> str | None:
try:
from kubernetes.client import exceptions as k8s_exceptions

if not isinstance(exception, k8s_exceptions.ApiException):
return None
except ImportError:
return None

status_code = exception.status
try:
body = json.loads(exception.body)
reason = body.get("reason", "")
message = body.get("message", "")
except (json.JSONDecodeError, TypeError):
reason = ""
message = str(exception)

message = _POD_NAME_PATTERN.sub("{pod}", message)
parts = [f"kubernetes ApiException ({status_code})"]
if reason:
parts.append(reason)
if message:
parts.append(message)
return ": ".join(parts)


def _normalize_max_retry_error(*, exception: BaseException) -> str | None:
try:
from urllib3.exceptions import MaxRetryError

if not isinstance(exception, MaxRetryError):
return None
except ImportError:
return None

cause = type(exception.reason).__name__ if exception.reason else "unknown"
return f"MaxRetryError: k8s connection pool max retries exceeded ({cause})"


def _normalize_unicode_decode_error(*, exception: BaseException) -> str | None:
if not isinstance(exception, UnicodeDecodeError):
return None
return f"UnicodeDecodeError: '{exception.encoding}' codec can't decode byte at position {{n}}"


def _normalize_orchestrator_error(*, exception: BaseException) -> str | None:
try:
from ..orchestrator_sql import OrchestratorError

if not isinstance(exception, OrchestratorError):
return None
except ImportError:
return None

message = _OBJECT_REPR_PATTERN.sub("{object}", str(exception))
return f"OrchestratorError: {message}"


def normalize_error_message(*, exception: BaseException) -> str:
    """Return a stable normalized string for error grouping.

    Tries each exception-specific normalizer in order; the first non-None
    result wins. Otherwise falls back to the exception type name plus a
    generically scrubbed message.
    """
    specialized = (
        _normalize_k8s_api_exception,
        _normalize_max_retry_error,
        _normalize_unicode_decode_error,
        _normalize_orchestrator_error,
    )
    for normalize in specialized:
        normalized = normalize(exception=exception)
        if normalized is not None:
            return normalized

    return f"{type(exception).__name__}: {_strip_generic(message=str(exception))}"
6 changes: 5 additions & 1 deletion cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,11 @@ def _retry(

def record_system_error_exception(execution: bts.ExecutionNode, exception: Exception):
app_metrics.execution_system_errors.add(1)
bugsnag_instrumentation.notify(exception=exception, execution_id=str(execution.id))
bugsnag_instrumentation.notify(
exception=exception,
execution_id=str(execution.id),
grouping_prefix="SYSTEM_ERROR",
)

if execution.extra_data is None:
execution.extra_data = {}
Expand Down
178 changes: 178 additions & 0 deletions tests/instrumentation/test_error_normalization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""Tests for error_normalization module."""

import json
import unittest.mock as mock

import pytest

from cloud_pipelines_backend.instrumentation import error_normalization


class TestNormalizeK8sApiException:
    """Normalization of kubernetes ApiException messages."""

    def _make_exception(self, status: int, body: dict) -> Exception:
        # Skip the test entirely when the kubernetes client is unavailable.
        try:
            from kubernetes.client.exceptions import ApiException
        except ImportError:
            pytest.skip("kubernetes not installed")
        api_exc = ApiException(status=status, reason=body.get("reason", ""))
        api_exc.body = json.dumps(body)
        return api_exc

    def test_pod_not_found(self):
        body = {
            "reason": "NotFound",
            "message": 'pods "task-abc123-xyz789" not found',
        }
        exc = self._make_exception(404, body)
        expected = 'kubernetes ApiException (404): NotFound: pods "{pod}" not found'
        assert error_normalization.normalize_error_message(exception=exc) == expected

    def test_container_terminated(self):
        body = {
            "reason": "BadRequest",
            "message": 'container "main" in pod task-abc123-xyz789 is terminated',
        }
        exc = self._make_exception(400, body)
        expected = 'kubernetes ApiException (400): BadRequest: container "main" in pod {pod} is terminated'
        assert error_normalization.normalize_error_message(exception=exc) == expected

    def test_pod_initializing(self):
        body = {
            "reason": "BadRequest",
            "message": 'container "main" in pod task-abc123-xyz789 is waiting to start: PodInitializing',
        }
        exc = self._make_exception(400, body)
        expected = 'kubernetes ApiException (400): BadRequest: container "main" in pod {pod} is waiting to start: PodInitializing'
        assert error_normalization.normalize_error_message(exception=exc) == expected

    def test_container_not_available(self):
        body = {
            "reason": "BadRequest",
            "message": 'container "main" in pod task-abc123-xyz789 is not available',
        }
        exc = self._make_exception(400, body)
        expected = 'kubernetes ApiException (400): BadRequest: container "main" in pod {pod} is not available'
        assert error_normalization.normalize_error_message(exception=exc) == expected

    def test_webhook_timeout(self):
        body = {
            "reason": "InternalError",
            "message": 'failed calling webhook "validate.kyverno.svc-fail": context deadline exceeded',
        }
        exc = self._make_exception(500, body)
        expected = 'kubernetes ApiException (500): InternalError: failed calling webhook "validate.kyverno.svc-fail": context deadline exceeded'
        assert error_normalization.normalize_error_message(exception=exc) == expected

    def test_invalid_body_falls_back_to_str(self):
        try:
            from kubernetes.client.exceptions import ApiException
        except ImportError:
            pytest.skip("kubernetes not installed")
        # A non-JSON body must not crash normalization.
        exc = ApiException(status=503, reason="ServiceUnavailable")
        exc.body = "not json"
        result = error_normalization.normalize_error_message(exception=exc)
        assert result.startswith("kubernetes ApiException (503)")


class TestNormalizeMaxRetryError:
    """Normalization of urllib3 MaxRetryError messages."""

    def test_read_timeout(self):
        try:
            from urllib3.exceptions import MaxRetryError, ReadTimeoutError
        except ImportError:
            pytest.skip("urllib3 not installed")
        inner = ReadTimeoutError(pool=None, url="/api/v1/pods", message="timed out")
        exc = MaxRetryError(pool=None, url="http://10.0.0.1/api/v1/pods", reason=inner)
        expected = "MaxRetryError: k8s connection pool max retries exceeded (ReadTimeoutError)"
        assert error_normalization.normalize_error_message(exception=exc) == expected

    def test_no_cause(self):
        try:
            from urllib3.exceptions import MaxRetryError
        except ImportError:
            pytest.skip("urllib3 not installed")
        # A missing reason must map to the "unknown" cause bucket.
        exc = MaxRetryError(pool=None, url="http://10.0.0.1/api/v1/pods", reason=None)
        expected = "MaxRetryError: k8s connection pool max retries exceeded (unknown)"
        assert error_normalization.normalize_error_message(exception=exc) == expected


class TestNormalizeUnicodeDecodeError:
    """Normalization of UnicodeDecodeError messages."""

    def test_strips_offset(self):
        # The byte offset (42) must be replaced by the {n} placeholder.
        exc = UnicodeDecodeError("utf-8", b"\xff\xfe", 42, 43, "invalid start byte")
        expected = "UnicodeDecodeError: 'utf-8' codec can't decode byte at position {n}"
        assert error_normalization.normalize_error_message(exception=exc) == expected

    def test_different_encoding(self):
        exc = UnicodeDecodeError("ascii", b"\x80", 0, 1, "ordinal not in range")
        expected = "UnicodeDecodeError: 'ascii' codec can't decode byte at position {n}"
        assert error_normalization.normalize_error_message(exception=exc) == expected


class TestNormalizeOrchestratorError:
    """Normalization of OrchestratorError messages."""

    def test_strips_object_repr(self):
        try:
            from cloud_pipelines_backend.orchestrator_sql import OrchestratorError  # absolute OK in tests
        except ImportError:
            pytest.skip("OrchestratorError not importable")
        message = "Unexpected running container status: <ContainerStatus object at 0xdeadbeef>"
        exc = OrchestratorError(message)
        expected = "OrchestratorError: Unexpected running container status: {object}"
        assert error_normalization.normalize_error_message(exception=exc) == expected


class TestFallback:
    """Generic fallback normalization for unrecognized exception types."""

    def test_strips_hex_address(self):
        result = error_normalization.normalize_error_message(
            exception=ValueError("object at 0xdeadbeef failed")
        )
        assert result == "ValueError: object at {addr} failed"

    def test_strips_uuid(self):
        result = error_normalization.normalize_error_message(
            exception=RuntimeError("resource 550e8400-e29b-41d4-a716-446655440000 not found")
        )
        assert result == "RuntimeError: resource {uuid} not found"

    def test_strips_long_alnum_id(self):
        opaque_id = "abcdefghijklmnopq"  # 17 chars
        result = error_normalization.normalize_error_message(exception=KeyError(opaque_id))
        assert opaque_id not in result

    def test_stable_message_unchanged(self):
        # Messages with no instance-specific values must pass through untouched.
        result = error_normalization.normalize_error_message(
            exception=AttributeError("'NoneType' object has no attribute 'encode'")
        )
        assert result == "AttributeError: 'NoneType' object has no attribute 'encode'"