From e48be368862690e3702f35ec26f57e8c5fd05d04 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Mon, 8 Jun 2026 12:43:00 -0700 Subject: [PATCH 1/2] refactor(buildrunner): trigger on batches, resolve changes internally MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change BuildRunner.Trigger to take batch identity — base []entity.Batch (the dependency batches) and head entity.Batch (the batch under test) — instead of controller-pre-resolved base/head []entity.Change, per the extension contract. Each implementation (buildkite, githubactions, fake) gains an injected changeset.Resolver and resolves the base and head batches' changes itself; the build controller drops its private collectChanges walk and loads the dependency batches as identity. Status, Cancel, and the build id/status outputs are unchanged. The wiring injects the resolver into the fake build runner; the buildkite/githubactions Params gain a Resolver field. Revises build-runner.md, which had deliberately kept batches out of the boundary — the base/head split survives, expressed as batch identity. 
--- doc/rfc/submitqueue/build-runner.md | 14 ++--- .../submitqueue/orchestrator/server/main.go | 2 +- submitqueue/extension/buildrunner/BUILD.bazel | 10 +++- .../extension/buildrunner/build_runner.go | 13 ++--- .../buildrunner/buildkite/BUILD.bazel | 3 ++ .../buildrunner/buildkite/buildkite.go | 36 +++++++++---- .../buildrunner/buildkite/buildkite_test.go | 53 ++++++++++++------- .../extension/buildrunner/fake/BUILD.bazel | 2 + .../extension/buildrunner/fake/fake.go | 22 +++++--- .../extension/buildrunner/fake/fake_test.go | 39 ++++++++------ .../buildrunner/githubactions/BUILD.bazel | 3 ++ .../githubactions/githubactions.go | 21 ++++++-- .../githubactions/githubactions_test.go | 27 +++++++--- .../buildrunner/mock/build_runner_mock.go | 2 +- submitqueue/extension/buildrunner/resolve.go | 38 +++++++++++++ .../orchestrator/controller/build/BUILD.bazel | 1 + .../orchestrator/controller/build/build.go | 34 +++++------- .../controller/build/build_test.go | 34 +++++------- 18 files changed, 229 insertions(+), 125 deletions(-) create mode 100644 submitqueue/extension/buildrunner/resolve.go diff --git a/doc/rfc/submitqueue/build-runner.md b/doc/rfc/submitqueue/build-runner.md index 8cc3c815..1bfba5df 100644 --- a/doc/rfc/submitqueue/build-runner.md +++ b/doc/rfc/submitqueue/build-runner.md @@ -46,13 +46,15 @@ See `extension/buildrunner/build_runner.go` for the exact Go signatures. The sec ### Trigger: base + head -`Trigger` takes two ordered lists of changes and a free-form metadata map: +> **Revised by [extension-contract.md](extension-contract.md).** `Trigger` now takes identity at the batch granularity — `base []entity.Batch` (the dependency batches) and `head entity.Batch` (the batch under test) — and the runner resolves each batch's changes itself through an injected `changeset.Resolver`. The base/head split below still holds; only the boundary type changed from resolved `[]entity.Change` to batch identity. 
The "rejected: list-of-lists of changes" note is superseded by that RFC's "identity in, resolve internally" principle. -- **`base`** — changes from the dependency batches (assumed-good prefix). Ordered. -- **`head`** — changes from the batch being verified. Ordered. +`Trigger` takes the base dependency batches, the head batch, and a free-form metadata map: + +- **`base`** — the dependency batches (assumed-good prefix), ordered. The runner resolves their changes. +- **`head`** — the batch being verified. The runner resolves its changes. - **`metadata`** — caller-supplied attributes (requester, ticket ID, trace ID, etc.) the provider MAY persist or echo back via `Status`. Schema is caller/provider-defined; the interface treats it as opaque. `nil` is equivalent to an empty map. -The provider applies `base` then `head` in order on top of the queue's target branch and validates the resulting tree. Validation is **implicit and holistic**: it is not a per-change action, it is what the provider does after applying everything. +The provider resolves and applies `base` then `head` in order on top of the queue's target branch and validates the resulting tree. Validation is **implicit and holistic**: it is not a per-change action, it is what the provider does after applying everything. Why split base and head: @@ -60,9 +62,7 @@ Why split base and head: - Lets a provider cache or short-circuit the base when it has validated the same prefix before — a hot path for stacked-batch speculation. - Lets the provider attribute terminal failure to base vs head in `BuildMetadata` without the orchestrator having to round-trip the split itself. -Rejected: a single flat `changes []entity.Change`. Provider would have to deduce base via prefix matching and could not distinguish "base broke" from "head broke" without out-of-band hints. Loses the orchestrator's clearest piece of structural information at the boundary for no gain. - -Rejected: list-of-lists, one slice per batch. 
Pushes batch structure across the boundary, which the provider does not care about. The provider thinks in terms of "stuff to apply before" and "stuff to validate" — base / head matches that mental model. Batches are an orchestrator concept. +Rejected: a single flat input with no base/head split. Provider would have to deduce base via prefix matching and could not distinguish "base broke" from "head broke" without out-of-band hints. Loses the orchestrator's clearest piece of structural information at the boundary for no gain. ### Async vs sync contract diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 2c5411cb..a0360464 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -833,7 +833,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset. mergeChecker: mc, changeProvider: cp, pusher: psh, - buildRunner: buildfake.New(), + buildRunner: buildfake.New(resolver), scorer: scorerfake.New(resolver, heuristic.New( resolver, []heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: 0.5}}, diff --git a/submitqueue/extension/buildrunner/BUILD.bazel b/submitqueue/extension/buildrunner/BUILD.bazel index 62c1ba42..ecffd631 100644 --- a/submitqueue/extension/buildrunner/BUILD.bazel +++ b/submitqueue/extension/buildrunner/BUILD.bazel @@ -2,8 +2,14 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "buildrunner", - srcs = ["build_runner.go"], + srcs = [ + "build_runner.go", + "resolve.go", + ], importpath = "github.com/uber/submitqueue/submitqueue/extension/buildrunner", visibility = ["//visibility:public"], - deps = ["//submitqueue/entity"], + deps = [ + "//submitqueue/core/changeset", + "//submitqueue/entity", + ], ) diff --git a/submitqueue/extension/buildrunner/build_runner.go b/submitqueue/extension/buildrunner/build_runner.go index ab4b7cc2..406307fc 100644 --- 
a/submitqueue/extension/buildrunner/build_runner.go +++ b/submitqueue/extension/buildrunner/build_runner.go @@ -40,10 +40,11 @@ type BuildRunner interface { // Validation is implicit and holistic — it is what the runner does // after applying everything, not a per-change action. // - // base contains changes from the dependency batches (an assumed-good - // prefix). head contains changes from the batch being verified. - // Splitting them lets a runner cache or short-circuit the base when - // it has validated the same prefix before, and lets it attribute + // base is the dependency batches (an assumed-good prefix); head is the + // batch being verified. The runner resolves each batch's changes itself + // through an injected changeset resolver. Keeping base and head as + // separate batch inputs lets a runner cache or short-circuit the base + // when it has validated the same prefix before, and lets it attribute // terminal failure to base vs head in BuildMetadata. // // metadata carries free-form caller-supplied attributes (e.g. requester, @@ -59,8 +60,8 @@ type BuildRunner interface { // Factory that built it. Returns an error if the request is invalid. 
Trigger( ctx context.Context, - base []entity.Change, - head []entity.Change, + base []entity.Batch, + head entity.Batch, metadata entity.BuildMetadata, ) (buildID entity.BuildID, err error) diff --git a/submitqueue/extension/buildrunner/buildkite/BUILD.bazel b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel index 06a0d4d0..76359dc7 100644 --- a/submitqueue/extension/buildrunner/buildkite/BUILD.bazel +++ b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel @@ -9,6 +9,7 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/extension/buildrunner/buildkite", visibility = ["//visibility:public"], deps = [ + "//submitqueue/core/changeset", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "@org_uber_go_zap//:zap", @@ -21,6 +22,8 @@ go_test( embed = [":buildkite"], deps = [ "//core/httpclient", + "//submitqueue/core/changeset", + "//submitqueue/core/changeset/fake", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite.go b/submitqueue/extension/buildrunner/buildkite/buildkite.go index b38fefe1..74e070eb 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite.go @@ -38,6 +38,7 @@ import ( "go.uber.org/zap" + "github.com/uber/submitqueue/submitqueue/core/changeset" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" ) @@ -66,9 +67,10 @@ const ( // runner implements buildrunner.BuildRunner. type runner struct { - cfg buildrunner.Config - client *client - logger *zap.SugaredLogger + cfg buildrunner.Config + client *client + resolver changeset.Resolver + logger *zap.SugaredLogger } var _ buildrunner.BuildRunner = (*runner)(nil) @@ -83,6 +85,8 @@ type Params struct { // for the base URL (via httpclient.BaseURLTransport) and auth (via a // transport layer). 
If nil, http.DefaultClient is used. HTTPClient *http.Client + // Resolver resolves a batch's changes (base and head batches). + Resolver changeset.Resolver // Logger is the structured logger. Logger *zap.SugaredLogger } @@ -100,24 +104,34 @@ func NewBuildRunner(params Params) (buildrunner.BuildRunner, error) { if params.Logger == nil { return nil, fmt.Errorf("logger is required") } - return newRunner(params.Config, &client{httpClient: params.HTTPClient}, params.Logger.Named("buildkite_buildrunner")), nil + return newRunner(params.Config, &client{httpClient: params.HTTPClient}, params.Resolver, params.Logger.Named("buildkite_buildrunner")), nil } // newRunner constructs a runner. Used by NewBuildRunner and by tests. -func newRunner(cfg buildrunner.Config, c *client, logger *zap.SugaredLogger) *runner { +func newRunner(cfg buildrunner.Config, c *client, resolver changeset.Resolver, logger *zap.SugaredLogger) *runner { return &runner{ - cfg: cfg, - client: c, - logger: logger, + cfg: cfg, + client: c, + resolver: resolver, + logger: logger, } } // Trigger calls the Buildkite API to create the build and returns the Buildkite // build number as the build ID. Errors are propagated to the caller so the // queue consumer can nack and retry. 
-func (r *runner) Trigger(ctx context.Context, base, head []entity.Change, metadata entity.BuildMetadata) (entity.BuildID, error) { - baseJSON, _ := json.Marshal(flattenURIs(base)) - headJSON, _ := json.Marshal(flattenURIs(head)) +func (r *runner) Trigger(ctx context.Context, base []entity.Batch, head entity.Batch, metadata entity.BuildMetadata) (entity.BuildID, error) { + baseChanges, err := buildrunner.ResolveBatches(ctx, r.resolver, base) + if err != nil { + return entity.BuildID{}, fmt.Errorf("buildkite: resolve base: %w", err) + } + headChanges, err := r.resolver.ChangesForBatch(ctx, head) + if err != nil { + return entity.BuildID{}, fmt.Errorf("buildkite: resolve head: %w", err) + } + + baseJSON, _ := json.Marshal(flattenURIs(baseChanges)) + headJSON, _ := json.Marshal(flattenURIs(headChanges)) env := map[string]string{ EnvKeyBaseURIs: string(baseJSON), diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go index cea6a1a8..c2ceb714 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go @@ -27,20 +27,29 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber/submitqueue/core/httpclient" + "github.com/uber/submitqueue/submitqueue/core/changeset" + changesetfake "github.com/uber/submitqueue/submitqueue/core/changeset/fake" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" ) -// newTestRunner creates a runner backed by a test HTTP server. -func newTestRunner(t *testing.T, handler http.Handler) *runner { +// newTestRunner creates a runner backed by a test HTTP server. An optional +// resolver seeds the batch changes the runner resolves; omit it for tests that +// do not trigger builds (Status/Cancel). 
+func newTestRunner(t *testing.T, handler http.Handler, resolver ...changeset.Resolver) *runner { t.Helper() srv := httptest.NewServer(handler) t.Cleanup(srv.Close) c, err := httpclient.NewClient(srv.URL) require.NoError(t, err) + r := changeset.Resolver(changesetfake.New()) + if len(resolver) > 0 { + r = resolver[0] + } return newRunner( buildrunner.Config{QueueName: "my-queue"}, &client{httpClient: c}, + r, zap.NewNop().Sugar(), ) } @@ -70,17 +79,18 @@ func TestTrigger_SubmitsCorrectPayloadAndReturnsBuildkiteNumber(t *testing.T) { var capturedMethod string var capturedBody []byte + resolver := changesetfake.New(). + Set("base-batch", entity.Change{URIs: []string{"github://org/repo/pull/1/aaa111"}}). + Set("head-batch", entity.Change{URIs: []string{"github://org/repo/pull/2/bbb222"}}) + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { capturedMethod = req.Method capturedBody, _ = io.ReadAll(req.Body) w.Header().Set("Content-Type", "application/json") _, _ = w.Write(buildJSON(42, "scheduled", "https://buildkite.com/test-org/my-pipeline/builds/42")) - })) + }), resolver) - base := []entity.Change{{URIs: []string{"github://org/repo/pull/1/aaa111"}}} - head := []entity.Change{{URIs: []string{"github://org/repo/pull/2/bbb222"}}} - - id, err := r.Trigger(context.Background(), base, head, nil) + id, err := r.Trigger(context.Background(), []entity.Batch{{ID: "base-batch"}}, entity.Batch{ID: "head-batch"}, nil) require.NoError(t, err) assert.Equal(t, encodeBuildNumber(42), id.ID) @@ -95,13 +105,14 @@ func TestTrigger_SubmitsCorrectPayloadAndReturnsBuildkiteNumber(t *testing.T) { func TestTrigger_EmptyBase_ProducesJSONArray(t *testing.T) { var capturedBody []byte + resolver := changesetfake.New().Set("head-batch", entity.Change{URIs: []string{"u"}}) r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { capturedBody, _ = io.ReadAll(req.Body) w.Header().Set("Content-Type", "application/json") _, _ = 
w.Write(buildJSON(1, "scheduled", "")) - })) + }), resolver) - _, err := r.Trigger(context.Background(), nil, []entity.Change{{URIs: []string{"u"}}}, nil) + _, err := r.Trigger(context.Background(), nil, entity.Batch{ID: "head-batch"}, nil) require.NoError(t, err) var req createBuildRequest @@ -112,17 +123,17 @@ func TestTrigger_EmptyBase_ProducesJSONArray(t *testing.T) { func TestTrigger_MultipleChangesFlattened(t *testing.T) { var capturedBody []byte + resolver := changesetfake.New().Set("head-batch", + entity.Change{URIs: []string{"github://org/repo/pull/1/aaa"}}, + entity.Change{URIs: []string{"github://org/repo/pull/2/bbb", "github://org/repo/pull/3/ccc"}}, + ) r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { capturedBody, _ = io.ReadAll(req.Body) w.Header().Set("Content-Type", "application/json") _, _ = w.Write(buildJSON(2, "scheduled", "")) - })) + }), resolver) - head := []entity.Change{ - {URIs: []string{"github://org/repo/pull/1/aaa"}}, - {URIs: []string{"github://org/repo/pull/2/bbb", "github://org/repo/pull/3/ccc"}}, - } - _, err := r.Trigger(context.Background(), nil, head, nil) + _, err := r.Trigger(context.Background(), nil, entity.Batch{ID: "head-batch"}, nil) require.NoError(t, err) var req createBuildRequest @@ -138,20 +149,21 @@ func TestTrigger_BuildkiteError_ReturnsError(t *testing.T) { w.WriteHeader(http.StatusInternalServerError) })) - _, err := r.Trigger(context.Background(), nil, nil, nil) + _, err := r.Trigger(context.Background(), nil, entity.Batch{ID: "head-batch"}, nil) require.Error(t, err) } func TestTrigger_WithMetadata_SetsEnvVar(t *testing.T) { var capturedBody []byte + resolver := changesetfake.New().Set("head-batch", entity.Change{URIs: []string{"u"}}) r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { capturedBody, _ = io.ReadAll(req.Body) w.Header().Set("Content-Type", "application/json") _, _ = w.Write(buildJSON(10, "scheduled", "")) - })) + }), 
resolver) metadata := entity.BuildMetadata{"requester": "alice", "ticket": "SQ-42"} - _, err := r.Trigger(context.Background(), nil, []entity.Change{{URIs: []string{"u"}}}, metadata) + _, err := r.Trigger(context.Background(), nil, entity.Batch{ID: "head-batch"}, metadata) require.NoError(t, err) var req createBuildRequest @@ -165,13 +177,14 @@ func TestTrigger_WithMetadata_SetsEnvVar(t *testing.T) { func TestTrigger_NilMetadata_NoMetadataEnvVar(t *testing.T) { var capturedBody []byte + resolver := changesetfake.New().Set("head-batch", entity.Change{URIs: []string{"u"}}) r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { capturedBody, _ = io.ReadAll(req.Body) w.Header().Set("Content-Type", "application/json") _, _ = w.Write(buildJSON(11, "scheduled", "")) - })) + }), resolver) - _, err := r.Trigger(context.Background(), nil, []entity.Change{{URIs: []string{"u"}}}, nil) + _, err := r.Trigger(context.Background(), nil, entity.Batch{ID: "head-batch"}, nil) require.NoError(t, err) var req createBuildRequest diff --git a/submitqueue/extension/buildrunner/fake/BUILD.bazel b/submitqueue/extension/buildrunner/fake/BUILD.bazel index dcd30362..ea954c01 100644 --- a/submitqueue/extension/buildrunner/fake/BUILD.bazel +++ b/submitqueue/extension/buildrunner/fake/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/extension/buildrunner/fake", visibility = ["//visibility:public"], deps = [ + "//submitqueue/core/changeset", "//submitqueue/core/fakemarker", "//submitqueue/entity", "//submitqueue/extension/buildrunner", @@ -17,6 +18,7 @@ go_test( srcs = ["fake_test.go"], embed = [":fake"], deps = [ + "//submitqueue/core/changeset/fake", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/extension/buildrunner/fake/fake.go b/submitqueue/extension/buildrunner/fake/fake.go index c66c98d6..9b99c39f 100644 --- 
a/submitqueue/extension/buildrunner/fake/fake.go +++ b/submitqueue/extension/buildrunner/fake/fake.go @@ -37,6 +37,7 @@ import ( "fmt" "strings" + "github.com/uber/submitqueue/submitqueue/core/changeset" "github.com/uber/submitqueue/submitqueue/core/fakemarker" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" @@ -57,21 +58,28 @@ const outcomeOK = "ok" // per-build state: the outcome is encoded in the BuildID at Trigger and read // back out at Status. Uniqueness comes from a random suffix per ID, so it needs // no shared counter and never collides across instances or processes. -type runner struct{} +type runner struct { + resolver changeset.Resolver +} // New returns a buildrunner.BuildRunner that defaults to succeeding and honors -// marker tokens embedded in head change URIs. -func New() buildrunner.BuildRunner { - return &runner{} +// marker tokens embedded in head change URIs. The resolver resolves the head +// batch's changes so the marker can be inspected. +func New(resolver changeset.Resolver) buildrunner.BuildRunner { + return &runner{resolver: resolver} } // Trigger fails when a head change URI carries the trigger-error marker; // otherwise it returns a unique BuildID that encodes the terminal outcome the // build should report at Status time (decided from the head marker). The base -// changes and metadata are ignored. -func (r *runner) Trigger(_ context.Context, _ []entity.Change, head []entity.Change, _ entity.BuildMetadata) (entity.BuildID, error) { +// batches and metadata are ignored. 
+func (r *runner) Trigger(ctx context.Context, _ []entity.Batch, head entity.Batch, _ entity.BuildMetadata) (entity.BuildID, error) { + headChanges, err := r.resolver.ChangesForBatch(ctx, head) + if err != nil { + return entity.BuildID{}, fmt.Errorf("fake: resolve head: %w", err) + } outcome := outcomeOK - switch fakemarker.TokenInChanges(head) { + switch fakemarker.TokenInChanges(headChanges) { case tokenTriggerError: return entity.BuildID{}, fmt.Errorf("fake: marked trigger error") case tokenFail: diff --git a/submitqueue/extension/buildrunner/fake/fake_test.go b/submitqueue/extension/buildrunner/fake/fake_test.go index 2de24454..2dca9647 100644 --- a/submitqueue/extension/buildrunner/fake/fake_test.go +++ b/submitqueue/extension/buildrunner/fake/fake_test.go @@ -20,26 +20,35 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + changesetfake "github.com/uber/submitqueue/submitqueue/core/changeset/fake" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" ) +const headBatchID = "head-batch" + +// newFake returns a fake runner whose head batch resolves to a single change +// carrying the given URIs. +func newFake(uris ...string) buildrunner.BuildRunner { + return New(changesetfake.New().Set(headBatchID, entity.Change{URIs: uris})) +} + func TestNew_ImplementsInterface(t *testing.T) { - var _ buildrunner.BuildRunner = New() + var _ buildrunner.BuildRunner = New(changesetfake.New()) } func TestRunner_Trigger_UniqueIDs(t *testing.T) { ctx := context.Background() - id1, err := New().Trigger(ctx, nil, []entity.Change{{URIs: []string{"github://o/r/pull/1/a"}}}, nil) + id1, err := newFake("github://o/r/pull/1/a").Trigger(ctx, nil, entity.Batch{ID: headBatchID}, nil) require.NoError(t, err) assert.NotEmpty(t, id1.ID) - // Same runner instance, different trigger. 
- r := New() - id2, err := r.Trigger(ctx, nil, nil, nil) + // Same runner instance, different trigger (empty head — no marker). + r := New(changesetfake.New()) + id2, err := r.Trigger(ctx, nil, entity.Batch{ID: "x"}, nil) require.NoError(t, err) - id3, err := r.Trigger(ctx, nil, nil, nil) + id3, err := r.Trigger(ctx, nil, entity.Batch{ID: "x"}, nil) require.NoError(t, err) assert.NotEqual(t, id2, id3) @@ -49,9 +58,8 @@ func TestRunner_Trigger_UniqueIDs(t *testing.T) { } func TestRunner_TriggerError(t *testing.T) { - r := New() - _, err := r.Trigger(context.Background(), nil, - []entity.Change{{URIs: []string{"github://o/r/pull/1/a?sq-fake=trigger-error"}}}, nil) + r := newFake("github://o/r/pull/1/a?sq-fake=trigger-error") + _, err := r.Trigger(context.Background(), nil, entity.Batch{ID: headBatchID}, nil) require.Error(t, err) } @@ -88,8 +96,8 @@ func TestRunner_Status(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := New() - id, err := r.Trigger(ctx, nil, []entity.Change{{URIs: tt.headURIs}}, nil) + r := newFake(tt.headURIs...) + id, err := r.Trigger(ctx, nil, entity.Batch{ID: headBatchID}, nil) require.NoError(t, err) status, _, err := r.Status(ctx, id) @@ -104,7 +112,7 @@ func TestRunner_Status(t *testing.T) { } func TestRunner_Status_UnknownBuildSucceeds(t *testing.T) { - r := New() + r := New(changesetfake.New()) status, _, err := r.Status(context.Background(), entity.BuildID{ID: "never-triggered"}) require.NoError(t, err) assert.Equal(t, entity.BuildStatusSucceeded, status) @@ -115,16 +123,15 @@ func TestRunner_Status_UnknownBuildSucceeds(t *testing.T) { // back correctly by a different runner instance. 
func TestStatus_StatelessAcrossInstances(t *testing.T) { ctx := context.Background() - id, err := New().Trigger(ctx, nil, - []entity.Change{{URIs: []string{"github://o/r/pull/1/a?sq-fake=build-fail"}}}, nil) + id, err := newFake("github://o/r/pull/1/a?sq-fake=build-fail").Trigger(ctx, nil, entity.Batch{ID: headBatchID}, nil) require.NoError(t, err) - status, _, err := New().Status(ctx, id) + status, _, err := New(changesetfake.New()).Status(ctx, id) require.NoError(t, err) assert.Equal(t, entity.BuildStatusFailed, status) } func TestRunner_Cancel(t *testing.T) { - r := New() + r := New(changesetfake.New()) assert.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "any"})) } diff --git a/submitqueue/extension/buildrunner/githubactions/BUILD.bazel b/submitqueue/extension/buildrunner/githubactions/BUILD.bazel index 8ff1abb7..e4f04faf 100644 --- a/submitqueue/extension/buildrunner/githubactions/BUILD.bazel +++ b/submitqueue/extension/buildrunner/githubactions/BUILD.bazel @@ -9,6 +9,7 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/extension/buildrunner/githubactions", visibility = ["//visibility:public"], deps = [ + "//submitqueue/core/changeset", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "@org_uber_go_zap//:zap", @@ -21,6 +22,8 @@ go_test( embed = [":githubactions"], deps = [ "//core/httpclient", + "//submitqueue/core/changeset", + "//submitqueue/core/changeset/fake", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/extension/buildrunner/githubactions/githubactions.go b/submitqueue/extension/buildrunner/githubactions/githubactions.go index d7dff659..0ac8ccf9 100644 --- a/submitqueue/extension/buildrunner/githubactions/githubactions.go +++ b/submitqueue/extension/buildrunner/githubactions/githubactions.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" + "github.com/uber/submitqueue/submitqueue/core/changeset" 
"github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" ) @@ -58,6 +59,8 @@ type Params struct { // The token needs actions:write to dispatch/cancel workflows and actions:read // to poll status. HTTPClient *http.Client + // Resolver resolves a batch's changes (base and head batches). + Resolver changeset.Resolver // Logger is the structured logger. Logger *zap.SugaredLogger // Owner is the repository owner or organization, for example "uber". @@ -80,6 +83,7 @@ type runner struct { ref string extraInputs map[string]string client *client + resolver changeset.Resolver logger *zap.SugaredLogger } @@ -105,11 +109,12 @@ func NewBuildRunner(params Params) (buildrunner.BuildRunner, error) { repo: params.Repo, workflowID: params.WorkflowID, }, + params.Resolver, params.Logger.Named("githubactions_buildrunner"), ), nil } -func newRunner(cfg buildrunner.Config, ref string, extraInputs map[string]string, c *client, logger *zap.SugaredLogger) *runner { +func newRunner(cfg buildrunner.Config, ref string, extraInputs map[string]string, c *client, resolver changeset.Resolver, logger *zap.SugaredLogger) *runner { copied := make(map[string]string, len(extraInputs)) for k, v := range extraInputs { copied[k] = v @@ -119,6 +124,7 @@ func newRunner(cfg buildrunner.Config, ref string, extraInputs map[string]string ref: ref, extraInputs: copied, client: c, + resolver: resolver, logger: logger, } } @@ -145,8 +151,17 @@ func validateConfig(httpClient *http.Client, logger *zap.SugaredLogger, owner, r // Trigger dispatches the configured GitHub Actions workflow and returns the // GitHub workflow run ID as the SubmitQueue build ID. Errors are propagated to // the caller so the queue consumer can nack and retry. 
-func (r *runner) Trigger(ctx context.Context, base, head []entity.Change, metadata entity.BuildMetadata) (entity.BuildID, error) { - inputs, err := r.dispatchInputs(base, head, metadata) +func (r *runner) Trigger(ctx context.Context, base []entity.Batch, head entity.Batch, metadata entity.BuildMetadata) (entity.BuildID, error) { + baseChanges, err := buildrunner.ResolveBatches(ctx, r.resolver, base) + if err != nil { + return entity.BuildID{}, fmt.Errorf("github actions: resolve base: %w", err) + } + headChanges, err := r.resolver.ChangesForBatch(ctx, head) + if err != nil { + return entity.BuildID{}, fmt.Errorf("github actions: resolve head: %w", err) + } + + inputs, err := r.dispatchInputs(baseChanges, headChanges, metadata) if err != nil { return entity.BuildID{}, err } diff --git a/submitqueue/extension/buildrunner/githubactions/githubactions_test.go b/submitqueue/extension/buildrunner/githubactions/githubactions_test.go index 80502f39..b07d50e6 100644 --- a/submitqueue/extension/buildrunner/githubactions/githubactions_test.go +++ b/submitqueue/extension/buildrunner/githubactions/githubactions_test.go @@ -29,11 +29,13 @@ import ( "go.uber.org/zap" "github.com/uber/submitqueue/core/httpclient" + "github.com/uber/submitqueue/submitqueue/core/changeset" + changesetfake "github.com/uber/submitqueue/submitqueue/core/changeset/fake" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" ) -func newTestRunner(t *testing.T, handler http.Handler) *runner { +func newTestRunner(t *testing.T, handler http.Handler, resolver ...changeset.Resolver) *runner { t.Helper() srv := httptest.NewServer(handler) t.Cleanup(srv.Close) @@ -41,6 +43,11 @@ func newTestRunner(t *testing.T, handler http.Handler) *runner { c, err := httpclient.NewClient(srv.URL) require.NoError(t, err) + r := changeset.Resolver(changesetfake.New()) + if len(resolver) > 0 { + r = resolver[0] + } + return newRunner( buildrunner.Config{QueueName: 
"my-queue"}, "main", @@ -51,6 +58,7 @@ func newTestRunner(t *testing.T, handler http.Handler) *runner { repo: "submitqueue", workflowID: "submitqueue-ci.yml", }, + r, zap.NewNop().Sugar(), ) } @@ -84,6 +92,10 @@ func TestTrigger_DispatchesWorkflowAndReturnsRunID(t *testing.T) { var capturedPath string var capturedBody []byte + resolver := changesetfake.New(). + Set("base-batch", entity.Change{URIs: []string{"github://org/repo/pull/1/aaa111"}}). + Set("head-batch", entity.Change{URIs: []string{"github://org/repo/pull/2/bbb222"}}) + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { capturedMethod = req.Method capturedPath = req.URL.String() @@ -93,13 +105,11 @@ func TestTrigger_DispatchesWorkflowAndReturnsRunID(t *testing.T) { RunURL: "https://api.github.com/repos/uber/submitqueue/actions/runs/42", HTMLURL: "https://github.com/uber/submitqueue/actions/runs/42", }) - })) + }), resolver) - base := []entity.Change{{URIs: []string{"github://org/repo/pull/1/aaa111"}}} - head := []entity.Change{{URIs: []string{"github://org/repo/pull/2/bbb222"}}} metadata := entity.BuildMetadata{"requester": "alice"} - id, err := r.Trigger(context.Background(), base, head, metadata) + id, err := r.Trigger(context.Background(), []entity.Batch{{ID: "base-batch"}}, entity.Batch{ID: "head-batch"}, metadata) require.NoError(t, err) assert.Equal(t, "42", id.ID) @@ -123,12 +133,13 @@ func TestTrigger_DispatchesWorkflowAndReturnsRunID(t *testing.T) { func TestTrigger_EmptyBaseProducesJSONArray(t *testing.T) { var capturedBody []byte + resolver := changesetfake.New().Set("head-batch", entity.Change{URIs: []string{"u"}}) r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { capturedBody, _ = io.ReadAll(req.Body) _ = json.NewEncoder(w).Encode(dispatchWorkflowResponse{WorkflowRunID: 7}) - })) + }), resolver) - _, err := r.Trigger(context.Background(), nil, []entity.Change{{URIs: []string{"u"}}}, nil) + _, err := 
r.Trigger(context.Background(), nil, entity.Batch{ID: "head-batch"}, nil) require.NoError(t, err) var req dispatchWorkflowRequest @@ -142,7 +153,7 @@ func TestTrigger_ErrorsWhenDispatchResponseHasNoRunID(t *testing.T) { _ = json.NewEncoder(w).Encode(dispatchWorkflowResponse{}) })) - _, err := r.Trigger(context.Background(), nil, nil, nil) + _, err := r.Trigger(context.Background(), nil, entity.Batch{ID: "head-batch"}, nil) require.Error(t, err) assert.Contains(t, err.Error(), "response missing workflow_run_id") } diff --git a/submitqueue/extension/buildrunner/mock/build_runner_mock.go b/submitqueue/extension/buildrunner/mock/build_runner_mock.go index 4b7ac83a..fca56de9 100644 --- a/submitqueue/extension/buildrunner/mock/build_runner_mock.go +++ b/submitqueue/extension/buildrunner/mock/build_runner_mock.go @@ -73,7 +73,7 @@ func (mr *MockBuildRunnerMockRecorder) Status(ctx, buildID any) *gomock.Call { } // Trigger mocks base method. -func (m *MockBuildRunner) Trigger(ctx context.Context, base, head []entity.Change, metadata entity.BuildMetadata) (entity.BuildID, error) { +func (m *MockBuildRunner) Trigger(ctx context.Context, base []entity.Batch, head entity.Batch, metadata entity.BuildMetadata) (entity.BuildID, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Trigger", ctx, base, head, metadata) ret0, _ := ret[0].(entity.BuildID) diff --git a/submitqueue/extension/buildrunner/resolve.go b/submitqueue/extension/buildrunner/resolve.go new file mode 100644 index 00000000..c23e8e4d --- /dev/null +++ b/submitqueue/extension/buildrunner/resolve.go @@ -0,0 +1,38 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buildrunner + +import ( + "context" + + "github.com/uber/submitqueue/submitqueue/core/changeset" + "github.com/uber/submitqueue/submitqueue/entity" +) + +// ResolveBatches resolves each batch's changes through the resolver and +// concatenates them in order. It is shared by BuildRunner implementations that +// need a flat change list (e.g. the base, assembled from several dependency +// batches) so the per-batch resolution loop is not duplicated per backend. +func ResolveBatches(ctx context.Context, resolver changeset.Resolver, batches []entity.Batch) ([]entity.Change, error) { + var changes []entity.Change + for _, b := range batches { + cs, err := resolver.ChangesForBatch(ctx, b) + if err != nil { + return nil, err + } + changes = append(changes, cs...) 
+ } + return changes, nil +} diff --git a/submitqueue/orchestrator/controller/build/BUILD.bazel b/submitqueue/orchestrator/controller/build/BUILD.bazel index 78e26b58..ededade1 100644 --- a/submitqueue/orchestrator/controller/build/BUILD.bazel +++ b/submitqueue/orchestrator/controller/build/BUILD.bazel @@ -25,6 +25,7 @@ go_test( "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", + "//submitqueue/core/changeset/fake", "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/buildrunner", diff --git a/submitqueue/orchestrator/controller/build/build.go b/submitqueue/orchestrator/controller/build/build.go index 4606d923..9143993a 100644 --- a/submitqueue/orchestrator/controller/build/build.go +++ b/submitqueue/orchestrator/controller/build/build.go @@ -114,16 +114,12 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil } - // Assemble base (dependency batches in order) and head (this batch). - base, err := c.collectChanges(ctx, batch.Dependencies) + // Load the dependency batches (base) as identity; the build runner resolves + // each batch's changes itself. head is this batch. + base, err := c.loadBatches(ctx, batch.Dependencies) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) - return fmt.Errorf("failed to assemble base changes for batch %s: %w", batch.ID, err) - } - head, err := c.collectChanges(ctx, []string{batch.ID}) - if err != nil { - metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) - return fmt.Errorf("failed to assemble head changes for batch %s: %w", batch.ID, err) + return fmt.Errorf("failed to load dependency batches for batch %s: %w", batch.ID, err) } // Trigger the build with the queue's build runner. 
metadata is nil @@ -134,7 +130,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1) return fmt.Errorf("failed to build runner for batch %s: %w", batch.ID, err) } - buildID, err := buildRunner.Trigger(ctx, base, head, nil) + buildID, err := buildRunner.Trigger(ctx, base, batch, nil) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1) return fmt.Errorf("failed to trigger build for batch %s: %w", batch.ID, err) @@ -173,28 +169,22 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil // Success - message will be acked } -// collectChanges loads each batch by ID and concatenates the Change values -// from its contained requests in batch order. Used to build the base -// (dependency batches) and head (this batch) inputs to BuildRunner.Trigger. -func (c *Controller) collectChanges(ctx context.Context, batchIDs []string) ([]entity.Change, error) { +// loadBatches loads each batch by ID, preserving order. Used to load the base +// (dependency batches) identity handed to BuildRunner.Trigger; the build runner +// resolves each batch's changes itself. +func (c *Controller) loadBatches(ctx context.Context, batchIDs []string) ([]entity.Batch, error) { if len(batchIDs) == 0 { return nil, nil } - var changes []entity.Change + batches := make([]entity.Batch, 0, len(batchIDs)) for _, bID := range batchIDs { b, err := c.store.GetBatchStore().Get(ctx, bID) if err != nil { return nil, fmt.Errorf("failed to get batch %s: %w", bID, err) } - for _, reqID := range b.Contains { - req, err := c.store.GetRequestStore().Get(ctx, reqID) - if err != nil { - return nil, fmt.Errorf("failed to get request %s for batch %s: %w", reqID, bID, err) - } - changes = append(changes, req.Change) - } + batches = append(batches, b) } - return changes, nil + return batches, nil } // publish publishes a build's ID to the specified topic key. 
Only the diff --git a/submitqueue/orchestrator/controller/build/build_test.go b/submitqueue/orchestrator/controller/build/build_test.go index a144078e..b2c96d59 100644 --- a/submitqueue/orchestrator/controller/build/build_test.go +++ b/submitqueue/orchestrator/controller/build/build_test.go @@ -25,6 +25,7 @@ import ( "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" + changesetfake "github.com/uber/submitqueue/submitqueue/core/changeset/fake" "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" @@ -74,7 +75,7 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.Mo } // newTestController creates a controller with test dependencies. br is the -// build runner to inject; pass buildfake.New() for the pass-through default. +// build runner to inject; pass buildfake.New(changesetfake.New()) for the pass-through default. // staticBuildRunnerFactory is a test factory that returns a fixed BuildRunner // for any entityqueue. 
type staticBuildRunnerFactory struct{ r buildrunner.BuildRunner } @@ -111,7 +112,7 @@ func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, buildfake.New(), nil) + controller := newTestController(t, ctrl, store, buildfake.New(changesetfake.New()), nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyBuild, controller.TopicKey()) @@ -124,7 +125,7 @@ func TestController_Process_Success(t *testing.T) { batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, buildfake.New(), nil) + controller := newTestController(t, ctrl, store, buildfake.New(changesetfake.New()), nil) msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -135,10 +136,10 @@ func TestController_Process_Success(t *testing.T) { require.NoError(t, err) } -// TestController_Process_TriggersWithBaseAndHead verifies the controller -// splits the input to BuildRunner.Trigger into base (dependency batches in -// order, concatenated) and head (this batch's changes in order), persists -// the initial Accepted Build, and publishes it to the buildsignal topic. +// TestController_Process_TriggersWithBaseAndHead verifies the controller hands +// BuildRunner.Trigger the base (dependency batches in order) and head (this +// batch) as identity, persists the initial Accepted Build, and publishes it to +// the buildsignal topic. The runner resolves each batch's changes itself. 
func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { ctrl := gomock.NewController(t) @@ -155,17 +156,10 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { Dependencies: []string{depBatch.ID}, Contains: []string{"test-queue/head-1", "test-queue/head-2"}, } - depReq := entity.Request{ID: "test-queue/dep-1", Change: entity.Change{URIs: []string{"github://o/r/pull/9/aaa"}}} - head1 := entity.Request{ID: "test-queue/head-1", Change: entity.Change{URIs: []string{"github://o/r/pull/1/aaa"}}} - head2 := entity.Request{ID: "test-queue/head-2", Change: entity.Change{URIs: []string{"github://o/r/pull/2/bbb"}}} mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), headBatch.ID).Return(headBatch, nil).AnyTimes() mockBatchStore.EXPECT().Get(gomock.Any(), depBatch.ID).Return(depBatch, nil).AnyTimes() - mockRequestStore := storagemock.NewMockRequestStore(ctrl) - mockRequestStore.EXPECT().Get(gomock.Any(), depReq.ID).Return(depReq, nil) - mockRequestStore.EXPECT().Get(gomock.Any(), head1.ID).Return(head1, nil) - mockRequestStore.EXPECT().Get(gomock.Any(), head2.ID).Return(head2, nil) var created entity.Build mockBuildStore := storagemock.NewMockBuildStore(ctrl) @@ -178,13 +172,11 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() br := buildrunnermock.NewMockBuildRunner(ctrl) - wantBase := []entity.Change{depReq.Change} - wantHead := []entity.Change{head1.Change, head2.Change} - br.EXPECT().Trigger(gomock.Any(), wantBase, wantHead, gomock.Nil()).Return(entity.BuildID{ID: "build-xyz"}, nil) + // base is the dependency batches (identity); head is this batch. 
+ br.EXPECT().Trigger(gomock.Any(), []entity.Batch{depBatch}, headBatch, gomock.Nil()).Return(entity.BuildID{ID: "build-xyz"}, nil) var publishedTopic string var published entity.BuildID @@ -315,7 +307,7 @@ func TestController_Process_StorageFailure(t *testing.T) { store.EXPECT().GetRequestStore().Return(storagemock.NewMockRequestStore(ctrl)).AnyTimes() store.EXPECT().GetBuildStore().Return(storagemock.NewMockBuildStore(ctrl)).AnyTimes() - controller := newTestController(t, ctrl, store, buildfake.New(), nil) + controller := newTestController(t, ctrl, store, buildfake.New(changesetfake.New()), nil) msg := entityqueue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -332,7 +324,7 @@ func TestController_Process_PublishFailure(t *testing.T) { batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, buildfake.New(), fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, store, buildfake.New(changesetfake.New()), fmt.Errorf("publish failed")) msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -347,7 +339,7 @@ func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, buildfake.New(), nil) + controller := newTestController(t, ctrl, store, buildfake.New(changesetfake.New()), nil) var _ consumer.Controller = controller } From 028d25af67d950723e5a3a1abcd1f7c90df64a72 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Mon, 8 Jun 2026 12:57:37 -0700 Subject: [PATCH 2/2] refactor(pusher): push ordered batches, return per-batch outcomes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change Pusher.Push to take ordered []entity.Batch instead of 
controller-pre-resolved []entity.Change, per the extension contract. The git pusher and the fake gain an injected changeset.Resolver and resolve each batch's changes themselves; the merge controller drops its private collectChanges walk and passes the single batch (the list designs for a future merge-train). This is the one extension whose output shape also changes: Result now groups outcomes per batch — Result{Batches []BatchOutcome}, where BatchOutcome{BatchID, Outcomes []ChangeOutcome} — so each landed batch stays correlatable, the way conflict.Conflict carries its BatchID. ChangeOutcome (per-change commit detail) is unchanged. No per-batch status: push atomicity stays all-or-nothing across the whole call. --- .../submitqueue/orchestrator/server/main.go | 7 +- submitqueue/extension/pusher/fake/BUILD.bazel | 2 + submitqueue/extension/pusher/fake/fake.go | 56 +++++--- .../extension/pusher/fake/fake_test.go | 26 ++-- submitqueue/extension/pusher/git/BUILD.bazel | 2 + .../extension/pusher/git/git_pusher.go | 35 ++++- .../extension/pusher/git/git_pusher_test.go | 91 +++++++------ .../extension/pusher/mock/pusher_mock.go | 8 +- submitqueue/extension/pusher/pusher.go | 45 +++++-- .../orchestrator/controller/merge/merge.go | 27 +--- .../controller/merge/merge_test.go | 125 ++++-------------- 11 files changed, 205 insertions(+), 219 deletions(-) diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index a0360464..931836ce 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -775,16 +775,17 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch // "origin", PUSHER_TARGET default "main"). When PUSHER_CHECKOUT_PATH is unset it // returns the fake pusher (commits succeed unless a change URI carries a failure // marker, see pusher/fake), keeping the example runnable without a git checkout. 
-func newPusher(logger *zap.Logger, scope tally.Scope) (pusher.Pusher, error) { +func newPusher(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (pusher.Pusher, error) { checkout := os.Getenv("PUSHER_CHECKOUT_PATH") if checkout == "" { logger.Warn("PUSHER_CHECKOUT_PATH not set; using fake pusher (commits succeed unless URI-marked)") - return pushfake.New(), nil + return pushfake.New(resolver), nil } return gitpusher.NewPusher(gitpusher.Params{ CheckoutPath: checkout, Remote: getEnv("PUSHER_REMOTE", "origin"), Target: getEnv("PUSHER_TARGET", "main"), + Resolver: resolver, Logger: logger.Sugar(), MetricsScope: scope.SubScope("pusher"), }), nil @@ -806,7 +807,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset. if err != nil { return queueRegistry{}, fmt.Errorf("failed to create change provider: %w", err) } - psh, err := newPusher(logger, scope) + psh, err := newPusher(logger, scope, resolver) if err != nil { return queueRegistry{}, fmt.Errorf("failed to create pusher: %w", err) } diff --git a/submitqueue/extension/pusher/fake/BUILD.bazel b/submitqueue/extension/pusher/fake/BUILD.bazel index b5bd7d1d..fc902b12 100644 --- a/submitqueue/extension/pusher/fake/BUILD.bazel +++ b/submitqueue/extension/pusher/fake/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/extension/pusher/fake", visibility = ["//visibility:public"], deps = [ + "//submitqueue/core/changeset", "//submitqueue/core/fakemarker", "//submitqueue/entity", "//submitqueue/extension/pusher", @@ -17,6 +18,7 @@ go_test( srcs = ["fake_test.go"], embed = [":fake"], deps = [ + "//submitqueue/core/changeset/fake", "//submitqueue/entity", "//submitqueue/extension/pusher", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/extension/pusher/fake/fake.go b/submitqueue/extension/pusher/fake/fake.go index 3db1c19f..ea6e50e8 100644 --- a/submitqueue/extension/pusher/fake/fake.go +++ 
b/submitqueue/extension/pusher/fake/fake.go @@ -32,6 +32,7 @@ import ( "fmt" "sync/atomic" + "github.com/uber/submitqueue/submitqueue/core/changeset" "github.com/uber/submitqueue/submitqueue/core/fakemarker" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" @@ -45,35 +46,54 @@ const ( // fakePusher is a pusher.Pusher that reports every change as committed unless a // marker token in a change URI requests a failure. The atomic counter hands out -// unique synthetic commit SHAs and makes the type safe for concurrent use. +// unique synthetic commit SHAs and makes the type safe for concurrent use. It +// resolves each batch's changes through the injected resolver. type fakePusher struct { - counter atomic.Uint64 + resolver changeset.Resolver + counter atomic.Uint64 } // New returns a pusher.Pusher that defaults to committing every change and -// honors marker tokens embedded in change URIs. -func New() pusher.Pusher { - return &fakePusher{} +// honors marker tokens embedded in change URIs. The resolver resolves each +// batch's changes. +func New(resolver changeset.Resolver) pusher.Pusher { + return &fakePusher{resolver: resolver} } -// Push reports every change as committed with a synthetic commit SHA, unless a -// recognized marker token in one of the changes requests a failure. -func (p *fakePusher) Push(_ context.Context, changes []entity.Change) (pusher.Result, error) { - switch fakemarker.TokenInChanges(changes) { +// Push resolves each batch's changes and reports every change as committed with +// a synthetic commit SHA, grouped per batch, unless a recognized marker token in +// one of the changes requests a failure. 
+func (p *fakePusher) Push(ctx context.Context, batches []entity.Batch) (pusher.Result, error) { + perBatch := make([][]entity.Change, len(batches)) + var all []entity.Change + for i, b := range batches { + cs, err := p.resolver.ChangesForBatch(ctx, b) + if err != nil { + return pusher.Result{}, fmt.Errorf("fake: resolve batch %s: %w", b.ID, err) + } + perBatch[i] = cs + all = append(all, cs...) + } + + switch fakemarker.TokenInChanges(all) { case tokenConflict: return pusher.Result{}, pusher.ErrConflict case tokenError: return pusher.Result{}, fmt.Errorf("fake: marked push error") } - outcomes := make([]pusher.ChangeOutcome, 0, len(changes)) - for _, change := range changes { - sha := fmt.Sprintf("fake-%d", p.counter.Add(1)) - outcomes = append(outcomes, pusher.ChangeOutcome{ - Change: change, - Status: pusher.OutcomeStatusCommitted, - CommitSHAs: []string{sha}, - }) + result := make([]pusher.BatchOutcome, len(batches)) + for i, b := range batches { + outcomes := make([]pusher.ChangeOutcome, 0, len(perBatch[i])) + for _, change := range perBatch[i] { + sha := fmt.Sprintf("fake-%d", p.counter.Add(1)) + outcomes = append(outcomes, pusher.ChangeOutcome{ + Change: change, + Status: pusher.OutcomeStatusCommitted, + CommitSHAs: []string{sha}, + }) + } + result[i] = pusher.BatchOutcome{BatchID: b.ID, Outcomes: outcomes} } - return pusher.Result{Outcomes: outcomes}, nil + return pusher.Result{Batches: result}, nil } diff --git a/submitqueue/extension/pusher/fake/fake_test.go b/submitqueue/extension/pusher/fake/fake_test.go index df46aa6f..55249d55 100644 --- a/submitqueue/extension/pusher/fake/fake_test.go +++ b/submitqueue/extension/pusher/fake/fake_test.go @@ -21,27 +21,29 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + changesetfake "github.com/uber/submitqueue/submitqueue/core/changeset/fake" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" ) func 
TestNew_ImplementsInterface(t *testing.T) { - var _ pusher.Pusher = New() + var _ pusher.Pusher = New(changesetfake.New()) } func TestPusher_Push_Committed(t *testing.T) { - p := New() changes := []entity.Change{ {URIs: []string{"github://owner/repo/pull/1/abc"}}, {URIs: []string{"github://owner/repo/pull/2/def"}}, } + p := New(changesetfake.New().Set("b", changes...)) - res, err := p.Push(context.Background(), changes) + res, err := p.Push(context.Background(), []entity.Batch{{ID: "b"}}) require.NoError(t, err) - require.Len(t, res.Outcomes, len(changes)) + require.Len(t, res.Batches, 1) + require.Len(t, res.Batches[0].Outcomes, len(changes)) seen := map[string]bool{} - for i, out := range res.Outcomes { + for i, out := range res.Batches[0].Outcomes { assert.Equal(t, changes[i], out.Change) assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) require.Len(t, out.CommitSHAs, 1) @@ -51,19 +53,15 @@ func TestPusher_Push_Committed(t *testing.T) { } func TestPusher_Push_ConflictMarker(t *testing.T) { - p := New() - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=conflict"}}, - }) + p := New(changesetfake.New().Set("b", entity.Change{URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=conflict"}})) + _, err := p.Push(context.Background(), []entity.Batch{{ID: "b"}}) assert.True(t, errors.Is(err, pusher.ErrConflict)) } func TestPusher_Push_ErrorMarker(t *testing.T) { - p := New() - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=push-error"}}, - }) + p := New(changesetfake.New().Set("b", entity.Change{URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=push-error"}})) + res, err := p.Push(context.Background(), []entity.Batch{{ID: "b"}}) require.Error(t, err) // Atomicity: on error no outcomes are reported. 
- assert.Empty(t, res.Outcomes) + assert.Empty(t, res.Batches) } diff --git a/submitqueue/extension/pusher/git/BUILD.bazel b/submitqueue/extension/pusher/git/BUILD.bazel index 5ae9f8f1..bbbea0fd 100644 --- a/submitqueue/extension/pusher/git/BUILD.bazel +++ b/submitqueue/extension/pusher/git/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/metrics", + "//submitqueue/core/changeset", "//submitqueue/entity", "//submitqueue/entity/github", "//submitqueue/extension/pusher", @@ -20,6 +21,7 @@ go_test( srcs = ["git_pusher_test.go"], embed = [":git"], deps = [ + "//submitqueue/core/changeset/fake", "//submitqueue/entity", "//submitqueue/extension/pusher", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/extension/pusher/git/git_pusher.go b/submitqueue/extension/pusher/git/git_pusher.go index 7c395ba6..9508fb9d 100644 --- a/submitqueue/extension/pusher/git/git_pusher.go +++ b/submitqueue/extension/pusher/git/git_pusher.go @@ -62,6 +62,7 @@ import ( "go.uber.org/zap" coremetrics "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/submitqueue/core/changeset" "github.com/uber/submitqueue/submitqueue/entity" entitygithub "github.com/uber/submitqueue/submitqueue/entity/github" "github.com/uber/submitqueue/submitqueue/extension/pusher" @@ -82,6 +83,8 @@ type Params struct { Remote string // Target is the destination branch ref on the remote (e.g. "main"). Target string + // Resolver resolves each batch's changes. + Resolver changeset.Resolver // Logger is the structured logger. Logger *zap.SugaredLogger // MetricsScope is the metrics scope for instrumentation. 
@@ -98,6 +101,7 @@ type gitPusher struct { checkoutPath string remote string target string + resolver changeset.Resolver logger *zap.SugaredLogger metricsScope tally.Scope maxPushAttempts int @@ -121,6 +125,7 @@ func NewPusher(params Params) pusher.Pusher { checkoutPath: params.CheckoutPath, remote: params.Remote, target: params.Target, + resolver: params.Resolver, logger: params.Logger.Named("git_pusher"), metricsScope: params.MetricsScope.SubScope("git_pusher"), maxPushAttempts: maxAttempts, @@ -128,10 +133,23 @@ func NewPusher(params Params) pusher.Pusher { } // Push fulfils the pusher.Pusher contract. -func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret pusher.Result, retErr error) { +func (p *gitPusher) Push(ctx context.Context, batches []entity.Batch) (ret pusher.Result, retErr error) { op := coremetrics.Begin(p.metricsScope, "push") defer func() { op.Complete(retErr) }() + // Resolve each batch's changes, keeping per-batch counts so the flat + // outcomes can be regrouped per batch on success. + perBatch := make([][]entity.Change, len(batches)) + var changes []entity.Change + for i, b := range batches { + cs, err := p.resolver.ChangesForBatch(ctx, b) + if err != nil { + return pusher.Result{}, fmt.Errorf("resolve batch %s: %w", b.ID, err) + } + perBatch[i] = cs + changes = append(changes, cs...) 
+ } + p.mu.Lock() defer p.mu.Unlock() @@ -154,7 +172,7 @@ func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret push "target", p.target, "outcomes", outcomes, ) - return pusher.Result{Outcomes: outcomes}, nil + return pusher.Result{Batches: groupByBatch(batches, perBatch, outcomes)}, nil } // Was the failure caused by the remote tip moving under us between @@ -190,6 +208,19 @@ func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret push return pusher.Result{}, fmt.Errorf("exceeded %d push attempts due to remote contention: %w", p.maxPushAttempts, lastErr) } +// groupByBatch splits the flat, apply-ordered outcomes back into one +// BatchOutcome per input batch, using each batch's resolved change count. +func groupByBatch(batches []entity.Batch, perBatch [][]entity.Change, outcomes []pusher.ChangeOutcome) []pusher.BatchOutcome { + result := make([]pusher.BatchOutcome, len(batches)) + pos := 0 + for i, b := range batches { + n := len(perBatch[i]) + result[i] = pusher.BatchOutcome{BatchID: b.ID, Outcomes: outcomes[pos : pos+n]} + pos += n + } + return result +} + // tryPush runs one full reset+cherry-pick+push cycle. 
The returned baseSHA // is the SHA the cycle was based on (set as soon as resetToRemote completes) // so the caller can distinguish concurrent-push contention from other push diff --git a/submitqueue/extension/pusher/git/git_pusher_test.go b/submitqueue/extension/pusher/git/git_pusher_test.go index 33d51a69..57e4a9e8 100644 --- a/submitqueue/extension/pusher/git/git_pusher_test.go +++ b/submitqueue/extension/pusher/git/git_pusher_test.go @@ -31,6 +31,7 @@ import ( "github.com/uber-go/tally" "go.uber.org/zap/zaptest" + "github.com/uber/submitqueue/submitqueue/core/changeset/fake" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" ) @@ -47,6 +48,7 @@ type gitFixture struct { remoteDir string checkoutDir string authorDir string // a separate working clone used to author "PR" commits + resolver *fake.Resolver } func setupGitFixture(t *testing.T) gitFixture { @@ -81,9 +83,17 @@ func setupGitFixture(t *testing.T) gitFixture { remoteDir: remoteDir, checkoutDir: checkoutDir, authorDir: authorDir, + resolver: fake.New(), } } +// batchFor seeds the fixture resolver so batch id resolves to the given changes, +// and returns the batch to pass to Push. +func (f gitFixture) batchFor(id string, changes ...entity.Change) entity.Batch { + f.resolver.Set(id, changes...) + return entity.Batch{ID: id} +} + // configRepo applies the test-only config that lets git commit work in a // sandbox without GPG signing, system identity, or system git hooks. 
The // devpod environment installs hooks via the system git config (core.hooksPath @@ -152,6 +162,7 @@ func (f gitFixture) newPusher(t *testing.T) pusher.Pusher { CheckoutPath: f.checkoutDir, Remote: "origin", Target: "main", + Resolver: f.resolver, Logger: zaptest.NewLogger(t).Sugar(), MetricsScope: tally.NoopScope, }) @@ -243,13 +254,13 @@ func TestPusher_Push_SingleChangeSingleURIProducesOneCommit(t *testing.T) { sha := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(sha)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 1) + require.Len(t, res.Batches[0].Outcomes, 1) - out := res.Outcomes[0] + out := res.Batches[0].Outcomes[0] assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) require.Len(t, out.CommitSHAs, 1) assert.Equal(t, []string{out.CommitSHAs[0]}, f.remoteCommitsSinceSeed(t)) @@ -275,13 +286,13 @@ func TestPusher_Push_StackedURIsProduceMultipleCommitsForOneChange(t *testing.T) mustGit(t, f.authorDir, "push", "-f", "origin", "feature/stack") p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha1), uri(sha2)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(sha1), uri(sha2)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 1) + require.Len(t, res.Batches[0].Outcomes, 1) - out := res.Outcomes[0] + out := res.Batches[0].Outcomes[0] assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) require.Len(t, out.CommitSHAs, 2) assert.Equal(t, out.CommitSHAs, f.remoteCommitsSinceSeed(t)) @@ -298,13 +309,13 @@ func TestPusher_Push_AlreadyLandedChangeIsRebasedOut(t *testing.T) { mainBeforePush := f.remoteHEAD(t) p := f.newPusher(t) - res, err := p.Push(context.Background(), 
[]entity.Change{ - {URIs: []string{uri(sha)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(sha)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 1) + require.Len(t, res.Batches[0].Outcomes, 1) - out := res.Outcomes[0] + out := res.Batches[0].Outcomes[0] assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, out.Status) assert.Empty(t, out.CommitSHAs) assert.Equal(t, mainBeforePush, f.remoteHEAD(t), @@ -319,18 +330,17 @@ func TestPusher_Push_MixedChangesPartiallyRebasedOut(t *testing.T) { freshSHA := f.pushPRCommit(t, "feature/b", "extra.txt", "extra\n", "add extra") p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(subsumedSHA)}}, - {URIs: []string{uri(freshSHA)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(subsumedSHA)}}, entity.Change{URIs: []string{uri(freshSHA)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 2) + require.Len(t, res.Batches[0].Outcomes, 2) - assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, res.Outcomes[0].Status) - assert.Empty(t, res.Outcomes[0].CommitSHAs) + assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, res.Batches[0].Outcomes[0].Status) + assert.Empty(t, res.Batches[0].Outcomes[0].CommitSHAs) - assert.Equal(t, pusher.OutcomeStatusCommitted, res.Outcomes[1].Status) - require.Len(t, res.Outcomes[1].CommitSHAs, 1) + assert.Equal(t, pusher.OutcomeStatusCommitted, res.Batches[0].Outcomes[1].Status) + require.Len(t, res.Batches[0].Outcomes[1].CommitSHAs, 1) assert.Equal(t, "extra\n", f.remoteFile(t, "extra.txt")) } @@ -349,8 +359,8 @@ func TestPusher_Push_ConflictReturnsErrConflictAndDoesNotPush(t *testing.T) { conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") p := f.newPusher(t) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(conflictingSHA)}}, + _, err 
:= p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(conflictingSHA)}}), }) require.Error(t, err) assert.True(t, errors.Is(err, pusher.ErrConflict)) @@ -368,13 +378,13 @@ func TestPusher_Push_ResetsBetweenCalls(t *testing.T) { // would fail or include unrelated changes. require.NoError(t, writeFile(filepath.Join(f.checkoutDir, "stray.txt"), "leftover\n")) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(sha)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes[0].CommitSHAs, 1) + require.Len(t, res.Batches[0].Outcomes[0].CommitSHAs, 1) - out := mustGitOutput(t, f.remoteDir, "ls-tree", "--name-only", res.Outcomes[0].CommitSHAs[0]) + out := mustGitOutput(t, f.remoteDir, "ls-tree", "--name-only", res.Batches[0].Outcomes[0].CommitSHAs[0]) assert.NotContains(t, string(out), "stray.txt", "unrelated file should not have landed") assert.Contains(t, string(out), "hello.txt") } @@ -388,19 +398,19 @@ func TestPusher_Push_RecoversAfterPriorConflict(t *testing.T) { conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") p := f.newPusher(t) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(conflictingSHA)}}, + _, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(conflictingSHA)}}), }) require.Error(t, err) // A subsequent, clean push must succeed even though the prior call left // a cherry-pick in progress before its rollback. 
freshSHA := f.pushPRCommit(t, "feature/c", "extra.txt", "extra\n", "add extra") - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(freshSHA)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b2", entity.Change{URIs: []string{uri(freshSHA)}}), }) require.NoError(t, err) - assert.Equal(t, pusher.OutcomeStatusCommitted, res.Outcomes[0].Status) + assert.Equal(t, pusher.OutcomeStatusCommitted, res.Batches[0].Outcomes[0].Status) assert.Equal(t, "extra\n", f.remoteFile(t, "extra.txt")) } @@ -417,8 +427,8 @@ func TestPusher_Push_InvalidURIErrors(t *testing.T) { f := setupGitFixture(t) p := f.newPusher(t) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{"not a uri"}}, + _, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{"not a uri"}}), }) require.Error(t, err) } @@ -433,12 +443,12 @@ func TestPusher_Push_RetriesWhenRemoteMovesUnderUs(t *testing.T) { f.installRaceHook(t, []string{raceSHA}) p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(featureSHA)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(featureSHA)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 1) - require.Len(t, res.Outcomes[0].CommitSHAs, 1) + require.Len(t, res.Batches[0].Outcomes, 1) + require.Len(t, res.Batches[0].Outcomes[0].CommitSHAs, 1) assert.Equal(t, 2, f.hookInvocations(t), "first attempt rejected by hook, second attempt allowed through") @@ -447,7 +457,7 @@ func TestPusher_Push_RetriesWhenRemoteMovesUnderUs(t *testing.T) { require.Len(t, commits, 2) assert.Equal(t, raceSHA, commits[0], "race commit landed first via the hook") - assert.Equal(t, res.Outcomes[0].CommitSHAs[0], commits[1], + assert.Equal(t, res.Batches[0].Outcomes[0].CommitSHAs[0], commits[1], "our cherry-pick landed on top after the retry") assert.Equal(t, "hello\nearth\n", 
f.remoteFile(t, "hello.txt")) } @@ -465,12 +475,13 @@ func TestPusher_Push_GivesUpAfterMaxAttempts(t *testing.T) { CheckoutPath: f.checkoutDir, Remote: "origin", Target: "main", + Resolver: f.resolver, Logger: zaptest.NewLogger(t).Sugar(), MetricsScope: tally.NoopScope, MaxPushAttempts: 2, }) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(featureSHA)}}, + _, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(featureSHA)}}), }) require.Error(t, err) assert.Equal(t, 2, f.hookInvocations(t), diff --git a/submitqueue/extension/pusher/mock/pusher_mock.go b/submitqueue/extension/pusher/mock/pusher_mock.go index 5ff69e71..27345b16 100644 --- a/submitqueue/extension/pusher/mock/pusher_mock.go +++ b/submitqueue/extension/pusher/mock/pusher_mock.go @@ -43,18 +43,18 @@ func (m *MockPusher) EXPECT() *MockPusherMockRecorder { } // Push mocks base method. -func (m *MockPusher) Push(ctx context.Context, changes []entity.Change) (pusher.Result, error) { +func (m *MockPusher) Push(ctx context.Context, batches []entity.Batch) (pusher.Result, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Push", ctx, changes) + ret := m.ctrl.Call(m, "Push", ctx, batches) ret0, _ := ret[0].(pusher.Result) ret1, _ := ret[1].(error) return ret0, ret1 } // Push indicates an expected call of Push. -func (mr *MockPusherMockRecorder) Push(ctx, changes any) *gomock.Call { +func (mr *MockPusherMockRecorder) Push(ctx, batches any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), ctx, changes) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), ctx, batches) } // MockFactory is a mock of Factory interface. 
diff --git a/submitqueue/extension/pusher/pusher.go b/submitqueue/extension/pusher/pusher.go index 8d86e131..633258e2 100644 --- a/submitqueue/extension/pusher/pusher.go +++ b/submitqueue/extension/pusher/pusher.go @@ -59,16 +59,29 @@ type ChangeOutcome struct { CommitSHAs []string } +// BatchOutcome groups the per-change outcomes for a single input batch, so a +// merge-train push (several batches in one call) stays correlatable back to the +// batch each change belonged to. There is no per-batch status: Push is +// all-or-nothing across the whole call (see the atomicity contract), so a +// per-batch pass/fail would be uniformly redundant. +type BatchOutcome struct { + // BatchID is the input batch this outcome corresponds to. + BatchID string + // Outcomes is one entry per change in the batch, in apply order. + Outcomes []ChangeOutcome +} + // Result is the outcome of a successful Push call. type Result struct { - // Outcomes is one entry per input change, in the same order as the - // changes passed to Push. The slice length equals the input length. - Outcomes []ChangeOutcome + // Batches is one entry per input batch, in the same order as the batches + // passed to Push. The slice length equals the input length. + Batches []BatchOutcome } -// Pusher applies a list of Changes on top of a target branch and pushes the -// result to the source-control remote. Each implementation is bound to a -// specific (checkout, remote, target) at construction time. +// Pusher applies the changes of one or more batches on top of a target branch +// and pushes the result to the source-control remote. Each implementation is +// bound to a specific (checkout, remote, target) at construction time and +// resolves each batch's changes itself through an injected changeset resolver. // // Atomicity contract: when Push returns a non-nil error, NO change has been // pushed to the remote — neither partially nor fully. 
Implementations must @@ -76,15 +89,19 @@ type Result struct { // when any change fails to apply. Callers can treat a non-nil error as // "the remote is exactly as it was before the call". // -// On success, len(Result.Outcomes) == len(changes) and Outcomes[i] describes -// what happened to changes[i]. A change can produce multiple commits -// (OutcomeStatusCommitted, CommitSHAs populated in apply order) or none at -// all (OutcomeStatusAlreadyExisted, CommitSHAs empty) — the latter happens -// when the change's content is already present on the target branch. +// On success, len(Result.Batches) == len(batches) and Batches[i] describes what +// happened to batches[i], with one ChangeOutcome per change in that batch in +// apply order. A change can produce multiple commits (OutcomeStatusCommitted, +// CommitSHAs populated in apply order) or none at all (OutcomeStatusAlreadyExisted, +// CommitSHAs empty) — the latter happens when the change's content is already +// present on the target branch. type Pusher interface { - // Push applies changes onto the target branch and pushes the resulting - // commits. See the type-level docs for the atomicity contract. - Push(ctx context.Context, changes []entity.Change) (Result, error) + // Push resolves and applies the changes of the given batches, in order, + // onto the target branch and pushes the resulting commits. The batch list + // is designed for a merge-train (land several ready batches in one atomic push); + // today merge passes a single batch. See the type-level docs for the + // atomicity contract. + Push(ctx context.Context, batches []entity.Batch) (Result, error) } // Config carries the per-queue identity handed to a Factory.
The system knows diff --git a/submitqueue/orchestrator/controller/merge/merge.go b/submitqueue/orchestrator/controller/merge/merge.go index 31f1600c..eb0a319b 100644 --- a/submitqueue/orchestrator/controller/merge/merge.go +++ b/submitqueue/orchestrator/controller/merge/merge.go @@ -120,19 +120,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return c.fanout(ctx, batch.ID, batch.Queue) } - changes, err := c.collectChanges(ctx, batch) - if err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "request_load_errors", 1) - return fmt.Errorf("failed to collect changes for batch %s: %w", batch.ID, err) - } - push, err := c.pushers.For(pusher.Config{QueueName: batch.Queue}) if err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "push_errors", 1) return fmt.Errorf("failed to build pusher for batch %s: %w", batch.ID, err) } - pushRes, pushErr := push.Push(ctx, changes) + // Push a single batch today; the pusher resolves its changes itself. The + // list parameter is designed for a future merge-train. + pushRes, pushErr := push.Push(ctx, []entity.Batch{batch}) var newState entity.BatchState switch { @@ -140,7 +136,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r newState = entity.BatchStateSucceeded c.logger.Infow("merged batch", "batch_id", batch.ID, - "outcomes", pushRes.Outcomes, + "outcomes", pushRes.Batches, ) case errors.Is(pushErr, pusher.ErrConflict): coremetrics.NamedCounter(c.metricsScope, "process", "push_conflicts", 1) @@ -166,21 +162,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return c.fanout(ctx, batch.ID, batch.Queue) } -// collectChanges loads each request in batch.Contains and returns its -// Change. The result preserves batch.Contains order so the Pusher applies -// the changes in the same order the requests were batched.
-func (c *Controller) collectChanges(ctx context.Context, batch entity.Batch) ([]entity.Change, error) { - changes := make([]entity.Change, 0, len(batch.Contains)) - for _, requestID := range batch.Contains { - request, err := c.store.GetRequestStore().Get(ctx, requestID) - if err != nil { - return nil, fmt.Errorf("failed to get request %s: %w", requestID, err) - } - changes = append(changes, request.Change) - } - return changes, nil -} - // fanout publishes the batch ID to conclude (so requests are updated) and // to speculate (so dependents can re-evaluate now that this batch is done). func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { diff --git a/submitqueue/orchestrator/controller/merge/merge_test.go b/submitqueue/orchestrator/controller/merge/merge_test.go index 7eb5578b..13f1d123 100644 --- a/submitqueue/orchestrator/controller/merge/merge_test.go +++ b/submitqueue/orchestrator/controller/merge/merge_test.go @@ -108,34 +108,26 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { Version: 4, } change := entity.Change{URIs: []string{"github://o/r/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} - request := entity.Request{ - ID: reqID, - Queue: "test-queue", - Change: change, - State: entity.RequestStateProcessing, - Version: 2, - } batchStore := storagemock.NewMockBatchStore(ctrl) batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(4), int32(5), entity.BatchStateSucceeded).Return(nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(request, nil) - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, changes []entity.Change) 
(pusher.Result, error) { - require.Len(t, changes, 1) - assert.Equal(t, change, changes[0]) - return pusher.Result{Outcomes: []pusher.ChangeOutcome{{ - Change: change, - Status: pusher.OutcomeStatusCommitted, - CommitSHAs: []string{"deadbeef"}, + func(_ context.Context, batches []entity.Batch) (pusher.Result, error) { + require.Len(t, batches, 1) + assert.Equal(t, batch.ID, batches[0].ID) + return pusher.Result{Batches: []pusher.BatchOutcome{{ + BatchID: batch.ID, + Outcomes: []pusher.ChangeOutcome{{ + Change: change, + Status: pusher.OutcomeStatusCommitted, + CommitSHAs: []string{"deadbeef"}, + }}, }}}, nil }, ) @@ -154,16 +146,15 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { require.NoError(t, err) } -func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { +// TestController_Process_ForwardsBatchToPusher verifies the controller forwards +// the batch identity (with its full Contains, in order) to the pusher, which +// resolves the changes itself. Change resolution order is the pusher's concern, +// covered by the git pusher tests. 
+func TestController_Process_ForwardsBatchToPusher(t *testing.T) { ctrl := gomock.NewController(t) const batchID = "test-queue/batch/multi" requestIDs := []string{"test-queue/1", "test-queue/2", "test-queue/3"} - changes := []entity.Change{ - {URIs: []string{"github://o/r/pull/1/1111111111111111111111111111111111111111"}}, - {URIs: []string{"github://o/r/pull/2/2222222222222222222222222222222222222222"}}, - {URIs: []string{"github://o/r/pull/3/3333333333333333333333333333333333333333"}}, - } batch := entity.Batch{ ID: batchID, @@ -177,30 +168,15 @@ func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(1), int32(2), entity.BatchStateSucceeded).Return(nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - for i, rid := range requestIDs { - requestStore.EXPECT().Get(gomock.Any(), rid).Return(entity.Request{ - ID: rid, Change: changes[i], - }, nil) - } - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, got []entity.Change) (pusher.Result, error) { - assert.Equal(t, changes, got, "changes must be in batch.Contains order") - outcomes := make([]pusher.ChangeOutcome, len(got)) - for i, ch := range got { - outcomes[i] = pusher.ChangeOutcome{ - Change: ch, - Status: pusher.OutcomeStatusCommitted, - CommitSHAs: []string{fmt.Sprintf("sha-%d", i)}, - } - } - return pusher.Result{Outcomes: outcomes}, nil + func(_ context.Context, batches []entity.Batch) (pusher.Result, error) { + require.Len(t, batches, 1) + assert.Equal(t, requestIDs, batches[0].Contains, "batch forwarded with Contains in order") + return pusher.Result{Batches: []pusher.BatchOutcome{{BatchID: batchID}}}, 
nil }, ) @@ -236,14 +212,8 @@ func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(3), int32(4), entity.BatchStateFailed).Return(nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ - ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/pull/2/2222222222222222222222222222222222222222"}}, - }, nil) - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( @@ -282,14 +252,8 @@ func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { batchStore := storagemock.NewMockBatchStore(ctrl) batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ - ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/pull/3/3333333333333333333333333333333333333333"}}, - }, nil) - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( @@ -428,44 +392,6 @@ func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { assert.False(t, errs.IsRetryable(err)) } -func TestController_Process_RequestStoreFailurePropagates(t *testing.T) { - ctrl := gomock.NewController(t) - - const reqID = "test-queue/6" - const batchID = "test-queue/batch/6" - - batch := entity.Batch{ - ID: batchID, - Queue: "test-queue", - Contains: []string{reqID}, - State: 
entity.BatchStateMerging, - Version: 1, - } - - batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) - - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{}, fmt.Errorf("db connection lost")) - - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() - - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - newRegistry(t, ctrl, nil), - newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), - consumer.TopicKeyMerge, - "orchestrator-merge", - ) - - err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) - require.Error(t, err) -} - func TestController_Process_PublishFailureSurfaces(t *testing.T) { ctrl := gomock.NewController(t) @@ -484,19 +410,16 @@ func TestController_Process_PublishFailureSurfaces(t *testing.T) { batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(2), int32(3), entity.BatchStateSucceeded).Return(nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ - ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/pull/7/7777777777777777777777777777777777777777"}}, - }, nil) - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( - pusher.Result{Outcomes: []pusher.ChangeOutcome{{ - Status: pusher.OutcomeStatusCommitted, CommitSHAs: []string{"abc"}, + pusher.Result{Batches: []pusher.BatchOutcome{{ + BatchID: batchID, + Outcomes: []pusher.ChangeOutcome{{ + Status: 
pusher.OutcomeStatusCommitted, CommitSHAs: []string{"abc"}, + }}, }}}, nil, )