diff --git a/stovepipe/core/filter/BUILD.bazel b/stovepipe/core/filter/BUILD.bazel new file mode 100644 index 00000000..454754b5 --- /dev/null +++ b/stovepipe/core/filter/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "filter", + srcs = ["filter.go"], + importpath = "github.com/uber/submitqueue/stovepipe/core/filter", + visibility = ["//visibility:public"], + deps = ["//stovepipe/entity"], +) diff --git a/stovepipe/entity/entity.go b/stovepipe/core/filter/filter.go similarity index 50% rename from stovepipe/entity/entity.go rename to stovepipe/core/filter/filter.go index 15ac8680..4130bb5a 100644 --- a/stovepipe/entity/entity.go +++ b/stovepipe/core/filter/filter.go @@ -12,5 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package entity holds Stovepipe-specific domain types (distinct from shared repo entity/). -package entity +// Package filter implements a filter for commit events. +package filter + +import ( + "strings" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// Config controls which VCS URIs are watched. +// WatchedURIPrefixes is a list of URI prefixes to match against ChangeEvent.URI. +// Example: "git://github.com/uber/go-code/refs/heads/main" +// watches all commits on the main branch of uber/go-code. +func ShouldProcess(event entity.ChangeEvent, watchedPrefixes []string) bool { + for _, prefix := range watchedPrefixes { + if strings.HasPrefix(event.URI, prefix) { + return true + } + } + return false +} diff --git a/stovepipe/entity/BUILD.bazel b/stovepipe/entity/BUILD.bazel index c179fb43..96707407 100644 --- a/stovepipe/entity/BUILD.bazel +++ b/stovepipe/entity/BUILD.bazel @@ -2,7 +2,12 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "entity", - srcs = ["entity.go"], + srcs = [ + "change_event.go", + "change_uri.go", + "commit.go", + ], importpath = "github.com/uber/submitqueue/stovepipe/entity", visibility = ["//visibility:public"], + deps = ["//stovepipe/entity/git"], ) diff --git a/stovepipe/entity/change_event.go b/stovepipe/entity/change_event.go new file mode 100644 index 00000000..46a664cf --- /dev/null +++ b/stovepipe/entity/change_event.go @@ -0,0 +1,59 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import ( + "encoding/json" + "fmt" + + entitygit "github.com/uber/submitqueue/stovepipe/entity/git" +) + +// ChangeEvent represents a single new trunk change entering the pipeline, published to the +// start topic. A trunk change is one commit, so the event carries one git-backed URI. It is +// source-agnostic: both the webhook and the reconciliation poller emit it. Additional fields +// (e.g. source, committer time) can be added later as ingestion needs them. +type ChangeEvent struct { + // URI identifies the commit that entered the pipeline (git://owner/repo/branch/revision). + URI string `json:"uri"` +} + +// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload. +func (e ChangeEvent) ToBytes() ([]byte, error) { + return json.Marshal(e) +} + +// Validate checks that the change event carries a valid git-backed commit URI. +func (e ChangeEvent) Validate() error { + if e.URI == "" { + return fmt.Errorf("change event requires a commit URI") + } + if _, err := entitygit.ParseChangeID(e.URI); err != nil { + return fmt.Errorf("change event URI: %w", err) + } + return nil +} + +// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes. +func ChangeEventFromBytes(data []byte) (ChangeEvent, error) { + var event ChangeEvent + if err := json.Unmarshal(data, &event); err != nil { + return ChangeEvent{}, err + } + if err := event.Validate(); err != nil { + return ChangeEvent{}, err + } + return event, nil +} diff --git a/stovepipe/entity/change_uri.go b/stovepipe/entity/change_uri.go new file mode 100644 index 00000000..7abc45f3 --- /dev/null +++ b/stovepipe/entity/change_uri.go @@ -0,0 +1,36 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import "encoding/json" + +// ChangeURI is the lightweight reference passed between pipeline stages. It +// carries only the change identity; stages re-resolve any state they need. +type ChangeURI struct { + // URI is the change identity (git://owner/repo/branch/revision). + URI string `json:"uri"` +} + +// ToBytes serializes the ChangeURI to JSON bytes for a queue message payload. +func (c ChangeURI) ToBytes() ([]byte, error) { + return json.Marshal(c) +} + +// ChangeURIFromBytes deserializes a ChangeURI from JSON bytes. +func ChangeURIFromBytes(data []byte) (ChangeURI, error) { + var ref ChangeURI + err := json.Unmarshal(data, &ref) + return ref, err +} diff --git a/stovepipe/entity/commit.go b/stovepipe/entity/commit.go new file mode 100644 index 00000000..fe734cdb --- /dev/null +++ b/stovepipe/entity/commit.go @@ -0,0 +1,45 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// CommitStatus is the validation state of a trunk commit as determined by Stovepipe. +type CommitStatus string + +const ( + // CommitStatusUnknown is the default state when a commit is first ingested. + // The commit has landed on main but has not yet been validated. + CommitStatusUnknown CommitStatus = "" + // CommitStatusSucceeded means the relevant targets build and test successfully at this commit. + CommitStatusSucceeded CommitStatus = "succeeded" + // CommitStatusFailed means a target is broken at this commit; it is the offending change. + CommitStatusFailed CommitStatus = "failed" +) + +// IsCommitStatusTerminal returns true if the status is a final, irreversible state. +func IsCommitStatusTerminal(s CommitStatus) bool { + return s == CommitStatusSucceeded || s == CommitStatusFailed +} + +// Commit is a trunk commit tracked by Stovepipe's gateway. +type Commit struct { + // URI is the canonical change identity from the originating ChangeEvent. + URI string + // Status is the current validation state of this commit. + Status CommitStatus + // CreatedAt is the time this commit was first recorded, in milliseconds since epoch. + CreatedAt int64 + // UpdatedAt is the time this commit was last updated, in milliseconds since epoch. + UpdatedAt int64 +} diff --git a/stovepipe/extension/BUILD.bazel b/stovepipe/extension/BUILD.bazel index bbf4f83b..ac032b39 100644 --- a/stovepipe/extension/BUILD.bazel +++ b/stovepipe/extension/BUILD.bazel @@ -5,4 +5,5 @@ go_library( srcs = ["extension.go"], importpath = "github.com/uber/submitqueue/stovepipe/extension", visibility = ["//visibility:public"], + deps = ["//stovepipe/entity"], ) diff --git a/stovepipe/extension/changeingester/BUILD.bazel b/stovepipe/extension/changeingester/BUILD.bazel new file mode 100644 index 00000000..4575d4b3 --- /dev/null +++ b/stovepipe/extension/changeingester/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "changeingester", + srcs = ["logging.go"], + importpath = "github.com/uber/submitqueue/stovepipe/extension/changeingester", + visibility = ["//visibility:public"], + deps = [ + "//stovepipe/entity", + "//stovepipe/extension", + "@org_uber_go_zap//:zap", + ], +) diff --git a/stovepipe/extension/changeingester/logging.go b/stovepipe/extension/changeingester/logging.go new file mode 100644 index 00000000..88e566b1 --- /dev/null +++ b/stovepipe/extension/changeingester/logging.go @@ -0,0 +1,43 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package changeingester provides ChangeIngester implementations. +package changeingester + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension" + "go.uber.org/zap" +) + +// LoggingHandler is a stub ChangeHandler that logs received changes. +// Replace with real persistence logic once DB schema is ready. +type LoggingHandler struct { + logger *zap.Logger +} + +// New constructs a new LoggingHandler. +// The return type enforces interface compliance at compile time. +func New(logger *zap.Logger) extension.ChangeHandler { + return LoggingHandler{logger: logger} +} + +func (h LoggingHandler) IngestChange(ctx context.Context, event entity.ChangeEvent) error { + h.logger.Info("ingested change", + zap.String("uri", event.URI), + ) + return nil +} diff --git a/stovepipe/extension/extension.go b/stovepipe/extension/extension.go index 92d4bcea..f4de9644 100644 --- a/stovepipe/extension/extension.go +++ b/stovepipe/extension/extension.go @@ -14,3 +14,21 @@ // Package extension holds Stovepipe-specific extension implementations. package extension + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// ChangeIngester subscribes to change events from a VCS source +// and dispatches them for processing. The source and VCS are +// implementation details left to the injected backend. +type ChangeIngester interface { + Start(ctx context.Context) error +} + +// ChangeHandler processes a single change received from the ingester. +type ChangeHandler interface { + IngestChange(ctx context.Context, event entity.ChangeEvent) error +}