From f8897d1b6315dae76ceed4424ac24b82a51f7961 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 8 Jun 2026 17:36:20 +0000 Subject: [PATCH 1/3] fix(telemetry): process telemetry events on a background thread pool Telemetry collection uses blocking syscalls and subprocess calls and a synchronous httpx.post. The asyncio "send" path ran that work directly on the event loop thread (a task that never awaited), so it stalled the loop -- notably when emitting runtime-error telemetry at a high rate -- and the no-event-loop path blocked the caller synchronously. Run all telemetry collection and delivery on a lazily-initialized, single-worker ThreadPoolExecutor whose FIFO queue serializes events off the caller's thread. Delivery is best-effort: every failure is suppressed so telemetry can never break the app. The pool is created only when telemetry is enabled and there is something to send, and concurrent.futures drains its queue at interpreter exit so CLI events are not lost. Closes #6618 https://claude.ai/code/session_011DzmkWzgwGcJL9rdiBTLVN --- reflex/utils/telemetry.py | 134 ++++++++++++++++++++++++++++------ tests/units/test_telemetry.py | 121 ++++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+), 23 deletions(-) diff --git a/reflex/utils/telemetry.py b/reflex/utils/telemetry.py index 6d4a0189a91..c9670ad8e82 100644 --- a/reflex/utils/telemetry.py +++ b/reflex/utils/telemetry.py @@ -1,6 +1,5 @@ """Anonymous telemetry for Reflex.""" -import asyncio import dataclasses import importlib.metadata import json @@ -8,8 +7,10 @@ import os import platform import sys +import threading import uuid -import warnings +from collections.abc import Callable +from concurrent.futures import ThreadPoolExecutor from contextlib import suppress from datetime import datetime, timezone from pathlib import Path @@ -463,7 +464,79 @@ def _send( return False -background_tasks = set() +_executor_lock = threading.Lock() +_executor: ThreadPoolExecutor | None = None + + +def _get_telemetry_executor() -> ThreadPoolExecutor: + """Return the process-wide, lazily-created telemetry executor. + + A single-worker thread pool runs all telemetry collection and delivery off + the caller's thread — in particular the asyncio event loop, which the + blocking syscalls, subprocess calls and synchronous HTTP request used to + gather and post an event would otherwise stall. The pool's queue serializes + events through the one worker, and the interpreter drains it at exit via + ``concurrent.futures``' atexit handler. + + Returns: + The shared single-worker telemetry executor. + """ + global _executor + if _executor is None: + with _executor_lock: + if _executor is None: + _executor = ThreadPoolExecutor( + max_workers=1, thread_name_prefix="reflex-telemetry" + ) + return _executor + + +def _run_suppressed(fn: Callable[..., Any], /, *args, **kwargs) -> None: + """Run ``fn`` in the worker thread, never letting a failure escape. + + Telemetry must never break the app, so any error (including a failed send) + is reported at debug level and otherwise discarded. + + Args: + fn: The callable to run. + args: Positional arguments forwarded to ``fn``. + kwargs: Keyword arguments forwarded to ``fn``. + """ + try: + fn(*args, **kwargs) + except Exception as err: + console.debug(f"Failed to process telemetry event: {err}") + + +def _submit(fn: Callable[..., Any], /, *args, **kwargs) -> None: + """Queue telemetry work on the background executor, swallowing all errors. + + Args: + fn: The callable to run in the telemetry worker thread. + args: Positional arguments forwarded to ``fn``. + kwargs: Keyword arguments forwarded to ``fn``. + """ + with suppress(Exception): + _get_telemetry_executor().submit(_run_suppressed, fn, *args, **kwargs) + + +def _flush(timeout: float | None = None) -> None: + """Block until telemetry queued before this call has been processed. + + The executor has a single worker draining a FIFO queue, so waiting on a + sentinel submitted now guarantees every previously-queued event has been + handled. Intended for tests and best-effort shutdown; ordinary call sites + fire and forget. + + Args: + timeout: Maximum number of seconds to wait, or ``None`` to wait + indefinitely. + """ + if _executor is None: + return + with suppress(Exception): + _executor.submit(lambda: None).result(timeout) + _legacy_alias_attempted = False @@ -527,6 +600,12 @@ def send( ): """Send anonymous telemetry for Reflex. + The event is collected and delivered on a dedicated single-worker thread + pool, so neither the data gathering (blocking syscalls and subprocess calls) + nor the synchronous HTTP request blocks the caller — in particular the + asyncio event loop serving a running app. Delivery is best-effort: any + failure is suppressed and never surfaces to the caller. + Args: event: The event name. telemetry_enabled: Whether to send the telemetry (If None, get from config). @@ -534,28 +613,37 @@ def send( properties. Preferred over ``kwargs`` for new events. kwargs: Additional data to send with the event. """ - _maybe_alias_legacy_distinct_id(telemetry_enabled) + with suppress(Exception): + if telemetry_enabled is None: + telemetry_enabled = get_config().telemetry_enabled + # Only spin up the worker pool when there is actually something to send. + if telemetry_enabled: + _submit( + _process_event, + event, + telemetry_enabled, + properties=properties, + **kwargs, + ) - async def async_send( # noqa: RUF029 - event: str, - telemetry_enabled: bool | None, - properties: dict[str, Any] | None, - **kwargs, - ): - return _send(event, telemetry_enabled, properties=properties, **kwargs) - try: - # Within an event loop context, send the event asynchronously. - task = asyncio.create_task( - async_send(event, telemetry_enabled, properties, **kwargs), - name=f"reflex_send_telemetry_event|{event}", - ) - background_tasks.add(task) - task.add_done_callback(background_tasks.discard) - except RuntimeError: - # If there is no event loop, send the event synchronously. - warnings.filterwarnings("ignore", category=RuntimeWarning) - _send(event, telemetry_enabled, properties=properties, **kwargs) +def _process_event( + event: str, + telemetry_enabled: bool, + *, + properties: dict[str, Any] | None = None, + **kwargs, +) -> None: + """Collect and deliver a single telemetry event from the worker thread. + + Args: + event: The event name. + telemetry_enabled: Whether telemetry is enabled. + properties: Structured payload merged into the event properties. + kwargs: Additional allow-listed event data. + """ + _maybe_alias_legacy_distinct_id(telemetry_enabled) + _send(event, telemetry_enabled, properties=properties, **kwargs) def send_error(error: Exception, context: str): diff --git a/tests/units/test_telemetry.py b/tests/units/test_telemetry.py index 35780ccbf90..c6782352d99 100644 --- a/tests/units/test_telemetry.py +++ b/tests/units/test_telemetry.py @@ -1,4 +1,6 @@ +import asyncio import importlib.metadata +import threading import uuid from types import SimpleNamespace @@ -9,6 +11,22 @@ from reflex.utils import telemetry +@pytest.fixture(autouse=True) +def _drain_telemetry_executor(): + """Drain any queued telemetry work so jobs never leak across tests. + + Most tests mock ``telemetry.send`` and never touch the worker pool, but the + ones exercising the real ``send`` path enqueue work on the shared executor. + Flushing on teardown keeps a slow or failed job from running against the + next test's mocks. + + Yields: + Control to the test, flushing the telemetry executor afterwards. + """ + yield + telemetry._flush() + + @pytest.fixture def event_defaults(mocker: MockerFixture) -> dict: """Patch ``get_event_defaults()`` with a fresh dict. @@ -587,6 +605,8 @@ def test_maybe_alias_create_alias_payload( ) telemetry._maybe_alias_legacy_distinct_id(telemetry_enabled=True) + # The $create_alias is now sent on the telemetry worker thread; wait for it. + telemetry._flush() httpx_post.assert_called_once() payload = httpx_post.call_args.kwargs["json"] @@ -597,3 +617,104 @@ def test_maybe_alias_create_alias_payload( assert props["alias"] == legacy_id # distinct_id is the new UUID-string identity (from the event defaults). assert props["distinct_id"] == event_defaults["properties"]["distinct_id"] + + +def test_telemetry_executor_is_lazy_and_single_worker(): + """The telemetry executor is created once and runs exactly one worker.""" + executor = telemetry._get_telemetry_executor() + # Cached: repeated lookups return the same lazily-created pool. + assert executor is telemetry._get_telemetry_executor() + assert executor._max_workers == 1 + + +def test_process_event_aliases_then_sends(mocker: MockerFixture): + """``_process_event`` runs the alias migration before delivering the event.""" + alias = mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + send = mocker.patch.object(telemetry, "_send") + + telemetry._process_event("init", True, properties={"x": 1}, template="t") + + alias.assert_called_once_with(True) + send.assert_called_once_with("init", True, properties={"x": 1}, template="t") + + +def test_send_disabled_does_not_submit(mocker: MockerFixture): + """Disabled telemetry queues no work and never spins up the worker pool.""" + submit = mocker.patch.object(telemetry, "_submit") + + telemetry.send("run-dev", telemetry_enabled=False) + + submit.assert_not_called() + + +def test_send_resolves_enabled_from_config( + mocker: MockerFixture, patch_telemetry_config +): + """With ``telemetry_enabled`` unspecified, ``send`` consults the config.""" + submit = mocker.patch.object(telemetry, "_submit") + + patch_telemetry_config(enabled=False) + telemetry.send("run-dev") + submit.assert_not_called() + + patch_telemetry_config(enabled=True) + telemetry.send("run-dev") + submit.assert_called_once() + + +def test_send_processes_event_off_caller_thread(mocker: MockerFixture): + """``send`` collects and delivers the event on the worker, not the caller. + + Regression for #6618: telemetry collection (blocking syscalls/subprocess) + and the synchronous HTTP post previously ran on the caller's thread — and, + inside an event loop, on the loop thread via a task that never awaited — + blocking the event loop. The work must now happen on a separate thread. + """ + mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + seen: dict[str, threading.Thread] = {} + + def record(*_args, **_kwargs) -> bool: + seen["thread"] = threading.current_thread() + return True + + mocker.patch.object(telemetry, "_send", side_effect=record) + + telemetry.send("run-dev", telemetry_enabled=True) + telemetry._flush() + + assert seen["thread"] is not threading.current_thread() + assert seen["thread"] is not threading.main_thread() + + +async def test_send_within_event_loop_runs_off_loop_thread(mocker: MockerFixture): + """Inside a running event loop, ``send`` offloads work to the worker thread. + + This is the core scenario from #6618: at runtime ``send`` is called from the + asyncio event loop (e.g. backend error telemetry), and the blocking work + must not execute on the loop thread. + """ + loop_thread = threading.current_thread() + mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + seen: dict[str, threading.Thread] = {} + + def record(*_args, **_kwargs) -> bool: + seen["thread"] = threading.current_thread() + return True + + mocker.patch.object(telemetry, "_send", side_effect=record) + + telemetry.send("error", telemetry_enabled=True) + # Wait for the worker without blocking the event loop ourselves. + await asyncio.to_thread(telemetry._flush) + + assert seen["thread"] is not loop_thread + + +def test_send_suppresses_worker_errors(mocker: MockerFixture): + """A failed telemetry send is swallowed and never reaches the caller.""" + mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + mocker.patch.object(telemetry, "_send", side_effect=RuntimeError("boom")) + + # Neither queuing the event nor draining the failed job may raise. + telemetry.send("run-prod", telemetry_enabled=True) + telemetry._flush() From cc83909bac1778e091d96a5bbfeabf1eac5a2210 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 8 Jun 2026 19:50:45 +0000 Subject: [PATCH 2/3] telemetry: make _flush report whether the queue drained _flush() swallowed concurrent.futures.TimeoutError, so a caller passing a finite timeout could not distinguish a clean drain from an expiry. Return a bool instead: True when the queue drained (or no worker was started), False on timeout. _flush still never raises, preserving its best-effort contract for shutdown paths. Add tests for both branches. https://claude.ai/code/session_011DzmkWzgwGcJL9rdiBTLVN --- reflex/utils/telemetry.py | 13 ++++++++++--- tests/units/test_telemetry.py | 27 +++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/reflex/utils/telemetry.py b/reflex/utils/telemetry.py index c9670ad8e82..241412cbeb8 100644 --- a/reflex/utils/telemetry.py +++ b/reflex/utils/telemetry.py @@ -520,7 +520,7 @@ def _submit(fn: Callable[..., Any], /, *args, **kwargs) -> None: _get_telemetry_executor().submit(_run_suppressed, fn, *args, **kwargs) -def _flush(timeout: float | None = None) -> None: +def _flush(timeout: float | None = None) -> bool: """Block until telemetry queued before this call has been processed. The executor has a single worker draining a FIFO queue, so waiting on a @@ -531,11 +531,18 @@ def _flush(timeout: float | None = None) -> None: Args: timeout: Maximum number of seconds to wait, or ``None`` to wait indefinitely. + + Returns: + ``True`` if the queue drained (or no worker was ever started), + ``False`` if the wait timed out before the queue emptied. """ if _executor is None: - return - with suppress(Exception): + return True + try: _executor.submit(lambda: None).result(timeout) + except Exception: + return False + return True _legacy_alias_attempted = False diff --git a/tests/units/test_telemetry.py b/tests/units/test_telemetry.py index c6782352d99..aa530e39ccf 100644 --- a/tests/units/test_telemetry.py +++ b/tests/units/test_telemetry.py @@ -718,3 +718,30 @@ def test_send_suppresses_worker_errors(mocker: MockerFixture): # Neither queuing the event nor draining the failed job may raise. telemetry.send("run-prod", telemetry_enabled=True) telemetry._flush() + + +def test_flush_returns_true_when_queue_drains(mocker: MockerFixture): + """``_flush`` reports success once the queued work has been processed.""" + mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id") + mocker.patch.object(telemetry, "_send", return_value=True) + + telemetry.send("run-dev", telemetry_enabled=True) + + assert telemetry._flush() is True + + +def test_flush_returns_false_when_worker_does_not_drain_in_time(): + """``_flush`` reports failure when the queue cannot drain within the timeout. + + A blocking job occupies the single worker so the flush sentinel cannot run; + the bounded wait must surface the incomplete drain rather than swallow it. + """ + release = threading.Event() + # Bound the worker's wait so a failing assertion can never wedge the shared + # executor (and the autouse drain fixture) indefinitely. + blocker = telemetry._get_telemetry_executor().submit(release.wait, 10) + try: + assert telemetry._flush(timeout=0.05) is False + finally: + release.set() + blocker.result(timeout=5) From e88322ef90031102368eb57db3df746f8c27c211 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 8 Jun 2026 19:53:36 +0000 Subject: [PATCH 3/3] news: add changelog fragment for telemetry thread pool change https://claude.ai/code/session_011DzmkWzgwGcJL9rdiBTLVN --- news/6626.performance.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 news/6626.performance.md diff --git a/news/6626.performance.md b/news/6626.performance.md new file mode 100644 index 00000000000..978fd041b4d --- /dev/null +++ b/news/6626.performance.md @@ -0,0 +1 @@ +Run anonymous telemetry collection and delivery on a dedicated single-worker background thread instead of inline on the asyncio event loop. The blocking syscalls, subprocess calls and synchronous HTTP request used to gather and post an event no longer stall the event loop — notably when reporting backend errors at a high rate. Delivery is best-effort and any failure is suppressed, so telemetry can never affect the running app.