diff --git a/submitqueue/extension/storage/build_store.go b/submitqueue/extension/storage/build_store.go index a2f89bc4..429c3edd 100644 --- a/submitqueue/extension/storage/build_store.go +++ b/submitqueue/extension/storage/build_store.go @@ -27,8 +27,14 @@ type BuildStore interface { // Get retrieves a build by ID. Returns ErrNotFound if the build is not found. Get(ctx context.Context, id string) (entity.Build, error) - // Create creates a new build. The build must have a unique ID already assigned. - // Returns ErrAlreadyExists if a build with the same ID already exists. + // GetByBatchID retrieves the single build scheduled for the given batch + // (build.ID is the runner-assigned ID, so callers that only hold a batch ID + // cannot use Get). + // Returns ErrNotFound if no build exists for the batch. + GetByBatchID(ctx context.Context, batchID string) (entity.Build, error) + + // Create creates a new build. The build must have a unique ID and batch ID. + // Returns ErrAlreadyExists if either uniqueness constraint is violated. Create(ctx context.Context, build entity.Build) error // UpdateStatus updates the status of a build. diff --git a/submitqueue/extension/storage/mock/build_store_mock.go b/submitqueue/extension/storage/mock/build_store_mock.go index 675a2f58..2c4bf171 100644 --- a/submitqueue/extension/storage/mock/build_store_mock.go +++ b/submitqueue/extension/storage/mock/build_store_mock.go @@ -70,6 +70,21 @@ func (mr *MockBuildStoreMockRecorder) Get(ctx, id any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockBuildStore)(nil).Get), ctx, id) } +// GetByBatchID mocks base method. +func (m *MockBuildStore) GetByBatchID(ctx context.Context, batchID string) (entity.Build, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetByBatchID", ctx, batchID) + ret0, _ := ret[0].(entity.Build) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetByBatchID indicates an expected call of GetByBatchID. +func (mr *MockBuildStoreMockRecorder) GetByBatchID(ctx, batchID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetByBatchID", reflect.TypeOf((*MockBuildStore)(nil).GetByBatchID), ctx, batchID) +} + // UpdateStatus mocks base method. func (m *MockBuildStore) UpdateStatus(ctx context.Context, id string, newStatus entity.BuildStatus) error { m.ctrl.T.Helper() diff --git a/submitqueue/extension/storage/mysql/build_store.go b/submitqueue/extension/storage/mysql/build_store.go index 92233e6b..1826db98 100644 --- a/submitqueue/extension/storage/mysql/build_store.go +++ b/submitqueue/extension/storage/mysql/build_store.go @@ -44,29 +44,47 @@ func (s *buildStore) Get(ctx context.Context, id string) (ret entity.Build, retE op := metrics.Begin(s.scope, "get") defer func() { op.Complete(retErr) }() + return s.scanBuild(ctx, + "SELECT id, batch_id, speculation_path, score, status FROM build WHERE id = ?", + "id="+id, id) +} + +// GetByBatchID retrieves the single build scheduled for the given batch. +// Returns ErrNotFound if no build exists for the batch. +func (s *buildStore) GetByBatchID(ctx context.Context, batchID string) (ret entity.Build, retErr error) { + op := metrics.Begin(s.scope, "get_by_batch_id") + defer func() { op.Complete(retErr) }() + + return s.scanBuild(ctx, + "SELECT id, batch_id, speculation_path, score, status FROM build WHERE batch_id = ?", + "batch_id="+batchID, batchID) +} + +// scanBuild runs a single-row build query and decodes the result, including the +// speculation_path JSON column. label is used only for error context (e.g. +// "id=…" or "batch_id=…"). Returns ErrNotFound when the query matches no row. +func (s *buildStore) scanBuild(ctx context.Context, query, label string, args ...any) (entity.Build, error) { var build entity.Build var speculationPathJSON []byte - err := s.db.QueryRowContext(ctx, - "SELECT id, batch_id, speculation_path, score, status FROM build WHERE id = ?", - id, - ).Scan(&build.ID, &build.BatchID, &speculationPathJSON, &build.Score, &build.Status) - + err := s.db.QueryRowContext(ctx, query, args...). + Scan(&build.ID, &build.BatchID, &speculationPathJSON, &build.Score, &build.Status) if errors.Is(err, sql.ErrNoRows) { return entity.Build{}, storage.WrapNotFound(err) } if err != nil { - return entity.Build{}, fmt.Errorf("failed to get build entity id=%s from the database: %w", id, err) + return entity.Build{}, fmt.Errorf("failed to get build entity %s from the database: %w", label, err) } if err := json.Unmarshal(speculationPathJSON, &build.SpeculationPath); err != nil { - return entity.Build{}, fmt.Errorf("failed to unmarshal speculation_path for build entity id=%s from the database: %w", id, err) + return entity.Build{}, fmt.Errorf("failed to unmarshal speculation_path for build entity %s from the database: %w", label, err) } return build, nil } -// Create creates a new build. The build must have a unique ID already assigned. Returns ErrAlreadyExists if the build ID already exists. +// Create creates a new build. The build must have a unique ID and batch ID. +// Returns ErrAlreadyExists if either uniqueness constraint is violated. func (s *buildStore) Create(ctx context.Context, build entity.Build) (retErr error) { op := metrics.Begin(s.scope, "create") defer func() { op.Complete(retErr) }() diff --git a/submitqueue/extension/storage/mysql/schema/build.sql b/submitqueue/extension/storage/mysql/schema/build.sql index 93bc5744..d186022c 100644 --- a/submitqueue/extension/storage/mysql/schema/build.sql +++ b/submitqueue/extension/storage/mysql/schema/build.sql @@ -4,5 +4,6 @@ CREATE TABLE IF NOT EXISTS build ( speculation_path JSON NOT NULL, score FLOAT NOT NULL, status VARCHAR(64) NOT NULL, - PRIMARY KEY (id) + PRIMARY KEY (id), + UNIQUE KEY uniq_batch_id (batch_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/orchestrator/controller/build/build.go b/submitqueue/orchestrator/controller/build/build.go index 6a9ac930..1628f479 100644 --- a/submitqueue/orchestrator/controller/build/build.go +++ b/submitqueue/orchestrator/controller/build/build.go @@ -114,6 +114,17 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil } + existing, err := c.store.GetBuildStore().GetByBatchID(ctx, batch.ID) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get existing build for batch %s: %w", batch.ID, err) + } + } else { + metrics.NamedCounter(c.metricsScope, opName, "build_already_exists", 1) + return c.publishExisting(ctx, existing) + } + // Assemble base (dependency batches in order) and head (this batch). base, err := c.collectChanges(ctx, batch.Dependencies) if err != nil { @@ -148,11 +159,20 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } // Persist the initial Build snapshot so the buildsignal poll loop has a - // row to UpdateStatus against. ErrAlreadyExists is benign — a redelivery - // of this message after a previous successful Create. - if err := c.store.GetBuildStore().Create(ctx, build); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { - metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) - return fmt.Errorf("failed to persist build %s: %w", build.ID, err) + // row to UpdateStatus against. ErrAlreadyExists means either this exact + // build or another build for the same batch already won a redelivery race; + // publish the stored row so the poll loop follows the source of truth. + if err := c.store.GetBuildStore().Create(ctx, build); err != nil { + if !errors.Is(err, storage.ErrAlreadyExists) { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to persist build %s: %w", build.ID, err) + } + build, err = c.store.GetBuildStore().GetByBatchID(ctx, batch.ID) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get existing build for batch %s after duplicate create: %w", batch.ID, err) + } + metrics.NamedCounter(c.metricsScope, opName, "build_already_exists", 1) } // Hand off to the buildsignal poll loop; it calls Status, updates the @@ -173,6 +193,20 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil // Success - message will be acked } +func (c *Controller) publishExisting(ctx context.Context, build entity.Build) error { + if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil { + metrics.NamedCounter(c.metricsScope, "process", "publish_errors", 1) + return fmt.Errorf("failed to publish existing build to buildsignal: %w", err) + } + c.logger.Infow("published existing build to buildsignal", + "batch_id", build.BatchID, + "build_id", build.ID, + "status", string(build.Status), + "topic_key", consumer.TopicKeyBuildSignal, + ) + return nil +} + // collectChanges loads each batch by ID and concatenates the Change values // from its contained requests in batch order. Used to build the base // (dependency batches) and head (this batch) inputs to BuildRunner.Trigger. diff --git a/submitqueue/orchestrator/controller/build/build_test.go b/submitqueue/orchestrator/controller/build/build_test.go index 212c21b7..bc8cd204 100644 --- a/submitqueue/orchestrator/controller/build/build_test.go +++ b/submitqueue/orchestrator/controller/build/build_test.go @@ -64,6 +64,7 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.Mo mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound).AnyTimes() mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() store := storagemock.NewMockStorage(ctrl) @@ -169,6 +170,7 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { var created entity.Build mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), headBatch.ID).Return(entity.Build{}, storage.ErrNotFound) mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, b entity.Build) error { created = b @@ -227,19 +229,22 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { assert.Equal(t, published.ID, created.ID) } -// TestController_Process_BuildStoreAlreadyExistsIsSwallowed covers the -// redelivery case: Create returns ErrAlreadyExists, the controller proceeds -// to publish to buildsignal anyway. The polling loop will pick up the -// existing row via UpdateStatus. -func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { +// TestController_Process_BuildStoreAlreadyExistsPublishesStoredBuild covers the +// race where another redelivery creates the batch's build after our initial +// GetByBatchID miss. The controller must publish the stored row's build ID, +// not the freshly-triggered duplicate runner ID. +func TestController_Process_BuildStoreAlreadyExistsPublishesStoredBuild(t *testing.T) { ctrl := gomock.NewController(t) batch := testBatch() + existing := entity.Build{ID: "build-existing", BatchID: batch.ID, Status: entity.BuildStatusAccepted} mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(storage.ErrAlreadyExists) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(existing, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() @@ -249,11 +254,13 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { br := buildrunnermock.NewMockBuildRunner(ctrl) br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(entity.BuildID{ID: "build-dup"}, nil) - publishCalled := false + var published entity.BuildID mockPub := queuemock.NewMockPublisher(ctrl) mockPub.EXPECT().Publish(gomock.Any(), "buildsignal", gomock.Any()).DoAndReturn( - func(_ context.Context, _ string, _ entityqueue.Message) error { - publishCalled = true + func(_ context.Context, _ string, msg entityqueue.Message) error { + var err error + published, err = entity.BuildIDFromBytes(msg.Payload) + require.NoError(t, err) return nil }, ).Times(1) @@ -271,7 +278,53 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { delivery.EXPECT().Attempt().Return(1).AnyTimes() require.NoError(t, controller.Process(context.Background(), delivery)) - assert.True(t, publishCalled, "publish to buildsignal must run even when Create reports ErrAlreadyExists") + assert.Equal(t, existing.ID, published.ID) +} + +func TestController_Process_ExistingBuildSkipsTriggerAndCreate(t *testing.T) { + ctrl := gomock.NewController(t) + + batch := testBatch() + existing := entity.Build{ID: "build-existing", BatchID: batch.ID, Status: entity.BuildStatusAccepted} + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(existing, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() + // RequestStore is not touched because existing builds skip change assembly. + + br := buildrunnermock.NewMockBuildRunner(ctrl) + // No Trigger expectation: redelivery must not launch another external build. + + var published entity.BuildID + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), "buildsignal", gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, msg entityqueue.Message) error { + var err error + published, err = entity.BuildIDFromBytes(msg.Payload) + require.NoError(t, err) + return nil + }, + ).Times(1) + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, + ) + require.NoError(t, err) + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + + msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.NoError(t, controller.Process(context.Background(), delivery)) + assert.Equal(t, existing.ID, published.ID) } // TestController_Process_TriggerFailure verifies a build-runner failure is @@ -285,7 +338,10 @@ func TestController_Process_TriggerFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(storagemock.NewMockRequestStore(ctrl)).AnyTimes() - // No build store expectation: Trigger failure must short-circuit before Create. + mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() + // No Create expectation: Trigger failure must short-circuit before persistence. br := buildrunnermock.NewMockBuildRunner(ctrl) br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). diff --git a/submitqueue/orchestrator/controller/speculate/speculate.go b/submitqueue/orchestrator/controller/speculate/speculate.go index 05471979..b6f9e566 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate.go +++ b/submitqueue/orchestrator/controller/speculate/speculate.go @@ -115,12 +115,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Terminal state: re-fan-out for self-healing in case a previous publish // was lost. Always re-publish to conclude (idempotent on the batch ID). - // For Cancelled specifically also re-publish to dependents — a crash - // between the terminal CAS and the dependent publish would otherwise - // leave them stuck waiting on a Cancelled dep. + // For terminal states that unblock downstream dependencies by being observed + // as terminal non-success, also re-publish to dependents. A crash between + // the terminal CAS and the dependent publish would otherwise leave them + // stuck waiting. if batch.State.IsTerminal() { metrics.NamedCounter(c.metricsScope, opName, "self_heal_terminal", 1) - if batch.State == entity.BatchStateCancelled { + if batch.State == entity.BatchStateCancelled || batch.State == entity.BatchStateFailed { if err := c.respeculateDependents(ctx, batch); err != nil { return err } @@ -170,8 +171,11 @@ func (c *Controller) startSpeculation(ctx context.Context, batch entity.Batch) e return nil } -// tryFinalize publishes to merge and transitions to Merging iff every -// dependency batch has reached Succeeded. Cancelled deps are treated as +// tryFinalize publishes to merge and transitions to Merging iff this batch's +// own build has Succeeded AND every dependency batch has reached Succeeded. +// The own-build gate comes first: a Failed build fails the batch, and a build +// still in flight (or not yet persisted) parks the batch until the next +// buildsignal re-triggers speculate. Cancelled deps are treated as // out-of-the-way: the cancelled batch will never land, so it can no longer // conflict — drop it from the chain and proceed. Failed deps still cascade // via failOnDependency. If some deps are still in flight, the call is a @@ -182,6 +186,37 @@ func (c *Controller) startSpeculation(ctx context.Context, batch entity.Batch) e // from the chain and re-issue speculation for the surviving ordering(s) // — instead of cascading the failure into requests that could still land. func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error { + // Gate on the batch's own build before considering dependencies + build, err := c.store.GetBuildStore().GetByBatchID(ctx, batch.ID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + // The build controller has not persisted the Build yet (race with + // startSpeculation's publish to build). Wait for the next event. + metrics.NamedCounter(c.metricsScope, opName, "waiting_on_build", 1) + return nil + } + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get build for batch %s: %w", batch.ID, err) + } + + switch { + case build.Status == entity.BuildStatusSucceeded: + // Own build passed; fall through to dependency evaluation. + case build.Status.IsTerminal(): + return c.failOnBuild(ctx, batch, build) + default: + // Accepted, Running, or any other non-terminal status: the build is + // still in flight. buildsignal re-triggers speculate on every poll, so + // simply wait. + metrics.NamedCounter(c.metricsScope, opName, "waiting_on_build", 1) + c.logger.Debugw("own build not yet succeeded; waiting", + "batch_id", batch.ID, + "build_id", build.ID, + "build_status", string(build.Status), + ) + return nil + } + deps, err := c.fetchDependencies(ctx, batch) if err != nil { return err @@ -230,11 +265,21 @@ func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error return nil } +// failOnBuild transitions a Speculating batch to Failed when its own build has +// reached a non-succeeding terminal status, then reconciles via failBatch. +func (c *Controller) failOnBuild(ctx context.Context, batch entity.Batch, build entity.Build) error { + metrics.NamedCounter(c.metricsScope, opName, "build_failed", 1) + c.logger.Warnw("own build in non-succeeding terminal state; failing batch", + "batch_id", batch.ID, + "build_id", build.ID, + "build_status", string(build.Status), + ) + return c.failBatch(ctx, batch) +} + // failOnDependency transitions a Speculating batch to Failed when one of its -// dependencies has reached a non-succeeding terminal state, then publishes to -// the conclude queue so the request store and request log get reconciled. -// Without this transition the batch would sit in Speculating forever — no -// downstream event ever fires for it again. +// dependencies has reached a non-succeeding terminal state, then reconciles +// via failBatch. func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, dep entity.Batch) error { metrics.NamedCounter(c.metricsScope, opName, "dependency_failed", 1) c.logger.Warnw("dependency in non-succeeding terminal state; failing batch", @@ -242,12 +287,26 @@ func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, d "dependency_id", dep.ID, "dependency_state", string(dep.State), ) + return c.failBatch(ctx, batch) +} +// failBatch CASes a Speculating batch to Failed, wakes downstream dependents so +// they can observe the terminal dependency, and publishes to the conclude queue +// so the request store and request log get reconciled. Without this transition +// the batch would sit in Speculating forever — no downstream event ever fires +// for it again. +func (c *Controller) failBatch(ctx context.Context, batch entity.Batch) error { newVersion := batch.Version + 1 if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateFailed); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update batch %s state to failed: %w", batch.ID, err) } + batch.Version = newVersion + batch.State = entity.BatchStateFailed + + if err := c.respeculateDependents(ctx, batch); err != nil { + return err + } if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) @@ -326,17 +385,17 @@ func (c *Controller) cancelBatch(ctx context.Context, batch entity.Batch) error } // cancelBuild flips any in-flight Build entity for the batch to -// BuildStatusCancelled. Builds use build.ID == batch.ID, so a single Get -// covers every build scheduled for the batch. Tolerates ErrNotFound (no -// build was ever scheduled — the batch was cancelled before speculation -// started building) and skips already-terminal builds. +// BuildStatusCancelled. It looks the build up by batch ID (build.ID is the +// runner-assigned ID, not the batch ID). Tolerates ErrNotFound (no build was +// ever scheduled — the batch was cancelled before speculation started building) +// and skips already-terminal builds. // // This is the hook point for a future external CI integration: today the // system has no external runner, so the local state flip is the complete // cancellation. Once a runner exists, it must be invoked here before the // local UpdateStatus. func (c *Controller) cancelBuild(ctx context.Context, batch entity.Batch) error { - build, err := c.store.GetBuildStore().Get(ctx, batch.ID) + build, err := c.store.GetBuildStore().GetByBatchID(ctx, batch.ID) if err != nil { if errors.Is(err, storage.ErrNotFound) { metrics.NamedCounter(c.metricsScope, opName, "cancel_build_not_found", 1) @@ -351,9 +410,9 @@ func (c *Controller) cancelBuild(ctx context.Context, batch entity.Batch) error return nil } - if err := c.store.GetBuildStore().UpdateStatus(ctx, batch.ID, entity.BuildStatusCancelled); err != nil { + if err := c.store.GetBuildStore().UpdateStatus(ctx, build.ID, entity.BuildStatusCancelled); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) - return fmt.Errorf("failed to cancel build for batch %s: %w", batch.ID, err) + return fmt.Errorf("failed to cancel build %s for batch %s: %w", build.ID, batch.ID, err) } metrics.NamedCounter(c.metricsScope, opName, "cancel_build_done", 1) return nil diff --git a/submitqueue/orchestrator/controller/speculate/speculate_test.go b/submitqueue/orchestrator/controller/speculate/speculate_test.go index 2342bd17..39d501e8 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate_test.go +++ b/submitqueue/orchestrator/controller/speculate/speculate_test.go @@ -51,6 +51,26 @@ func testBatch(state entity.BatchState, deps ...string) entity.Batch { } } +// expectOwnBuildSucceeded returns a BuildStore mock whose GetByBatchID reports +// the batch's own build as Succeeded, satisfying the own-build gate in +// tryFinalize so the Speculating-path tests reach dependency evaluation. +func expectOwnBuildSucceeded(ctrl *gomock.Controller, batchID string) *storagemock.MockBuildStore { + bs := storagemock.NewMockBuildStore(ctrl) + bs.EXPECT().GetByBatchID(gomock.Any(), batchID).Return( + entity.Build{ID: "fake-build-" + batchID, BatchID: batchID, Status: entity.BuildStatusSucceeded}, nil) + return bs +} + +func expectBatchDependents(ctrl *gomock.Controller, batchID string, dependents []string) *storagemock.MockBatchDependentStore { + ds := storagemock.NewMockBatchDependentStore(ctrl) + ds.EXPECT().Get(gomock.Any(), batchID).Return(entity.BatchDependent{ + BatchID: batchID, + Dependents: dependents, + Version: 1, + }, nil) + return ds +} + // newTestController wires a controller with a registry covering all topics the // speculate controller may publish to. The publisher returns publishErr (or nil). func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { @@ -72,6 +92,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock {Key: consumer.TopicKeyBuild, Name: "build", Queue: mockQ}, {Key: consumer.TopicKeyMerge, Name: "merge", Queue: mockQ}, {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, }, ) @@ -140,6 +161,7 @@ func TestController_Process_FinalizeNoDeps(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -160,6 +182,7 @@ func TestController_Process_FinalizeAllDepsSucceeded(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -178,6 +201,7 @@ func TestController_Process_WaitingOnDep(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -198,6 +222,8 @@ func TestController_Process_FailedDepFailsBatch(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() + store.EXPECT().GetBatchDependentStore().Return(expectBatchDependents(ctrl, batch.ID, nil)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -220,6 +246,126 @@ func TestController_Process_CancelledDepSkipped(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} + +// tryFinalize: a failed own build must fail the batch (Speculating → Failed) +// and publish to conclude, independent of any dependencies. This is the core +// build-gating fix — previously a failed build still advanced to merge. +func TestController_Process_FailedOwnBuildFailsBatch(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSpeculating) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateFailed).Return(nil) + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return( + entity.Build{ID: "fake-build-fail-1", BatchID: batch.ID, Status: entity.BuildStatusFailed}, nil) + depStore := expectBatchDependents(ctrl, batch.ID, []string{"test-queue/batch/2", "test-queue/batch/3"}) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + store.EXPECT().GetBatchDependentStore().Return(depStore).AnyTimes() + + type pubRec struct { + topic string + msgID string + } + var records []pubRec + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + records = append(records, pubRec{topic: topic, msgID: msg.ID}) + return nil + }).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + }, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) + + assert.Equal(t, []pubRec{ + {topic: "speculate", msgID: "test-queue/batch/2"}, + {topic: "speculate", msgID: "test-queue/batch/3"}, + {topic: "conclude", msgID: batch.ID}, + }, records) +} + +// tryFinalize: cancelled is a terminal non-success build status. buildsignal +// emits only one speculate event for terminal statuses, so treating Cancelled as +// "still waiting" would strand the batch in Speculating forever. +func TestController_Process_CancelledOwnBuildFailsBatch(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSpeculating) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateFailed).Return(nil) + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return( + entity.Build{ID: "fake-build-cancel-1", BatchID: batch.ID, Status: entity.BuildStatusCancelled}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + store.EXPECT().GetBatchDependentStore().Return(expectBatchDependents(ctrl, batch.ID, nil)).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} + +// tryFinalize: a build still running parks the batch — no merge, no state change. +func TestController_Process_WaitingOnOwnBuild(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSpeculating) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected — the build has not finished. + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return( + entity.Build{ID: "fake-build-run-1", BatchID: batch.ID, Status: entity.BuildStatusRunning}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} + +// tryFinalize: when the Build row is not yet persisted (race with the build +// controller), the batch waits — no merge, no state change. +func TestController_Process_OwnBuildNotFoundWaits(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSpeculating) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected — no build to gate on yet. + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -241,46 +387,88 @@ func TestController_Process_MergingNoOp(t *testing.T) { require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) } -// Terminal states re-fan-out to conclude for self-healing in case a previous -// publish was lost. State must not change (no UpdateState). The Cancelled -// terminal also re-fans-out dependents and is covered separately in -// TestController_Process_CancelledTerminalSelfHealsDependents. +// Terminal Succeeded re-fans-out to conclude for self-healing in case a +// previous publish was lost. State must not change (no UpdateState). func TestController_Process_TerminalSelfHeals(t *testing.T) { - for _, state := range []entity.BatchState{ - entity.BatchStateSucceeded, - entity.BatchStateFailed, - } { - t.Run(string(state), func(t *testing.T) { - ctrl := gomock.NewController(t) - batch := testBatch(state) + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSucceeded) - batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) - // No UpdateState expected. + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected. - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - // Require exactly one publish to the conclude topic for self-healing. - mockPub := queuemock.NewMockPublisher(ctrl) - mockPub.EXPECT().Publish(gomock.Any(), "conclude", gomock.Any()).Return(nil).Times(1) + // Require exactly one publish to the conclude topic for self-healing. + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), "conclude", gomock.Any()).Return(nil).Times(1) - mockQ := queuemock.NewMockQueue(ctrl) - mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - }, - ) - require.NoError(t, err) + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + }, + ) + require.NoError(t, err) + + logger := zaptest.NewLogger(t).Sugar() + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") - logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} - require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) - }) +// Failed is terminal: redelivery must re-fan-out dependents and re-publish to +// conclude so a crash after the Failed CAS cannot strand downstream batches. +func TestController_Process_FailedTerminalSelfHealsDependents(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateFailed) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected. + + depStore := expectBatchDependents(ctrl, batch.ID, []string{"test-queue/batch/2", "test-queue/batch/3"}) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBatchDependentStore().Return(depStore).AnyTimes() + + type pubRec struct { + topic string + msgID string } + var records []pubRec + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + records = append(records, pubRec{topic: topic, msgID: msg.ID}) + return nil + }).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + }, + ) + require.NoError(t, err) + + logger := zaptest.NewLogger(t).Sugar() + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) + + assert.Equal(t, []pubRec{ + {topic: "speculate", msgID: "test-queue/batch/2"}, + {topic: "speculate", msgID: "test-queue/batch/3"}, + {topic: "conclude", msgID: batch.ID}, + }, records) } // Cancelled is terminal: redelivery must re-fan-out dependents (so a crash @@ -358,10 +546,10 @@ func TestController_Process_CancellingTerminalFlow(t *testing.T) { batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelled).Return(nil) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{ - ID: batch.ID, BatchID: batch.ID, Status: entity.BuildStatusRunning, + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{ + ID: "fake-build-run-1", BatchID: batch.ID, Status: entity.BuildStatusRunning, }, nil) - buildStore.EXPECT().UpdateStatus(gomock.Any(), batch.ID, entity.BuildStatusCancelled).Return(nil) + buildStore.EXPECT().UpdateStatus(gomock.Any(), "fake-build-run-1", entity.BuildStatusCancelled).Return(nil) depStore := storagemock.NewMockBatchDependentStore(ctrl) depStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.BatchDependent{ @@ -423,8 +611,8 @@ func TestController_Process_CancellingBuildAlreadyTerminal(t *testing.T) { batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelled).Return(nil) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{ - ID: batch.ID, BatchID: batch.ID, Status: entity.BuildStatusSucceeded, + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{ + ID: "fake-build-ok-1", BatchID: batch.ID, Status: entity.BuildStatusSucceeded, }, nil) // No UpdateStatus expected — the build is already terminal. @@ -454,7 +642,7 @@ func TestController_Process_CancellingNoBuildYet(t *testing.T) { batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelled).Return(nil) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) // No UpdateStatus expected. depStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -484,7 +672,7 @@ func TestController_Process_CancellingNoDependents(t *testing.T) { batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelled).Return(nil) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) depStore := storagemock.NewMockBatchDependentStore(ctrl) depStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.BatchDependent{BatchID: batch.ID, Dependents: []string{}, Version: 1}, nil) @@ -528,7 +716,7 @@ func TestController_Process_CancellingTerminalCASVersionMismatch(t *testing.T) { Return(storage.ErrVersionMismatch) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index 91fb0fb7..ce09efb7 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -167,18 +167,63 @@ func (s *E2EIntegrationSuite) TestPingOrchestrator() { s.log.Logf("Orchestrator ping: %s", resp.Message) } +// TestLandRequest_SinglePR drives a normal request through the orchestrator +// pipeline and asserts it lands. e2e-test-queue is wired with immediate fakes +// (every stage succeeds), so the happy path reaches the terminal "landed" status. func (s *E2EIntegrationSuite) TestLandRequest_SinglePR() { + s.landAndWait("github://uber/e2e-service/pull/123/abcdef0123456789abcdef0123456789abcdef01", "landed", 30*time.Second) +} + +// TestLandRequest_BuildFails verifies that a build failure (injected via the +// fake build runner's "sq-fake=build-fail" marker) drives the request to a +// terminal "error" rather than landing. This is the end-to-end guard for the +// speculate build-gating fix: before that fix the batch ignored its own failed +// build and still merged. +func (s *E2EIntegrationSuite) TestLandRequest_BuildFails() { + s.landAndWait("github://uber/e2e-service/pull/124/abcdef0123456789abcdef0123456789abcdef02?sq-fake=build-fail", "error", 30*time.Second) +} + +// landAndWait submits a single-URI land request to e2e-test-queue and asserts +// it reaches the expected terminal status. +func (s *E2EIntegrationSuite) landAndWait(uri, wantStatus string, timeout time.Duration) { + t := s.T() req := &gatewaypb.LandRequest{ Queue: "e2e-test-queue", - Change: &gatewaypb.Change{Uris: []string{"github://uber/e2e-service/pull/123/abcdef0123456789abcdef0123456789abcdef01"}}, + Change: &gatewaypb.Change{Uris: []string{uri}}, Strategy: gatewaypb.Strategy_REBASE, } - s.log.Logf("Sending Land request (single PR) for queue=%s", req.Queue) + s.log.Logf("Sending Land request for queue=%s uri=%s", req.Queue, uri) resp, err := s.gatewayClient.Land(s.ctx, req) - require.NoError(s.T(), err, "Land request failed") - require.NotEmpty(s.T(), resp.Sqid, "SQID should not be empty") - s.log.Logf("Land request (single PR) succeeded: sqid=%s", resp.Sqid) + require.NoError(t, err, "Land request failed") + require.NotEmpty(t, resp.Sqid, "SQID should not be empty") + + final := s.waitForStatus(resp.Sqid, timeout) + assert.Equal(t, wantStatus, final) +} + +// waitForStatus polls the gateway Status RPC until the request reaches a +// terminal status (landed / error / cancelled) and returns it, or fails the +// test if no terminal status is observed within timeout. +func (s *E2EIntegrationSuite) waitForStatus(sqid string, timeout time.Duration) string { + t := s.T() + var final string + require.Eventually(t, func() bool { + resp, err := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) + if err != nil { + s.log.Logf("Status RPC error for sqid=%s: %v", sqid, err) + return false + } + switch resp.Status { + case "landed", "error", "cancelled": + final = resp.Status + s.log.Logf("sqid=%s reached terminal status=%s last_error=%q", sqid, resp.Status, resp.LastError) + return true + default: + return false + } + }, timeout, 500*time.Millisecond, "sqid=%s did not reach a terminal status within %s", sqid, timeout) + return final } // TestLandRequest_PersistsStartedLogViaGatewayConsumer verifies the request-log @@ -186,12 +231,13 @@ func (s *E2EIntegrationSuite) TestLandRequest_SinglePR() { // entries to the log topic (it never writes the request log itself), and the // gateway's log consumer drains that topic and persists them to storage. // -// We observe this through the gateway Status RPC: immediately after Land the -// status is "accepted" (the gateway's synchronous direct write), and once the -// orchestrator's start controller publishes "started" to the log topic, the -// gateway consumer persists it and Status advances to "started". Seeing -// "started" therefore proves the publish→consume→persist path works across both -// services. +// We assert on the durable request_log table rather than the live Status RPC. +// request_log is append-only and the gateway consumer is its sole writer, so a +// "started" row proves the orchestrator-publish → gateway-consume → persist path +// ran. The Status RPC only returns the *current* status, and "started" is a +// transient intermediate state: on the fast happy path the request reaches a +// terminal status within a single poll interval, so polling Status for "started" +// loses the race. The persisted log row, by contrast, never disappears. func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsumer() { t := s.T() @@ -206,15 +252,17 @@ func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsum s.log.Logf("Land succeeded: sqid=%s; waiting for gateway consumer to persist 'started'", sqid) require.Eventually(t, func() bool { - resp, statusErr := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) - if statusErr != nil { - s.log.Logf("Status(%s) not ready yet: %v", sqid, statusErr) + var count int + queryErr := s.db.QueryRowContext(s.ctx, + "SELECT COUNT(*) FROM request_log WHERE request_id = ? AND status = ?", + sqid, string(entity.RequestStatusStarted)).Scan(&count) + if queryErr != nil { + s.log.Logf("request_log query for sqid=%s not ready yet: %v", sqid, queryErr) return false } - s.log.Logf("Status(%s) = %q", sqid, resp.Status) - return resp.Status == string(entity.RequestStatusStarted) + return count > 0 }, persistTimeout, persistPollInterval, - "request %s should reach status %q via the gateway log consumer", sqid, entity.RequestStatusStarted) + "request %s should have a %q entry persisted by the gateway log consumer", sqid, entity.RequestStatusStarted) s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid) }