From 85cb46bcb8ce1dc7744cd505c8aea94a4323dbb7 Mon Sep 17 00:00:00 2001 From: "kevin.new" Date: Wed, 10 Jun 2026 09:26:12 +0000 Subject: [PATCH] feat: widen Pusher interface with QueueTarget and PushItem Consolidate QueueTarget into the shared entity/ package and update all pushqueue imports accordingly. Widen Pusher.Push to accept entity.QueueTarget and []PushItem (Change + Strategy). Add queueconfig.Store dependency to the merge controller for resolving queue names to VCS targets. Co-Authored-By: Claude Opus 4.6 (1M context) --- entity/BUILD.bazel | 5 +- entity/queue_target.go | 25 +++++++ example/pushqueue/gateway/server/BUILD.bazel | 1 + example/pushqueue/gateway/server/main.go | 13 ++-- .../orchestrator/server/BUILD.bazel | 3 + .../submitqueue/orchestrator/server/main.go | 28 +++++++- pushqueue/entity/land.go | 11 ---- pushqueue/extension/landqueue/BUILD.bazel | 5 +- pushqueue/extension/landqueue/landqueue.go | 7 +- .../extension/landqueue/mock/BUILD.bazel | 1 + .../landqueue/mock/landqueue_mock.go | 7 +- pushqueue/extension/vcs/BUILD.bazel | 5 +- pushqueue/extension/vcs/mock/BUILD.bazel | 1 + pushqueue/extension/vcs/mock/vcs_mock.go | 11 ++-- pushqueue/extension/vcs/vcs.go | 11 ++-- pushqueue/gateway/controller/BUILD.bazel | 2 + pushqueue/gateway/controller/land.go | 21 +++--- pushqueue/gateway/controller/land_test.go | 17 ++--- pushqueue/gateway/protopb/gateway.pb.go | 5 +- pushqueue/gateway/protopb/gateway_grpc.pb.go | 1 + submitqueue/extension/pusher/BUILD.bazel | 5 +- submitqueue/extension/pusher/git/BUILD.bazel | 2 + .../extension/pusher/git/git_pusher.go | 18 +++-- .../extension/pusher/git/git_pusher_test.go | 49 +++++++------- submitqueue/extension/pusher/mock/BUILD.bazel | 2 +- .../extension/pusher/mock/pusher_mock.go | 10 +-- submitqueue/extension/pusher/pusher.go | 15 ++++- .../orchestrator/controller/merge/BUILD.bazel | 4 ++ .../orchestrator/controller/merge/merge.go | 48 +++++++++++--- .../controller/merge/merge_test.go | 65 +++++++++++++++---- 30 files changed, 277 insertions(+), 121 deletions(-) create mode 100644 entity/queue_target.go diff --git a/entity/BUILD.bazel b/entity/BUILD.bazel index 9f06cd03..87a3628e 100644 --- a/entity/BUILD.bazel +++ b/entity/BUILD.bazel @@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "entity", - srcs = ["entity.go"], + srcs = [ + "entity.go", + "queue_target.go", + ], importpath = "github.com/uber/submitqueue/entity", visibility = ["//visibility:public"], ) diff --git a/entity/queue_target.go b/entity/queue_target.go new file mode 100644 index 00000000..f5621c7e --- /dev/null +++ b/entity/queue_target.go @@ -0,0 +1,25 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// QueueTarget identifies a landing destination in a version control system. +type QueueTarget struct { + // Name is an optional logical identifier for correlation and config lookup. + Name string + // Address is the VCS repository address (remote URL, depot path). + Address string + // Target is the landing ref (branch name, stream path). + Target string +} diff --git a/example/pushqueue/gateway/server/BUILD.bazel b/example/pushqueue/gateway/server/BUILD.bazel index 987a0d60..002339e3 100644 --- a/example/pushqueue/gateway/server/BUILD.bazel +++ b/example/pushqueue/gateway/server/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/example/pushqueue/gateway/server", visibility = ["//visibility:private"], deps = [ + "//entity", "//pushqueue/entity", "//pushqueue/extension/landqueue", "//pushqueue/extension/vcs", diff --git a/example/pushqueue/gateway/server/main.go b/example/pushqueue/gateway/server/main.go index 6eb2d608..2697b276 100644 --- a/example/pushqueue/gateway/server/main.go +++ b/example/pushqueue/gateway/server/main.go @@ -26,7 +26,8 @@ import ( "time" "github.com/uber-go/tally/v4" - "github.com/uber/submitqueue/pushqueue/entity" + "github.com/uber/submitqueue/entity" + pqentity "github.com/uber/submitqueue/pushqueue/entity" "github.com/uber/submitqueue/pushqueue/extension/landqueue" "github.com/uber/submitqueue/pushqueue/extension/vcs" "github.com/uber/submitqueue/pushqueue/gateway/controller" @@ -162,7 +163,7 @@ func run() error { // noopVCS is a placeholder VCS that errors on every operation. type noopVCS struct{} -func (noopVCS) CheckMergeability(_ context.Context, _ entity.QueueTarget, items []entity.LandItem) ([]vcs.MergeabilityResult, error) { +func (noopVCS) CheckMergeability(_ context.Context, _ entity.QueueTarget, items []pqentity.LandItem) ([]vcs.MergeabilityResult, error) { results := make([]vcs.MergeabilityResult, len(items)) for i := range results { results[i] = vcs.MergeabilityResult{Mergeable: false, Reason: "noop VCS: not configured"} @@ -170,15 +171,15 @@ func (noopVCS) CheckMergeability(_ context.Context, _ entity.QueueTarget, items return results, nil } -func (noopVCS) Prepare(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) error { +func (noopVCS) Prepare(_ context.Context, _ entity.QueueTarget, _ []pqentity.LandItem) error { return fmt.Errorf("noop VCS: not configured") } -func (noopVCS) Push(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) (vcs.PushResult, error) { +func (noopVCS) Push(_ context.Context, _ entity.QueueTarget, _ []pqentity.LandItem) (vcs.PushResult, error) { return vcs.PushResult{}, fmt.Errorf("noop VCS: not configured") } -func (noopVCS) Finalize(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) error { +func (noopVCS) Finalize(_ context.Context, _ entity.QueueTarget, _ []pqentity.LandItem) error { return fmt.Errorf("noop VCS: not configured") } @@ -187,7 +188,7 @@ type noopQueue struct{} var _ landqueue.Queue = noopQueue{} -func (noopQueue) Enqueue(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) error { +func (noopQueue) Enqueue(_ context.Context, _ entity.QueueTarget, _ []pqentity.LandItem) error { return nil } diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index e1e0892c..333709c7 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//core/errs/generic", "//core/errs/mysql", "//core/httpclient", + "//entity", "//extension/queue", "//extension/queue/mysql", "//submitqueue/core/consumer", @@ -31,6 +32,8 @@ go_library( "//submitqueue/extension/mergechecker/github", "//submitqueue/extension/pusher", "//submitqueue/extension/pusher/git", + "//submitqueue/extension/queueconfig", + "//submitqueue/extension/queueconfig/yaml", "//submitqueue/extension/scorer/heuristic", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mysql", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 39146177..c0637fe1 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -33,6 +33,7 @@ import ( genericerrs "github.com/uber/submitqueue/core/errs/generic" mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" "github.com/uber/submitqueue/core/httpclient" + sharedentity "github.com/uber/submitqueue/entity" extqueue "github.com/uber/submitqueue/extension/queue" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" "github.com/uber/submitqueue/submitqueue/core/consumer" @@ -50,6 +51,8 @@ import ( githubchecker "github.com/uber/submitqueue/submitqueue/extension/mergechecker/github" "github.com/uber/submitqueue/submitqueue/extension/pusher" gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git" + "github.com/uber/submitqueue/submitqueue/extension/queueconfig" + yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml" "github.com/uber/submitqueue/submitqueue/extension/scorer/heuristic" "github.com/uber/submitqueue/submitqueue/extension/storage" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" @@ -231,8 +234,14 @@ func run() error { // (every build immediately succeeds) until a real backend is wired in. br := buildnoop.New() + // Create queue config store + qcfg, err := newQueueConfigStore(logger) + if err != nil { + return fmt.Errorf("failed to create queue config store: %w", err) + } + // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store, changeStore); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store, changeStore, qcfg); err != nil { return err } @@ -425,7 +434,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // │ │ │ // └────────┴───────────────────────┘ -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore, qcfg queueconfig.Store) error { requestController := start.NewController( logger, scope, @@ -551,6 +560,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t store, registry, psh, + qcfg, consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -675,6 +685,18 @@ func newPusher(logger *zap.Logger, scope tally.Scope) (pusher.Pusher, error) { }), nil } +// newQueueConfigStore loads queue configuration from a YAML file pointed to by +// QUEUE_CONFIG_PATH. If the env var is not set, returns an empty store — the +// merge controller will fail to resolve any queue target. +func newQueueConfigStore(logger *zap.Logger) (queueconfig.Store, error) { + path := os.Getenv("QUEUE_CONFIG_PATH") + if path == "" { + logger.Warn("QUEUE_CONFIG_PATH not set; merge controller will fail to resolve queue targets") + return yamlqueueconfig.Store{}, nil + } + return yamlqueueconfig.NewStore(path) +} + // noopPusher is a fallback Pusher used when PUSHER_CHECKOUT_PATH is not // configured. It returns an error on every Push so the merge controller // (which treats non-ErrConflict errors as transient and nacks the message) @@ -683,6 +705,6 @@ func newPusher(logger *zap.Logger, scope tally.Scope) (pusher.Pusher, error) { // that don't run the merge step. type noopPusher struct{} -func (noopPusher) Push(_ context.Context, _ []entity.Change) (pusher.Result, error) { +func (noopPusher) Push(_ context.Context, _ sharedentity.QueueTarget, _ []pusher.PushItem) (pusher.Result, error) { return pusher.Result{}, fmt.Errorf("pusher not configured: set PUSHER_CHECKOUT_PATH to enable pushing") } diff --git a/pushqueue/entity/land.go b/pushqueue/entity/land.go index 48f9b050..13f2c212 100644 --- a/pushqueue/entity/land.go +++ b/pushqueue/entity/land.go @@ -14,17 +14,6 @@ package entity -// QueueTarget identifies a landing destination in a version control system. -// Defined locally in pushqueue; consolidated into shared entity/ by Chunk 2. -type QueueTarget struct { - // Name is an optional logical identifier for correlation and config lookup. - Name string - // Address is the VCS repository address (remote URL, depot path). - Address string - // Target is the landing ref (branch name, stream path). - Target string -} - // LandStrategy defines the possible landing methods for a code change. type LandStrategy string diff --git a/pushqueue/extension/landqueue/BUILD.bazel b/pushqueue/extension/landqueue/BUILD.bazel index 282394ab..a2dc21d8 100644 --- a/pushqueue/extension/landqueue/BUILD.bazel +++ b/pushqueue/extension/landqueue/BUILD.bazel @@ -5,5 +5,8 @@ go_library( srcs = ["landqueue.go"], importpath = "github.com/uber/submitqueue/pushqueue/extension/landqueue", visibility = ["//visibility:public"], - deps = ["//pushqueue/entity"], + deps = [ + "//entity", + "//pushqueue/entity", + ], ) diff --git a/pushqueue/extension/landqueue/landqueue.go b/pushqueue/extension/landqueue/landqueue.go index a1d1aa53..0f8dc0f2 100644 --- a/pushqueue/extension/landqueue/landqueue.go +++ b/pushqueue/extension/landqueue/landqueue.go @@ -19,7 +19,8 @@ package landqueue import ( "context" - "github.com/uber/submitqueue/pushqueue/entity" + "github.com/uber/submitqueue/entity" + pqentity "github.com/uber/submitqueue/pushqueue/entity" ) // Preparer performs pre-push preparation for a set of land items. @@ -27,7 +28,7 @@ import ( // receives a Preparer at construction time and may invoke it between // Enqueue and Wait to pipeline preparation with queue wait time. type Preparer interface { - Prepare(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) error + Prepare(ctx context.Context, target entity.QueueTarget, items []pqentity.LandItem) error } // Queue serializes access to a landing target, ensuring only one request @@ -44,7 +45,7 @@ type Preparer interface { // - Implementations decide when to call Preparer.Prepare — during the // wait (pipelined) or synchronously before Wait returns (simple). type Queue interface { - Enqueue(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) error + Enqueue(ctx context.Context, target entity.QueueTarget, items []pqentity.LandItem) error Wait(ctx context.Context, target entity.QueueTarget) error Dequeue(ctx context.Context, target entity.QueueTarget) error } diff --git a/pushqueue/extension/landqueue/mock/BUILD.bazel b/pushqueue/extension/landqueue/mock/BUILD.bazel index a05e2adf..731db25f 100644 --- a/pushqueue/extension/landqueue/mock/BUILD.bazel +++ b/pushqueue/extension/landqueue/mock/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/pushqueue/extension/landqueue/mock", visibility = ["//visibility:public"], deps = [ + "//entity", "//pushqueue/entity", "@org_uber_go_mock//gomock", ], diff --git a/pushqueue/extension/landqueue/mock/landqueue_mock.go b/pushqueue/extension/landqueue/mock/landqueue_mock.go index 7f1e1a55..5b04c948 100644 --- a/pushqueue/extension/landqueue/mock/landqueue_mock.go +++ b/pushqueue/extension/landqueue/mock/landqueue_mock.go @@ -13,7 +13,8 @@ import ( context "context" reflect "reflect" - entity "github.com/uber/submitqueue/pushqueue/entity" + entity "github.com/uber/submitqueue/entity" + entity0 "github.com/uber/submitqueue/pushqueue/entity" gomock "go.uber.org/mock/gomock" ) @@ -42,7 +43,7 @@ func (m *MockPreparer) EXPECT() *MockPreparerMockRecorder { } // Prepare mocks base method. -func (m *MockPreparer) Prepare(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) error { +func (m *MockPreparer) Prepare(ctx context.Context, target entity.QueueTarget, items []entity0.LandItem) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Prepare", ctx, target, items) ret0, _ := ret[0].(error) @@ -94,7 +95,7 @@ func (mr *MockQueueMockRecorder) Dequeue(ctx, target any) *gomock.Call { } // Enqueue mocks base method. -func (m *MockQueue) Enqueue(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) error { +func (m *MockQueue) Enqueue(ctx context.Context, target entity.QueueTarget, items []entity0.LandItem) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Enqueue", ctx, target, items) ret0, _ := ret[0].(error) diff --git a/pushqueue/extension/vcs/BUILD.bazel b/pushqueue/extension/vcs/BUILD.bazel index 5be99025..90134bf5 100644 --- a/pushqueue/extension/vcs/BUILD.bazel +++ b/pushqueue/extension/vcs/BUILD.bazel @@ -5,5 +5,8 @@ go_library( srcs = ["vcs.go"], importpath = "github.com/uber/submitqueue/pushqueue/extension/vcs", visibility = ["//visibility:public"], - deps = ["//pushqueue/entity"], + deps = [ + "//entity", + "//pushqueue/entity", + ], ) diff --git a/pushqueue/extension/vcs/mock/BUILD.bazel b/pushqueue/extension/vcs/mock/BUILD.bazel index f57b9784..b9eca2b7 100644 --- a/pushqueue/extension/vcs/mock/BUILD.bazel +++ b/pushqueue/extension/vcs/mock/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/pushqueue/extension/vcs/mock", visibility = ["//visibility:public"], deps = [ + "//entity", "//pushqueue/entity", "//pushqueue/extension/vcs", "@org_uber_go_mock//gomock", diff --git a/pushqueue/extension/vcs/mock/vcs_mock.go b/pushqueue/extension/vcs/mock/vcs_mock.go index 199d2faa..f1fc4097 100644 --- a/pushqueue/extension/vcs/mock/vcs_mock.go +++ b/pushqueue/extension/vcs/mock/vcs_mock.go @@ -13,7 +13,8 @@ import ( context "context" reflect "reflect" - entity "github.com/uber/submitqueue/pushqueue/entity" + entity "github.com/uber/submitqueue/entity" + entity0 "github.com/uber/submitqueue/pushqueue/entity" vcs "github.com/uber/submitqueue/pushqueue/extension/vcs" gomock "go.uber.org/mock/gomock" ) @@ -43,7 +44,7 @@ func (m *MockVCS) EXPECT() *MockVCSMockRecorder { } // CheckMergeability mocks base method. -func (m *MockVCS) CheckMergeability(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) ([]vcs.MergeabilityResult, error) { +func (m *MockVCS) CheckMergeability(ctx context.Context, target entity.QueueTarget, items []entity0.LandItem) ([]vcs.MergeabilityResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CheckMergeability", ctx, target, items) ret0, _ := ret[0].([]vcs.MergeabilityResult) @@ -58,7 +59,7 @@ func (mr *MockVCSMockRecorder) CheckMergeability(ctx, target, items any) *gomock } // Finalize mocks base method. -func (m *MockVCS) Finalize(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) error { +func (m *MockVCS) Finalize(ctx context.Context, target entity.QueueTarget, items []entity0.LandItem) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Finalize", ctx, target, items) ret0, _ := ret[0].(error) @@ -72,7 +73,7 @@ func (mr *MockVCSMockRecorder) Finalize(ctx, target, items any) *gomock.Call { } // Prepare mocks base method. -func (m *MockVCS) Prepare(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) error { +func (m *MockVCS) Prepare(ctx context.Context, target entity.QueueTarget, items []entity0.LandItem) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Prepare", ctx, target, items) ret0, _ := ret[0].(error) @@ -86,7 +87,7 @@ func (mr *MockVCSMockRecorder) Prepare(ctx, target, items any) *gomock.Call { } // Push mocks base method. -func (m *MockVCS) Push(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) (vcs.PushResult, error) { +func (m *MockVCS) Push(ctx context.Context, target entity.QueueTarget, items []entity0.LandItem) (vcs.PushResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Push", ctx, target, items) ret0, _ := ret[0].(vcs.PushResult) diff --git a/pushqueue/extension/vcs/vcs.go b/pushqueue/extension/vcs/vcs.go index ae362c75..c80556f0 100644 --- a/pushqueue/extension/vcs/vcs.go +++ b/pushqueue/extension/vcs/vcs.go @@ -20,7 +20,8 @@ import ( "context" "errors" - "github.com/uber/submitqueue/pushqueue/entity" + "github.com/uber/submitqueue/entity" + pqentity "github.com/uber/submitqueue/pushqueue/entity" ) // ErrConflict is returned when a change fails to apply cleanly on top of the target. @@ -80,8 +81,8 @@ type MergeabilityResult struct { // Push atomicity: when Push returns a non-nil error, NO change has been pushed // to the remote. type VCS interface { - CheckMergeability(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) ([]MergeabilityResult, error) - Prepare(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) error - Push(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) (PushResult, error) - Finalize(ctx context.Context, target entity.QueueTarget, items []entity.LandItem) error + CheckMergeability(ctx context.Context, target entity.QueueTarget, items []pqentity.LandItem) ([]MergeabilityResult, error) + Prepare(ctx context.Context, target entity.QueueTarget, items []pqentity.LandItem) error + Push(ctx context.Context, target entity.QueueTarget, items []pqentity.LandItem) (PushResult, error) + Finalize(ctx context.Context, target entity.QueueTarget, items []pqentity.LandItem) error } diff --git a/pushqueue/gateway/controller/BUILD.bazel b/pushqueue/gateway/controller/BUILD.bazel index 0056653b..fbbc28d8 100644 --- a/pushqueue/gateway/controller/BUILD.bazel +++ b/pushqueue/gateway/controller/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//core/errs", "//core/metrics", + "//entity", "//pushqueue/entity", "//pushqueue/extension/landqueue", "//pushqueue/extension/vcs", @@ -29,6 +30,7 @@ go_test( embed = [":controller"], deps = [ "//core/errs", + "//entity", "//pushqueue/entity", "//pushqueue/extension/landqueue/mock", "//pushqueue/extension/vcs", diff --git a/pushqueue/gateway/controller/land.go b/pushqueue/gateway/controller/land.go index bb959bce..ae514e59 100644 --- a/pushqueue/gateway/controller/land.go +++ b/pushqueue/gateway/controller/land.go @@ -22,7 +22,8 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/core/metrics" - "github.com/uber/submitqueue/pushqueue/entity" + "github.com/uber/submitqueue/entity" + pqentity "github.com/uber/submitqueue/pushqueue/entity" "github.com/uber/submitqueue/pushqueue/extension/landqueue" "github.com/uber/submitqueue/pushqueue/extension/vcs" pb "github.com/uber/submitqueue/pushqueue/gateway/protopb" @@ -62,9 +63,9 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p Target: req.Queue.GetTarget(), } - items := make([]entity.LandItem, 0, len(req.Items)) + items := make([]pqentity.LandItem, 0, len(req.Items)) for _, item := range req.Items { - items = append(items, entity.LandItem{ + items = append(items, pqentity.LandItem{ URIs: item.GetUris(), Strategy: resolveStrategy(item.GetStrategy()), }) @@ -121,9 +122,9 @@ func (c *LandController) CheckMergeability(ctx context.Context, req *pb.CheckMer Target: req.Queue.GetTarget(), } - items := make([]entity.LandItem, 0, len(req.Items)) + items := make([]pqentity.LandItem, 0, len(req.Items)) for _, item := range req.Items { - items = append(items, entity.LandItem{ + items = append(items, pqentity.LandItem{ URIs: item.GetUris(), Strategy: resolveStrategy(item.GetStrategy()), }) @@ -152,16 +153,16 @@ func (c *LandController) CheckMergeability(ctx context.Context, req *pb.CheckMer }, nil } -func resolveStrategy(s pb.Strategy) entity.LandStrategy { +func resolveStrategy(s pb.Strategy) pqentity.LandStrategy { switch s { case pb.Strategy_STRATEGY_REBASE: - return entity.LandStrategyRebase + return pqentity.LandStrategyRebase case pb.Strategy_STRATEGY_SQUASH_REBASE: - return entity.LandStrategySquashRebase + return pqentity.LandStrategySquashRebase case pb.Strategy_STRATEGY_MERGE: - return entity.LandStrategyMerge + return pqentity.LandStrategyMerge default: - return entity.LandStrategyUnknown + return pqentity.LandStrategyUnknown } } diff --git a/pushqueue/gateway/controller/land_test.go b/pushqueue/gateway/controller/land_test.go index 170043ad..0aa2c333 100644 --- a/pushqueue/gateway/controller/land_test.go +++ b/pushqueue/gateway/controller/land_test.go @@ -26,7 +26,8 @@ import ( "go.uber.org/zap/zaptest" "github.com/uber/submitqueue/core/errs" - "github.com/uber/submitqueue/pushqueue/entity" + "github.com/uber/submitqueue/entity" + pqentity "github.com/uber/submitqueue/pushqueue/entity" landqueuemock "github.com/uber/submitqueue/pushqueue/extension/landqueue/mock" "github.com/uber/submitqueue/pushqueue/extension/vcs" vcsmock "github.com/uber/submitqueue/pushqueue/extension/vcs/mock" @@ -65,9 +66,9 @@ func TestLand_Success(t *testing.T) { mockQueue := landqueuemock.NewMockQueue(ctrl) target := testTarget() - items := []entity.LandItem{{ + items := []pqentity.LandItem{{ URIs: []string{"github://uber/repo/pull/1/abc123"}, - Strategy: entity.LandStrategyRebase, + Strategy: pqentity.LandStrategyRebase, }} gomock.InOrder( @@ -232,12 +233,12 @@ func TestLand_StrategyMapping(t *testing.T) { testCases := []struct { name string proto pb.Strategy - expected entity.LandStrategy + expected pqentity.LandStrategy }{ - {"rebase", pb.Strategy_STRATEGY_REBASE, entity.LandStrategyRebase}, - {"squash_rebase", pb.Strategy_STRATEGY_SQUASH_REBASE, entity.LandStrategySquashRebase}, - {"merge", pb.Strategy_STRATEGY_MERGE, entity.LandStrategyMerge}, - {"unspecified", pb.Strategy_STRATEGY_UNSPECIFIED, entity.LandStrategyUnknown}, + {"rebase", pb.Strategy_STRATEGY_REBASE, pqentity.LandStrategyRebase}, + {"squash_rebase", pb.Strategy_STRATEGY_SQUASH_REBASE, pqentity.LandStrategySquashRebase}, + {"merge", pb.Strategy_STRATEGY_MERGE, pqentity.LandStrategyMerge}, + {"unspecified", pb.Strategy_STRATEGY_UNSPECIFIED, pqentity.LandStrategyUnknown}, } for _, tc := range testCases { diff --git a/pushqueue/gateway/protopb/gateway.pb.go b/pushqueue/gateway/protopb/gateway.pb.go index f999a7aa..92ea24e8 100644 --- a/pushqueue/gateway/protopb/gateway.pb.go +++ b/pushqueue/gateway/protopb/gateway.pb.go @@ -21,11 +21,12 @@ package protopb import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" unsafe "unsafe" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/pushqueue/gateway/protopb/gateway_grpc.pb.go b/pushqueue/gateway/protopb/gateway_grpc.pb.go index 88c1bb6e..c563c780 100644 --- a/pushqueue/gateway/protopb/gateway_grpc.pb.go +++ b/pushqueue/gateway/protopb/gateway_grpc.pb.go @@ -22,6 +22,7 @@ package protopb import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/submitqueue/extension/pusher/BUILD.bazel b/submitqueue/extension/pusher/BUILD.bazel index d36db3a0..8c4337a9 100644 --- a/submitqueue/extension/pusher/BUILD.bazel +++ b/submitqueue/extension/pusher/BUILD.bazel @@ -5,5 +5,8 @@ go_library( srcs = ["pusher.go"], importpath = "github.com/uber/submitqueue/submitqueue/extension/pusher", visibility = ["//visibility:public"], - deps = ["//submitqueue/entity"], + deps = [ + "//entity", + "//submitqueue/entity", + ], ) diff --git a/submitqueue/extension/pusher/git/BUILD.bazel b/submitqueue/extension/pusher/git/BUILD.bazel index 2525962f..9afaffaa 100644 --- a/submitqueue/extension/pusher/git/BUILD.bazel +++ b/submitqueue/extension/pusher/git/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/metrics", + "//entity", "//submitqueue/entity", "//submitqueue/entity/github", "//submitqueue/extension/pusher", @@ -20,6 +21,7 @@ go_test( srcs = ["git_pusher_test.go"], embed = [":git"], deps = [ + "//entity", "//submitqueue/entity", "//submitqueue/extension/pusher", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/extension/pusher/git/git_pusher.go b/submitqueue/extension/pusher/git/git_pusher.go index 5774948d..a1199d01 100644 --- a/submitqueue/extension/pusher/git/git_pusher.go +++ b/submitqueue/extension/pusher/git/git_pusher.go @@ -62,7 +62,8 @@ import ( "go.uber.org/zap" coremetrics "github.com/uber/submitqueue/core/metrics" - "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/entity" + sqentity "github.com/uber/submitqueue/submitqueue/entity" entitygithub "github.com/uber/submitqueue/submitqueue/entity/github" "github.com/uber/submitqueue/submitqueue/extension/pusher" ) @@ -128,18 +129,23 @@ func NewPusher(params Params) pusher.Pusher { } // Push fulfils the pusher.Pusher contract. -func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret pusher.Result, retErr error) { +func (p *gitPusher) Push(ctx context.Context, _ entity.QueueTarget, items []pusher.PushItem) (ret pusher.Result, retErr error) { op := coremetrics.Begin(p.metricsScope, "push") defer func() { op.Complete(retErr) }() p.mu.Lock() defer p.mu.Unlock() - if len(changes) == 0 { + if len(items) == 0 { coremetrics.NamedCounter(p.metricsScope, "push", "empty_changes", 1) return pusher.Result{}, fmt.Errorf("push called with no changes") } + changes := make([]sqentity.Change, len(items)) + for i, item := range items { + changes[i] = item.Change + } + p.logger.Debugw("starting push", "target", p.target, "remote", p.remote, @@ -195,7 +201,7 @@ func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret push // so the caller can distinguish concurrent-push contention from other push // failures. baseSHA is empty when the failure happened before reset // produced a base. -func (p *gitPusher) tryPush(ctx context.Context, changes []entity.Change) (string, []pusher.ChangeOutcome, error) { +func (p *gitPusher) tryPush(ctx context.Context, changes []sqentity.Change) (string, []pusher.ChangeOutcome, error) { if err := p.resetToRemote(ctx); err != nil { coremetrics.NamedCounter(p.metricsScope, "push", "reset_errors", 1) return "", nil, err @@ -264,7 +270,7 @@ func (p *gitPusher) resetToRemote(ctx context.Context) error { // cherryPickAll walks the changes in order, cherry-picking every URI's head // SHA, and returns one ChangeOutcome per Change in the same order. -func (p *gitPusher) cherryPickAll(ctx context.Context, changes []entity.Change) ([]pusher.ChangeOutcome, error) { +func (p *gitPusher) cherryPickAll(ctx context.Context, changes []sqentity.Change) ([]pusher.ChangeOutcome, error) { outcomes := make([]pusher.ChangeOutcome, 0, len(changes)) for _, change := range changes { commits, err := p.cherryPickChange(ctx, change) @@ -288,7 +294,7 @@ func (p *gitPusher) cherryPickAll(ctx context.Context, changes []entity.Change) // SHA, and cherry-picks it. It returns the list of new commit SHAs // produced for this change (empty if every pick was a no-op because the // content was already on the target branch). -func (p *gitPusher) cherryPickChange(ctx context.Context, change entity.Change) ([]string, error) { +func (p *gitPusher) cherryPickChange(ctx context.Context, change sqentity.Change) ([]string, error) { var commits []string for _, uri := range change.URIs { cid, err := entitygithub.ParseChangeID(uri) diff --git a/submitqueue/extension/pusher/git/git_pusher_test.go b/submitqueue/extension/pusher/git/git_pusher_test.go index 1248966e..e8d1ec41 100644 --- a/submitqueue/extension/pusher/git/git_pusher_test.go +++ b/submitqueue/extension/pusher/git/git_pusher_test.go @@ -31,6 +31,7 @@ import ( "github.com/uber-go/tally/v4" "go.uber.org/zap/zaptest" + sharedentity "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" ) @@ -243,8 +244,8 @@ func TestPusher_Push_SingleChangeSingleURIProducesOneCommit(t *testing.T) { sha := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha)}}, + res, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(sha)}}}, }) require.NoError(t, err) require.Len(t, res.Outcomes, 1) @@ -275,8 +276,8 @@ func TestPusher_Push_StackedURIsProduceMultipleCommitsForOneChange(t *testing.T) mustGit(t, f.authorDir, "push", "-f", "origin", "feature/stack") p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha1), uri(sha2)}}, + res, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(sha1), uri(sha2)}}}, }) require.NoError(t, err) require.Len(t, res.Outcomes, 1) @@ -298,8 +299,8 @@ func TestPusher_Push_AlreadyLandedChangeIsRebasedOut(t *testing.T) { mainBeforePush := f.remoteHEAD(t) p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha)}}, + res, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(sha)}}}, }) require.NoError(t, err) require.Len(t, res.Outcomes, 1) @@ -319,9 +320,9 @@ func TestPusher_Push_MixedChangesPartiallyRebasedOut(t *testing.T) { freshSHA := f.pushPRCommit(t, "feature/b", "extra.txt", "extra\n", "add extra") p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(subsumedSHA)}}, - {URIs: []string{uri(freshSHA)}}, + res, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(subsumedSHA)}}}, + {Change: entity.Change{URIs: []string{uri(freshSHA)}}}, }) require.NoError(t, err) require.Len(t, res.Outcomes, 2) @@ -349,8 +350,8 @@ func TestPusher_Push_ConflictReturnsErrConflictAndDoesNotPush(t *testing.T) { conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") p := f.newPusher(t) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(conflictingSHA)}}, + _, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(conflictingSHA)}}}, }) require.Error(t, err) assert.True(t, errors.Is(err, pusher.ErrConflict)) @@ -368,8 +369,8 @@ func TestPusher_Push_ResetsBetweenCalls(t *testing.T) { // would fail or include unrelated changes. require.NoError(t, writeFile(filepath.Join(f.checkoutDir, "stray.txt"), "leftover\n")) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(sha)}}, + res, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(sha)}}}, }) require.NoError(t, err) require.Len(t, res.Outcomes[0].CommitSHAs, 1) @@ -388,16 +389,16 @@ func TestPusher_Push_RecoversAfterPriorConflict(t *testing.T) { conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") p := f.newPusher(t) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(conflictingSHA)}}, + _, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(conflictingSHA)}}}, }) require.Error(t, err) // A subsequent, clean push must succeed even though the prior call left // a cherry-pick in progress before its rollback. freshSHA := f.pushPRCommit(t, "feature/c", "extra.txt", "extra\n", "add extra") - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(freshSHA)}}, + res, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(freshSHA)}}}, }) require.NoError(t, err) assert.Equal(t, pusher.OutcomeStatusCommitted, res.Outcomes[0].Status) @@ -408,7 +409,7 @@ func TestPusher_Push_RejectsEmptyChanges(t *testing.T) { f := setupGitFixture(t) p := f.newPusher(t) - _, err := p.Push(context.Background(), nil) + _, err := p.Push(context.Background(), sharedentity.QueueTarget{}, nil) require.Error(t, err) assert.False(t, errors.Is(err, pusher.ErrConflict)) } @@ -417,8 +418,8 @@ func TestPusher_Push_InvalidURIErrors(t *testing.T) { f := setupGitFixture(t) p := f.newPusher(t) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{"not a uri"}}, + _, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{"not a uri"}}}, }) require.Error(t, err) } @@ -433,8 +434,8 @@ func TestPusher_Push_RetriesWhenRemoteMovesUnderUs(t *testing.T) { f.installRaceHook(t, []string{raceSHA}) p := f.newPusher(t) - res, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(featureSHA)}}, + res, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(featureSHA)}}}, }) require.NoError(t, err) require.Len(t, res.Outcomes, 1) @@ -469,8 +470,8 @@ func TestPusher_Push_GivesUpAfterMaxAttempts(t *testing.T) { MetricsScope: tally.NoopScope, MaxPushAttempts: 2, }) - _, err := p.Push(context.Background(), []entity.Change{ - {URIs: []string{uri(featureSHA)}}, + _, err := p.Push(context.Background(), sharedentity.QueueTarget{}, []pusher.PushItem{ + {Change: entity.Change{URIs: []string{uri(featureSHA)}}}, }) require.Error(t, err) assert.Equal(t, 2, f.hookInvocations(t), diff --git a/submitqueue/extension/pusher/mock/BUILD.bazel b/submitqueue/extension/pusher/mock/BUILD.bazel index 8bf808e4..e1c23fee 100644 --- a/submitqueue/extension/pusher/mock/BUILD.bazel +++ b/submitqueue/extension/pusher/mock/BUILD.bazel @@ -6,7 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/extension/pusher/mock", visibility = ["//visibility:public"], deps = [ - "//submitqueue/entity", + "//entity", "//submitqueue/extension/pusher", "@org_uber_go_mock//gomock", ], diff --git a/submitqueue/extension/pusher/mock/pusher_mock.go b/submitqueue/extension/pusher/mock/pusher_mock.go index 6ead5f51..463ede80 100644 --- a/submitqueue/extension/pusher/mock/pusher_mock.go +++ b/submitqueue/extension/pusher/mock/pusher_mock.go @@ -13,7 +13,7 @@ import ( context "context" reflect "reflect" - entity "github.com/uber/submitqueue/submitqueue/entity" + entity "github.com/uber/submitqueue/entity" pusher "github.com/uber/submitqueue/submitqueue/extension/pusher" gomock "go.uber.org/mock/gomock" ) @@ -43,16 +43,16 @@ func (m *MockPusher) EXPECT() *MockPusherMockRecorder { } // Push mocks base method. -func (m *MockPusher) Push(ctx context.Context, changes []entity.Change) (pusher.Result, error) { +func (m *MockPusher) Push(ctx context.Context, target entity.QueueTarget, items []pusher.PushItem) (pusher.Result, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Push", ctx, changes) + ret := m.ctrl.Call(m, "Push", ctx, target, items) ret0, _ := ret[0].(pusher.Result) ret1, _ := ret[1].(error) return ret0, ret1 } // Push indicates an expected call of Push. -func (mr *MockPusherMockRecorder) Push(ctx, changes any) *gomock.Call { +func (mr *MockPusherMockRecorder) Push(ctx, target, items any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), ctx, changes) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), ctx, target, items) } diff --git a/submitqueue/extension/pusher/pusher.go b/submitqueue/extension/pusher/pusher.go index 102ca204..96e4b64c 100644 --- a/submitqueue/extension/pusher/pusher.go +++ b/submitqueue/extension/pusher/pusher.go @@ -20,7 +20,8 @@ import ( "context" "errors" - "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/entity" + sqentity "github.com/uber/submitqueue/submitqueue/entity" ) // ErrConflict is returned by a Pusher when one of the changes fails to apply @@ -46,10 +47,18 @@ const ( OutcomeStatusAlreadyExisted OutcomeStatus = "already_existed" ) +// PushItem pairs a code change with the landing strategy to apply. +type PushItem struct { + // Change is the code change to land. + Change sqentity.Change + // Strategy is the landing strategy for this change. + Strategy sqentity.RequestLandStrategy +} + // ChangeOutcome describes what happened to a single Change inside a Push. type ChangeOutcome struct { // Change is the input change this outcome corresponds to. - Change entity.Change + Change sqentity.Change // Status describes whether the change produced commits or was already // present on the target branch. Status OutcomeStatus @@ -84,5 +93,5 @@ type Result struct { type Pusher interface { // Push applies changes onto the target branch and pushes the resulting // commits. See the type-level docs for the atomicity contract. - Push(ctx context.Context, changes []entity.Change) (Result, error) + Push(ctx context.Context, target entity.QueueTarget, items []PushItem) (Result, error) } diff --git a/submitqueue/orchestrator/controller/merge/BUILD.bazel b/submitqueue/orchestrator/controller/merge/BUILD.bazel index 62929205..f2f4b281 100644 --- a/submitqueue/orchestrator/controller/merge/BUILD.bazel +++ b/submitqueue/orchestrator/controller/merge/BUILD.bazel @@ -7,10 +7,12 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/metrics", + "//entity", "//entity/queue", "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/pusher", + "//submitqueue/extension/queueconfig", "//submitqueue/extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -23,12 +25,14 @@ go_test( embed = [":merge"], deps = [ "//core/errs", + "//entity", "//entity/queue", "//extension/queue/mock", "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/pusher", "//submitqueue/extension/pusher/mock", + "//submitqueue/extension/queueconfig/mock", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/submitqueue/orchestrator/controller/merge/merge.go b/submitqueue/orchestrator/controller/merge/merge.go index 8f3172c5..17489f3e 100644 --- a/submitqueue/orchestrator/controller/merge/merge.go +++ b/submitqueue/orchestrator/controller/merge/merge.go @@ -23,10 +23,12 @@ import ( "go.uber.org/zap" coremetrics "github.com/uber/submitqueue/core/metrics" + sharedentity "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" + "github.com/uber/submitqueue/submitqueue/extension/queueconfig" "github.com/uber/submitqueue/submitqueue/extension/storage" ) @@ -45,6 +47,7 @@ type Controller struct { store storage.Storage registry consumer.TopicRegistry pusher pusher.Pusher + queueConfig queueconfig.Store topicKey consumer.TopicKey consumerGroup string } @@ -59,6 +62,7 @@ func NewController( store storage.Storage, registry consumer.TopicRegistry, pusherImpl pusher.Pusher, + queueConfig queueconfig.Store, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { @@ -68,6 +72,7 @@ func NewController( store: store, registry: registry, pusher: pusherImpl, + queueConfig: queueConfig, topicKey: topicKey, consumerGroup: consumerGroup, } @@ -120,13 +125,19 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return c.fanout(ctx, batch.ID, batch.Queue) } - changes, err := c.collectChanges(ctx, batch) + changes, err := c.collectPushItems(ctx, batch) if err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "request_load_errors", 1) - return fmt.Errorf("failed to collect changes for batch %s: %w", batch.ID, err) + return fmt.Errorf("failed to collect push items for batch %s: %w", batch.ID, err) } - pushRes, pushErr := c.pusher.Push(ctx, changes) + target, err := c.resolveQueueTarget(ctx, batch.Queue) + if err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "queue_config_errors", 1) + return fmt.Errorf("failed to resolve queue target for batch %s: %w", batch.ID, err) + } + + pushRes, pushErr := c.pusher.Push(ctx, target, changes) var newState entity.BatchState switch { @@ -160,19 +171,36 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return c.fanout(ctx, batch.ID, batch.Queue) } -// collectChanges loads each request in batch.Contains and returns its -// Change. The result preserves batch.Contains order so the Pusher applies -// the changes in the same order the requests were batched. -func (c *Controller) collectChanges(ctx context.Context, batch entity.Batch) ([]entity.Change, error) { - changes := make([]entity.Change, 0, len(batch.Contains)) +// collectPushItems loads each request in batch.Contains and returns a +// PushItem for each. The result preserves batch.Contains order so the Pusher +// applies the changes in the same order the requests were batched. +func (c *Controller) collectPushItems(ctx context.Context, batch entity.Batch) ([]pusher.PushItem, error) { + items := make([]pusher.PushItem, 0, len(batch.Contains)) for _, requestID := range batch.Contains { request, err := c.store.GetRequestStore().Get(ctx, requestID) if err != nil { return nil, fmt.Errorf("failed to get request %s: %w", requestID, err) } - changes = append(changes, request.Change) + items = append(items, pusher.PushItem{ + Change: request.Change, + Strategy: request.LandStrategy, + }) + } + return items, nil +} + +// resolveQueueTarget looks up the queue configuration and returns a QueueTarget +// for the Pusher to use as the landing destination. +func (c *Controller) resolveQueueTarget(ctx context.Context, queueName string) (sharedentity.QueueTarget, error) { + cfg, err := c.queueConfig.Get(ctx, queueName) + if err != nil { + return sharedentity.QueueTarget{}, fmt.Errorf("resolve queue target %q: %w", queueName, err) } - return changes, nil + return sharedentity.QueueTarget{ + Name: cfg.Name, + Address: cfg.VCSAddress, + Target: cfg.Target, + }, nil } // fanout publishes the batch ID to conclude (so requests are updated) and diff --git a/submitqueue/orchestrator/controller/merge/merge_test.go b/submitqueue/orchestrator/controller/merge/merge_test.go index 50f6652b..99ea1907 100644 --- a/submitqueue/orchestrator/controller/merge/merge_test.go +++ b/submitqueue/orchestrator/controller/merge/merge_test.go @@ -26,12 +26,14 @@ import ( "go.uber.org/zap/zaptest" "github.com/uber/submitqueue/core/errs" + sharedentity "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" pushermock "github.com/uber/submitqueue/submitqueue/extension/pusher/mock" + queueconfigmock "github.com/uber/submitqueue/submitqueue/extension/queueconfig/mock" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" ) @@ -76,6 +78,7 @@ func TestNewController(t *testing.T) { store, newRegistry(t, ctrl, nil), pushermock.NewMockPusher(ctrl), + queueconfigmock.NewMockStore(ctrl), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -121,10 +124,10 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, changes []entity.Change) (pusher.Result, error) { - require.Len(t, changes, 1) - assert.Equal(t, change, changes[0]) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ sharedentity.QueueTarget, items []pusher.PushItem) (pusher.Result, error) { + require.Len(t, items, 1) + assert.Equal(t, change, items[0].Change) return pusher.Result{Outcomes: []pusher.ChangeOutcome{{ Change: change, Status: pusher.OutcomeStatusCommitted, @@ -133,12 +136,18 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { }, ) + mockQueueConfig := queueconfigmock.NewMockStore(ctrl) + mockQueueConfig.EXPECT().Get(gomock.Any(), "test-queue").Return(entity.QueueConfig{ + Name: "test-queue", VCSAddress: "git@github.com:uber/repo.git", Target: "main", + }, nil).AnyTimes() + c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, newRegistry(t, ctrl, nil), mockPusher, + mockQueueConfig, consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -182,13 +191,17 @@ func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, got []entity.Change) (pusher.Result, error) { + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ sharedentity.QueueTarget, items []pusher.PushItem) (pusher.Result, error) { + got := make([]entity.Change, len(items)) + for i, item := range items { + got[i] = item.Change + } assert.Equal(t, changes, got, "changes must be in batch.Contains order") - outcomes := make([]pusher.ChangeOutcome, len(got)) - for i, ch := range got { + outcomes := make([]pusher.ChangeOutcome, len(items)) + for i, item := range items { outcomes[i] = pusher.ChangeOutcome{ - Change: ch, + Change: item.Change, Status: pusher.OutcomeStatusCommitted, CommitSHAs: []string{fmt.Sprintf("sha-%d", i)}, } @@ -197,12 +210,18 @@ func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { }, ) + mockQueueConfig := queueconfigmock.NewMockStore(ctrl) + mockQueueConfig.EXPECT().Get(gomock.Any(), "test-queue").Return(entity.QueueConfig{ + Name: "test-queue", VCSAddress: "git@github.com:uber/repo.git", Target: "main", + }, nil).AnyTimes() + c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, newRegistry(t, ctrl, nil), mockPusher, + mockQueueConfig, consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -239,17 +258,23 @@ func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).Return( pusher.Result{}, fmt.Errorf("apply: %w", pusher.ErrConflict), ) + mockQueueConfig := queueconfigmock.NewMockStore(ctrl) + mockQueueConfig.EXPECT().Get(gomock.Any(), "test-queue").Return(entity.QueueConfig{ + Name: "test-queue", VCSAddress: "git@github.com:uber/repo.git", Target: "main", + }, nil).AnyTimes() + c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, newRegistry(t, ctrl, nil), mockPusher, + mockQueueConfig, consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -285,17 +310,23 @@ func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).Return( pusher.Result{}, fmt.Errorf("ssh: connection refused"), ) + mockQueueConfig := queueconfigmock.NewMockStore(ctrl) + mockQueueConfig.EXPECT().Get(gomock.Any(), "test-queue").Return(entity.QueueConfig{ + Name: "test-queue", VCSAddress: "git@github.com:uber/repo.git", Target: "main", + }, nil).AnyTimes() + c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, newRegistry(t, ctrl, nil), mockPusher, + mockQueueConfig, consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -337,6 +368,7 @@ func TestController_Process_TerminalBatchSkipsPushButFansOut(t *testing.T) { store, newRegistry(t, ctrl, nil), mockPusher, + queueconfigmock.NewMockStore(ctrl), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -387,6 +419,7 @@ func TestController_Process_CancellingShortCircuit(t *testing.T) { store, registry, mockPusher, + queueconfigmock.NewMockStore(ctrl), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -412,6 +445,7 @@ func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { store, newRegistry(t, ctrl, nil), pushermock.NewMockPusher(ctrl), + queueconfigmock.NewMockStore(ctrl), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -451,6 +485,7 @@ func TestController_Process_RequestStoreFailurePropagates(t *testing.T) { store, newRegistry(t, ctrl, nil), pushermock.NewMockPusher(ctrl), + queueconfigmock.NewMockStore(ctrl), consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -487,18 +522,24 @@ func TestController_Process_PublishFailureSurfaces(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).Return( pusher.Result{Outcomes: []pusher.ChangeOutcome{{ Status: pusher.OutcomeStatusCommitted, CommitSHAs: []string{"abc"}, }}}, nil, ) + mockQueueConfig := queueconfigmock.NewMockStore(ctrl) + mockQueueConfig.EXPECT().Get(gomock.Any(), "test-queue").Return(entity.QueueConfig{ + Name: "test-queue", VCSAddress: "git@github.com:uber/repo.git", Target: "main", + }, nil).AnyTimes() + c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, newRegistry(t, ctrl, fmt.Errorf("queue down")), mockPusher, + mockQueueConfig, consumer.TopicKeyMerge, "orchestrator-merge", )