diff --git a/crates/aspect-cli/src/builtins/aspect/MODULE.aspect b/crates/aspect-cli/src/builtins/aspect/MODULE.aspect index c24144ecd..7d3c08774 100644 --- a/crates/aspect-cli/src/builtins/aspect/MODULE.aspect +++ b/crates/aspect-cli/src/builtins/aspect/MODULE.aspect @@ -11,8 +11,8 @@ use_task("format.axl", "format") use_task("gazelle.axl", "gazelle") use_feature("feature/artifacts.axl", "ArtifactUpload") -use_feature("feature/defaults.axl", "BazelDefaults") use_feature("feature/github_lint_comments.axl", "GithubLintComments") use_feature("feature/github_status_checks.axl", "GithubStatusChecks") use_feature("feature/github_status_comments.axl", "GithubStatusComments") use_feature("feature/telemetry.axl", "Telemetry") +use_feature("feature/workflows.axl", "Workflows") diff --git a/crates/aspect-cli/src/builtins/aspect/feature/defaults.axl b/crates/aspect-cli/src/builtins/aspect/feature/workflows.axl similarity index 52% rename from crates/aspect-cli/src/builtins/aspect/feature/defaults.axl rename to crates/aspect-cli/src/builtins/aspect/feature/workflows.axl index baf62e1c8..2f5623c8e 100644 --- a/crates/aspect-cli/src/builtins/aspect/feature/defaults.axl +++ b/crates/aspect-cli/src/builtins/aspect/feature/workflows.axl @@ -8,7 +8,7 @@ load("../lib/build_metadata.axl", "get_build_metadata_flags", "get_task_metadata load("../traits.axl", "BazelTrait", "HealthCheckTrait") load("../lib/health_check.axl", "agent_health_check") -def _default_bazel_behavior(ctx: FeatureContext): +def _workflows_impl(ctx: FeatureContext): environment = get_environment(ctx.std) health_check = ctx.traits[HealthCheckTrait] @@ -47,7 +47,12 @@ def _default_bazel_behavior(ctx: FeatureContext): if bessie_endpoint: bessie_sinks.append(bazel.build_events.grpc( uri = bessie_endpoint, - metadata = {} # TODO: how does bessie authenticate? 
+ metadata = {}, + max_retries = ctx.args.bes_max_retries, + retry_min_delay = ctx.args.bes_retry_min_delay, + retry_max_buffer_size = ctx.args.bes_retry_max_buffer_size, + timeout = ctx.args.bes_timeout, + error_strategy = ctx.args.bes_error_strategy, )) bazel_version_output = ctx.std.process.command("bazel").arg("--version").stdout("piped").spawn().wait_with_output() @@ -75,7 +80,53 @@ def _default_bazel_behavior(ctx: FeatureContext): bazel_trait.extra_flags.append("--color=yes") -BazelDefaults = feature( - implementation = _default_bazel_behavior, - args = {} -) \ No newline at end of file +Workflows = feature( + display_name = "Aspect Workflows", + summary = "CI-aware Bazel configuration for Aspect Workflows. Wires BES " + + "forwarding, build metadata, health checks, and CI-host quirks " + + "(GitHub runner tracking, Buildkite section markers, color " + + "forcing). Today this only fully works on Aspect Workflows " + + "runners; the CI-host branches for buildkite/github are partial " + + "and may grow into general-CI support later.", + implementation = _workflows_impl, + args = { + # BES sink retry/buffer/timeout knobs. Forwarded verbatim to + # `bazel.build_events.grpc(...)`; see crates/axl-runtime for the + # underlying state machine. Every parameter has a sensible default + # that mirrors Bazel's BuildEventServiceUploader, so users only set + # these when they want to deviate (e.g. fail builds on BES outage + # for compliance, or shrink buffers on memory-constrained runners). + "bes_max_retries": args.int( + default = 4, + description = "Max reconnect attempts after a transient BES error " + + "before giving up. 0 disables retry; the sink remains " + + "non-fatal regardless.", + ), + "bes_retry_min_delay": args.string( + default = "1s", + description = "Base delay for full-jitter exponential backoff " + + "between BES reconnect attempts. Duration string " + + "(e.g. 
'500ms', '2s', '1m').", + ), + "bes_retry_max_buffer_size": args.int( + default = 10000, + description = "Cap on the in-flight unacked BES retry buffer. " + + "Exceeding this mid-stream is treated as a terminal " + + "failure per `bes_error_strategy`.", + ), + "bes_timeout": args.string( + default = "0s", + description = "Overall BES upload deadline. Duration string; '0s' " + + "disables the deadline. Mirrors Bazel's --bes_timeout.", + ), + "bes_error_strategy": args.string( + default = "warn", + values = ["abort", "fail_at_end", "warn", "ignore"], + description = "How a terminal BES failure surfaces. " + + "'abort' kills the build; " + + "'fail_at_end' lets the build complete then exits " + + "non-zero; 'warn' (default) prints to stderr and " + + "leaves the exit code alone; 'ignore' is silent.", + ), + } +) diff --git a/crates/aspect-cli/src/builtins/aspect/format.axl b/crates/aspect-cli/src/builtins/aspect/format.axl index 10fd7b672..9ac479cfa 100644 --- a/crates/aspect-cli/src/builtins/aspect/format.axl +++ b/crates/aspect-cli/src/builtins/aspect/format.axl @@ -152,7 +152,7 @@ def _impl(ctx: TaskContext) -> int: # Task-time flag contributors — features register hooks that need ctx.task # (e.g. --build_metadata=ASPECT_TASK_NAME=...) which features can't produce # at activation time because FeatureContext has no task. - # See BazelDefaults in feature/defaults.axl. + # See Workflows in feature/workflows.axl. for hook in bazel_trait.task_flags: flags.extend(hook(ctx)) if bazel_trait.flags: diff --git a/crates/aspect-cli/src/builtins/aspect/lib/bazel_runner.axl b/crates/aspect-cli/src/builtins/aspect/lib/bazel_runner.axl index 300a397dc..0605f78bd 100644 --- a/crates/aspect-cli/src/builtins/aspect/lib/bazel_runner.axl +++ b/crates/aspect-cli/src/builtins/aspect/lib/bazel_runner.axl @@ -66,7 +66,7 @@ def run_bazel_task(ctx: TaskContext, command: str) -> int: # Task-time flag contributors — features register hooks that need # ctx.task (e.g. 
--build_metadata=ASPECT_TASK_NAME=...) which features # can't produce at activation time because FeatureContext has no task. - # See BazelDefaults in feature/defaults.axl for the metadata registration. + # See Workflows in feature/workflows.axl for the metadata registration. for hook in bazel_trait.task_flags: flags.extend(hook(ctx)) if bazel_trait.flags: diff --git a/crates/aspect-cli/src/builtins/aspect/lint.axl b/crates/aspect-cli/src/builtins/aspect/lint.axl index c93ec502e..6d03f6ba1 100644 --- a/crates/aspect-cli/src/builtins/aspect/lint.axl +++ b/crates/aspect-cli/src/builtins/aspect/lint.axl @@ -190,7 +190,7 @@ def _impl(ctx: TaskContext) -> int: # Task-time flag contributors — features register hooks that need # ctx.task (e.g. --build_metadata=ASPECT_TASK_NAME=...) which features # can't produce at activation time because FeatureContext has no task. - # See BazelDefaults in feature/defaults.axl. + # See Workflows in feature/workflows.axl. for hook in bazel_trait.task_flags: flags.extend(hook(ctx)) if bazel_trait.flags: diff --git a/crates/axl-runtime/src/engine/bazel/build.rs b/crates/axl-runtime/src/engine/bazel/build.rs index aef0a6945..ebe221be7 100644 --- a/crates/axl-runtime/src/engine/bazel/build.rs +++ b/crates/axl-runtime/src/engine/bazel/build.rs @@ -28,15 +28,16 @@ use starlark::values::starlark_value; use crate::engine::r#async::rt::AsyncRuntime; -use super::execlog_sink::ExecLogSink; use super::iter::BuildEventIterator; use super::iter::ExecutionLogIterator; use super::iter::WorkspaceEventIterator; +use super::sink::execlog::ExecLogSink; +use super::sink::grpc; +use super::sink::retry::{ErrorStrategy, RetryConfig, SinkError, SinkOutcome}; +use super::sink::tracing as tracing_sink; use super::stream::BuildEventStream; use super::stream::ExecLogStream; use super::stream::WorkspaceEventStream; -use super::stream_sink::GrpcEventStreamSink; -use super::stream_tracing::TracingEventStreamSink; #[derive(Debug, ProvidesStaticType, Display, Trace, 
NoSerialize, Allocative)] #[display("")] @@ -79,6 +80,8 @@ pub enum BuildEventSink { Grpc { uri: String, metadata: HashMap, + #[allocative(skip)] + retry: RetryConfig, }, File { path: String, @@ -109,17 +112,22 @@ impl BuildEventSink { rt: AsyncRuntime, stream: &BuildEventStream, invocation_id: String, - ) -> JoinHandle<()> { + ) -> JoinHandle { match self { - BuildEventSink::Grpc { uri, metadata } => { + BuildEventSink::Grpc { + uri, + metadata, + retry, + } => { // Use subscribe_realtime() since sinks subscribe at stream creation // and don't need history replay. - GrpcEventStreamSink::spawn( + grpc::Grpc::spawn( rt, stream.subscribe(), uri.clone(), metadata.clone(), invocation_id, + retry.clone(), ) } BuildEventSink::File { .. } => { @@ -139,8 +147,13 @@ pub struct Build { #[allocative(skip)] execlog_stream: RefCell>, + /// Threads forwarding to BES, tracing, or file sinks. Every sink returns + /// a `SinkOutcome` so `wait()` can surface failures per the sink's + /// `error_strategy`. User-configured BES sinks default to `warn`; + /// internal sinks (tracing emitter, execlog writer) default to `abort`, + /// since their failures indicate a real bug rather than a flaky backend. #[allocative(skip)] - sink_handles: RefCell>>, + sink_handles: RefCell>>, /// The shared invocation_id that every gRPC BES sink uses when forwarding /// this build's events. `None` when no BES sinks were configured; `Some` @@ -308,7 +321,7 @@ impl Build { // blocked in `Pipe::open` waiting for bazel to open the FIFO write // end, which won't happen until bazel finishes JVM startup — well // after these subscribe calls land. 
- let mut sink_handles: Vec> = vec![]; + let mut sink_handles: Vec> = vec![]; let sink_invocation_id: Option = if !bes_subscriber_sinks.is_empty() { let invocation_id = uuid::Uuid::new_v4().to_string(); for sink in bes_subscriber_sinks { @@ -336,7 +349,7 @@ impl Build { if build_events { // Use subscribe_realtime() since this subscribes at stream creation // and doesn't need history replay. - sink_handles.push(TracingEventStreamSink::spawn( + sink_handles.push(tracing_sink::Tracing::spawn( build_event_stream.as_ref().unwrap().subscribe(), )) } @@ -483,21 +496,49 @@ pub(crate) fn build_methods(registry: &mut MethodsBuilder) { } }; - let handles = build.sink_handles.take(); - for handle in handles { + // Resolve all sink threads. Each returns a `SinkOutcome` describing + // its terminal state and how to surface a failure (per Bazel's BES + // upload error policy plus `abort` for internal sinks). + // `Warn` and `Ignore` already printed (or stayed silent) at the sink; + // only `Abort` and `FailAtEnd` propagate further. 
+ let mut abort_msg: Option = None; + let mut fail_at_end = false; + for handle in build.sink_handles.take() { match handle.join() { - Ok(_) => continue, - Err(err) => anyhow::bail!("one of the sinks failed: {:#?}", err), + Ok(Ok(())) => continue, + Ok(Err(SinkError { + strategy, + last_error, + })) => match strategy { + ErrorStrategy::Abort => { + if abort_msg.is_none() { + abort_msg = Some(last_error); + } + } + ErrorStrategy::FailAtEnd => { + fail_at_end = true; + } + ErrorStrategy::Warn | ErrorStrategy::Ignore => {} + }, + Err(err) => anyhow::bail!("sink thread panicked: {:#?}", err), } } // Drop the span to end the trace drop(build.span.replace(tracing::Span::none())); - Ok(BuildStatus { - success: result.success(), - code: result.code(), - }) + if let Some(msg) = abort_msg { + anyhow::bail!("BES sink failure (abort): {}", msg); + } + + let success = result.success() && !fail_at_end; + let code = if fail_at_end && result.code().unwrap_or(0) == 0 { + Some(36) + } else { + result.code() + }; + + Ok(BuildStatus { success, code }) } } @@ -593,4 +634,208 @@ Test = task(implementation = _impl) } } } + + // --- bazel.build_events.grpc validation --- + // + // These exercise the Starlark surface of the failure-knob feature. + // `.check()` runs the snippet through eval_module — the call lives at + // module level so the function's parameter validation is the *only* + // thing under test. No basil, no real network. 
+ + #[test] + fn grpc_rejects_unknown_error_strategy() { + let err = crate::axl_check!( + r#"bazel.build_events.grpc(uri = "http://localhost:1", error_strategy = "nope")"# + ) + .expect_err("expected validation error") + .to_string(); + assert!( + err.contains("error_strategy") && err.contains("nope"), + "unexpected error: {err}" + ); + } + + #[test] + fn grpc_rejects_negative_max_retries() { + let err = crate::axl_check!( + r#"bazel.build_events.grpc(uri = "http://localhost:1", max_retries = -1)"# + ) + .expect_err("expected validation error") + .to_string(); + assert!( + err.contains("max_retries") && err.contains(">= 0"), + "unexpected error: {err}" + ); + } + + #[test] + fn grpc_rejects_zero_buffer_size() { + let err = crate::axl_check!( + r#"bazel.build_events.grpc(uri = "http://localhost:1", retry_max_buffer_size = 0)"# + ) + .expect_err("expected validation error") + .to_string(); + assert!( + err.contains("retry_max_buffer_size") && err.contains("> 0"), + "unexpected error: {err}" + ); + } + + #[test] + fn grpc_rejects_malformed_retry_min_delay() { + let err = crate::axl_check!( + r#"bazel.build_events.grpc(uri = "http://localhost:1", retry_min_delay = "garbage")"# + ) + .expect_err("expected validation error") + .to_string(); + assert!(err.contains("retry_min_delay"), "unexpected error: {err}"); + } + + #[test] + fn grpc_rejects_malformed_timeout() { + let err = crate::axl_check!( + r#"bazel.build_events.grpc(uri = "http://localhost:1", timeout = "garbage")"# + ) + .expect_err("expected validation error") + .to_string(); + assert!(err.contains("timeout"), "unexpected error: {err}"); + } + + #[test] + fn grpc_accepts_full_knob_set() { + crate::axl_check!( + r#"bazel.build_events.grpc( + uri = "grpcs://bes.example.com", + metadata = {"x-auth": "tok"}, + max_retries = 0, + retry_min_delay = "500ms", + retry_max_buffer_size = 16, + timeout = "30s", + error_strategy = "fail_at_end", +)"# + ) + .expect("snippet should validate"); + } + + // --- error_strategy 
end-to-end --- + // + // The tests below feed an unparseable URI into the gRPC sink so + // `Channel::from_shared` returns `InvalidEndpoint` (non-retryable) and + // the sink terminates without touching the network. We deliberately + // avoid a real TCP target — connect-refused timing varies by platform + // and would make these tests flaky. + // + // basil is told to run the "success" scenario, so bazel itself exits + // 0; only the sink path differs across these tests. + + /// `error_strategy = "ignore"`: terminal sink error is suppressed. + /// `wait()` returns success and the original bazel exit code. + #[test] + fn grpc_error_strategy_ignore_swallows_terminal_failure() { + use std::time::Duration; + let result = crate::test::with_timeout(Duration::from_secs(15), || { + crate::test::eval( + r#" +def _impl(ctx): + sink = bazel.build_events.grpc( + uri = "not a uri", + max_retries = 0, + retry_min_delay = "0s", + error_strategy = "ignore", + ) + build = ctx.bazel.build( + flags = ["--scenario=success"], + build_events = [sink], + inherit_stderr = False, + ) + status = build.wait() + if not status.success: return 1 + if status.code != 0: return 2 + return 0 + +Test = task(implementation = _impl) +"#, + ) + .with_fake_bazel() + .run_task(0) + }) + .expect("test hung"); + assert_eq!(result.expect("run_task"), Some(0)); + } + + /// `error_strategy = "fail_at_end"`: bazel succeeded but the sink + /// failed terminally — `wait()` reports `success = False` with the + /// reserved exit code 36 so callers can distinguish a sink-induced + /// failure from a genuine build failure. 
+ #[test] + fn grpc_error_strategy_fail_at_end_reports_code_36() { + use std::time::Duration; + let result = crate::test::with_timeout(Duration::from_secs(15), || { + crate::test::eval( + r#" +def _impl(ctx): + sink = bazel.build_events.grpc( + uri = "not a uri", + max_retries = 0, + retry_min_delay = "0s", + error_strategy = "fail_at_end", + ) + build = ctx.bazel.build( + flags = ["--scenario=success"], + build_events = [sink], + inherit_stderr = False, + ) + status = build.wait() + if status.success: return 1 + if status.code != 36: return 2 + return 0 + +Test = task(implementation = _impl) +"#, + ) + .with_fake_bazel() + .run_task(0) + }) + .expect("test hung"); + assert_eq!(result.expect("run_task"), Some(0)); + } + + /// `error_strategy = "abort"`: terminal sink failure is raised out of + /// `wait()` as an evaluation error. The error message identifies the + /// abort path so users can tell a sink failure from any other Starlark + /// runtime error. + #[test] + fn grpc_error_strategy_abort_propagates_as_eval_error() { + use std::time::Duration; + let result = crate::test::with_timeout(Duration::from_secs(15), || { + crate::test::eval( + r#" +def _impl(ctx): + sink = bazel.build_events.grpc( + uri = "not a uri", + max_retries = 0, + retry_min_delay = "0s", + error_strategy = "abort", + ) + build = ctx.bazel.build( + flags = ["--scenario=success"], + build_events = [sink], + inherit_stderr = False, + ) + build.wait() + return 0 + +Test = task(implementation = _impl) +"#, + ) + .with_fake_bazel() + .run_task(0) + }) + .expect("test hung"); + let err = result.expect_err("expected wait() to bail").to_string(); + assert!( + err.contains("BES sink failure") && err.contains("abort"), + "unexpected error: {err}" + ); + } } diff --git a/crates/axl-runtime/src/engine/bazel/mod.rs b/crates/axl-runtime/src/engine/bazel/mod.rs index 269579de8..7e186a43c 100644 --- a/crates/axl-runtime/src/engine/bazel/mod.rs +++ b/crates/axl-runtime/src/engine/bazel/mod.rs @@ -32,16 +32,14 
@@ use axl_proto; mod build; mod cancel; -mod execlog_sink; mod health_check; mod info; mod iter; mod process; mod query; mod rc; +mod sink; mod stream; -mod stream_sink; -mod stream_tracing; /// Resolve which `bazel` binary to spawn. Honors the `BAZEL_REAL` env var /// (the bazelisk convention) so wrapped invocations and tests can substitute @@ -220,7 +218,7 @@ pub(crate) fn bazel_methods(registry: &mut MethodsBuilder) { #[starlark(require = named, default = false)] workspace_events: bool, #[starlark(require = named, default = Either::Left(false))] execution_log: Either< bool, - UnpackList, + UnpackList, >, #[starlark(require = named, default = UnpackList::default())] flags: UnpackList< Either, (values::StringValue<'v>, values::StringValue<'v>)>, @@ -323,7 +321,7 @@ pub(crate) fn bazel_methods(registry: &mut MethodsBuilder) { #[starlark(require = named, default = false)] workspace_events: bool, #[starlark(require = named, default = Either::Left(false))] execution_log: Either< bool, - UnpackList, + UnpackList, >, #[starlark(require = named, default = UnpackList::default())] flags: UnpackList< Either, (values::StringValue<'v>, values::StringValue<'v>)>, @@ -576,16 +574,63 @@ pub(crate) fn bazel_methods(registry: &mut MethodsBuilder) { #[starlark_module] fn register_build_events(globals: &mut GlobalsBuilder) { + /// Forward Build Event Protocol events to a gRPC backend. + /// + /// Mirrors Bazel's `BuildEventServiceUploader`: transient transport or + /// gRPC errors trigger a bounded reconnect with full-jitter exponential + /// backoff, replaying buffered events under their original sequence + /// numbers. The BES protocol's per-stream dedup makes replay safe. + /// + /// # Arguments + /// * `uri` - BES endpoint. `grpcs://` is rewritten to `https://`. + /// * `metadata` - Headers attached to every request. + /// * `max_retries` - Max reconnect attempts after an error before giving + /// up (default `4`). `0` disables retry; the sink is still non-fatal. 
+ /// * `retry_min_delay` - Base delay for exponential backoff + /// (default `"1s"`). + /// * `retry_max_buffer_size` - Cap on the in-flight unacked retry buffer + /// (default `10000`). Exceeding it mid-stream is terminal. + /// * `timeout` - Overall upload deadline (default `"0s"` = no deadline). + /// * `error_strategy` - How a terminal failure surfaces. One of + /// `"abort"`, `"fail_at_end"`, `"warn"` (default), `"ignore"`. #[starlark(as_type = build::BuildEventSink)] fn grpc( #[starlark(require = named)] uri: String, #[starlark(require = named, default = UnpackDictEntries::default())] metadata: UnpackDictEntries, + #[starlark(require = named, default = 4)] max_retries: i32, + #[starlark(require = named, default = "1s")] retry_min_delay: &str, + #[starlark(require = named, default = 10_000)] retry_max_buffer_size: i32, + #[starlark(require = named, default = "0s")] timeout: &str, + #[starlark(require = named, default = "warn")] error_strategy: &str, ) -> anyhow::Result { - // TODO: validate endpoint + if max_retries < 0 { + anyhow::bail!("max_retries must be >= 0, got {max_retries}"); + } + if retry_max_buffer_size <= 0 { + anyhow::bail!("retry_max_buffer_size must be > 0, got {retry_max_buffer_size}"); + } + let retry_min_delay = sink::retry::parse_duration(retry_min_delay) + .map_err(|e| anyhow::anyhow!("retry_min_delay: {e}"))?; + let timeout_dur = + sink::retry::parse_duration(timeout).map_err(|e| anyhow::anyhow!("timeout: {e}"))?; + let timeout = if timeout_dur.is_zero() { + None + } else { + Some(timeout_dur) + }; + let error_strategy = + sink::retry::ErrorStrategy::parse(error_strategy).map_err(|e| anyhow::anyhow!(e))?; Ok(build::BuildEventSink::Grpc { uri: uri.replace("grpcs://", "https://"), metadata: HashMap::from_iter(metadata.entries), + retry: sink::retry::RetryConfig { + max_retries: max_retries as u32, + retry_min_delay, + retry_max_buffer_size: retry_max_buffer_size as usize, + timeout, + error_strategy, + }, }) } @@ -596,17 +641,17 @@ fn 
register_build_events(globals: &mut GlobalsBuilder) { #[starlark_module] fn register_execlog_sinks(globals: &mut GlobalsBuilder) { - #[starlark(as_type = execlog_sink::ExecLogSink)] + #[starlark(as_type = sink::execlog::ExecLogSink)] fn file( #[starlark(require = named)] path: String, - ) -> anyhow::Result { - Ok(execlog_sink::ExecLogSink::File { path }) + ) -> anyhow::Result { + Ok(sink::execlog::ExecLogSink::File { path }) } fn compact_file( #[starlark(require = named)] path: String, - ) -> anyhow::Result { - Ok(execlog_sink::ExecLogSink::CompactFile { path }) + ) -> anyhow::Result { + Ok(sink::execlog::ExecLogSink::CompactFile { path }) } } @@ -626,7 +671,7 @@ fn register_build_types(globals: &mut GlobalsBuilder) { #[starlark_module] fn register_execlog_types(globals: &mut GlobalsBuilder) { - const ExecLogSink: StarlarkValueAsType = StarlarkValueAsType::new(); + const ExecLogSink: StarlarkValueAsType = StarlarkValueAsType::new(); } #[starlark_module] diff --git a/crates/axl-runtime/src/engine/bazel/execlog_sink.rs b/crates/axl-runtime/src/engine/bazel/sink/execlog.rs similarity index 71% rename from crates/axl-runtime/src/engine/bazel/execlog_sink.rs rename to crates/axl-runtime/src/engine/bazel/sink/execlog.rs index 30d582543..521f707f9 100644 --- a/crates/axl-runtime/src/engine/bazel/execlog_sink.rs +++ b/crates/axl-runtime/src/engine/bazel/sink/execlog.rs @@ -14,6 +14,8 @@ use starlark::values; use starlark::values::starlark_value; use starlark::values::{NoSerialize, ProvidesStaticType, UnpackValue, ValueLike}; +use super::retry::{ErrorStrategy, SinkError, SinkOutcome}; + /// Sink types for execution log output. /// /// | Variant | Format | @@ -46,21 +48,30 @@ impl<'v> UnpackValue<'v> for ExecLogSink { impl ExecLogSink { /// Spawns a thread that reads decoded `ExecLogEntry` values from `recv` and /// writes them to `path` in varint-length-prefixed binary proto format. 
- pub fn spawn_file(recv: Receiver, path: String) -> JoinHandle<()> { + /// + /// I/O errors surface as `Err(SinkError { strategy: Abort, .. })` so the + /// build fails cleanly instead of leaving a half-written log behind. + pub fn spawn_file(recv: Receiver, path: String) -> JoinHandle { thread::spawn(move || { - let file = File::create(&path).expect("failed to create execlog output file"); + let abort = |last_error: String| SinkError { + strategy: ErrorStrategy::Abort, + last_error, + }; + let file = File::create(&path) + .map_err(|e| abort(format!("ExecLog: failed to create '{path}': {e}")))?; let mut file = BufWriter::new(file); loop { match recv.recv() { Ok(entry) => { - if let Err(e) = file.write_all(&entry.encode_length_delimited_to_vec()) { - eprintln!("ExecLogSink: failed to write entry: {}", e); - break; - } + file.write_all(&entry.encode_length_delimited_to_vec()) + .map_err(|e| { + abort(format!("ExecLog: write to '{path}' failed: {e}")) + })?; } Err(RecvError::Disconnected) => break, } } + Ok(()) }) } } diff --git a/crates/axl-runtime/src/engine/bazel/sink/grpc.rs b/crates/axl-runtime/src/engine/bazel/sink/grpc.rs new file mode 100644 index 000000000..d2137f5fc --- /dev/null +++ b/crates/axl-runtime/src/engine/bazel/sink/grpc.rs @@ -0,0 +1,430 @@ +use std::{ + collections::HashMap, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + thread::{self, JoinHandle}, +}; + +use axl_proto::{ + build_event_stream::BuildEvent, + google::devtools::build::v1::{ + BuildStatus, PublishBuildToolEventStreamRequest, PublishLifecycleEventRequest, + }, +}; +use build_event_stream::{ + build_tool, + client::{Client, ClientError}, + lifecycle, +}; + +use tokio_stream::{StreamExt, wrappers::ReceiverStream}; + +use crate::engine::r#async::rt::AsyncRuntime; + +use super::super::stream::Subscriber; +use super::retry::{ + BufferOverflow, ErrorStrategy, RetryBuffer, RetryConfig, SinkError, SinkOutcome, backoff, + is_retryable, +}; + +#[derive(Debug)] +pub struct Grpc {} + 
+/// Why `drive_stream` returned. Drives the outer state machine's reconnect +/// or terminal-exit decision. +enum DriveOutcome { + /// `last_message` was sent and the response stream closed cleanly. + Done, + /// Stream broke or returned a retryable error mid-flight; reconnect if + /// budget allows. + Transient(ClientError), + /// Server returned a non-retryable status; terminal regardless of budget. + Fatal(ClientError), + /// Buffer overflowed while we held unacked events; terminal. + BufferFull(BufferOverflow), + /// Upstream broadcaster closed (subscriber disconnected) without ever + /// emitting `last_message`. Treat as clean shutdown — no more events to + /// send, just wait for outstanding acks then exit. + UpstreamClosed, +} + +impl Grpc { + /// Spawn a gRPC BES forwarding thread. + /// + /// The caller supplies `invocation_id` so that when multiple gRPC sinks + /// are configured for the same invocation (e.g. an Aspect backend plus + /// an internal mirror), every backend indexes this build under the same + /// UUID. That id is reported back to the AXL layer via + /// `Build.sink_invocation_id` — downstream consumers can build a single + /// "View invocation" URL that resolves on any backend configured for + /// this build. + /// + /// The returned thread never panics on transport or BES errors. Failures + /// are encoded in the `SinkOutcome` per the sink's `error_strategy`, so + /// `aspect`'s build is never killed by a flaky observability backend. + pub fn spawn( + rt: AsyncRuntime, + recv: Subscriber, + endpoint: String, + headers: HashMap, + invocation_id: String, + retry: RetryConfig, + ) -> JoinHandle { + thread::spawn(move || { + // We block_on here so the worker can drive both async tonic calls + // and the synchronous broadcaster `recv`. 
block_on cannot itself + // fail the way the previous panic-on-error path implied — if the + // tokio handle is gone, that is a runtime-shutdown bug, not a + // sink failure, so we still let it surface as a panic. + rt.block_on(work(recv, endpoint, headers, invocation_id, retry)) + }) + } +} + +async fn work( + recv: Subscriber, + endpoint: String, + headers: HashMap, + invocation_id: String, + retry: RetryConfig, +) -> SinkOutcome { + let strategy = retry.error_strategy; + let timeout = retry.timeout; + let inner = work_inner(recv, endpoint.clone(), headers, invocation_id, retry); + + // Honor the configured upload deadline. Without this wrapper the BES + // sink can stall indefinitely when the backend is slow to respond, + // even with retry budgets exhausted: lifecycle calls, the bidi + // stream, and `tokio::time::sleep` between retries are each bounded + // only by their own internal logic. Wrapping the whole upload in + // `tokio::time::timeout` mirrors Bazel's `--bes_timeout` and gives + // the user-set knob a real effect. + match timeout { + Some(d) => match tokio::time::timeout(d, inner).await { + Ok(r) => r, + Err(_) => Err(finalize( + strategy, + &endpoint, + format!("BES upload timed out after {d:?}"), + )), + }, + None => inner.await, + } +} + +async fn work_inner( + recv: Subscriber, + endpoint: String, + headers: HashMap, + invocation_id: String, + retry: RetryConfig, +) -> SinkOutcome { + let strategy = retry.error_strategy; + let context = |stage: &str, err: &dyn std::fmt::Display| -> SinkError { + finalize(strategy, &endpoint, format!("{stage}: {err}")) + }; + + // Forward the synchronous broadcaster `recv` into a tokio mpsc so the + // state machine can `select!` over it alongside the bidi response + // stream. 
The channel is bounded by `retry_max_buffer_size`: during a + // BES outage `drive_stream` stops draining (it's sleeping between + // reconnects or replaying the retry buffer), so without a bound the + // forwarder would queue events indefinitely and defeat the very knob + // meant to cap memory. On overflow we set `overflow_flag` and drop + // `event_tx`; `drive_stream` observes the closed receiver, checks the + // flag, and exits with `BufferFull` so the configured `error_strategy` + // takes effect. + let buffer_cap = retry.retry_max_buffer_size; + let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::(buffer_cap); + let overflow_flag = Arc::new(AtomicBool::new(false)); + let overflow_flag_forwarder = overflow_flag.clone(); + let _forwarder = tokio::task::spawn_blocking(move || { + while let Ok(ev) = recv.recv() { + match event_tx.try_send(ev) { + Ok(()) => {} + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + overflow_flag_forwarder.store(true, Ordering::SeqCst); + break; + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break, + } + } + }); + + let mut client = Client::new(endpoint.clone(), headers) + .await + .map_err(|e| context("connect failed", &e))?; + + let build_id = invocation_id.clone(); + + retry_lifecycle( + &retry, + &mut client, + lifecycle::build_enqueued(build_id.clone(), invocation_id.clone()), + ) + .await + .map_err(|e| context("build_enqueued", &e))?; + + retry_lifecycle( + &retry, + &mut client, + lifecycle::invocation_started(build_id.clone(), invocation_id.clone()), + ) + .await + .map_err(|e| context("invocation_started", &e))?; + + let mut buffer = RetryBuffer::new(retry.retry_max_buffer_size); + let mut next_seq: i64 = 1; + let mut attempt: u32 = 0; + let mut last_message_sent = false; + + 'reconnect: loop { + let outcome = drive_stream( + &mut client, + &build_id, + &invocation_id, + &mut event_rx, + &overflow_flag, + &mut buffer, + &mut next_seq, + &mut last_message_sent, + ) + .await; + + match outcome 
{ + DriveOutcome::Done | DriveOutcome::UpstreamClosed => break 'reconnect, + DriveOutcome::Transient(err) => { + if attempt >= retry.max_retries { + return Err(finalize( + strategy, + &endpoint, + format!("giving up after {attempt} attempts: {err}"), + )); + } + let delay = backoff(retry.retry_min_delay, attempt); + tokio::time::sleep(delay).await; + attempt += 1; + continue 'reconnect; + } + DriveOutcome::Fatal(err) => { + return Err(finalize( + strategy, + &endpoint, + format!("non-retryable: {err}"), + )); + } + DriveOutcome::BufferFull(err) => { + return Err(finalize(strategy, &endpoint, err.to_string())); + } + } + } + + // We don't know the bazel exit code at this point — that lives on the + // parent process. Submit a successful BuildStatus; a future revision + // can plumb the real status through. + retry_lifecycle( + &retry, + &mut client, + lifecycle::invocation_finished( + build_id.clone(), + invocation_id.clone(), + BuildStatus { + result: 0, + final_invocation_id: build_id.clone(), + build_tool_exit_code: Some(0), + error_message: String::new(), + details: None, + }, + ), + ) + .await + .map_err(|e| context("invocation_finished", &e))?; + + retry_lifecycle( + &retry, + &mut client, + lifecycle::build_finished(build_id.clone(), invocation_id.clone()), + ) + .await + .map_err(|e| context("build_finished", &e))?; + + Ok(()) +} + +/// Drive a single bidi stream until it ends (cleanly or with error). +/// +/// Owns the request channel for this connection. Pulls events from +/// `event_rx`, assigns sequence numbers, buffers them in `buffer`, and pings +/// them down the wire. In parallel reads responses and prunes the buffer up +/// to each ack'd `sequence_number`. Replays any leftover buffered events +/// under their original seqs at the start of the connection. 
+
+#[allow(clippy::too_many_arguments)]
+async fn drive_stream(
+    client: &mut Client,
+    build_id: &str,
+    invocation_id: &str,
+    event_rx: &mut tokio::sync::mpsc::Receiver<BuildEvent>,
+    overflow_flag: &AtomicBool,
+    buffer: &mut RetryBuffer,
+    next_seq: &mut i64,
+    last_message_sent: &mut bool,
+) -> DriveOutcome {
+    let (sender, receiver) =
+        tokio::sync::mpsc::channel::<PublishBuildToolEventStreamRequest>(64);
+    let request_stream = ReceiverStream::new(receiver);
+
+    let response_stream = match client.publish_build_tool_event_stream(request_stream).await {
+        Ok(s) => s.into_inner(),
+        Err(e) => {
+            return if is_retryable(&e) {
+                DriveOutcome::Transient(e)
+            } else {
+                DriveOutcome::Fatal(e)
+            };
+        }
+    };
+    let mut response_stream = response_stream;
+
+    // Replay any buffered (unacked) events under their original sequence
+    // numbers. The server dedups via OrderedBuildEvent.sequence_number.
+    for (_seq, req) in buffer.iter() {
+        if sender.send(req.clone()).await.is_err() {
+            // Server side closed before we could even replay; treat as
+            // transient so the outer loop reconnects.
+            return DriveOutcome::Transient(ClientError::Status(tonic::Status::unavailable(
+                "request stream closed during replay",
+            )));
+        }
+    }
+
+    let mut sender_opt = Some(sender);
+
+    loop {
+        tokio::select! {
+            // Pulling new events from the upstream broadcaster.
+            // Disabled once last_message has been sent.
+            ev = event_rx.recv(), if !*last_message_sent && sender_opt.is_some() => {
+                let Some(event) = ev else {
+                    // event_rx closed. Either the forwarder hit the bounded
+                    // queue cap and bailed (overflow_flag set) — terminal
+                    // per the retry buffer's BufferOverflow contract — or
+                    // the upstream broadcaster closed without a final
+                    // last_message — clean shutdown. 
+                    if overflow_flag.load(Ordering::SeqCst) {
+                        return DriveOutcome::BufferFull(BufferOverflow {
+                            cap: buffer.capacity(),
+                            seq: *next_seq,
+                        });
+                    }
+                    sender_opt = None;
+                    if buffer.is_empty() {
+                        return DriveOutcome::UpstreamClosed;
+                    }
+                    continue;
+                };
+
+                let seq = *next_seq;
+                *next_seq += 1;
+                let last = event.last_message;
+                let req = build_tool::bazel_event(
+                    build_id.to_string(),
+                    invocation_id.to_string(),
+                    seq,
+                    &event,
+                );
+
+                if let Err(overflow) = buffer.push(seq, req.clone()) {
+                    return DriveOutcome::BufferFull(overflow);
+                }
+
+                let s = sender_opt.as_ref().unwrap();
+                if s.send(req).await.is_err() {
+                    // Request channel rejected: the bidi stream broke.
+                    // Hold onto the buffered event for replay on reconnect.
+                    return DriveOutcome::Transient(ClientError::Status(
+                        tonic::Status::unavailable("request stream closed mid-send"),
+                    ));
+                }
+
+                if last {
+                    *last_message_sent = true;
+                    // Drop the request side to signal half-close, but stay
+                    // in the loop so we keep draining acks until the server
+                    // closes the response side.
+                    sender_opt = None;
+                }
+            }
+            // Reading server acks from the bidi response stream.
+            resp = response_stream.next() => {
+                match resp {
+                    Some(Ok(r)) => {
+                        buffer.prune_until(r.sequence_number);
+                        // Ack received: everything up to and including this
+                        // sequence number is durable server-side, so those
+                        // events no longer need replaying on reconnect.
+                        // NOTE(review): an earlier comment claimed an ack
+                        // resets the outer `attempt` retry counter, but no
+                        // reset happens anywhere — confirm intended behavior.
+                    }
+                    Some(Err(status)) => {
+                        let err = ClientError::Status(status);
+                        return if is_retryable(&err) {
+                            DriveOutcome::Transient(err)
+                        } else {
+                            DriveOutcome::Fatal(err)
+                        };
+                    }
+                    None => {
+                        // Server closed the response stream. 
+ if *last_message_sent && buffer.is_empty() { + return DriveOutcome::Done; + } + if sender_opt.is_none() && buffer.is_empty() { + return DriveOutcome::UpstreamClosed; + } + // Server closed prematurely with unacked events — + // treat as transient so we reconnect and replay. + return DriveOutcome::Transient(ClientError::Status( + tonic::Status::unavailable("response stream closed prematurely"), + )); + } + } + } + } + } +} + +/// Retry an idempotent lifecycle call. Each attempt uses the same backoff +/// as stream reconnects. Per the design, every call gets `max_retries` +/// attempts and the counter is local to the call (a successful lifecycle +/// event resets to a clean state for the next one). +async fn retry_lifecycle( + cfg: &RetryConfig, + client: &mut Client, + request: PublishLifecycleEventRequest, +) -> Result<(), ClientError> { + let mut attempt: u32 = 0; + loop { + match client.publish_lifecycle_event(request.clone()).await { + Ok(_) => return Ok(()), + Err(err) => { + if !is_retryable(&err) || attempt >= cfg.max_retries { + return Err(err); + } + tokio::time::sleep(backoff(cfg.retry_min_delay, attempt)).await; + attempt += 1; + } + } + } +} + +fn finalize(strategy: ErrorStrategy, endpoint: &str, last_error: String) -> SinkError { + if matches!(strategy, ErrorStrategy::Warn) { + eprintln!("WARN: BES sink {endpoint} giving up: {last_error}"); + } + SinkError { + strategy, + last_error, + } +} diff --git a/crates/axl-runtime/src/engine/bazel/sink/mod.rs b/crates/axl-runtime/src/engine/bazel/sink/mod.rs new file mode 100644 index 000000000..8bf858235 --- /dev/null +++ b/crates/axl-runtime/src/engine/bazel/sink/mod.rs @@ -0,0 +1,8 @@ +//! Subscribers that consume events off a build's broadcaster and forward +//! them somewhere — a BES gRPC backend, a tracing span emitter, the +//! execution log on disk. 
+
+pub mod execlog;
+pub mod grpc;
+pub mod retry;
+pub mod tracing;
diff --git a/crates/axl-runtime/src/engine/bazel/sink/retry.rs b/crates/axl-runtime/src/engine/bazel/sink/retry.rs
new file mode 100644
index 000000000..c600215ce
--- /dev/null
+++ b/crates/axl-runtime/src/engine/bazel/sink/retry.rs
@@ -0,0 +1,296 @@
+//! Retry / backoff machinery for the gRPC BES sink.
+//!
+//! Mirrors Bazel's `BuildEventServiceUploader`: bounded retry budget with
+//! full-jitter exponential backoff, an in-flight buffer for replay across
+//! reconnects, and four user-selectable terminal-failure strategies.
+
+use std::collections::VecDeque;
+use std::time::Duration;
+
+use axl_proto::google::devtools::build::v1::PublishBuildToolEventStreamRequest;
+use build_event_stream::client::ClientError;
+use rand::Rng;
+
+/// How a terminal sink failure surfaces to the user.
+///
+/// Set per-sink in Starlark via `bazel.build.events.grpc(error_strategy=...)`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ErrorStrategy {
+    Abort,
+    FailAtEnd,
+    Warn,
+    Ignore,
+}
+
+impl ErrorStrategy {
+    pub fn parse(s: &str) -> Result<Self, String> {
+        match s {
+            "abort" => Ok(ErrorStrategy::Abort),
+            "fail_at_end" => Ok(ErrorStrategy::FailAtEnd),
+            "warn" => Ok(ErrorStrategy::Warn),
+            "ignore" => Ok(ErrorStrategy::Ignore),
+            other => Err(format!(
+                "invalid error_strategy '{other}': expected one of 'abort', 'fail_at_end', 'warn', 'ignore'"
+            )),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct RetryConfig {
+    pub max_retries: u32,
+    pub retry_min_delay: Duration,
+    pub retry_max_buffer_size: usize,
+    pub timeout: Option<Duration>,
+    pub error_strategy: ErrorStrategy,
+}
+
+impl Default for RetryConfig {
+    fn default() -> Self {
+        Self {
+            max_retries: 4,
+            retry_min_delay: Duration::from_secs(1),
+            retry_max_buffer_size: 10_000,
+            timeout: None,
+            error_strategy: ErrorStrategy::Warn,
+        }
+    }
+}
+
+/// Parse a duration string like `"1s"`, `"500ms"`, `"2m"`, `"0s"`. 
+///
+/// `"0s"` (or any zero value) is the documented sentinel for "no deadline"
+/// when used as a timeout; the caller decides what zero means.
+pub fn parse_duration(s: &str) -> Result<Duration, String> {
+    let s = s.trim();
+    if s.is_empty() {
+        return Err("empty duration string".into());
+    }
+    let (num_str, unit) = if let Some(rest) = s.strip_suffix("ms") {
+        (rest, "ms")
+    } else if let Some(rest) = s.strip_suffix('s') {
+        (rest, "s")
+    } else if let Some(rest) = s.strip_suffix('m') {
+        (rest, "m")
+    } else if let Some(rest) = s.strip_suffix('h') {
+        (rest, "h")
+    } else {
+        return Err(format!(
+            "invalid duration '{s}': expected suffix one of 'ms', 's', 'm', 'h'"
+        ));
+    };
+    let n: u64 = num_str
+        .trim()
+        .parse()
+        .map_err(|e| format!("invalid duration '{s}': {e}"))?;
+    Ok(match unit {
+        "ms" => Duration::from_millis(n),
+        "s" => Duration::from_secs(n),
+        "m" => Duration::from_secs(n * 60),
+        "h" => Duration::from_secs(n * 3600),
+        _ => unreachable!(),
+    })
+}
+
+/// Bounded ring of unacked stream events keyed by their original sequence
+/// number. On reconnect the entire buffer is replayed before fresh events
+/// resume — the BES protocol's per-stream sequence-number dedup makes this
+/// safe even if the server already saw some of the replayed events.
+pub struct RetryBuffer {
+    cap: usize,
+    items: VecDeque<(i64, PublishBuildToolEventStreamRequest)>,
+}
+
+impl RetryBuffer {
+    pub fn new(cap: usize) -> Self {
+        Self {
+            cap,
+            items: VecDeque::new(),
+        }
+    }
+
+    /// Push an event into the buffer. Returns `Err` if the buffer is full —
+    /// the caller must transition to terminal at that point per the design.
+    pub fn push(
+        &mut self,
+        seq: i64,
+        req: PublishBuildToolEventStreamRequest,
+    ) -> Result<(), BufferOverflow> {
+        if self.items.len() >= self.cap {
+            return Err(BufferOverflow { cap: self.cap, seq });
+        }
+        self.items.push_back((seq, req));
+        Ok(())
+    }
+
+    /// Drop every entry with `seq <= ack_seq`. 
Called when the server acks a + /// response on the bidi stream. + pub fn prune_until(&mut self, ack_seq: i64) { + while let Some((seq, _)) = self.items.front() { + if *seq <= ack_seq { + self.items.pop_front(); + } else { + break; + } + } + } + + #[allow(dead_code)] + pub fn len(&self) -> usize { + self.items.len() + } + + pub fn is_empty(&self) -> bool { + self.items.is_empty() + } + + pub fn capacity(&self) -> usize { + self.cap + } + + pub fn iter(&self) -> impl Iterator { + self.items.iter() + } +} + +#[derive(Debug, thiserror::Error)] +#[error("retry buffer overflowed (cap={cap}) while attempting to buffer seq {seq}")] +pub struct BufferOverflow { + pub cap: usize, + pub seq: i64, +} + +/// Full-jitter exponential backoff. Mirrors Bazel: +/// +/// ```text +/// delay = random(0, min(min_delay * 2^attempt, min_delay * 30)) +/// ``` +pub fn backoff(min_delay: Duration, attempt: u32) -> Duration { + let cap_ns = (min_delay.as_nanos() as u64).saturating_mul(30); + let exp = 1u64 << attempt.min(30); + let upper_ns = (min_delay.as_nanos() as u64) + .saturating_mul(exp) + .min(cap_ns); + if upper_ns == 0 { + return Duration::from_nanos(0); + } + let jitter = rand::thread_rng().gen_range(0..=upper_ns); + Duration::from_nanos(jitter) +} + +/// Whether a `ClientError` should trigger a reconnect attempt (true) or be +/// treated as terminal immediately (false). +pub fn is_retryable(err: &ClientError) -> bool { + use tonic::Code; + match err { + // Transport-level: TLS handshake, h2 protocol error, connection + // reset — all assumed transient. + ClientError::Transport(_) => true, + ClientError::InvalidEndpoint(_) => false, + ClientError::Status(status) => matches!( + status.code(), + Code::Unavailable + | Code::DeadlineExceeded + | Code::ResourceExhausted + | Code::Aborted + | Code::Internal + ), + } +} + +/// Terminal failure of a sink. Carries the user-configured surface strategy +/// plus a human-readable description of the underlying error. 
Implements +/// `Error` via `thiserror` so sink work functions can use `?` and `wait()` +/// can chain it through `anyhow` without ceremony. +#[derive(Debug, thiserror::Error)] +#[error("{last_error}")] +pub struct SinkError { + pub strategy: ErrorStrategy, + pub last_error: String, +} + +/// What a sink thread returns. `Ok(())` on clean exit; `Err(SinkError)` when +/// the sink gave up — `wait()` decides whether to abort, fail at end, or +/// just warn based on `SinkError::strategy`. +pub type SinkOutcome = Result<(), SinkError>; + +#[cfg(test)] +mod tests { + use super::*; + use axl_proto::google::devtools::build::v1::PublishBuildToolEventStreamRequest; + + fn req() -> PublishBuildToolEventStreamRequest { + PublishBuildToolEventStreamRequest::default() + } + + #[test] + fn buffer_push_until_cap_then_overflow() { + let mut b = RetryBuffer::new(2); + b.push(1, req()).unwrap(); + b.push(2, req()).unwrap(); + let err = b.push(3, req()).unwrap_err(); + assert_eq!(err.cap, 2); + assert_eq!(err.seq, 3); + assert_eq!(b.len(), 2); + } + + #[test] + fn buffer_prune_removes_only_le_ack() { + let mut b = RetryBuffer::new(8); + for i in 1..=5 { + b.push(i, req()).unwrap(); + } + b.prune_until(3); + let seqs: Vec = b.iter().map(|(s, _)| *s).collect(); + assert_eq!(seqs, vec![4, 5]); + } + + #[test] + fn backoff_in_envelope() { + let min = Duration::from_millis(100); + for attempt in 0..10 { + let d = backoff(min, attempt); + let cap = min * 30; + assert!(d <= cap, "attempt {attempt}: {d:?} > {cap:?}"); + } + } + + #[test] + fn parse_duration_units() { + assert_eq!(parse_duration("0s").unwrap(), Duration::from_secs(0)); + assert_eq!(parse_duration("250ms").unwrap(), Duration::from_millis(250)); + assert_eq!(parse_duration("3s").unwrap(), Duration::from_secs(3)); + assert_eq!(parse_duration("2m").unwrap(), Duration::from_secs(120)); + assert_eq!(parse_duration("1h").unwrap(), Duration::from_secs(3600)); + assert!(parse_duration("").is_err()); + 
assert!(parse_duration("10").is_err()); + assert!(parse_duration("abc").is_err()); + } + + #[test] + fn classifier_status_codes() { + let unavailable = ClientError::Status(tonic::Status::new(tonic::Code::Unavailable, "x")); + let unauth = ClientError::Status(tonic::Status::new(tonic::Code::Unauthenticated, "x")); + let internal = ClientError::Status(tonic::Status::new(tonic::Code::Internal, "x")); + let perm = ClientError::Status(tonic::Status::new(tonic::Code::PermissionDenied, "x")); + assert!(is_retryable(&unavailable)); + assert!(is_retryable(&internal)); + assert!(!is_retryable(&unauth)); + assert!(!is_retryable(&perm)); + } + + #[test] + fn error_strategy_parse() { + assert_eq!(ErrorStrategy::parse("abort").unwrap(), ErrorStrategy::Abort); + assert_eq!( + ErrorStrategy::parse("fail_at_end").unwrap(), + ErrorStrategy::FailAtEnd + ); + assert_eq!(ErrorStrategy::parse("warn").unwrap(), ErrorStrategy::Warn); + assert_eq!( + ErrorStrategy::parse("ignore").unwrap(), + ErrorStrategy::Ignore + ); + assert!(ErrorStrategy::parse("nope").is_err()); + } +} diff --git a/crates/axl-runtime/src/engine/bazel/stream_tracing.rs b/crates/axl-runtime/src/engine/bazel/sink/tracing.rs similarity index 96% rename from crates/axl-runtime/src/engine/bazel/stream_tracing.rs rename to crates/axl-runtime/src/engine/bazel/sink/tracing.rs index 484a13379..7df6c0925 100644 --- a/crates/axl-runtime/src/engine/bazel/stream_tracing.rs +++ b/crates/axl-runtime/src/engine/bazel/sink/tracing.rs @@ -11,10 +11,11 @@ use axl_proto::{ use tracing::{Level, span::EnteredSpan}; -use super::stream::Subscriber; +use super::super::stream::Subscriber; +use super::retry::SinkOutcome; #[derive(Debug)] -pub struct TracingEventStreamSink {} +pub struct Tracing {} fn timestamp_or_now(timestamp: Option<&Timestamp>) -> i64 { timestamp.map_or_else( @@ -28,8 +29,8 @@ fn timestamp_or_now(timestamp: Option<&Timestamp>) -> i64 { ) } -impl TracingEventStreamSink { - pub fn spawn(recv: Subscriber) -> JoinHandle<()> { 
+impl Tracing {
+    pub fn spawn(recv: Subscriber) -> JoinHandle<SinkOutcome> {
         let events_span = tracing::info_span!("events");
         thread::spawn(move || {
             let _events_guard = events_span.enter();
@@ -126,6 +127,7 @@ impl TracingEventStreamSink {
                 _ => {}
             }
         }
+        Ok(())
     })
     }
 }
diff --git a/crates/axl-runtime/src/engine/bazel/stream/broadcaster.rs b/crates/axl-runtime/src/engine/bazel/stream/broadcaster.rs
index 9b6dd688c..4ade179d0 100644
--- a/crates/axl-runtime/src/engine/bazel/stream/broadcaster.rs
+++ b/crates/axl-runtime/src/engine/bazel/stream/broadcaster.rs
@@ -826,7 +826,7 @@ mod tests {
     /// Since both clones share the same `inner` Arc, dropping `broadcaster_for_thread`
     /// only decremented the refcount - it didn't drop the senders in `inner.subscribers`.
     /// The senders were only dropped when `join()` took the broadcaster from the holder.
-    /// But by then, `wait()` was already blocked waiting for TracingEventStreamSink
+    /// But by then, `wait()` was already blocked waiting for sink::tracing::Tracing
     /// to finish, which was blocked on `recv()` that never saw disconnect.
     ///
     /// The fix: Always call `close()` before returning from the BES thread. 
diff --git a/crates/axl-runtime/src/engine/bazel/stream_sink.rs b/crates/axl-runtime/src/engine/bazel/stream_sink.rs deleted file mode 100644 index bc2a8e09d..000000000 --- a/crates/axl-runtime/src/engine/bazel/stream_sink.rs +++ /dev/null @@ -1,183 +0,0 @@ -use std::{ - collections::HashMap, - sync::mpsc::RecvError, - thread::{self, JoinHandle}, -}; - -use axl_proto::{ - build_event_stream::BuildEvent, - google::devtools::build::v1::{BuildStatus, PublishBuildToolEventStreamRequest}, -}; -use build_event_stream::{ - build_tool, - client::{Client, ClientError}, - lifecycle, -}; - -use thiserror::Error; -use tokio::{sync::mpsc::error::SendError, task}; -use tokio_stream::{StreamExt, wrappers::ReceiverStream}; - -use super::super::r#async::rt::AsyncRuntime; -use super::stream::Subscriber; - -#[derive(Error, Debug)] -pub enum SinkError { - #[error("channel disconnected")] - RecvError(#[from] RecvError), - #[error(transparent)] - ClientError(#[from] ClientError), - #[error(transparent)] - SendError(#[from] SendError), -} - -#[derive(Debug)] -pub struct GrpcEventStreamSink {} - -impl GrpcEventStreamSink { - /// Spawn a gRPC BES forwarding thread. - /// - /// The caller supplies `invocation_id` so that when multiple gRPC sinks - /// are configured for the same invocation (e.g. an Aspect backend plus - /// an internal mirror), every backend indexes this build under the same - /// UUID. That id is reported back to the AXL layer via - /// `Build.sink_invocation_id` — downstream consumers can build a single - /// "View invocation" URL that resolves on any backend configured for - /// this build. 
- pub fn spawn( - rt: AsyncRuntime, - recv: Subscriber, - endpoint: String, - headers: HashMap, - invocation_id: String, - ) -> JoinHandle<()> { - thread::spawn(move || { - rt.block_on(async { - GrpcEventStreamSink::task_spawn(recv, endpoint, headers, invocation_id) - .await - .await - }) - .expect("failed to join") - .expect("failed to wait") - }) - } - - pub async fn task_spawn( - recv: Subscriber, - endpoint: String, - headers: HashMap, - invocation_id: String, - ) -> task::JoinHandle> { - tokio::task::spawn(GrpcEventStreamSink::work( - recv, - endpoint, - headers, - invocation_id, - )) - } - - async fn work( - recv: Subscriber, - endpoint: String, - headers: HashMap, - invocation_id: String, - ) -> Result<(), SinkError> { - let mut client = Client::new(endpoint, headers).await?; - - let build_id = invocation_id.clone(); - - client - .publish_lifecycle_event(lifecycle::build_enqueued( - build_id.to_string(), - invocation_id.to_string(), - )) - .await?; - - client - .publish_lifecycle_event(lifecycle::invocation_started( - build_id.to_string(), - invocation_id.to_string(), - )) - .await?; - - let seq = 0; - - let (sender, receiver) = - tokio::sync::mpsc::channel::(1); - - let rstream = ReceiverStream::new(receiver); - let stream = client.publish_build_tool_event_stream(rstream); - - // Clone for use in async block - let build_id_for_events = build_id.clone(); - let invocation_id_for_events = invocation_id.clone(); - - let (a, b): (Result<(), SinkError>, Result<(), SinkError>) = tokio::join!( - async move { - let mut stream = stream.await?.into_inner(); - while let Some(event) = stream.next().await { - match event { - // Succesfully received BES event ack - // TODO: Use this information to control how many inflight BES events we should be - // sending. 
- Ok(_ev) => {} - Err(err) => eprintln!("{}", err), - } - } - Ok(()) - }, - async move { - let mut seq = seq; - loop { - seq += 1; - let event = recv.recv(); - if event.is_err() { - break; - } - let event = event.unwrap(); - - sender - .send(build_tool::bazel_event( - build_id_for_events.to_string(), - invocation_id_for_events.to_string(), - seq, - &event, - )) - .await?; - - if event.last_message { - drop(sender); - break; - } - } - Ok(()) - } - ); - - a?; - b?; - - client - .publish_lifecycle_event(lifecycle::invocation_finished( - build_id.to_string(), - invocation_id.to_string(), - BuildStatus { - result: 0, - final_invocation_id: build_id.to_string(), - build_tool_exit_code: Some(0), - error_message: String::new(), - details: None, - }, - )) - .await?; - - client - .publish_lifecycle_event(lifecycle::build_finished( - build_id.to_string(), - invocation_id.to_string(), - )) - .await?; - - Ok(()) - } -}