Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions example/stovepipe/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,17 @@ go_library(
importpath = "github.com/uber/submitqueue/example/stovepipe/orchestrator/server",
visibility = ["//visibility:private"],
deps = [
"//core/consumer",
"//core/errs",
"//core/errs/generic",
"//core/errs/mysql",
"//extension/messagequeue",
"//extension/messagequeue/mysql",
"//stovepipe/core/topickey",
"//stovepipe/orchestrator/controller",
"//stovepipe/orchestrator/controller/start",
"//stovepipe/orchestrator/protopb",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally//:tally",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//reflection",
Expand Down
89 changes: 88 additions & 1 deletion example/stovepipe/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"database/sql"
"errors"
"fmt"
"net"
Expand All @@ -25,8 +26,17 @@ import (
"syscall"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
genericerrs "github.com/uber/submitqueue/core/errs/generic"
mysqlerrs "github.com/uber/submitqueue/core/errs/mysql"
extqueue "github.com/uber/submitqueue/extension/messagequeue"
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
"github.com/uber/submitqueue/stovepipe/core/topickey"
"github.com/uber/submitqueue/stovepipe/orchestrator/controller"
"github.com/uber/submitqueue/stovepipe/orchestrator/controller/start"
pb "github.com/uber/submitqueue/stovepipe/orchestrator/protopb"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -102,6 +112,58 @@ func run() error {
metricsWgDone.Wait()
}()

queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
if queueDSN == "" {
return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required")
}
queueDB, err := sql.Open("mysql", queueDSN)
if err != nil {
return fmt.Errorf("failed to open queue database: %w", err)
}
defer queueDB.Close()

mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{
DB: queueDB,
Logger: logger,
MetricsScope: scope.SubScope("queue"),
})
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}
defer mysqlQueue.Close()

logger.Info("initialized queue", zap.String("dsn", queueDSN))

subscriberName := os.Getenv("HOSTNAME")
if subscriberName == "" {
subscriberName = fmt.Sprintf("stovepipe-orchestrator-%d", time.Now().Unix())
}

registry, err := newTopicRegistry(mysqlQueue, subscriberName)
if err != nil {
return fmt.Errorf("failed to create topic registry: %w", err)
}

primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry,
errs.NewClassifierProcessor(
genericerrs.Classifier,
mysqlerrs.Classifier,
),
)

startController := start.NewController(
logger.Sugar(), scope, registry, topickey.TopicKeyStart, "orchestrator-start",
)
if err := primaryConsumer.Register(startController); err != nil {
return fmt.Errorf("failed to register start controller: %w", err)
}
logger.Info("controllers registered", zap.Int("primary", 1))

if err := primaryConsumer.Start(ctx); err != nil {
return fmt.Errorf("failed to start primary consumer: %w", err)
}
logger.Info("consumer started")

grpcServer := grpc.NewServer()

pingController := controller.NewPingController(logger, scope)
Expand Down Expand Up @@ -140,11 +202,36 @@ func run() error {
serverErr = <-serverErrCh
case serverErr = <-serverErrCh:
fmt.Println("Shutting down stovepipe orchestrator server due to critical GRPC server error...")
cancel()
}

if serverErr != nil {
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr)
}

primaryStopErr := primaryConsumer.Stop(30000)
if primaryStopErr != nil {
primaryStopErr = fmt.Errorf("failed to stop consumer: %w", primaryStopErr)
}

if primaryStopErr != nil || serverErr != nil {
err = errors.Join(primaryStopErr, serverErr)
}

return err
}

func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) {
return consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Key: topickey.TopicKeyStart,
Name: "start",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "orchestrator-start",
),
},
// Publish-only until the validate controller lands.
{Key: topickey.TopicKeyValidate, Name: "validate", Queue: q},
})
}
2 changes: 1 addition & 1 deletion stovepipe/core/topickey/topickey.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import "github.com/uber/submitqueue/core/consumer"
type TopicKey = consumer.TopicKey

const (
// TopicKeyStart is the pipeline stage where trunk push events arrive from the gateway.
// TopicKeyStart is the pipeline stage where trunk change events arrive from the gateway.
TopicKeyStart TopicKey = "start"
// TopicKeyValidate is the pipeline stage where commits are published for metadata resolution.
TopicKeyValidate TopicKey = "validate"
Expand Down
22 changes: 20 additions & 2 deletions stovepipe/entity/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
load("@rules_go//go:def.bzl", "go_library")
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "entity",
srcs = ["entity.go"],
srcs = [
"change_event.go",
"change_uri.go",
"entity.go",
],
importpath = "github.com/uber/submitqueue/stovepipe/entity",
visibility = ["//visibility:public"],
deps = ["//stovepipe/entity/git"],
)

go_test(
name = "entity_test",
srcs = [
"change_event_test.go",
"change_uri_test.go",
],
embed = [":entity"],
deps = [
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
59 changes: 59 additions & 0 deletions stovepipe/entity/change_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

import (
"encoding/json"
"fmt"

entitygit "github.com/uber/submitqueue/stovepipe/entity/git"
)

// ChangeEvent represents a single new trunk change entering the pipeline, published to the
// start topic. A trunk change is one commit, so the event carries one git-backed URI. It is
// source-agnostic: both the webhook and the reconciliation poller emit it. Additional fields
// (e.g. source, committer time) can be added later as ingestion needs them.
type ChangeEvent struct {
// URI identifies the commit that entered the pipeline (git://owner/repo/branch/revision).
URI string `json:"uri"`
}

// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload.
func (e ChangeEvent) ToBytes() ([]byte, error) {
return json.Marshal(e)
}

// Validate checks that the change event carries a valid git-backed commit URI.
func (e ChangeEvent) Validate() error {
if e.URI == "" {
return fmt.Errorf("change event requires a commit URI")
}
if _, err := entitygit.ParseChangeID(e.URI); err != nil {
return fmt.Errorf("change event URI: %w", err)
}
return nil
}

// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes.
func ChangeEventFromBytes(data []byte) (ChangeEvent, error) {
var event ChangeEvent
if err := json.Unmarshal(data, &event); err != nil {
return ChangeEvent{}, err
}
if err := event.Validate(); err != nil {
return ChangeEvent{}, err
}
return event, nil
}
50 changes: 50 additions & 0 deletions stovepipe/entity/change_event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const testURI = "git://uber/monorepo/main/abcdef0123456789abcdef0123456789abcdef01"

func TestChangeEvent_Validate(t *testing.T) {
t.Run("valid URI", func(t *testing.T) {
event := ChangeEvent{URI: testURI}
require.NoError(t, event.Validate())
})

t.Run("rejects empty URI", func(t *testing.T) {
require.Error(t, ChangeEvent{}.Validate())
})

t.Run("rejects non-git URI", func(t *testing.T) {
event := ChangeEvent{URI: "github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}
require.Error(t, event.Validate())
})
}

func TestChangeEventFromBytes(t *testing.T) {
original := ChangeEvent{URI: testURI}
data, err := original.ToBytes()
require.NoError(t, err)

got, err := ChangeEventFromBytes(data)
require.NoError(t, err)
assert.Equal(t, original.URI, got.URI)
}
36 changes: 36 additions & 0 deletions stovepipe/entity/change_uri.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

import "encoding/json"

// ChangeURI is the lightweight reference passed between pipeline stages. It
// carries only the change identity; stages re-resolve any state they need.
type ChangeURI struct {
// URI is the change identity (git://owner/repo/branch/revision).
URI string `json:"uri"`
}

// ToBytes serializes the ChangeURI to JSON bytes for a queue message payload.
func (c ChangeURI) ToBytes() ([]byte, error) {
return json.Marshal(c)
}

// ChangeURIFromBytes deserializes a ChangeURI from JSON bytes.
func ChangeURIFromBytes(data []byte) (ChangeURI, error) {
var ref ChangeURI
err := json.Unmarshal(data, &ref)
return ref, err
}
32 changes: 32 additions & 0 deletions stovepipe/entity/change_uri_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestChangeURI_RoundTrip(t *testing.T) {
original := ChangeURI{URI: "git://uber/monorepo/main/abcdef0123456789abcdef0123456789abcdef01"}
data, err := original.ToBytes()
require.NoError(t, err)

got, err := ChangeURIFromBytes(data)
require.NoError(t, err)
assert.Equal(t, original, got)
}
18 changes: 18 additions & 0 deletions stovepipe/entity/git/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "git",
srcs = ["change_id.go"],
importpath = "github.com/uber/submitqueue/stovepipe/entity/git",
visibility = ["//visibility:public"],
)

go_test(
name = "git_test",
srcs = ["change_id_test.go"],
embed = [":git"],
deps = [
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
Loading