From 091a462abf80ed8882a699f3164a5f4fceae467d Mon Sep 17 00:00:00 2001 From: Garrik Sturges Date: Thu, 4 Jun 2026 16:32:25 -0700 Subject: [PATCH 1/4] feat(stovepipe): add change ingestion hooks and push filtering Introduce ChangeInfo, ChangeIngester/ChangeHandler, a no-op logging handler stub, and URI watch filtering for downstream ingestion. --- stovepipe/core/filter/filter.go | 41 +++++++++++++++++ stovepipe/entity/entity.go | 37 ++++++++++++++- stovepipe/extension/changeingester/logging.go | 46 +++++++++++++++++++ stovepipe/extension/extension.go | 18 ++++++++ 4 files changed, 140 insertions(+), 2 deletions(-) create mode 100644 stovepipe/core/filter/filter.go create mode 100644 stovepipe/extension/changeingester/logging.go diff --git a/stovepipe/core/filter/filter.go b/stovepipe/core/filter/filter.go new file mode 100644 index 00000000..356f6d56 --- /dev/null +++ b/stovepipe/core/filter/filter.go @@ -0,0 +1,41 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package filter implements a filter for commit events. +package filter + +import ( + "strings" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// Config controls which VCS URIs are watched. +// WatchedURIPrefixes is a list of URI prefixes to match against ChangeInfo.URI. +// Example: "git://github.com/uber/go-code/refs/heads/main" +// watches all commits on the main branch of uber/go-code. +type Config struct { + WatchedURIPrefixes []string +} + +// ShouldProcess returns true if the commit event's URI matches +// any of the configured watched prefixes. +func ShouldProcess(cfg Config, event entity.ChangeInfo) bool { + for _, prefix := range cfg.WatchedURIPrefixes { + if strings.HasPrefix(event.URI, prefix) { + return true + } + } + return false +} diff --git a/stovepipe/entity/entity.go b/stovepipe/entity/entity.go index 15ac8680..80c625df 100644 --- a/stovepipe/entity/entity.go +++ b/stovepipe/entity/entity.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -12,5 +12,38 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package entity holds Stovepipe-specific domain types (distinct from shared repo entity/). +// Package entity holds Stovepipe-specific domain entities. package entity + +// ChangeInfo represents a new change detected on a VCS remote. +// It is intentionally VCS-agnostic: the URI scheme carries the +// provider identity, mirroring the github:// scheme used in +// SubmitQueue's ChangeInfo. +// +// URI format: "git://///" +// Example: "git://github.com/uber/go-code/refs/heads/main/c3a4d5e6f789..." +// +// Fields are immutable after construction. +type ChangeInfo struct { + // URI is the canonical VCS identifier for this change. + // Scheme is "git://"; path encodes host, repo, ref, and new revision. + // This mirrors the ChangeInfo.URI pattern used in SubmitQueue. + URI string `json:"uri"` + + // PreviousURI is the URI of the prior revision on the same ref, if known. + // Empty string if unavailable. + // Example: "git://github.com/uber/go-code/refs/heads/main/aabbccdd..." + PreviousURI string `json:"previous_uri,omitempty"` + + // Author is the identity of the person who authored the change. + Author Author `json:"author"` +} + +// Author identifies the person who authored a change. +// Mirrors SubmitQueue's Author to keep the two domains consistent. +type Author struct { + // Name is the display name of the author. + Name string `json:"name"` + // Email is the email address of the author. + Email string `json:"email,omitempty"` +} diff --git a/stovepipe/extension/changeingester/logging.go b/stovepipe/extension/changeingester/logging.go new file mode 100644 index 00000000..e3c50e57 --- /dev/null +++ b/stovepipe/extension/changeingester/logging.go @@ -0,0 +1,46 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package changeingester provides ChangeIngester implementations. +package changeingester + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension" + "go.uber.org/zap" +) + +// LoggingHandler is a stub ChangeHandler that logs received changes. +// Replace with real persistence logic once DB schema is ready. +type LoggingHandler struct { + logger *zap.Logger +} + +// New constructs a new LoggingHandler. +// The return type enforces interface compliance at compile time. +func New(logger *zap.Logger) extension.ChangeHandler { + return LoggingHandler{logger: logger} +} + +func (h LoggingHandler) IngestChange(ctx context.Context, info entity.ChangeInfo) error { + h.logger.Info("ingested change", + zap.String("uri", info.URI), + zap.String("previous_uri", info.PreviousURI), + zap.String("author", info.Author.Name), + zap.String("author_email", info.Author.Email), + ) + return nil +} diff --git a/stovepipe/extension/extension.go b/stovepipe/extension/extension.go index 92d4bcea..5a891faf 100644 --- a/stovepipe/extension/extension.go +++ b/stovepipe/extension/extension.go @@ -14,3 +14,21 @@ // Package extension holds Stovepipe-specific extension implementations. package extension + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// ChangeIngester subscribes to change events from a VCS source +// and dispatches them for processing. The source and VCS are +// implementation details left to the injected backend. +type ChangeIngester interface { + Start(ctx context.Context) error +} + +// ChangeHandler processes a single change received from the ingester. +type ChangeHandler interface { + IngestChange(ctx context.Context, info entity.ChangeInfo) error +} From bb38eab18a74cc15ca3cbfd480358c44a1f2ea17 Mon Sep 17 00:00:00 2001 From: gsturges <59843992+gsturges@users.noreply.github.com> Date: Mon, 8 Jun 2026 10:19:26 -0700 Subject: [PATCH 2/4] Apply suggestion from @behinddwalls Co-authored-by: Preetam Dwivedi --- stovepipe/core/filter/filter.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/stovepipe/core/filter/filter.go b/stovepipe/core/filter/filter.go index 356f6d56..31e72513 100644 --- a/stovepipe/core/filter/filter.go +++ b/stovepipe/core/filter/filter.go @@ -25,14 +25,8 @@ import ( // WatchedURIPrefixes is a list of URI prefixes to match against ChangeInfo.URI. // Example: "git://github.com/uber/go-code/refs/heads/main" // watches all commits on the main branch of uber/go-code. -type Config struct { - WatchedURIPrefixes []string -} - -// ShouldProcess returns true if the commit event's URI matches -// any of the configured watched prefixes. -func ShouldProcess(cfg Config, event entity.ChangeInfo) bool { - for _, prefix := range cfg.WatchedURIPrefixes { +func ShouldProcess(event entity.ChangeInfo, watchedPrefixes []string) bool { + for _, prefix := range watchedPrefixes { if strings.HasPrefix(event.URI, prefix) { return true } From 43860ec159d983323d875f35c38955858625e8e9 Mon Sep 17 00:00:00 2001 From: Garrik Sturges Date: Tue, 9 Jun 2026 11:08:15 -0700 Subject: [PATCH 3/4] feat(stovepipe): scaffold pipeline entities and storage extension Add Commit and Batch as the core domain entities the validation pipeline operates on, each with status types and optimistic-locking Version fields. Add CommitStore and BatchStore extension interfaces scoped to their respective service owners (gateway and orchestrator). Fix missing BUILD.bazel files for the filter and changeingester packages. Flatten ChangeInfo.Author into AuthorName/AuthorEmail fields directly. --- stovepipe/core/filter/BUILD.bazel | 9 +++ stovepipe/entity/BUILD.bazel | 6 +- stovepipe/entity/batch.go | 63 +++++++++++++++++++ stovepipe/entity/commit.go | 57 +++++++++++++++++ stovepipe/entity/entity.go | 16 ++--- stovepipe/extension/BUILD.bazel | 1 + .../extension/changeingester/BUILD.bazel | 13 ++++ stovepipe/extension/changeingester/logging.go | 4 +- stovepipe/extension/storage/BUILD.bazel | 13 ++++ stovepipe/extension/storage/batch_store.go | 43 +++++++++++++ stovepipe/extension/storage/commit_store.go | 48 ++++++++++++++ stovepipe/extension/storage/storage.go | 44 +++++++++++++ 12 files changed, 302 insertions(+), 15 deletions(-) create mode 100644 stovepipe/core/filter/BUILD.bazel create mode 100644 stovepipe/entity/batch.go create mode 100644 stovepipe/entity/commit.go create mode 100644 stovepipe/extension/changeingester/BUILD.bazel create mode 100644 stovepipe/extension/storage/BUILD.bazel create mode 100644 stovepipe/extension/storage/batch_store.go create mode 100644 stovepipe/extension/storage/commit_store.go create mode 100644 stovepipe/extension/storage/storage.go diff --git a/stovepipe/core/filter/BUILD.bazel b/stovepipe/core/filter/BUILD.bazel new file mode 100644 index 00000000..454754b5 --- /dev/null +++ b/stovepipe/core/filter/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "filter", + srcs = ["filter.go"], + importpath = "github.com/uber/submitqueue/stovepipe/core/filter", + visibility = ["//visibility:public"], + deps = ["//stovepipe/entity"], +) diff --git a/stovepipe/entity/BUILD.bazel b/stovepipe/entity/BUILD.bazel index c179fb43..8dadfb95 100644 --- a/stovepipe/entity/BUILD.bazel +++ b/stovepipe/entity/BUILD.bazel @@ -2,7 +2,11 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "entity", - srcs = ["entity.go"], + srcs = [ + "batch.go", + "commit.go", + "entity.go", + ], importpath = "github.com/uber/submitqueue/stovepipe/entity", visibility = ["//visibility:public"], ) diff --git a/stovepipe/entity/batch.go b/stovepipe/entity/batch.go new file mode 100644 index 00000000..feb580c4 --- /dev/null +++ b/stovepipe/entity/batch.go @@ -0,0 +1,63 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// BatchStatus is the state of a validation batch as it moves through the pipeline. +type BatchStatus string + +const ( + // BatchStatusUnknown is the unreachable default. It should never be seen in the system. + BatchStatusUnknown BatchStatus = "" + // BatchStatusPending means the batch has been created but speculate has not yet run. + BatchStatusPending BatchStatus = "pending" + // BatchStatusBuilding means the batch is moving through the speculate→build→buildsignal→bisect cycle. + BatchStatusBuilding BatchStatus = "building" + // BatchStatusSucceeded means all commits in the batch's range have been validated green. + BatchStatusSucceeded BatchStatus = "succeeded" + // BatchStatusFailed means an offending commit in the range has been isolated and marked failed. + BatchStatusFailed BatchStatus = "failed" +) + +// IsBatchStatusTerminal returns true if the batch has reached a final state. +func IsBatchStatusTerminal(s BatchStatus) bool { + return s == BatchStatusSucceeded || s == BatchStatusFailed +} + +// Batch is a contiguous range of trunk commits submitted for validation together. +// The range spans from FromSHA (oldest, inclusive) to ToSHA (newest, inclusive) +// and represents all commits since the last known green on the branch. +// Bisection creates sub-range batches from the same type — there is no separate +// bisection entity; the state of the search lives in the ordinary batch results. +type Batch struct { + // ID is the unique identifier for this batch. + ID string + // FromSHA is the oldest commit SHA in the validation range (inclusive). + FromSHA string + // ToSHA is the newest commit SHA in the validation range (inclusive). + ToSHA string + // Repository is the repository this batch validates. + Repository string + // Branch is the branch this batch validates. + Branch string + // Status is the current state of this batch. + Status BatchStatus + // Version is incremented on each update and used for optimistic locking. + // Version arithmetic lives in the controller; the store performs a pure conditional write. + Version int32 + // CreatedAt is the time this batch was created, in milliseconds since epoch. + CreatedAt int64 + // UpdatedAt is the time this batch was last updated, in milliseconds since epoch. + UpdatedAt int64 +} diff --git a/stovepipe/entity/commit.go b/stovepipe/entity/commit.go new file mode 100644 index 00000000..b5850f86 --- /dev/null +++ b/stovepipe/entity/commit.go @@ -0,0 +1,57 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// CommitStatus is the validation state of a trunk commit as determined by Stovepipe. +type CommitStatus string + +const ( + // CommitStatusUnknown is the default state when a commit is first ingested. + // The commit has landed on main but has not yet been validated. + CommitStatusUnknown CommitStatus = "" + // CommitStatusSucceeded means the relevant targets build and test successfully at this commit. + CommitStatusSucceeded CommitStatus = "succeeded" + // CommitStatusFailed means a target is broken at this commit; it is the offending change. + CommitStatusFailed CommitStatus = "failed" +) + +// IsCommitStatusTerminal returns true if the status is a final, irreversible state. +func IsCommitStatusTerminal(s CommitStatus) bool { + return s == CommitStatusSucceeded || s == CommitStatusFailed +} + +// Commit is a trunk commit tracked by Stovepipe. The SHA scoped by Repository and +// Branch is the natural identity and dedup key: a commit announced by both a webhook +// and a poll backfill resolves to the same record and is processed once. +type Commit struct { + // SHA is the full commit hash. Identity key; immutable after creation. + SHA string + // Repository is the repository URI (e.g. "github.com/uber/go-code"). + Repository string + // Branch is the target branch (e.g. "main"). + Branch string + // CommitterTimeMs is the committer timestamp in milliseconds since epoch. + // Used to order commits within a range and to establish the trunk sequence. + CommitterTimeMs int64 + // Status is the current validation state of this commit. + Status CommitStatus + // Version is incremented on each update and used for optimistic locking. + // Version arithmetic lives in the controller; the store performs a pure conditional write. + Version int32 + // CreatedAt is the time this commit was first recorded, in milliseconds since epoch. + CreatedAt int64 + // UpdatedAt is the time this commit was last updated, in milliseconds since epoch. + UpdatedAt int64 +} diff --git a/stovepipe/entity/entity.go b/stovepipe/entity/entity.go index 80c625df..a7136aa3 100644 --- a/stovepipe/entity/entity.go +++ b/stovepipe/entity/entity.go @@ -27,7 +27,6 @@ package entity type ChangeInfo struct { // URI is the canonical VCS identifier for this change. // Scheme is "git://"; path encodes host, repo, ref, and new revision. - // This mirrors the ChangeInfo.URI pattern used in SubmitQueue. URI string `json:"uri"` // PreviousURI is the URI of the prior revision on the same ref, if known. @@ -35,15 +34,8 @@ type ChangeInfo struct { // Example: "git://github.com/uber/go-code/refs/heads/main/aabbccdd..." PreviousURI string `json:"previous_uri,omitempty"` - // Author is the identity of the person who authored the change. - Author Author `json:"author"` -} - -// Author identifies the person who authored a change. -// Mirrors SubmitQueue's Author to keep the two domains consistent. -type Author struct { - // Name is the display name of the author. - Name string `json:"name"` - // Email is the email address of the author. - Email string `json:"email,omitempty"` + // AuthorName is the display name of the person who authored the change. + AuthorName string `json:"author_name,omitempty"` + // AuthorEmail is the email address of the author. + AuthorEmail string `json:"author_email,omitempty"` } diff --git a/stovepipe/extension/BUILD.bazel b/stovepipe/extension/BUILD.bazel index bbf4f83b..ac032b39 100644 --- a/stovepipe/extension/BUILD.bazel +++ b/stovepipe/extension/BUILD.bazel @@ -5,4 +5,5 @@ go_library( srcs = ["extension.go"], importpath = "github.com/uber/submitqueue/stovepipe/extension", visibility = ["//visibility:public"], + deps = ["//stovepipe/entity"], ) diff --git a/stovepipe/extension/changeingester/BUILD.bazel b/stovepipe/extension/changeingester/BUILD.bazel new file mode 100644 index 00000000..4575d4b3 --- /dev/null +++ b/stovepipe/extension/changeingester/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "changeingester", + srcs = ["logging.go"], + importpath = "github.com/uber/submitqueue/stovepipe/extension/changeingester", + visibility = ["//visibility:public"], + deps = [ + "//stovepipe/entity", + "//stovepipe/extension", + "@org_uber_go_zap//:zap", + ], +) diff --git a/stovepipe/extension/changeingester/logging.go b/stovepipe/extension/changeingester/logging.go index e3c50e57..69e6dc2f 100644 --- a/stovepipe/extension/changeingester/logging.go +++ b/stovepipe/extension/changeingester/logging.go @@ -39,8 +39,8 @@ func (h LoggingHandler) IngestChange(ctx context.Context, info entity.ChangeInfo h.logger.Info("ingested change", zap.String("uri", info.URI), zap.String("previous_uri", info.PreviousURI), - zap.String("author", info.Author.Name), - zap.String("author_email", info.Author.Email), + zap.String("author_name", info.AuthorName), + zap.String("author_email", info.AuthorEmail), ) return nil } diff --git a/stovepipe/extension/storage/BUILD.bazel b/stovepipe/extension/storage/BUILD.bazel new file mode 100644 index 00000000..d06b7577 --- /dev/null +++ b/stovepipe/extension/storage/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "storage", + srcs = [ + "batch_store.go", + "commit_store.go", + "storage.go", + ], + importpath = "github.com/uber/submitqueue/stovepipe/extension/storage", + visibility = ["//visibility:public"], + deps = ["//stovepipe/entity"], +) diff --git a/stovepipe/extension/storage/batch_store.go b/stovepipe/extension/storage/batch_store.go new file mode 100644 index 00000000..316d7a59 --- /dev/null +++ b/stovepipe/extension/storage/batch_store.go @@ -0,0 +1,43 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +//go:generate mockgen -source=batch_store.go -destination=mock/batch_store_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// BatchStore is the orchestrator-owned store for in-flight validation batches. +// A batch represents a contiguous range of trunk commits under validation. +// Bisection reuses the same type — sub-range probes are ordinary batches +// driven through the same speculate→build→buildsignal→bisect loop. +type BatchStore interface { + // Get retrieves a batch by ID. Returns ErrNotFound if no record exists. + Get(ctx context.Context, id string) (entity.Batch, error) + + // Create records a new batch with status BatchStatusPending. + // Returns ErrAlreadyExists if a batch with the same ID already exists. + Create(ctx context.Context, batch entity.Batch) error + + // UpdateStatus updates the batch's status and advances the version from + // oldVersion to newVersion. Returns ErrVersionMismatch if the current + // persisted version does not match oldVersion; the caller must re-read and retry. + // Version arithmetic is owned by the caller (controller); the store performs + // a pure conditional write. + UpdateStatus(ctx context.Context, id string, oldVersion, newVersion int32, status entity.BatchStatus) error +} diff --git a/stovepipe/extension/storage/commit_store.go b/stovepipe/extension/storage/commit_store.go new file mode 100644 index 00000000..579b9c6a --- /dev/null +++ b/stovepipe/extension/storage/commit_store.go @@ -0,0 +1,48 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +//go:generate mockgen -source=commit_store.go -destination=mock/commit_store_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// CommitStore is the gateway-owned store for trunk commit health state. +// It is the authoritative record of each commit's validation status and is the +// only storage the gateway's GetStatus RPC reads from. +// +// The (Repository, Branch, SHA) triple is the identity key and dedup handle: +// a commit announced by both a webhook and a poll backfill resolves to the same +// row and is processed once. +type CommitStore interface { + // Get retrieves a commit by its (repository, branch, sha) identity key. + // Returns ErrNotFound if no record exists. + Get(ctx context.Context, repository, branch, sha string) (entity.Commit, error) + + // Create records a new commit. The status must be CommitStatusUnknown. + // Returns ErrAlreadyExists if a commit with the same identity already exists; + // callers treat this as a successful dedup, not a failure. + Create(ctx context.Context, commit entity.Commit) error + + // UpdateStatus updates the commit's status and advances the version from + // oldVersion to newVersion. Returns ErrVersionMismatch if the current + // persisted version does not match oldVersion; the caller must re-read and retry. + // Version arithmetic is owned by the caller (controller); the store performs + // a pure conditional write. + UpdateStatus(ctx context.Context, repository, branch, sha string, oldVersion, newVersion int32, status entity.CommitStatus) error +} diff --git a/stovepipe/extension/storage/storage.go b/stovepipe/extension/storage/storage.go new file mode 100644 index 00000000..be4b980a --- /dev/null +++ b/stovepipe/extension/storage/storage.go @@ -0,0 +1,44 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package storage defines the storage extension interfaces for the Stovepipe domain. +// The gateway owns the CommitStore (commit-status store and event log). +// The orchestrator owns the BatchStore (in-flight batches and pipeline working state). +// The two services share no storage; they communicate only through the messaging queue. +package storage + +import ( + "errors" + "fmt" +) + +// ErrNotFound is returned by storage implementations when the requested record is not found. +var ErrNotFound = errors.New("record not found") + +// IsNotFound returns true if any error in the chain is ErrNotFound. +func IsNotFound(err error) bool { + return errors.Is(err, ErrNotFound) +} + +// WrapNotFound wraps ErrNotFound with the underlying implementation error. +func WrapNotFound(err error) error { + return fmt.Errorf("%w: %w", ErrNotFound, err) +} + +// ErrAlreadyExists is returned when attempting to create a record whose identity already exists. +var ErrAlreadyExists = errors.New("record already exists") + +// ErrVersionMismatch is returned when an optimistic-locking CAS write finds the persisted +// version does not match the expected version. Callers should retry from a fresh read. +var ErrVersionMismatch = errors.New("version mismatch") From 0239b4ad9d0db9ecdaa0767aafede292e0d3d545 Mon Sep 17 00:00:00 2001 From: Garrik Sturges Date: Tue, 9 Jun 2026 17:34:44 -0700 Subject: [PATCH 4/4] refactor(stovepipe): scope entities and extensions to gateway only Replace ChangeInfo with ChangeEvent as the ingestion payload type and introduce ChangeURI as the lightweight pipeline reference. Slim Commit to the four fields the gateway needs: URI, Status, CreatedAt, UpdatedAt. Update ChangeHandler and the filter to accept ChangeEvent. Remove the storage extension and Batch entity, which belong to the orchestrator. --- stovepipe/core/filter/filter.go | 4 +- stovepipe/entity/BUILD.bazel | 5 +- stovepipe/entity/batch.go | 63 ------------------- stovepipe/entity/change_event.go | 59 +++++++++++++++++ stovepipe/entity/change_uri.go | 36 +++++++++++ stovepipe/entity/commit.go | 18 +----- stovepipe/entity/entity.go | 41 ------------ stovepipe/extension/changeingester/logging.go | 7 +-- stovepipe/extension/extension.go | 2 +- stovepipe/extension/storage/BUILD.bazel | 13 ---- stovepipe/extension/storage/batch_store.go | 43 ------------- stovepipe/extension/storage/commit_store.go | 48 -------------- stovepipe/extension/storage/storage.go | 44 ------------- 13 files changed, 106 insertions(+), 277 deletions(-) delete mode 100644 stovepipe/entity/batch.go create mode 100644 stovepipe/entity/change_event.go create mode 100644 stovepipe/entity/change_uri.go delete mode 100644 stovepipe/entity/entity.go delete mode 100644 stovepipe/extension/storage/BUILD.bazel delete mode 100644 stovepipe/extension/storage/batch_store.go delete mode 100644 stovepipe/extension/storage/commit_store.go delete mode 100644 stovepipe/extension/storage/storage.go diff --git a/stovepipe/core/filter/filter.go b/stovepipe/core/filter/filter.go index 31e72513..4130bb5a 100644 --- a/stovepipe/core/filter/filter.go +++ b/stovepipe/core/filter/filter.go @@ -22,10 +22,10 @@ import ( ) // Config controls which VCS URIs are watched. -// WatchedURIPrefixes is a list of URI prefixes to match against ChangeInfo.URI. +// WatchedURIPrefixes is a list of URI prefixes to match against ChangeEvent.URI. // Example: "git://github.com/uber/go-code/refs/heads/main" // watches all commits on the main branch of uber/go-code. -func ShouldProcess(event entity.ChangeInfo, watchedPrefixes []string) bool { +func ShouldProcess(event entity.ChangeEvent, watchedPrefixes []string) bool { for _, prefix := range watchedPrefixes { if strings.HasPrefix(event.URI, prefix) { return true diff --git a/stovepipe/entity/BUILD.bazel b/stovepipe/entity/BUILD.bazel index 8dadfb95..96707407 100644 --- a/stovepipe/entity/BUILD.bazel +++ b/stovepipe/entity/BUILD.bazel @@ -3,10 +3,11 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "entity", srcs = [ - "batch.go", + "change_event.go", + "change_uri.go", "commit.go", - "entity.go", ], importpath = "github.com/uber/submitqueue/stovepipe/entity", visibility = ["//visibility:public"], + deps = ["//stovepipe/entity/git"], ) diff --git a/stovepipe/entity/batch.go b/stovepipe/entity/batch.go deleted file mode 100644 index feb580c4..00000000 --- a/stovepipe/entity/batch.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package entity - -// BatchStatus is the state of a validation batch as it moves through the pipeline. -type BatchStatus string - -const ( - // BatchStatusUnknown is the unreachable default. It should never be seen in the system. - BatchStatusUnknown BatchStatus = "" - // BatchStatusPending means the batch has been created but speculate has not yet run. - BatchStatusPending BatchStatus = "pending" - // BatchStatusBuilding means the batch is moving through the speculate→build→buildsignal→bisect cycle. - BatchStatusBuilding BatchStatus = "building" - // BatchStatusSucceeded means all commits in the batch's range have been validated green. - BatchStatusSucceeded BatchStatus = "succeeded" - // BatchStatusFailed means an offending commit in the range has been isolated and marked failed. - BatchStatusFailed BatchStatus = "failed" -) - -// IsBatchStatusTerminal returns true if the batch has reached a final state. -func IsBatchStatusTerminal(s BatchStatus) bool { - return s == BatchStatusSucceeded || s == BatchStatusFailed -} - -// Batch is a contiguous range of trunk commits submitted for validation together. -// The range spans from FromSHA (oldest, inclusive) to ToSHA (newest, inclusive) -// and represents all commits since the last known green on the branch. -// Bisection creates sub-range batches from the same type — there is no separate -// bisection entity; the state of the search lives in the ordinary batch results. -type Batch struct { - // ID is the unique identifier for this batch. - ID string - // FromSHA is the oldest commit SHA in the validation range (inclusive). - FromSHA string - // ToSHA is the newest commit SHA in the validation range (inclusive). - ToSHA string - // Repository is the repository this batch validates. - Repository string - // Branch is the branch this batch validates. - Branch string - // Status is the current state of this batch. - Status BatchStatus - // Version is incremented on each update and used for optimistic locking. - // Version arithmetic lives in the controller; the store performs a pure conditional write. - Version int32 - // CreatedAt is the time this batch was created, in milliseconds since epoch. - CreatedAt int64 - // UpdatedAt is the time this batch was last updated, in milliseconds since epoch. - UpdatedAt int64 -} diff --git a/stovepipe/entity/change_event.go b/stovepipe/entity/change_event.go new file mode 100644 index 00000000..46a664cf --- /dev/null +++ b/stovepipe/entity/change_event.go @@ -0,0 +1,59 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import ( + "encoding/json" + "fmt" + + entitygit "github.com/uber/submitqueue/stovepipe/entity/git" +) + +// ChangeEvent represents a single new trunk change entering the pipeline, published to the +// start topic. A trunk change is one commit, so the event carries one git-backed URI. It is +// source-agnostic: both the webhook and the reconciliation poller emit it. Additional fields +// (e.g. source, committer time) can be added later as ingestion needs them. +type ChangeEvent struct { + // URI identifies the commit that entered the pipeline (git://owner/repo/branch/revision). + URI string `json:"uri"` +} + +// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload. +func (e ChangeEvent) ToBytes() ([]byte, error) { + return json.Marshal(e) +} + +// Validate checks that the change event carries a valid git-backed commit URI. +func (e ChangeEvent) Validate() error { + if e.URI == "" { + return fmt.Errorf("change event requires a commit URI") + } + if _, err := entitygit.ParseChangeID(e.URI); err != nil { + return fmt.Errorf("change event URI: %w", err) + } + return nil +} + +// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes. +func ChangeEventFromBytes(data []byte) (ChangeEvent, error) { + var event ChangeEvent + if err := json.Unmarshal(data, &event); err != nil { + return ChangeEvent{}, err + } + if err := event.Validate(); err != nil { + return ChangeEvent{}, err + } + return event, nil +} diff --git a/stovepipe/entity/change_uri.go b/stovepipe/entity/change_uri.go new file mode 100644 index 00000000..7abc45f3 --- /dev/null +++ b/stovepipe/entity/change_uri.go @@ -0,0 +1,36 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import "encoding/json" + +// ChangeURI is the lightweight reference passed between pipeline stages. It +// carries only the change identity; stages re-resolve any state they need. +type ChangeURI struct { + // URI is the change identity (git://owner/repo/branch/revision). + URI string `json:"uri"` +} + +// ToBytes serializes the ChangeURI to JSON bytes for a queue message payload. +func (c ChangeURI) ToBytes() ([]byte, error) { + return json.Marshal(c) +} + +// ChangeURIFromBytes deserializes a ChangeURI from JSON bytes. +func ChangeURIFromBytes(data []byte) (ChangeURI, error) { + var ref ChangeURI + err := json.Unmarshal(data, &ref) + return ref, err +} diff --git a/stovepipe/entity/commit.go b/stovepipe/entity/commit.go index b5850f86..fe734cdb 100644 --- a/stovepipe/entity/commit.go +++ b/stovepipe/entity/commit.go @@ -32,24 +32,12 @@ func IsCommitStatusTerminal(s CommitStatus) bool { return s == CommitStatusSucceeded || s == CommitStatusFailed } -// Commit is a trunk commit tracked by Stovepipe. The SHA scoped by Repository and -// Branch is the natural identity and dedup key: a commit announced by both a webhook -// and a poll backfill resolves to the same record and is processed once. +// Commit is a trunk commit tracked by Stovepipe's gateway. type Commit struct { - // SHA is the full commit hash. Identity key; immutable after creation. - SHA string - // Repository is the repository URI (e.g. "github.com/uber/go-code"). - Repository string - // Branch is the target branch (e.g. "main"). - Branch string - // CommitterTimeMs is the committer timestamp in milliseconds since epoch. - // Used to order commits within a range and to establish the trunk sequence. - CommitterTimeMs int64 + // URI is the canonical change identity from the originating ChangeEvent. + URI string // Status is the current validation state of this commit. Status CommitStatus - // Version is incremented on each update and used for optimistic locking. - // Version arithmetic lives in the controller; the store performs a pure conditional write. - Version int32 // CreatedAt is the time this commit was first recorded, in milliseconds since epoch. CreatedAt int64 // UpdatedAt is the time this commit was last updated, in milliseconds since epoch. diff --git a/stovepipe/entity/entity.go b/stovepipe/entity/entity.go deleted file mode 100644 index a7136aa3..00000000 --- a/stovepipe/entity/entity.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package entity holds Stovepipe-specific domain entities. -package entity - -// ChangeInfo represents a new change detected on a VCS remote. -// It is intentionally VCS-agnostic: the URI scheme carries the -// provider identity, mirroring the github:// scheme used in -// SubmitQueue's ChangeInfo. -// -// URI format: "git://///" -// Example: "git://github.com/uber/go-code/refs/heads/main/c3a4d5e6f789..." -// -// Fields are immutable after construction. -type ChangeInfo struct { - // URI is the canonical VCS identifier for this change. - // Scheme is "git://"; path encodes host, repo, ref, and new revision. - URI string `json:"uri"` - - // PreviousURI is the URI of the prior revision on the same ref, if known. - // Empty string if unavailable. - // Example: "git://github.com/uber/go-code/refs/heads/main/aabbccdd..." - PreviousURI string `json:"previous_uri,omitempty"` - - // AuthorName is the display name of the person who authored the change. - AuthorName string `json:"author_name,omitempty"` - // AuthorEmail is the email address of the author. - AuthorEmail string `json:"author_email,omitempty"` -} diff --git a/stovepipe/extension/changeingester/logging.go b/stovepipe/extension/changeingester/logging.go index 69e6dc2f..88e566b1 100644 --- a/stovepipe/extension/changeingester/logging.go +++ b/stovepipe/extension/changeingester/logging.go @@ -35,12 +35,9 @@ func New(logger *zap.Logger) extension.ChangeHandler { return LoggingHandler{logger: logger} } -func (h LoggingHandler) IngestChange(ctx context.Context, info entity.ChangeInfo) error { +func (h LoggingHandler) IngestChange(ctx context.Context, event entity.ChangeEvent) error { h.logger.Info("ingested change", - zap.String("uri", info.URI), - zap.String("previous_uri", info.PreviousURI), - zap.String("author_name", info.AuthorName), - zap.String("author_email", info.AuthorEmail), + zap.String("uri", event.URI), ) return nil } diff --git a/stovepipe/extension/extension.go b/stovepipe/extension/extension.go index 5a891faf..f4de9644 100644 --- a/stovepipe/extension/extension.go +++ b/stovepipe/extension/extension.go @@ -30,5 +30,5 @@ type ChangeIngester interface { // ChangeHandler processes a single change received from the ingester. type ChangeHandler interface { - IngestChange(ctx context.Context, info entity.ChangeInfo) error + IngestChange(ctx context.Context, event entity.ChangeEvent) error } diff --git a/stovepipe/extension/storage/BUILD.bazel b/stovepipe/extension/storage/BUILD.bazel deleted file mode 100644 index d06b7577..00000000 --- a/stovepipe/extension/storage/BUILD.bazel +++ /dev/null @@ -1,13 +0,0 @@ -load("@rules_go//go:def.bzl", "go_library") - -go_library( - name = "storage", - srcs = [ - "batch_store.go", - "commit_store.go", - "storage.go", - ], - importpath = "github.com/uber/submitqueue/stovepipe/extension/storage", - visibility = ["//visibility:public"], - deps = ["//stovepipe/entity"], -) diff --git a/stovepipe/extension/storage/batch_store.go b/stovepipe/extension/storage/batch_store.go deleted file mode 100644 index 316d7a59..00000000 --- a/stovepipe/extension/storage/batch_store.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -//go:generate mockgen -source=batch_store.go -destination=mock/batch_store_mock.go -package=mock - -import ( - "context" - - "github.com/uber/submitqueue/stovepipe/entity" -) - -// BatchStore is the orchestrator-owned store for in-flight validation batches. -// A batch represents a contiguous range of trunk commits under validation. -// Bisection reuses the same type — sub-range probes are ordinary batches -// driven through the same speculate→build→buildsignal→bisect loop. -type BatchStore interface { - // Get retrieves a batch by ID. Returns ErrNotFound if no record exists. - Get(ctx context.Context, id string) (entity.Batch, error) - - // Create records a new batch with status BatchStatusPending. - // Returns ErrAlreadyExists if a batch with the same ID already exists. - Create(ctx context.Context, batch entity.Batch) error - - // UpdateStatus updates the batch's status and advances the version from - // oldVersion to newVersion. Returns ErrVersionMismatch if the current - // persisted version does not match oldVersion; the caller must re-read and retry. - // Version arithmetic is owned by the caller (controller); the store performs - // a pure conditional write. - UpdateStatus(ctx context.Context, id string, oldVersion, newVersion int32, status entity.BatchStatus) error -} diff --git a/stovepipe/extension/storage/commit_store.go b/stovepipe/extension/storage/commit_store.go deleted file mode 100644 index 579b9c6a..00000000 --- a/stovepipe/extension/storage/commit_store.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -//go:generate mockgen -source=commit_store.go -destination=mock/commit_store_mock.go -package=mock - -import ( - "context" - - "github.com/uber/submitqueue/stovepipe/entity" -) - -// CommitStore is the gateway-owned store for trunk commit health state. -// It is the authoritative record of each commit's validation status and is the -// only storage the gateway's GetStatus RPC reads from. -// -// The (Repository, Branch, SHA) triple is the identity key and dedup handle: -// a commit announced by both a webhook and a poll backfill resolves to the same -// row and is processed once. -type CommitStore interface { - // Get retrieves a commit by its (repository, branch, sha) identity key. - // Returns ErrNotFound if no record exists. - Get(ctx context.Context, repository, branch, sha string) (entity.Commit, error) - - // Create records a new commit. The status must be CommitStatusUnknown. - // Returns ErrAlreadyExists if a commit with the same identity already exists; - // callers treat this as a successful dedup, not a failure. - Create(ctx context.Context, commit entity.Commit) error - - // UpdateStatus updates the commit's status and advances the version from - // oldVersion to newVersion. Returns ErrVersionMismatch if the current - // persisted version does not match oldVersion; the caller must re-read and retry. - // Version arithmetic is owned by the caller (controller); the store performs - // a pure conditional write. - UpdateStatus(ctx context.Context, repository, branch, sha string, oldVersion, newVersion int32, status entity.CommitStatus) error -} diff --git a/stovepipe/extension/storage/storage.go b/stovepipe/extension/storage/storage.go deleted file mode 100644 index be4b980a..00000000 --- a/stovepipe/extension/storage/storage.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package storage defines the storage extension interfaces for the Stovepipe domain. -// The gateway owns the CommitStore (commit-status store and event log). -// The orchestrator owns the BatchStore (in-flight batches and pipeline working state). -// The two services share no storage; they communicate only through the messaging queue. -package storage - -import ( - "errors" - "fmt" -) - -// ErrNotFound is returned by storage implementations when the requested record is not found. -var ErrNotFound = errors.New("record not found") - -// IsNotFound returns true if any error in the chain is ErrNotFound. -func IsNotFound(err error) bool { - return errors.Is(err, ErrNotFound) -} - -// WrapNotFound wraps ErrNotFound with the underlying implementation error. -func WrapNotFound(err error) error { - return fmt.Errorf("%w: %w", ErrNotFound, err) -} - -// ErrAlreadyExists is returned when attempting to create a record whose identity already exists. -var ErrAlreadyExists = errors.New("record already exists") - -// ErrVersionMismatch is returned when an optimistic-locking CAS write finds the persisted -// version does not match the expected version. Callers should retry from a fresh read. -var ErrVersionMismatch = errors.New("version mismatch")