
batch SGP span upserts #331

Open

alvinkam2001 wants to merge 5 commits into `main` from `alvinkam/sgp-batched-upsert`

Conversation


@alvinkam2001 commented Apr 21, 2026

https://linear.app/scale-epd/issue/AGX1-198/actually-use-sgp-batching-for-spans

Greptile Summary

This PR implements true HTTP batching for SGP span upserts: the queue now groups spans by processor and calls on_spans_start/on_spans_end once per drain cycle per processor, replacing the previous N-calls-per-drain approach. SGPAsyncTracingProcessor overrides the new batched methods to issue a single upsert_batch HTTP request for all spans in the batch, and the interface provides a safe default fan-out fallback for processors that only implement the single-span methods.
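For illustration, here is a minimal sketch of what such a default fan-out fallback can look like. The class and method names below (`AsyncTracingProcessor`, `Span`, `on_span_start`) are stand-ins for this sketch, not the actual agentex interface:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class Span:
    """Stand-in span object; the real queue item type differs."""
    def __init__(self, span_id: str):
        self.span_id = span_id


class AsyncTracingProcessor:
    """Hypothetical base class showing the batched-method fallback."""

    async def on_span_start(self, span: Span) -> None:
        raise NotImplementedError

    async def on_spans_start(self, spans: list[Span]) -> None:
        # Default: fan out to the single-span method, so processors that
        # only implement on_span_start keep working unchanged.
        # return_exceptions=True keeps one failing span from cancelling
        # the rest of the batch; each failure is logged individually.
        results = await asyncio.gather(
            *(self.on_span_start(s) for s in spans),
            return_exceptions=True,
        )
        for span, result in zip(spans, results):
            if isinstance(result, Exception):
                logger.error("on_span_start failed for %s: %r", span.span_id, result)
```

A processor that only implements `on_span_start` can be handed a whole batch through `on_spans_start` and still process every span that does not fail.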

Confidence Score: 5/5

Safe to merge: one minor P2 style suggestion about `assert` vs. `raise`; no blocking issues.

The batching logic is correct and well-tested, the default fallback properly uses return_exceptions=True with per-failure logging, and the prior concern about return_exceptions has been addressed. The only remaining finding is a P2 suggestion to replace assert with an explicit ValueError in _process_items.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| `src/agentex/lib/core/tracing/span_queue.py` | Refactored `_process_items` to group spans per-processor and dispatch via batched `on_spans_start`/`on_spans_end`; adds an assertion (not a raise) to guard the same-event-type precondition. |
| `src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py` | Added `on_spans_start`/`on_spans_end` overrides that build all SGP spans first, then make a single `upsert_batch` HTTP call; single-span methods now delegate to the batch variants. |
| `src/agentex/lib/core/tracing/processors/tracing_processor_interface.py` | Added default `on_spans_start`/`on_spans_end` that fan out to per-span methods using `asyncio.gather` with `return_exceptions=True` and per-failure logging. |
| `tests/lib/core/tracing/processors/test_sgp_tracing_processor.py` | Added two batch tests verifying `on_spans_start` and `on_spans_end` each produce exactly one `upsert_batch` call for N spans. |
| `tests/lib/core/tracing/processors/test_tracing_processor_interface.py` | New test file validating the default fan-out behavior: continues past individual failures, does not propagate exceptions, and logs each error. |
| `tests/lib/core/tracing/test_span_queue.py` | Updated the mock processor helper to implement `on_spans_start`/`on_spans_end`; added `TestProcessItemsPreconditions` and `TestAsyncSpanQueueBatchedDispatch` suites. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant DL as AsyncSpanQueue._drain_loop
    participant PI as _process_items
    participant SGP as SGPAsyncTracingProcessor
    participant API as SGP HTTP API

    DL->>PI: _process_items(starts)
    PI->>PI: group spans by processor
    PI->>SGP: on_spans_start([span1, span2, ...spanN])
    SGP->>SGP: create all SGPSpans, store in _spans
    SGP->>API: upsert_batch(items=[...N items...])
    API-->>SGP: 200 OK

    DL->>PI: _process_items(ends)
    PI->>SGP: on_spans_end([span1, span2, ...spanN])
    SGP->>SGP: pop from _spans, update fields
    SGP->>API: upsert_batch(items=[...N items...])
    API-->>SGP: 200 OK
```
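The processor side of the diagram can be sketched in code. The client below is a stub that only records calls; `SGPAsyncTracingProcessor`'s real span construction and the real SGP SDK's `upsert_batch` signature differ from this illustration:

```python
import asyncio
from typing import Any


class StubSGPClient:
    """Stand-in for the SGP HTTP client; records each batch call."""
    def __init__(self):
        self.batches: list[list[dict]] = []

    async def upsert_batch(self, items: list[dict]) -> None:
        # In the real processor this would be one HTTP request for N spans.
        self.batches.append(items)


class BatchedSGPProcessorSketch:
    """Illustrative shape of the batched SGP processor, not the real class."""

    def __init__(self, client: StubSGPClient):
        self._client = client
        self._spans: dict[str, dict] = {}  # open spans, keyed by span id

    async def on_spans_start(self, spans: list[dict[str, Any]]) -> None:
        # Build every payload first, then make exactly one batch call.
        payloads = []
        for span in spans:
            payload = {"id": span["id"], "name": span["name"]}
            self._spans[span["id"]] = payload  # remembered for on_spans_end
            payloads.append(payload)
        await self._client.upsert_batch(items=payloads)

    async def on_span_start(self, span: dict[str, Any]) -> None:
        # The single-span method delegates to the batch variant.
        await self.on_spans_start([span])
```

Because the single-span method delegates to the batch variant, both code paths share the same payload construction and issue the same kind of batched request.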


Review comment on `src/agentex/lib/core/tracing/span_queue.py`, lines 108-111:
**`assert` stripped in optimized builds**

`assert` statements are silently removed when Python is run with the `-O` flag (`PYTHONOPTIMIZE`) or compiled to `.pyo` bytecode. If this guard is ever triggered in a production-optimized environment, mixed START/END items would be dispatched incorrectly with no error raised. Consider raising an explicit `ValueError` instead to make the precondition unconditional.

```suggestion
        if not all(i.event_type == event_type for i in items):
            raise ValueError(
                "_process_items requires all items to share the same event_type; "
                "callers must split START and END batches before dispatching."
            )
```


Reviews (3). Last reviewed commit: "lint fixes"

@declan-scale (Contributor) commented:

@greptile
Comment thread on `src/agentex/lib/core/tracing/processors/tracing_processor_interface.py` (outdated):

```diff
 @staticmethod
 async def _process_items(items: list[_SpanQueueItem]) -> None:
-    """Process a list of span events concurrently."""
+    """Dispatch a batch of same-event-type items to each processor in one call.
```

Contributor: Is it guaranteed that all these items are the same event-type?

Contributor: Nevermind, see we dispatch them ourselves

```diff
     if not items:
         return

     event_type = items[0].event_type
```

Contributor: Are all the event_types the same?

Author (alvinkam2001): I believe they should be, but added an assert to check.


2 participants