diff --git a/CLAUDE.md b/CLAUDE.md index 650889d4..90237f09 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -147,7 +147,8 @@ Paths follow the directory layout: shared code is top-level, domain code nests u - Proto (generated): `github.com/uber/submitqueue/{domain}/{service}/protopb` - Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`) - Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`) -- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/consumer`, `.../submitqueue/core/request`) +- Cross-domain consumer framework: `github.com/uber/submitqueue/core/consumer`; domain pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey` +- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/request`) - Shared entities: `github.com/uber/submitqueue/entity/{name}` (e.g. `.../entity/messagequeue`) - Shared extensions: `github.com/uber/submitqueue/extension/{name}` (e.g. `.../extension/messagequeue`) - Cross-domain infra: `github.com/uber/submitqueue/core/{pkg}` (e.g. `.../core/errs`, `.../core/metrics`) diff --git a/Makefile b/Makefile index 4a8f7cf5..a8467fda 100644 --- a/Makefile +++ b/Makefile @@ -336,7 +336,7 @@ local-stovepipe-gateway-start: build-stovepipe-gateway-linux ## Start Stovepipe mocks: ## Generate mock files using mockgen @echo "Generating mocks..." - @$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/... + @$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./core/consumer/... @echo "Mocks generated successfully!" proto: ## Generate protobuf files from .proto definitions diff --git a/submitqueue/core/consumer/BUILD.bazel b/core/consumer/BUILD.bazel similarity index 87% rename from submitqueue/core/consumer/BUILD.bazel rename to core/consumer/BUILD.bazel index 97a93b0f..185c7b1e 100644 --- a/submitqueue/core/consumer/BUILD.bazel +++ b/core/consumer/BUILD.bazel @@ -7,7 +7,7 @@ go_library( "controller.go", "registry.go", ], - importpath = "github.com/uber/submitqueue/submitqueue/core/consumer", + importpath = "github.com/uber/submitqueue/core/consumer", visibility = ["//visibility:public"], deps = [ "//core/errs", @@ -27,11 +27,12 @@ go_test( ], deps = [ ":consumer", + "//core/consumer/mock", "//core/errs", "//entity/messagequeue", "//extension/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer/mock", + "//submitqueue/core/topickey", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", diff --git a/submitqueue/core/consumer/README.md b/core/consumer/README.md similarity index 92% rename from submitqueue/core/consumer/README.md rename to core/consumer/README.md index a2e911f9..834082af 100644 --- a/submitqueue/core/consumer/README.md +++ b/core/consumer/README.md @@ -26,7 +26,7 @@ The top-level orchestrator. Register controllers, start consuming, and stop grac ```go registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig}, + {Key: topickey.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig}, }) c := consumer.New(logger, scope, registry, @@ -71,13 +71,13 @@ The `TopicRegistry` maps topic keys to queue backends, topic names, and subscrip ```go registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{ { - Key: consumer.TopicKeyStart, + Key: topickey.TopicKeyStart, Name: "request", Queue: q, Subscription: extqueue.DefaultSubscriptionConfig("worker-1", "orchestrator"), }, { - Key: consumer.TopicKeyBuild, + Key: topickey.TopicKeyBuild, Name: "build", Queue: q, // No Subscription — publish-only topic @@ -85,7 +85,7 @@ registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{ }) ``` -**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyStart`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions. +**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyStart`, `TopicKeyBuild`). Constants live in each domain's `core/topickey` package; this package defines only the `TopicKey` type and registry machinery. The actual queue topic name is configured separately, so library consumers can use their own naming conventions. ## Error Handling diff --git a/submitqueue/core/consumer/consumer.go b/core/consumer/consumer.go similarity index 100% rename from submitqueue/core/consumer/consumer.go rename to core/consumer/consumer.go diff --git a/submitqueue/core/consumer/consumer_test.go b/core/consumer/consumer_test.go similarity index 92% rename from submitqueue/core/consumer/consumer_test.go rename to core/consumer/consumer_test.go index d291687a..566ce99d 100644 --- a/submitqueue/core/consumer/consumer_test.go +++ b/core/consumer/consumer_test.go @@ -26,12 +26,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" + consumermock "github.com/uber/submitqueue/core/consumer/mock" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" extqueue "github.com/uber/submitqueue/extension/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" - consumermock "github.com/uber/submitqueue/submitqueue/core/consumer/mock" + "github.com/uber/submitqueue/submitqueue/core/topickey" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) @@ -103,7 +104,7 @@ func TestConsumer_Register(t *testing.T) { c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler1 := consumermock.NewMockController(ctrl) - setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil) + setupController(handler1, "handler1", topickey.TopicKeyStart, "group1", nil) handler2 := consumermock.NewMockController(ctrl) setupController(handler2, "handler2", consumer.TopicKey("other-topic"), "group2", nil) @@ -123,10 +124,10 @@ func TestConsumer_Register_DuplicateTopic(t *testing.T) { c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler1 := consumermock.NewMockController(ctrl) - setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil) + setupController(handler1, "handler1", topickey.TopicKeyStart, "group1", nil) handler2 := consumermock.NewMockController(ctrl) - setupController(handler2, "handler2", consumer.TopicKeyStart, "group2", nil) + setupController(handler2, "handler2", topickey.TopicKeyStart, "group2", nil) err := c.Register(handler1) require.NoError(t, err) @@ -146,7 +147,7 @@ func TestConsumer_Register_AfterStop(t *testing.T) { require.NoError(t, err) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil) + setupController(handler, "handler1", topickey.TopicKeyStart, "group1", nil) err = c.Register(handler) assert.Error(t, err) @@ -170,7 +171,7 @@ func TestConsumer_Start_AfterStop(t *testing.T) { c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil) + setupController(handler, "handler1", topickey.TopicKeyStart, "group1", nil) err := c.Register(handler) require.NoError(t, err) @@ -189,14 +190,14 @@ func TestConsumer_Start_MissingSubscriptionConfig(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) // Registry has queue but no subscription config reg, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyStart, Name: "request", Queue: mockQ}}, ) require.NoError(t, err) c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler", consumer.TopicKeyStart, "group", nil) + setupController(handler, "handler", topickey.TopicKeyStart, "group", nil) err = c.Register(handler) require.NoError(t, err) @@ -217,12 +218,12 @@ func TestConsumer_Start_SubscribeFailure(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "group") c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler", consumer.TopicKeyStart, "group", nil) + setupController(handler, "handler", topickey.TopicKeyStart, "group", nil) err := c.Register(handler) require.NoError(t, err) @@ -243,13 +244,13 @@ func TestConsumer_ProcessDelivery_Success(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handledMsg := "" handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { handledMsg = delivery.Message().ID return nil @@ -289,12 +290,12 @@ func TestConsumer_ProcessDelivery_Error(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return errs.NewRetryableError(fmt.Errorf("processing failed")) }, @@ -331,12 +332,12 @@ func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return fmt.Errorf("bad payload") }, @@ -382,12 +383,12 @@ func TestConsumer_Stop(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", nil) + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", nil) err := c.Register(handler) require.NoError(t, err) @@ -440,12 +441,12 @@ func TestConsumer_ObservabilityTags(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") testC := consumer.New(logger, testScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return tt.handlerError }, @@ -515,12 +516,12 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") c := consumer.New(logger, scope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return nil }, ) @@ -560,12 +561,12 @@ func TestConsumer_ErrorMetrics(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") c := consumer.New(logger, scope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return errs.NewRetryableError(fmt.Errorf("processing failed")) }, @@ -616,7 +617,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) @@ -626,7 +627,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) { var partBProcessed atomic.Bool handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { pk := delivery.Message().PartitionKey if pk == "partition-a" { @@ -701,7 +702,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) @@ -712,7 +713,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) { allDone := make(chan struct{}) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { mu.Lock() order = append(order, delivery.Message().ID) @@ -770,14 +771,14 @@ func TestConsumer_PartitionWorkerCleanup(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") + reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) processedCount := int64(0) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", + setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { atomic.AddInt64(&processedCount, 1) return nil diff --git a/submitqueue/core/consumer/controller.go b/core/consumer/controller.go similarity index 100% rename from submitqueue/core/consumer/controller.go rename to core/consumer/controller.go diff --git a/submitqueue/core/consumer/mock/BUILD.bazel b/core/consumer/mock/BUILD.bazel similarity index 67% rename from submitqueue/core/consumer/mock/BUILD.bazel rename to core/consumer/mock/BUILD.bazel index 8c74b70f..7ce502ae 100644 --- a/submitqueue/core/consumer/mock/BUILD.bazel +++ b/core/consumer/mock/BUILD.bazel @@ -3,11 +3,11 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "mock", srcs = ["controller_mock.go"], - importpath = "github.com/uber/submitqueue/submitqueue/core/consumer/mock", + importpath = "github.com/uber/submitqueue/core/consumer/mock", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//entity/messagequeue", - "//submitqueue/core/consumer", "@org_uber_go_mock//gomock", ], ) diff --git a/submitqueue/core/consumer/mock/controller_mock.go b/core/consumer/mock/controller_mock.go similarity index 94% rename from submitqueue/core/consumer/mock/controller_mock.go rename to core/consumer/mock/controller_mock.go index 192a2545..6dab2be4 100644 --- a/submitqueue/core/consumer/mock/controller_mock.go +++ b/core/consumer/mock/controller_mock.go @@ -1,10 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. // Source: controller.go -// -// Generated by this command: -// -// mockgen -source=controller.go -destination=mock/controller_mock.go -package=mock -// // Package mock is a generated GoMock package. package mock @@ -13,8 +8,8 @@ import ( context "context" reflect "reflect" + consumer "github.com/uber/submitqueue/core/consumer" messagequeue "github.com/uber/submitqueue/entity/messagequeue" - consumer "github.com/uber/submitqueue/submitqueue/core/consumer" gomock "go.uber.org/mock/gomock" ) @@ -22,7 +17,6 @@ import ( type MockDelivery struct { ctrl *gomock.Controller recorder *MockDeliveryMockRecorder - isgomock struct{} } // MockDeliveryMockRecorder is the mock recorder for MockDelivery. @@ -79,7 +73,7 @@ func (m *MockDelivery) ExtendVisibilityTimeout(ctx context.Context, durationMill } // ExtendVisibilityTimeout indicates an expected call of ExtendVisibilityTimeout. -func (mr *MockDeliveryMockRecorder) ExtendVisibilityTimeout(ctx, durationMillis any) *gomock.Call { +func (mr *MockDeliveryMockRecorder) ExtendVisibilityTimeout(ctx, durationMillis interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExtendVisibilityTimeout", reflect.TypeOf((*MockDelivery)(nil).ExtendVisibilityTimeout), ctx, durationMillis) } @@ -130,7 +124,6 @@ func (mr *MockDeliveryMockRecorder) ReceivedAt() *gomock.Call { type MockController struct { ctrl *gomock.Controller recorder *MockControllerMockRecorder - isgomock struct{} } // MockControllerMockRecorder is the mock recorder for MockController. @@ -187,7 +180,7 @@ func (m *MockController) Process(ctx context.Context, delivery consumer.Delivery } // Process indicates an expected call of Process. -func (mr *MockControllerMockRecorder) Process(ctx, delivery any) *gomock.Call { +func (mr *MockControllerMockRecorder) Process(ctx, delivery interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*MockController)(nil).Process), ctx, delivery) } diff --git a/submitqueue/core/consumer/registry.go b/core/consumer/registry.go similarity index 73% rename from submitqueue/core/consumer/registry.go rename to core/consumer/registry.go index a3c381a4..b9247b2a 100644 --- a/submitqueue/core/consumer/registry.go +++ b/core/consumer/registry.go @@ -27,35 +27,6 @@ import ( // choose their own naming conventions. type TopicKey string -const ( - // TopicKeyStart is the pipeline stage where new requests arrive from the gateway. - TopicKeyStart TopicKey = "start" - // TopicKeyCancel is the pipeline stage where cancellation requests arrive from the gateway. - TopicKeyCancel TopicKey = "cancel" - // TopicKeyValidate is the pipeline stage where requests are published for validation. - TopicKeyValidate TopicKey = "validate" - // TopicKeyBatch is the pipeline stage where validated requests are published for batching. - TopicKeyBatch TopicKey = "batch" - // TopicKeyScore is the pipeline stage where batches are published for scoring. - TopicKeyScore TopicKey = "score" - // TopicKeySpeculate is the pipeline stage where scored batches are published for speculation. - TopicKeySpeculate TopicKey = "speculate" - // TopicKeyBuild is the pipeline stage where speculated batches are published for builds. - TopicKeyBuild TopicKey = "build" - // TopicKeyBuildSignal is the polling stage for triggered builds. Each - // message carries a Build; the consumer calls BuildRunner.Status, - // persists the latest status, publishes the batch ID to TopicKeySpeculate - // so the state machine re-evaluates, and re-publishes itself via - // PublishAfter when the build has not yet reached a terminal state. - TopicKeyBuildSignal TopicKey = "buildsignal" - // TopicKeyMerge is the pipeline stage where speculated batches are published for merging. - TopicKeyMerge TopicKey = "merge" - // TopicKeyConclude is the pipeline stage where merged requests are published for conclusion. - TopicKeyConclude TopicKey = "conclude" - // TopicKeyLog is the pipeline stage where per-request logs are written. - TopicKeyLog TopicKey = "log" -) - // String returns the topic key as a string. func (t TopicKey) String() string { return string(t) diff --git a/submitqueue/core/consumer/registry_test.go b/core/consumer/registry_test.go similarity index 86% rename from submitqueue/core/consumer/registry_test.go rename to core/consumer/registry_test.go index a4050c9b..baf64e0c 100644 --- a/submitqueue/core/consumer/registry_test.go +++ b/core/consumer/registry_test.go @@ -19,9 +19,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/core/consumer" extqueue "github.com/uber/submitqueue/extension/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "go.uber.org/mock/gomock" ) @@ -32,7 +33,7 @@ func TestNewTopicRegistry(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ { - Key: consumer.TopicKeyStart, + Key: topickey.TopicKeyStart, Name: "request", Queue: mockQ, Subscription: extqueue.DefaultSubscriptionConfig( @@ -43,15 +44,15 @@ func TestNewTopicRegistry(t *testing.T) { ) require.NoError(t, err) - q, ok := registry.Queue(consumer.TopicKeyStart) + q, ok := registry.Queue(topickey.TopicKeyStart) require.True(t, ok) assert.Equal(t, mockQ, q) - name, ok := registry.TopicName(consumer.TopicKeyStart) + name, ok := registry.TopicName(topickey.TopicKeyStart) require.True(t, ok) assert.Equal(t, "request", name) - cfg, ok := registry.SubscriptionConfig(consumer.TopicKeyStart, "group-a") + cfg, ok := registry.SubscriptionConfig(topickey.TopicKeyStart, "group-a") require.True(t, ok) assert.Equal(t, "group-a", cfg.ConsumerGroup) } @@ -87,7 +88,7 @@ func TestNewTopicRegistry_InvalidTopicName(t *testing.T) { t.Run(tt.name, func(t *testing.T) { _, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyStart, Name: tt.topicName}, + {Key: topickey.TopicKeyStart, Name: tt.topicName}, }, ) require.Error(t, err) @@ -108,14 +109,14 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) { name: "found group-a", configs: []consumer.TopicConfig{ { - Key: consumer.TopicKeyStart, + Key: topickey.TopicKeyStart, Name: "request", Subscription: extqueue.DefaultSubscriptionConfig( "worker-1", "group-a", ), }, }, - lookupKey: consumer.TopicKeyStart, + lookupKey: topickey.TopicKeyStart, lookupGroup: "group-a", expectFound: true, expectedGroup: "group-a", @@ -124,14 +125,14 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) { name: "not found by group", configs: []consumer.TopicConfig{ { - Key: consumer.TopicKeyStart, + Key: topickey.TopicKeyStart, Name: "request", Subscription: extqueue.DefaultSubscriptionConfig( "worker-1", "group-a", ), }, }, - lookupKey: consumer.TopicKeyStart, + lookupKey: topickey.TopicKeyStart, lookupGroup: "nonexistent", expectFound: false, }, @@ -139,7 +140,7 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) { name: "not found by topic key", configs: []consumer.TopicConfig{ { - Key: consumer.TopicKeyStart, + Key: topickey.TopicKeyStart, Name: "request", Subscription: extqueue.DefaultSubscriptionConfig( "worker-1", "group-a", @@ -175,17 +176,17 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ1}, - {Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ2}, + {Key: topickey.TopicKeyStart, Name: "request", Queue: mockQ1}, + {Key: topickey.TopicKeyValidate, Name: "validate", Queue: mockQ2}, }, ) require.NoError(t, err) - q1, ok := registry.Queue(consumer.TopicKeyStart) + q1, ok := registry.Queue(topickey.TopicKeyStart) require.True(t, ok) assert.Equal(t, mockQ1, q1) - q2, ok := registry.Queue(consumer.TopicKeyValidate) + q2, ok := registry.Queue(topickey.TopicKeyValidate) require.True(t, ok) assert.Equal(t, mockQ2, q2) @@ -201,7 +202,7 @@ func TestTopicKey_String(t *testing.T) { }{ { name: "predefined topic key", - key: consumer.TopicKeyStart, + key: topickey.TopicKeyStart, expected: "start", }, { @@ -224,12 +225,12 @@ func TestTopicRegistry_TopicName(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyStart, Name: "my-custom-request", Queue: mockQ}, + {Key: topickey.TopicKeyStart, Name: "my-custom-request", Queue: mockQ}, }, ) require.NoError(t, err) - name, ok := registry.TopicName(consumer.TopicKeyStart) + name, ok := registry.TopicName(topickey.TopicKeyStart) require.True(t, ok) assert.Equal(t, "my-custom-request", name) diff --git a/example/submitqueue/gateway/server/BUILD.bazel b/example/submitqueue/gateway/server/BUILD.bazel index b036db04..3661cada 100644 --- a/example/submitqueue/gateway/server/BUILD.bazel +++ b/example/submitqueue/gateway/server/BUILD.bazel @@ -11,13 +11,14 @@ go_library( importpath = "github.com/uber/submitqueue/example/submitqueue/gateway/server", visibility = ["//visibility:private"], deps = [ + "//core/consumer", "//core/errs", "//core/errs/generic", "//core/errs/mysql", "//extension/counter/mysql", "//extension/messagequeue", "//extension/messagequeue/mysql", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/extension/queueconfig/yaml", "//submitqueue/extension/storage/mysql", "//submitqueue/gateway/controller", diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index 619370db..0bca7b33 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -28,13 +28,14 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" genericerrs "github.com/uber/submitqueue/core/errs/generic" mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" extqueue "github.com/uber/submitqueue/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/submitqueue/gateway/controller" @@ -198,10 +199,10 @@ func run() error { // the gateway is the sole writer of the request log, persisting entries that // the orchestrator publishes there. registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue}, - {Key: consumer.TopicKeyCancel, Name: "cancel", Queue: mysqlQueue}, + {Key: topickey.TopicKeyStart, Name: "start", Queue: mysqlQueue}, + {Key: topickey.TopicKeyCancel, Name: "cancel", Queue: mysqlQueue}, { - Key: consumer.TopicKeyLog, + Key: topickey.TopicKeyLog, Name: "log", Queue: mysqlQueue, Subscription: extqueue.DefaultSubscriptionConfig( @@ -278,7 +279,7 @@ func run() error { ), ) - logController := logctrl.NewController(logger.Sugar(), scope, store, consumer.TopicKeyLog, "gateway-log") + logController := logctrl.NewController(logger.Sugar(), scope, store, topickey.TopicKeyLog, "gateway-log") if err := logConsumer.Register(logController); err != nil { return fmt.Errorf("failed to register log controller: %w", err) } diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index d3a16bee..3b9b6421 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/uber/submitqueue/example/submitqueue/orchestrator/server", visibility = ["//visibility:private"], deps = [ + "//core/consumer", "//core/errs", "//core/errs/generic", "//core/errs/mysql", @@ -19,7 +20,7 @@ go_library( "//extension/counter/mysql", "//extension/messagequeue", "//extension/messagequeue/mysql", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "//submitqueue/extension/buildrunner/fake", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 91e0d619..94fa080c 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -30,6 +30,7 @@ import ( "golang.org/x/oauth2" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" genericerrs "github.com/uber/submitqueue/core/errs/generic" mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" @@ -38,7 +39,7 @@ import ( mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" extqueue "github.com/uber/submitqueue/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" buildfake "github.com/uber/submitqueue/submitqueue/extension/buildrunner/fake" @@ -371,16 +372,16 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe groupSuffix string } primaryTopics := []topicSpec{ - {consumer.TopicKeyStart, "start", "orchestrator-start"}, - {consumer.TopicKeyCancel, "cancel", "orchestrator-cancel"}, - {consumer.TopicKeyValidate, "validate", "orchestrator-validate"}, - {consumer.TopicKeyBatch, "batch", "orchestrator-batch"}, - {consumer.TopicKeyScore, "score", "orchestrator-score"}, - {consumer.TopicKeySpeculate, "speculate", "orchestrator-speculate"}, - {consumer.TopicKeyBuild, "build", "orchestrator-build"}, - {consumer.TopicKeyBuildSignal, "buildsignal", "orchestrator-buildsignal"}, - {consumer.TopicKeyMerge, "merge", "orchestrator-merge"}, - {consumer.TopicKeyConclude, "conclude", "orchestrator-conclude"}, + {topickey.TopicKeyStart, "start", "orchestrator-start"}, + {topickey.TopicKeyCancel, "cancel", "orchestrator-cancel"}, + {topickey.TopicKeyValidate, "validate", "orchestrator-validate"}, + {topickey.TopicKeyBatch, "batch", "orchestrator-batch"}, + {topickey.TopicKeyScore, "score", "orchestrator-score"}, + {topickey.TopicKeySpeculate, "speculate", "orchestrator-speculate"}, + {topickey.TopicKeyBuild, "build", "orchestrator-build"}, + {topickey.TopicKeyBuildSignal, "buildsignal", "orchestrator-buildsignal"}, + {topickey.TopicKeyMerge, "merge", "orchestrator-merge"}, + {topickey.TopicKeyConclude, "conclude", "orchestrator-conclude"}, } configs := make([]consumer.TopicConfig, 0, 2*len(primaryTopics)) @@ -422,7 +423,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // writes the request log to storage, so the orchestrator registers no // consuming subscription (and therefore no log DLQ) for this topic. configs = append(configs, consumer.TopicConfig{ - Key: consumer.TopicKeyLog, + Key: topickey.TopicKeyLog, Name: "log", Queue: q, }) @@ -526,7 +527,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope, store, registry, - consumer.TopicKeyStart, + topickey.TopicKeyStart, "orchestrator-start", ) if err := c.Register(requestController); err != nil { @@ -539,7 +540,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope, store, registry, - consumer.TopicKeyCancel, + topickey.TopicKeyCancel, "orchestrator-cancel", ) if err := c.Register(cancelController); err != nil { @@ -554,7 +555,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, registry, mcf, cpf, - consumer.TopicKeyValidate, + topickey.TopicKeyValidate, "orchestrator-validate", ) if err := c.Register(validateController); err != nil { @@ -569,7 +570,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, cnt, store, cof, - consumer.TopicKeyBatch, + topickey.TopicKeyBatch, "orchestrator-batch", ) if err := c.Register(batchController); err != nil { @@ -583,7 +584,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, store, scf, registry, - consumer.TopicKeyScore, + topickey.TopicKeyScore, "orchestrator-score", ) if err := c.Register(scoreController); err != nil { @@ -596,7 +597,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope, store, registry, - consumer.TopicKeySpeculate, + topickey.TopicKeySpeculate, "orchestrator-speculate", ) if err := c.Register(speculateController); err != nil { @@ -610,7 +611,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, store, brf, registry, - consumer.TopicKeyBuild, + topickey.TopicKeyBuild, "orchestrator-build", ) if err := c.Register(buildController); err != nil { @@ -624,7 +625,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, store, brf, registry, - consumer.TopicKeyBuildSignal, + topickey.TopicKeyBuildSignal, "orchestrator-buildsignal", ) if err := c.Register(buildsignalController); err != nil { @@ -638,7 +639,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, store, registry, pshf, - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) if err := c.Register(mergeController); err != nil { @@ -651,7 +652,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope, store, registry, - consumer.TopicKeyConclude, + topickey.TopicKeyConclude, "orchestrator-conclude", ) if err := c.Register(concludeController); err != nil { @@ -672,16 +673,16 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop name string ctl consumer.Controller }{ - {"start_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeLandRequestID, dlq.TopicKey(consumer.TopicKeyStart), "orchestrator-start-dlq")}, - {"cancel_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeCancelRequestID, dlq.TopicKey(consumer.TopicKeyCancel), "orchestrator-cancel-dlq")}, - {"validate_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq")}, - {"batch_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(consumer.TopicKeyBatch), "orchestrator-batch-dlq")}, - {"score_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyScore), "orchestrator-score-dlq")}, - {"speculate_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeySpeculate), "orchestrator-speculate-dlq")}, - {"build_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyBuild), "orchestrator-build-dlq")}, - {"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")}, - {"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq")}, - {"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyConclude), "orchestrator-conclude-dlq")}, + {"start_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeLandRequestID, dlq.TopicKey(topickey.TopicKeyStart), "orchestrator-start-dlq")}, + {"cancel_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeCancelRequestID, dlq.TopicKey(topickey.TopicKeyCancel), "orchestrator-cancel-dlq")}, + {"validate_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(topickey.TopicKeyValidate), "orchestrator-validate-dlq")}, + {"batch_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(topickey.TopicKeyBatch), "orchestrator-batch-dlq")}, + {"score_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyScore), "orchestrator-score-dlq")}, + {"speculate_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeySpeculate), "orchestrator-speculate-dlq")}, + {"build_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuild), "orchestrator-build-dlq")}, + {"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")}, + {"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyMerge), "orchestrator-merge-dlq")}, + {"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyConclude), "orchestrator-conclude-dlq")}, } var count int for _, reg := range dlqRegs { diff --git a/stovepipe/core/topickey/BUILD.bazel b/stovepipe/core/topickey/BUILD.bazel new file mode 100644 index 00000000..faac25f7 --- /dev/null +++ b/stovepipe/core/topickey/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "topickey", + srcs = ["topickey.go"], + importpath = "github.com/uber/submitqueue/stovepipe/core/topickey", + visibility = ["//visibility:public"], + deps = ["//core/consumer"], +) diff --git a/stovepipe/core/topickey/topickey.go b/stovepipe/core/topickey/topickey.go new file mode 100644 index 00000000..4bc7e714 --- /dev/null +++ b/stovepipe/core/topickey/topickey.go @@ -0,0 +1,28 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package topickey defines Stovepipe pipeline stage identifiers. +package topickey + +import "github.com/uber/submitqueue/core/consumer" + +// TopicKey is the shared pipeline stage identifier type. +type TopicKey = consumer.TopicKey + +const ( + // TopicKeyStart is the pipeline stage where trunk push events arrive from the gateway. + TopicKeyStart TopicKey = "start" + // TopicKeyValidate is the pipeline stage where commits are published for metadata resolution. + TopicKeyValidate TopicKey = "validate" +) diff --git a/submitqueue/core/request/BUILD.bazel b/submitqueue/core/request/BUILD.bazel index c61803b3..24e04ade 100644 --- a/submitqueue/core/request/BUILD.bazel +++ b/submitqueue/core/request/BUILD.bazel @@ -9,8 +9,9 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/core/request", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//entity/messagequeue", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", ], @@ -24,9 +25,10 @@ go_test( ], embed = [":request"], deps = [ + "//core/consumer", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", diff --git a/submitqueue/core/request/log.go b/submitqueue/core/request/log.go index 7af97060..3f1cd417 100644 --- a/submitqueue/core/request/log.go +++ b/submitqueue/core/request/log.go @@ -18,8 +18,9 @@ import ( "context" "fmt" + "github.com/uber/submitqueue/core/consumer" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" ) @@ -40,14 +41,14 @@ func PublishLog(ctx context.Context, registry consumer.TopicRegistry, logEntry e msgID := fmt.Sprintf("%s/%s", logEntry.RequestID, logEntry.Status) msg := entityqueue.NewMessage(msgID, payload, partitionKey, nil) - q, ok := registry.Queue(consumer.TopicKeyLog) + q, ok := registry.Queue(topickey.TopicKeyLog) if !ok { - return fmt.Errorf("no queue registered for topic key %s", consumer.TopicKeyLog) + return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyLog) } - topicName, ok := registry.TopicName(consumer.TopicKeyLog) + topicName, ok := registry.TopicName(topickey.TopicKeyLog) if !ok { - return fmt.Errorf("no topic name registered for topic key %s", consumer.TopicKeyLog) + return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyLog) } if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { diff --git a/submitqueue/core/request/log_test.go b/submitqueue/core/request/log_test.go index 409491ef..512ee668 100644 --- a/submitqueue/core/request/log_test.go +++ b/submitqueue/core/request/log_test.go @@ -20,9 +20,10 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/core/consumer" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "go.uber.org/mock/gomock" ) @@ -39,7 +40,7 @@ func newTestRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) co mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyLog, Name: "log", Queue: mockQ}}, ) require.NoError(t, err) return registry @@ -94,7 +95,7 @@ func TestPublishBatchLogs_PartialFailure(t *testing.T) { mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyLog, Name: "log", Queue: mockQ}}, ) require.NoError(t, err) @@ -137,7 +138,7 @@ func TestPublishLog_MessageIDScopedByStatus(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyLog, Name: "log", Queue: mockQ}}, ) require.NoError(t, err) diff --git a/submitqueue/core/topickey/BUILD.bazel b/submitqueue/core/topickey/BUILD.bazel new file mode 100644 index 00000000..3d15ae3b --- /dev/null +++ b/submitqueue/core/topickey/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "topickey", + srcs = ["topickey.go"], + importpath = "github.com/uber/submitqueue/submitqueue/core/topickey", + visibility = ["//visibility:public"], + deps = ["//core/consumer"], +) diff --git a/submitqueue/core/topickey/topickey.go b/submitqueue/core/topickey/topickey.go new file mode 100644 index 00000000..b4fbbe42 --- /dev/null +++ b/submitqueue/core/topickey/topickey.go @@ -0,0 +1,50 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package topickey defines SubmitQueue pipeline stage identifiers. +package topickey + +import "github.com/uber/submitqueue/core/consumer" + +// TopicKey is the shared pipeline stage identifier type. +type TopicKey = consumer.TopicKey + +const ( + // TopicKeyStart is the pipeline stage where new requests arrive from the gateway. + TopicKeyStart TopicKey = "start" + // TopicKeyCancel is the pipeline stage where cancellation requests arrive from the gateway. + TopicKeyCancel TopicKey = "cancel" + // TopicKeyValidate is the pipeline stage where requests are published for validation. + TopicKeyValidate TopicKey = "validate" + // TopicKeyBatch is the pipeline stage where validated requests are published for batching. + TopicKeyBatch TopicKey = "batch" + // TopicKeyScore is the pipeline stage where batches are published for scoring. + TopicKeyScore TopicKey = "score" + // TopicKeySpeculate is the pipeline stage where scored batches are published for speculation. + TopicKeySpeculate TopicKey = "speculate" + // TopicKeyBuild is the pipeline stage where speculated batches are published for builds. + TopicKeyBuild TopicKey = "build" + // TopicKeyBuildSignal is the polling stage for triggered builds. Each + // message carries a Build; the consumer calls BuildRunner.Status, + // persists the latest status, publishes the batch ID to TopicKeySpeculate + // so the state machine re-evaluates, and re-publishes itself via + // PublishAfter when the build has not yet reached a terminal state. + TopicKeyBuildSignal TopicKey = "buildsignal" + // TopicKeyMerge is the pipeline stage where speculated batches are published for merging. + TopicKeyMerge TopicKey = "merge" + // TopicKeyConclude is the pipeline stage where merged requests are published for conclusion. + TopicKeyConclude TopicKey = "conclude" + // TopicKeyLog is the pipeline stage where per-request logs are written. + TopicKeyLog TopicKey = "log" +) diff --git a/submitqueue/gateway/controller/BUILD.bazel b/submitqueue/gateway/controller/BUILD.bazel index ab0dabd5..6438ec17 100644 --- a/submitqueue/gateway/controller/BUILD.bazel +++ b/submitqueue/gateway/controller/BUILD.bazel @@ -11,12 +11,13 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/gateway/controller", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/errs", "//core/metrics", "//entity/messagequeue", "//extension/counter", - "//submitqueue/core/consumer", "//submitqueue/core/request", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/queueconfig", "//submitqueue/extension/storage", @@ -36,11 +37,12 @@ go_test( ], embed = [":controller"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/counter/mock", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/queueconfig", "//submitqueue/extension/queueconfig/mock", diff --git a/submitqueue/gateway/controller/cancel.go b/submitqueue/gateway/controller/cancel.go index 6718fd74..54d197e5 100644 --- a/submitqueue/gateway/controller/cancel.go +++ b/submitqueue/gateway/controller/cancel.go @@ -20,9 +20,10 @@ import ( "time" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" @@ -43,7 +44,7 @@ type CancelController struct { // NewCancelController creates a new instance of the gateway cancel controller. // The controller writes a RequestStatusCancelling log entry through requestLogStore and -// publishes cancel requests to the topic registered under consumer.TopicKeyCancel. +// publishes cancel requests to the topic registered under topickey.TopicKeyCancel. func NewCancelController(logger *zap.SugaredLogger, scope tally.Scope, requestLogStore storage.RequestLogStore, registry consumer.TopicRegistry) *CancelController { return &CancelController{ logger: logger, @@ -115,7 +116,7 @@ func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (* c.logger.Infow("cancel request published to queue", "sqid", cancelRequest.ID, - "topic_key", consumer.TopicKeyCancel, + "topic_key", topickey.TopicKeyCancel, ) c.metricsScope.Counter("cancel_publish_success").Inc(1) @@ -133,14 +134,14 @@ func (c *CancelController) publishToQueue(ctx context.Context, cancelRequest ent // TODO: figure best way to ID and partition the message according to new guidelines on queue usage msg := entityqueue.NewMessage(cancelRequest.ID, payload, cancelRequest.ID, nil) - q, ok := c.registry.Queue(consumer.TopicKeyCancel) + q, ok := c.registry.Queue(topickey.TopicKeyCancel) if !ok { - return fmt.Errorf("no queue registered for topic key %s", consumer.TopicKeyCancel) + return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyCancel) } - topicName, ok := c.registry.TopicName(consumer.TopicKeyCancel) + topicName, ok := c.registry.TopicName(topickey.TopicKeyCancel) if !ok { - return fmt.Errorf("no topic name registered for topic key %s", consumer.TopicKeyCancel) + return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyCancel) } if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { diff --git a/submitqueue/gateway/controller/cancel_test.go b/submitqueue/gateway/controller/cancel_test.go index 9b0d120f..27d2f160 100644 --- a/submitqueue/gateway/controller/cancel_test.go +++ b/submitqueue/gateway/controller/cancel_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" @@ -43,7 +44,7 @@ func newCancelTestRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.Topi q.EXPECT().Publisher().Return(pub).AnyTimes() registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyCancel, Name: "cancel", Queue: q}, + {Key: topickey.TopicKeyCancel, Name: "cancel", Queue: q}, }) require.NoError(t, err) return registry, pub diff --git a/submitqueue/gateway/controller/land.go b/submitqueue/gateway/controller/land.go index 2d0c935b..e7e1cdc3 100644 --- a/submitqueue/gateway/controller/land.go +++ b/submitqueue/gateway/controller/land.go @@ -20,11 +20,12 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" "github.com/uber/submitqueue/extension/counter" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/queueconfig" "github.com/uber/submitqueue/submitqueue/extension/storage" @@ -71,7 +72,7 @@ type LandController struct { // NewLandController creates a new instance of the gateway land controller. // The controller publishes land requests to the topic registered under -// consumer.TopicKeyStart in the registry. +// topickey.TopicKeyStart in the registry. func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, store storage.Storage, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController { return &LandController{ logger: logger, @@ -153,7 +154,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p c.logger.Infow("request published to queue", "queue", req.Queue, "sqid", landRequest.ID, - "topic_key", consumer.TopicKeyStart, + "topic_key", topickey.TopicKeyStart, ) metrics.NamedCounter(c.metricsScope, opName, "publish_success", 1) @@ -176,14 +177,14 @@ func (c *LandController) publishToQueue(ctx context.Context, landRequest entity. // - Partition key: landRequest.Queue (ensures ordering per queue) msg := entityqueue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) - q, ok := c.registry.Queue(consumer.TopicKeyStart) + q, ok := c.registry.Queue(topickey.TopicKeyStart) if !ok { - return fmt.Errorf("no queue registered for topic key %s", consumer.TopicKeyStart) + return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyStart) } - topicName, ok := c.registry.TopicName(consumer.TopicKeyStart) + topicName, ok := c.registry.TopicName(topickey.TopicKeyStart) if !ok { - return fmt.Errorf("no topic name registered for topic key %s", consumer.TopicKeyStart) + return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyStart) } if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { diff --git a/submitqueue/gateway/controller/land_test.go b/submitqueue/gateway/controller/land_test.go index 2f433496..2aa1bdb6 100644 --- a/submitqueue/gateway/controller/land_test.go +++ b/submitqueue/gateway/controller/land_test.go @@ -22,11 +22,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" countermock "github.com/uber/submitqueue/extension/counter/mock" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/queueconfig" qcmock "github.com/uber/submitqueue/submitqueue/extension/queueconfig/mock" @@ -47,7 +48,7 @@ func newTestRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.TopicRegis q.EXPECT().Publisher().Return(pub).AnyTimes() registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyStart, Name: "start", Queue: q}, + {Key: topickey.TopicKeyStart, Name: "start", Queue: q}, }) require.NoError(t, err) return registry, pub diff --git a/submitqueue/gateway/controller/log/BUILD.bazel b/submitqueue/gateway/controller/log/BUILD.bazel index ce7eb046..32654283 100644 --- a/submitqueue/gateway/controller/log/BUILD.bazel +++ b/submitqueue/gateway/controller/log/BUILD.bazel @@ -6,8 +6,8 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/gateway/controller/log", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/metrics", - "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/storage", "@com_github_uber_go_tally//:tally", @@ -22,7 +22,7 @@ go_test( deps = [ "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//require", diff --git a/submitqueue/gateway/controller/log/log.go b/submitqueue/gateway/controller/log/log.go index f19d619a..4af2a8c1 100644 --- a/submitqueue/gateway/controller/log/log.go +++ b/submitqueue/gateway/controller/log/log.go @@ -19,8 +19,8 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" - "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" diff --git a/submitqueue/gateway/controller/log/log_test.go b/submitqueue/gateway/controller/log/log_test.go index f7143263..28cec240 100644 --- a/submitqueue/gateway/controller/log/log_test.go +++ b/submitqueue/gateway/controller/log/log_test.go @@ -23,7 +23,7 @@ import ( "github.com/uber-go/tally" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" @@ -35,7 +35,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - return NewController(logger, scope, store, consumer.TopicKeyLog, "gateway-log") + return NewController(logger, scope, store, topickey.TopicKeyLog, "gateway-log") } func TestController_Process(t *testing.T) { diff --git a/submitqueue/orchestrator/controller/batch/BUILD.bazel b/submitqueue/orchestrator/controller/batch/BUILD.bazel index 7a13a84e..61dcdfb5 100644 --- a/submitqueue/orchestrator/controller/batch/BUILD.bazel +++ b/submitqueue/orchestrator/controller/batch/BUILD.bazel @@ -6,10 +6,11 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/batch", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/metrics", "//entity/messagequeue", "//extension/counter", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/conflict", "//submitqueue/extension/storage", @@ -23,10 +24,11 @@ go_test( srcs = ["batch_test.go"], embed = [":batch"], deps = [ + "//core/consumer", "//entity/messagequeue", "//extension/counter/mock", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/conflict", "//submitqueue/extension/conflict/all", diff --git a/submitqueue/orchestrator/controller/batch/batch.go b/submitqueue/orchestrator/controller/batch/batch.go index 1722e02b..fef116fd 100644 --- a/submitqueue/orchestrator/controller/batch/batch.go +++ b/submitqueue/orchestrator/controller/batch/batch.go @@ -20,10 +20,11 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" "github.com/uber/submitqueue/extension/counter" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/conflict" "github.com/uber/submitqueue/submitqueue/extension/storage" @@ -293,14 +294,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Publish to score topic for further processing. // If it fails and the controller retries, a new batch will be created with the new batch ID but the same request ID. // The downstream logic should be able to handle stale entries by looking at the state of the batch. - if err := c.publish(ctx, consumer.TopicKeyScore, batch.ID, batch.Queue); err != nil { + if err := c.publish(ctx, topickey.TopicKeyScore, batch.ID, batch.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish batch ID to score topic: %w", err) } c.logger.Infow("published batch to score topic", "batch_id", batch.ID, - "topic_key", consumer.TopicKeyScore, + "topic_key", topickey.TopicKeyScore, ) return nil // Success - message will be acked diff --git a/submitqueue/orchestrator/controller/batch/batch_test.go b/submitqueue/orchestrator/controller/batch/batch_test.go index 6e5c07a6..59ce99cc 100644 --- a/submitqueue/orchestrator/controller/batch/batch_test.go +++ b/submitqueue/orchestrator/controller/batch/batch_test.go @@ -24,10 +24,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" entityqueue "github.com/uber/submitqueue/entity/messagequeue" countermock "github.com/uber/submitqueue/extension/counter/mock" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/conflict" "github.com/uber/submitqueue/submitqueue/extension/conflict/all" @@ -110,14 +111,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyScore, Name: "score", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyScore, Name: "score", Queue: mockQ}}, ) require.NoError(t, err) analyzerFactory := conflictmock.NewMockFactory(ctrl) analyzerFactory.EXPECT().For(gomock.Any()).Return(analyzer, nil).AnyTimes() - return NewController(logger, scope, registry, cnt, mockStorage, analyzerFactory, consumer.TopicKeyBatch, "orchestrator-batch") + return NewController(logger, scope, registry, cnt, mockStorage, analyzerFactory, topickey.TopicKeyBatch, "orchestrator-batch") } func TestNewController(t *testing.T) { @@ -125,7 +126,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyBatch, controller.TopicKey()) + assert.Equal(t, topickey.TopicKeyBatch, controller.TopicKey()) assert.Equal(t, "orchestrator-batch", controller.ConsumerGroup()) assert.Equal(t, "batch", controller.Name()) } @@ -435,7 +436,7 @@ func TestController_Process_CASLostToCancel(t *testing.T) { mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyScore, Name: "score", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyScore, Name: "score", Queue: mockQ}}, ) require.NoError(t, err) @@ -443,7 +444,7 @@ func TestController_Process_CASLostToCancel(t *testing.T) { analyzerFactory.EXPECT().For(gomock.Any()).Return(all.New(), nil).AnyTimes() controller := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, registry, newSequentialCounter(ctrl), - mockStorage, analyzerFactory, consumer.TopicKeyBatch, "orchestrator-batch", + mockStorage, analyzerFactory, topickey.TopicKeyBatch, "orchestrator-batch", ) msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) diff --git a/submitqueue/orchestrator/controller/build/BUILD.bazel b/submitqueue/orchestrator/controller/build/BUILD.bazel index 78e26b58..808148f8 100644 --- a/submitqueue/orchestrator/controller/build/BUILD.bazel +++ b/submitqueue/orchestrator/controller/build/BUILD.bazel @@ -6,9 +6,10 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/build", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/metrics", "//entity/messagequeue", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "//submitqueue/extension/storage", @@ -22,10 +23,11 @@ go_test( srcs = ["build_test.go"], embed = [":build"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "//submitqueue/extension/buildrunner/fake", diff --git a/submitqueue/orchestrator/controller/build/build.go b/submitqueue/orchestrator/controller/build/build.go index 4606d923..8941b6b8 100644 --- a/submitqueue/orchestrator/controller/build/build.go +++ b/submitqueue/orchestrator/controller/build/build.go @@ -20,9 +20,10 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" "github.com/uber/submitqueue/submitqueue/extension/storage" @@ -158,7 +159,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Hand off to the buildsignal poll loop; it calls Status, updates the // persisted Build, publishes to speculate, and re-publishes itself via // PublishAfter until terminal. - if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil { + if err := c.publish(ctx, topickey.TopicKeyBuildSignal, build); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to buildsignal: %w", err) } @@ -167,7 +168,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "batch_id", batch.ID, "build_id", build.ID, "status", string(build.Status), - "topic_key", consumer.TopicKeyBuildSignal, + "topic_key", topickey.TopicKeyBuildSignal, ) return nil // Success - message will be acked diff --git a/submitqueue/orchestrator/controller/build/build_test.go b/submitqueue/orchestrator/controller/build/build_test.go index a144078e..28c352c2 100644 --- a/submitqueue/orchestrator/controller/build/build_test.go +++ b/submitqueue/orchestrator/controller/build/build_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" buildfake "github.com/uber/submitqueue/submitqueue/extension/buildrunner/fake" @@ -100,11 +101,11 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, ) require.NoError(t, err) - return NewController(logger, scope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + return NewController(logger, scope, store, staticBuildRunnerFactory{r: br}, registry, topickey.TopicKeyBuild, "orchestrator-build") } func TestNewController(t *testing.T) { @@ -114,7 +115,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, store, buildfake.New(), nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyBuild, controller.TopicKey()) + assert.Equal(t, topickey.TopicKeyBuild, controller.TopicKey()) assert.Equal(t, "orchestrator-build", controller.ConsumerGroup()) assert.Equal(t, "build", controller.Name()) } @@ -201,11 +202,11 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, topickey.TopicKeyBuild, "orchestrator-build") msg := entityqueue.NewMessage(headBatch.ID, batchIDPayload(t, headBatch.ID), headBatch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -260,10 +261,10 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, topickey.TopicKeyBuild, "orchestrator-build") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -292,10 +293,10 @@ func TestController_Process_TriggerFailure(t *testing.T) { Return(entity.BuildID{}, fmt.Errorf("provider down")) registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: queuemock.NewMockQueue(ctrl)}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyBuildSignal, Name: "buildsignal", Queue: queuemock.NewMockQueue(ctrl)}}, ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, topickey.TopicKeyBuild, "orchestrator-build") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) diff --git a/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel b/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel index 51b5b0fb..7cf7775b 100644 --- a/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel +++ b/submitqueue/orchestrator/controller/buildsignal/BUILD.bazel @@ -6,10 +6,11 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/buildsignal", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/errs", "//core/metrics", "//entity/messagequeue", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "//submitqueue/extension/storage", @@ -23,10 +24,11 @@ go_test( srcs = ["buildsignal_test.go"], embed = [":buildsignal"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/buildrunner/mock", "//submitqueue/extension/storage/mock", diff --git a/submitqueue/orchestrator/controller/buildsignal/buildsignal.go b/submitqueue/orchestrator/controller/buildsignal/buildsignal.go index 69d9206c..4e1907fb 100644 --- a/submitqueue/orchestrator/controller/buildsignal/buildsignal.go +++ b/submitqueue/orchestrator/controller/buildsignal/buildsignal.go @@ -27,10 +27,11 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" "github.com/uber/submitqueue/submitqueue/extension/storage" @@ -174,7 +175,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } // Re-evaluate the batch state machine with the latest build status. - if err := c.publishBatchID(ctx, consumer.TopicKeySpeculate, build.BatchID, msg.PartitionKey); err != nil { + if err := c.publishBatchID(ctx, topickey.TopicKeySpeculate, build.BatchID, msg.PartitionKey); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to speculate: %w", err) } diff --git a/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go b/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go index 5697d7e9..ede2d629 100644 --- a/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go +++ b/submitqueue/orchestrator/controller/buildsignal/buildsignal_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" buildrunnermock "github.com/uber/submitqueue/submitqueue/extension/buildrunner/mock" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" @@ -59,8 +60,8 @@ func newTestHarness(t *testing.T, ctrl *gomock.Controller) *testHarness { speculateQ.EXPECT().Publisher().Return(speculatePub).AnyTimes() registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: signalQ}, - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: speculateQ}, + {Key: topickey.TopicKeyBuildSignal, Name: "buildsignal", Queue: signalQ}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: speculateQ}, }) require.NoError(t, err) @@ -76,7 +77,7 @@ func newTestHarness(t *testing.T, ctrl *gomock.Controller) *testHarness { store, brFactory, registry, - consumer.TopicKeyBuildSignal, + topickey.TopicKeyBuildSignal, "orchestrator-buildsignal", ) return &testHarness{ @@ -108,7 +109,7 @@ func TestController_Identity(t *testing.T) { h := newTestHarness(t, ctrl) assert.Equal(t, "buildsignal", h.controller.Name()) - assert.Equal(t, consumer.TopicKeyBuildSignal, h.controller.TopicKey()) + assert.Equal(t, topickey.TopicKeyBuildSignal, h.controller.TopicKey()) assert.Equal(t, "orchestrator-buildsignal", h.controller.ConsumerGroup()) var _ consumer.Controller = h.controller diff --git a/submitqueue/orchestrator/controller/cancel/BUILD.bazel b/submitqueue/orchestrator/controller/cancel/BUILD.bazel index 75aca10e..466799f2 100644 --- a/submitqueue/orchestrator/controller/cancel/BUILD.bazel +++ b/submitqueue/orchestrator/controller/cancel/BUILD.bazel @@ -6,9 +6,10 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/cancel", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//entity/messagequeue", - "//submitqueue/core/consumer", "//submitqueue/core/request", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", "@com_github_uber_go_tally//:tally", @@ -21,9 +22,10 @@ go_test( srcs = ["cancel_test.go"], embed = [":cancel"], deps = [ + "//core/consumer", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", diff --git a/submitqueue/orchestrator/controller/cancel/cancel.go b/submitqueue/orchestrator/controller/cancel/cancel.go index 6f10bcb4..96a214ea 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel.go +++ b/submitqueue/orchestrator/controller/cancel/cancel.go @@ -59,9 +59,10 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" @@ -286,7 +287,7 @@ func (c *Controller) cancelBatch(ctx context.Context, batch entity.Batch) error c.metricsScope.Counter("batch_already_cancelling").Inc(1) } - if err := c.publishBatchID(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { + if err := c.publishBatchID(ctx, topickey.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to hand off cancelled batch %s to speculate: %w", batch.ID, err) } diff --git a/submitqueue/orchestrator/controller/cancel/cancel_test.go b/submitqueue/orchestrator/controller/cancel/cancel_test.go index 8fa72e86..a97079fb 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel_test.go +++ b/submitqueue/orchestrator/controller/cancel/cancel_test.go @@ -22,9 +22,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" @@ -50,16 +51,16 @@ func newRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.TopicRegistry, q.EXPECT().Publisher().Return(pub).AnyTimes() reg, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyCancel, Name: "cancel", Queue: q}, - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: q}, - {Key: consumer.TopicKeyLog, Name: "log", Queue: q}, + {Key: topickey.TopicKeyCancel, Name: "cancel", Queue: q}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: q}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: q}, }) require.NoError(t, err) return reg, pub } func newController(t *testing.T, store storage.Storage, registry consumer.TopicRegistry) *Controller { - return NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, consumer.TopicKeyCancel, "orchestrator-cancel") + return NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, topickey.TopicKeyCancel, "orchestrator-cancel") } func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte, partitionKey string) consumer.Delivery { @@ -79,7 +80,7 @@ func TestNewController(t *testing.T) { controller := newController(t, store, registry) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyCancel, controller.TopicKey()) + assert.Equal(t, topickey.TopicKeyCancel, controller.TopicKey()) assert.Equal(t, "orchestrator-cancel", controller.ConsumerGroup()) assert.Equal(t, "cancel", controller.Name()) diff --git a/submitqueue/orchestrator/controller/conclude/BUILD.bazel b/submitqueue/orchestrator/controller/conclude/BUILD.bazel index 5f83b255..75fe5afb 100644 --- a/submitqueue/orchestrator/controller/conclude/BUILD.bazel +++ b/submitqueue/orchestrator/controller/conclude/BUILD.bazel @@ -6,8 +6,8 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/metrics", - "//submitqueue/core/consumer", "//submitqueue/core/request", "//submitqueue/entity", "//submitqueue/extension/storage", @@ -21,10 +21,11 @@ go_test( srcs = ["conclude_test.go"], embed = [":conclude"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", diff --git a/submitqueue/orchestrator/controller/conclude/conclude.go b/submitqueue/orchestrator/controller/conclude/conclude.go index 88abe3d3..1c9e7a72 100644 --- a/submitqueue/orchestrator/controller/conclude/conclude.go +++ b/submitqueue/orchestrator/controller/conclude/conclude.go @@ -19,8 +19,8 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" - "github.com/uber/submitqueue/submitqueue/core/consumer" corerequest "github.com/uber/submitqueue/submitqueue/core/request" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" diff --git a/submitqueue/orchestrator/controller/conclude/conclude_test.go b/submitqueue/orchestrator/controller/conclude/conclude_test.go index 5f59d738..ff47f535 100644 --- a/submitqueue/orchestrator/controller/conclude/conclude_test.go +++ b/submitqueue/orchestrator/controller/conclude/conclude_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" @@ -65,12 +66,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *stora registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: mockQ}, }, ) require.NoError(t, err) - return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude"), mockPub + return NewController(logger, scope, mockStorage, registry, topickey.TopicKeyConclude, "orchestrator-conclude"), mockPub } func TestNewController(t *testing.T) { @@ -78,7 +79,7 @@ func TestNewController(t *testing.T) { controller, _ := newTestController(t, ctrl, nil, false) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyConclude, controller.TopicKey()) + assert.Equal(t, topickey.TopicKeyConclude, controller.TopicKey()) assert.Equal(t, "orchestrator-conclude", controller.ConsumerGroup()) assert.Equal(t, "conclude", controller.Name()) } diff --git a/submitqueue/orchestrator/controller/dlq/BUILD.bazel b/submitqueue/orchestrator/controller/dlq/BUILD.bazel index e75e4968..f38da3af 100644 --- a/submitqueue/orchestrator/controller/dlq/BUILD.bazel +++ b/submitqueue/orchestrator/controller/dlq/BUILD.bazel @@ -12,8 +12,8 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/metrics", - "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/storage", "@com_github_uber_go_tally//:tally", @@ -32,10 +32,11 @@ go_test( ], embed = [":dlq"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", diff --git a/submitqueue/orchestrator/controller/dlq/README.md b/submitqueue/orchestrator/controller/dlq/README.md index 83512134..551138b7 100644 --- a/submitqueue/orchestrator/controller/dlq/README.md +++ b/submitqueue/orchestrator/controller/dlq/README.md @@ -47,5 +47,5 @@ Reconciliation is safe to run more than once for the same message: ## See also - `core/errs/README.md` — the error-processing framework, including `AlwaysRetryableProcessor` and the choice of processor for primary vs. DLQ consumers. -- `submitqueue/core/consumer/README.md` — how the consumer applies the processor to controller errors and decides ack/nack/reject. +- `core/consumer/README.md` — how the consumer applies the processor to controller errors and decides ack/nack/reject. - `doc/rfc/submitqueue/workflow.md` — the per-stage primary pipeline that the DLQ companions mirror. diff --git a/submitqueue/orchestrator/controller/dlq/batch.go b/submitqueue/orchestrator/controller/dlq/batch.go index a35a4914..4d0a3f73 100644 --- a/submitqueue/orchestrator/controller/dlq/batch.go +++ b/submitqueue/orchestrator/controller/dlq/batch.go @@ -19,8 +19,8 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" - "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" diff --git a/submitqueue/orchestrator/controller/dlq/batch_test.go b/submitqueue/orchestrator/controller/dlq/batch_test.go index 45348c04..db55ed6f 100644 --- a/submitqueue/orchestrator/controller/dlq/batch_test.go +++ b/submitqueue/orchestrator/controller/dlq/batch_test.go @@ -20,7 +20,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" @@ -31,7 +32,7 @@ func TestDLQBatchController_InterfaceAndAccessors(t *testing.T) { ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) - c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq") + c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(topickey.TopicKeyMerge), "orchestrator-merge-dlq") assert.Equal(t, "merge_dlq", c.Name()) assert.Equal(t, consumer.TopicKey("merge_dlq"), c.TopicKey()) @@ -62,7 +63,7 @@ func TestDLQBatchController_Process_FailsAndFansOut(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() - c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq") + c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(topickey.TopicKeyMerge), "orchestrator-merge-dlq") payload, err := entity.BatchID{ID: "q/batch/9"}.ToBytes() require.NoError(t, err) @@ -75,7 +76,7 @@ func TestDLQBatchController_Process_MalformedPayloadFails(t *testing.T) { ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) - c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq") + c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(topickey.TopicKeyMerge), "orchestrator-merge-dlq") delivery := newMockDelivery(ctrl, []byte("garbage")) err := c.Process(context.Background(), delivery) @@ -86,7 +87,7 @@ func TestDLQBatchController_Process_EmptyIDFails(t *testing.T) { ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) - c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq") + c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(topickey.TopicKeyMerge), "orchestrator-merge-dlq") payload, err := entity.BatchID{ID: ""}.ToBytes() require.NoError(t, err) diff --git a/submitqueue/orchestrator/controller/dlq/buildsignal.go b/submitqueue/orchestrator/controller/dlq/buildsignal.go index 9d10bf70..efbd2fde 100644 --- a/submitqueue/orchestrator/controller/dlq/buildsignal.go +++ b/submitqueue/orchestrator/controller/dlq/buildsignal.go @@ -20,8 +20,8 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" - "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" diff --git a/submitqueue/orchestrator/controller/dlq/buildsignal_test.go b/submitqueue/orchestrator/controller/dlq/buildsignal_test.go index 7ce4de3d..329f35b5 100644 --- a/submitqueue/orchestrator/controller/dlq/buildsignal_test.go +++ b/submitqueue/orchestrator/controller/dlq/buildsignal_test.go @@ -20,7 +20,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" @@ -32,7 +33,7 @@ func TestDLQBuildSignalController_InterfaceAndAccessors(t *testing.T) { ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) - c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") assert.Equal(t, "buildsignal_dlq", c.Name()) assert.Equal(t, consumer.TopicKey("buildsignal_dlq"), c.TopicKey()) @@ -69,7 +70,7 @@ func TestDLQBuildSignalController_Process_FansOutToBatch(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() - c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") payload, err := entity.BuildID{ID: "build-1"}.ToBytes() require.NoError(t, err) @@ -87,7 +88,7 @@ func TestDLQBuildSignalController_Process_BuildNotFoundIsNoOp(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() - c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") payload, err := entity.BuildID{ID: "build-1"}.ToBytes() require.NoError(t, err) @@ -107,7 +108,7 @@ func TestDLQBuildSignalController_Process_BuildMissingBatchIsNoOp(t *testing.T) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() - c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") payload, err := entity.BuildID{ID: "build-1"}.ToBytes() require.NoError(t, err) @@ -120,7 +121,7 @@ func TestDLQBuildSignalController_Process_MalformedPayloadFails(t *testing.T) { ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) - c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") delivery := newMockDelivery(ctrl, []byte("garbage")) err := c.Process(context.Background(), delivery) diff --git a/submitqueue/orchestrator/controller/dlq/dlq.go b/submitqueue/orchestrator/controller/dlq/dlq.go index 6d48434c..77570ca4 100644 --- a/submitqueue/orchestrator/controller/dlq/dlq.go +++ b/submitqueue/orchestrator/controller/dlq/dlq.go @@ -38,7 +38,7 @@ import ( "errors" "fmt" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" diff --git a/submitqueue/orchestrator/controller/dlq/log.go b/submitqueue/orchestrator/controller/dlq/log.go index 7b432015..74983a35 100644 --- a/submitqueue/orchestrator/controller/dlq/log.go +++ b/submitqueue/orchestrator/controller/dlq/log.go @@ -18,8 +18,8 @@ import ( "context" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" - "github.com/uber/submitqueue/submitqueue/core/consumer" "go.uber.org/zap" ) diff --git a/submitqueue/orchestrator/controller/dlq/log_test.go b/submitqueue/orchestrator/controller/dlq/log_test.go index cfb40ab4..072b069c 100644 --- a/submitqueue/orchestrator/controller/dlq/log_test.go +++ b/submitqueue/orchestrator/controller/dlq/log_test.go @@ -20,13 +20,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) func TestDLQLogController_InterfaceAndAccessors(t *testing.T) { - c := NewDLQLogController(zaptest.NewLogger(t).Sugar(), testScope(), TopicKey(consumer.TopicKeyLog), "orchestrator-log-dlq") + c := NewDLQLogController(zaptest.NewLogger(t).Sugar(), testScope(), TopicKey(topickey.TopicKeyLog), "orchestrator-log-dlq") assert.Equal(t, "log_dlq", c.Name()) assert.Equal(t, consumer.TopicKey("log_dlq"), c.TopicKey()) @@ -36,7 +37,7 @@ func TestDLQLogController_InterfaceAndAccessors(t *testing.T) { func TestDLQLogController_Process_AcksUnconditionally(t *testing.T) { ctrl := gomock.NewController(t) - c := NewDLQLogController(zaptest.NewLogger(t).Sugar(), testScope(), TopicKey(consumer.TopicKeyLog), "orchestrator-log-dlq") + c := NewDLQLogController(zaptest.NewLogger(t).Sugar(), testScope(), TopicKey(topickey.TopicKeyLog), "orchestrator-log-dlq") delivery := newMockDelivery(ctrl, []byte("anything goes")) require.NoError(t, c.Process(context.Background(), delivery)) diff --git a/submitqueue/orchestrator/controller/dlq/request.go b/submitqueue/orchestrator/controller/dlq/request.go index 646c61eb..cf96064a 100644 --- a/submitqueue/orchestrator/controller/dlq/request.go +++ b/submitqueue/orchestrator/controller/dlq/request.go @@ -19,8 +19,8 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" - "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" diff --git a/submitqueue/orchestrator/controller/dlq/request_test.go b/submitqueue/orchestrator/controller/dlq/request_test.go index b0dfdc4e..1786afa9 100644 --- a/submitqueue/orchestrator/controller/dlq/request_test.go +++ b/submitqueue/orchestrator/controller/dlq/request_test.go @@ -20,9 +20,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/core/consumer" queue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" @@ -33,7 +34,7 @@ func TestDLQRequestController_InterfaceAndAccessors(t *testing.T) { ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) - c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq") + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(topickey.TopicKeyValidate), "orchestrator-validate-dlq") assert.Equal(t, "validate_dlq", c.Name()) assert.Equal(t, consumer.TopicKey("validate_dlq"), c.TopicKey()) @@ -56,7 +57,7 @@ func TestDLQRequestController_Process_LandRequestPayload(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() - c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeLandRequestID, TopicKey(consumer.TopicKeyStart), "orchestrator-start-dlq") + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeLandRequestID, TopicKey(topickey.TopicKeyStart), "orchestrator-start-dlq") payload, err := entity.LandRequest{ID: "q/1", Queue: "q"}.ToBytes() require.NoError(t, err) @@ -81,7 +82,7 @@ func TestDLQRequestController_Process_CancelRequestPayload(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() - c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeCancelRequestID, TopicKey(consumer.TopicKeyCancel), "orchestrator-cancel-dlq") + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeCancelRequestID, TopicKey(topickey.TopicKeyCancel), "orchestrator-cancel-dlq") payload, err := entity.CancelRequest{ID: "q/7", Reason: "user"}.ToBytes() require.NoError(t, err) @@ -106,7 +107,7 @@ func TestDLQRequestController_Process_RequestIDPayload(t *testing.T) { store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() - c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyBatch), "orchestrator-batch-dlq") + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(topickey.TopicKeyBatch), "orchestrator-batch-dlq") payload, err := entity.RequestID{ID: "q/3"}.ToBytes() require.NoError(t, err) @@ -131,7 +132,7 @@ func TestDLQRequestController_Process_AlreadyTerminalSkipsUpdateButLogsAnyway(t store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() - c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq") + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(topickey.TopicKeyValidate), "orchestrator-validate-dlq") payload, err := entity.RequestID{ID: "q/1"}.ToBytes() require.NoError(t, err) @@ -146,7 +147,7 @@ func TestDLQRequestController_Process_MalformedPayloadFails(t *testing.T) { store := storagemock.NewMockStorage(ctrl) // no store calls expected - c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq") + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(topickey.TopicKeyValidate), "orchestrator-validate-dlq") delivery := newMockDelivery(ctrl, []byte("not json")) err := c.Process(context.Background(), delivery) @@ -159,7 +160,7 @@ func TestDLQRequestController_Process_EmptyIDFails(t *testing.T) { store := storagemock.NewMockStorage(ctrl) // no store calls expected - c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq") + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(topickey.TopicKeyValidate), "orchestrator-validate-dlq") payload, err := entity.RequestID{ID: ""}.ToBytes() require.NoError(t, err) diff --git a/submitqueue/orchestrator/controller/merge/BUILD.bazel b/submitqueue/orchestrator/controller/merge/BUILD.bazel index 52967da4..627a363a 100644 --- a/submitqueue/orchestrator/controller/merge/BUILD.bazel +++ b/submitqueue/orchestrator/controller/merge/BUILD.bazel @@ -6,9 +6,10 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/metrics", "//entity/messagequeue", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/pusher", "//submitqueue/extension/storage", @@ -22,10 +23,11 @@ go_test( srcs = ["merge_test.go"], embed = [":merge"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/pusher", "//submitqueue/extension/pusher/mock", diff --git a/submitqueue/orchestrator/controller/merge/merge.go b/submitqueue/orchestrator/controller/merge/merge.go index 31f1600c..74eca8f9 100644 --- a/submitqueue/orchestrator/controller/merge/merge.go +++ b/submitqueue/orchestrator/controller/merge/merge.go @@ -22,9 +22,10 @@ import ( "github.com/uber-go/tally" "go.uber.org/zap" + "github.com/uber/submitqueue/core/consumer" coremetrics "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" "github.com/uber/submitqueue/submitqueue/extension/storage" @@ -184,11 +185,11 @@ func (c *Controller) collectChanges(ctx context.Context, batch entity.Batch) ([] // fanout publishes the batch ID to conclude (so requests are updated) and // to speculate (so dependents can re-evaluate now that this batch is done). func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { - if err := c.publish(ctx, consumer.TopicKeyConclude, batchID, partitionKey); err != nil { + if err := c.publish(ctx, topickey.TopicKeyConclude, batchID, partitionKey); err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "publish_conclude_errors", 1) return fmt.Errorf("failed to publish to conclude: %w", err) } - if err := c.publish(ctx, consumer.TopicKeySpeculate, batchID, partitionKey); err != nil { + if err := c.publish(ctx, topickey.TopicKeySpeculate, batchID, partitionKey); err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "publish_speculate_errors", 1) return fmt.Errorf("failed to publish to speculate: %w", err) } diff --git a/submitqueue/orchestrator/controller/merge/merge_test.go b/submitqueue/orchestrator/controller/merge/merge_test.go index 7eb5578b..a4512f3f 100644 --- a/submitqueue/orchestrator/controller/merge/merge_test.go +++ b/submitqueue/orchestrator/controller/merge/merge_test.go @@ -25,10 +25,11 @@ import ( "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" pushermock "github.com/uber/submitqueue/submitqueue/extension/pusher/mock" @@ -60,8 +61,8 @@ func newRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) consum mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, }) require.NoError(t, err) return registry @@ -83,12 +84,12 @@ func TestNewController(t *testing.T) { store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) require.NotNil(t, c) - assert.Equal(t, consumer.TopicKeyMerge, c.TopicKey()) + assert.Equal(t, topickey.TopicKeyMerge, c.TopicKey()) assert.Equal(t, "orchestrator-merge", c.ConsumerGroup()) assert.Equal(t, "merge", c.Name()) var _ consumer.Controller = c @@ -146,7 +147,7 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) @@ -210,7 +211,7 @@ func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) @@ -257,7 +258,7 @@ func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) @@ -303,7 +304,7 @@ func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) @@ -344,7 +345,7 @@ func TestController_Process_TerminalBatchSkipsPushButFansOut(t *testing.T) { store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, mockPusher), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) @@ -383,8 +384,8 @@ func TestController_Process_CancellingShortCircuit(t *testing.T) { mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, }) require.NoError(t, err) @@ -394,7 +395,7 @@ func TestController_Process_CancellingShortCircuit(t *testing.T) { store, registry, newPusherFactory(ctrl, mockPusher), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) @@ -419,7 +420,7 @@ func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) @@ -458,7 +459,7 @@ func TestController_Process_RequestStoreFailurePropagates(t *testing.T) { store, newRegistry(t, ctrl, nil), newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) @@ -506,7 +507,7 @@ func TestController_Process_PublishFailureSurfaces(t *testing.T) { store, newRegistry(t, ctrl, fmt.Errorf("queue down")), newPusherFactory(ctrl, mockPusher), - consumer.TopicKeyMerge, + topickey.TopicKeyMerge, "orchestrator-merge", ) diff --git a/submitqueue/orchestrator/controller/score/BUILD.bazel b/submitqueue/orchestrator/controller/score/BUILD.bazel index c3194dc3..70eaf5da 100644 --- a/submitqueue/orchestrator/controller/score/BUILD.bazel +++ b/submitqueue/orchestrator/controller/score/BUILD.bazel @@ -6,10 +6,11 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/score", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/metrics", "//entity/messagequeue", - "//submitqueue/core/consumer", "//submitqueue/core/request", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/scorer", "//submitqueue/extension/storage", @@ -23,10 +24,11 @@ go_test( srcs = ["score_test.go"], embed = [":score"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/scorer/mock", "//submitqueue/extension/storage/mock", diff --git a/submitqueue/orchestrator/controller/score/score.go b/submitqueue/orchestrator/controller/score/score.go index 836c4449..cf7d7221 100644 --- a/submitqueue/orchestrator/controller/score/score.go +++ b/submitqueue/orchestrator/controller/score/score.go @@ -19,10 +19,11 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/scorer" "github.com/uber/submitqueue/submitqueue/extension/storage" @@ -160,14 +161,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { + if err := c.publish(ctx, topickey.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to speculate: %w", err) } c.logger.Infow("published batch to speculate", "batch_id", batch.ID, - "topic_key", consumer.TopicKeySpeculate, + "topic_key", topickey.TopicKeySpeculate, ) return nil // Success - message will be acked diff --git a/submitqueue/orchestrator/controller/score/score_test.go b/submitqueue/orchestrator/controller/score/score_test.go index a716815f..34e482fe 100644 --- a/submitqueue/orchestrator/controller/score/score_test.go +++ b/submitqueue/orchestrator/controller/score/score_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" scorermock "github.com/uber/submitqueue/submitqueue/extension/scorer/mock" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" @@ -118,9 +119,9 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: mockQ}, }, ) require.NoError(t, err) @@ -128,7 +129,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock scorerFactory := scorermock.NewMockFactory(ctrl) scorerFactory.EXPECT().For(gomock.Any()).Return(scorer, nil).AnyTimes() - return NewController(logger, scope, store, scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") + return NewController(logger, scope, store, scorerFactory, registry, topickey.TopicKeyScore, "orchestrator-score") } func TestNewController(t *testing.T) { @@ -140,7 +141,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, store, mockScorer, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyScore, controller.TopicKey()) + assert.Equal(t, topickey.TopicKeyScore, controller.TopicKey()) assert.Equal(t, "orchestrator-score", controller.ConsumerGroup()) assert.Equal(t, "score", controller.Name()) } @@ -383,16 +384,16 @@ func TestController_Process_TerminalShortCircuit(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: mockQ}, }, ) require.NoError(t, err) scorerFactory := scorermock.NewMockFactory(ctrl) scorerFactory.EXPECT().For(gomock.Any()).Return(mockScorer, nil).AnyTimes() - controller := NewController(logger, scope, mockStorage, scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") + controller := NewController(logger, scope, mockStorage, scorerFactory, registry, topickey.TopicKeyScore, "orchestrator-score") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -433,16 +434,16 @@ func TestController_Process_CancellingShortCircuit(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: mockQ}, }, ) require.NoError(t, err) scorerFactory := scorermock.NewMockFactory(ctrl) scorerFactory.EXPECT().For(gomock.Any()).Return(mockScorer, nil).AnyTimes() - controller := NewController(logger, scope, mockStorage, scorerFactory, registry, consumer.TopicKeyScore, "orchestrator-score") + controller := NewController(logger, scope, mockStorage, scorerFactory, registry, topickey.TopicKeyScore, "orchestrator-score") msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) diff --git a/submitqueue/orchestrator/controller/speculate/BUILD.bazel b/submitqueue/orchestrator/controller/speculate/BUILD.bazel index da930f19..fa1e9ae7 100644 --- a/submitqueue/orchestrator/controller/speculate/BUILD.bazel +++ b/submitqueue/orchestrator/controller/speculate/BUILD.bazel @@ -6,9 +6,10 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/metrics", "//entity/messagequeue", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", "@com_github_uber_go_tally//:tally", @@ -21,10 +22,11 @@ go_test( srcs = ["speculate_test.go"], embed = [":speculate"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", diff --git a/submitqueue/orchestrator/controller/speculate/speculate.go b/submitqueue/orchestrator/controller/speculate/speculate.go index 371f39bd..2631c42e 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate.go +++ b/submitqueue/orchestrator/controller/speculate/speculate.go @@ -20,9 +20,10 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" @@ -153,7 +154,7 @@ func (c *Controller) startSpeculation(ctx context.Context, batch entity.Batch) e "speculation_chain", append(append([]string{}, batch.Dependencies...), batch.ID), ) - if err := c.publish(ctx, consumer.TopicKeyBuild, batch.ID, batch.Queue); err != nil { + if err := c.publish(ctx, topickey.TopicKeyBuild, batch.ID, batch.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to build: %w", err) } @@ -216,7 +217,7 @@ func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error return nil } - if err := c.publish(ctx, consumer.TopicKeyMerge, batch.ID, batch.Queue); err != nil { + if err := c.publish(ctx, topickey.TopicKeyMerge, batch.ID, batch.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to merge: %w", err) } @@ -249,7 +250,7 @@ func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, d return fmt.Errorf("failed to update batch %s state to failed: %w", batch.ID, err) } - if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { + if err := c.publish(ctx, topickey.TopicKeyConclude, batch.ID, batch.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to conclude: %w", err) } @@ -317,7 +318,7 @@ func (c *Controller) cancelBatch(ctx context.Context, batch entity.Batch) error return err } - if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { + if err := c.publish(ctx, topickey.TopicKeyConclude, batch.ID, batch.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to conclude: %w", err) } @@ -383,7 +384,7 @@ func (c *Controller) respeculateDependents(ctx context.Context, batch entity.Bat // reads, consumer-pool parallelism / backpressure, and the existing // state-machine dispatch in Process all argue for the publish. Revisit // if the extra message hop ever shows up as latency or cost. - if err := c.publish(ctx, consumer.TopicKeySpeculate, depID, batch.Queue); err != nil { + if err := c.publish(ctx, topickey.TopicKeySpeculate, depID, batch.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish dependent batch %s to speculate: %w", depID, err) } @@ -413,7 +414,7 @@ func (c *Controller) fetchDependencies(ctx context.Context, batch entity.Batch) // a terminal state. Used for self-healing when a previous publish was lost: // re-sending to conclude guarantees request-state reconciliation. func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { - if err := c.publish(ctx, consumer.TopicKeyConclude, batchID, partitionKey); err != nil { + if err := c.publish(ctx, topickey.TopicKeyConclude, batchID, partitionKey); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to conclude: %w", err) } diff --git a/submitqueue/orchestrator/controller/speculate/speculate_test.go b/submitqueue/orchestrator/controller/speculate/speculate_test.go index 53b7116a..6b0a6b5f 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate_test.go +++ b/submitqueue/orchestrator/controller/speculate/speculate_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" @@ -69,15 +70,15 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyBuild, Name: "build", Queue: mockQ}, - {Key: consumer.TopicKeyMerge, Name: "merge", Queue: mockQ}, - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, + {Key: topickey.TopicKeyBuild, Name: "build", Queue: mockQ}, + {Key: topickey.TopicKeyMerge, Name: "merge", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: mockQ}, }, ) require.NoError(t, err) - return NewController(logger, scope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + return NewController(logger, scope, store, registry, topickey.TopicKeySpeculate, "orchestrator-speculate") } // runProcess builds a delivery for batchID and invokes Process once. @@ -95,7 +96,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeySpeculate, controller.TopicKey()) + assert.Equal(t, topickey.TopicKeySpeculate, controller.TopicKey()) assert.Equal(t, "orchestrator-speculate", controller.ConsumerGroup()) assert.Equal(t, "speculate", controller.Name()) @@ -270,13 +271,13 @@ func TestController_Process_TerminalSelfHeals(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, }, ) require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, topickey.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) }) @@ -324,14 +325,14 @@ func TestController_Process_CancelledTerminalSelfHealsDependents(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, }, ) require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, topickey.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -392,14 +393,14 @@ func TestController_Process_CancellingTerminalFlow(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, }, ) require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, topickey.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -502,13 +503,13 @@ func TestController_Process_CancellingNoDependents(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, }, ) require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, topickey.TopicKeySpeculate, "orchestrator-speculate") require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) } @@ -542,14 +543,14 @@ func TestController_Process_CancellingTerminalCASVersionMismatch(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, }, ) require.NoError(t, err) logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + controller := NewController(logger, tally.NoopScope, store, registry, topickey.TopicKeySpeculate, "orchestrator-speculate") err = runProcess(t, ctrl, controller, batch.ID) require.Error(t, err) diff --git a/submitqueue/orchestrator/controller/start/BUILD.bazel b/submitqueue/orchestrator/controller/start/BUILD.bazel index 1f023d4a..aa15c54a 100644 --- a/submitqueue/orchestrator/controller/start/BUILD.bazel +++ b/submitqueue/orchestrator/controller/start/BUILD.bazel @@ -6,10 +6,11 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/start", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/metrics", "//entity/messagequeue", - "//submitqueue/core/consumer", "//submitqueue/core/request", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", "@com_github_uber_go_tally//:tally", @@ -22,10 +23,11 @@ go_test( srcs = ["start_test.go"], embed = [":start"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", diff --git a/submitqueue/orchestrator/controller/start/start.go b/submitqueue/orchestrator/controller/start/start.go index 39f6b784..9a3e823a 100644 --- a/submitqueue/orchestrator/controller/start/start.go +++ b/submitqueue/orchestrator/controller/start/start.go @@ -20,10 +20,11 @@ import ( "fmt" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" @@ -119,14 +120,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to publish request log: %w", err) } - if err := c.publish(ctx, consumer.TopicKeyValidate, request.ID, request.Queue); err != nil { + if err := c.publish(ctx, topickey.TopicKeyValidate, request.ID, request.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to validate: %w", err) } c.logger.Infow("published request to validate", "request_id", request.ID, - "topic_key", consumer.TopicKeyValidate, + "topic_key", topickey.TopicKeyValidate, ) return nil diff --git a/submitqueue/orchestrator/controller/start/start_test.go b/submitqueue/orchestrator/controller/start/start_test.go index 77ed8bfb..441e0360 100644 --- a/submitqueue/orchestrator/controller/start/start_test.go +++ b/submitqueue/orchestrator/controller/start/start_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" @@ -55,13 +56,13 @@ func newTestController( registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ}, - {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, + {Key: topickey.TopicKeyValidate, Name: "validate", Queue: mockQ}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: mockQ}, }, ) require.NoError(t, err) - return NewController(logger, scope, store, registry, consumer.TopicKeyStart, "orchestrator-start") + return NewController(logger, scope, store, registry, topickey.TopicKeyStart, "orchestrator-start") } // newMockStorage creates a MockStorage with a MockRequestStore that succeeds on Create. @@ -91,7 +92,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyStart, controller.TopicKey()) + assert.Equal(t, topickey.TopicKeyStart, controller.TopicKey()) assert.Equal(t, "orchestrator-start", controller.ConsumerGroup()) assert.Equal(t, "start", controller.Name()) } diff --git a/submitqueue/orchestrator/controller/validate/BUILD.bazel b/submitqueue/orchestrator/controller/validate/BUILD.bazel index be5acc8e..01eb3e98 100644 --- a/submitqueue/orchestrator/controller/validate/BUILD.bazel +++ b/submitqueue/orchestrator/controller/validate/BUILD.bazel @@ -6,10 +6,11 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/validate", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/errs", "//core/metrics", "//entity/messagequeue", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/changeprovider", "//submitqueue/extension/mergechecker", @@ -24,10 +25,11 @@ go_test( srcs = ["validate_test.go"], embed = [":validate"], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue/mock", - "//submitqueue/core/consumer", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/changeprovider/mock", "//submitqueue/extension/mergechecker", diff --git a/submitqueue/orchestrator/controller/validate/validate.go b/submitqueue/orchestrator/controller/validate/validate.go index 5a44309f..d4356e65 100644 --- a/submitqueue/orchestrator/controller/validate/validate.go +++ b/submitqueue/orchestrator/controller/validate/validate.go @@ -21,10 +21,11 @@ import ( "time" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" coremetrics "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/changeprovider" "github.com/uber/submitqueue/submitqueue/extension/mergechecker" @@ -184,14 +185,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } // Publish to batch topic - if err := c.publish(ctx, consumer.TopicKeyBatch, request.ID, request.Queue); err != nil { + if err := c.publish(ctx, topickey.TopicKeyBatch, request.ID, request.Queue); err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "publish_errors", 1) return fmt.Errorf("failed to publish to batch: %w", err) } c.logger.Infow("published request to batch", "request_id", request.ID, - "topic_key", consumer.TopicKeyBatch, + "topic_key", topickey.TopicKeyBatch, ) return nil // Success - message will be acked diff --git a/submitqueue/orchestrator/controller/validate/validate_test.go b/submitqueue/orchestrator/controller/validate/validate_test.go index 20f4804f..e3d7845b 100644 --- a/submitqueue/orchestrator/controller/validate/validate_test.go +++ b/submitqueue/orchestrator/controller/validate/validate_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" - "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" changeprovidermock "github.com/uber/submitqueue/submitqueue/extension/changeprovider/mock" "github.com/uber/submitqueue/submitqueue/extension/mergechecker" @@ -116,7 +117,7 @@ func newTestController( mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyBatch, Name: "batch", Queue: mockQ}}, + []consumer.TopicConfig{{Key: topickey.TopicKeyBatch, Name: "batch", Queue: mockQ}}, ) require.NoError(t, err) @@ -127,7 +128,7 @@ func newTestController( cpFactory := changeprovidermock.NewMockFactory(ctrl) cpFactory.EXPECT().For(gomock.Any()).Return(cp, nil).AnyTimes() - return NewController(logger, scope, store, registry, mcFactory, cpFactory, consumer.TopicKeyValidate, "orchestrator-validate") + return NewController(logger, scope, store, registry, mcFactory, cpFactory, topickey.TopicKeyValidate, "orchestrator-validate") } func TestNewController(t *testing.T) { @@ -145,7 +146,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), mc, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyValidate, controller.TopicKey()) + assert.Equal(t, topickey.TopicKeyValidate, controller.TopicKey()) assert.Equal(t, "orchestrator-validate", controller.ConsumerGroup()) assert.Equal(t, "validate", controller.Name()) } diff --git a/test/integration/submitqueue/core/consumer/BUILD.bazel b/test/integration/submitqueue/core/consumer/BUILD.bazel index a9660b53..7b123124 100644 --- a/test/integration/submitqueue/core/consumer/BUILD.bazel +++ b/test/integration/submitqueue/core/consumer/BUILD.bazel @@ -12,11 +12,11 @@ go_test( "integration", ], deps = [ + "//core/consumer", "//core/errs", "//entity/messagequeue", "//extension/messagequeue", "//extension/messagequeue/mysql", - "//submitqueue/core/consumer", "//test/testutil", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_stretchr_testify//require", diff --git a/test/integration/submitqueue/core/consumer/consumer_test.go b/test/integration/submitqueue/core/consumer/consumer_test.go index 1de83ae0..d8acfba1 100644 --- a/test/integration/submitqueue/core/consumer/consumer_test.go +++ b/test/integration/submitqueue/core/consumer/consumer_test.go @@ -14,11 +14,11 @@ import ( "github.com/uber-go/tally" "go.uber.org/zap/zaptest" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" extqueue "github.com/uber/submitqueue/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" - "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/test/testutil" ) diff --git a/test/integration/submitqueue/gateway/BUILD.bazel b/test/integration/submitqueue/gateway/BUILD.bazel index 8151d3a3..b43dae8f 100644 --- a/test/integration/submitqueue/gateway/BUILD.bazel +++ b/test/integration/submitqueue/gateway/BUILD.bazel @@ -16,9 +16,10 @@ go_test( "integration", ], deps = [ + "//core/consumer", "//extension/messagequeue/mysql", - "//submitqueue/core/consumer", "//submitqueue/core/request", + "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/gateway/protopb", "//test/testutil", diff --git a/test/integration/submitqueue/gateway/suite_test.go b/test/integration/submitqueue/gateway/suite_test.go index cc526dea..0efadd22 100644 --- a/test/integration/submitqueue/gateway/suite_test.go +++ b/test/integration/submitqueue/gateway/suite_test.go @@ -36,9 +36,10 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" + "github.com/uber/submitqueue/core/consumer" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" - "github.com/uber/submitqueue/submitqueue/core/consumer" corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "github.com/uber/submitqueue/test/testutil" @@ -184,7 +185,7 @@ func (s *GatewayIntegrationSuite) TestRequestLogConsumer() { defer queue.Close() registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyLog, Name: "log", Queue: queue}, + {Key: topickey.TopicKeyLog, Name: "log", Queue: queue}, }) require.NoError(t, err, "failed to create topic registry")