From 78f246a46a404efef8ce4309f6b0b738582c80c5 Mon Sep 17 00:00:00 2001 From: Peter Evers Date: Wed, 22 Apr 2026 12:58:49 +0200 Subject: [PATCH 1/8] chore: add wf connector integration --- src/mistralai/extra/__init__.py | 14 + src/mistralai/extra/workflows/__init__.py | 22 ++ .../extra/workflows/connector_auth.py | 326 ++++++++++++++++++ .../extra/workflows/connector_helpers.py | 96 ++++++ .../extra/workflows/connector_slot.py | 34 ++ 5 files changed, 492 insertions(+) create mode 100644 src/mistralai/extra/workflows/connector_auth.py create mode 100644 src/mistralai/extra/workflows/connector_helpers.py create mode 100644 src/mistralai/extra/workflows/connector_slot.py diff --git a/src/mistralai/extra/__init__.py b/src/mistralai/extra/__init__.py index cabda728..c2c9d3b3 100644 --- a/src/mistralai/extra/__init__.py +++ b/src/mistralai/extra/__init__.py @@ -6,6 +6,14 @@ ) from .utils import response_format_from_pydantic_model from .utils.response_format import CustomPydanticModel +from .workflows import ( + ConnectorAuthTaskState, + ConnectorBindings, + ConnectorExtensions, + ConnectorSlot, + WorkflowExtensions, + execute_with_connector_auth_async, +) if TYPE_CHECKING: from .realtime import ( @@ -44,7 +52,13 @@ def __getattr__(name: str): __all__ = [ + "ConnectorAuthTaskState", + "ConnectorBindings", + "ConnectorExtensions", + "ConnectorSlot", + "WorkflowExtensions", "convert_to_parsed_chat_completion_response", + "execute_with_connector_auth_async", "response_format_from_pydantic_model", "CustomPydanticModel", "ParsedChatCompletionResponse", diff --git a/src/mistralai/extra/workflows/__init__.py b/src/mistralai/extra/workflows/__init__.py index 08b4b4fa..13d9c4a5 100644 --- a/src/mistralai/extra/workflows/__init__.py +++ b/src/mistralai/extra/workflows/__init__.py @@ -1,3 +1,16 @@ +from .connector_auth import ( + ConnectorAuthTaskState, + ConnectorBindings, + ConnectorExtensions, + WorkflowExtensions, + execute_with_connector_auth_async, +) +from .connector_helpers import ( + build_connector_extensions, + on_auth_required, + run_workflow_with_connectors, +) +from .connector_slot import ConnectorSlot from .encoding import ( WorkflowEncodingConfig, PayloadOffloadingConfig, @@ -11,6 +24,15 @@ ) __all__ = [ + "ConnectorAuthTaskState", + "ConnectorBindings", + "ConnectorExtensions", + "ConnectorSlot", + "WorkflowExtensions", + "build_connector_extensions", + "execute_with_connector_auth_async", + "on_auth_required", + "run_workflow_with_connectors", "WorkflowEncodingConfig", "PayloadOffloadingConfig", "PayloadEncryptionConfig", diff --git a/src/mistralai/extra/workflows/connector_auth.py b/src/mistralai/extra/workflows/connector_auth.py new file mode 100644 index 00000000..73d0295b --- /dev/null +++ b/src/mistralai/extra/workflows/connector_auth.py @@ -0,0 +1,326 @@ +"""Helper for executing workflows that require connector OAuth authentication. + +When a workflow uses connectors that need OAuth, it emits ``connector-auth`` +custom task events. This module provides a high-level async function that +automates the handshake: + +1. Start the workflow execution. +2. Stream events, watching for ``connector-auth`` custom task events. +3. When a ``waiting_for_auth`` event arrives, invoke a user-supplied callback. +4. The interceptor polls for credentials server-side and resumes automatically. +5. Return the final execution result once the workflow completes. + +Example:: + + from mistralai import Mistral + from mistralai.extra.workflows import ( + ConnectorAuthTaskState, + ConnectorSlot, + execute_with_connector_auth_async, + ) + + async def prompt_user(state: ConnectorAuthTaskState) -> None: + print(f"Please authenticate: {state.auth_url}") + input("Press Enter when done...") + + gmail = ConnectorSlot(connector_name="gmail") + + client = Mistral(api_key="...") + result = await execute_with_connector_auth_async( + client, + workflow_identifier="my-workflow", + input_data={"query": "summarize my emails"}, + on_auth_required=prompt_user, + connectors=[gmail], + ) +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + List, + Literal, + Optional, + Sequence, +) + +import httpx +from pydantic import BaseModel + +from mistralai.client.models import ( + CustomTaskStartedResponse, + WorkflowExecutionCanceledResponse, + WorkflowExecutionCompletedResponse, + WorkflowExecutionFailedResponse, + WorkflowExecutionResponse, +) + +from .connector_slot import ConnectorSlot + +if TYPE_CHECKING: + from mistralai.client.sdk import Mistral + +logger = logging.getLogger(__name__) + +_TERMINAL_EVENT_TYPES = ( + WorkflowExecutionCompletedResponse, + WorkflowExecutionFailedResponse, + WorkflowExecutionCanceledResponse, +) + +_MAX_RECONNECT_ATTEMPTS = 10 + + +class ConnectorAuthTaskState(BaseModel): + """State emitted by a ``connector-auth`` custom task when it needs OAuth. + + Attributes: + connector_name: Identifier of the connector requiring authentication. + status: Current state of the auth flow. + auth_url: URL the user should visit to complete authentication. + message: Optional human-readable context about the auth request. + """ + + connector_name: str + connector_id: str + status: Literal[ + "waiting_for_auth", + "connected", + "access_denied", + "timed_out", + "error", + ] + auth_url: Optional[str] = None + message: Optional[str] = None + + +class ConnectorBindings(BaseModel): + """Connector bindings passed within extensions.""" + + bindings: List[ConnectorSlot] + + +class ConnectorExtensions(BaseModel): + """The ``mistralai`` key within workflow extensions.""" + + connectors: ConnectorBindings + + +class WorkflowExtensions(BaseModel): + """Top-level extensions dict forwarded to ``execute_workflow_async``.""" + + mistralai: ConnectorExtensions + + +async def execute_with_connector_auth_async( + client: Mistral, + workflow_identifier: str, + input_data: Any = None, + *, + on_auth_required: Optional[ + Callable[[ConnectorAuthTaskState], Awaitable[None]] + ] = None, + execution_id: Optional[str] = None, + task_queue: Optional[str] = None, + deployment_name: Optional[str] = None, + connectors: Sequence[ConnectorSlot] = (), + extensions: Optional[Dict[str, Any]] = None, + polling_interval: float = 2, + max_polling_attempts: Optional[int] = None, +) -> WorkflowExecutionResponse: + """Execute a workflow, automatically handling connector OAuth flows. + + Args: + client: An initialised :class:`Mistral` client. + workflow_identifier: Name or ID of the workflow to execute. + input_data: Input payload for the workflow. Pydantic models are + serialised via ``model_dump(mode="json")``. + on_auth_required: Async callback invoked when a connector needs + the user to authenticate. Receives a + :class:`ConnectorAuthTaskState` whose ``auth_url`` field + contains the OAuth URL. The workflow resumes automatically + after this callback returns. + execution_id: Optional custom execution ID. + task_queue: Optional task queue name (deprecated upstream). + deployment_name: Optional deployment target. + connectors: Typed connector slots that declare which connectors + the workflow needs. Converted to the ``extensions`` dict + automatically. + extensions: Additional plugin-specific data to propagate into + ``WorkflowContext.extensions`` at execution time. Merged + with connector bindings from *connectors*. + polling_interval: Seconds between status polls after the event + stream ends. + max_polling_attempts: Maximum number of polling iterations before + raising :class:`TimeoutError`. ``None`` means poll forever. + + Returns: + The completed :class:`WorkflowExecutionResponse`. + + Raises: + RuntimeError: If the workflow finishes with a non-COMPLETED status. + TimeoutError: If *max_polling_attempts* is set and exceeded. + """ + input_dict = _serialize_input(input_data) + + merged_extensions = _build_extensions(connectors, extensions) + + execute_kwargs: Dict[str, Any] = dict( + workflow_identifier=workflow_identifier, + input_=input_dict, + execution_id=execution_id, + task_queue=task_queue, + deployment_name=deployment_name, + ) + if merged_extensions is not None: + execute_kwargs["extensions"] = merged_extensions + + execution = await client.workflows.execute_workflow_async(**execute_kwargs) + exec_id = execution.execution_id + + await _stream_and_handle_auth(client, exec_id, on_auth_required) + + return await _poll_until_done( + client, exec_id, polling_interval, max_polling_attempts + ) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _serialize_input(input_data: Any) -> Optional[Dict[str, Any]]: + if input_data is None: + return None + if hasattr(input_data, "model_dump"): + return input_data.model_dump(mode="json") + return input_data + + +def _build_extensions( + connectors: Sequence[ConnectorSlot], + extra: Optional[Dict[str, Any]], +) -> Optional[Dict[str, Any]]: + """Merge connector slot bindings with any additional extensions.""" + if not connectors and not extra: + return None + + result: Dict[str, Any] = dict(extra) if extra else {} + if connectors: + ext = WorkflowExtensions( + mistralai=ConnectorExtensions( + connectors=ConnectorBindings( + bindings=list(connectors), + ), + ), + ) + ext_dict = ext.model_dump(mode="json", exclude_none=True) + # Merge: connector bindings go under result["mistralai"]["connectors"] + mistralai = result.setdefault("mistralai", {}) + existing_bindings = mistralai.get("connectors", {}).get("bindings", []) + new_bindings = ext_dict["mistralai"]["connectors"]["bindings"] + mistralai["connectors"] = {"bindings": existing_bindings + new_bindings} + return result + + +async def _stream_and_handle_auth( + client: Mistral, + exec_id: str, + on_auth_required: Optional[Callable[[ConnectorAuthTaskState], Awaitable[None]]], +) -> None: + """Stream workflow events, handling connector-auth tasks. + + Reconnects automatically with exponential back-off when the SSE + connection drops. + """ + last_seq = 0 + + for attempt in range(_MAX_RECONNECT_ATTEMPTS): + try: + event_stream = await client.workflows.events.get_stream_events_async( + root_workflow_exec_id=exec_id, + workflow_exec_id="*", + parent_workflow_exec_id="*", + start_seq=last_seq, + ) + async with event_stream: + async for sse_event in event_stream: + if sse_event.data is None: + continue + + payload = sse_event.data + last_seq = payload.broker_sequence + 1 + event = payload.data + + if isinstance(event, _TERMINAL_EVENT_TYPES): + return + + if not isinstance(event, CustomTaskStartedResponse): + continue + if event.attributes.custom_task_type != "connector-auth": + continue + + payload_value = ( + event.attributes.payload.value + if event.attributes.payload is not None + else None + ) + if ( + not isinstance(payload_value, dict) + or payload_value.get("status") != "waiting_for_auth" + ): + continue + + state = ConnectorAuthTaskState.model_validate(payload_value) + + if on_auth_required: + await on_auth_required(state) + + # The interceptor polls for credentials server-side — + # no signal or update needed from the client. + else: + # Stream exhausted without a terminal event — retry. + continue + # Terminal event received. + return + except (ConnectionError, httpx.RemoteProtocolError): + logger.debug( + "Event stream connection lost, reconnecting " + "(execution_id=%s, attempt=%d)", + exec_id, + attempt, + ) + await asyncio.sleep(min(2**attempt, 30)) + + +async def _poll_until_done( + client: Mistral, + exec_id: str, + polling_interval: float, + max_attempts: Optional[int], +) -> WorkflowExecutionResponse: + """Poll the execution status until it reaches a terminal state.""" + attempts = 0 + while True: + result = await client.workflows.executions.get_workflow_execution_async( + execution_id=exec_id, + ) + if result.status != "RUNNING": + if result.status == "COMPLETED": + return result + raise RuntimeError(f"Workflow failed with status: {result.status}") + + attempts += 1 + if max_attempts is not None and attempts >= max_attempts: + raise TimeoutError( + f"Workflow still running after {max_attempts} polling attempts" + ) + await asyncio.sleep(polling_interval) diff --git a/src/mistralai/extra/workflows/connector_helpers.py b/src/mistralai/extra/workflows/connector_helpers.py new file mode 100644 index 00000000..fc052a92 --- /dev/null +++ b/src/mistralai/extra/workflows/connector_helpers.py @@ -0,0 +1,96 @@ +"""Reusable helpers for running connector-backed workflows. + +Provides: + +- :func:`on_auth_required` — default callback that opens the browser for OAuth. +- :func:`build_connector_extensions` — builds the ``extensions`` dict from + :class:`ConnectorSlot` instances. +- :func:`run_workflow_with_connectors` — high-level convenience that wires up a + :class:`Mistral` client, calls + :func:`execute_with_connector_auth_async`, and logs the result. +""" + +from __future__ import annotations + +import logging +import webbrowser +from typing import Any, Dict, Optional, Sequence + +import pydantic + +from .connector_auth import ( + ConnectorAuthTaskState, + ConnectorBindings, + ConnectorExtensions, + WorkflowExtensions, + execute_with_connector_auth_async, +) +from .connector_slot import ConnectorSlot + +logger = logging.getLogger(__name__) + + +async def on_auth_required(state: ConnectorAuthTaskState) -> None: + """Default callback: opens the OAuth URL in the browser and waits.""" + if state.auth_url: + logger.info( + "Auth required — opening browser (connector=%s, auth_url=%s)", + state.connector_name, + state.auth_url, + ) + webbrowser.open(state.auth_url) + else: + logger.info( + "Auth required — authenticate the connector manually (connector=%s)", + state.connector_name, + ) + input("Press Enter after completing the OAuth flow...") + + +def build_connector_extensions( + connectors: Sequence[ConnectorSlot], +) -> Dict[str, Any]: + """Build an extensions dict from :class:`ConnectorSlot` instances. + + The resulting dict is forwarded via ``execute_workflow_async(extensions=…)`` + and lands in ``WorkflowContext.extensions`` where the connector plugin + reads it. + """ + ext = WorkflowExtensions( + mistralai=ConnectorExtensions( + connectors=ConnectorBindings( + bindings=list(connectors), + ), + ), + ) + return ext.model_dump(mode="json", exclude_none=True) + + +async def run_workflow_with_connectors( + workflow_name: str, + input_data: pydantic.BaseModel, + *, + api_key: str, + server_url: str, + task_queue: Optional[str] = None, + connectors: Sequence[ConnectorSlot] = (), +) -> None: + """Execute a workflow, handling connector auth automatically.""" + from mistralai.client.sdk import Mistral + + extensions = build_connector_extensions(connectors) if connectors else None + + async with Mistral(api_key=api_key, server_url=server_url) as client: + result = await execute_with_connector_auth_async( + client, + workflow_identifier=workflow_name, + input_data=input_data, + on_auth_required=on_auth_required, + task_queue=task_queue, + extensions=extensions, + ) + logger.info( + "Workflow completed (status=%s, result=%s)", + result.status, + result.result, + ) diff --git a/src/mistralai/extra/workflows/connector_slot.py b/src/mistralai/extra/workflows/connector_slot.py new file mode 100644 index 00000000..18290dc0 --- /dev/null +++ b/src/mistralai/extra/workflows/connector_slot.py @@ -0,0 +1,34 @@ +"""Typed descriptor for a connector dependency.""" + +from __future__ import annotations + +from typing import Optional + +from pydantic import BaseModel + + +class ConnectorSlot(BaseModel): + """A declared connector dependency for a workflow execution. + + Mirrors the server-side ``ConnectorSlot`` from the workflow SDK plugin, + providing a typed interface for specifying connector bindings instead of + raw ``Dict[str, Any]`` extension dicts. + + Example:: + + from mistralai.extra.workflows import ConnectorSlot, run_workflow_with_connectors + + gmail = ConnectorSlot(connector_name="gmail") + notion = ConnectorSlot(connector_name="notion", credentials_name="work-account") + + await run_workflow_with_connectors( + "my-workflow", + input_data=payload, + connectors=[gmail, notion], + api_key="...", + server_url="...", + ) + """ + + connector_name: str + credentials_name: Optional[str] = None From 61e1cd26d26786f759552f469d5584a8d6cf787d Mon Sep 17 00:00:00 2001 From: Peter Evers Date: Wed, 22 Apr 2026 15:21:20 +0200 Subject: [PATCH 2/8] wip --- src/mistralai/extra/__init__.py | 6 -- src/mistralai/extra/workflows/__init__.py | 8 -- .../extra/workflows/connector_auth.py | 83 +++++-------------- .../extra/workflows/connector_helpers.py | 30 +------ 4 files changed, 22 insertions(+), 105 deletions(-) diff --git a/src/mistralai/extra/__init__.py b/src/mistralai/extra/__init__.py index c2c9d3b3..4ee07b4a 100644 --- a/src/mistralai/extra/__init__.py +++ b/src/mistralai/extra/__init__.py @@ -8,10 +8,7 @@ from .utils.response_format import CustomPydanticModel from .workflows import ( ConnectorAuthTaskState, - ConnectorBindings, - ConnectorExtensions, ConnectorSlot, - WorkflowExtensions, execute_with_connector_auth_async, ) @@ -53,10 +50,7 @@ def __getattr__(name: str): __all__ = [ "ConnectorAuthTaskState", - "ConnectorBindings", - "ConnectorExtensions", "ConnectorSlot", - "WorkflowExtensions", "convert_to_parsed_chat_completion_response", "execute_with_connector_auth_async", "response_format_from_pydantic_model", diff --git a/src/mistralai/extra/workflows/__init__.py b/src/mistralai/extra/workflows/__init__.py index 13d9c4a5..c178e425 100644 --- a/src/mistralai/extra/workflows/__init__.py +++ b/src/mistralai/extra/workflows/__init__.py @@ -1,12 +1,8 @@ from .connector_auth import ( ConnectorAuthTaskState, - ConnectorBindings, - ConnectorExtensions, - WorkflowExtensions, execute_with_connector_auth_async, ) from .connector_helpers import ( - build_connector_extensions, on_auth_required, run_workflow_with_connectors, ) @@ -25,11 +21,7 @@ __all__ = [ "ConnectorAuthTaskState", - "ConnectorBindings", - "ConnectorExtensions", "ConnectorSlot", - "WorkflowExtensions", - "build_connector_extensions", "execute_with_connector_auth_async", "on_auth_required", "run_workflow_with_connectors", diff --git a/src/mistralai/extra/workflows/connector_auth.py b/src/mistralai/extra/workflows/connector_auth.py index 73d0295b..85820f27 100644 --- a/src/mistralai/extra/workflows/connector_auth.py +++ b/src/mistralai/extra/workflows/connector_auth.py @@ -46,7 +46,6 @@ async def prompt_user(state: ConnectorAuthTaskState) -> None: Callable, Dict, List, - Literal, Optional, Sequence, ) @@ -79,46 +78,23 @@ async def prompt_user(state: ConnectorAuthTaskState) -> None: class ConnectorAuthTaskState(BaseModel): - """State emitted by a ``connector-auth`` custom task when it needs OAuth. + """State emitted by a ``connector_auth`` custom task when it needs OAuth. Attributes: connector_name: Identifier of the connector requiring authentication. - status: Current state of the auth flow. + connector_id: Server-side connector ID. + credentials_name: Optional named credential set used for this connector. auth_url: URL the user should visit to complete authentication. message: Optional human-readable context about the auth request. """ connector_name: str connector_id: str - status: Literal[ - "waiting_for_auth", - "connected", - "access_denied", - "timed_out", - "error", - ] + credentials_name: Optional[str] = None auth_url: Optional[str] = None message: Optional[str] = None -class ConnectorBindings(BaseModel): - """Connector bindings passed within extensions.""" - - bindings: List[ConnectorSlot] - - -class ConnectorExtensions(BaseModel): - """The ``mistralai`` key within workflow extensions.""" - - connectors: ConnectorBindings - - -class WorkflowExtensions(BaseModel): - """Top-level extensions dict forwarded to ``execute_workflow_async``.""" - - mistralai: ConnectorExtensions - - async def execute_with_connector_auth_async( client: Mistral, workflow_identifier: str, @@ -131,7 +107,6 @@ async def execute_with_connector_auth_async( task_queue: Optional[str] = None, deployment_name: Optional[str] = None, connectors: Sequence[ConnectorSlot] = (), - extensions: Optional[Dict[str, Any]] = None, polling_interval: float = 2, max_polling_attempts: Optional[int] = None, ) -> WorkflowExecutionResponse: @@ -151,11 +126,7 @@ async def execute_with_connector_auth_async( task_queue: Optional task queue name (deprecated upstream). deployment_name: Optional deployment target. connectors: Typed connector slots that declare which connectors - the workflow needs. Converted to the ``extensions`` dict - automatically. - extensions: Additional plugin-specific data to propagate into - ``WorkflowContext.extensions`` at execution time. Merged - with connector bindings from *connectors*. + the workflow needs. polling_interval: Seconds between status polls after the event stream ends. max_polling_attempts: Maximum number of polling iterations before @@ -170,7 +141,7 @@ async def execute_with_connector_auth_async( """ input_dict = _serialize_input(input_data) - merged_extensions = _build_extensions(connectors, extensions) + extensions = _build_connector_extensions(connectors) execute_kwargs: Dict[str, Any] = dict( workflow_identifier=workflow_identifier, @@ -179,8 +150,8 @@ async def execute_with_connector_auth_async( task_queue=task_queue, deployment_name=deployment_name, ) - if merged_extensions is not None: - execute_kwargs["extensions"] = merged_extensions + if extensions is not None: + execute_kwargs["extensions"] = extensions execution = await client.workflows.execute_workflow_async(**execute_kwargs) exec_id = execution.execution_id @@ -205,30 +176,19 @@ def _serialize_input(input_data: Any) -> Optional[Dict[str, Any]]: return input_data -def _build_extensions( +def _build_connector_extensions( connectors: Sequence[ConnectorSlot], - extra: Optional[Dict[str, Any]], ) -> Optional[Dict[str, Any]]: - """Merge connector slot bindings with any additional extensions.""" - if not connectors and not extra: - return None + """Build the extensions dict from connector slots. - result: Dict[str, Any] = dict(extra) if extra else {} - if connectors: - ext = WorkflowExtensions( - mistralai=ConnectorExtensions( - connectors=ConnectorBindings( - bindings=list(connectors), - ), - ), - ) - ext_dict = ext.model_dump(mode="json", exclude_none=True) - # Merge: connector bindings go under result["mistralai"]["connectors"] - mistralai = result.setdefault("mistralai", {}) - existing_bindings = mistralai.get("connectors", {}).get("bindings", []) - new_bindings = ext_dict["mistralai"]["connectors"]["bindings"] - mistralai["connectors"] = {"bindings": existing_bindings + new_bindings} - return result + Connectors are passed to the API via ``extensions.mistralai.connectors``. + """ + if not connectors: + return None + bindings = [ + slot.model_dump(mode="json", exclude_none=True) for slot in connectors + ] + return {"mistralai": {"connectors": {"bindings": bindings}}} async def _stream_and_handle_auth( @@ -265,7 +225,7 @@ async def _stream_and_handle_auth( if not isinstance(event, CustomTaskStartedResponse): continue - if event.attributes.custom_task_type != "connector-auth": + if event.attributes.custom_task_type != "connector_auth": continue payload_value = ( @@ -273,10 +233,7 @@ async def _stream_and_handle_auth( if event.attributes.payload is not None else None ) - if ( - not isinstance(payload_value, dict) - or payload_value.get("status") != "waiting_for_auth" - ): + if not isinstance(payload_value, dict): continue state = ConnectorAuthTaskState.model_validate(payload_value) diff --git a/src/mistralai/extra/workflows/connector_helpers.py b/src/mistralai/extra/workflows/connector_helpers.py index fc052a92..1d56f49a 100644 --- a/src/mistralai/extra/workflows/connector_helpers.py +++ b/src/mistralai/extra/workflows/connector_helpers.py @@ -3,8 +3,6 @@ Provides: - :func:`on_auth_required` — default callback that opens the browser for OAuth. -- :func:`build_connector_extensions` — builds the ``extensions`` dict from - :class:`ConnectorSlot` instances. - :func:`run_workflow_with_connectors` — high-level convenience that wires up a :class:`Mistral` client, calls :func:`execute_with_connector_auth_async`, and logs the result. @@ -14,15 +12,12 @@ import logging import webbrowser -from typing import Any, Dict, Optional, Sequence +from typing import Optional, Sequence import pydantic from .connector_auth import ( ConnectorAuthTaskState, - ConnectorBindings, - ConnectorExtensions, - WorkflowExtensions, execute_with_connector_auth_async, ) from .connector_slot import ConnectorSlot @@ -47,25 +42,6 @@ async def on_auth_required(state: ConnectorAuthTaskState) -> None: input("Press Enter after completing the OAuth flow...") -def build_connector_extensions( - connectors: Sequence[ConnectorSlot], -) -> Dict[str, Any]: - """Build an extensions dict from :class:`ConnectorSlot` instances. - - The resulting dict is forwarded via ``execute_workflow_async(extensions=…)`` - and lands in ``WorkflowContext.extensions`` where the connector plugin - reads it. - """ - ext = WorkflowExtensions( - mistralai=ConnectorExtensions( - connectors=ConnectorBindings( - bindings=list(connectors), - ), - ), - ) - return ext.model_dump(mode="json", exclude_none=True) - - async def run_workflow_with_connectors( workflow_name: str, input_data: pydantic.BaseModel, @@ -78,8 +54,6 @@ async def run_workflow_with_connectors( """Execute a workflow, handling connector auth automatically.""" from mistralai.client.sdk import Mistral - extensions = build_connector_extensions(connectors) if connectors else None - async with Mistral(api_key=api_key, server_url=server_url) as client: result = await execute_with_connector_auth_async( client, @@ -87,7 +61,7 @@ async def run_workflow_with_connectors( input_data=input_data, on_auth_required=on_auth_required, task_queue=task_queue, - extensions=extensions, + connectors=connectors, ) logger.info( "Workflow completed (status=%s, result=%s)", From 2f281004c84d5b4167da3a56a65d90055c04068a Mon Sep 17 00:00:00 2001 From: Peter Evers Date: Wed, 22 Apr 2026 15:44:30 +0200 Subject: [PATCH 3/8] wip --- examples/mistral/agents/async_agents_no_streaming.py | 1 + src/mistralai/extra/workflows/connector_auth.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/mistral/agents/async_agents_no_streaming.py b/examples/mistral/agents/async_agents_no_streaming.py index bb2d767a..12250f55 100755 --- a/examples/mistral/agents/async_agents_no_streaming.py +++ b/examples/mistral/agents/async_agents_no_streaming.py @@ -24,6 +24,7 @@ async def main(): chat_response = await client.agents.complete_async( agent_id=agent.id, messages=[UserMessage(content="What is the best French cheese?")], + timeout_ms=120000, ) print(chat_response.choices[0].message.content) diff --git a/src/mistralai/extra/workflows/connector_auth.py b/src/mistralai/extra/workflows/connector_auth.py index 85820f27..292b6f1a 100644 --- a/src/mistralai/extra/workflows/connector_auth.py +++ b/src/mistralai/extra/workflows/connector_auth.py @@ -45,7 +45,6 @@ async def prompt_user(state: ConnectorAuthTaskState) -> None: Awaitable, Callable, Dict, - List, Optional, Sequence, ) From f2785f3036ac31ac2834de1b5b39d46c4eda649f Mon Sep 17 00:00:00 2001 From: Peter Evers Date: Thu, 23 Apr 2026 11:17:34 +0200 Subject: [PATCH 4/8] wip --- src/mistralai/extra/workflows/connector_auth.py | 2 +- src/mistralai/extra/workflows/connector_helpers.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/mistralai/extra/workflows/connector_auth.py b/src/mistralai/extra/workflows/connector_auth.py index 292b6f1a..8e45057e 100644 --- a/src/mistralai/extra/workflows/connector_auth.py +++ b/src/mistralai/extra/workflows/connector_auth.py @@ -144,7 +144,7 @@ async def execute_with_connector_auth_async( execute_kwargs: Dict[str, Any] = dict( workflow_identifier=workflow_identifier, - input_=input_dict, + input=input_dict, execution_id=execution_id, task_queue=task_queue, deployment_name=deployment_name, diff --git a/src/mistralai/extra/workflows/connector_helpers.py b/src/mistralai/extra/workflows/connector_helpers.py index 1d56f49a..904553b0 100644 --- a/src/mistralai/extra/workflows/connector_helpers.py +++ b/src/mistralai/extra/workflows/connector_helpers.py @@ -26,7 +26,7 @@ async def on_auth_required(state: ConnectorAuthTaskState) -> None: - """Default callback: opens the OAuth URL in the browser and waits.""" + """Default callback: opens the OAuth URL in the browser.""" if state.auth_url: logger.info( "Auth required — opening browser (connector=%s, auth_url=%s)", @@ -39,7 +39,6 @@ async def on_auth_required(state: ConnectorAuthTaskState) -> None: "Auth required — authenticate the connector manually (connector=%s)", state.connector_name, ) - input("Press Enter after completing the OAuth flow...") async def run_workflow_with_connectors( From c09b0008c2f6979fd162e300e364234eaf14c756 Mon Sep 17 00:00:00 2001 From: Peter Evers Date: Thu, 23 Apr 2026 11:18:00 +0200 Subject: [PATCH 5/8] wip --- examples/mistral/agents/async_agents_no_streaming.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/mistral/agents/async_agents_no_streaming.py b/examples/mistral/agents/async_agents_no_streaming.py index 12250f55..bb2d767a 100755 --- a/examples/mistral/agents/async_agents_no_streaming.py +++ b/examples/mistral/agents/async_agents_no_streaming.py @@ -24,7 +24,6 @@ async def main(): chat_response = await client.agents.complete_async( agent_id=agent.id, messages=[UserMessage(content="What is the best French cheese?")], - timeout_ms=120000, ) print(chat_response.choices[0].message.content) From 316a0a822a25d59ef2dce755fa3908b54c1be222 Mon Sep 17 00:00:00 2001 From: Peter Evers Date: Thu, 23 Apr 2026 11:29:59 +0200 Subject: [PATCH 6/8] wip --- src/mistralai/extra/workflows/__init__.py | 10 ++++- .../extra/workflows/connector_auth.py | 36 ++++------------ .../extra/workflows/connector_slot.py | 42 ++++++++++++++++++- 3 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/mistralai/extra/workflows/__init__.py b/src/mistralai/extra/workflows/__init__.py index c178e425..0315baf9 100644 --- a/src/mistralai/extra/workflows/__init__.py +++ b/src/mistralai/extra/workflows/__init__.py @@ -6,7 +6,12 @@ on_auth_required, run_workflow_with_connectors, ) -from .connector_slot import ConnectorSlot +from .connector_slot import ( + ConnectorBindings, + ConnectorExtension, + ConnectorSlot, + WorkflowExtensions, +) from .encoding import ( WorkflowEncodingConfig, PayloadOffloadingConfig, @@ -21,7 +26,10 @@ __all__ = [ "ConnectorAuthTaskState", + "ConnectorBindings", + "ConnectorExtension", "ConnectorSlot", + "WorkflowExtensions", "execute_with_connector_auth_async", "on_auth_required", "run_workflow_with_connectors", diff --git a/src/mistralai/extra/workflows/connector_auth.py b/src/mistralai/extra/workflows/connector_auth.py index 8e45057e..9089d7f6 100644 --- a/src/mistralai/extra/workflows/connector_auth.py +++ b/src/mistralai/extra/workflows/connector_auth.py @@ -21,7 +21,6 @@ async def prompt_user(state: ConnectorAuthTaskState) -> None: print(f"Please authenticate: {state.auth_url}") - input("Press Enter when done...") gmail = ConnectorSlot(connector_name="gmail") @@ -60,7 +59,7 @@ async def prompt_user(state: ConnectorAuthTaskState) -> None: WorkflowExecutionResponse, ) -from .connector_slot import ConnectorSlot +from .connector_slot import ConnectorSlot, WorkflowExtensions if TYPE_CHECKING: from mistralai.client.sdk import Mistral @@ -138,13 +137,15 @@ async def execute_with_connector_auth_async( RuntimeError: If the workflow finishes with a non-COMPLETED status. TimeoutError: If *max_polling_attempts* is set and exceeded. """ - input_dict = _serialize_input(input_data) - - extensions = _build_connector_extensions(connectors) + extensions = ( + WorkflowExtensions.from_connectors(connectors).to_dict() + if connectors + else None + ) execute_kwargs: Dict[str, Any] = dict( workflow_identifier=workflow_identifier, - input=input_dict, + input=input_data, execution_id=execution_id, task_queue=task_queue, deployment_name=deployment_name, @@ -167,29 +168,6 @@ async def execute_with_connector_auth_async( # --------------------------------------------------------------------------- -def _serialize_input(input_data: Any) -> Optional[Dict[str, Any]]: - if input_data is None: - return None - if hasattr(input_data, "model_dump"): - return input_data.model_dump(mode="json") - return input_data - - -def _build_connector_extensions( - connectors: Sequence[ConnectorSlot], -) -> Optional[Dict[str, Any]]: - """Build the extensions dict from connector slots. - - Connectors are passed to the API via ``extensions.mistralai.connectors``. - """ - if not connectors: - return None - bindings = [ - slot.model_dump(mode="json", exclude_none=True) for slot in connectors - ] - return {"mistralai": {"connectors": {"bindings": bindings}}} - - async def _stream_and_handle_auth( client: Mistral, exec_id: str, diff --git a/src/mistralai/extra/workflows/connector_slot.py b/src/mistralai/extra/workflows/connector_slot.py index 18290dc0..175bfc83 100644 --- a/src/mistralai/extra/workflows/connector_slot.py +++ b/src/mistralai/extra/workflows/connector_slot.py @@ -1,8 +1,8 @@ -"""Typed descriptor for a connector dependency.""" +"""Typed descriptors for connector dependencies and extensions.""" from __future__ import annotations -from typing import Optional +from typing import Any, Dict, List, Optional, Sequence from pydantic import BaseModel @@ -32,3 +32,41 @@ class ConnectorSlot(BaseModel): connector_name: str credentials_name: Optional[str] = None + + +class ConnectorBindings(BaseModel): + """Container for a list of connector bindings.""" + + bindings: List[ConnectorSlot] + + +class ConnectorExtension(BaseModel): + """Mistral-specific extension carrying connector configuration.""" + + connectors: ConnectorBindings + + +class WorkflowExtensions(BaseModel): + """Top-level extensions dict passed to the workflow execution API. + + Serialises to the shape expected by the API:: + + {"mistralai": {"connectors": {"bindings": [...]}}} + """ + + mistralai: ConnectorExtension + + @classmethod + def from_connectors( + cls, connectors: Sequence[ConnectorSlot] + ) -> WorkflowExtensions: + """Build extensions from a sequence of connector slots.""" + return cls( + mistralai=ConnectorExtension( + connectors=ConnectorBindings(bindings=list(connectors)) + ) + ) + + def to_dict(self) -> Dict[str, Any]: + """Serialise to the ``Dict[str, Any]`` the API expects.""" + return self.model_dump(mode="json", exclude_none=True) From 8e61de944d43d3566364d1fbef01c1c770de0e3d Mon Sep 17 00:00:00 2001 From: Peter Evers Date: Thu, 23 Apr 2026 12:05:07 +0200 Subject: [PATCH 7/8] fix: remove opiniated client interaction --- src/mistralai/extra/workflows/__init__.py | 6 -- .../extra/workflows/connector_auth.py | 12 ++-- .../extra/workflows/connector_helpers.py | 69 ------------------- .../extra/workflows/connector_slot.py | 14 +--- 4 files changed, 9 insertions(+), 92 deletions(-) delete mode 100644 src/mistralai/extra/workflows/connector_helpers.py diff --git a/src/mistralai/extra/workflows/__init__.py b/src/mistralai/extra/workflows/__init__.py index 0315baf9..f26edf4e 100644 --- a/src/mistralai/extra/workflows/__init__.py +++ b/src/mistralai/extra/workflows/__init__.py @@ -2,10 +2,6 @@ ConnectorAuthTaskState, execute_with_connector_auth_async, ) -from .connector_helpers import ( - on_auth_required, - run_workflow_with_connectors, -) from .connector_slot import ( ConnectorBindings, ConnectorExtension, @@ -31,8 +27,6 @@ "ConnectorSlot", "WorkflowExtensions", "execute_with_connector_auth_async", - "on_auth_required", - "run_workflow_with_connectors", "WorkflowEncodingConfig", "PayloadOffloadingConfig", "PayloadEncryptionConfig", diff --git a/src/mistralai/extra/workflows/connector_auth.py b/src/mistralai/extra/workflows/connector_auth.py index 9089d7f6..94fd40ad 100644 --- a/src/mistralai/extra/workflows/connector_auth.py +++ b/src/mistralai/extra/workflows/connector_auth.py @@ -138,9 +138,7 @@ async def execute_with_connector_auth_async( TimeoutError: If *max_polling_attempts* is set and exceeded. """ extensions = ( - WorkflowExtensions.from_connectors(connectors).to_dict() - if connectors - else None + WorkflowExtensions.from_connectors(connectors).to_dict() if connectors else None ) execute_kwargs: Dict[str, Any] = dict( @@ -223,8 +221,6 @@ async def _stream_and_handle_auth( else: # Stream exhausted without a terminal event — retry. continue - # Terminal event received. - return except (ConnectionError, httpx.RemoteProtocolError): logger.debug( "Event stream connection lost, reconnecting " @@ -233,6 +229,12 @@ async def _stream_and_handle_auth( attempt, ) await asyncio.sleep(min(2**attempt, 30)) + else: + logger.warning( + "Exhausted %d reconnect attempts for event stream (execution_id=%s)", + _MAX_RECONNECT_ATTEMPTS, + exec_id, + ) async def _poll_until_done( diff --git a/src/mistralai/extra/workflows/connector_helpers.py b/src/mistralai/extra/workflows/connector_helpers.py deleted file mode 100644 index 904553b0..00000000 --- a/src/mistralai/extra/workflows/connector_helpers.py +++ /dev/null @@ -1,69 +0,0 @@ -"""Reusable helpers for running connector-backed workflows. - -Provides: - -- :func:`on_auth_required` — default callback that opens the browser for OAuth. -- :func:`run_workflow_with_connectors` — high-level convenience that wires up a - :class:`Mistral` client, calls - :func:`execute_with_connector_auth_async`, and logs the result. -""" - -from __future__ import annotations - -import logging -import webbrowser -from typing import Optional, Sequence - -import pydantic - -from .connector_auth import ( - ConnectorAuthTaskState, - execute_with_connector_auth_async, -) -from .connector_slot import ConnectorSlot - -logger = logging.getLogger(__name__) - - -async def on_auth_required(state: ConnectorAuthTaskState) -> None: - """Default callback: opens the OAuth URL in the browser.""" - if state.auth_url: - logger.info( - "Auth required — opening browser (connector=%s, auth_url=%s)", - state.connector_name, - state.auth_url, - ) - webbrowser.open(state.auth_url) - else: - logger.info( - "Auth required — authenticate the connector manually (connector=%s)", - state.connector_name, - ) - - -async def run_workflow_with_connectors( - workflow_name: str, - input_data: pydantic.BaseModel, - *, - api_key: str, - server_url: str, - task_queue: Optional[str] = None, - connectors: Sequence[ConnectorSlot] = (), -) -> None: - """Execute a workflow, handling connector auth automatically.""" - from mistralai.client.sdk import Mistral - - async with Mistral(api_key=api_key, server_url=server_url) as client: - result = await execute_with_connector_auth_async( - client, - workflow_identifier=workflow_name, - input_data=input_data, - on_auth_required=on_auth_required, - task_queue=task_queue, - connectors=connectors, - ) - logger.info( - "Workflow completed (status=%s, result=%s)", - result.status, - result.result, - ) diff --git a/src/mistralai/extra/workflows/connector_slot.py b/src/mistralai/extra/workflows/connector_slot.py index 175bfc83..7287cf32 100644 --- a/src/mistralai/extra/workflows/connector_slot.py +++ b/src/mistralai/extra/workflows/connector_slot.py @@ -16,18 +16,10 @@ class ConnectorSlot(BaseModel): Example:: - from mistralai.extra.workflows import ConnectorSlot, run_workflow_with_connectors + from mistralai.extra.workflows import ConnectorSlot gmail = ConnectorSlot(connector_name="gmail") notion = ConnectorSlot(connector_name="notion", credentials_name="work-account") - - await run_workflow_with_connectors( - "my-workflow", - input_data=payload, - connectors=[gmail, notion], - api_key="...", - server_url="...", - ) """ connector_name: str @@ -57,9 +49,7 @@ class WorkflowExtensions(BaseModel): mistralai: ConnectorExtension @classmethod - def from_connectors( - cls, connectors: Sequence[ConnectorSlot] - ) -> WorkflowExtensions: + def from_connectors(cls, connectors: Sequence[ConnectorSlot]) -> WorkflowExtensions: """Build extensions from a sequence of connector slots.""" return cls( mistralai=ConnectorExtension( From ce8d90888cb2c5f72e10df4adbf817badc1f30f2 Mon Sep 17 00:00:00 2001 From: Peter Evers Date: Thu, 23 Apr 2026 13:18:34 +0200 Subject: [PATCH 8/8] wip --- src/mistralai/extra/__init__.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/mistralai/extra/__init__.py b/src/mistralai/extra/__init__.py index 4ee07b4a..cabda728 100644 --- a/src/mistralai/extra/__init__.py +++ b/src/mistralai/extra/__init__.py @@ -6,11 +6,6 @@ ) from .utils import response_format_from_pydantic_model from .utils.response_format import CustomPydanticModel -from .workflows import ( - ConnectorAuthTaskState, - ConnectorSlot, - execute_with_connector_auth_async, -) if TYPE_CHECKING: from .realtime import ( @@ -49,10 +44,7 @@ def __getattr__(name: str): __all__ = [ - "ConnectorAuthTaskState", - "ConnectorSlot", "convert_to_parsed_chat_completion_response", - "execute_with_connector_auth_async", "response_format_from_pydantic_model", "CustomPydanticModel", "ParsedChatCompletionResponse",