107 changes: 85 additions & 22 deletions block/internal/da/fiber_client.go
@@ -87,43 +87,106 @@
}
}

flat := flattenBlobs(data)
// Fibre's per-upload cap is ~128 MiB (hard server-side reject:
// "data size %d exceeds maximum 134217723"). flattenBlobs adds
// 4 bytes per blob plus a 4-byte count prefix, so we target
// 120 MiB per chunk to leave headroom and avoid borderline rejects.
chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget)
nsID := namespace[len(namespace)-10:]
result, err := c.fiber.Upload(context.Background(), nsID, flat)
if err != nil {
code := datypes.StatusError
switch {
case errors.Is(err, context.Canceled):
code = datypes.StatusContextCanceled
case errors.Is(err, context.DeadlineExceeded):
code = datypes.StatusContextDeadline
}
c.logger.Error().Err(err).Msg("fiber upload failed")
return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: code,
Message: fmt.Sprintf("fiber upload failed for blob: %v", err),
SubmittedCount: uint64(len(data) - 1),
BlobSize: blobSize,
Timestamp: time.Now(),
},

ids := make([][]byte, 0, len(chunks))
var submitted int
for chunkIdx, chunk := range chunks {
flat := flattenBlobs(chunk)
uploadStart := time.Now()
result, err := c.fiber.Upload(context.Background(), nsID, flat)

[golangci-lint check failure at block/internal/da/fiber_client.go:102 (contextcheck): Non-inherited new context, use function like `context.WithXXX` instead]
uploadDuration := time.Since(uploadStart)
if err != nil {
c.logger.Warn().
Dur("duration", uploadDuration).
Int("flat_size", len(flat)).
Int("blob_count", len(chunk)).
Int("chunk_idx", chunkIdx).
Int("chunk_total", len(chunks)).
Err(err).
Msg("fiber upload duration (failed)")
code := datypes.StatusError
switch {
case errors.Is(err, context.Canceled):
code = datypes.StatusContextCanceled
case errors.Is(err, context.DeadlineExceeded):
code = datypes.StatusContextDeadline
}
c.logger.Error().Err(err).Msg("fiber upload failed")
return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: code,
Message: fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err),
SubmittedCount: uint64(submitted),
BlobSize: blobSize,
Timestamp: time.Now(),
},
}
}
c.logger.Info().
Dur("duration", uploadDuration).
Int("flat_size", len(flat)).
Int("blob_count", len(chunk)).
Int("chunk_idx", chunkIdx).
Int("chunk_total", len(chunks)).
Msg("fiber upload duration (ok)")
ids = append(ids, result.BlobID)
submitted += len(chunk)
}

c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")

return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: datypes.StatusSuccess,
IDs: [][]byte{result.BlobID},
SubmittedCount: uint64(len(data)),
IDs: ids,
SubmittedCount: uint64(submitted),
Height: 0, /* TODO */
BlobSize: blobSize,
Timestamp: time.Now(),
},
}
}

// fibreUploadChunkBudget is the target maximum flattened size of a single
// Fibre Upload call. Fibre rejects payloads above ~128 MiB
// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for
// flattenBlobs's per-blob length prefixes and for any future overhead.
const fibreUploadChunkBudget = 120 * 1024 * 1024

// chunkBlobsForFibre groups data into chunks whose flattened size stays
// below budget. Per-blob length-prefix overhead matches flattenBlobs.
// A single oversized blob (already validated against DefaultMaxBlobSize
// above) lands in its own chunk; that upload still fails server-side, but
// at least it doesn't drag the rest of the batch down with it.
func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte {
if len(data) == 0 {
return nil
}
chunks := make([][][]byte, 0, 1)
cur := make([][]byte, 0, len(data))
curSize := 4 // flattenBlobs's count prefix
for _, b := range data {
entry := 4 + len(b)
if len(cur) > 0 && curSize+entry > budget {
chunks = append(chunks, cur)
cur = make([][]byte, 0, len(data))
curSize = 4
}
cur = append(cur, b)
curSize += entry
}
if len(cur) > 0 {
chunks = append(chunks, cur)
}
return chunks
}
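
As an aside, the budget math above assumes flattenBlobs frames its output as a 4-byte blob count followed by a 4-byte length prefix per blob. A minimal standalone sketch of that assumed framing (not the flattenBlobs in this package; layout and endianness are guesses for illustration only):

package main

import (
	"encoding/binary"
	"fmt"
)

// flattenBlobsSketch mirrors the size accounting in chunkBlobsForFibre:
// a 4-byte count prefix, then each blob as a 4-byte length prefix followed
// by its bytes. Layout and endianness are assumptions, not the real codec.
func flattenBlobsSketch(blobs [][]byte) []byte {
	out := make([]byte, 4)
	binary.BigEndian.PutUint32(out, uint32(len(blobs)))
	for _, b := range blobs {
		var lenBuf [4]byte
		binary.BigEndian.PutUint32(lenBuf[:], uint32(len(b)))
		out = append(out, lenBuf[:]...)
		out = append(out, b...)
	}
	return out
}

func main() {
	flat := flattenBlobsSketch([][]byte{[]byte("aaaa"), []byte("bb")})
	// 4 (count) + (4+4) + (4+2) = 18 bytes, i.e. 4 bytes of overhead per
	// blob plus the 4-byte prefix the 120 MiB chunk budget leaves room for.
	fmt.Println(len(flat)) // 18
}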

func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
return c.retrieve(ctx, height, namespace, true)
}
7 changes: 6 additions & 1 deletion pkg/config/config.go
@@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() {
}

c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second}
c.Node.MaxPendingHeadersAndData = 50
// Tighter pending cap (was 50). At 50, a Fibre upload stall lets the
// submitter accumulate 50 × ~32 MiB blob copies + their per-validator
// retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and
// OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint
// bounded while still letting healthy uploads pipeline.
c.Node.MaxPendingHeadersAndData = 10
}
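
Back-of-envelope only, to make the numbers in the comment concrete; the validator count and the one-retry-buffer-per-validator assumption are illustrative guesses, not measurements from this repo:

package main

import "fmt"

func main() {
	const (
		blobMiB    = 32 // ~32 MiB blob copies, per the comment above
		validators = 40 // hypothetical validator count, not from the source
	)
	for _, pending := range []int{50, 10} {
		// Assume one in-flight copy plus one retry buffer per validator.
		gib := float64(pending*blobMiB*(1+validators)) / 1024
		fmt.Printf("MaxPendingHeadersAndData=%d -> ~%.1f GiB in flight\n", pending, gib)
	}
	// Under these assumptions, 50 pending lands right at a 64 GiB box's
	// limit while 10 stays near 13 GiB.
}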

// GetNamespace returns the namespace for header submissions.
45 changes: 34 additions & 11 deletions tools/celestia-node-fiber/cmd/evnode-fibre/main.go
@@ -51,6 +51,7 @@ import (

"github.com/celestiaorg/celestia-app/v9/app"
"github.com/celestiaorg/celestia-app/v9/app/encoding"
appfibre "github.com/celestiaorg/celestia-app/v9/fibre"
"github.com/celestiaorg/celestia-node/api/client"
cnp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"

@@ -210,14 +211,16 @@ func run(cli cliFlags) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Construct the celestia-node-fiber adapter. We don't override
// SubmitConfig.Fibre — the Fibre client defaults (UploadMemoryBudget
// 512 MiB, RPCTimeout 15 s) are sized for the FSP-side concurrency
// the validators can actually absorb. We tried bumping the budget
// to 4 GiB to allow more in-flight blobs; with 16 upload workers
// the FSPs couldn't keep up and the box OOM'd at 63.9 GB. Leaving
// the defaults in place means the upload pipeline self-bounds at
// roughly what the FSPs can sustain.
// Construct the celestia-node-fiber adapter. The Fibre client
// defaults (UploadMemoryBudget 512 MiB, RPCTimeout 15 s) are sized
// for FSP-side concurrency. Bumping the budget alone caused 64 GiB
// OOMs (4 GiB budget × 16 workers), so we keep it conservative and
// instead raise RPCTimeout to 30 s so signature collection from a
// slow-but-healthy validator isn't cut short under load; when busy,
// a 32 MiB row upload plus signature aggregation can exceed the
// 15 s default.
fibreCfg := appfibre.DefaultClientConfig()
fibreCfg.RPCTimeout = 30 * time.Second
adapter, err := cnfiber.New(ctx, cnfiber.Config{
Client: client.Config{
ReadConfig: client.ReadConfig{
@@ -231,6 +234,7 @@
CoreGRPCConfig: client.CoreGRPCConfig{
Addr: cli.coreGRPCAddr,
},
Fibre: &fibreCfg,
},
},
}, kr)
@@ -302,7 +306,12 @@
// Fiber-tuned profile: BatchingStrategy=adaptive, BatchMaxDelay=1.5s,
// DA.BlockTime=1s, MaxPendingHeadersAndData=10, plus the 100 MiB blob cap set below.
cfg.ApplyFiberDefaults()
block.SetMaxBlobSize(120 * 1024 * 1024)
// 100 MiB — bounded by Fibre's hard ~128 MiB per-upload cap (we
// hit `data size exceeds maximum 134217723` at 128 MiB - 5 B).
// Set the per-block data cap below that so each block_data item
// fits in a single Fibre upload after the submitter splits a
// multi-blob batch into ≤120 MiB chunks.
block.SetMaxBlobSize(100 * 1024 * 1024)
cfg.P2P.ListenAddress = cli.p2pListen
cfg.P2P.DisableConnectionGater = true
cfg.RPC.Address = cli.rpcListen
@@ -542,10 +551,24 @@ func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.Execu
return coreexecution.ExecutionInfo{MaxGas: 0}, nil
}

func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) {
// FilterTxs admits txs in arrival order, postponing any tx that would
// push the running total past the maxBytes budget back to the sequencer
// queue so it lands in a future batch (later, smaller txs that still fit
// are admitted). Skipping this enforcement (a previous version returned
// FilterOK unconditionally) lets a single block sweep up the entire
// mempool; under sustained txsim load that produced 369 MiB blocks that
// exceeded Fibre's per-upload cap and crashed the node with
// `single item exceeds DA blob size limit`.
func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) {
st := make([]coreexecution.FilterStatus, len(txs))
for i := range st {
var used uint64
for i, tx := range txs {
size := uint64(len(tx))
if maxBytes > 0 && used+size > maxBytes {
st[i] = coreexecution.FilterPostpone
continue
}
st[i] = coreexecution.FilterOK
used += size
}
return st, nil
}
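
A quick illustrative check of the budget behavior described above, using a standalone mirror of the loop (the sizes and budget are made up; the real method returns coreexecution.FilterStatus values):

package main

import "fmt"

// filterBySize mirrors the FilterTxs budget loop above, for illustration only.
func filterBySize(sizes []uint64, maxBytes uint64) []string {
	st := make([]string, len(sizes))
	var used uint64
	for i, size := range sizes {
		if maxBytes > 0 && used+size > maxBytes {
			st[i] = "postpone" // would overflow the budget; retried next batch
			continue
		}
		st[i] = "ok"
		used += size
	}
	return st
}

func main() {
	// With a 10-byte budget: the first 6 fits, the second 6 would overflow
	// and is postponed, and the trailing 3 still fits and is admitted.
	fmt.Println(filterBySize([]uint64{6, 6, 3}, 10)) // [ok postpone ok]
}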
14 changes: 13 additions & 1 deletion tools/talis/aws.go
@@ -350,16 +350,28 @@ func CreateAWSInstances(ctx context.Context, insts []Instance, sshKey, keyName s
close(results)
}()

var created []Instance
var (
created []Instance
failures []string
)
for res := range results {
if res.err != nil {
fmt.Printf("❌ %s failed after %v %v\n", res.inst.Name, res.timeRequired, res.err)
failures = append(failures, fmt.Sprintf("%s: %v", res.inst.Name, res.err))
} else {
created = append(created, res.inst)
fmt.Printf("✅ %s is up (public=%s) in %v\n", res.inst.Name, res.inst.PublicIP, res.timeRequired)
}
fmt.Printf("---- Progress: %d/%d\n", len(created), total)
}
if len(failures) > 0 {
// Surface partial-failure as an error so `talis up` exits
// non-zero; without this, downstream genesis runs against a
// half-provisioned config and fails much later with confusing
// "X has no public IP yet" messages.
return created, fmt.Errorf("%d/%d instance(s) failed to launch: %s",
len(failures), total, strings.Join(failures, "; "))
}
return created, nil
}

14 changes: 12 additions & 2 deletions tools/talis/download.go
@@ -193,16 +193,26 @@ func compressAndDownload(table, localPath, user, host, sshKeyPath string) error
return nil
}

// sshExec runs a command on a remote host via SSH and returns the combined output.
// sshExec runs a command on a remote host via SSH and returns stdout only.
//
// We intentionally do NOT use CombinedOutput here. ssh prints connection
// chatter ("Warning: Permanently added '...' to the list of known hosts.")
// on stderr, and a previous `CombinedOutput` revision caused
// `fmt.Sscanf(out, "%d")` to silently return 0 because the leading
// stderr line had no digits. Capturing only stdout keeps numeric output
// parseable; -q + LogLevel=ERROR further suppresses the chatter for any
// caller that does combine streams.
func sshExec(user, host, sshKeyPath, command string) ([]byte, error) {
cmd := exec.Command("ssh",
"-q",
"-o", "LogLevel=ERROR",
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-i", sshKeyPath,
fmt.Sprintf("%s@%s", user, host),
command,
)
return cmd.CombinedOutput()
return cmd.Output()
}
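
For context on the Sscanf failure mode the comment describes, a minimal sketch (the warning text is representative of ssh's stderr chatter; the numeric-output caller is assumed):

package main

import "fmt"

func main() {
	combined := "Warning: Permanently added 'host' (ED25519) to the list of known hosts.\n12345\n"
	stdoutOnly := "12345\n"

	var n int
	// With combined output the first token is not a number, so Sscanf parses
	// nothing: n stays 0 and the error is easy to miss if only n is checked.
	if _, err := fmt.Sscanf(combined, "%d", &n); err != nil {
		fmt.Println("combined output:", n, err)
	}
	// With stdout only, the value parses as expected.
	if _, err := fmt.Sscanf(stdoutOnly, "%d", &n); err == nil {
		fmt.Println("stdout only:", n) // stdout only: 12345
	}
}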

func sftpDownload(remotePath, localPath, user, host, sshKeyPath string) error {
34 changes: 29 additions & 5 deletions tools/talis/fibre_bootstrap_evnode.go
@@ -125,21 +125,45 @@ to fetch, then SCPs to each evnode-*.`,
defer wg.Done()
log.Printf("[%s] pushing JWT + keyring", ev.Name)

// JWT is small + atomic on the receive side because
// it's a single file, so we push it directly.
if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, localJWT, "/root/bridge-jwt.txt", false); err != nil {
errCh <- fmt.Errorf("[%s] push JWT: %w", ev.Name, err)
return
}

// mkdir the parent so scp lands at the exact path
// evnode_init.sh waits for.
if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, "mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test"); err != nil {
errCh <- fmt.Errorf("[%s] mkdir keyring-fibre: %w", ev.Name, err)
// Keyring push is staged through a tmp dir and
// promoted via mv. Without staging, evnode_init.sh's
// poll loop (which tests `[ -d keyring-test ]`)
// passes the moment scp -r mkdir's the directory,
// long before fibre-0.info is on disk. evnode then
// launches mid-scp and dies with `keyring entry
// "fibre-0" not found`. mv is atomic on the same
// filesystem so the init script either sees nothing
// (keep waiting) or the fully-populated dir (start
// the daemon cleanly).
stageDir := "/root/.keyring-fibre.staging"
prep := fmt.Sprintf(
"rm -rf %s && mkdir -p %s && mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test",
stageDir, stageDir,
)
if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, prep); err != nil {
errCh <- fmt.Errorf("[%s] stage keyring: %w", ev.Name, err)
return
}
if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), "/root/keyring-fibre/keyring-test", true); err != nil {
stageDest := stageDir + "/keyring-test"
if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), stageDest, true); err != nil {
errCh <- fmt.Errorf("[%s] push keyring: %w", ev.Name, err)
return
}
promote := fmt.Sprintf(
"mv %s /root/keyring-fibre/keyring-test && rmdir %s",
stageDest, stageDir,
)
if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, promote); err != nil {
errCh <- fmt.Errorf("[%s] promote keyring: %w", ev.Name, err)
return
}

log.Printf("[%s] ✓ pushed; daemon should start within ~10s", ev.Name)
}(ev)