From 556ba68cab3865825d9085ae3c46d1cf51a10654 Mon Sep 17 00:00:00 2001 From: Marco Antonio Gil Date: Thu, 23 Apr 2026 17:36:28 +0200 Subject: [PATCH] PTHMINT-119: SSE event stream support and EventManager Introduce Server-Sent Events support and helpers for subscribing to order event streams. Adds a generic SSE client (ServerSentEvent, ServerSentEventStream, StreamingResponse) and an EventStream adapter with Event/EventData response models under api.paths.events.stream. Adds EventManager to provide convenient subscribe_events/subscribe_order_events methods and exposes it via Sdk.get_event_manager(). Order response model now normalizes new plural events_* and legacy event_* fields for compatibility. Also export SSE types from multisafepay.client, include an example script, and add unit and E2E tests covering stream parsing, manager behavior, and compatibility. --- examples/event_manager/subscribe_events.py | 107 +++++++++ src/multisafepay/api/paths/events/__init__.py | 14 ++ .../api/paths/events/event_manager.py | 87 +++++++ .../api/paths/events/stream/__init__.py | 132 +++++++++++ .../paths/events/stream/response/__init__.py | 16 ++ .../stream/response/components/__init__.py | 18 ++ .../stream/response/components/event_data.py | 55 +++++ .../api/paths/events/stream/response/event.py | 50 ++++ .../paths/orders/response/order_response.py | 27 ++- src/multisafepay/client/__init__.py | 3 + src/multisafepay/client/sse.py | 224 ++++++++++++++++++ src/multisafepay/sdk.py | 13 + .../e2e/examples/event_manager/conftest.py | 211 +++++++++++++++++ .../event_manager/test_subscribe_events.py | 88 +++++++ .../events/stream/test_unit_event_stream.py | 186 +++++++++++++++ .../path/events/test_unit_event_manager.py | 121 ++++++++++ .../response/test_unit_order_response.py | 74 ++++++ tests/multisafepay/unit/test_unit_sdk.py | 14 ++ 18 files changed, 1439 insertions(+), 1 deletion(-) create mode 100644 examples/event_manager/subscribe_events.py create mode 100644 src/multisafepay/api/paths/events/__init__.py create mode 100644 src/multisafepay/api/paths/events/event_manager.py create mode 100644 src/multisafepay/api/paths/events/stream/__init__.py create mode 100644 src/multisafepay/api/paths/events/stream/response/__init__.py create mode 100644 src/multisafepay/api/paths/events/stream/response/components/__init__.py create mode 100644 src/multisafepay/api/paths/events/stream/response/components/event_data.py create mode 100644 src/multisafepay/api/paths/events/stream/response/event.py create mode 100644 src/multisafepay/client/sse.py create mode 100644 tests/multisafepay/e2e/examples/event_manager/conftest.py create mode 100644 tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py create mode 100644 tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py create mode 100644 tests/multisafepay/unit/api/path/events/test_unit_event_manager.py create mode 100644 tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py diff --git a/examples/event_manager/subscribe_events.py b/examples/event_manager/subscribe_events.py new file mode 100644 index 0000000..826fa89 --- /dev/null +++ b/examples/event_manager/subscribe_events.py @@ -0,0 +1,107 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Create a Cloud POS order and subscribe to its event stream.""" + +import os +import time + +from dotenv import load_dotenv +from multisafepay import Sdk +from multisafepay.api.paths.orders.request import OrderRequest +from multisafepay.client import ScopedCredentialResolver + +# Load environment variables from a .env file +load_dotenv() + + +def _get_first_env(*names: str) -> str: + for name in names: + value = os.getenv(name, "").strip() + if value: + return value + + return "" + + +def _require_first_env(*names: str) -> str: + value = _get_first_env(*names) + if value: + return value + + raise RuntimeError( + f"Missing required environment variable. Set one of: {', '.join(names)}", + ) + + +DEFAULT_ACCOUNT_API_KEY = _require_first_env("API_KEY", "E2E_API_KEY") +TERMINAL_GROUP_DEFAULT_API_KEY = _require_first_env( + "TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", + "E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", +) +CLOUD_POS_TERMINAL_GROUP_ID = _require_first_env( + "CLOUD_POS_TERMINAL_GROUP_ID", +) +TERMINAL_ID = _require_first_env( + "CLOUD_POS_TERMINAL_ID", + "E2E_CLOUD_POS_TERMINAL_ID", +) + +if __name__ == "__main__": + # This example executes Cloud POS calls with terminal-group scope. + scoped_terminal_group_id = CLOUD_POS_TERMINAL_GROUP_ID + resolver_kwargs = { + "default_api_key": DEFAULT_ACCOUNT_API_KEY, + } + if scoped_terminal_group_id: + resolver_kwargs["terminal_group_api_keys"] = { + scoped_terminal_group_id: TERMINAL_GROUP_DEFAULT_API_KEY, + } + + credential_resolver = ScopedCredentialResolver(**resolver_kwargs) + + multisafepay_sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, + ) + order_manager = multisafepay_sdk.get_order_manager() + event_manager = multisafepay_sdk.get_event_manager() + + order_id = f"cloud-pos-{int(time.time())}" + + order_request = ( + OrderRequest() + .add_type("redirect") + .add_order_id(order_id) + .add_description("Cloud POS order") + .add_amount(100) + .add_currency("EUR") + .add_gateway_info( + { + "terminal_id": TERMINAL_ID, + }, + ) + ) + + create_response = order_manager.create( + order_request, + terminal_group_id=scoped_terminal_group_id, + ) + order = create_response.get_data() + + if order is None: + raise RuntimeError("Order creation did not return order data") + + print(f"Created Cloud POS order: {order.order_id}") + print("Listening for events. Press Ctrl+C to stop.") + + try: + with event_manager.subscribe_order_events(order, timeout=45.0) as stream: + for event in stream: + print(event) + except KeyboardInterrupt: + print("Stream interrupted by user.") diff --git a/src/multisafepay/api/paths/events/__init__.py b/src/multisafepay/api/paths/events/__init__.py new file mode 100644 index 0000000..cf12cf7 --- /dev/null +++ b/src/multisafepay/api/paths/events/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Events API endpoints.""" + +from multisafepay.api.paths.events.event_manager import EventManager + +__all__ = [ + "EventManager", +] diff --git a/src/multisafepay/api/paths/events/event_manager.py b/src/multisafepay/api/paths/events/event_manager.py new file mode 100644 index 0000000..8cd490a --- /dev/null +++ b/src/multisafepay/api/paths/events/event_manager.py @@ -0,0 +1,87 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Event manager for event stream subscription helpers.""" + +from __future__ import annotations + +from multisafepay.api.base.abstract_manager import AbstractManager +from multisafepay.api.paths.events.stream import EventStream +from multisafepay.api.paths.orders.response.order_response import Order +from multisafepay.client.client import Client + + +class EventManager(AbstractManager): + """Manages event stream subscriptions for order events.""" + + def __init__(self: EventManager, client: Client) -> None: + """Initialize the EventManager with a client.""" + super().__init__(client) + + def subscribe_events( + self: EventManager, + events_token: str, + events_stream_url: str, + last_event_id: str | None = None, + timeout: float = 30.0, + ) -> EventStream: + """ + Subscribe to order events using the SSE stream endpoint. + + Parameters + ---------- + events_token (str): Token returned by order creation for event auth. + events_stream_url (str): Full SSE stream URL. + last_event_id (str | None): Optional resume cursor. + timeout (float): Socket timeout in seconds. + + Returns + ------- + EventStream: An iterator over incoming SSE messages. + + """ + return EventStream.open( + events_token=events_token, + events_stream_url=events_stream_url, + last_event_id=last_event_id, + timeout=timeout, + ) + + def subscribe_order_events( + self: EventManager, + order: Order, + last_event_id: str | None = None, + timeout: float = 30.0, + ) -> EventStream: + """ + Subscribe to events for an existing order response object. + + Parameters + ---------- + order (Order): Order response that contains event credentials. + last_event_id (str | None): Optional resume cursor. + timeout (float): Socket timeout in seconds. + + Returns + ------- + EventStream: An iterator over incoming SSE messages. + + """ + events_token = order.events_token or order.event_token + events_stream_url = order.events_stream_url or order.event_stream_url + + if not events_token or not events_stream_url: + raise ValueError( + "Order does not contain events_token/events_stream_url.", + ) + + return self.subscribe_events( + events_token=events_token, + events_stream_url=events_stream_url, + last_event_id=last_event_id, + timeout=timeout, + ) diff --git a/src/multisafepay/api/paths/events/stream/__init__.py b/src/multisafepay/api/paths/events/stream/__init__.py new file mode 100644 index 0000000..84b2268 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/__init__.py @@ -0,0 +1,132 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Event stream contracts for the events path.""" + +from __future__ import annotations + +import json + +from multisafepay.api.paths.events.stream.response import Event, EventData +from multisafepay.client.sse import ( + ServerSentEvent, + ServerSentEventStream, + StreamingResponse, +) +from typing_extensions import Self + + +def _deserialize_event_payload(raw_payload: str | None) -> object | None: + """Parse raw SSE data for this path and fall back to plain text.""" + if raw_payload is None: + return None + + try: + return json.loads(raw_payload) + except json.JSONDecodeError: + return raw_payload + + +def _to_event(server_sent_event: ServerSentEvent) -> Event: + """Adapt a generic SSE message into the events-path response model.""" + event = Event.from_dict( + { + "event": server_sent_event.event, + "data": _deserialize_event_payload(server_sent_event.data), + "event_id": server_sent_event.event_id, + "retry": server_sent_event.retry, + "raw_data": server_sent_event.raw_data, + }, + ) + if event is None: + raise ValueError("Unable to adapt SSE payload to Event.") + return event + + +class EventStream: + """Iterator over events received from the MultiSafepay SSE endpoint.""" + + def __init__( + self: EventStream, + response: StreamingResponse | None = None, + stream: ServerSentEventStream | None = None, + ) -> None: + """Initialize the stream from an HTTP response or generic SSE stream.""" + if stream is not None: + self._stream = stream + return + + if response is None: + raise ValueError( + "response is required when stream is not provided.", + ) + + self._stream = ServerSentEventStream(response=response) + + @classmethod + def _from_stream( + cls: type[EventStream], + stream: ServerSentEventStream, + ) -> EventStream: + """Build an EventStream around an already-open generic SSE stream.""" + return cls(stream=stream) + + @classmethod + def open( + cls: type[EventStream], + events_token: str, + events_stream_url: str, + last_event_id: str | None = None, + timeout: float = 30.0, + ) -> EventStream: + """Open a new SSE stream using the event token and stream URL.""" + headers = { + "Accept": "text/event-stream", + "Cache-Control": "no-cache", + "Authorization": f"Bearer {events_token}", + } + if last_event_id is not None: + headers["Last-Event-ID"] = last_event_id + + stream = ServerSentEventStream.open( + url=events_stream_url, + headers=headers, + timeout=timeout, + ) + return cls._from_stream(stream) + + @property + def closed(self: EventStream) -> bool: + """Return whether this stream is already closed.""" + return self._stream.closed + + def close(self: EventStream) -> None: + """Close the underlying HTTP response stream.""" + self._stream.close() + + def __iter__(self: EventStream) -> EventStream: + """Return self as an iterator over events.""" + return self + + def __next__(self: EventStream) -> Event: + """Read the next SSE message and return it as an Event.""" + return _to_event(next(self._stream)) + + def __enter__(self: Self) -> Self: + """Support context manager protocol.""" + return self + + def __exit__(self: EventStream, *args: object) -> None: + """Close stream when exiting context manager.""" + self.close() + + +__all__ = [ + "Event", + "EventData", + "EventStream", +] diff --git a/src/multisafepay/api/paths/events/stream/response/__init__.py b/src/multisafepay/api/paths/events/stream/response/__init__.py new file mode 100644 index 0000000..bd51138 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Response models for event stream payloads.""" + +from multisafepay.api.paths.events.stream.response.components import EventData +from multisafepay.api.paths.events.stream.response.event import Event + +__all__ = [ + "Event", + "EventData", +] diff --git a/src/multisafepay/api/paths/events/stream/response/components/__init__.py b/src/multisafepay/api/paths/events/stream/response/components/__init__.py new file mode 100644 index 0000000..9b13bc9 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/components/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Component models for event stream response payloads.""" + +from multisafepay.api.paths.events.stream.response.components.event_data import ( + EventData, + EventDataPayload, +) + +__all__ = [ + "EventData", + "EventDataPayload", +] diff --git a/src/multisafepay/api/paths/events/stream/response/components/event_data.py b/src/multisafepay/api/paths/events/stream/response/components/event_data.py new file mode 100644 index 0000000..67eedb1 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/components/event_data.py @@ -0,0 +1,55 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Component response models for event stream payloads.""" + +from __future__ import annotations + +from typing import Optional, Union + +from multisafepay.model.response_model import ResponseModel + +# ruff: noqa: UP007 + +EventDataPayload = Union[ + "EventData", + str, + int, + float, + bool, + list[object], + None, +] + + +class EventData(ResponseModel): + """Structured nested event payload for known order-event attributes.""" + + status: Optional[str] + order_id: Optional[str] + type: Optional[str] + data: EventDataPayload + + @staticmethod + def from_dict(d: dict) -> Optional[EventData]: + """Create EventData from dictionary data.""" + if d is None: + return None + + args = d.copy() + payload = d.get("data") + if isinstance(payload, dict): + args["data"] = EventData.from_dict(payload) + elif isinstance(payload, list): + args["data"] = [ + EventData.from_dict(item) if isinstance(item, dict) else item + for item in payload + ] + else: + args["data"] = payload + + return EventData(**args) diff --git a/src/multisafepay/api/paths/events/stream/response/event.py b/src/multisafepay/api/paths/events/stream/response/event.py new file mode 100644 index 0000000..52af229 --- /dev/null +++ b/src/multisafepay/api/paths/events/stream/response/event.py @@ -0,0 +1,50 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Response models for order event stream messages.""" + +from __future__ import annotations + +from typing import Optional + +from multisafepay.api.paths.events.stream.response.components import ( + EventData, + EventDataPayload, +) +from multisafepay.model.response_model import ResponseModel + +# ruff: noqa: UP007 + + +class Event(ResponseModel): + """Structured event message returned by the MultiSafepay SSE stream.""" + + event: Optional[str] + data: EventDataPayload + event_id: Optional[str] + retry: Optional[int] + raw_data: Optional[str] + + @staticmethod + def from_dict(d: dict) -> Optional[Event]: + """Create Event from dictionary data.""" + if d is None: + return None + + args = d.copy() + payload = d.get("data") + if isinstance(payload, dict): + args["data"] = EventData.from_dict(payload) + elif isinstance(payload, list): + args["data"] = [ + EventData.from_dict(item) if isinstance(item, dict) else item + for item in payload + ] + else: + args["data"] = payload + + return Event(**args) diff --git a/src/multisafepay/api/paths/orders/response/order_response.py b/src/multisafepay/api/paths/orders/response/order_response.py index 4581929..89e8f04 100644 --- a/src/multisafepay/api/paths/orders/response/order_response.py +++ b/src/multisafepay/api/paths/orders/response/order_response.py @@ -103,6 +103,11 @@ class Order(ResponseModel): payment_url: Optional[str] cancel_url: Optional[str] session_id: Optional[str] + events_token: Optional[str] + events_url: Optional[str] + events_stream_url: Optional[str] + + # Backward compatibility aliases for older API payloads. event_token: Optional[str] event_url: Optional[str] event_stream_url: Optional[str] @@ -118,6 +123,23 @@ def get_order_id(self: "Order") -> str: """ return self.order_id + @staticmethod + def _normalize_event_fields(d: dict) -> dict: + """Normalize singular/plural event keys for compatibility.""" + mapping = [ + ("events_token", "event_token"), + ("events_url", "event_url"), + ("events_stream_url", "event_stream_url"), + ] + + for plural_key, singular_key in mapping: + if d.get(plural_key) is None and d.get(singular_key) is not None: + d[plural_key] = d[singular_key] + if d.get(singular_key) is None and d.get(plural_key) is not None: + d[singular_key] = d[plural_key] + + return d + @staticmethod def from_dict(d: dict) -> Optional["Order"]: """ @@ -134,7 +156,10 @@ def from_dict(d: dict) -> Optional["Order"]: """ if d is None: return None - order_dependency_adapter = Decorator(dependencies=d) + normalized_dependencies = Order._normalize_event_fields(d.copy()) + order_dependency_adapter = Decorator( + dependencies=normalized_dependencies, + ) dependencies = ( order_dependency_adapter.adapt_order_adjustment( d.get("order_adjustment"), diff --git a/src/multisafepay/client/__init__.py b/src/multisafepay/client/__init__.py index 922e1d2..30fdbf6 100644 --- a/src/multisafepay/client/__init__.py +++ b/src/multisafepay/client/__init__.py @@ -3,9 +3,12 @@ from multisafepay.client.api_key import ApiKey from multisafepay.client.client import Client from multisafepay.client.credential_resolver import ScopedCredentialResolver +from multisafepay.client.sse import ServerSentEvent, ServerSentEventStream __all__ = [ "ApiKey", "Client", "ScopedCredentialResolver", + "ServerSentEvent", + "ServerSentEventStream", ] diff --git a/src/multisafepay/client/sse.py b/src/multisafepay/client/sse.py new file mode 100644 index 0000000..16c0586 --- /dev/null +++ b/src/multisafepay/client/sse.py @@ -0,0 +1,224 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Generic Server-Sent Events support utilities.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import ClassVar, Protocol +from urllib.parse import urlparse +from urllib.request import Request, urlopen + +from typing_extensions import Self + + +class StreamingResponse(Protocol): + """Protocol for the minimal stream response interface used by SSE streams.""" + + def readline(self: StreamingResponse) -> bytes: + """Read one line from the stream response.""" + + def close(self: StreamingResponse) -> None: + """Close the stream response.""" + + +@dataclass(frozen=True) +class ServerSentEvent: + """Generic representation of one SSE message.""" + + event: str | None = None + data: str | None = None + event_id: str | None = None + retry: int | None = None + raw_data: str | None = None + + +@dataclass +class _ServerSentEventBuilder: + """Mutable builder used while parsing one SSE message.""" + + _FIELD_HANDLERS: ClassVar[dict[str, str]] = { + "event": "_consume_event", + "data": "_consume_data", + "id": "_consume_id", + "retry": "_consume_retry", + } + + event_name: str | None = None + event_id: str | None = None + event_retry: int | None = None + data_lines: list[str] = field(default_factory=list) + has_fields: bool = False + + def consume_line(self: _ServerSentEventBuilder, line: str) -> None: + """Consume one SSE line and update the builder state.""" + if line.startswith(":"): + return + + field_name, field_value = _parse_line(line) + if field_name is None: + return + + handler_name = self._FIELD_HANDLERS.get(field_name) + if handler_name is None: + return + + getattr(self, handler_name)(field_value) + + def _consume_event( + self: _ServerSentEventBuilder, + field_value: str, + ) -> None: + """Consume the SSE event field.""" + self.has_fields = True + self.event_name = field_value + + def _consume_data(self: _ServerSentEventBuilder, field_value: str) -> None: + """Consume one SSE data line.""" + self.has_fields = True + self.data_lines.append(field_value) + + def _consume_id(self: _ServerSentEventBuilder, field_value: str) -> None: + """Consume the SSE id field.""" + self.has_fields = True + self.event_id = field_value + + def _consume_retry( + self: _ServerSentEventBuilder, + field_value: str, + ) -> None: + """Consume the SSE retry field when it is a valid integer.""" + try: + self.event_retry = int(field_value) + except ValueError: + return + + self.has_fields = True + + def build(self: _ServerSentEventBuilder) -> ServerSentEvent | None: + """Build an immutable SSE message or None when no message exists.""" + if not self.has_fields and not self.data_lines: + return None + + raw_data = "\n".join(self.data_lines) if self.data_lines else None + return ServerSentEvent( + event=self.event_name, + data=raw_data, + event_id=self.event_id, + retry=self.event_retry, + raw_data=raw_data, + ) + + +class ServerSentEventStream: + """Iterator over messages received from a generic SSE endpoint.""" + + def __init__( + self: ServerSentEventStream, + response: StreamingResponse, + ) -> None: + """Initialize the stream from an already-open HTTP response.""" + self._response = response + self._closed = False + + @classmethod + def open( + cls: type[ServerSentEventStream], + url: str, + headers: dict[str, str] | None = None, + timeout: float = 30.0, + ) -> ServerSentEventStream: + """Open a new SSE stream using a URL and optional headers.""" + cls._validate_url(url) + + request = Request( # noqa: S310 + url=url, + headers=headers or {}, + method="GET", + ) + # Keep the response open; close manages the lifecycle. + # pylint: disable=consider-using-with + response = urlopen(request, timeout=timeout) # noqa: S310 + # pylint: enable=consider-using-with + + return cls(response=response) + + @staticmethod + def _validate_url(url: str) -> None: + """Validate the stream URL before opening the network connection.""" + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise ValueError("Invalid SSE URL.") + + @property + def closed(self: ServerSentEventStream) -> bool: + """Return whether this stream is already closed.""" + return self._closed + + def close(self: ServerSentEventStream) -> None: + """Close the underlying HTTP response stream.""" + if self._closed: + return + + self._response.close() + self._closed = True + + def __iter__(self: ServerSentEventStream) -> ServerSentEventStream: + """Return self as an iterator over SSE messages.""" + return self + + def __next__(self: ServerSentEventStream) -> ServerSentEvent: + """Read the next SSE message and return it.""" + if self._closed: + raise StopIteration + + builder = _ServerSentEventBuilder() + while True: + line = self._read_line() + if line is None: + self.close() + raise StopIteration + + if line == "": + event = builder.build() + if event is not None: + return event + builder = _ServerSentEventBuilder() + continue + + builder.consume_line(line) + + def _read_line(self: ServerSentEventStream) -> str | None: + """Read and decode one line from the underlying stream response.""" + raw_line = self._response.readline() + if not raw_line: + return None + + return raw_line.decode("utf-8", errors="replace").rstrip("\r\n") + + def __enter__(self: Self) -> Self: + """Support context manager protocol.""" + return self + + def __exit__(self: ServerSentEventStream, *args: object) -> None: + """Close stream when exiting context manager.""" + self.close() + + +def _parse_line(line: str) -> tuple[str | None, str]: + """Parse one SSE line into field and value parts.""" + if ":" not in line: + return line or None, "" + + field_name, field_value = line.split(":", 1) + if field_name == "": + return None, "" + if field_value.startswith(" "): + field_value = field_value[1:] + + return field_name, field_value diff --git a/src/multisafepay/sdk.py b/src/multisafepay/sdk.py index 2572035..af2151e 100644 --- a/src/multisafepay/sdk.py +++ b/src/multisafepay/sdk.py @@ -11,6 +11,7 @@ from multisafepay.api.paths.auth.auth_manager import AuthManager from multisafepay.api.paths.categories.category_manager import CategoryManager +from multisafepay.api.paths.events.event_manager import EventManager from multisafepay.api.paths.gateways.gateway_manager import GatewayManager from multisafepay.api.paths.issuers.issuer_manager import IssuerManager from multisafepay.api.paths.orders.order_manager import OrderManager @@ -174,6 +175,18 @@ def get_category_manager(self: "Sdk") -> CategoryManager: """ return CategoryManager(self.client) + def get_event_manager(self: "Sdk") -> EventManager: + """ + Get the event manager. + + Returns + ------- + EventManager + The event manager instance. + + """ + return EventManager(self.client) + def get_order_manager(self: "Sdk") -> OrderManager: """ Get the order manager. diff --git a/tests/multisafepay/e2e/examples/event_manager/conftest.py b/tests/multisafepay/e2e/examples/event_manager/conftest.py new file mode 100644 index 0000000..8fb1792 --- /dev/null +++ b/tests/multisafepay/e2e/examples/event_manager/conftest.py @@ -0,0 +1,211 @@ +"""Event-manager-specific E2E fixtures and selective skip behavior.""" + +import os +from urllib.parse import urlparse + +import pytest + +from multisafepay.client import ScopedCredentialResolver +from multisafepay.client.client import Client +from multisafepay.client.credential_resolver import AuthScope +from multisafepay.sdk import Sdk + +EVENT_MANAGER_E2E_NODE_PREFIXES = ( + "tests/multisafepay/e2e/examples/event_manager/", +) +DEFAULT_API_KEY_ENVS = ("E2E_API_KEY", "API_KEY") +PARTNER_API_KEY_ENVS = ("E2E_PARTNER_API_KEY", "PARTNER_API_KEY") +TERMINAL_GROUP_API_KEY_ENVS = ( + "E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", + "TERMINAL_GROUP_API_KEY_GROUP_DEFAULT", +) +TERMINAL_GROUP_ID_ENVS = ("CLOUD_POS_TERMINAL_GROUP_ID",) +TERMINAL_ID_ENVS = ("E2E_CLOUD_POS_TERMINAL_ID", "CLOUD_POS_TERMINAL_ID") +BASE_URL_ENVS = ("E2E_NO_SANDBOX_BASE_URL", "MSP_SDK_CUSTOM_BASE_URL") + + +def _get_first_env(*names: str) -> str: + for name in names: + value = os.getenv(name, "").strip() + if value: + return value + + return "" + + +def _require_first_env(*names: str) -> str: + value = _get_first_env(*names) + if value: + return value + + msg = f"SSE E2E tests require one of: {', '.join(names)}" + raise pytest.UsageError(msg) + + +def _validate_base_url(base_url: str, *env_names: str) -> str: + parsed = urlparse(base_url) + if parsed.scheme != "https" or not parsed.netloc: + msg = f"{', '.join(env_names)} must be a valid https URL" + raise pytest.UsageError(msg) + + path = parsed.path.rstrip("/") + normalized_path = "/" if not path else f"{path}/" + return f"{parsed.scheme}://{parsed.netloc}{normalized_path}" + + +def _has_event_manager_e2e_env() -> bool: + has_base = bool( + _get_first_env(*DEFAULT_API_KEY_ENVS) + and _get_first_env(*TERMINAL_GROUP_API_KEY_ENVS) + and _get_first_env(*TERMINAL_ID_ENVS) + and _get_first_env(*BASE_URL_ENVS), + ) + if not has_base: + return False + + return bool( + _get_first_env(*TERMINAL_GROUP_ID_ENVS) + or _get_first_env(*PARTNER_API_KEY_ENVS), + ) + + +def _resolve_terminal_group_id( + base_url: str, + default_api_key: str, + partner_api_key: str, + terminal_id: str, +) -> str: + credential_resolver = ScopedCredentialResolver( + default_api_key=default_api_key, + partner_affiliate_api_key=partner_api_key, + ) + sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, + ) + sdk.get_client().url = base_url + + limit = 100 + max_pages = 10 + for page in range(1, max_pages + 1): + response = sdk.get_client().create_get_request( + "json/terminals", + params={ + "limit": limit, + "page": page, + }, + auth_scope=AuthScope( + scope=Client.AUTH_SCOPE_PARTNER_AFFILIATE, + ), + ) + if ( + response.get_status_code() != 200 + or not response.get_body_success() + ): + raise pytest.UsageError( + "Unable to resolve Cloud POS terminal group id: " + "GET /json/terminals did not return a successful response", + ) + + listing = response.get_body_data() + if not isinstance(listing, list) or not listing: + break + + for terminal in listing: + listed_terminal_id = terminal.get("id") + terminal_code = terminal.get("code") + if terminal_id not in {listed_terminal_id, terminal_code}: + continue + + group_id = terminal.get("group_id") + if group_id is None: + raise pytest.UsageError( + f"Unable to resolve Cloud POS terminal group id: " + f"terminal {terminal_id} has no group_id", + ) + return str(group_id) + + if len(listing) < limit: + break + + raise pytest.UsageError( + f"Unable to resolve Cloud POS terminal group id for terminal {terminal_id}", + ) + + +@pytest.fixture(scope="session") +def cloud_pos_terminal_id() -> str: + """Return terminal id used by SSE E2E tests.""" + return _require_first_env(*TERMINAL_ID_ENVS) + + +@pytest.fixture(scope="session") +def cloud_pos_base_url() -> str: + """Return dev-backed base URL used by SSE E2E tests.""" + return _validate_base_url( + _require_first_env(*BASE_URL_ENVS), + *BASE_URL_ENVS, + ) + + +@pytest.fixture(scope="session") +def cloud_pos_terminal_group_id( + cloud_pos_base_url: str, + cloud_pos_terminal_id: str, +) -> str: + """Return terminal group id from env or resolve it via /json/terminals.""" + explicit_group_id = _get_first_env(*TERMINAL_GROUP_ID_ENVS) + if explicit_group_id: + return explicit_group_id + + return _resolve_terminal_group_id( + base_url=cloud_pos_base_url, + default_api_key=_require_first_env(*DEFAULT_API_KEY_ENVS), + partner_api_key=_require_first_env(*PARTNER_API_KEY_ENVS), + terminal_id=cloud_pos_terminal_id, + ) + + +@pytest.fixture(scope="session") +def cloud_pos_events_sdk( + cloud_pos_base_url: str, + cloud_pos_terminal_group_id: str, +) -> Sdk: + """Return SDK configured for Cloud POS creation plus SSE subscription.""" + credential_resolver = ScopedCredentialResolver( + default_api_key=_require_first_env(*DEFAULT_API_KEY_ENVS), + terminal_group_api_keys={ + cloud_pos_terminal_group_id: _require_first_env( + *TERMINAL_GROUP_API_KEY_ENVS, + ), + }, + ) + sdk = Sdk( + is_production=False, + credential_resolver=credential_resolver, + ) + sdk.get_client().url = cloud_pos_base_url + return sdk + + +def pytest_collection_modifyitems( + config: pytest.Config, # noqa: ARG001 + items: list[pytest.Item], +) -> None: + """Skip SSE E2E tests when the required credentials are missing.""" + if _has_event_manager_e2e_env(): + return + + skip = pytest.mark.skip( + reason=( + "SSE E2E tests require E2E_API_KEY or API_KEY, " + "E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT or " + "TERMINAL_GROUP_API_KEY_GROUP_DEFAULT, " + "E2E_CLOUD_POS_TERMINAL_ID or CLOUD_POS_TERMINAL_ID, " + "E2E_NO_SANDBOX_BASE_URL or MSP_SDK_CUSTOM_BASE_URL, " + "and either CLOUD_POS_TERMINAL_GROUP_ID or PARTNER_API_KEY" + ), + ) + for item in items: + if item.nodeid.startswith(EVENT_MANAGER_E2E_NODE_PREFIXES): + item.add_marker(skip) diff --git a/tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py b/tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py new file mode 100644 index 0000000..c7d137b --- /dev/null +++ b/tests/multisafepay/e2e/examples/event_manager/test_subscribe_events.py @@ -0,0 +1,88 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""E2E coverage for examples/event_manager/subscribe_events.py.""" + +import time + +import pytest + +from multisafepay.api.base.response.custom_api_response import ( + CustomApiResponse, +) +from multisafepay.api.paths.events.event_manager import EventManager +from multisafepay.api.paths.events.stream import EventStream +from multisafepay.api.paths.orders.order_manager import OrderManager +from multisafepay.api.paths.orders.request.order_request import OrderRequest +from multisafepay.api.paths.orders.response.order_response import Order +from multisafepay.sdk import Sdk + + +@pytest.fixture(scope="module") +def order_manager(cloud_pos_events_sdk: Sdk) -> OrderManager: + """Return OrderManager for Cloud POS event-stream tests.""" + return cloud_pos_events_sdk.get_order_manager() + + +@pytest.fixture(scope="module") +def event_manager(cloud_pos_events_sdk: Sdk) -> EventManager: + """Return EventManager for Cloud POS event-stream tests.""" + return cloud_pos_events_sdk.get_event_manager() + + +def _build_cloud_pos_order_request( + order_id: str, + terminal_id: str, +) -> OrderRequest: + """Create the Cloud POS order request used by the SSE example.""" + return ( + OrderRequest() + .add_type("redirect") + .add_order_id(order_id) + .add_description("Cloud POS order for event streaming") + .add_amount(100) + .add_currency("EUR") + .add_gateway_info( + { + "terminal_id": terminal_id, + }, + ) + ) + + +def test_subscribe_order_events_opens_stream( + order_manager: OrderManager, + event_manager: EventManager, + cloud_pos_terminal_group_id: str, + cloud_pos_terminal_id: str, +) -> None: + """Create a Cloud POS order and verify the SSE stream opens successfully.""" + order_id = f"cloud-pos-events-e2e-{int(time.time())}" + + create_response = order_manager.create( + request_order=_build_cloud_pos_order_request( + order_id=order_id, + terminal_id=cloud_pos_terminal_id, + ), + terminal_group_id=cloud_pos_terminal_group_id, + ) + + assert isinstance(create_response, CustomApiResponse) + assert create_response.get_status_code() == 200 + assert create_response.get_body_success() is True + + order = create_response.get_data() + assert isinstance(order, Order) + assert order.order_id == order_id + assert order.events_token or order.event_token + assert order.events_stream_url or order.event_stream_url + + with event_manager.subscribe_order_events(order, timeout=10.0) as stream: + assert isinstance(stream, EventStream) + assert stream.closed is False + + assert stream.closed is True diff --git a/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py b/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py new file mode 100644 index 0000000..eb09647 --- /dev/null +++ b/tests/multisafepay/unit/api/path/events/stream/test_unit_event_stream.py @@ -0,0 +1,186 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Unit tests for event-path stream contracts.""" + +from __future__ import annotations + +import io + +import pytest + +from multisafepay.api.paths.events.stream import Event, EventData, EventStream +from multisafepay.client.sse import ServerSentEventStream + +EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" +EVENTS_TOKEN = "events-token" +LAST_EVENT_ID = "last-10" +PING_PAYLOAD = b"data: ping\n\n" + + +class _FakeStreamingResponse: + """Small streaming response stub used for unit testing.""" + + def __init__(self: _FakeStreamingResponse, payload: bytes) -> None: + self._buffer = io.BytesIO(payload) + self.closed = False + + def readline(self: _FakeStreamingResponse) -> bytes: + return self._buffer.readline() + + def close(self: _FakeStreamingResponse) -> None: + self._buffer.close() + self.closed = True + + +def test_open_builds_expected_headers(monkeypatch: pytest.MonkeyPatch) -> None: + """Build event-specific auth headers before opening the generic SSE stream.""" + captured: dict[str, object] = {} + + def fake_open( + url: str, + headers: dict[str, str] | None = None, + timeout: float = 30.0, + ) -> ServerSentEventStream: + captured["url"] = url + captured["headers"] = headers + captured["timeout"] = timeout + return ServerSentEventStream( + response=_FakeStreamingResponse(PING_PAYLOAD), + ) + + monkeypatch.setattr( + "multisafepay.api.paths.events.stream.ServerSentEventStream.open", + fake_open, + ) + + stream = EventStream.open( + events_token=EVENTS_TOKEN, + events_stream_url=EVENTS_STREAM_URL, + last_event_id=LAST_EVENT_ID, + timeout=9.5, + ) + event = next(stream) + + assert isinstance(event, Event) + assert event.data == "ping" + assert captured["url"] == EVENTS_STREAM_URL + assert captured["timeout"] == 9.5 + headers = captured["headers"] + assert headers["Authorization"] == f"Bearer {EVENTS_TOKEN}" + assert headers["Accept"] == "text/event-stream" + assert headers["Cache-Control"] == "no-cache" + assert headers["Last-Event-ID"] == LAST_EVENT_ID + + +def test_wraps_generic_sse_messages_as_event_contracts() -> None: + """Adapt generic SSE messages into the events-path Event contract.""" + payload = ( + b"event: order.updated\n" + b"id: 15\n" + b"retry: 1000\n" + b'data: {"status": "completed", "order_id": "123"}\n\n' + ) + stream = EventStream(response=_FakeStreamingResponse(payload)) + + event = next(stream) + + assert isinstance(event, Event) + assert event.event == "order.updated" + assert event.event_id == "15" + assert event.retry == 1000 + assert isinstance(event.data, EventData) + assert event.data.status == "completed" + assert event.data.order_id == "123" + + +def test_event_from_dict_adapts_nested_payloads() -> None: + """Build nested event payload models through the common from_dict path.""" + event = Event.from_dict( + { + "event": "order.updated", + "data": { + "status": "processing", + "data": { + "status": "completed", + "order_id": "nested-1", + }, + }, + }, + ) + + assert event is not None + assert isinstance(event.data, EventData) + assert event.data.status == "processing" + assert isinstance(event.data.data, EventData) + assert event.data.data.status == "completed" + assert event.data.data.order_id == "nested-1" + + +def test_event_data_from_dict_adapts_nested_list_payloads() -> None: + """Build nested EventData items when payload data contains a list.""" + payload = EventData.from_dict( + { + "status": "processing", + "data": [ + { + "status": "completed", + "order_id": "nested-2", + }, + "keep-me", + ], + }, + ) + + assert payload is not None + assert payload.status == "processing" + assert isinstance(payload.data, list) + assert isinstance(payload.data[0], EventData) + assert payload.data[0].status == "completed" + assert payload.data[0].order_id == "nested-2" + assert payload.data[1] == "keep-me" + + +def test_event_from_dict_adapts_list_payload_items() -> None: + """Build top-level event payload lists through the common from_dict path.""" + event = Event.from_dict( + { + "event": "order.batch", + "data": [ + { + "status": "completed", + "order_id": "batch-1", + }, + "tail", + ], + }, + ) + + assert event is not None + assert isinstance(event.data, list) + assert isinstance(event.data[0], EventData) + assert event.data[0].status == "completed" + assert event.data[0].order_id == "batch-1" + assert event.data[1] == "tail" + + +def test_from_dict_returns_none_for_none_payload() -> None: + """Return None when from_dict receives None input.""" + assert Event.from_dict(None) is None + assert EventData.from_dict(None) is None + + +def test_closes_wrapped_stream_on_eof() -> None: + """Close the wrapped generic stream when EOF is reached.""" + response = _FakeStreamingResponse(payload=b"") + stream = EventStream(response=response) + + with pytest.raises(StopIteration): + next(stream) + + assert response.closed is True + assert stream.closed is True diff --git a/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py b/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py new file mode 100644 index 0000000..efbe3d0 --- /dev/null +++ b/tests/multisafepay/unit/api/path/events/test_unit_event_manager.py @@ -0,0 +1,121 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Unit tests for event manager subscription helpers.""" + +from typing import Optional +from unittest.mock import MagicMock + +import pytest + +from multisafepay.api.paths.events.event_manager import EventManager +from multisafepay.api.paths.orders.response.order_response import Order + +TEST_EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" +ORDER_EVENTS_STREAM_URL = "https://stream.example/events/stream/" +LEGACY_EVENTS_STREAM_URL = "https://legacy.example/events/stream/" +MISSING_EVENTS_ERROR = "events_token/events_stream_url" + + +def _patch_event_stream_open( + monkeypatch: pytest.MonkeyPatch, +) -> tuple[dict[str, object], object]: + """Patch EventStream.open and return capture dict plus sentinel stream.""" + captured: dict[str, object] = {} + expected_stream = object() + + def fake_open( + events_token: str, + events_stream_url: str, + last_event_id: Optional[str] = None, + timeout: float = 30.0, + ) -> object: + captured["events_token"] = events_token + captured["events_stream_url"] = events_stream_url + captured["last_event_id"] = last_event_id + captured["timeout"] = timeout + return expected_stream + + monkeypatch.setattr( + "multisafepay.api.paths.events.event_manager.EventStream.open", + fake_open, + ) + + return captured, expected_stream + + +def test_subscribe_events_delegates_to_stream_open( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Delegate direct subscriptions to EventStream.open.""" + captured, expected_stream = _patch_event_stream_open(monkeypatch) + + manager = EventManager(MagicMock()) + stream = manager.subscribe_events( + events_token="token-abc", + events_stream_url=TEST_EVENTS_STREAM_URL, + last_event_id="last-15", + timeout=10.0, + ) + + assert stream is expected_stream + assert captured["events_token"] == "token-abc" + assert captured["events_stream_url"] == TEST_EVENTS_STREAM_URL + assert captured["last_event_id"] == "last-15" + assert captured["timeout"] == 10.0 + + +def test_subscribe_order_events_uses_plural_fields( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Read events credentials from events_* fields when present.""" + captured, expected_stream = _patch_event_stream_open(monkeypatch) + + manager = EventManager(MagicMock()) + order = Order( + order_id="order-1", + events_token="events-token", + events_stream_url=ORDER_EVENTS_STREAM_URL, + ) + + stream = manager.subscribe_order_events(order) + + assert stream is expected_stream + assert captured["events_token"] == "events-token" + assert captured["events_stream_url"] == ORDER_EVENTS_STREAM_URL + + +def test_subscribe_order_events_falls_back_to_legacy_fields( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Support old event_* field names for backward compatibility.""" + captured, expected_stream = _patch_event_stream_open(monkeypatch) + + manager = EventManager(MagicMock()) + order = Order( + order_id="order-2", + event_token="legacy-token", + event_stream_url=LEGACY_EVENTS_STREAM_URL, + ) + + stream = manager.subscribe_order_events(order) + + assert stream is expected_stream + assert captured["events_token"] == "legacy-token" + assert captured["events_stream_url"] == LEGACY_EVENTS_STREAM_URL + + +def test_subscribe_order_events_requires_token_and_stream_url() -> None: + """Raise a clear error when event credentials are missing in order.""" + manager = EventManager(MagicMock()) + order = Order(order_id="order-3") + + with pytest.raises( + ValueError, + match=MISSING_EVENTS_ERROR, + ): + manager.subscribe_order_events(order) diff --git a/tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py b/tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py new file mode 100644 index 0000000..60a5b5c --- /dev/null +++ b/tests/multisafepay/unit/api/path/orders/response/test_unit_order_response.py @@ -0,0 +1,74 @@ +# Copyright (c) MultiSafepay, Inc. All rights reserved. + +# This file is licensed under the Open Software License (OSL) version 3.0. +# For a copy of the license, see the LICENSE.txt file in the project root. + +# See the DISCLAIMER.md file for disclaimer details. + +"""Unit tests for order response event fields compatibility.""" + +from typing import Optional + +from multisafepay.api.paths.orders.response.order_response import Order + +PLURAL_EVENTS_TOKEN = "token-123" +PLURAL_EVENTS_URL = "wss://testapi.multisafepay.com/events/" +PLURAL_EVENTS_STREAM_URL = "https://testapi.multisafepay.com/events/stream/" +LEGACY_EVENTS_TOKEN = "legacy-token" +LEGACY_EVENTS_URL = "wss://legacy.example.com/events/" +LEGACY_EVENTS_STREAM_URL = "https://legacy.example.com/events/stream/" + + +def _assert_event_fields( + order: Optional[Order], + expected_token: str, + expected_url: str, + expected_stream_url: str, +) -> None: + """Assert both plural and legacy event fields are populated consistently.""" + assert order is not None + + assert order.events_token == expected_token + assert order.events_url == expected_url + assert order.events_stream_url == expected_stream_url + assert order.event_token == expected_token + assert order.event_url == expected_url + assert order.event_stream_url == expected_stream_url + + +def test_from_dict_maps_plural_event_fields_to_legacy_aliases() -> None: + """Map events_* fields to both plural and legacy singular attributes.""" + data = { + "order_id": "order-1", + "events_token": PLURAL_EVENTS_TOKEN, + "events_url": PLURAL_EVENTS_URL, + "events_stream_url": PLURAL_EVENTS_STREAM_URL, + } + + order = Order.from_dict(data) + + _assert_event_fields( + order=order, + expected_token=PLURAL_EVENTS_TOKEN, + expected_url=PLURAL_EVENTS_URL, + expected_stream_url=PLURAL_EVENTS_STREAM_URL, + ) + + +def test_from_dict_maps_legacy_event_fields_to_plural_names() -> None: + """Map event_* fields to newer plural names for consistency.""" + data = { + "order_id": "order-2", + "event_token": LEGACY_EVENTS_TOKEN, + "event_url": LEGACY_EVENTS_URL, + "event_stream_url": LEGACY_EVENTS_STREAM_URL, + } + + order = Order.from_dict(data) + + _assert_event_fields( + order=order, + expected_token=LEGACY_EVENTS_TOKEN, + expected_url=LEGACY_EVENTS_URL, + expected_stream_url=LEGACY_EVENTS_STREAM_URL, + ) diff --git a/tests/multisafepay/unit/test_unit_sdk.py b/tests/multisafepay/unit/test_unit_sdk.py index f160905..a7fa84c 100644 --- a/tests/multisafepay/unit/test_unit_sdk.py +++ b/tests/multisafepay/unit/test_unit_sdk.py @@ -7,9 +7,12 @@ """Unit tests for SDK-level environment/base URL guardrails.""" +from unittest.mock import MagicMock + import pytest from multisafepay import Sdk +from multisafepay.api.paths.events.event_manager import EventManager from multisafepay.client.client import Client from multisafepay.client.credential_resolver import ScopedCredentialResolver @@ -122,6 +125,17 @@ def test_sdk_requires_api_key_or_resolver() -> None: Sdk(is_production=False) +def test_sdk_returns_event_manager() -> None: + """Expose EventManager through SDK convenience getter.""" + sdk = Sdk( + api_key="mock_api_key", + is_production=False, + transport=MagicMock(), + ) + + assert isinstance(sdk.get_event_manager(), EventManager) + + def test_sdk_uses_credential_resolver_with_custom_transport() -> None: """Wire resolver + transport together and use resolved auth header.""" transport = _CaptureTransport()