Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions example/pushqueue/gateway/client/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
load("@rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "client_lib",
srcs = ["main.go"],
importpath = "github.com/uber/submitqueue/example/pushqueue/gateway/client",
visibility = ["//visibility:private"],
deps = [
"//pushqueue/gateway/protopb",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
],
)

go_binary(
name = "client",
embed = [":client_lib"],
visibility = ["//visibility:public"],
)
73 changes: 73 additions & 0 deletions example/pushqueue/gateway/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"
"fmt"
"os"
"time"

pb "github.com/uber/submitqueue/pushqueue/gateway/protopb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func main() {
addr := flag.String("addr", "localhost:8084", "pushqueue gateway server address")
message := flag.String("message", "", "message to send in ping request")
timeout := flag.Duration("timeout", 5*time.Second, "request timeout")
flag.Parse()

if err := run(*addr, *message, *timeout); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}

func run(addr, message string, timeout time.Duration) error {
conn, err := grpc.NewClient(
addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
defer conn.Close()

client := pb.NewPushQueueGatewayClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

req := &pb.PingRequest{
Message: message,
}

fmt.Printf("Sending ping to pushqueue gateway at %s...\n", addr)
resp, err := client.Ping(ctx, req)
if err != nil {
return fmt.Errorf("ping failed: %w", err)
}

fmt.Printf("\nResponse:\n")
fmt.Printf(" Message: %s\n", resp.Message)
fmt.Printf(" Service Name: %s\n", resp.ServiceName)
fmt.Printf(" Timestamp: %d (%s)\n", resp.Timestamp, time.Unix(resp.Timestamp, 0))
fmt.Printf(" Hostname: %s\n", resp.Hostname)

return nil
}
25 changes: 25 additions & 0 deletions example/pushqueue/gateway/server/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
load("@rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "server_lib",
srcs = ["main.go"],
importpath = "github.com/uber/submitqueue/example/pushqueue/gateway/server",
visibility = ["//visibility:private"],
deps = [
"//pushqueue/entity",
"//pushqueue/extension/landqueue",
"//pushqueue/extension/vcs",
"//pushqueue/gateway/controller",
"//pushqueue/gateway/protopb",
"@com_github_uber_go_tally//:tally",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//reflection",
"@org_uber_go_zap//:zap",
],
)

go_binary(
name = "server",
embed = [":server_lib"],
visibility = ["//visibility:public"],
)
200 changes: 200 additions & 0 deletions example/pushqueue/gateway/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"errors"
"fmt"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/uber-go/tally"
"github.com/uber/submitqueue/pushqueue/entity"
"github.com/uber/submitqueue/pushqueue/extension/landqueue"
"github.com/uber/submitqueue/pushqueue/extension/vcs"
"github.com/uber/submitqueue/pushqueue/gateway/controller"
pb "github.com/uber/submitqueue/pushqueue/gateway/protopb"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

// GatewayServer wraps the controllers and implements the gRPC service interface.
type GatewayServer struct {
pb.UnimplementedPushQueueGatewayServer
pingController *controller.PingController
landController *controller.LandController
}

// Ping delegates to the controller.
func (s *GatewayServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
return s.pingController.Ping(ctx, req)
}

// Land delegates to the controller.
func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) {
return s.landController.Land(ctx, req)
}

// CheckMergeability delegates to the controller.
func (s *GatewayServer) CheckMergeability(ctx context.Context, req *pb.CheckMergeabilityRequest) (*pb.CheckMergeabilityResponse, error) {
return s.landController.CheckMergeability(ctx, req)
}

func main() {
code := 0
if err := run(); err != nil {
if errors.Is(err, context.Canceled) {
fmt.Println("PushQueue gateway server stopped by signal")
code = 128 + int(syscall.SIGTERM)
} else {
fmt.Fprintf(os.Stderr, "PushQueue gateway server failure: %v\n", err)
code = 1
}
}
os.Exit(code)
}

func run() error {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

logger, err := zap.NewDevelopment()
if err != nil {
return fmt.Errorf("failed to create logger: %w", err)
}
defer logger.Sync()

scope := tally.NewTestScope("pushqueue_gateway", nil)
metricsStopCh := make(chan any, 1)
metricsWgDone := sync.WaitGroup{}
metricsWgDone.Add(1)
go func() {
defer metricsWgDone.Done()

ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-metricsStopCh:
return
case <-ticker.C:
snapshot := scope.Snapshot()
logger.Info("metrics snapshot",
zap.Any("counters", snapshot.Counters()),
zap.Any("gauges", snapshot.Gauges()),
zap.Any("timers", snapshot.Timers()),
)
}
}
}()

defer func() {
close(metricsStopCh)
metricsWgDone.Wait()
}()

grpcServer := grpc.NewServer()

pingController := controller.NewPingController(logger.Sugar(), scope)
landController := controller.NewLandController(logger.Sugar(), scope, noopVCS{}, noopQueue{})
srv := &GatewayServer{
pingController: pingController,
landController: landController,
}
pb.RegisterPushQueueGatewayServer(grpcServer, srv)

reflection.Register(grpcServer)

port := os.Getenv("PORT")
if port == "" {
port = ":8084"
}
listener, err := net.Listen("tcp", port)
if err != nil {
return fmt.Errorf("failed to listen on port %s: %w", port, err)
}

fmt.Printf("PushQueue gateway gRPC server is running on %s\n", port)
fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.")

serverErrCh := make(chan error, 1)
go func() {
serverErrCh <- grpcServer.Serve(listener)
}()

var serverErr error
select {
case <-ctx.Done():
fmt.Println("Shutting down pushqueue gateway server due to interruption signal...")
err = ctx.Err()
grpcServer.GracefulStop()
serverErr = <-serverErrCh
case serverErr = <-serverErrCh:
fmt.Println("Shutting down pushqueue gateway server due to critical GRPC server error...")
}

if serverErr != nil {
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
}

return err
}

// noopVCS is a placeholder VCS that errors on every operation.
type noopVCS struct{}

func (noopVCS) CheckMergeability(_ context.Context, _ entity.QueueTarget, items []entity.LandItem) ([]vcs.MergeabilityResult, error) {
results := make([]vcs.MergeabilityResult, len(items))
for i := range results {
results[i] = vcs.MergeabilityResult{Mergeable: false, Reason: "noop VCS: not configured"}
}
return results, nil
}

func (noopVCS) Prepare(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) error {
return fmt.Errorf("noop VCS: not configured")
}

func (noopVCS) Push(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) (vcs.PushResult, error) {
return vcs.PushResult{}, fmt.Errorf("noop VCS: not configured")
}

func (noopVCS) Finalize(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) error {
return fmt.Errorf("noop VCS: not configured")
}

// noopQueue is a placeholder Queue that passes through immediately.
type noopQueue struct{}

var _ landqueue.Queue = noopQueue{}

func (noopQueue) Enqueue(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) error {
return nil
}

func (noopQueue) Wait(_ context.Context, _ entity.QueueTarget) error {
return nil
}

func (noopQueue) Dequeue(_ context.Context, _ entity.QueueTarget) error {
return nil
}
Loading