Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 107 additions & 18 deletions cmd/apex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand All @@ -24,6 +25,7 @@ import (
"github.com/evstack/apex/pkg/fetch"
"github.com/evstack/apex/pkg/metrics"
"github.com/evstack/apex/pkg/profile"
apexs3 "github.com/evstack/apex/pkg/s3"
"github.com/evstack/apex/pkg/store"
"github.com/evstack/apex/pkg/submit"
syncer "github.com/evstack/apex/pkg/sync"
Expand Down Expand Up @@ -214,6 +216,89 @@ func setStoreMetrics(db store.Store, rec metrics.Recorder) {
}
}

func setupS3Server(cfg *config.Config, db store.Store, blobSubmitter submit.Submitter, log zerolog.Logger) (*http.Server, error) {
if !cfg.S3.Enabled {
return nil, nil
}

if cfg.S3.AccessKeyID == "" && cfg.S3.SecretAccessKey == "" {
warnLog := log.Warn().Str("addr", cfg.S3.ListenAddr)
if isLoopbackBindAddr(cfg.S3.ListenAddr) {
warnLog.Msg("S3 API authentication is disabled; restrict access to trusted local clients or configure s3.access_key_id and s3.secret_access_key")
} else {
warnLog.Msg("S3 API authentication is disabled on a non-loopback bind; configure s3.access_key_id and s3.secret_access_key or place Apex behind a trusted authenticated proxy")
}
}

var ns types.Namespace
if cfg.S3.Namespace != "" {
var err error
ns, err = types.NamespaceFromHex(cfg.S3.Namespace)
if err != nil {
return nil, fmt.Errorf("parse S3 namespace: %w", err)
}
}

sqliteDB, ok := db.(*store.SQLiteStore)
if !ok {
return nil, fmt.Errorf("S3 API requires SQLite store, got %T", db)
}

objStore := store.NewObjectStore(sqliteDB, ns)
s3Svc := apexs3.NewService(objStore, blobSubmitter, ns)
s3Srv := apexs3.NewServer(s3Svc, cfg.S3.Region, cfg.S3.AccessKeyID, cfg.S3.SecretAccessKey, log)

httpSrv := &http.Server{
Addr: cfg.S3.ListenAddr,
Handler: s3Srv,
ReadHeaderTimeout: 10 * time.Second,
}

go func() {
log.Info().Str("addr", cfg.S3.ListenAddr).Msg("S3 API server listening")
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error().Err(err).Msg("S3 API server error")
}
}()

return httpSrv, nil
}

func isLoopbackBindAddr(addr string) bool {
target := strings.TrimSpace(addr)
if target == "" {
return false
}
if strings.HasPrefix(target, "/") || strings.HasPrefix(target, "unix://") || strings.HasPrefix(target, "unix:") {
return true
}
if idx := strings.LastIndex(target, "://"); idx >= 0 {
target = target[idx+3:]
}
target = strings.TrimLeft(target, "/")
if idx := strings.LastIndex(target, "/"); idx >= 0 {
target = target[idx+1:]
}
if strings.HasPrefix(target, ":") {
return false
}

host := target
if parsedHost, _, err := net.SplitHostPort(target); err == nil {
host = parsedHost
}
host = strings.Trim(host, "[]")
if host == "" {
return false
}
if strings.EqualFold(host, "localhost") || strings.HasSuffix(strings.ToLower(host), ".localhost") {
return true
}

ip := net.ParseIP(host)
return ip != nil && ip.IsLoopback()
}

func persistNamespaces(ctx context.Context, db store.Store, namespaces []types.Namespace) error {
for _, ns := range namespaces {
if err := db.PutNamespace(ctx, ns); err != nil {
Expand Down Expand Up @@ -282,11 +367,21 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}
defer dataFetcher.Close() //nolint:errcheck

svc, notifier, closeSubmitter, err := setupAPIService(cfg, db, dataFetcher, proofFwd, rec)
blobSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return err
}
defer closeSubmitter()
if blobSubmitter != nil {
defer blobSubmitter.Close() //nolint:errcheck
}

// Setup S3 API server if enabled.
s3Srv, err := setupS3Server(cfg, db, blobSubmitter, log.Logger)
if err != nil {
return fmt.Errorf("setup S3 server: %w", err)
}

svc, notifier := setupAPIService(cfg, db, dataFetcher, proofFwd, rec, blobSubmitter)

// Build and run the sync coordinator with observer hook.
coordOpts, closeBackfill, err := buildCoordinatorOptions(cfg, notifier, rec)
Expand Down Expand Up @@ -342,7 +437,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {

err = coord.Run(ctx)

gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv)
gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv, s3Srv)

if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("coordinator: %w", err)
Expand Down Expand Up @@ -385,19 +480,7 @@ func openBlobSubmitter(cfg *config.Config) (*submit.DirectSubmitter, error) {
return blobSubmitter, nil
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder) (*api.Service, *api.Notifier, func(), error) {
blobSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return nil, nil, nil, err
}

closeSubmitter := func() {}
if blobSubmitter != nil {
closeSubmitter = func() {
_ = blobSubmitter.Close()
}
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder, blobSubmitter submit.Submitter) (*api.Service, *api.Notifier) {
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)

Expand All @@ -407,7 +490,7 @@ func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataF
}

svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger, svcOpts...)
return svc, notifier, closeSubmitter, nil
return svc, notifier
}

func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec metrics.Recorder) ([]syncer.Option, func(), error) {
Expand Down Expand Up @@ -436,7 +519,7 @@ func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec met
return coordOpts, closeBackfill, nil
}

func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) {
func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server, s3Srv *http.Server) {
stopped := make(chan struct{})
go func() {
grpcSrv.GracefulStop()
Expand All @@ -457,6 +540,12 @@ func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *me
log.Error().Err(err).Msg("JSON-RPC server shutdown error")
}

if s3Srv != nil {
if err := s3Srv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("S3 API server shutdown error")
}
}

if metricsSrv != nil {
if err := metricsSrv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("metrics server shutdown error")
Expand Down
34 changes: 34 additions & 0 deletions cmd/apex/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import "testing"

func TestIsLoopbackBindAddr(t *testing.T) {
t.Parallel()

tests := []struct {
addr string
want bool
}{
{addr: "127.0.0.1:8333", want: true},
{addr: "[::1]:8333", want: true},
{addr: "localhost:8333", want: true},
{addr: "api.localhost:8333", want: true},
{addr: "unix:///tmp/apex.sock", want: true},
{addr: "/tmp/apex.sock", want: true},
{addr: ":8333", want: false},
{addr: "0.0.0.0:8333", want: false},
{addr: "[::]:8333", want: false},
{addr: "apex.example.com:8333", want: false},
{addr: "", want: false},
}

for _, tt := range tests {
t.Run(tt.addr, func(t *testing.T) {
t.Parallel()

if got := isLoopbackBindAddr(tt.addr); got != tt.want {
t.Fatalf("isLoopbackBindAddr(%q) = %v, want %v", tt.addr, got, tt.want)
}
})
}
}
16 changes: 16 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
Profiling ProfilingConfig `yaml:"profiling"`
Log LogConfig `yaml:"log"`
Submission SubmissionConfig `yaml:"submission"`
S3 S3APIConfig `yaml:"s3"`
}

// DataSourceConfig configures the Celestia data source.
Expand Down Expand Up @@ -94,6 +95,16 @@ type LogConfig struct {
Format string `yaml:"format"`
}

// S3APIConfig configures the S3-compatible API server.
type S3APIConfig struct {
Enabled bool `yaml:"enabled"`
ListenAddr string `yaml:"listen_addr"`
Region string `yaml:"region"`
Namespace string `yaml:"namespace"` // Celestia namespace for S3 objects (hex)
AccessKeyID string `yaml:"access_key_id"` // optional SigV4 access key for the S3 API
SecretAccessKey string `yaml:"secret_access_key"` // optional SigV4 secret key for the S3 API
}

// SubmissionConfig contains settings for the future blob submission pipeline.
type SubmissionConfig struct {
Enabled bool `yaml:"enabled"`
Expand Down Expand Up @@ -155,6 +166,11 @@ func DefaultConfig() Config {
Level: "info",
Format: "json",
},
S3: S3APIConfig{
Enabled: false,
ListenAddr: ":8333",
Region: "us-east-1",
},
}
}

Expand Down
52 changes: 52 additions & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ storage:
# endpoint: "" # custom endpoint for MinIO, R2, etc.
# chunk_size: 64 # heights per S3 object

s3:
# Enable the S3-compatible HTTP API backed by SQLite object storage.
enabled: false
# Address for the S3-compatible API server.
listen_addr: ":8333"
# AWS region reported to clients.
region: "us-east-1"
# Namespace used when S3 uploads are submitted to Celestia.
namespace: ""
# Optional SigV4 credentials enforced by the S3 API.
access_key_id: ""
secret_access_key: ""

rpc:
# Address for the JSON-RPC API server (HTTP/WebSocket)
listen_addr: ":8080"
Expand Down Expand Up @@ -290,6 +303,9 @@ func validate(cfg *Config) error {
if err := validateSubmission(&cfg.Submission); err != nil {
return err
}
if err := validateS3API(&cfg.S3, &cfg.Storage, &cfg.Submission); err != nil {
return err
}
if !validLogLevels[cfg.Log.Level] {
return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level)
}
Expand Down Expand Up @@ -381,6 +397,42 @@ func validateSubmission(s *SubmissionConfig) error {
return nil
}

func validateS3API(s3cfg *S3APIConfig, storage *StorageConfig, submission *SubmissionConfig) error {
s3cfg.ListenAddr = strings.TrimSpace(s3cfg.ListenAddr)
s3cfg.Region = strings.TrimSpace(s3cfg.Region)
s3cfg.Namespace = strings.TrimSpace(s3cfg.Namespace)
s3cfg.AccessKeyID = strings.TrimSpace(s3cfg.AccessKeyID)
s3cfg.SecretAccessKey = strings.TrimSpace(s3cfg.SecretAccessKey)
if s3cfg.Region == "" {
s3cfg.Region = DefaultConfig().S3.Region
}
if (s3cfg.AccessKeyID == "") != (s3cfg.SecretAccessKey == "") {
return errors.New("s3.access_key_id and s3.secret_access_key must be provided together")
}
if !s3cfg.Enabled {
return nil
}
if s3cfg.ListenAddr == "" {
return errors.New("s3.listen_addr is required when s3.enabled is true")
}
if storage.Type != "sqlite" && storage.Type != "" {
return errors.New("s3.enabled requires storage.type to be \"sqlite\"")
}
if s3cfg.Namespace != "" {
ns, err := types.NamespaceFromHex(s3cfg.Namespace)
if err != nil {
return fmt.Errorf("s3.namespace is invalid: %w", err)
}
if err := ns.ValidateForBlob(); err != nil {
return fmt.Errorf("s3.namespace is invalid: %w", err)
}
}
if submission.Enabled && s3cfg.Namespace == "" {
return errors.New("s3.namespace is required when both s3.enabled and submission.enabled are true")
}
return nil
}

func resolveSubmissionSignerKeyPath(s *SubmissionConfig, baseDir string) error {
if !s.Enabled {
return nil
Expand Down
Loading
Loading