From 49119c417deb28c4cf27c76a07119ea925c2fd27 Mon Sep 17 00:00:00 2001 From: Albert Wu Date: Tue, 9 Jun 2026 20:52:55 -0700 Subject: [PATCH] refactor(storage): replace batch (queue,state) index with active_batch membership MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the secondary index over the batch table's mutable `state` column with an `active_batch` membership table that answers the only queue-scoped query the pipeline needs: "which batches in this queue are still active?" (the batch controller uses it to find conflict dependencies; the cancel controller uses it to find the batch holding a request). A row is intended to exist while its batch is non-terminal, so the table stays bounded by the live speculation window rather than growing with batch history. `queue` leads the PK so listing is a PK-prefix scan and the table is shardable by queue — an access pattern that ports cleanly to a key-value store (queue = partition key, batch_id = sort key), unlike a server-maintained secondary index over a mutable non-key column. Membership is best-effort, not an exact mirror of batch state, and is maintained without transactions: - Create writes the membership row before the batch row. This ordering is required for correctness: whenever a batch row is visible to a reader its membership row is already present, so a concurrent ListActive can never miss an active batch. INSERT IGNORE keeps the membership write idempotent across retries. - If the batch insert then fails, Create deliberately leaves the membership row in place. A returned error does not prove the row was not written (an ambiguous failure can commit the batch row and still return an error), so deleting would risk permanently orphaning a live, non-terminal batch from ListActive. A dangling membership is the safe direction. - ListActive resolves each member by primary key: a terminal batch's membership is best-effort removed (race-free — a terminal batch is fully committed and its id is never reused); a missing batch is skipped but NOT removed (it may belong to an in-flight Create that has written its membership but not yet its batch row). Cleanup failures are swallowed so a read never fails on index maintenance, and terminal-state writers (merge, speculate, dlq) need not touch the index. Genuinely dangling rows (failed/crashed creates) and batches stuck in a non-terminal state are left for a future reconcile/prune job, documented in schema/README.md. Integration tests cover the self-heal and membership invariants: - TestActiveBatch_SelfHealsTerminalMembership - TestActiveBatch_SkipsDanglingMembershipWithoutDeleting - TestActiveBatch_CreateKeepsMembershipOnDuplicate - TestActiveBatch_CreateKeepsMembershipOnFailedInsert Co-Authored-By: Claude Opus 4.8 (1M context) --- submitqueue/extension/storage/batch_store.go | 11 +- .../storage/mock/batch_store_mock.go | 12 +- .../extension/storage/mysql/batch_store.go | 111 ++++++++++++------ .../extension/storage/mysql/schema/README.md | 20 ++-- .../storage/mysql/schema/active_batch.sql | 12 ++ .../extension/storage/mysql/schema/batch.sql | 5 +- .../orchestrator/controller/batch/batch.go | 20 ++-- .../controller/batch/batch_test.go | 23 ++-- .../orchestrator/controller/cancel/cancel.go | 16 +-- .../controller/cancel/cancel_test.go | 12 +- .../extension/storage/mysql/BUILD.bazel | 2 + .../extension/storage/mysql/storage_test.go | 102 ++++++++++++++++ .../submitqueue/extension/storage/suite.go | 96 +++++++++++++++ 13 files changed, 355 insertions(+), 87 deletions(-) create mode 100644 submitqueue/extension/storage/mysql/schema/active_batch.sql diff --git a/submitqueue/extension/storage/batch_store.go b/submitqueue/extension/storage/batch_store.go index 05e94fb8..59a5dceb 100644 --- a/submitqueue/extension/storage/batch_store.go +++ b/submitqueue/extension/storage/batch_store.go @@ -41,6 +41,13 @@ type BatchStore interface { // Version arithmetic is owned by the caller; the store performs a pure conditional write. UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) error - // GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states. - GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error) + // ListActive returns all active (non-terminal) batches in the given queue. + // "Active" means the batch's persisted state is not terminal — see + // entity.BatchState.IsTerminal. Callers that need a narrower set filter the + // result by state in memory. + // + // The store tracks active membership internally (added on Create, self-healed + // on read) so this is a key-prefix read rather than a secondary-index query; + // see the implementation and extension/storage/mysql/schema/README.md. + ListActive(ctx context.Context, queue string) ([]entity.Batch, error) } diff --git a/submitqueue/extension/storage/mock/batch_store_mock.go b/submitqueue/extension/storage/mock/batch_store_mock.go index 48b6bdaf..c126add2 100644 --- a/submitqueue/extension/storage/mock/batch_store_mock.go +++ b/submitqueue/extension/storage/mock/batch_store_mock.go @@ -70,19 +70,19 @@ func (mr *MockBatchStoreMockRecorder) Get(ctx, id any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockBatchStore)(nil).Get), ctx, id) } -// GetByQueueAndStates mocks base method. -func (m *MockBatchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error) { +// ListActive mocks base method. +func (m *MockBatchStore) ListActive(ctx context.Context, queue string) ([]entity.Batch, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetByQueueAndStates", ctx, queue, states) + ret := m.ctrl.Call(m, "ListActive", ctx, queue) ret0, _ := ret[0].([]entity.Batch) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetByQueueAndStates indicates an expected call of GetByQueueAndStates. -func (mr *MockBatchStoreMockRecorder) GetByQueueAndStates(ctx, queue, states any) *gomock.Call { +// ListActive indicates an expected call of ListActive. +func (mr *MockBatchStoreMockRecorder) ListActive(ctx, queue any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetByQueueAndStates", reflect.TypeOf((*MockBatchStore)(nil).GetByQueueAndStates), ctx, queue, states) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListActive", reflect.TypeOf((*MockBatchStore)(nil).ListActive), ctx, queue) } // UpdateScoreAndState mocks base method. diff --git a/submitqueue/extension/storage/mysql/batch_store.go b/submitqueue/extension/storage/mysql/batch_store.go index e6999aac..76c21e94 100644 --- a/submitqueue/extension/storage/mysql/batch_store.go +++ b/submitqueue/extension/storage/mysql/batch_store.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "fmt" - "strings" "github.com/go-sql-driver/mysql" "github.com/uber-go/tally" @@ -87,6 +86,19 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err return fmt.Errorf("failed to marshal dependencies=%v id=%s for Create batch entity: %w", batch.Dependencies, batch.ID, err) } + // Write membership before the batch row (no transaction) so a batch row is + // never visible to ListActive without its membership. ON DUPLICATE KEY UPDATE is + // an idempotent no-op on PK conflict (retry-safe) while still surfacing real + // errors, unlike INSERT IGNORE which would swallow them and let Create proceed + // without a valid membership. A create that fails after this point leaves a + // dangling row, which ListActive skips and the reconcile job reclaims. + if _, err = s.db.ExecContext(ctx, + "INSERT INTO active_batch (queue, batch_id) VALUES (?, ?) ON DUPLICATE KEY UPDATE batch_id = batch_id", + batch.Queue, batch.ID, + ); err != nil { + return fmt.Errorf("failed to insert active_batch membership for batch entity id=%s queue=%s: %w", batch.ID, batch.Queue, err) + } + _, err = s.db.ExecContext(ctx, "INSERT INTO batch (id, queue, contains, dependencies, score, state, version) VALUES (?, ?, ?, ?, ?, ?, ?)", batch.ID, batch.Queue, containsJSON, dependenciesJSON, batch.Score, batch.State, batch.Version, @@ -96,6 +108,10 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { return fmt.Errorf("batch entity id=%s: %w", batch.ID, storage.ErrAlreadyExists) } + // Leave the membership row in place: a returned error doesn't prove the + // batch row was not written (an ambiguous failure can commit it and still + // error), so deleting could permanently hide a live batch from ListActive. + // A dangling row is the safe direction. return fmt.Errorf("failed to insert batch entity id=%s: %w", batch.ID, err) } @@ -174,52 +190,81 @@ func (s *batchStore) UpdateScoreAndState(ctx context.Context, id string, oldVers return nil } -// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states. -func (s *batchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) (ret []entity.Batch, retErr error) { - op := metrics.Begin(s.scope, "get_by_queue_and_states") +// ListActive returns all active (non-terminal) batches in the given queue. +// +// Membership is tracked in active_batch (queue leads the PK), so listing is a +// PK-prefix scan that ports cleanly to a key-value store. Each member is fetched +// by primary key: a terminal batch's membership is best-effort removed (race-free, +// its id is never reused), while a missing batch is skipped but NOT removed (it +// may belong to an in-flight Create that hasn't written its batch row yet). +func (s *batchStore) ListActive(ctx context.Context, queue string) (ret []entity.Batch, retErr error) { + op := metrics.Begin(s.scope, "list_active") defer func() { op.Complete(retErr) }() - if len(states) == 0 { - return nil, nil + // Read all membership rows and release the connection before resolving each + // batch, since Get issues its own query. + ids, err := s.activeBatchIDs(ctx, queue) + if err != nil { + return nil, err } - query := "SELECT id, queue, contains, dependencies, score, state, version FROM batch WHERE queue = ? AND state IN (?" + strings.Repeat(", ?", len(states)-1) + ")" - - args := make([]any, 1+len(states)) - args[0] = queue - for i, state := range states { - args[i+1] = state + var results []entity.Batch + for _, id := range ids { + batch, err := s.Get(ctx, id) + if err != nil { + if storage.IsNotFound(err) { + // Missing batch: either an in-flight Create or a dangling row. We + // can't tell them apart, so skip without deleting. + continue + } + return nil, fmt.Errorf("failed to get active batch id=%q queue=%q: %w", id, queue, err) + } + if batch.State.IsTerminal() { + // Stale membership: the batch has finished. Race-free to remove since + // its id is never reused. + s.removeActive(ctx, queue, id) + continue + } + results = append(results, batch) } - rows, err := s.db.QueryContext(ctx, query, args...) + return results, nil +} + +// activeBatchIDs reads the batch IDs recorded as active for the queue, owning the +// result set's lifecycle so the caller can resolve each batch after it's closed. +func (s *batchStore) activeBatchIDs(ctx context.Context, queue string) ([]string, error) { + rows, err := s.db.QueryContext(ctx, + "SELECT batch_id FROM active_batch WHERE queue = ?", + queue, + ) if err != nil { - return nil, fmt.Errorf("failed to query batches by queue=%q states=%v from the database: %w", queue, states, err) + return nil, fmt.Errorf("failed to query active batch membership for queue=%q: %w", queue, err) } defer rows.Close() - var results []entity.Batch + var ids []string for rows.Next() { - var batch entity.Batch - var containsJSON []byte - var dependenciesJSON []byte - - if err := rows.Scan(&batch.ID, &batch.Queue, &containsJSON, &dependenciesJSON, &batch.Score, &batch.State, &batch.Version); err != nil { - return nil, fmt.Errorf("failed to scan batch entity by queue=%q states=%v from the database: %w", queue, states, err) - } - - if err := json.Unmarshal(containsJSON, &batch.Contains); err != nil { - return nil, fmt.Errorf("failed to unmarshal contains for batch entity id=%s from the database: %w", batch.ID, err) - } - - if err := json.Unmarshal(dependenciesJSON, &batch.Dependencies); err != nil { - return nil, fmt.Errorf("failed to unmarshal dependencies for batch entity id=%s from the database: %w", batch.ID, err) + var id string + if err := rows.Scan(&id); err != nil { + return nil, fmt.Errorf("failed to scan active batch membership for queue=%q: %w", queue, err) } - - results = append(results, batch) + ids = append(ids, id) } if err := rows.Err(); err != nil { - return nil, fmt.Errorf("failed to iterate batches by queue=%q states=%v from the database: %w", queue, states, err) + return nil, fmt.Errorf("failed to iterate active batch membership for queue=%q: %w", queue, err) } + return ids, nil +} - return results, nil +// removeActive best-effort deletes a single active_batch membership row, used by +// ListActive to reclaim terminal batches' memberships. Failures are counted and +// ignored — the row is harmless and the next read retries. +func (s *batchStore) removeActive(ctx context.Context, queue, batchID string) { + if _, err := s.db.ExecContext(ctx, + "DELETE FROM active_batch WHERE queue = ? AND batch_id = ?", + queue, batchID, + ); err != nil { + metrics.NamedCounter(s.scope, "list_active", "self_heal_errors", 1) + } } diff --git a/submitqueue/extension/storage/mysql/schema/README.md b/submitqueue/extension/storage/mysql/schema/README.md index 9e8c36bf..e398d186 100644 --- a/submitqueue/extension/storage/mysql/schema/README.md +++ b/submitqueue/extension/storage/mysql/schema/README.md @@ -2,19 +2,23 @@ ## batch table -### Secondary index: `idx_queue_state (queue, state)` +The `batch` table is reachable only by its primary key (`id`). It carries no secondary index — every access pattern is expressed as a primary-key get or as a key-prefix scan over a companion membership table (see `active_batch` below). This keeps the access patterns portable to a key-value / document store, where a server-maintained secondary index over a mutable, non-key column (such as `state`) is not a primitive every backend offers cheaply. -The `batch` table has a composite secondary index on `(queue, state)`. This index supports the `GetByQueueAndStates` query, which retrieves batches filtered by queue and one or more states. Without this index, the query would require a full table scan. +## active_batch table -#### Trade-offs +`active_batch` is the membership index that answers "which batches in this queue are still active?" — the only queue-scoped query the pipeline needs (the batch controller uses it to find conflict dependencies; the cancel controller uses it to find the batch holding a request). A row is intended to exist per non-terminal batch, so the table stays bounded by the live speculation window rather than full batch history. The correspondence is best-effort, not exact: readers treat membership as a hint and resolve each batch by primary key — see *Maintenance and self-healing* below. -- **Write overhead**: Every `INSERT` and `UPDATE` to the `batch` table must also update the secondary index, adding latency to write operations. -- **Storage cost**: The index consumes additional disk space proportional to the number of rows in the table. -- **Lock contention**: Under high write concurrency, index maintenance can increase lock contention on the affected index pages. +`queue` leads the composite primary key `(queue, batch_id)`, so listing a queue's active batches is a primary-key-prefix scan and the table is shardable by queue. On a key-value store the same shape maps directly onto a partition key (`queue`) and sort key (`batch_id`) with no secondary index. -#### Future: Prune job +### Maintenance and self-healing -As the `batch` table grows, the secondary index will grow with it, increasing storage costs and degrading write performance. To mitigate this, a prune job should be introduced to periodically delete batches in terminal states (`succeeded`, `failed`, `cancelled`) that are older than a configurable retention period. This keeps the table and its indexes bounded in size, ensuring consistent query and write performance over time. +`BatchStore.Create` writes the membership row before the batch row, so a batch row is never visible to `ListActive` without its membership. If the batch insert then fails, `Create` leaves the membership row in place: a returned error doesn't prove the row wasn't written (an ambiguous failure can commit it and still error), so deleting could permanently hide a live batch. A dangling row is the safe direction. + +On read, `ListActive` resolves each member by primary key. A **terminal** batch's membership is best-effort removed (race-free — its id is never reused). A **missing** batch is skipped but not removed, since it may belong to an in-flight `Create` that hasn't written its batch row yet. Cleanup failures are swallowed, so reads never fail on index maintenance and terminal-state writers (merge, speculate, dlq) never touch the index. Because the two writes are independent (no transaction), the design tolerates partial failure via idempotent retries and read-time reconciliation. + +### Future: prune / reconcile job + +Read-time reconciliation only removes terminal memberships, so two kinds of stale row need a periodic sweep: dangling memberships whose batch never landed (a failed or crashed create), and memberships of batches that are stuck in a non-terminal state (e.g. an orphan stuck in `created` after a mid-process failure). A reconcile job should sweep both, keeping the table bounded independently of read traffic. ## change table diff --git a/submitqueue/extension/storage/mysql/schema/active_batch.sql b/submitqueue/extension/storage/mysql/schema/active_batch.sql new file mode 100644 index 00000000..cc6f80ca --- /dev/null +++ b/submitqueue/extension/storage/mysql/schema/active_batch.sql @@ -0,0 +1,12 @@ +-- active_batch is the membership index for "active (non-terminal) batches in a +-- queue", keeping the set bounded by the live speculation window rather than batch +-- history. queue leads the PK so listing is a PK-prefix scan (shardable by queue; +-- portable to a key-value store with queue = partition key, batch_id = sort key). +-- Membership is best-effort: it is added on Create (before the batch row) and +-- removed on read by ListActive once a batch is terminal. A reconcile job reclaims +-- rows left dangling by a failed or crashed create. See schema/README.md. +CREATE TABLE IF NOT EXISTS active_batch ( + queue VARCHAR(255) NOT NULL, + batch_id VARCHAR(255) NOT NULL, + PRIMARY KEY (queue, batch_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/extension/storage/mysql/schema/batch.sql b/submitqueue/extension/storage/mysql/schema/batch.sql index 8e7deda0..079cdecd 100644 --- a/submitqueue/extension/storage/mysql/schema/batch.sql +++ b/submitqueue/extension/storage/mysql/schema/batch.sql @@ -4,8 +4,7 @@ CREATE TABLE IF NOT EXISTS batch ( contains JSON NOT NULL, dependencies JSON NOT NULL, score DOUBLE NOT NULL, - state VARCHAR(255) NOT NUll, + state VARCHAR(255) NOT NULL, version INT NOT NULL, - PRIMARY KEY (id), - INDEX idx_queue_state (queue, state) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/orchestrator/controller/batch/batch.go b/submitqueue/orchestrator/controller/batch/batch.go index 1722e02b..e8028c1c 100644 --- a/submitqueue/orchestrator/controller/batch/batch.go +++ b/submitqueue/orchestrator/controller/batch/batch.go @@ -133,18 +133,22 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r Version: 1, } - // Get active batches for this queue and ask the conflict analyzer which - // of them the new batch must serialize behind. The dependency set drives - // the speculation graph downstream. - activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ - entity.BatchStateCreated, - entity.BatchStateSpeculating, - entity.BatchStateMerging, - }) + // Ask the conflict analyzer which active batches the new batch must serialize + // behind. ListActive returns all non-terminal batches; we narrow to + // Created/Speculating/Merging so the analyzer only sees batches that can still + // acquire new conflicts. + allActive, err := c.store.GetBatchStore().ListActive(ctx, request.Queue) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1) return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) } + activeBatches := make([]entity.Batch, 0, len(allActive)) + for _, b := range allActive { + switch b.State { + case entity.BatchStateCreated, entity.BatchStateSpeculating, entity.BatchStateMerging: + activeBatches = append(activeBatches, b) + } + } // Dedupe by batch ID since a single (analyzed, in-flight) pair may be // reported with multiple Conflict entries when different conflict types diff --git a/submitqueue/orchestrator/controller/batch/batch_test.go b/submitqueue/orchestrator/controller/batch/batch_test.go index 6e5c07a6..98a9b082 100644 --- a/submitqueue/orchestrator/controller/batch/batch_test.go +++ b/submitqueue/orchestrator/controller/batch/batch_test.go @@ -78,7 +78,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M if mockStorage == nil { mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockBatchStore.EXPECT().ListActive(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockReqStore := storagemock.NewMockRequestStore(ctrl) @@ -209,14 +209,21 @@ func TestController_Process_WithDependencies(t *testing.T) { Version: 1, } - // Set up storage with active batches to become dependencies. + // Set up storage with active batches to become dependencies. ListActive + // returns every non-terminal batch; the controller filters to + // Created/Speculating/Merging in memory. The Scored and Cancelling batches + // below must be excluded — note no BatchDependent expectations are registered + // for them, so the default all.New() analyzer would fail the test on an + // unexpected mock call if the filter let them through. activeBatches := []entity.Batch{ {ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateCreated, Version: 1}, {ID: "test-queue/batch/2", Queue: "test-queue", State: entity.BatchStateSpeculating, Version: 2}, + {ID: "test-queue/batch/3", Queue: "test-queue", State: entity.BatchStateScored, Version: 1}, + {ID: "test-queue/batch/4", Queue: "test-queue", State: entity.BatchStateCancelling, Version: 1}, } mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(activeBatches, nil) mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -268,7 +275,7 @@ func TestController_Process_AnalyzerSelectsSubset(t *testing.T) { } mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(activeBatches, nil) mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -314,7 +321,7 @@ func TestController_Process_AnalyzerFailure(t *testing.T) { request := testRequest() mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) @@ -410,7 +417,7 @@ func TestController_Process_CASLostToCancel(t *testing.T) { request := testRequest() mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) // Create must NOT be called — gomock fails if it is. mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -463,7 +470,7 @@ func TestController_Process_CASUnexpectedErrorPropagates(t *testing.T) { request := testRequest() mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) // Create must NOT be called — gomock fails if it is. mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -509,7 +516,7 @@ func TestController_Process_RecoveryAfterPriorCAS(t *testing.T) { request.Version = 2 // prior attempt bumped from 1 → 2 mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) diff --git a/submitqueue/orchestrator/controller/cancel/cancel.go b/submitqueue/orchestrator/controller/cancel/cancel.go index 6f10bcb4..94139f72 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel.go +++ b/submitqueue/orchestrator/controller/cancel/cancel.go @@ -183,21 +183,11 @@ func (c *Controller) markCancelling(ctx context.Context, request entity.Request) // findActiveBatch scans all active batches in the request's queue for one whose // Contains list includes the request. Returns (batch, true, nil) on a hit, // (zero, false, nil) when the request is not yet batched, and any storage -// error otherwise. -// -// BatchStateCancelling is included in the active-state list so an idempotent -// redelivery of the cancel message (the prior pass wrote the intent but the -// speculate hand-off publish failed) still resolves the batch and re-attempts -// the publish. +// error otherwise. ListActive includes Cancelling batches, so redelivery of a +// cancel whose speculate hand-off publish failed still resolves and retries. func (c *Controller) findActiveBatch(ctx context.Context, request entity.Request) (entity.Batch, bool, error) { // TODO: Scans all the batches in flight - make it more efficient? - active, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ - entity.BatchStateCreated, - entity.BatchStateScored, - entity.BatchStateSpeculating, - entity.BatchStateMerging, - entity.BatchStateCancelling, - }) + active, err := c.store.GetBatchStore().ListActive(ctx, request.Queue) if err != nil { c.metricsScope.Counter("batch_store_errors").Inc(1) return entity.Batch{}, false, fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) diff --git a/submitqueue/orchestrator/controller/cancel/cancel_test.go b/submitqueue/orchestrator/controller/cancel/cancel_test.go index 8fa72e86..79d66e0e 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel_test.go +++ b/submitqueue/orchestrator/controller/cancel/cancel_test.go @@ -143,7 +143,7 @@ func TestProcess_CancelsUnbatchedRequest(t *testing.T) { ) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return(nil, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return(nil, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() @@ -174,7 +174,7 @@ func TestProcess_AlreadyCancelling_SkipsMarkCancelling(t *testing.T) { reqStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(3), int32(4), entity.RequestStateCancelled).Return(nil) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return(nil, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return(nil, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() @@ -227,7 +227,7 @@ func TestProcess_UnbatchedVersionMismatch_Retryable(t *testing.T) { ) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return(nil, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return(nil, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() @@ -275,7 +275,7 @@ func TestProcess_BatchPath_HandsOffToSpeculate(t *testing.T) { reqStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(2), int32(3), entity.RequestStateCancelling).Return(nil) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return([]entity.Batch{batch}, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return([]entity.Batch{batch}, nil) // Single batch CAS: intent only. No terminal CAS. batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(3), int32(4), entity.BatchStateCancelling).Return(nil) @@ -324,7 +324,7 @@ func TestProcess_BatchAlreadyCancelling_RepublishesToSpeculate(t *testing.T) { // No request UpdateState — already in Cancelling. batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return([]entity.Batch{batch}, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return([]entity.Batch{batch}, nil) // No batch UpdateState — already in Cancelling. store := storagemock.NewMockStorage(ctrl) @@ -355,7 +355,7 @@ func TestProcess_BatchIntentVersionMismatch_Retryable(t *testing.T) { reqStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(2), int32(3), entity.RequestStateCancelling).Return(nil) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return([]entity.Batch{batch}, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return([]entity.Batch{batch}, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelling). Return(storage.ErrVersionMismatch) diff --git a/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel b/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel index fd6270de..7733d1f9 100644 --- a/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel +++ b/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel @@ -12,6 +12,8 @@ go_test( "integration", ], deps = [ + "//submitqueue/entity", + "//submitqueue/extension/storage", "//submitqueue/extension/storage/mysql", "//test/integration/submitqueue/extension/storage", "//test/testutil", diff --git a/test/integration/submitqueue/extension/storage/mysql/storage_test.go b/test/integration/submitqueue/extension/storage/mysql/storage_test.go index 03e3ee4a..69ff0dcd 100644 --- a/test/integration/submitqueue/extension/storage/mysql/storage_test.go +++ b/test/integration/submitqueue/extension/storage/mysql/storage_test.go @@ -23,6 +23,8 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" storagesuite "github.com/uber/submitqueue/test/integration/submitqueue/extension/storage" "github.com/uber/submitqueue/test/testutil" @@ -96,3 +98,103 @@ func (s *MySQLStorageIntegrationSuite) TearDownSuite() { s.log.Logf("Tearing down MySQL Storage integration test suite") // Cleanup handled automatically by testutil.ComposeStack } + +// countActiveBatchRows returns the number of active_batch membership rows for the +// given (queue, batch_id) — index state internal to the MySQL impl, not visible +// through the storage.Storage contract. +func (s *MySQLStorageIntegrationSuite) countActiveBatchRows(queue, batchID string) int { + t := s.T() + var n int + err := s.db.QueryRowContext(context.Background(), + "SELECT COUNT(*) FROM active_batch WHERE queue = ? AND batch_id = ?", + queue, batchID, + ).Scan(&n) + require.NoError(t, err) + return n +} + +// TestActiveBatch_SelfHealsTerminalMembership verifies that ListActive deletes the +// membership row of a batch that has reached a terminal state. +func (s *MySQLStorageIntegrationSuite) TestActiveBatch_SelfHealsTerminalMembership() { + t := s.T() + ctx := context.Background() + const queue = "bq-selfheal-terminal" + const id = queue + "/batch/1" + + store := s.GetStorage().GetBatchStore() + require.NoError(t, store.Create(ctx, entity.Batch{ID: id, Queue: queue, Contains: []string{id + "/req"}, Dependencies: []string{}, State: entity.BatchStateCreated, Version: 1})) + require.Equal(t, 1, s.countActiveBatchRows(queue, id), "Create should record active membership") + + require.NoError(t, store.UpdateState(ctx, id, 1, 2, entity.BatchStateSucceeded)) + + active, err := store.ListActive(ctx, queue) + require.NoError(t, err) + require.Empty(t, active, "terminal batch should not be listed as active") + require.Equal(t, 0, s.countActiveBatchRows(queue, id), "ListActive should self-heal the stale membership row") +} + +// TestActiveBatch_SkipsDanglingMembershipWithoutDeleting verifies that ListActive +// skips a membership row whose batch does not exist but does NOT delete it: it may +// belong to an in-flight Create that hasn't written its batch row yet. +func (s *MySQLStorageIntegrationSuite) TestActiveBatch_SkipsDanglingMembershipWithoutDeleting() { + t := s.T() + ctx := context.Background() + const queue = "bq-dangling-skip" + const id = queue + "/batch/ghost" + + _, err := s.db.ExecContext(ctx, "INSERT INTO active_batch (queue, batch_id) VALUES (?, ?)", queue, id) + require.NoError(t, err) + + active, err := s.GetStorage().GetBatchStore().ListActive(ctx, queue) + require.NoError(t, err) + require.Empty(t, active, "dangling membership should not surface a batch") + require.Equal(t, 1, s.countActiveBatchRows(queue, id), "ListActive must NOT delete a missing-batch membership (it may belong to an in-flight create)") +} + +// TestActiveBatch_CreateKeepsMembershipOnDuplicate verifies that a duplicate Create +// (ErrAlreadyExists) does NOT delete the membership row, which belongs to the live +// existing batch. +func (s *MySQLStorageIntegrationSuite) TestActiveBatch_CreateKeepsMembershipOnDuplicate() { + t := s.T() + ctx := context.Background() + const queue = "bq-create-dup" + const id = queue + "/batch/1" + store := s.GetStorage().GetBatchStore() + + b := entity.Batch{ID: id, Queue: queue, Contains: []string{id + "/req"}, Dependencies: []string{}, State: entity.BatchStateCreated, Version: 1} + require.NoError(t, store.Create(ctx, b)) + require.Equal(t, 1, s.countActiveBatchRows(queue, id)) + + require.ErrorIs(t, store.Create(ctx, b), storage.ErrAlreadyExists) + require.Equal(t, 1, s.countActiveBatchRows(queue, id), "duplicate Create must NOT delete the existing batch's membership") + + active, err := store.ListActive(ctx, queue) + require.NoError(t, err) + require.Len(t, active, 1, "the original batch must remain active") +} + +// TestActiveBatch_CreateKeepsMembershipOnFailedInsert verifies that a non-duplicate +// batch-insert failure leaves the membership row in place rather than deleting it +// (the batch row may have committed despite the error). The failure is induced by +// dropping the batch table (restored afterwards). +func (s *MySQLStorageIntegrationSuite) TestActiveBatch_CreateKeepsMembershipOnFailedInsert() { + t := s.T() + ctx := context.Background() + const queue = "bq-create-fail" + const id = queue + "/batch/1" + + var tbl, ddl string + require.NoError(t, s.db.QueryRowContext(ctx, "SHOW CREATE TABLE batch").Scan(&tbl, &ddl)) + _, err := s.db.ExecContext(ctx, "DROP TABLE batch") + require.NoError(t, err) + defer func() { + _, derr := s.db.ExecContext(context.Background(), ddl) + require.NoError(t, derr, "must restore the batch table for subsequent tests") + }() + + err = s.GetStorage().GetBatchStore().Create(ctx, entity.Batch{ID: id, Queue: queue, Contains: []string{id + "/req"}, Dependencies: []string{}, State: entity.BatchStateCreated, Version: 1}) + require.Error(t, err, "Create should fail when the batch table is missing") + require.NotErrorIs(t, err, storage.ErrAlreadyExists, "the failure must be a non-duplicate error") + + require.Equal(t, 1, s.countActiveBatchRows(queue, id), "Create must NOT delete the membership on a failed insert: the batch row may have committed despite the error") +} diff --git a/test/integration/submitqueue/extension/storage/suite.go b/test/integration/submitqueue/extension/storage/suite.go index 4b092e8b..54e5bbc3 100644 --- a/test/integration/submitqueue/extension/storage/suite.go +++ b/test/integration/submitqueue/extension/storage/suite.go @@ -46,6 +46,12 @@ func (s *StorageContractSuite) SetStorage(store storage.Storage) { s.storage = store } +// GetStorage returns the storage instance under test, for implementation-specific +// suites that need to assert backend-internal behavior alongside the contract tests. +func (s *StorageContractSuite) GetStorage() storage.Storage { + return s.storage +} + // SetLogger sets the logger for tests func (s *StorageContractSuite) SetLogger(log *testutil.TestLogger) { s.log = log @@ -379,3 +385,93 @@ func (s *StorageContractSuite) TestStorage_ChangeCreate_EmptyDetails() { require.Len(t, got, 1) assert.Equal(t, entity.ChangeDetails{}, got[0].Details) } + +// newBatch builds a batch fixture for the active-listing tests. +func newBatch(id, queue string, state entity.BatchState) entity.Batch { + return entity.Batch{ + ID: id, + Queue: queue, + Contains: []string{id + "/req"}, + Dependencies: []string{}, + State: state, + Version: 1, + } +} + +// activeBatchIDs returns the sorted IDs of the queue's active batches, for stable comparison. +func (s *StorageContractSuite) activeBatchIDs(queue string) []string { + t := s.T() + batches, err := s.storage.GetBatchStore().ListActive(s.ctx, queue) + require.NoError(t, err) + ids := make([]string, 0, len(batches)) + for _, b := range batches { + ids = append(ids, b.ID) + } + sort.Strings(ids) + return ids +} + +// TestStorage_BatchListActive_ReturnsActive verifies a freshly created batch is +// listed as active, and that ListActive resolves the full entity. +func (s *StorageContractSuite) TestStorage_BatchListActive_ReturnsActive() { + t := s.T() + ctx := s.ctx + const queue = "bq-active" + + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/1", queue, entity.BatchStateCreated))) + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/2", queue, entity.BatchStateSpeculating))) + + batches, err := s.storage.GetBatchStore().ListActive(ctx, queue) + require.NoError(t, err) + require.Len(t, batches, 2) + assert.Equal(t, []string{queue + "/batch/1", queue + "/batch/2"}, s.activeBatchIDs(queue)) +} + +// TestStorage_BatchListActive_ExcludesTerminal verifies that a batch transitioned +// to a terminal state via UpdateState drops out of the active listing. +func (s *StorageContractSuite) TestStorage_BatchListActive_ExcludesTerminal() { + t := s.T() + ctx := s.ctx + const queue = "bq-terminal" + + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/1", queue, entity.BatchStateMerging))) + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/2", queue, entity.BatchStateCreated))) + + // batch/1 lands; it must no longer be active. + require.NoError(t, s.storage.GetBatchStore().UpdateState(ctx, queue+"/batch/1", 1, 2, entity.BatchStateSucceeded)) + + assert.Equal(t, []string{queue + "/batch/2"}, s.activeBatchIDs(queue)) +} + +// TestStorage_BatchListActive_ExcludesTerminalViaScoreAndState covers the other +// terminal write path (UpdateScoreAndState) used by the score/speculate pipeline. +func (s *StorageContractSuite) TestStorage_BatchListActive_ExcludesTerminalViaScoreAndState() { + t := s.T() + ctx := s.ctx + const queue = "bq-terminal-score" + + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/1", queue, entity.BatchStateCreated))) + require.NoError(t, s.storage.GetBatchStore().UpdateScoreAndState(ctx, queue+"/batch/1", 1, 2, 0.5, entity.BatchStateFailed)) + + assert.Empty(t, s.activeBatchIDs(queue)) +} + +// TestStorage_BatchListActive_QueueScoped verifies the listing is scoped to one queue. +func (s *StorageContractSuite) TestStorage_BatchListActive_QueueScoped() { + t := s.T() + ctx := s.ctx + const queueA = "bq-scoped-a" + const queueB = "bq-scoped-b" + + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queueA+"/batch/1", queueA, entity.BatchStateCreated))) + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queueB+"/batch/1", queueB, entity.BatchStateCreated))) + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queueB+"/batch/2", queueB, entity.BatchStateCreated))) + + assert.Equal(t, []string{queueA + "/batch/1"}, s.activeBatchIDs(queueA)) + assert.Equal(t, []string{queueB + "/batch/1", queueB + "/batch/2"}, s.activeBatchIDs(queueB)) +} + +// TestStorage_BatchListActive_UnknownQueue returns an empty set for a queue with no batches. +func (s *StorageContractSuite) TestStorage_BatchListActive_UnknownQueue() { + assert.Empty(s.T(), s.activeBatchIDs("bq-does-not-exist")) +}