diff --git a/news/6626.performance.md b/news/6626.performance.md new file mode 100644 index 00000000000..978fd041b4d --- /dev/null +++ b/news/6626.performance.md @@ -0,0 +1 @@ +Run anonymous telemetry collection and delivery on a dedicated single-worker background thread instead of inline on the asyncio event loop. The blocking syscalls, subprocess calls and synchronous HTTP request used to gather and post an event no longer stall the event loop — notably when reporting backend errors at a high rate. Delivery is best-effort and any failure is suppressed, so telemetry can never affect the running app. diff --git a/reflex/utils/telemetry.py b/reflex/utils/telemetry.py index 6d4a0189a91..241412cbeb8 100644 --- a/reflex/utils/telemetry.py +++ b/reflex/utils/telemetry.py @@ -1,6 +1,5 @@ """Anonymous telemetry for Reflex.""" -import asyncio import dataclasses import importlib.metadata import json @@ -8,8 +7,10 @@ import os import platform import sys +import threading import uuid -import warnings +from collections.abc import Callable +from concurrent.futures import ThreadPoolExecutor from contextlib import suppress from datetime import datetime, timezone from pathlib import Path @@ -463,7 +464,86 @@ def _send( return False -background_tasks = set() +_executor_lock = threading.Lock() +_executor: ThreadPoolExecutor | None = None + + +def _get_telemetry_executor() -> ThreadPoolExecutor: + """Return the process-wide, lazily-created telemetry executor. + + A single-worker thread pool runs all telemetry collection and delivery off + the caller's thread — in particular the asyncio event loop, which the + blocking syscalls, subprocess calls and synchronous HTTP request used to + gather and post an event would otherwise stall. The pool's queue serializes + events through the one worker, and the interpreter drains it at exit via + ``concurrent.futures``' atexit handler. + + Returns: + The shared single-worker telemetry executor. + """ + global _executor + if _executor is None: + with _executor_lock: + if _executor is None: + _executor = ThreadPoolExecutor( + max_workers=1, thread_name_prefix="reflex-telemetry" + ) + return _executor + + +def _run_suppressed(fn: Callable[..., Any], /, *args, **kwargs) -> None: + """Run ``fn`` in the worker thread, never letting a failure escape. + + Telemetry must never break the app, so any error (including a failed send) + is reported at debug level and otherwise discarded. + + Args: + fn: The callable to run. + args: Positional arguments forwarded to ``fn``. + kwargs: Keyword arguments forwarded to ``fn``. + """ + try: + fn(*args, **kwargs) + except Exception as err: + console.debug(f"Failed to process telemetry event: {err}") + + +def _submit(fn: Callable[..., Any], /, *args, **kwargs) -> None: + """Queue telemetry work on the background executor, swallowing all errors. + + Args: + fn: The callable to run in the telemetry worker thread. + args: Positional arguments forwarded to ``fn``. + kwargs: Keyword arguments forwarded to ``fn``. + """ + with suppress(Exception): + _get_telemetry_executor().submit(_run_suppressed, fn, *args, **kwargs) + + +def _flush(timeout: float | None = None) -> bool: + """Block until telemetry queued before this call has been processed. + + The executor has a single worker draining a FIFO queue, so waiting on a + sentinel submitted now guarantees every previously-queued event has been + handled. Intended for tests and best-effort shutdown; ordinary call sites + fire and forget. + + Args: + timeout: Maximum number of seconds to wait, or ``None`` to wait + indefinitely. + + Returns: + ``True`` if the queue drained (or no worker was ever started), + ``False`` if the wait timed out before the queue emptied. + """ + if _executor is None: + return True + try: + _executor.submit(lambda: None).result(timeout) + except Exception: + return False + return True + _legacy_alias_attempted = False @@ -527,6 +607,12 @@ def send( ): """Send anonymous telemetry for Reflex. + The event is collected and delivered on a dedicated single-worker thread + pool, so neither the data gathering (blocking syscalls and subprocess calls) + nor the synchronous HTTP request blocks the caller — in particular the + asyncio event loop serving a running app. Delivery is best-effort: any + failure is suppressed and never surfaces to the caller. + Args: event: The event name. telemetry_enabled: Whether to send the telemetry (If None, get from config). @@ -534,28 +620,37 @@ def send( properties. Preferred over ``kwargs`` for new events. kwargs: Additional data to send with the event. """ - _maybe_alias_legacy_distinct_id(telemetry_enabled) + with suppress(Exception): + if telemetry_enabled is None: + telemetry_enabled = get_config().telemetry_enabled + # Only spin up the worker pool when there is actually something to send. + if telemetry_enabled: + _submit( + _process_event, + event, + telemetry_enabled, + properties=properties, + **kwargs, + ) - async def async_send( # noqa: RUF029 - event: str, - telemetry_enabled: bool | None, - properties: dict[str, Any] | None, - **kwargs, - ): - return _send(event, telemetry_enabled, properties=properties, **kwargs) - try: - # Within an event loop context, send the event asynchronously. - task = asyncio.create_task( - async_send(event, telemetry_enabled, properties, **kwargs), - name=f"reflex_send_telemetry_event|{event}", - ) - background_tasks.add(task) - task.add_done_callback(background_tasks.discard) - except RuntimeError: - # If there is no event loop, send the event synchronously. - warnings.filterwarnings("ignore", category=RuntimeWarning) - _send(event, telemetry_enabled, properties=properties, **kwargs) +def _process_event( + event: str, + telemetry_enabled: bool, + *, + properties: dict[str, Any] | None = None, + **kwargs, +) -> None: + """Collect and deliver a single telemetry event from the worker thread. + + Args: + event: The event name. + telemetry_enabled: Whether telemetry is enabled. + properties: Structured payload merged into the event properties. + kwargs: Additional allow-listed event data. + """ + _maybe_alias_legacy_distinct_id(telemetry_enabled) + _send(event, telemetry_enabled, properties=properties, **kwargs) def send_error(error: Exception, context: str): diff --git a/tests/units/test_telemetry.py b/tests/units/test_telemetry.py index 35780ccbf90..aa530e39ccf 100644 --- a/tests/units/test_telemetry.py +++ b/tests/units/test_telemetry.py @@ -1,4 +1,6 @@ +import asyncio import importlib.metadata +import threading import uuid from types import SimpleNamespace @@ -9,6 +11,22 @@ from reflex.utils import telemetry +@pytest.fixture(autouse=True) +def _drain_telemetry_executor(): + """Drain any queued telemetry work so jobs never leak across tests. + + Most tests mock ``telemetry.send`` and never touch the worker pool, but the + ones exercising the real ``send`` path enqueue work on the shared executor. + Flushing on teardown keeps a slow or failed job from running against the + next test's mocks. + + Yields: + Control to the test, flushing the telemetry executor afterwards. + """ + yield + telemetry._flush() + + @pytest.fixture def event_defaults(mocker: MockerFixture) -> dict: """Patch ``get_event_defaults()`` with a fresh dict. @@ -587,6 +605,8 @@ def test_maybe_alias_create_alias_payload( ) telemetry._maybe_alias_legacy_distinct_id(telemetry_enabled=True) + # The $create_alias is now sent on the telemetry worker thread; wait for it. + telemetry._flush() httpx_post.assert_called_once() payload = httpx_post.call_args.kwargs["json"] @@ -597,3 +617,131 @@ def test_maybe_alias_create_alias_payload( assert props["alias"] == legacy_id # distinct_id is the new UUID-string identity (from the event defaults). assert props["distinct_id"] == event_defaults["properties"]["distinct_id"] + + +def test_telemetry_executor_is_lazy_and_single_worker(): + """The telemetry executor is created once and runs exactly one worker.""" + executor = telemetry._get_telemetry_executor() + # Cached: repeated lookups return the same lazily-created pool. + assert executor is telemetry._get_telemetry_executor() + assert executor._max_workers == 1 + + +def test_process_event_aliases_then_sends(mocker: MockerFixture): + """``_process_event`` runs the alias migration before delivering the event.""" + alias = mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + send = mocker.patch.object(telemetry, "_send") + + telemetry._process_event("init", True, properties={"x": 1}, template="t") + + alias.assert_called_once_with(True) + send.assert_called_once_with("init", True, properties={"x": 1}, template="t") + + +def test_send_disabled_does_not_submit(mocker: MockerFixture): + """Disabled telemetry queues no work and never spins up the worker pool.""" + submit = mocker.patch.object(telemetry, "_submit") + + telemetry.send("run-dev", telemetry_enabled=False) + + submit.assert_not_called() + + +def test_send_resolves_enabled_from_config( + mocker: MockerFixture, patch_telemetry_config +): + """With ``telemetry_enabled`` unspecified, ``send`` consults the config.""" + submit = mocker.patch.object(telemetry, "_submit") + + patch_telemetry_config(enabled=False) + telemetry.send("run-dev") + submit.assert_not_called() + + patch_telemetry_config(enabled=True) + telemetry.send("run-dev") + submit.assert_called_once() + + +def test_send_processes_event_off_caller_thread(mocker: MockerFixture): + """``send`` collects and delivers the event on the worker, not the caller. + + Regression for #6618: telemetry collection (blocking syscalls/subprocess) + and the synchronous HTTP post previously ran on the caller's thread — and, + inside an event loop, on the loop thread via a task that never awaited — + blocking the event loop. The work must now happen on a separate thread. + """ + mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + seen: dict[str, threading.Thread] = {} + + def record(*_args, **_kwargs) -> bool: + seen["thread"] = threading.current_thread() + return True + + mocker.patch.object(telemetry, "_send", side_effect=record) + + telemetry.send("run-dev", telemetry_enabled=True) + telemetry._flush() + + assert seen["thread"] is not threading.current_thread() + assert seen["thread"] is not threading.main_thread() + + +async def test_send_within_event_loop_runs_off_loop_thread(mocker: MockerFixture): + """Inside a running event loop, ``send`` offloads work to the worker thread. + + This is the core scenario from #6618: at runtime ``send`` is called from the + asyncio event loop (e.g. backend error telemetry), and the blocking work + must not execute on the loop thread. + """ + loop_thread = threading.current_thread() + mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + seen: dict[str, threading.Thread] = {} + + def record(*_args, **_kwargs) -> bool: + seen["thread"] = threading.current_thread() + return True + + mocker.patch.object(telemetry, "_send", side_effect=record) + + telemetry.send("error", telemetry_enabled=True) + # Wait for the worker without blocking the event loop ourselves. + await asyncio.to_thread(telemetry._flush) + + assert seen["thread"] is not loop_thread + + +def test_send_suppresses_worker_errors(mocker: MockerFixture): + """A failed telemetry send is swallowed and never reaches the caller.""" + mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + mocker.patch.object(telemetry, "_send", side_effect=RuntimeError("boom")) + + # Neither queuing the event nor draining the failed job may raise. + telemetry.send("run-prod", telemetry_enabled=True) + telemetry._flush() + + +def test_flush_returns_true_when_queue_drains(mocker: MockerFixture): + """``_flush`` reports success once the queued work has been processed.""" + mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + mocker.patch.object(telemetry, "_send", return_value=True) + + telemetry.send("run-dev", telemetry_enabled=True) + + assert telemetry._flush() is True + + +def test_flush_returns_false_when_worker_does_not_drain_in_time(): + """``_flush`` reports failure when the queue cannot drain within the timeout. + + A blocking job occupies the single worker so the flush sentinel cannot run; + the bounded wait must surface the incomplete drain rather than swallow it. + """ + release = threading.Event() + # Bound the worker's wait so a failing assertion can never wedge the shared + # executor (and the autouse drain fixture) indefinitely. + blocker = telemetry._get_telemetry_executor().submit(release.wait, 10) + try: + assert telemetry._flush(timeout=0.05) is False + finally: + release.set() + blocker.result(timeout=5)