Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/ghalistener/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Config struct {
MetricsAddr string `json:"metrics_addr"`
MetricsEndpoint string `json:"metrics_endpoint"`
Metrics *v1alpha1.MetricsConfig `json:"metrics"`
OTelEndpoint string `json:"otel_endpoint"`
OTelInsecure bool `json:"otel_insecure"`
}

func Read(ctx context.Context, configPath string) (*Config, error) {
Expand Down
42 changes: 38 additions & 4 deletions cmd/ghalistener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/actions/actions-runner-controller/github/actions"
"github.com/actions/scaleset/listener"
"github.com/google/uuid"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -65,6 +66,27 @@ func run(ctx context.Context, config *config.Config) error {
})
}

// OTel trace recorder (optional)
var otelRecorder *metrics.OTelRecorder
if config.OTelEndpoint != "" {
opts := []otlptracehttp.Option{
otlptracehttp.WithEndpoint(config.OTelEndpoint),
}
if config.OTelInsecure {
opts = append(opts, otlptracehttp.WithInsecure())
}
otlpExporter, err := otlptracehttp.New(ctx, opts...)
if err != nil {
return fmt.Errorf("failed to create OTel exporter: %w", err)
}
otelRecorder = metrics.NewOTelRecorder(
otlpExporter,
logger.With("component", "otel recorder"),
)
defer otelRecorder.Shutdown(ctx)
logger.Info("OTel trace recorder enabled", "endpoint", config.OTelEndpoint)
}

hostname, err := os.Hostname()
if err != nil {
hostname = uuid.NewString()
Expand All @@ -91,13 +113,25 @@ func run(ctx context.Context, config *config.Config) error {
}()

var listenerOptions []listener.Option
if metricsExporter != nil {

// Build the metrics recorder: Prometheus, OTel, or both
var recorder listener.MetricsRecorder
switch {
case metricsExporter != nil && otelRecorder != nil:
recorder = metrics.NewComposite(metricsExporter, otelRecorder)
case metricsExporter != nil:
recorder = metricsExporter
case otelRecorder != nil:
recorder = otelRecorder
}

if recorder != nil {
listenerOptions = append(
listenerOptions,
listener.WithMetricsRecorder(
metricsExporter,
),
listener.WithMetricsRecorder(recorder),
)
}
if metricsExporter != nil {
metricsExporter.RecordStatic(config.MinRunners, config.MaxRunners)
}

Expand Down
44 changes: 44 additions & 0 deletions cmd/ghalistener/metrics/composite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package metrics

import (
"github.com/actions/scaleset"
"github.com/actions/scaleset/listener"
)

var _ listener.MetricsRecorder = &CompositeRecorder{}

// CompositeRecorder delegates every MetricsRecorder call to all
// wrapped recorders. This allows the Prometheus exporter and OTel
// recorder to operate side by side.
type CompositeRecorder struct {
recorders []listener.MetricsRecorder
}

// NewComposite creates a MetricsRecorder that fans out to all given recorders.
func NewComposite(recorders ...listener.MetricsRecorder) *CompositeRecorder {
return &CompositeRecorder{recorders: recorders}
}

func (c *CompositeRecorder) RecordStatistics(stats *scaleset.RunnerScaleSetStatistic) {
for _, r := range c.recorders {
r.RecordStatistics(stats)
}
}

func (c *CompositeRecorder) RecordJobStarted(msg *scaleset.JobStarted) {
for _, r := range c.recorders {
r.RecordJobStarted(msg)
}
}

func (c *CompositeRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {
for _, r := range c.recorders {
r.RecordJobCompleted(msg)
}
}

func (c *CompositeRecorder) RecordDesiredRunners(count int) {
for _, r := range c.recorders {
r.RecordDesiredRunners(count)
}
}
201 changes: 201 additions & 0 deletions cmd/ghalistener/metrics/otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package metrics

import (
"context"
"crypto/md5"
"encoding/binary"
"fmt"
"log/slog"
"strconv"
"sync"
"time"

"github.com/actions/scaleset"
"github.com/actions/scaleset/listener"
"go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)

var _ listener.MetricsRecorder = &OTelRecorder{}

// OTelRecorder implements listener.MetricsRecorder by emitting
// OpenTelemetry trace spans for each completed job. Three child spans
// are created under the job's parent span using deterministic IDs:
//
// - runner.queue: QueueTime → ScaleSetAssignTime
// - runner.startup: ScaleSetAssignTime → RunnerAssignTime
// - runner.execution: RunnerAssignTime → FinishTime
//
// The deterministic ID scheme (MD5 of runID-attempt for trace ID,
// big-endian int64 for span ID) is compatible with otel-explorer's
// GitHub Actions trace view, allowing ARC spans to merge into existing
// workflow traces with zero correlation configuration.
type OTelRecorder struct {
mu sync.Mutex
exporter sdktrace.SpanExporter
logger *slog.Logger

// runAttempt defaults to 1. ARC messages don't include
// run_attempt; override via SetRunAttempt if determinable.
runAttempt int64
}

// NewOTelRecorder creates a recorder that exports job lifecycle spans
// via the given SpanExporter.
func NewOTelRecorder(exporter sdktrace.SpanExporter, logger *slog.Logger) *OTelRecorder {
if logger == nil {
logger = slog.New(slog.DiscardHandler)
}
return &OTelRecorder{
exporter: exporter,
logger: logger,
runAttempt: 1,
}
}

// SetRunAttempt overrides the default run attempt (1) used for trace
// ID generation.
func (r *OTelRecorder) SetRunAttempt(attempt int64) {
r.mu.Lock()
defer r.mu.Unlock()
r.runAttempt = attempt
}

// Shutdown flushes pending spans and releases exporter resources.
func (r *OTelRecorder) Shutdown(ctx context.Context) error {
return r.exporter.Shutdown(ctx)
}

func (r *OTelRecorder) RecordJobStarted(_ *scaleset.JobStarted) {}

func (r *OTelRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {
r.mu.Lock()
attempt := r.runAttempt
r.mu.Unlock()

spans := buildJobSpans(msg, attempt)
if len(spans) == 0 {
return
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := r.exporter.ExportSpans(ctx, spans); err != nil {
r.logger.Warn("failed to export OTel spans", "error", err)
}
}

func (r *OTelRecorder) RecordStatistics(_ *scaleset.RunnerScaleSetStatistic) {}
func (r *OTelRecorder) RecordDesiredRunners(_ int) {}

func buildJobSpans(msg *scaleset.JobCompleted, attempt int64) []sdktrace.ReadOnlySpan {
traceID := newTraceID(msg.WorkflowRunID, attempt)
parentSpanID := toSpanID(msg.JobID)
parentSC := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: parentSpanID,
TraceFlags: trace.FlagsSampled,
})

commonAttrs := []attribute.KeyValue{
attribute.Int64("github.run_id", msg.WorkflowRunID),
attribute.String("github.job_id", msg.JobID),
attribute.String("github.job_name", msg.JobDisplayName),
attribute.String("github.repository", msg.OwnerName+"/"+msg.RepositoryName),
attribute.String("github.runner_name", msg.RunnerName),
attribute.Int("github.runner_id", msg.RunnerID),
attribute.String("github.workflow_ref", msg.JobWorkflowRef),
attribute.String("github.event_name", msg.EventName),
}

var stubs tracetest.SpanStubs

if !msg.QueueTime.IsZero() && !msg.ScaleSetAssignTime.IsZero() {
stubs = append(stubs, newStub(
traceID, parentSC, "runner.queue",
msg.QueueTime, msg.ScaleSetAssignTime,
append(sliceClone(commonAttrs), attribute.String("type", "runner.queue")),
))
}

if !msg.ScaleSetAssignTime.IsZero() && !msg.RunnerAssignTime.IsZero() {
stubs = append(stubs, newStub(
traceID, parentSC, "runner.startup",
msg.ScaleSetAssignTime, msg.RunnerAssignTime,
append(sliceClone(commonAttrs), attribute.String("type", "runner.startup")),
))
}

if !msg.RunnerAssignTime.IsZero() && !msg.FinishTime.IsZero() {
stubs = append(stubs, newStub(
traceID, parentSC, "runner.execution",
msg.RunnerAssignTime, msg.FinishTime,
append(sliceClone(commonAttrs),
attribute.String("type", "runner.execution"),
attribute.String("github.conclusion", msg.Result),
),
))
}

return stubs.Snapshots()
}

func newStub(
traceID trace.TraceID,
parentSC trace.SpanContext,
name string,
start, end time.Time,
attrs []attribute.KeyValue,
) tracetest.SpanStub {
spanID := newSpanIDFromString(fmt.Sprintf("%s-%s", name, parentSC.SpanID()))
return tracetest.SpanStub{
Name: name,
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
TraceFlags: trace.FlagsSampled,
}),
Parent: parentSC,
StartTime: start,
EndTime: end,
Attributes: attrs,
}
}

// Deterministic ID generation — compatible with otel-explorer's
// pkg/githubapi/ids.go scheme.

func newTraceID(runID, runAttempt int64) trace.TraceID {
if runAttempt == 0 {
runAttempt = 1
}
return trace.TraceID(md5.Sum([]byte(fmt.Sprintf("%d-%d", runID, runAttempt))))
}

func newSpanID(id int64) trace.SpanID {
var sid trace.SpanID
binary.BigEndian.PutUint64(sid[:], uint64(id))
return sid
}

func newSpanIDFromString(s string) trace.SpanID {
sum := md5.Sum([]byte(s))
var sid trace.SpanID
copy(sid[:], sum[:8])
return sid
}

func toSpanID(jobID string) trace.SpanID {
if id, err := strconv.ParseInt(jobID, 10, 64); err == nil {
return newSpanID(id)
}
return newSpanIDFromString(jobID)
}

func sliceClone(s []attribute.KeyValue) []attribute.KeyValue {
out := make([]attribute.KeyValue, len(s))
copy(out, s)
return out
}
Loading