Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions news/6626.performance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Run anonymous telemetry collection and delivery on a dedicated single-worker background thread instead of inline on the asyncio event loop. The blocking syscalls, subprocess calls and synchronous HTTP request used to gather and post an event no longer stall the event loop — notably when reporting backend errors at a high rate. Delivery is best-effort and any failure is suppressed, so telemetry can never affect the running app.
141 changes: 118 additions & 23 deletions reflex/utils/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
"""Anonymous telemetry for Reflex."""

import asyncio
import dataclasses
import importlib.metadata
import json
import multiprocessing
import os
import platform
import sys
import threading
import uuid
import warnings
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from datetime import datetime, timezone
from pathlib import Path
Expand Down Expand Up @@ -463,7 +464,86 @@ def _send(
return False


background_tasks = set()
_executor_lock = threading.Lock()
_executor: ThreadPoolExecutor | None = None


def _get_telemetry_executor() -> ThreadPoolExecutor:
"""Return the process-wide, lazily-created telemetry executor.

A single-worker thread pool runs all telemetry collection and delivery off
the caller's thread — in particular the asyncio event loop, which the
blocking syscalls, subprocess calls and synchronous HTTP request used to
gather and post an event would otherwise stall. The pool's queue serializes
events through the one worker, and the interpreter drains it at exit via
``concurrent.futures``' atexit handler.

Returns:
The shared single-worker telemetry executor.
"""
global _executor
if _executor is None:
with _executor_lock:
if _executor is None:
_executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="reflex-telemetry"
)
return _executor


def _run_suppressed(fn: Callable[..., Any], /, *args, **kwargs) -> None:
"""Run ``fn`` in the worker thread, never letting a failure escape.

Telemetry must never break the app, so any error (including a failed send)
is reported at debug level and otherwise discarded.

Args:
fn: The callable to run.
args: Positional arguments forwarded to ``fn``.
kwargs: Keyword arguments forwarded to ``fn``.
"""
try:
fn(*args, **kwargs)
except Exception as err:
console.debug(f"Failed to process telemetry event: {err}")


def _submit(fn: Callable[..., Any], /, *args, **kwargs) -> None:
"""Queue telemetry work on the background executor, swallowing all errors.

Args:
fn: The callable to run in the telemetry worker thread.
args: Positional arguments forwarded to ``fn``.
kwargs: Keyword arguments forwarded to ``fn``.
"""
with suppress(Exception):
_get_telemetry_executor().submit(_run_suppressed, fn, *args, **kwargs)


def _flush(timeout: float | None = None) -> bool:
"""Block until telemetry queued before this call has been processed.

The executor has a single worker draining a FIFO queue, so waiting on a
sentinel submitted now guarantees every previously-queued event has been
handled. Intended for tests and best-effort shutdown; ordinary call sites
fire and forget.

Args:
timeout: Maximum number of seconds to wait, or ``None`` to wait
indefinitely.

Returns:
``True`` if the queue drained (or no worker was ever started),
``False`` if the wait timed out before the queue emptied.
"""
if _executor is None:
return True
try:
_executor.submit(lambda: None).result(timeout)
Comment on lines +539 to +542

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 _flush(timeout=...) silently swallows concurrent.futures.TimeoutError, so a caller that passes a finite timeout has no way to tell whether the flush actually completed or just expired. The suppress is appropriate for catastrophic errors, but letting TimeoutError propagate (or returning a bool) would let the test harness or shutdown code detect an incomplete drain.

Suggested change
if _executor is None:
return
with suppress(Exception):
_executor.submit(lambda: None).result(timeout)
if _executor is None:
return
with suppress(Exception):
future = _executor.submit(lambda: None)
try:
future.result(timeout)
except Exception:
pass

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the unobservable drain. Addressed in cc83909, though with a different shape than the suggestion: _flush now returns bool (True when the queue drained or no worker was ever started, False on timeout) and still never raises, which keeps it safe for best-effort shutdown paths.

I avoided the proposed snippet because it would still swallow TimeoutError via except Exception: pass, and future would be unbound (NameError) if submit() raised inside the preceding with suppress(...). Added tests for both the drained and timed-out branches.


Generated by Claude Code

except Exception:
return False
return True


_legacy_alias_attempted = False

Expand Down Expand Up @@ -527,35 +607,50 @@ def send(
):
"""Send anonymous telemetry for Reflex.

The event is collected and delivered on a dedicated single-worker thread
pool, so neither the data gathering (blocking syscalls and subprocess calls)
nor the synchronous HTTP request blocks the caller — in particular the
asyncio event loop serving a running app. Delivery is best-effort: any
failure is suppressed and never surfaces to the caller.

Args:
event: The event name.
telemetry_enabled: Whether to send the telemetry (If None, get from config).
properties: Arbitrary structured payload merged into the event
properties. Preferred over ``kwargs`` for new events.
kwargs: Additional data to send with the event.
"""
_maybe_alias_legacy_distinct_id(telemetry_enabled)
with suppress(Exception):
if telemetry_enabled is None:
telemetry_enabled = get_config().telemetry_enabled
# Only spin up the worker pool when there is actually something to send.
if telemetry_enabled:
_submit(
_process_event,
event,
telemetry_enabled,
properties=properties,
**kwargs,
)

async def async_send( # noqa: RUF029
event: str,
telemetry_enabled: bool | None,
properties: dict[str, Any] | None,
**kwargs,
):
return _send(event, telemetry_enabled, properties=properties, **kwargs)

try:
# Within an event loop context, send the event asynchronously.
task = asyncio.create_task(
async_send(event, telemetry_enabled, properties, **kwargs),
name=f"reflex_send_telemetry_event|{event}",
)
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
except RuntimeError:
# If there is no event loop, send the event synchronously.
warnings.filterwarnings("ignore", category=RuntimeWarning)
_send(event, telemetry_enabled, properties=properties, **kwargs)
def _process_event(
event: str,
telemetry_enabled: bool,
*,
properties: dict[str, Any] | None = None,
**kwargs,
) -> None:
"""Collect and deliver a single telemetry event from the worker thread.

Args:
event: The event name.
telemetry_enabled: Whether telemetry is enabled.
properties: Structured payload merged into the event properties.
kwargs: Additional allow-listed event data.
"""
_maybe_alias_legacy_distinct_id(telemetry_enabled)
_send(event, telemetry_enabled, properties=properties, **kwargs)


def send_error(error: Exception, context: str):
Expand Down
148 changes: 148 additions & 0 deletions tests/units/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import importlib.metadata
import threading
import uuid
from types import SimpleNamespace

Expand All @@ -9,6 +11,22 @@
from reflex.utils import telemetry


@pytest.fixture(autouse=True)
def _drain_telemetry_executor():
"""Drain any queued telemetry work so jobs never leak across tests.

Most tests mock ``telemetry.send`` and never touch the worker pool, but the
ones exercising the real ``send`` path enqueue work on the shared executor.
Flushing on teardown keeps a slow or failed job from running against the
next test's mocks.

Yields:
Control to the test, flushing the telemetry executor afterwards.
"""
yield
telemetry._flush()


@pytest.fixture
def event_defaults(mocker: MockerFixture) -> dict:
"""Patch ``get_event_defaults()`` with a fresh dict.
Expand Down Expand Up @@ -587,6 +605,8 @@ def test_maybe_alias_create_alias_payload(
)

telemetry._maybe_alias_legacy_distinct_id(telemetry_enabled=True)
# The $create_alias is now sent on the telemetry worker thread; wait for it.
telemetry._flush()

httpx_post.assert_called_once()
payload = httpx_post.call_args.kwargs["json"]
Expand All @@ -597,3 +617,131 @@ def test_maybe_alias_create_alias_payload(
assert props["alias"] == legacy_id
# distinct_id is the new UUID-string identity (from the event defaults).
assert props["distinct_id"] == event_defaults["properties"]["distinct_id"]


def test_telemetry_executor_is_lazy_and_single_worker():
"""The telemetry executor is created once and runs exactly one worker."""
executor = telemetry._get_telemetry_executor()
# Cached: repeated lookups return the same lazily-created pool.
assert executor is telemetry._get_telemetry_executor()
assert executor._max_workers == 1


def test_process_event_aliases_then_sends(mocker: MockerFixture):
"""``_process_event`` runs the alias migration before delivering the event."""
alias = mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id")
send = mocker.patch.object(telemetry, "_send")

telemetry._process_event("init", True, properties={"x": 1}, template="t")

alias.assert_called_once_with(True)
send.assert_called_once_with("init", True, properties={"x": 1}, template="t")


def test_send_disabled_does_not_submit(mocker: MockerFixture):
"""Disabled telemetry queues no work and never spins up the worker pool."""
submit = mocker.patch.object(telemetry, "_submit")

telemetry.send("run-dev", telemetry_enabled=False)

submit.assert_not_called()


def test_send_resolves_enabled_from_config(
mocker: MockerFixture, patch_telemetry_config
):
"""With ``telemetry_enabled`` unspecified, ``send`` consults the config."""
submit = mocker.patch.object(telemetry, "_submit")

patch_telemetry_config(enabled=False)
telemetry.send("run-dev")
submit.assert_not_called()

patch_telemetry_config(enabled=True)
telemetry.send("run-dev")
submit.assert_called_once()


def test_send_processes_event_off_caller_thread(mocker: MockerFixture):
"""``send`` collects and delivers the event on the worker, not the caller.

Regression for #6618: telemetry collection (blocking syscalls/subprocess)
and the synchronous HTTP post previously ran on the caller's thread — and,
inside an event loop, on the loop thread via a task that never awaited —
blocking the event loop. The work must now happen on a separate thread.
"""
mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id")
seen: dict[str, threading.Thread] = {}

def record(*_args, **_kwargs) -> bool:
seen["thread"] = threading.current_thread()
return True

mocker.patch.object(telemetry, "_send", side_effect=record)

telemetry.send("run-dev", telemetry_enabled=True)
telemetry._flush()

assert seen["thread"] is not threading.current_thread()
assert seen["thread"] is not threading.main_thread()


async def test_send_within_event_loop_runs_off_loop_thread(mocker: MockerFixture):
"""Inside a running event loop, ``send`` offloads work to the worker thread.

This is the core scenario from #6618: at runtime ``send`` is called from the
asyncio event loop (e.g. backend error telemetry), and the blocking work
must not execute on the loop thread.
"""
loop_thread = threading.current_thread()
mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id")
seen: dict[str, threading.Thread] = {}

def record(*_args, **_kwargs) -> bool:
seen["thread"] = threading.current_thread()
return True

mocker.patch.object(telemetry, "_send", side_effect=record)

telemetry.send("error", telemetry_enabled=True)
# Wait for the worker without blocking the event loop ourselves.
await asyncio.to_thread(telemetry._flush)

assert seen["thread"] is not loop_thread


def test_send_suppresses_worker_errors(mocker: MockerFixture):
"""A failed telemetry send is swallowed and never reaches the caller."""
mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id")
mocker.patch.object(telemetry, "_send", side_effect=RuntimeError("boom"))

# Neither queuing the event nor draining the failed job may raise.
telemetry.send("run-prod", telemetry_enabled=True)
telemetry._flush()


def test_flush_returns_true_when_queue_drains(mocker: MockerFixture):
"""``_flush`` reports success once the queued work has been processed."""
mocker.patch.object(telemetry, "_maybe_alias_legacy_distinct_id")
mocker.patch.object(telemetry, "_send", return_value=True)

telemetry.send("run-dev", telemetry_enabled=True)

assert telemetry._flush() is True


def test_flush_returns_false_when_worker_does_not_drain_in_time():
"""``_flush`` reports failure when the queue cannot drain within the timeout.

A blocking job occupies the single worker so the flush sentinel cannot run;
the bounded wait must surface the incomplete drain rather than swallow it.
"""
release = threading.Event()
# Bound the worker's wait so a failing assertion can never wedge the shared
# executor (and the autouse drain fixture) indefinitely.
blocker = telemetry._get_telemetry_executor().submit(release.wait, 10)
try:
assert telemetry._flush(timeout=0.05) is False
finally:
release.set()
blocker.result(timeout=5)
Loading