Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ Paths follow the directory layout: shared code is top-level, domain code nests u
- Proto (generated): `github.com/uber/submitqueue/{domain}/{service}/protopb`
- Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`)
- Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`)
- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/consumer`, `.../submitqueue/core/request`)
- Cross-domain consumer framework: `github.com/uber/submitqueue/core/consumer`; domain pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey`
- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/request`)
- Shared entities: `github.com/uber/submitqueue/entity/{name}` (e.g. `.../entity/messagequeue`)
- Shared extensions: `github.com/uber/submitqueue/extension/{name}` (e.g. `.../extension/messagequeue`)
- Cross-domain infra: `github.com/uber/submitqueue/core/{pkg}` (e.g. `.../core/errs`, `.../core/metrics`)
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ local-stovepipe-gateway-start: build-stovepipe-gateway-linux ## Start Stovepipe

mocks: ## Generate mock files using mockgen
@echo "Generating mocks..."
@$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/...
@$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./core/consumer/...
@echo "Mocks generated successfully!"

proto: ## Generate protobuf files from .proto definitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go_library(
"controller.go",
"registry.go",
],
importpath = "github.com/uber/submitqueue/submitqueue/core/consumer",
importpath = "github.com/uber/submitqueue/core/consumer",
visibility = ["//visibility:public"],
deps = [
"//core/errs",
Expand All @@ -27,11 +27,12 @@ go_test(
],
deps = [
":consumer",
"//core/consumer/mock",
"//core/errs",
"//entity/messagequeue",
"//extension/messagequeue",
"//extension/messagequeue/mock",
"//submitqueue/core/consumer/mock",
"//submitqueue/core/topickey",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ The top-level orchestrator. Register controllers, start consuming, and stop grac

```go
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
{Key: consumer.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig},
{Key: topickey.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig},
})

c := consumer.New(logger, scope, registry,
Expand Down Expand Up @@ -71,21 +71,21 @@ The `TopicRegistry` maps topic keys to queue backends, topic names, and subscrip
```go
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Key: consumer.TopicKeyStart,
Key: topickey.TopicKeyStart,
Name: "request",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig("worker-1", "orchestrator"),
},
{
Key: consumer.TopicKeyBuild,
Key: topickey.TopicKeyBuild,
Name: "build",
Queue: q,
// No Subscription — publish-only topic
},
})
```

**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyStart`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions.
**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyStart`, `TopicKeyBuild`). Constants live in each domain's `core/topickey` package; this package defines only the `TopicKey` type and registry machinery. The actual queue topic name is configured separately, so library consumers can use their own naming conventions.

## Error Handling

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"github.com/uber/submitqueue/core/consumer"
consumermock "github.com/uber/submitqueue/core/consumer/mock"
"github.com/uber/submitqueue/core/errs"
entityqueue "github.com/uber/submitqueue/entity/messagequeue"
extqueue "github.com/uber/submitqueue/extension/messagequeue"
queuemock "github.com/uber/submitqueue/extension/messagequeue/mock"
"github.com/uber/submitqueue/submitqueue/core/consumer"
consumermock "github.com/uber/submitqueue/submitqueue/core/consumer/mock"
"github.com/uber/submitqueue/submitqueue/core/topickey"
"go.uber.org/mock/gomock"
"go.uber.org/zap/zaptest"
)
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestConsumer_Register(t *testing.T) {
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

handler1 := consumermock.NewMockController(ctrl)
setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil)
setupController(handler1, "handler1", topickey.TopicKeyStart, "group1", nil)

handler2 := consumermock.NewMockController(ctrl)
setupController(handler2, "handler2", consumer.TopicKey("other-topic"), "group2", nil)
Expand All @@ -123,10 +124,10 @@ func TestConsumer_Register_DuplicateTopic(t *testing.T) {
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

handler1 := consumermock.NewMockController(ctrl)
setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil)
setupController(handler1, "handler1", topickey.TopicKeyStart, "group1", nil)

handler2 := consumermock.NewMockController(ctrl)
setupController(handler2, "handler2", consumer.TopicKeyStart, "group2", nil)
setupController(handler2, "handler2", topickey.TopicKeyStart, "group2", nil)

err := c.Register(handler1)
require.NoError(t, err)
Expand All @@ -146,7 +147,7 @@ func TestConsumer_Register_AfterStop(t *testing.T) {
require.NoError(t, err)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil)
setupController(handler, "handler1", topickey.TopicKeyStart, "group1", nil)

err = c.Register(handler)
assert.Error(t, err)
Expand All @@ -170,7 +171,7 @@ func TestConsumer_Start_AfterStop(t *testing.T) {
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

handler := consumermock.NewMockController(ctrl)
setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil)
setupController(handler, "handler1", topickey.TopicKeyStart, "group1", nil)

err := c.Register(handler)
require.NoError(t, err)
Expand All @@ -189,14 +190,14 @@ func TestConsumer_Start_MissingSubscriptionConfig(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
// Registry has queue but no subscription config
reg, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{{Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ}},
[]consumer.TopicConfig{{Key: topickey.TopicKeyStart, Name: "request", Queue: mockQ}},
)
require.NoError(t, err)

c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

handler := consumermock.NewMockController(ctrl)
setupController(handler, "handler", consumer.TopicKeyStart, "group", nil)
setupController(handler, "handler", topickey.TopicKeyStart, "group", nil)

err = c.Register(handler)
require.NoError(t, err)
Expand All @@ -217,12 +218,12 @@ func TestConsumer_Start_SubscribeFailure(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "group")

c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

handler := consumermock.NewMockController(ctrl)
setupController(handler, "handler", consumer.TopicKeyStart, "group", nil)
setupController(handler, "handler", topickey.TopicKeyStart, "group", nil)

err := c.Register(handler)
require.NoError(t, err)
Expand All @@ -243,13 +244,13 @@ func TestConsumer_ProcessDelivery_Success(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

handledMsg := ""
handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
handledMsg = delivery.Message().ID
return nil
Expand Down Expand Up @@ -289,12 +290,12 @@ func TestConsumer_ProcessDelivery_Error(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
return errs.NewRetryableError(fmt.Errorf("processing failed"))
},
Expand Down Expand Up @@ -331,12 +332,12 @@ func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
return fmt.Errorf("bad payload")
},
Expand Down Expand Up @@ -382,12 +383,12 @@ func TestConsumer_Stop(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", nil)
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", nil)

err := c.Register(handler)
require.NoError(t, err)
Expand Down Expand Up @@ -440,12 +441,12 @@ func TestConsumer_ObservabilityTags(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

testC := consumer.New(logger, testScope, reg, errs.NewClassifierProcessor())

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
return tt.handlerError
},
Expand Down Expand Up @@ -515,12 +516,12 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

c := consumer.New(logger, scope, reg, errs.NewClassifierProcessor())

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error { return nil },
)

Expand Down Expand Up @@ -560,12 +561,12 @@ func TestConsumer_ErrorMetrics(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

c := consumer.New(logger, scope, reg, errs.NewClassifierProcessor())

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
return errs.NewRetryableError(fmt.Errorf("processing failed"))
},
Expand Down Expand Up @@ -616,7 +617,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

Expand All @@ -626,7 +627,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) {
var partBProcessed atomic.Bool

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
pk := delivery.Message().PartitionKey
if pk == "partition-a" {
Expand Down Expand Up @@ -701,7 +702,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

Expand All @@ -712,7 +713,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) {
allDone := make(chan struct{})

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
mu.Lock()
order = append(order, delivery.Message().ID)
Expand Down Expand Up @@ -770,14 +771,14 @@ func TestConsumer_PartitionWorkerCleanup(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())

processedCount := int64(0)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
atomic.AddInt64(&processedCount, 1)
return nil
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "mock",
srcs = ["controller_mock.go"],
importpath = "github.com/uber/submitqueue/submitqueue/core/consumer/mock",
importpath = "github.com/uber/submitqueue/core/consumer/mock",
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//entity/messagequeue",
"//submitqueue/core/consumer",
"@org_uber_go_mock//gomock",
],
)

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading