Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions examples/event_manager/subscribe_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Create a Cloud POS order and subscribe to its event stream."""

import os
import time

from dotenv import load_dotenv
from multisafepay import Sdk
from multisafepay.api.paths.orders.request import OrderRequest
from multisafepay.client import ScopedCredentialResolver

# Load environment variables from a .env file
load_dotenv()


def _get_first_env(*names: str) -> str:
for name in names:
value = os.getenv(name, "").strip()
if value:
return value

return ""


def _require_first_env(*names: str) -> str:
value = _get_first_env(*names)
if value:
return value

raise RuntimeError(
f"Missing required environment variable. Set one of: {', '.join(names)}",
)


DEFAULT_ACCOUNT_API_KEY = _require_first_env("API_KEY", "E2E_API_KEY")
TERMINAL_GROUP_DEFAULT_API_KEY = _require_first_env(
"TERMINAL_GROUP_API_KEY_GROUP_DEFAULT",
"E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT",
)
CLOUD_POS_TERMINAL_GROUP_ID = _require_first_env(
"CLOUD_POS_TERMINAL_GROUP_ID",
)
TERMINAL_ID = _require_first_env(
"CLOUD_POS_TERMINAL_ID",
"E2E_CLOUD_POS_TERMINAL_ID",
)

if __name__ == "__main__":
# This example executes Cloud POS calls with terminal-group scope.
scoped_terminal_group_id = CLOUD_POS_TERMINAL_GROUP_ID
resolver_kwargs = {
"default_api_key": DEFAULT_ACCOUNT_API_KEY,
}
if scoped_terminal_group_id:
resolver_kwargs["terminal_group_api_keys"] = {
scoped_terminal_group_id: TERMINAL_GROUP_DEFAULT_API_KEY,
}

credential_resolver = ScopedCredentialResolver(**resolver_kwargs)

multisafepay_sdk = Sdk(
is_production=False,
credential_resolver=credential_resolver,
)
order_manager = multisafepay_sdk.get_order_manager()
event_manager = multisafepay_sdk.get_event_manager()

order_id = f"cloud-pos-{int(time.time())}"

order_request = (
OrderRequest()
.add_type("redirect")
.add_order_id(order_id)
.add_description("Cloud POS order")
.add_amount(100)
.add_currency("EUR")
.add_gateway_info(
{
"terminal_id": TERMINAL_ID,
},
)
)

create_response = order_manager.create(
order_request,
terminal_group_id=scoped_terminal_group_id,
)
order = create_response.get_data()

if order is None:
raise RuntimeError("Order creation did not return order data")

print(f"Created Cloud POS order: {order.order_id}")
print("Listening for events. Press Ctrl+C to stop.")

try:
with event_manager.subscribe_order_events(order, timeout=45.0) as stream:
for event in stream:
print(event)
except KeyboardInterrupt:
print("Stream interrupted by user.")
14 changes: 14 additions & 0 deletions src/multisafepay/api/paths/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Events API endpoints."""

from multisafepay.api.paths.events.event_manager import EventManager

__all__ = [
"EventManager",
]
87 changes: 87 additions & 0 deletions src/multisafepay/api/paths/events/event_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Event manager for event stream subscription helpers."""

from __future__ import annotations

from multisafepay.api.base.abstract_manager import AbstractManager
from multisafepay.api.paths.events.stream import EventStream
from multisafepay.api.paths.orders.response.order_response import Order
from multisafepay.client.client import Client


class EventManager(AbstractManager):
"""Manages event stream subscriptions for order events."""

def __init__(self: EventManager, client: Client) -> None:
"""Initialize the EventManager with a client."""
super().__init__(client)

def subscribe_events(
self: EventManager,
events_token: str,
events_stream_url: str,
last_event_id: str | None = None,
timeout: float = 30.0,
) -> EventStream:
"""
Subscribe to order events using the SSE stream endpoint.

Parameters
----------
events_token (str): Token returned by order creation for event auth.
events_stream_url (str): Full SSE stream URL.
last_event_id (str | None): Optional resume cursor.
timeout (float): Socket timeout in seconds.

Returns
-------
EventStream: An iterator over incoming SSE messages.

"""
return EventStream.open(
events_token=events_token,
events_stream_url=events_stream_url,
last_event_id=last_event_id,
timeout=timeout,
)

def subscribe_order_events(
self: EventManager,
order: Order,
last_event_id: str | None = None,
timeout: float = 30.0,
) -> EventStream:
"""
Subscribe to events for an existing order response object.

Parameters
----------
order (Order): Order response that contains event credentials.
last_event_id (str | None): Optional resume cursor.
timeout (float): Socket timeout in seconds.

Returns
-------
EventStream: An iterator over incoming SSE messages.

"""
events_token = order.events_token or order.event_token
events_stream_url = order.events_stream_url or order.event_stream_url

if not events_token or not events_stream_url:
raise ValueError(
"Order does not contain events_token/events_stream_url.",
)

return self.subscribe_events(
events_token=events_token,
events_stream_url=events_stream_url,
last_event_id=last_event_id,
timeout=timeout,
)
132 changes: 132 additions & 0 deletions src/multisafepay/api/paths/events/stream/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Event stream contracts for the events path."""

from __future__ import annotations

import json

from multisafepay.api.paths.events.stream.response import Event, EventData
from multisafepay.client.sse import (
ServerSentEvent,
ServerSentEventStream,
StreamingResponse,
)
from typing_extensions import Self


def _deserialize_event_payload(raw_payload: str | None) -> object | None:
"""Parse raw SSE data for this path and fall back to plain text."""
if raw_payload is None:
return None

try:
return json.loads(raw_payload)
except json.JSONDecodeError:
return raw_payload


def _to_event(server_sent_event: ServerSentEvent) -> Event:
"""Adapt a generic SSE message into the events-path response model."""
event = Event.from_dict(
{
"event": server_sent_event.event,
"data": _deserialize_event_payload(server_sent_event.data),
"event_id": server_sent_event.event_id,
"retry": server_sent_event.retry,
"raw_data": server_sent_event.raw_data,
},
)
if event is None:
raise ValueError("Unable to adapt SSE payload to Event.")
return event


class EventStream:
"""Iterator over events received from the MultiSafepay SSE endpoint."""

def __init__(
self: EventStream,
response: StreamingResponse | None = None,
stream: ServerSentEventStream | None = None,
) -> None:
"""Initialize the stream from an HTTP response or generic SSE stream."""
if stream is not None:
self._stream = stream
return

if response is None:
raise ValueError(
"response is required when stream is not provided.",
)

self._stream = ServerSentEventStream(response=response)

@classmethod
def _from_stream(
cls: type[EventStream],
stream: ServerSentEventStream,
) -> EventStream:
"""Build an EventStream around an already-open generic SSE stream."""
return cls(stream=stream)

@classmethod
def open(
cls: type[EventStream],
events_token: str,
events_stream_url: str,
last_event_id: str | None = None,
timeout: float = 30.0,
) -> EventStream:
"""Open a new SSE stream using the event token and stream URL."""
headers = {
"Accept": "text/event-stream",
"Cache-Control": "no-cache",
"Authorization": f"Bearer {events_token}",
}
if last_event_id is not None:
headers["Last-Event-ID"] = last_event_id

stream = ServerSentEventStream.open(
url=events_stream_url,
headers=headers,
timeout=timeout,
)
return cls._from_stream(stream)

@property
def closed(self: EventStream) -> bool:
"""Return whether this stream is already closed."""
return self._stream.closed

def close(self: EventStream) -> None:
"""Close the underlying HTTP response stream."""
self._stream.close()

def __iter__(self: EventStream) -> EventStream:
"""Return self as an iterator over events."""
return self

def __next__(self: EventStream) -> Event:
"""Read the next SSE message and return it as an Event."""
return _to_event(next(self._stream))

def __enter__(self: Self) -> Self:
"""Support context manager protocol."""
return self

def __exit__(self: EventStream, *args: object) -> None:
"""Close stream when exiting context manager."""
self.close()


__all__ = [
"Event",
"EventData",
"EventStream",
]
16 changes: 16 additions & 0 deletions src/multisafepay/api/paths/events/stream/response/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Response models for event stream payloads."""

from multisafepay.api.paths.events.stream.response.components import EventData
from multisafepay.api.paths.events.stream.response.event import Event

__all__ = [
"Event",
"EventData",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Component models for event stream response payloads."""

from multisafepay.api.paths.events.stream.response.components.event_data import (
EventData,
EventDataPayload,
)

__all__ = [
"EventData",
"EventDataPayload",
]
Loading