Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 22 additions & 90 deletions app/config/webhook/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import (
"fmt"
"net/url"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/cloudzero/cloudzero-agent/app/utils/scout"
Expand All @@ -26,27 +24,30 @@ import (

// Settings represents the configuration settings for the application.
type Settings struct {
CloudAccountID string `yaml:"cloud_account_id" env:"CLOUD_ACCOUNT_ID" env-description:"CSP account ID"`
Region string `yaml:"region" env:"CSP_REGION" env-description:"cloud service provider region"`
ClusterName string `yaml:"cluster_name" env:"CLUSTER_NAME" env-description:"name of the cluster to monitor"`
Destination string `yaml:"destination" env:"DESTINATION" env-default:"https://api.cloudzero.com/v1/container-metrics" env-description:"location to send metrics to"`
APIKeyPath string `yaml:"api_key_path" env:"API_KEY_PATH" env-description:"path to the API key file"`
Server Server `yaml:"server"`
Certificate Certificate `yaml:"certificate"`
Logging Logging `yaml:"logging"`
Database Database `yaml:"database"`
Filters Filters `yaml:"filters"`
RemoteWrite RemoteWrite `yaml:"remote_write"`
K8sClient K8sClient `yaml:"k8s_client"`
CloudAccountID string `yaml:"cloud_account_id" env:"CLOUD_ACCOUNT_ID" env-description:"CSP account ID"`
Region string `yaml:"region" env:"CSP_REGION" env-description:"cloud service provider region"`
ClusterName string `yaml:"cluster_name" env:"CLUSTER_NAME" env-description:"name of the cluster to monitor"`
Destination string `yaml:"destination" env:"DESTINATION" env-default:"https://api.cloudzero.com/v1/container-metrics" env-description:"location to send metrics to"`
Server Server `yaml:"server"`
Certificate Certificate `yaml:"certificate"`
Logging Logging `yaml:"logging"`
Database Database `yaml:"database"`
Filters Filters `yaml:"filters"`
RemoteWrite RemoteWrite `yaml:"remote_write"`
K8sClient K8sClient `yaml:"k8s_client"`

// Deprecated: removed in CP-28161 when the insights-controller stopped
// authenticating to the in-cluster aggregator. Kept as an ignored
// tombstone so legacy configs (older Helm-rendered server-config.yaml,
// or an API_KEY_PATH env var still set in a pod spec) load cleanly under
// strict YAML/env decoders. Has no effect.
APIKeyPath string `yaml:"api_key_path" env:"API_KEY_PATH" env-description:"deprecated; ignored"`

LabelMatches []regexp.Regexp
AnnotationMatches []regexp.Regexp

// control for dynamic reloading
mu sync.Mutex
}

type RemoteWrite struct {
apiKey string
Host string
MaxBytesPerSend int `yaml:"max_bytes_per_send" default:"10000000" env:"MAX_BYTES_PER_SEND" env-description:"maximum bytes to send in a single request"`
SendInterval time.Duration `yaml:"send_interval" default:"60s" env:"SEND_INTERVAL" env-description:"interval in seconds to send data"`
Expand Down Expand Up @@ -94,10 +95,6 @@ func NewSettings(configFiles ...string) (*Settings, error) {

cfg.setCompiledFilters()

if err := cfg.SetAPIKey(); err != nil {
return nil, fmt.Errorf("failed to get API key: %w", err)
}

cfg.setRemoteWriteURL()
cfg.setPolicy()

Expand All @@ -106,63 +103,11 @@ func NewSettings(configFiles ...string) (*Settings, error) {
return &cfg, nil
}

func (s *Settings) GetAPIKey() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.RemoteWrite.apiKey
}

func (s *Settings) SetAPIKey() error {
s.mu.Lock()
defer s.mu.Unlock()

apiKeyPathLocation, err := absFilePath(s.APIKeyPath)
if err != nil {
return fmt.Errorf("failed to get absolute path: %w", err)
}

if _, err = os.Stat(apiKeyPathLocation); os.IsNotExist(err) {
return fmt.Errorf("API key file %s not found: %w", apiKeyPathLocation, err)
}
apiKey, err := os.ReadFile(s.APIKeyPath)
if err != nil {
return fmt.Errorf("failed to read API key: %w", err)
}
s.RemoteWrite.apiKey = strings.TrimSpace(string(apiKey))

if len(s.RemoteWrite.apiKey) == 0 {
return errors.New("API key is empty")
}
return nil
}

func (s *Settings) setRemoteWriteURL() {
if s.Destination == "" {
s.Destination = "https://api.cloudzero.com/v1/container-metrics"
}
baseURL, err := url.Parse(s.Destination)
if err != nil {
fmt.Println("Malformed URL: ", err.Error())
return
}
params := url.Values{}
params.Add("cluster_name", s.ClusterName)
params.Add("cloud_account_id", s.CloudAccountID)
params.Add("region", s.Region)
baseURL.RawQuery = params.Encode()
url := baseURL.String()

if !isValidURL(url) {
log.Fatal().Str("url", url).Msg("URL format invalid")
}
s.RemoteWrite.Host = url
}

func isValidURL(uri string) bool {
if _, err := url.ParseRequestURI(uri); err != nil {
return false
if _, err := url.ParseRequestURI(s.Destination); err != nil {
log.Fatal().Str("url", s.Destination).Err(err).Msg("URL format invalid")
}
return true
s.RemoteWrite.Host = s.Destination
}

func (s *Settings) setPolicy() {
Expand Down Expand Up @@ -195,19 +140,6 @@ func (s *Settings) compilePatterns(patterns []string) []regexp.Regexp {
return compiledPatterns
}

func absFilePath(location string) (string, error) {
dir := filepath.Dir(filepath.Clean(location))
// validate path if not local directory
if dir == "" || strings.HasPrefix(dir, ".") {
wd, err := os.Getwd()
if err != nil {
return "", fmt.Errorf("failed to get working directory: %w", err)
}
location = filepath.Clean(filepath.Join(wd, location))
}
return location, nil
}

// Files is a custom flag type to handle multiple configuration files
type Files []string

Expand Down
28 changes: 5 additions & 23 deletions app/config/webhook/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package config

import (
"os"
"strings"
"testing"
"time"

Expand All @@ -17,7 +16,6 @@ func TestNewSettings(t *testing.T) {
t.Run("valid config file", func(t *testing.T) {
// Create a temporary config file
configContent := `
api_key_path: "/path/to/api_key"
server:
port: 8080
certificate:
Expand Down Expand Up @@ -57,21 +55,8 @@ destination: "https://api.cloudzero.com/v1/container-metrics"
_, err = tmpFileExtra.Write([]byte(configContentExtra))
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
require.NoError(t, tmpFileExtra.Close())

// Mock the API key file
apiKeyContent := "test-api-key"
apiKeyFile, err := os.CreateTemp("", "api_key-*.txt")
require.NoError(t, err)
defer os.Remove(apiKeyFile.Name())

_, err = apiKeyFile.Write([]byte(apiKeyContent))
require.NoError(t, err)
require.NoError(t, apiKeyFile.Close())

// Update the API key path in the config
configContent = strings.Replace(configContent, "/path/to/api_key", apiKeyFile.Name(), 1)
err = os.WriteFile(tmpFile.Name(), []byte(configContent), 0o644)
require.NoError(t, err)
configFiles := Files{tmpFile.Name(), tmpFileExtra.Name()}
settings, err := NewSettings(configFiles...)
require.NoError(t, err)
Expand All @@ -85,14 +70,11 @@ destination: "https://api.cloudzero.com/v1/container-metrics"
assert.NotEmpty(t, settings.Region, "Region should be set from config or Scout detection")
assert.Equal(t, "test-cluster", settings.ClusterName)
assert.Equal(t, "https://api.cloudzero.com/v1/container-metrics", settings.Destination)
assert.Equal(t, apiKeyContent, settings.GetAPIKey())

// Verify RemoteWrite.Host contains the expected base URL and cluster_name
// (cloudAccountId and region may differ due to Scout detection)
assert.Contains(t, settings.RemoteWrite.Host, "https://api.cloudzero.com/v1/container-metrics")
assert.Contains(t, settings.RemoteWrite.Host, "cluster_name=test-cluster")
assert.Contains(t, settings.RemoteWrite.Host, "cloud_account_id="+settings.CloudAccountID)
assert.Contains(t, settings.RemoteWrite.Host, "region="+settings.Region)
// RemoteWrite.Host should equal Destination verbatim — no query-string
// encoding (the aggregator doesn't read those params and we no longer
// authenticate the in-cluster hop).
assert.Equal(t, settings.Destination, settings.RemoteWrite.Host)

assert.Equal(t, 10000000, settings.RemoteWrite.MaxBytesPerSend)
assert.Equal(t, 60*time.Second, settings.RemoteWrite.SendInterval)
Expand Down
32 changes: 28 additions & 4 deletions app/domain/monitor/secret_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package monitor_test
import (
"context"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

config "github.com/cloudzero/cloudzero-agent/app/config/webhook"
"github.com/cloudzero/cloudzero-agent/app/domain/monitor"
)

Expand All @@ -28,6 +29,31 @@ func (m *MockFileMonitor) Close() {
m.Called()
}

// fileBackedAPIKey is a minimal monitor.MonitoredAPIKey that reads from a
// file on disk, used here to exercise the secrets monitor's refresh loop.
type fileBackedAPIKey struct {
path string
mu sync.Mutex
key string
}

func (f *fileBackedAPIKey) GetAPIKey() string {
f.mu.Lock()
defer f.mu.Unlock()
return f.key
}

func (f *fileBackedAPIKey) SetAPIKey() error {
data, err := os.ReadFile(f.path)
if err != nil {
return err
}
f.mu.Lock()
f.key = strings.TrimSpace(string(data))
f.mu.Unlock()
return nil
}

func TestSecretsMonitor_Start(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -44,9 +70,7 @@ func TestSecretsMonitor_Start(t *testing.T) {
_, err = file.WriteString("foo")
assert.NoError(t, err)

settings := &config.Settings{
APIKeyPath: file.Name(),
}
settings := &fileBackedAPIKey{path: file.Name()}

err = settings.SetAPIKey()
assert.NoError(t, err)
Expand Down
11 changes: 2 additions & 9 deletions app/domain/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package pusher
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/rand/v2"
Expand Down Expand Up @@ -242,18 +241,13 @@ func (h *MetricsPusher) sendBatch(batch []*types.ResourceTags) error {
}

endpoint := h.settings.RemoteWrite.Host
apiToken := h.settings.GetAPIKey()
if apiToken == "" {
RemoteWriteFailures.WithLabelValues(endpoint).Inc()
return errors.New("API key is empty")
}

ts := h.formatMetrics(batch)
log.Ctx(h.ctx).Info().
Int("recordCount", len(ts)).
Msg("Pushing records to remote write endpoint")

if err := h.pushMetrics(h.settings.RemoteWrite.Host, apiToken, ts); err != nil {
if err := h.pushMetrics(h.settings.RemoteWrite.Host, ts); err != nil {
RemoteWriteFailures.WithLabelValues(endpoint).Inc()
return fmt.Errorf("failed to push metrics to remote write: %v", err)
}
Expand Down Expand Up @@ -388,7 +382,7 @@ func (h *MetricsPusher) createTimeseries(
return ts
}

func (h *MetricsPusher) pushMetrics(remoteWriteURL string, apiKey string, timeSeries []prompb.TimeSeries) error {
func (h *MetricsPusher) pushMetrics(remoteWriteURL string, timeSeries []prompb.TimeSeries) error {
writeRequest := &prompb.WriteRequest{
Timeseries: timeSeries,
}
Expand Down Expand Up @@ -420,7 +414,6 @@ func (h *MetricsPusher) pushMetrics(remoteWriteURL string, apiKey string, timeSe

req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Authorization", "Bearer "+apiKey)

client := &http.Client{}
resp, err = client.Do(req)
Expand Down
Loading
Loading