Move telemetry off event loop to dedicated worker thread#6626
Conversation
Telemetry collection uses blocking syscalls and subprocess calls and a synchronous httpx.post. The asyncio "send" path ran that work directly on the event loop thread (a task that never awaited), so it stalled the loop -- notably when emitting runtime-error telemetry at a high rate -- and the no-event-loop path blocked the caller synchronously. Run all telemetry collection and delivery on a lazily-initialized, single-worker ThreadPoolExecutor whose FIFO queue serializes events off the caller's thread. Delivery is best-effort: every failure is suppressed so telemetry can never break the app. The pool is created only when telemetry is enabled and there is something to send, and concurrent.futures drains its queue at interpreter exit so CLI events are not lost. Closes #6618 https://claude.ai/code/session_011DzmkWzgwGcJL9rdiBTLVN
Greptile SummaryThis PR moves telemetry collection and delivery off the asyncio event loop onto a dedicated single-worker
Confidence Score: 4/5Safe to merge; the core bug fix is correct and all blocking work is now off the event loop. The executor setup, double-checked locking, error-suppression chain, and the recursive-alias-send path via the single worker are all handled correctly. The one rough edge is that _flush(timeout=...) swallows TimeoutError, making it impossible for callers to confirm the drain completed — currently only tests use _flush without a timeout, so this has no runtime impact today. No files need special attention; both changed files are straightforward and the logic is sound. Important Files Changed
Reviews (1): Last reviewed commit: "fix(telemetry): process telemetry events..." | Re-trigger Greptile |
| if _executor is None: | ||
| return | ||
| with suppress(Exception): | ||
| _executor.submit(lambda: None).result(timeout) |
There was a problem hiding this comment.
_flush(timeout=...) silently swallows concurrent.futures.TimeoutError, so a caller that passes a finite timeout has no way to tell whether the flush actually completed or just expired. The suppress is appropriate for catastrophic errors, but letting TimeoutError propagate (or returning a bool) would let the test harness or shutdown code detect an incomplete drain.
| if _executor is None: | |
| return | |
| with suppress(Exception): | |
| _executor.submit(lambda: None).result(timeout) | |
| if _executor is None: | |
| return | |
| with suppress(Exception): | |
| future = _executor.submit(lambda: None) | |
| try: | |
| future.result(timeout) | |
| except Exception: | |
| pass |
There was a problem hiding this comment.
Good catch on the unobservable drain. Addressed in cc83909, though with a different shape than the suggestion: _flush now returns bool (True when the queue drained or no worker was ever started, False on timeout) and still never raises, which keeps it safe for best-effort shutdown paths.
I avoided the proposed snippet because it would still swallow TimeoutError via except Exception: pass, and future would be unbound (NameError) if submit() raised inside the preceding with suppress(...). Added tests for both the drained and timed-out branches.
Generated by Claude Code
Merging this PR will not alter performance
Comparing Footnotes
|
_flush() swallowed concurrent.futures.TimeoutError, so a caller passing a finite timeout could not distinguish a clean drain from an expiry. Return a bool instead: True when the queue drained (or no worker was started), False on timeout. _flush still never raises, preserving its best-effort contract for shutdown paths. Add tests for both branches. https://claude.ai/code/session_011DzmkWzgwGcJL9rdiBTLVN
FarhanAliRaza
left a comment
There was a problem hiding this comment.
We now accept the latency atexit time. because now it sends all the remaining events at the end before exiting at the default timeout of 5 seconds of httpx.
Type of change
Description
Fixes #6618: telemetry collection and delivery were blocking the asyncio event loop when
send()was called from within a running loop (e.g., backend error telemetry). The blocking syscalls, subprocess calls, and synchronous HTTP requests used to gather and post events would stall the event loop.Changes:
Replace asyncio task-based approach with a dedicated worker thread pool:
asyncio.create_task()and background task trackingThreadPoolExecutor(_get_telemetry_executor()) that lazily initializes on first useNew helper functions:
_get_telemetry_executor(): Returns the process-wide, lazily-created telemetry executor_submit(): Queues work on the executor, swallowing all errors so telemetry never breaks the app_run_suppressed(): Wraps executor work to catch and suppress exceptions at debug level_flush(): Blocks until queued telemetry has been processed (for tests and shutdown)_process_event(): Extracted fromsend()to run on the worker threadSimplified
send()API:_process_event()on the worker threadtelemetry_enabledfrom config if unspecifiedTest coverage:
_drain_telemetry_executorfixture to prevent test leakageTesting
Checklist
uv run ruff check .anduv run ruff format .cleanuv run pyright reflex testspasseshttps://claude.ai/code/session_011DzmkWzgwGcJL9rdiBTLVN