diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index a0360464..931836ce 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -775,16 +775,17 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch // "origin", PUSHER_TARGET default "main"). When PUSHER_CHECKOUT_PATH is unset it // returns the fake pusher (commits succeed unless a change URI carries a failure // marker, see pusher/fake), keeping the example runnable without a git checkout. -func newPusher(logger *zap.Logger, scope tally.Scope) (pusher.Pusher, error) { +func newPusher(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (pusher.Pusher, error) { checkout := os.Getenv("PUSHER_CHECKOUT_PATH") if checkout == "" { logger.Warn("PUSHER_CHECKOUT_PATH not set; using fake pusher (commits succeed unless URI-marked)") - return pushfake.New(), nil + return pushfake.New(resolver), nil } return gitpusher.NewPusher(gitpusher.Params{ CheckoutPath: checkout, Remote: getEnv("PUSHER_REMOTE", "origin"), Target: getEnv("PUSHER_TARGET", "main"), + Resolver: resolver, Logger: logger.Sugar(), MetricsScope: scope.SubScope("pusher"), }), nil @@ -806,7 +807,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset. if err != nil { return queueRegistry{}, fmt.Errorf("failed to create change provider: %w", err) } - psh, err := newPusher(logger, scope) + psh, err := newPusher(logger, scope, resolver) if err != nil { return queueRegistry{}, fmt.Errorf("failed to create pusher: %w", err) } diff --git a/submitqueue/extension/pusher/fake/BUILD.bazel b/submitqueue/extension/pusher/fake/BUILD.bazel index b5bd7d1d..fc902b12 100644 --- a/submitqueue/extension/pusher/fake/BUILD.bazel +++ b/submitqueue/extension/pusher/fake/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/extension/pusher/fake", visibility = ["//visibility:public"], deps = [ + "//submitqueue/core/changeset", "//submitqueue/core/fakemarker", "//submitqueue/entity", "//submitqueue/extension/pusher", @@ -17,6 +18,7 @@ go_test( srcs = ["fake_test.go"], embed = [":fake"], deps = [ + "//submitqueue/core/changeset/fake", "//submitqueue/entity", "//submitqueue/extension/pusher", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/extension/pusher/fake/fake.go b/submitqueue/extension/pusher/fake/fake.go index 3db1c19f..ea6e50e8 100644 --- a/submitqueue/extension/pusher/fake/fake.go +++ b/submitqueue/extension/pusher/fake/fake.go @@ -32,6 +32,7 @@ import ( "fmt" "sync/atomic" + "github.com/uber/submitqueue/submitqueue/core/changeset" "github.com/uber/submitqueue/submitqueue/core/fakemarker" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" @@ -45,35 +46,54 @@ const ( // fakePusher is a pusher.Pusher that reports every change as committed unless a // marker token in a change URI requests a failure. The atomic counter hands out -// unique synthetic commit SHAs and makes the type safe for concurrent use. +// unique synthetic commit SHAs and makes the type safe for concurrent use. It +// resolves each batch's changes through the injected resolver. type fakePusher struct { - counter atomic.Uint64 + resolver changeset.Resolver + counter atomic.Uint64 } // New returns a pusher.Pusher that defaults to committing every change and -// honors marker tokens embedded in change URIs. -func New() pusher.Pusher { - return &fakePusher{} +// honors marker tokens embedded in change URIs. The resolver resolves each +// batch's changes. +func New(resolver changeset.Resolver) pusher.Pusher { + return &fakePusher{resolver: resolver} } -// Push reports every change as committed with a synthetic commit SHA, unless a -// recognized marker token in one of the changes requests a failure. -func (p *fakePusher) Push(_ context.Context, changes []entity.Change) (pusher.Result, error) { - switch fakemarker.TokenInChanges(changes) { +// Push resolves each batch's changes and reports every change as committed with +// a synthetic commit SHA, grouped per batch, unless a recognized marker token in +// one of the changes requests a failure. +func (p *fakePusher) Push(ctx context.Context, batches []entity.Batch) (pusher.Result, error) { + perBatch := make([][]entity.Change, len(batches)) + var all []entity.Change + for i, b := range batches { + cs, err := p.resolver.ChangesForBatch(ctx, b) + if err != nil { + return pusher.Result{}, fmt.Errorf("fake: resolve batch %s: %w", b.ID, err) + } + perBatch[i] = cs + all = append(all, cs...) + } + + switch fakemarker.TokenInChanges(all) { case tokenConflict: return pusher.Result{}, pusher.ErrConflict case tokenError: return pusher.Result{}, fmt.Errorf("fake: marked push error") } - outcomes := make([]pusher.ChangeOutcome, 0, len(changes)) - for _, change := range changes { - sha := fmt.Sprintf("fake-%d", p.counter.Add(1)) - outcomes = append(outcomes, pusher.ChangeOutcome{ - Change: change, - Status: pusher.OutcomeStatusCommitted, - CommitSHAs: []string{sha}, - }) + result := make([]pusher.BatchOutcome, len(batches)) + for i, b := range batches { + outcomes := make([]pusher.ChangeOutcome, 0, len(perBatch[i])) + for _, change := range perBatch[i] { + sha := fmt.Sprintf("fake-%d", p.counter.Add(1)) + outcomes = append(outcomes, pusher.ChangeOutcome{ + Change: change, + Status: pusher.OutcomeStatusCommitted, + CommitSHAs: []string{sha}, + }) + } + result[i] = pusher.BatchOutcome{BatchID: b.ID, Outcomes: outcomes} } - return pusher.Result{Outcomes: outcomes}, nil + return pusher.Result{Batches: result}, nil } diff --git a/submitqueue/extension/pusher/fake/fake_test.go b/submitqueue/extension/pusher/fake/fake_test.go index df46aa6f..55249d55 100644 --- a/submitqueue/extension/pusher/fake/fake_test.go +++ b/submitqueue/extension/pusher/fake/fake_test.go @@ -21,27 +21,29 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + changesetfake "github.com/uber/submitqueue/submitqueue/core/changeset/fake" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" ) func TestNew_ImplementsInterface(t *testing.T) { - var _ pusher.Pusher = New() + var _ pusher.Pusher = New(changesetfake.New()) } func TestPusher_Push_Committed(t *testing.T) { - p := New() changes := []entity.Change{ {URIs: []string{"github://owner/repo/pull/1/abc"}}, {URIs: []string{"github://owner/repo/pull/2/def"}}, } + p := New(changesetfake.New().Set("b", changes...)) - res, err := p.Push(context.Background(), changes) + res, err := p.Push(context.Background(), []entity.Batch{{ID: "b"}}) require.NoError(t, err) - require.Len(t, res.Outcomes, len(changes)) + require.Len(t, res.Batches, 1) + require.Len(t, res.Batches[0].Outcomes, len(changes)) seen := map[string]bool{} - for i, out := range res.Outcomes { + for i, out := range res.Batches[0].Outcomes { assert.Equal(t, changes[i], out.Change) assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) require.Len(t, out.CommitSHAs, 1) @@ -51,19 +53,15 @@ func TestPusher_Push_Committed(t *testing.T) { } func TestPusher_Push_ConflictMarker(t *testing.T) { - p := New() - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=conflict"}}, - }) + p := New(changesetfake.New().Set("b", entity.Change{URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=conflict"}})) + _, err := p.Push(context.Background(), []entity.Batch{{ID: "b"}}) assert.True(t, errors.Is(err, pusher.ErrConflict)) } func TestPusher_Push_ErrorMarker(t *testing.T) { - p := New() - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=push-error"}}, - }) + p := New(changesetfake.New().Set("b", entity.Change{URIs: []string{"github://owner/repo/pull/1/abc?sq-fake=push-error"}})) + res, err := p.Push(context.Background(), []entity.Batch{{ID: "b"}}) require.Error(t, err) // Atomicity: on error no outcomes are reported. - assert.Empty(t, res.Outcomes) + assert.Empty(t, res.Batches) } diff --git a/submitqueue/extension/pusher/git/BUILD.bazel b/submitqueue/extension/pusher/git/BUILD.bazel index 5ae9f8f1..bbbea0fd 100644 --- a/submitqueue/extension/pusher/git/BUILD.bazel +++ b/submitqueue/extension/pusher/git/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/metrics", + "//submitqueue/core/changeset", "//submitqueue/entity", "//submitqueue/entity/github", "//submitqueue/extension/pusher", @@ -20,6 +21,7 @@ go_test( srcs = ["git_pusher_test.go"], embed = [":git"], deps = [ + "//submitqueue/core/changeset/fake", "//submitqueue/entity", "//submitqueue/extension/pusher", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/extension/pusher/git/git_pusher.go b/submitqueue/extension/pusher/git/git_pusher.go index 7c395ba6..9508fb9d 100644 --- a/submitqueue/extension/pusher/git/git_pusher.go +++ b/submitqueue/extension/pusher/git/git_pusher.go @@ -62,6 +62,7 @@ import ( "go.uber.org/zap" coremetrics "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/submitqueue/core/changeset" "github.com/uber/submitqueue/submitqueue/entity" entitygithub "github.com/uber/submitqueue/submitqueue/entity/github" "github.com/uber/submitqueue/submitqueue/extension/pusher" @@ -82,6 +83,8 @@ type Params struct { Remote string // Target is the destination branch ref on the remote (e.g. "main"). Target string + // Resolver resolves each batch's changes. + Resolver changeset.Resolver // Logger is the structured logger. Logger *zap.SugaredLogger // MetricsScope is the metrics scope for instrumentation. @@ -98,6 +101,7 @@ type gitPusher struct { checkoutPath string remote string target string + resolver changeset.Resolver logger *zap.SugaredLogger metricsScope tally.Scope maxPushAttempts int @@ -121,6 +125,7 @@ func NewPusher(params Params) pusher.Pusher { checkoutPath: params.CheckoutPath, remote: params.Remote, target: params.Target, + resolver: params.Resolver, logger: params.Logger.Named("git_pusher"), metricsScope: params.MetricsScope.SubScope("git_pusher"), maxPushAttempts: maxAttempts, @@ -128,10 +133,23 @@ func NewPusher(params Params) pusher.Pusher { } // Push fulfils the pusher.Pusher contract. -func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret pusher.Result, retErr error) { +func (p *gitPusher) Push(ctx context.Context, batches []entity.Batch) (ret pusher.Result, retErr error) { op := coremetrics.Begin(p.metricsScope, "push") defer func() { op.Complete(retErr) }() + // Resolve each batch's changes, keeping per-batch counts so the flat + // outcomes can be regrouped per batch on success. + perBatch := make([][]entity.Change, len(batches)) + var changes []entity.Change + for i, b := range batches { + cs, err := p.resolver.ChangesForBatch(ctx, b) + if err != nil { + return pusher.Result{}, fmt.Errorf("resolve batch %s: %w", b.ID, err) + } + perBatch[i] = cs + changes = append(changes, cs...) + } + p.mu.Lock() defer p.mu.Unlock() @@ -154,7 +172,7 @@ func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret push "target", p.target, "outcomes", outcomes, ) - return pusher.Result{Outcomes: outcomes}, nil + return pusher.Result{Batches: groupByBatch(batches, perBatch, outcomes)}, nil } // Was the failure caused by the remote tip moving under us between @@ -190,6 +208,19 @@ func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret push return pusher.Result{}, fmt.Errorf("exceeded %d push attempts due to remote contention: %w", p.maxPushAttempts, lastErr) } +// groupByBatch splits the flat, apply-ordered outcomes back into one +// BatchOutcome per input batch, using each batch's resolved change count. +func groupByBatch(batches []entity.Batch, perBatch [][]entity.Change, outcomes []pusher.ChangeOutcome) []pusher.BatchOutcome { + result := make([]pusher.BatchOutcome, len(batches)) + pos := 0 + for i, b := range batches { + n := len(perBatch[i]) + result[i] = pusher.BatchOutcome{BatchID: b.ID, Outcomes: outcomes[pos : pos+n]} + pos += n + } + return result +} + // tryPush runs one full reset+cherry-pick+push cycle. The returned baseSHA // is the SHA the cycle was based on (set as soon as resetToRemote completes) // so the caller can distinguish concurrent-push contention from other push diff --git a/submitqueue/extension/pusher/git/git_pusher_test.go b/submitqueue/extension/pusher/git/git_pusher_test.go index 33d51a69..57e4a9e8 100644 --- a/submitqueue/extension/pusher/git/git_pusher_test.go +++ b/submitqueue/extension/pusher/git/git_pusher_test.go @@ -31,6 +31,7 @@ import ( "github.com/uber-go/tally" "go.uber.org/zap/zaptest" + "github.com/uber/submitqueue/submitqueue/core/changeset/fake" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" ) @@ -47,6 +48,7 @@ type gitFixture struct { remoteDir string checkoutDir string authorDir string // a separate working clone used to author "PR" commits + resolver *fake.Resolver } func setupGitFixture(t *testing.T) gitFixture { @@ -81,9 +83,17 @@ func setupGitFixture(t *testing.T) gitFixture { remoteDir: remoteDir, checkoutDir: checkoutDir, authorDir: authorDir, + resolver: fake.New(), } } +// batchFor seeds the fixture resolver so batch id resolves to the given changes, +// and returns the batch to pass to Push. +func (f gitFixture) batchFor(id string, changes ...entity.Change) entity.Batch { + f.resolver.Set(id, changes...) + return entity.Batch{ID: id} +} + // configRepo applies the test-only config that lets git commit work in a // sandbox without GPG signing, system identity, or system git hooks. The // devpod environment installs hooks via the system git config (core.hooksPath @@ -152,6 +162,7 @@ func (f gitFixture) newPusher(t *testing.T) pusher.Pusher { CheckoutPath: f.checkoutDir, Remote: "origin", Target: "main", + Resolver: f.resolver, Logger: zaptest.NewLogger(t).Sugar(), MetricsScope: tally.NoopScope, }) @@ -243,13 +254,13 @@ func TestPusher_Push_SingleChangeSingleURIProducesOneCommit(t *testing.T) { sha := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(sha)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 1) + require.Len(t, res.Batches[0].Outcomes, 1) - out := res.Outcomes[0] + out := res.Batches[0].Outcomes[0] assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) require.Len(t, out.CommitSHAs, 1) assert.Equal(t, []string{out.CommitSHAs[0]}, f.remoteCommitsSinceSeed(t)) @@ -275,13 +286,13 @@ func TestPusher_Push_StackedURIsProduceMultipleCommitsForOneChange(t *testing.T) mustGit(t, f.authorDir, "push", "-f", "origin", "feature/stack") p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha1), uri(sha2)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(sha1), uri(sha2)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 1) + require.Len(t, res.Batches[0].Outcomes, 1) - out := res.Outcomes[0] + out := res.Batches[0].Outcomes[0] assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) require.Len(t, out.CommitSHAs, 2) assert.Equal(t, out.CommitSHAs, f.remoteCommitsSinceSeed(t)) @@ -298,13 +309,13 @@ func TestPusher_Push_AlreadyLandedChangeIsRebasedOut(t *testing.T) { mainBeforePush := f.remoteHEAD(t) p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(sha)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 1) + require.Len(t, res.Batches[0].Outcomes, 1) - out := res.Outcomes[0] + out := res.Batches[0].Outcomes[0] assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, out.Status) assert.Empty(t, out.CommitSHAs) assert.Equal(t, mainBeforePush, f.remoteHEAD(t), @@ -319,18 +330,17 @@ func TestPusher_Push_MixedChangesPartiallyRebasedOut(t *testing.T) { freshSHA := f.pushPRCommit(t, "feature/b", "extra.txt", "extra\n", "add extra") p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(subsumedSHA)}}, - {URIs: []string{uri(freshSHA)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(subsumedSHA)}}, entity.Change{URIs: []string{uri(freshSHA)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 2) + require.Len(t, res.Batches[0].Outcomes, 2) - assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, res.Outcomes[0].Status) - assert.Empty(t, res.Outcomes[0].CommitSHAs) + assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, res.Batches[0].Outcomes[0].Status) + assert.Empty(t, res.Batches[0].Outcomes[0].CommitSHAs) - assert.Equal(t, pusher.OutcomeStatusCommitted, res.Outcomes[1].Status) - require.Len(t, res.Outcomes[1].CommitSHAs, 1) + assert.Equal(t, pusher.OutcomeStatusCommitted, res.Batches[0].Outcomes[1].Status) + require.Len(t, res.Batches[0].Outcomes[1].CommitSHAs, 1) assert.Equal(t, "extra\n", f.remoteFile(t, "extra.txt")) } @@ -349,8 +359,8 @@ func TestPusher_Push_ConflictReturnsErrConflictAndDoesNotPush(t *testing.T) { conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") p := f.newPusher(t) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(conflictingSHA)}}, + _, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(conflictingSHA)}}), }) require.Error(t, err) assert.True(t, errors.Is(err, pusher.ErrConflict)) @@ -368,13 +378,13 @@ func TestPusher_Push_ResetsBetweenCalls(t *testing.T) { // would fail or include unrelated changes. require.NoError(t, writeFile(filepath.Join(f.checkoutDir, "stray.txt"), "leftover\n")) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(sha)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes[0].CommitSHAs, 1) + require.Len(t, res.Batches[0].Outcomes[0].CommitSHAs, 1) - out := mustGitOutput(t, f.remoteDir, "ls-tree", "--name-only", res.Outcomes[0].CommitSHAs[0]) + out := mustGitOutput(t, f.remoteDir, "ls-tree", "--name-only", res.Batches[0].Outcomes[0].CommitSHAs[0]) assert.NotContains(t, string(out), "stray.txt", "unrelated file should not have landed") assert.Contains(t, string(out), "hello.txt") } @@ -388,19 +398,19 @@ func TestPusher_Push_RecoversAfterPriorConflict(t *testing.T) { conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") p := f.newPusher(t) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(conflictingSHA)}}, + _, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(conflictingSHA)}}), }) require.Error(t, err) // A subsequent, clean push must succeed even though the prior call left // a cherry-pick in progress before its rollback. freshSHA := f.pushPRCommit(t, "feature/c", "extra.txt", "extra\n", "add extra") - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(freshSHA)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b2", entity.Change{URIs: []string{uri(freshSHA)}}), }) require.NoError(t, err) - assert.Equal(t, pusher.OutcomeStatusCommitted, res.Outcomes[0].Status) + assert.Equal(t, pusher.OutcomeStatusCommitted, res.Batches[0].Outcomes[0].Status) assert.Equal(t, "extra\n", f.remoteFile(t, "extra.txt")) } @@ -417,8 +427,8 @@ func TestPusher_Push_InvalidURIErrors(t *testing.T) { f := setupGitFixture(t) p := f.newPusher(t) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{"not a uri"}}, + _, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{"not a uri"}}), }) require.Error(t, err) } @@ -433,12 +443,12 @@ func TestPusher_Push_RetriesWhenRemoteMovesUnderUs(t *testing.T) { f.installRaceHook(t, []string{raceSHA}) p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(featureSHA)}}, + res, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(featureSHA)}}), }) require.NoError(t, err) - require.Len(t, res.Outcomes, 1) - require.Len(t, res.Outcomes[0].CommitSHAs, 1) + require.Len(t, res.Batches[0].Outcomes, 1) + require.Len(t, res.Batches[0].Outcomes[0].CommitSHAs, 1) assert.Equal(t, 2, f.hookInvocations(t), "first attempt rejected by hook, second attempt allowed through") @@ -447,7 +457,7 @@ func TestPusher_Push_RetriesWhenRemoteMovesUnderUs(t *testing.T) { require.Len(t, commits, 2) assert.Equal(t, raceSHA, commits[0], "race commit landed first via the hook") - assert.Equal(t, res.Outcomes[0].CommitSHAs[0], commits[1], + assert.Equal(t, res.Batches[0].Outcomes[0].CommitSHAs[0], commits[1], "our cherry-pick landed on top after the retry") assert.Equal(t, "hello\nearth\n", f.remoteFile(t, "hello.txt")) } @@ -465,12 +475,13 @@ func TestPusher_Push_GivesUpAfterMaxAttempts(t *testing.T) { CheckoutPath: f.checkoutDir, Remote: "origin", Target: "main", + Resolver: f.resolver, Logger: zaptest.NewLogger(t).Sugar(), MetricsScope: tally.NoopScope, MaxPushAttempts: 2, }) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(featureSHA)}}, + _, err := p.Push(context.Background(), []entity.Batch{ + f.batchFor("b", entity.Change{URIs: []string{uri(featureSHA)}}), }) require.Error(t, err) assert.Equal(t, 2, f.hookInvocations(t), diff --git a/submitqueue/extension/pusher/mock/pusher_mock.go b/submitqueue/extension/pusher/mock/pusher_mock.go index 5ff69e71..27345b16 100644 --- a/submitqueue/extension/pusher/mock/pusher_mock.go +++ b/submitqueue/extension/pusher/mock/pusher_mock.go @@ -43,18 +43,18 @@ func (m *MockPusher) EXPECT() *MockPusherMockRecorder { } // Push mocks base method. -func (m *MockPusher) Push(ctx context.Context, changes []entity.Change) (pusher.Result, error) { +func (m *MockPusher) Push(ctx context.Context, batches []entity.Batch) (pusher.Result, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Push", ctx, changes) + ret := m.ctrl.Call(m, "Push", ctx, batches) ret0, _ := ret[0].(pusher.Result) ret1, _ := ret[1].(error) return ret0, ret1 } // Push indicates an expected call of Push. -func (mr *MockPusherMockRecorder) Push(ctx, changes any) *gomock.Call { +func (mr *MockPusherMockRecorder) Push(ctx, batches any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), ctx, changes) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), ctx, batches) } // MockFactory is a mock of Factory interface. diff --git a/submitqueue/extension/pusher/pusher.go b/submitqueue/extension/pusher/pusher.go index 8d86e131..633258e2 100644 --- a/submitqueue/extension/pusher/pusher.go +++ b/submitqueue/extension/pusher/pusher.go @@ -59,16 +59,29 @@ type ChangeOutcome struct { CommitSHAs []string } +// BatchOutcome groups the per-change outcomes for a single input batch, so a +// merge-train push (several batches in one call) stays correlatable back to the +// batch each change belonged to. There is no per-batch status: Push is +// all-or-nothing across the whole call (see the atomicity contract), so a +// per-batch pass/fail would be uniformly redundant. +type BatchOutcome struct { + // BatchID is the input batch this outcome corresponds to. + BatchID string + // Outcomes is one entry per change in the batch, in apply order. + Outcomes []ChangeOutcome +} + // Result is the outcome of a successful Push call. type Result struct { - // Outcomes is one entry per input change, in the same order as the - // changes passed to Push. The slice length equals the input length. - Outcomes []ChangeOutcome + // Batches is one entry per input batch, in the same order as the batches + // passed to Push. The slice length equals the input length. + Batches []BatchOutcome } -// Pusher applies a list of Changes on top of a target branch and pushes the -// result to the source-control remote. Each implementation is bound to a -// specific (checkout, remote, target) at construction time. +// Pusher applies the changes of one or more batches on top of a target branch +// and pushes the result to the source-control remote. Each implementation is +// bound to a specific (checkout, remote, target) at construction time and +// resolves each batch's changes itself through an injected changeset resolver. // // Atomicity contract: when Push returns a non-nil error, NO change has been // pushed to the remote — neither partially nor fully. Implementations must @@ -76,15 +89,19 @@ type Result struct { // when any change fails to apply. Callers can treat a non-nil error as // "the remote is exactly as it was before the call". // -// On success, len(Result.Outcomes) == len(changes) and Outcomes[i] describes -// what happened to changes[i]. A change can produce multiple commits -// (OutcomeStatusCommitted, CommitSHAs populated in apply order) or none at -// all (OutcomeStatusAlreadyExisted, CommitSHAs empty) — the latter happens -// when the change's content is already present on the target branch. +// On success, len(Result.Batches) == len(batches) and Batches[i] describes what +// happened to batches[i], with one ChangeOutcome per change in that batch in +// apply order. A change can produce multiple commits (OutcomeStatusCommitted, +// CommitSHAs populated in apply order) or none at all (OutcomeStatusAlreadyExisted, +// CommitSHAs empty) — the latter happens when the change's content is already +// present on the target branch. type Pusher interface { - // Push applies changes onto the target branch and pushes the resulting - // commits. See the type-level docs for the atomicity contract. - Push(ctx context.Context, changes []entity.Change) (Result, error) + // Push resolves and applies the changes of the given batches, in order, + // onto the target branch and pushes the resulting commits. The batch list + // designs for a merge-train (land several ready batches in one atomic push); + // today merge passes a single batch. See the type-level docs for the + // atomicity contract. + Push(ctx context.Context, batches []entity.Batch) (Result, error) } // Config carries the per-queue identity handed to a Factory. The system knows diff --git a/submitqueue/orchestrator/controller/merge/merge.go b/submitqueue/orchestrator/controller/merge/merge.go index 31f1600c..eb0a319b 100644 --- a/submitqueue/orchestrator/controller/merge/merge.go +++ b/submitqueue/orchestrator/controller/merge/merge.go @@ -120,19 +120,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return c.fanout(ctx, batch.ID, batch.Queue) } - changes, err := c.collectChanges(ctx, batch) - if err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "request_load_errors", 1) - return fmt.Errorf("failed to collect changes for batch %s: %w", batch.ID, err) - } - push, err := c.pushers.For(pusher.Config{QueueName: batch.Queue}) if err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "push_errors", 1) return fmt.Errorf("failed to build pusher for batch %s: %w", batch.ID, err) } - pushRes, pushErr := push.Push(ctx, changes) + // Push a single batch today; the pusher resolves its changes itself. The + // list parameter designs for a future merge-train. + pushRes, pushErr := push.Push(ctx, []entity.Batch{batch}) var newState entity.BatchState switch { @@ -140,7 +136,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r newState = entity.BatchStateSucceeded c.logger.Infow("merged batch", "batch_id", batch.ID, - "outcomes", pushRes.Outcomes, + "outcomes", pushRes.Batches, ) case errors.Is(pushErr, pusher.ErrConflict): coremetrics.NamedCounter(c.metricsScope, "process", "push_conflicts", 1) @@ -166,21 +162,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return c.fanout(ctx, batch.ID, batch.Queue) } -// collectChanges loads each request in batch.Contains and returns its -// Change. The result preserves batch.Contains order so the Pusher applies -// the changes in the same order the requests were batched. -func (c *Controller) collectChanges(ctx context.Context, batch entity.Batch) ([]entity.Change, error) { - changes := make([]entity.Change, 0, len(batch.Contains)) - for _, requestID := range batch.Contains { - request, err := c.store.GetRequestStore().Get(ctx, requestID) - if err != nil { - return nil, fmt.Errorf("failed to get request %s: %w", requestID, err) - } - changes = append(changes, request.Change) - } - return changes, nil -} - // fanout publishes the batch ID to conclude (so requests are updated) and // to speculate (so dependents can re-evaluate now that this batch is done). func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { diff --git a/submitqueue/orchestrator/controller/merge/merge_test.go b/submitqueue/orchestrator/controller/merge/merge_test.go index 7eb5578b..13f1d123 100644 --- a/submitqueue/orchestrator/controller/merge/merge_test.go +++ b/submitqueue/orchestrator/controller/merge/merge_test.go @@ -108,34 +108,26 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { Version: 4, } change := entity.Change{URIs: []string{"github://o/r/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} - request := entity.Request{ - ID: reqID, - Queue: "test-queue", - Change: change, - State: entity.RequestStateProcessing, - Version: 2, - } batchStore := storagemock.NewMockBatchStore(ctrl) batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(4), int32(5), entity.BatchStateSucceeded).Return(nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(request, nil) - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, changes []entity.Change) (pusher.Result, error) { - require.Len(t, changes, 1) - assert.Equal(t, change, changes[0]) - return pusher.Result{Outcomes: []pusher.ChangeOutcome{{ - Change: change, - Status: pusher.OutcomeStatusCommitted, - CommitSHAs: []string{"deadbeef"}, + func(_ context.Context, batches []entity.Batch) (pusher.Result, error) { + require.Len(t, batches, 1) + assert.Equal(t, batch.ID, batches[0].ID) + return pusher.Result{Batches: []pusher.BatchOutcome{{ + BatchID: batch.ID, + Outcomes: []pusher.ChangeOutcome{{ + Change: change, + Status: pusher.OutcomeStatusCommitted, + CommitSHAs: []string{"deadbeef"}, + }}, }}}, nil }, ) @@ -154,16 +146,15 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { require.NoError(t, err) } -func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { +// TestController_Process_ForwardsBatchToPusher verifies the controller forwards +// the batch identity (with its full Contains, in order) to the pusher, which +// resolves the changes itself. Change resolution order is the pusher's concern, +// covered by the git pusher tests. +func TestController_Process_ForwardsBatchToPusher(t *testing.T) { ctrl := gomock.NewController(t) const batchID = "test-queue/batch/multi" requestIDs := []string{"test-queue/1", "test-queue/2", "test-queue/3"} - changes := []entity.Change{ - {URIs: []string{"github://o/r/pull/1/1111111111111111111111111111111111111111"}}, - {URIs: []string{"github://o/r/pull/2/2222222222222222222222222222222222222222"}}, - {URIs: []string{"github://o/r/pull/3/3333333333333333333333333333333333333333"}}, - } batch := entity.Batch{ ID: batchID, @@ -177,30 +168,15 @@ func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(1), int32(2), entity.BatchStateSucceeded).Return(nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - for i, rid := range requestIDs { - requestStore.EXPECT().Get(gomock.Any(), rid).Return(entity.Request{ - ID: rid, Change: changes[i], - }, nil) - } - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, got []entity.Change) (pusher.Result, error) { - assert.Equal(t, changes, got, "changes must be in batch.Contains order") - outcomes := make([]pusher.ChangeOutcome, len(got)) - for i, ch := range got { - outcomes[i] = pusher.ChangeOutcome{ - Change: ch, - Status: pusher.OutcomeStatusCommitted, - CommitSHAs: []string{fmt.Sprintf("sha-%d", i)}, - } - } - return pusher.Result{Outcomes: outcomes}, nil + func(_ context.Context, batches []entity.Batch) (pusher.Result, error) { + require.Len(t, batches, 1) + assert.Equal(t, requestIDs, batches[0].Contains, "batch forwarded with Contains in order") + return pusher.Result{Batches: []pusher.BatchOutcome{{BatchID: batchID}}}, nil }, ) @@ -236,14 +212,8 @@ func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(3), int32(4), entity.BatchStateFailed).Return(nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ - ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/pull/2/2222222222222222222222222222222222222222"}}, - }, nil) - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( @@ -282,14 +252,8 @@ func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { batchStore := storagemock.NewMockBatchStore(ctrl) batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ - ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/pull/3/3333333333333333333333333333333333333333"}}, - }, nil) - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( @@ -428,44 +392,6 @@ func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { assert.False(t, errs.IsRetryable(err)) } -func TestController_Process_RequestStoreFailurePropagates(t *testing.T) { - ctrl := gomock.NewController(t) - - const reqID = "test-queue/6" - const batchID = "test-queue/batch/6" - - batch := entity.Batch{ - ID: batchID, - Queue: "test-queue", - Contains: []string{reqID}, - State: entity.BatchStateMerging, - Version: 1, - } - - batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) - - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{}, fmt.Errorf("db connection lost")) - - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() - - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - newRegistry(t, ctrl, nil), - newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), - consumer.TopicKeyMerge, - "orchestrator-merge", - ) - - err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) - require.Error(t, err) -} - func TestController_Process_PublishFailureSurfaces(t *testing.T) { ctrl := gomock.NewController(t) @@ -484,19 +410,16 @@ func TestController_Process_PublishFailureSurfaces(t *testing.T) { batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(2), int32(3), entity.BatchStateSucceeded).Return(nil) - requestStore := storagemock.NewMockRequestStore(ctrl) - requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ - ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/pull/7/7777777777777777777777777777777777777777"}}, - }, nil) - store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( - pusher.Result{Outcomes: []pusher.ChangeOutcome{{ - Status: pusher.OutcomeStatusCommitted, CommitSHAs: []string{"abc"}, + pusher.Result{Batches: []pusher.BatchOutcome{{ + BatchID: batchID, + Outcomes: []pusher.ChangeOutcome{{ + Status: pusher.OutcomeStatusCommitted, CommitSHAs: []string{"abc"}, + }}, }}}, nil, )