Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions submitqueue/extension/storage/batch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ type BatchStore interface {
// Version arithmetic is owned by the caller; the store performs a pure conditional write.
UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) error

// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states.
GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error)
// ListActive returns all active (non-terminal) batches in the given queue.
// "Active" means the batch's persisted state is not terminal — see
// entity.BatchState.IsTerminal. Callers that need a narrower set filter the
// result by state in memory.
//
// The store tracks active membership internally (added on Create, self-healed
// on read) so this is a key-prefix read rather than a secondary-index query;
// see the implementation and extension/storage/mysql/schema/README.md.
ListActive(ctx context.Context, queue string) ([]entity.Batch, error)
}
12 changes: 6 additions & 6 deletions submitqueue/extension/storage/mock/batch_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

111 changes: 78 additions & 33 deletions submitqueue/extension/storage/mysql/batch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/go-sql-driver/mysql"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -87,6 +86,19 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err
return fmt.Errorf("failed to marshal dependencies=%v id=%s for Create batch entity: %w", batch.Dependencies, batch.ID, err)
}

// Write membership before the batch row (no transaction) so a batch row is
// never visible to ListActive without its membership. ON DUPLICATE KEY UPDATE is
// an idempotent no-op on PK conflict (retry-safe) while still surfacing real
// errors, unlike INSERT IGNORE which would swallow them and let Create proceed
// without a valid membership. A create that fails after this point leaves a
// dangling row, which ListActive skips and a future reconcile job is expected
// to reclaim.
if _, err = s.db.ExecContext(ctx,
"INSERT INTO active_batch (queue, batch_id) VALUES (?, ?) ON DUPLICATE KEY UPDATE batch_id = batch_id",
batch.Queue, batch.ID,
); err != nil {
return fmt.Errorf("failed to insert active_batch membership for batch entity id=%s queue=%s: %w", batch.ID, batch.Queue, err)
}
Comment thread
Copilot marked this conversation as resolved.

_, err = s.db.ExecContext(ctx,
"INSERT INTO batch (id, queue, contains, dependencies, score, state, version) VALUES (?, ?, ?, ?, ?, ?, ?)",
batch.ID, batch.Queue, containsJSON, dependenciesJSON, batch.Score, batch.State, batch.Version,
Expand All @@ -96,6 +108,10 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
return fmt.Errorf("batch entity id=%s: %w", batch.ID, storage.ErrAlreadyExists)
}
// Leave the membership row in place: a returned error doesn't prove the
// batch row was not written (an ambiguous failure can commit it and still
// error), so deleting could permanently hide a live batch from ListActive.
// A dangling row is the safe direction.
return fmt.Errorf("failed to insert batch entity id=%s: %w", batch.ID, err)
}

Expand Down Expand Up @@ -174,52 +190,81 @@ func (s *batchStore) UpdateScoreAndState(ctx context.Context, id string, oldVers
return nil
}

// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states.
func (s *batchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) (ret []entity.Batch, retErr error) {
op := metrics.Begin(s.scope, "get_by_queue_and_states")
// ListActive returns all active (non-terminal) batches in the given queue.
//
// Membership is tracked in active_batch (queue leads the PK), so listing is a
// PK-prefix scan that ports cleanly to a key-value store. Each member is fetched
// by primary key: a terminal batch's membership is best-effort removed (race-free,
// its id is never reused), while a missing batch is skipped but NOT removed (it
// may belong to an in-flight Create that hasn't written its batch row yet).
func (s *batchStore) ListActive(ctx context.Context, queue string) (ret []entity.Batch, retErr error) {
op := metrics.Begin(s.scope, "list_active")
defer func() { op.Complete(retErr) }()

if len(states) == 0 {
return nil, nil
// Read all membership rows and release the connection before resolving each
// batch, since Get issues its own query.
ids, err := s.activeBatchIDs(ctx, queue)
if err != nil {
return nil, err
}

query := "SELECT id, queue, contains, dependencies, score, state, version FROM batch WHERE queue = ? AND state IN (?" + strings.Repeat(", ?", len(states)-1) + ")"

args := make([]any, 1+len(states))
args[0] = queue
for i, state := range states {
args[i+1] = state
var results []entity.Batch
for _, id := range ids {
batch, err := s.Get(ctx, id)
if err != nil {
if storage.IsNotFound(err) {
// Missing batch: either an in-flight Create or a dangling row. We
// can't tell them apart, so skip without deleting.
continue
}
return nil, fmt.Errorf("failed to get active batch id=%q queue=%q: %w", id, queue, err)
}
if batch.State.IsTerminal() {
// Stale membership: the batch has finished. Race-free to remove since
// its id is never reused.
s.removeActive(ctx, queue, id)
continue
}
results = append(results, batch)
}

rows, err := s.db.QueryContext(ctx, query, args...)
return results, nil
}

// activeBatchIDs reads the batch IDs recorded as active for the queue, owning the
// result set's lifecycle so the caller can resolve each batch after it's closed.
func (s *batchStore) activeBatchIDs(ctx context.Context, queue string) ([]string, error) {
rows, err := s.db.QueryContext(ctx,
"SELECT batch_id FROM active_batch WHERE queue = ?",
queue,
)
if err != nil {
return nil, fmt.Errorf("failed to query batches by queue=%q states=%v from the database: %w", queue, states, err)
return nil, fmt.Errorf("failed to query active batch membership for queue=%q: %w", queue, err)
}
defer rows.Close()

var results []entity.Batch
var ids []string
for rows.Next() {
var batch entity.Batch
var containsJSON []byte
var dependenciesJSON []byte

if err := rows.Scan(&batch.ID, &batch.Queue, &containsJSON, &dependenciesJSON, &batch.Score, &batch.State, &batch.Version); err != nil {
return nil, fmt.Errorf("failed to scan batch entity by queue=%q states=%v from the database: %w", queue, states, err)
}

if err := json.Unmarshal(containsJSON, &batch.Contains); err != nil {
return nil, fmt.Errorf("failed to unmarshal contains for batch entity id=%s from the database: %w", batch.ID, err)
}

if err := json.Unmarshal(dependenciesJSON, &batch.Dependencies); err != nil {
return nil, fmt.Errorf("failed to unmarshal dependencies for batch entity id=%s from the database: %w", batch.ID, err)
var id string
if err := rows.Scan(&id); err != nil {
return nil, fmt.Errorf("failed to scan active batch membership for queue=%q: %w", queue, err)
}

results = append(results, batch)
ids = append(ids, id)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate batches by queue=%q states=%v from the database: %w", queue, states, err)
return nil, fmt.Errorf("failed to iterate active batch membership for queue=%q: %w", queue, err)
}
return ids, nil
}

return results, nil
// removeActive drops one (queue, batchID) row from active_batch on a best-effort
// basis. ListActive calls it to shed the membership of a batch that has reached a
// terminal state. A failed delete only bumps the self_heal_errors counter: the
// leftover row does no harm and a later read attempts the delete again.
func (s *batchStore) removeActive(ctx context.Context, queue, batchID string) {
	_, err := s.db.ExecContext(ctx,
		"DELETE FROM active_batch WHERE queue = ? AND batch_id = ?",
		queue, batchID,
	)
	if err == nil {
		return
	}
	// Deliberately not propagated: index maintenance must never fail a read.
	metrics.NamedCounter(s.scope, "list_active", "self_heal_errors", 1)
}
20 changes: 12 additions & 8 deletions submitqueue/extension/storage/mysql/schema/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@

## batch table

### Secondary index: `idx_queue_state (queue, state)`
The `batch` table is reachable only by its primary key (`id`). It carries no secondary index — every access pattern is expressed as a primary-key get or as a key-prefix scan over a companion membership table (see `active_batch` below). This keeps the access patterns portable to a key-value / document store, where a server-maintained secondary index over a mutable, non-key column (such as `state`) is not a primitive every backend offers cheaply.

The `batch` table has a composite secondary index on `(queue, state)`. This index supports the `GetByQueueAndStates` query, which retrieves batches filtered by queue and one or more states. Without this index, the query would require a full table scan.
## active_batch table

#### Trade-offs
`active_batch` is the membership index that answers "which batches in this queue are still active?" — the only queue-scoped query the pipeline needs (the batch controller uses it to find conflict dependencies; the cancel controller uses it to find the batch holding a request). A row is intended to exist per non-terminal batch, so the table stays bounded by the live speculation window rather than full batch history. The correspondence is best-effort, not exact: readers treat membership as a hint and resolve each batch by primary key — see *Maintenance and self-healing* below.

- **Write overhead**: Every `INSERT` and `UPDATE` to the `batch` table must also update the secondary index, adding latency to write operations.
- **Storage cost**: The index consumes additional disk space proportional to the number of rows in the table.
- **Lock contention**: Under high write concurrency, index maintenance can increase lock contention on the affected index pages.
`queue` leads the composite primary key `(queue, batch_id)`, so listing a queue's active batches is a primary-key-prefix scan and the table is shardable by queue. On a key-value store the same shape maps directly onto a partition key (`queue`) and sort key (`batch_id`) with no secondary index.

#### Future: Prune job
### Maintenance and self-healing
Comment thread
albertywu marked this conversation as resolved.

As the `batch` table grows, the secondary index will grow with it, increasing storage costs and degrading write performance. To mitigate this, a prune job should be introduced to periodically delete batches in terminal states (`succeeded`, `failed`, `cancelled`) that are older than a configurable retention period. This keeps the table and its indexes bounded in size, ensuring consistent query and write performance over time.
`BatchStore.Create` writes the membership row before the batch row, so a batch row is never visible to `ListActive` without its membership. If the batch insert then fails, `Create` leaves the membership row in place: a returned error doesn't prove the row wasn't written (an ambiguous failure can commit it and still error), so deleting could permanently hide a live batch. A dangling row is the safe direction.

On read, `ListActive` resolves each member by primary key. A **terminal** batch's membership is best-effort removed (race-free — its id is never reused). A **missing** batch is skipped but not removed, since it may belong to an in-flight `Create` that hasn't written its batch row yet. Cleanup failures are counted in a metric and otherwise ignored, so reads never fail on index maintenance, and terminal-state writers (merge, speculate, dlq) never touch the index. Because the two writes in `Create` (membership row, then batch row) are independent (no transaction), the design tolerates partial failure via idempotent retries and read-time reconciliation.

### Future: prune / reconcile job

Read-time reconciliation only removes terminal memberships, so two kinds of stale row need a periodic sweep: dangling memberships whose batch never landed (a failed or crashed create), and memberships of batches that are stuck in a non-terminal state (e.g. an orphan stuck in `created` after a mid-process failure). A reconcile job should sweep both, keeping the table bounded independently of read traffic.

## change table

Expand Down
12 changes: 12 additions & 0 deletions submitqueue/extension/storage/mysql/schema/active_batch.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- active_batch is the membership index for "active (non-terminal) batches in a
-- queue", keeping the set bounded by the live speculation window rather than batch
-- history. queue leads the PK so listing is a PK-prefix scan (shardable by queue;
-- portable to a key-value store with queue = partition key, batch_id = sort key).
-- Membership is best-effort: it is added on Create (before the batch row) and
-- removed on read by ListActive once a batch is terminal. Rows left dangling by a
-- failed or crashed create are skipped by ListActive and are meant to be reclaimed
-- by a future reconcile job. See schema/README.md.
CREATE TABLE IF NOT EXISTS active_batch (
queue VARCHAR(255) NOT NULL,
batch_id VARCHAR(255) NOT NULL,
PRIMARY KEY (queue, batch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
5 changes: 2 additions & 3 deletions submitqueue/extension/storage/mysql/schema/batch.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ CREATE TABLE IF NOT EXISTS batch (
contains JSON NOT NULL,
dependencies JSON NOT NULL,
score DOUBLE NOT NULL,
state VARCHAR(255) NOT NUll,
state VARCHAR(255) NOT NULL,
version INT NOT NULL,
PRIMARY KEY (id),
INDEX idx_queue_state (queue, state)
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
20 changes: 12 additions & 8 deletions submitqueue/orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,22 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
Version: 1,
}

// Get active batches for this queue and ask the conflict analyzer which
// of them the new batch must serialize behind. The dependency set drives
// the speculation graph downstream.
activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{
entity.BatchStateCreated,
entity.BatchStateSpeculating,
entity.BatchStateMerging,
})
// Ask the conflict analyzer which active batches the new batch must serialize
// behind. ListActive returns all non-terminal batches; we narrow to
// Created/Speculating/Merging so the analyzer only sees batches that can still
// acquire new conflicts.
allActive, err := c.store.GetBatchStore().ListActive(ctx, request.Queue)
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1)
return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err)
}
activeBatches := make([]entity.Batch, 0, len(allActive))
for _, b := range allActive {
switch b.State {
case entity.BatchStateCreated, entity.BatchStateSpeculating, entity.BatchStateMerging:
activeBatches = append(activeBatches, b)
}
}

// Dedupe by batch ID since a single (analyzed, in-flight) pair may be
// reported with multiple Conflict entries when different conflict types
Expand Down
Loading
Loading