From 5e302b6a28a4d4a9d104dd360af585a7cee71222 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Sun, 26 Apr 2026 20:47:41 -0400
Subject: [PATCH 01/12] feat(events): define push-event JSON schema

Adds the Event enum and RemovedReason for the push-event socket. Wire
format is JSONL: snapshot reuses shpool_protocol::Session so the schema
matches `shpool list --json`; deltas are flat objects tagged with
`type`.
---
 libshpool/src/events.rs | 123 ++++++++++++++++++++++++++++++++++++++++
 libshpool/src/lib.rs    |   1 +
 2 files changed, 124 insertions(+)
 create mode 100644 libshpool/src/events.rs

diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
new file mode 100644
index 00000000..27794135
--- /dev/null
+++ b/libshpool/src/events.rs
@@ -0,0 +1,123 @@
+//! Push-event protocol for the daemon.
+//!
+//! Events are published to subscribers connected to a sibling Unix socket
+//! next to the main shpool socket. The wire format is JSON, one event per
+//! line (newline-delimited; aka JSONL). Non-Rust clients only need a Unix
+//! socket and a JSON parser to consume the stream.
+//!
+//! On connect, a subscriber receives a `snapshot` event reflecting the
+//! current session table, atomically with respect to the table mutations
+//! that produce subsequent delta events. After the snapshot, the subscriber
+//! receives delta events as the session table changes. To force a re-sync,
+//! a subscriber may simply reconnect.
+//!
+//! The `sessions` field of a snapshot event uses the same schema as the
+//! `sessions` field of `shpool list --json`, so the two surfaces stay in
+//! sync by construction.
+
+use serde_derive::Serialize;
+use shpool_protocol::Session;
+
+/// An event published on the events socket.
+#[derive(Serialize, Debug)]
+#[serde(tag = "type")]
+pub enum Event {
+    /// Sent as the first message after a subscriber connects, reflecting
+    /// the current session table.
+    #[serde(rename = "snapshot")]
+    Snapshot { sessions: Vec<Session> },
+
+    /// A new session was created.
+    #[serde(rename = "session.created")]
+    SessionCreated { name: String, started_at_unix_ms: i64 },
+
+    /// A client attached to an existing session.
+    #[serde(rename = "session.attached")]
+    SessionAttached { name: String, last_connected_at_unix_ms: i64 },
+
+    /// A client detached from a session that is still alive.
+    #[serde(rename = "session.detached")]
+    SessionDetached { name: String, last_disconnected_at_unix_ms: i64 },
+
+    /// A session was removed from the session table.
+    #[serde(rename = "session.removed")]
+    SessionRemoved { name: String, reason: RemovedReason },
+}
+
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "lowercase")]
+pub enum RemovedReason {
+    /// The shell process exited on its own.
+    Exited,
+    /// The session was killed by an explicit `shpool kill` request.
+    Killed,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use shpool_protocol::SessionStatus;
+
+    fn json(event: &Event) -> String {
+        serde_json::to_string(event).unwrap()
+    }
+
+    #[test]
+    fn snapshot_serializes_with_sessions_array() {
+        let event = Event::Snapshot {
+            sessions: vec![Session {
+                name: "main".into(),
+                started_at_unix_ms: 100,
+                last_connected_at_unix_ms: Some(200),
+                last_disconnected_at_unix_ms: None,
+                status: SessionStatus::Attached,
+            }],
+        };
+        assert_eq!(
+            json(&event),
+            r#"{"type":"snapshot","sessions":[{"name":"main","started_at_unix_ms":100,"last_connected_at_unix_ms":200,"last_disconnected_at_unix_ms":null,"status":"Attached"}]}"#
+        );
+    }
+
+    #[test]
+    fn session_created_serializes_flat() {
+        let event = Event::SessionCreated { name: "main".into(), started_at_unix_ms: 42 };
+        assert_eq!(json(&event), r#"{"type":"session.created","name":"main","started_at_unix_ms":42}"#);
+    }
+
+    #[test]
+    fn session_attached_serializes_flat() {
+        let event =
+            Event::SessionAttached { name: "main".into(), last_connected_at_unix_ms: 42 };
+        assert_eq!(
+            json(&event),
+            r#"{"type":"session.attached","name":"main","last_connected_at_unix_ms":42}"#
+        );
+    }
+
+    #[test]
+    fn session_detached_serializes_flat() {
+        let event =
+            Event::SessionDetached { name: "main".into(), last_disconnected_at_unix_ms: 42 };
+        assert_eq!(
+            json(&event),
+            r#"{"type":"session.detached","name":"main","last_disconnected_at_unix_ms":42}"#
+        );
+    }
+
+    #[test]
+    fn session_removed_serializes_with_reason() {
+        let exited =
+            Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Exited };
+        assert_eq!(
+            json(&exited),
+            r#"{"type":"session.removed","name":"main","reason":"exited"}"#
+        );
+        let killed =
+            Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Killed };
+        assert_eq!(
+            json(&killed),
+            r#"{"type":"session.removed","name":"main","reason":"killed"}"#
+        );
+    }
+}
diff --git a/libshpool/src/lib.rs b/libshpool/src/lib.rs
index aad548d6..1ebc71d3 100644
--- a/libshpool/src/lib.rs
+++ b/libshpool/src/lib.rs
@@ -36,6 +36,7 @@ mod daemon;
 mod daemonize;
 mod detach;
 mod duration;
+mod events;
 mod exe;
 mod hooks;
 mod kill;

From bb7193968eb6b4e15984c52d875eaf61653b6975 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Sun, 26 Apr 2026 21:01:28 -0400
Subject: [PATCH 02/12] feat(events): add EventBus and sibling events socket
 listener

The daemon binds a sibling Unix socket (events.socket) next to the main
shpool socket and accepts long-lived subscribers. Each new subscriber
receives a snapshot of the session table as its first message, built
under the shells lock so subsequent deltas (published in a follow-up
change) cannot race the registration.

Per-subscriber writer threads with bounded channels and a write timeout
isolate slow or stuck consumers from the daemon's hot path; subscribers
that fall behind are dropped and re-sync by reconnecting.

Extracts collect_sessions from handle_list so the snapshot and the
existing `shpool list --json` output share one schema-producing path.
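As a concrete illustration (field values fabricated; the wire shapes
are pinned by the unit tests in events.rs), a subscriber that connects
while one session named "main" is attached reads this as its first
line:

    {"type":"snapshot","sessions":[{"name":"main","started_at_unix_ms":100,"last_connected_at_unix_ms":200,"last_disconnected_at_unix_ms":null,"status":"Attached"}]}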
---
 libshpool/src/daemon/mod.rs    |   4 +-
 libshpool/src/daemon/server.rs | 101 +++++++-----
 libshpool/src/events.rs        | 272 +++++++++++++++++++++++++++++++++
 3 files changed, 342 insertions(+), 35 deletions(-)

diff --git a/libshpool/src/daemon/mod.rs b/libshpool/src/daemon/mod.rs
index 7b52ec71..2df4ea7a 100644
--- a/libshpool/src/daemon/mod.rs
+++ b/libshpool/src/daemon/mod.rs
@@ -17,7 +17,7 @@ use std::{env, os::unix::net::UnixListener, path::PathBuf};
 use anyhow::Context;
 use tracing::{info, instrument};
 
-use crate::{config, consts, hooks};
+use crate::{config, consts, events, hooks};
 
 mod etc_environment;
 mod exit_notify;
@@ -84,6 +84,8 @@ pub fn run(
     // spawn the signal handler thread in the background
     signals::Handler::new(cleanup_socket.clone()).spawn()?;
 
+    let _events_guard = server.start_events_listener(events::socket_path(&socket))?;
+
     server::Server::serve(server, listener)?;
 
     if let Some(sock) = cleanup_socket {
diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index 5c8d9cfc..9f3f5799 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -51,7 +51,7 @@ use crate::{
         etc_environment, exit_notify::ExitNotifier, hooks, pager, pager::PagerError, prompt,
         shell, show_motd, ttl_reaper,
     },
-    protocol, test_hooks, tty, user,
+    events, protocol, test_hooks, tty, user,
 };
 
 const DEFAULT_INITIAL_SHELL_PATH: &str = "/usr/bin:/bin:/usr/sbin:/sbin";
@@ -76,6 +76,7 @@ pub struct Server {
     runtime_dir: PathBuf,
     register_new_reapable_session: crossbeam_channel::Sender<(String, Instant)>,
     hooks: Box<dyn hooks::Hooks>,
+    pub events_bus: Arc<events::EventBus>,
     daily_messenger: Arc<show_motd::DailyMessenger>,
     log_level_handle: tracing_subscriber::reload::Handle<
         tracing_subscriber::filter::LevelFilter,
@@ -113,12 +114,39 @@ impl Server {
             runtime_dir,
             register_new_reapable_session: new_sess_tx,
             hooks,
+            events_bus: events::EventBus::new(),
             daily_messenger,
             log_level_handle,
             vars: HashMap::new().into(),
         }))
     }
 
+    /// Bind the events socket and spawn the accept thread. Each accepted
+    /// connection is handed a snapshot built under the session-table lock
+    /// so subsequent deltas (published under the same lock) cannot race.
+    /// The returned guard unlinks the socket file on drop.
+    pub fn start_events_listener(
+        self: &Arc<Self>,
+        socket_path: PathBuf,
+    ) -> anyhow::Result<events::ListenerGuard> {
+        let server = Arc::clone(self);
+        events::start_listener(socket_path, move |stream| {
+            server.handle_events_subscriber(stream)
+        })
+    }
+
+    fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> {
+        let receiver = {
+            let _s = span!(Level::INFO, "events_subscribe_lock(shells)").entered();
+            let shells = self.shells.lock();
+            let sessions =
+                collect_sessions(&shells).context("collecting snapshot for events subscriber")?;
+            let snapshot = events::Event::Snapshot { sessions };
+            self.events_bus.register(&snapshot)
+        };
+        events::spawn_writer(stream, receiver)
+    }
+
     #[instrument(skip_all)]
     pub fn serve(server: Arc<Server>, listener: UnixListener) -> anyhow::Result<()> {
         test_hooks::emit("daemon-about-to-listen");
@@ -729,40 +757,8 @@ impl Server {
     fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> {
         let _s = span!(Level::INFO, "lock(shells)").entered();
         let shells = self.shells.lock();
-
-        let sessions: anyhow::Result<Vec<Session>> = shells
-            .iter()
-            .map(|(k, v)| {
-                let status = match v.inner.try_lock() {
-                    Some(_) => SessionStatus::Disconnected,
-                    None => SessionStatus::Attached,
-                };
-
-                let timestamps = v.lifecycle_timestamps.lock();
-                let last_connected_at_unix_ms = timestamps
-                    .last_connected_at
-                    .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
-                    .transpose()?;
-
-                let last_disconnected_at_unix_ms = timestamps
-                    .last_disconnected_at
-                    .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
-                    .transpose()?;
-
-                Ok(Session {
-                    name: k.to_string(),
-                    started_at_unix_ms: v.started_at.duration_since(time::UNIX_EPOCH)?.as_millis()
-                        as i64,
-                    last_connected_at_unix_ms,
-                    last_disconnected_at_unix_ms,
-                    status,
-                })
-            })
-            .collect();
-        let sessions = sessions.context("collecting running session metadata")?;
-
+        let sessions = collect_sessions(&shells)?;
         write_reply(&mut stream, ListReply { sessions })?;
-
         Ok(())
     }
 
@@ -1253,6 +1249,43 @@ impl Server {
     }
 }
 
+/// Collect a snapshot of the session table for `list` replies and event
+/// snapshots. The caller must hold the shells lock for the duration of
+/// the call so the resulting list is consistent with concurrent mutators.
+fn collect_sessions(
+    shells: &HashMap<String, Box<shell::Session>>,
+) -> anyhow::Result<Vec<Session>> {
+    shells
+        .iter()
+        .map(|(k, v)| {
+            let status = match v.inner.try_lock() {
+                Some(_) => SessionStatus::Disconnected,
+                None => SessionStatus::Attached,
+            };
+
+            let timestamps = v.lifecycle_timestamps.lock();
+            let last_connected_at_unix_ms = timestamps
+                .last_connected_at
+                .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
+                .transpose()?;
+            let last_disconnected_at_unix_ms = timestamps
+                .last_disconnected_at
+                .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
+                .transpose()?;
+
+            Ok(Session {
+                name: k.to_string(),
+                started_at_unix_ms: v.started_at.duration_since(time::UNIX_EPOCH)?.as_millis()
+                    as i64,
+                last_connected_at_unix_ms,
+                last_disconnected_at_unix_ms,
+                status,
+            })
+        })
+        .collect::<anyhow::Result<Vec<Session>>>()
+        .context("collecting running session metadata")
+}
+
 // HACK: this is not a good way to detect shells that don't support our
 // sentinel injection approach, but it is better than just hanging when a
 // user tries to start one.
diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index 27794135..9aae456c 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -15,8 +15,32 @@
 //! `sessions` field of `shpool list --json`, so the two surfaces stay in
 //! sync by construction.
 
+use std::{
+    io::Write,
+    os::unix::net::{UnixListener, UnixStream},
+    path::{Path, PathBuf},
+    sync::{
+        mpsc::{self, Receiver, SyncSender, TrySendError},
+        Arc,
+    },
+    thread,
+    time::Duration,
+};
+
+use anyhow::Context;
+use parking_lot::Mutex;
 use serde_derive::Serialize;
 use shpool_protocol::Session;
+use tracing::{error, info, warn};
+
+/// Per-subscriber outbound queue depth. Subscribers that fall this far
+/// behind are dropped; reconnection re-syncs them via a fresh snapshot.
+const SUBSCRIBER_QUEUE_DEPTH: usize = 64;
+
+/// Write timeout for stuck subscribers (e.g. suspended via Ctrl-Z). After
+/// this elapses on a blocked write, the writer thread exits and the
+/// subscriber is implicitly dropped on the next publish.
+const WRITE_TIMEOUT: Duration = Duration::from_secs(5);
 
 /// An event published on the events socket.
 #[derive(Serialize, Debug)]
@@ -53,6 +77,149 @@ pub enum RemovedReason {
     Killed,
 }
 
+/// Fans out events to all connected subscribers.
+///
+/// Lock ordering: callers that publish under another lock (e.g. the session
+/// table) must take that lock before [`EventBus::publish`] takes its own
+/// internal lock. [`EventBus::register`] follows the same order, so a
+/// subscriber registered while the session-table lock is held cannot miss
+/// a delta from a mutation that committed under that lock.
+pub struct EventBus {
+    subscribers: Mutex<Vec<SyncSender<Arc<str>>>>,
+}
+
+impl EventBus {
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self { subscribers: Mutex::new(Vec::new()) })
+    }
+
+    /// Broadcast `event` to all current subscribers. Subscribers whose
+    /// queues are full or whose receivers have hung up are dropped.
+    pub fn publish(&self, event: &Event) {
+        let line = match serialize_line(event) {
+            Some(line) => line,
+            None => return,
+        };
+        let mut subs = self.subscribers.lock();
+        subs.retain(|tx| match tx.try_send(line.clone()) {
+            Ok(()) => true,
+            Err(TrySendError::Full(_)) => {
+                warn!("dropping events subscriber: queue full");
+                false
+            }
+            Err(TrySendError::Disconnected(_)) => false,
+        });
+    }
+
+    /// Register a new subscriber with `snapshot` as the first message in
+    /// its queue. Returns the receiver to be handed to a writer thread.
+    pub fn register(&self, snapshot: &Event) -> Receiver<Arc<str>> {
+        let line = serialize_line(snapshot).expect("snapshot serialization");
+        let (tx, rx) = mpsc::sync_channel(SUBSCRIBER_QUEUE_DEPTH);
+        tx.try_send(line).expect("seeding empty channel cannot fail");
+        self.subscribers.lock().push(tx);
+        rx
+    }
+}
+
+fn serialize_line(event: &Event) -> Option<Arc<str>> {
+    match serde_json::to_string(event) {
+        Ok(s) => Some(format!("{s}\n").into()),
+        Err(e) => {
+            error!("serializing event {:?}: {:?}", event, e);
+            None
+        }
+    }
+}
+
+/// Sibling events socket path next to the main shpool socket.
+pub fn socket_path(main_socket: &Path) -> PathBuf {
+    let mut path = main_socket.to_path_buf();
+    path.set_file_name("events.socket");
+    path
+}
+
+/// Owns the events socket file. Dropping the guard unlinks the socket
+/// path so a fresh daemon doesn't trip on stale files. The accept thread
+/// is not stopped — daemon shutdown takes the process down.
+pub struct ListenerGuard {
+    path: PathBuf,
+}
+
+impl Drop for ListenerGuard {
+    fn drop(&mut self) {
+        match std::fs::remove_file(&self.path) {
+            Ok(()) => {}
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+            Err(e) => warn!("removing events socket {:?}: {:?}", self.path, e),
+        }
+    }
+}
+
+/// Bind the events socket and spawn the accept thread. For each accepted
+/// connection, `on_accept` is invoked with the stream; it is expected to
+/// register the subscriber with the bus and spawn a writer thread (see
+/// [`spawn_writer`]). The returned guard unlinks the socket file on drop.
+pub fn start_listener<F>(
+    socket_path: PathBuf,
+    on_accept: F,
+) -> anyhow::Result<ListenerGuard>
+where
+    F: Fn(UnixStream) -> anyhow::Result<()> + Send + 'static,
+{
+    if socket_path.exists() {
+        std::fs::remove_file(&socket_path)
+            .with_context(|| format!("removing stale events socket {:?}", socket_path))?;
+    }
+    let listener = UnixListener::bind(&socket_path)
+        .with_context(|| format!("binding events socket {:?}", socket_path))?;
+    info!("events socket listening at {:?}", socket_path);
+    thread::Builder::new()
+        .name("events-accept".into())
+        .spawn(move || run_accept_loop(listener, on_accept))
+        .context("spawning events accept thread")?;
+    Ok(ListenerGuard { path: socket_path })
+}
+
+fn run_accept_loop<F>(listener: UnixListener, on_accept: F)
+where
+    F: Fn(UnixStream) -> anyhow::Result<()>,
+{
+    for stream in listener.incoming() {
+        match stream {
+            Ok(stream) => {
+                if let Err(e) = on_accept(stream) {
+                    warn!("accepting events subscriber: {:?}", e);
+                }
+            }
+            Err(e) => {
+                error!("events listener accept failed: {:?}", e);
+                break;
+            }
+        }
+    }
+}
+
+/// Set the write timeout and spawn a thread that drains `receiver` to
+/// `stream` until either side closes or a write times out.
+pub fn spawn_writer(stream: UnixStream, receiver: Receiver<Arc<str>>) -> anyhow::Result<()> {
+    stream.set_write_timeout(Some(WRITE_TIMEOUT)).context("setting write timeout")?;
+    thread::Builder::new()
+        .name("events-writer".into())
+        .spawn(move || run_writer(stream, receiver))
+        .context("spawning events writer thread")?;
+    Ok(())
+}
+
+fn run_writer(mut stream: UnixStream, receiver: Receiver<Arc<str>>) {
+    while let Ok(line) = receiver.recv() {
+        if let Err(e) = stream.write_all(line.as_bytes()) {
+            info!("events subscriber gone: {:?}", e);
+            break;
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -105,6 +272,111 @@ mod tests {
         );
     }
 
+    #[test]
+    fn bus_publish_with_no_subscribers_is_a_noop() {
+        let bus = EventBus::new();
+        bus.publish(&Event::SessionCreated { name: "x".into(), started_at_unix_ms: 1 });
+    }
+
+    #[test]
+    fn bus_register_seeds_receiver_with_snapshot() {
+        let bus = EventBus::new();
+        let snapshot = Event::Snapshot { sessions: vec![] };
+        let rx = bus.register(&snapshot);
+        let line = rx.try_recv().unwrap();
+        assert_eq!(&*line, "{\"type\":\"snapshot\",\"sessions\":[]}\n");
+    }
+
+    #[test]
+    fn bus_publish_reaches_subscriber_after_snapshot() {
+        let bus = EventBus::new();
+        let rx = bus.register(&Event::Snapshot { sessions: vec![] });
+        bus.publish(&Event::SessionCreated { name: "main".into(), started_at_unix_ms: 7 });
+        let snapshot_line = rx.recv().unwrap();
+        let delta_line = rx.recv().unwrap();
+        assert_eq!(&*snapshot_line, "{\"type\":\"snapshot\",\"sessions\":[]}\n");
+        assert_eq!(
+            &*delta_line,
+            "{\"type\":\"session.created\",\"name\":\"main\",\"started_at_unix_ms\":7}\n"
+        );
+    }
+
+    #[test]
+    fn bus_drops_subscriber_whose_queue_is_full() {
+        let bus = EventBus::new();
+        let rx = bus.register(&Event::Snapshot { sessions: vec![] });
+        // Fill the channel to capacity (the snapshot already used 1 slot).
+        for i in 0..(SUBSCRIBER_QUEUE_DEPTH - 1) {
+            bus.publish(&Event::SessionCreated {
+                name: format!("s{i}"),
+                started_at_unix_ms: i as i64,
+            });
+        }
+        assert_eq!(bus.subscribers.lock().len(), 1);
+        // One more publish overflows and the subscriber is dropped.
+        bus.publish(&Event::SessionCreated { name: "overflow".into(), started_at_unix_ms: 0 });
+        assert_eq!(bus.subscribers.lock().len(), 0);
+        // The receiver still has the buffered events; the channel is not
+        // closed for it from the receiving side.
+        drop(rx);
+    }
+
+    #[test]
+    fn bus_drops_subscriber_whose_receiver_hung_up() {
+        let bus = EventBus::new();
+        let rx = bus.register(&Event::Snapshot { sessions: vec![] });
+        drop(rx);
+        bus.publish(&Event::SessionCreated { name: "x".into(), started_at_unix_ms: 0 });
+        assert_eq!(bus.subscribers.lock().len(), 0);
+    }
+
+    #[test]
+    fn bus_publish_reaches_every_subscriber() {
+        let bus = EventBus::new();
+        let rx_a = bus.register(&Event::Snapshot { sessions: vec![] });
+        let rx_b = bus.register(&Event::Snapshot { sessions: vec![] });
+        bus.publish(&Event::SessionCreated { name: "main".into(), started_at_unix_ms: 1 });
+        for rx in [&rx_a, &rx_b] {
+            let _snapshot = rx.recv().unwrap();
+            let delta = rx.recv().unwrap();
+            assert!(delta.contains(r#""type":"session.created""#));
+            assert!(delta.contains(r#""name":"main""#));
+        }
+    }
+
+    #[test]
+    fn writer_exits_when_peer_closes_stream() {
+        let (a, b) = UnixStream::pair().unwrap();
+        let (tx, rx) = mpsc::sync_channel::<Arc<str>>(8);
+        let handle = thread::spawn(move || run_writer(a, rx));
+        drop(b);
+        // The send may succeed (kernel buffered) or fail; what matters is
+        // that closing the channel unblocks the writer thread on the next
+        // recv, regardless of write outcome.
+        let _ = tx.try_send("ignored\n".into());
+        drop(tx);
+        handle.join().unwrap();
+    }
+
+    #[test]
+    fn spawn_writer_sets_write_timeout() {
+        let (a, _b) = UnixStream::pair().unwrap();
+        let probe = a.try_clone().unwrap();
+        let (_tx, rx) = mpsc::sync_channel(1);
+        spawn_writer(a, rx).unwrap();
+        assert_eq!(probe.write_timeout().unwrap(), Some(WRITE_TIMEOUT));
+    }
+
+    #[test]
+    fn listener_guard_unlinks_socket_on_drop() {
+        let dir = tempfile::tempdir().unwrap();
+        let path = dir.path().join("events.socket");
+        let guard = start_listener(path.clone(), |_| Ok(())).unwrap();
+        assert!(path.exists(), "socket file should exist while guard is alive");
+        drop(guard);
+        assert!(!path.exists(), "socket file should be unlinked on guard drop");
+    }
+
     #[test]
     fn session_removed_serializes_with_reason() {
         let exited =

From fd4094042abbe624959c8593a525abd68193a31c Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Sun, 26 Apr 2026 21:35:12 -0400
Subject: [PATCH 03/12] feat(events): publish deltas from session lifecycle
 mutations

Emits session.created, session.attached, session.detached, and
session.removed (with reason exited|killed) at the seven mutation sites
that change the session table:

* select_shell_desc create path: created + attached
* select_shell_desc reattach path: attached
* handle_attach client-disconnect path: detached
* handle_attach shell-exit path: removed{exited}
* handle_detach: detached
* handle_kill: removed{killed}
* ttl_reaper expiry: removed{killed}

Each publish runs inside the same shells-lock scope as its mutation, so
wire-order matches causal-order for any subscriber. Reaping is surfaced
as `killed` for now; a dedicated reason can be added later if a use
case appears.
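As a concrete illustration of the resulting stream (timestamps
fabricated; the shapes follow the Event schema defined earlier in this
series), the full lifecycle of one session reaches a subscriber as four
flat deltas in causal order:

    {"type":"session.created","name":"main","started_at_unix_ms":1000}
    {"type":"session.attached","name":"main","last_connected_at_unix_ms":1000}
    {"type":"session.detached","name":"main","last_disconnected_at_unix_ms":2000}
    {"type":"session.removed","name":"main","reason":"killed"}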
---
 libshpool/src/daemon/server.rs     | 55 ++++++++++++++++++++++++------
 libshpool/src/daemon/ttl_reaper.rs |  9 +++++
 2 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index 9f3f5799..81694f6d 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -97,12 +97,14 @@ impl Server {
         >,
     ) -> anyhow::Result<Arc<Server>> {
         let shells = Arc::new(Mutex::new(HashMap::new()));
+        let events_bus = events::EventBus::new();
         // buffered so that we are unlikely to block when setting up a
         // new session
         let (new_sess_tx, new_sess_rx) = crossbeam_channel::bounded(10);
         let shells_tab = Arc::clone(&shells);
+        let reaper_bus = Arc::clone(&events_bus);
         thread::spawn(move || {
-            if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab) {
+            if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab, reaper_bus) {
                 warn!("ttl reaper exited with error: {:?}", e);
             }
         });
@@ -114,7 +116,7 @@ impl Server {
             runtime_dir,
             register_new_reapable_session: new_sess_tx,
             hooks,
-            events_bus: events::EventBus::new(),
+            events_bus,
             daily_messenger,
             log_level_handle,
             vars: HashMap::new().into(),
@@ -362,6 +364,10 @@ impl Server {
             let _s = span!(Level::INFO, "2_lock(shells)").entered();
             let mut shells = self.shells.lock();
             shells.remove(&header.name);
+            self.events_bus.publish(&events::Event::SessionRemoved {
+                name: header.name.clone(),
+                reason: events::RemovedReason::Exited,
+            });
         }
 
         // The child shell has exited, so the shell->client thread should
@@ -380,8 +386,12 @@ impl Server {
             let _s = span!(Level::INFO, "disconnect_lock(shells)").entered();
             let shells = self.shells.lock();
             if let Some(session) = shells.get(&header.name) {
-                session.lifecycle_timestamps.lock().last_disconnected_at =
-                    Some(time::SystemTime::now());
+                let now = time::SystemTime::now();
+                session.lifecycle_timestamps.lock().last_disconnected_at = Some(now);
+                self.events_bus.publish(&events::Event::SessionDetached {
+                    name: header.name.clone(),
+                    last_disconnected_at_unix_ms: unix_ms(now),
+                });
             }
         }
         if let Err(err) = self.hooks.on_client_disconnect(&header.name) {
@@ -444,8 +454,12 @@ impl Server {
                     // the channel is still open so the subshell is still running
                     info!("taking over existing session inner");
                     inner.client_stream = Some(stream.try_clone()?);
-                    session.lifecycle_timestamps.lock().last_connected_at =
-                        Some(time::SystemTime::now());
+                    let now = time::SystemTime::now();
+                    session.lifecycle_timestamps.lock().last_connected_at = Some(now);
+                    self.events_bus.publish(&events::Event::SessionAttached {
+                        name: header.name.clone(),
+                        last_connected_at_unix_ms: unix_ms(now),
+                    });
 
                     if inner
                         .shell_to_client_join_h
@@ -529,12 +543,21 @@ impl Server {
             matches!(motd, MotdDisplayMode::Dump),
         )?;
 
-        session.lifecycle_timestamps.lock().last_connected_at = Some(time::SystemTime::now());
+        let now = time::SystemTime::now();
+        session.lifecycle_timestamps.lock().last_connected_at = Some(now);
+        let started_at = session.started_at;
         {
-            // we unwrap to propagate the poison as an unwind
            let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered();
            let mut shells = self.shells.lock();
            shells.insert(header.name.clone(), Box::new(session));
+            self.events_bus.publish(&events::Event::SessionCreated {
+                name: header.name.clone(),
+                started_at_unix_ms: unix_ms(started_at),
+            });
+            self.events_bus.publish(&events::Event::SessionAttached {
+                name: header.name.clone(),
+                last_connected_at_unix_ms: unix_ms(now),
+            });
         }
 
         // we unwrap to propagate the poison as an unwind
@@ -632,8 +655,12 @@ impl Server {
                     if let shell::ClientConnectionStatus::DetachNone = status {
                         not_attached_sessions.push(session);
                     } else {
-                        s.lifecycle_timestamps.lock().last_disconnected_at =
-                            Some(time::SystemTime::now());
+                        let now = time::SystemTime::now();
+                        s.lifecycle_timestamps.lock().last_disconnected_at = Some(now);
+                        self.events_bus.publish(&events::Event::SessionDetached {
+                            name: session.clone(),
+                            last_disconnected_at_unix_ms: unix_ms(now),
+                        });
                     }
                 } else {
                     not_found_sessions.push(session);
@@ -742,6 +769,10 @@ impl Server {
 
         for session in to_remove.iter() {
             shells.remove(session);
+            self.events_bus.publish(&events::Event::SessionRemoved {
+                name: session.clone(),
+                reason: events::RemovedReason::Killed,
+            });
         }
         if !to_remove.is_empty() {
             test_hooks::emit("daemon-handle-kill-removed-shells");
@@ -1249,6 +1280,10 @@ impl Server {
     }
 }
 
+fn unix_ms(t: time::SystemTime) -> i64 {
+    t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64).unwrap_or(0)
+}
+
 /// Collect a snapshot of the session table for `list` replies and event
 /// snapshots. The caller must hold the shells lock for the duration of
 /// the call so the resulting list is consistent with concurrent mutators.
diff --git a/libshpool/src/daemon/ttl_reaper.rs b/libshpool/src/daemon/ttl_reaper.rs
index b85b8c21..a42ab0f2 100644
--- a/libshpool/src/daemon/ttl_reaper.rs
+++ b/libshpool/src/daemon/ttl_reaper.rs
@@ -31,12 +31,14 @@ use parking_lot::Mutex;
 use tracing::{info, span, warn, Level};
 
 use super::shell;
+use crate::events;
 
 /// Run the reaper thread loop. Should be invoked in a dedicated
 /// thread.
 pub fn run(
     new_sess: crossbeam_channel::Receiver<(String, Instant)>,
     shells: Arc<Mutex<HashMap<String, Box<shell::Session>>>>,
+    events_bus: Arc<events::EventBus>,
 ) -> anyhow::Result<()> {
     let _s = span!(Level::INFO, "ttl_reaper").entered();
 
@@ -116,6 +118,13 @@ pub fn run(
                 continue;
             }
             shells.remove(&reapable.session_name);
+            // Reaping is a daemon-initiated termination; surfaced to
+            // subscribers as `killed` until we have a use case for a
+            // dedicated reason.
+            events_bus.publish(&events::Event::SessionRemoved {
+                name: reapable.session_name.clone(),
+                reason: events::RemovedReason::Killed,
+            });
         }
     }
 }

From a8e11c029ae6cd885ca55205006fee7ec4423684 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Sun, 26 Apr 2026 21:37:54 -0400
Subject: [PATCH 04/12] feat(events): add `shpool events` subcommand

Connects to the daemon's events socket and prints each JSON line to
stdout, flushing per line so the stream is pipeline-friendly:

    shpool events | jq 'select(.type == "session.removed")'

The first line is a snapshot of the current session table; subsequent
lines are deltas. Reconnect to force a fresh snapshot.
---
 libshpool/src/events.rs | 19 ++++++++++++++++++-
 libshpool/src/lib.rs    | 10 ++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index 9aae456c..5146dc7a 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -16,7 +16,7 @@
 //! sync by construction.
 
 use std::{
-    io::Write,
+    io::{BufRead, BufReader, Write},
     os::unix::net::{UnixListener, UnixStream},
     path::{Path, PathBuf},
     sync::{
@@ -132,6 +132,23 @@ fn serialize_line(event: &Event) -> Option<Arc<str>> {
     }
 }
 
+/// Connect to the events socket, copy each line to stdout, and flush per
+/// line so the stream is usable in pipes (`shpool events | jq`). Returns
+/// when the daemon closes the connection.
+pub fn subscribe_to_stdout(socket_path: &Path) -> anyhow::Result<()> {
+    let stream = UnixStream::connect(socket_path)
+        .with_context(|| format!("connecting to events socket {:?}", socket_path))?;
+    let reader = BufReader::new(stream);
+    let stdout = std::io::stdout();
+    let mut out = stdout.lock();
+    for line in reader.lines() {
+        let line = line.context("reading event")?;
+        writeln!(out, "{line}").context("writing event")?;
+        out.flush().context("flushing stdout")?;
+    }
+    Ok(())
+}
+
 /// Sibling events socket path next to the main shpool socket.
 pub fn socket_path(main_socket: &Path) -> PathBuf {
     let mut path = main_socket.to_path_buf();
diff --git a/libshpool/src/lib.rs b/libshpool/src/lib.rs
index 1ebc71d3..e465df19 100644
--- a/libshpool/src/lib.rs
+++ b/libshpool/src/lib.rs
@@ -232,6 +232,15 @@ you could just do `shpool var set workspace key-bugfix`.
         #[clap(subcommand)]
         command: VarCommands,
     },
+
+    #[clap(about = "Subscribe to the daemon's push-event stream
+
+Connects to the events socket and writes each event (one JSON object
+per line) to stdout, flushing after every line so the stream is
+pipeline-friendly (e.g. `shpool events | jq`). The first line is a
+snapshot of the current session table; subsequent lines are deltas.
+Reconnect to force a fresh snapshot.")]
+    Events,
 }
 
 /// The subcommands of the var command.
@@ -434,6 +443,7 @@ pub fn run(args: Args, hooks: Option<Box<dyn hooks::Hooks>>) -> an
         Commands::List { json } => list::run(socket, json),
         Commands::SetLogLevel { level } => set_log_level::run(level, socket),
         Commands::Var { command } => var::run(socket, command),
+        Commands::Events => events::subscribe_to_stdout(&events::socket_path(&socket)),
     };
 
     if let Err(err) = res {

From cab78b527f5483b072c62c5bbd85f83c52408a95 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Sun, 26 Apr 2026 21:52:12 -0400
Subject: [PATCH 05/12] test(events): integration tests for the events socket
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three end-to-end tests, exercising the full daemon → events socket →
JSON wire path:

* snapshot_then_lifecycle: snapshot, then session.created / .attached /
  .detached / .removed{killed} as a session is created, detached (via
  background mode), and killed.
* snapshot_includes_existing_sessions: a subscriber that connects after
  a session already exists receives that session in the snapshot.
* multiple_subscribers_each_get_independent_streams: two concurrent
  subscribers both receive the snapshot and full delta sequence.
---
 shpool/tests/events.rs | 146 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 146 insertions(+)
 create mode 100644 shpool/tests/events.rs

diff --git a/shpool/tests/events.rs b/shpool/tests/events.rs
new file mode 100644
index 00000000..aac6fbc6
--- /dev/null
+++ b/shpool/tests/events.rs
@@ -0,0 +1,146 @@
+use std::{
+    io::{BufRead, BufReader},
+    os::unix::net::UnixStream,
+    path::PathBuf,
+    time::Duration,
+};
+
+use anyhow::{anyhow, Context};
+use ntest::timeout;
+use serde_json::Value;
+
+mod support;
+
+use crate::support::daemon::{AttachArgs, DaemonArgs, Proc};
+
+fn events_socket_path(daemon: &Proc) -> PathBuf {
+    daemon.socket_path.with_file_name("events.socket")
+}
+
+fn connect_events(daemon: &Proc) -> anyhow::Result<BufReader<UnixStream>> {
+    let path = events_socket_path(daemon);
+    let mut sleep_dur = Duration::from_millis(5);
+    for _ in 0..12 {
+        if let Ok(stream) = UnixStream::connect(&path) {
+            return Ok(BufReader::new(stream));
+        }
+        std::thread::sleep(sleep_dur);
+        sleep_dur *= 2;
+    }
+    Err(anyhow!("events socket never became available at {:?}", path))
+}
+
+fn next_event(reader: &mut BufReader<UnixStream>) -> anyhow::Result<Value> {
+    let mut line = String::new();
+    let n = reader.read_line(&mut line).context("reading event line")?;
+    if n == 0 {
+        return Err(anyhow!("events socket closed unexpectedly"));
+    }
+    serde_json::from_str(&line).with_context(|| format!("parsing event JSON: {line:?}"))
+}
+
+#[test]
+#[timeout(30000)]
+fn snapshot_then_lifecycle() -> anyhow::Result<()> {
+    let mut daemon = Proc::new(
+        "norc.toml",
+        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
+    )
+    .context("starting daemon proc")?;
+    let mut sub = connect_events(&daemon)?;
+
+    let snap = next_event(&mut sub)?;
+    assert_eq!(snap["type"], "snapshot");
+    assert_eq!(snap["sessions"].as_array().unwrap().len(), 0);
+
+    // Background attach: client connects, daemon publishes created+attached;
+    // the client immediately detaches, triggering the detached event.
+    let _attach = daemon
+        .attach(
+            "s1",
+            AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() },
+        )
+        .context("starting attach proc")?;
+
+    let created = next_event(&mut sub)?;
+    assert_eq!(created["type"], "session.created");
+    assert_eq!(created["name"], "s1");
+    assert!(created["started_at_unix_ms"].is_number());
+
+    let attached = next_event(&mut sub)?;
+    assert_eq!(attached["type"], "session.attached");
+    assert_eq!(attached["name"], "s1");
+
+    let detached = next_event(&mut sub)?;
+    assert_eq!(detached["type"], "session.detached");
+    assert_eq!(detached["name"], "s1");
+
+    let kill_out = daemon.kill(vec!["s1".into()]).context("running kill")?;
+    assert!(kill_out.status.success(), "kill failed: {:?}", kill_out);
+
+    let removed = next_event(&mut sub)?;
+    assert_eq!(removed["type"], "session.removed");
+    assert_eq!(removed["name"], "s1");
+    assert_eq!(removed["reason"], "killed");
+
+    Ok(())
+}
+
+#[test]
+#[timeout(30000)]
+fn snapshot_includes_existing_sessions() -> anyhow::Result<()> {
+    let mut daemon = Proc::new(
+        "norc.toml",
+        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
+    )
+    .context("starting daemon proc")?;
+
+    let _attach = daemon
+        .attach(
+            "pre-existing",
+            AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() },
+        )
+        .context("starting attach proc")?;
+
+    // Wait for the session to land in the table before subscribing.
+    daemon.wait_until_list_matches(|out| out.contains("pre-existing"))?;
+
+    let mut sub = connect_events(&daemon)?;
+    let snap = next_event(&mut sub)?;
+    assert_eq!(snap["type"], "snapshot");
+    let sessions = snap["sessions"].as_array().unwrap();
+    assert_eq!(sessions.len(), 1);
+    assert_eq!(sessions[0]["name"], "pre-existing");
+
+    Ok(())
+}
+
+#[test]
+#[timeout(30000)]
+fn multiple_subscribers_each_get_independent_streams() -> anyhow::Result<()> {
+    let mut daemon = Proc::new(
+        "norc.toml",
+        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
+    )
+    .context("starting daemon proc")?;
+    let mut sub_a = connect_events(&daemon)?;
+    let mut sub_b = connect_events(&daemon)?;
+
+    assert_eq!(next_event(&mut sub_a)?["type"], "snapshot");
+    assert_eq!(next_event(&mut sub_b)?["type"], "snapshot");
+
+    let _attach = daemon
+        .attach(
+            "shared",
+            AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() },
+        )
+        .context("starting attach proc")?;
+
+    for sub in [&mut sub_a, &mut sub_b] {
+        assert_eq!(next_event(sub)?["type"], "session.created");
+        assert_eq!(next_event(sub)?["type"], "session.attached");
+        assert_eq!(next_event(sub)?["type"], "session.detached");
+    }
+
+    Ok(())
+}

From a33674293962558e9ba579a1bb65cc92fb864989 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Mon, 27 Apr 2026 09:09:34 -0400
Subject: [PATCH 06/12] fix(events): de-duplicate lifecycle deltas; clean
 socket on signal exit

- Gate the SessionRemoved{Exited} publish in handle_attach's shell-exit
  branch on shells.remove() actually returning Some, so a concurrent
  kill or reaper that already removed the entry doesn't produce a
  duplicate removal event.
- Drop the eager SessionDetached publish in handle_detach. The
  bidi-loop unwind path in handle_attach already publishes the matching
  event with its own timestamp; emitting it twice was observable to
  subscribers. Keep the eager last_disconnected_at write so concurrent
  list() callers still see fresh state immediately.
- In select_shell_desc, defer the reattach SessionAttached publish past
  the is_finished() check so it isn't emitted for a session about to be
  implicitly clobbered by the create path. Have the create path publish
  SessionRemoved{Exited} when it overwrites an existing entry, so the
  replacement is explicit on the wire.
- Thread the events socket path through signals::Handler so signal
  exits clean it up alongside the main socket. Switch to a Vec<PathBuf>
  to handle both. Tolerate NotFound on cleanup since the events socket
  may not have been bound yet when a signal arrives.
- Drop the unnecessary `pub` on Server.events_bus.
- Apply nightly rustfmt.

Three new integration tests:

- explicit_detach_publishes_one_event: pins the no-duplicate-detached
  invariant by using a kill as a known-next-event fence; a duplicate
  detached would surface as the next read instead of session.removed.
- signal_exit_unlinks_sockets: SIGTERM the daemon and assert both
  socket files are gone.
- reattach_emits_attached_only: regression guard for the reattach path.
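For illustration, the duplicate removed by the second fix looked like
this on the wire after a single `shpool detach` (timestamps
fabricated), one event from the explicit handler and one from the
unwind path:

    {"type":"session.detached","name":"s","last_disconnected_at_unix_ms":1000}
    {"type":"session.detached","name":"s","last_disconnected_at_unix_ms":1003}

After the fix, only the unwind-path event is published.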
---
 libshpool/src/daemon/mod.rs     |  12 ++-
 libshpool/src/daemon/server.rs  |  57 +++++++------
 libshpool/src/daemon/signals.rs |  16 ++--
 libshpool/src/events.rs         |  29 +++----
 shpool/tests/events.rs          | 138 +++++++++++++++++++++++++++-----
 5 files changed, 180 insertions(+), 72 deletions(-)

diff --git a/libshpool/src/daemon/mod.rs b/libshpool/src/daemon/mod.rs
index 2df4ea7a..d23e8357 100644
--- a/libshpool/src/daemon/mod.rs
+++ b/libshpool/src/daemon/mod.rs
@@ -81,10 +81,16 @@ pub fn run(
             (Some(socket.clone()), UnixListener::bind(&socket).context("binding to socket")?)
         }
     };
-    // spawn the signal handler thread in the background
-    signals::Handler::new(cleanup_socket.clone()).spawn()?;
+    let events_socket = events::socket_path(&socket);
 
-    let _events_guard = server.start_events_listener(events::socket_path(&socket))?;
+    // spawn the signal handler thread in the background. Both sockets need
+    // explicit cleanup on signal exit because the process exits before any
+    // RAII guard can run.
+    let mut socks_to_clean: Vec<PathBuf> = cleanup_socket.iter().cloned().collect();
+    socks_to_clean.push(events_socket.clone());
+    signals::Handler::new(socks_to_clean).spawn()?;
+
+    let _events_guard = server.start_events_listener(events_socket)?;
 
     server::Server::serve(server, listener)?;
 
diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index 81694f6d..5111243c 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -76,7 +76,7 @@ pub struct Server {
     runtime_dir: PathBuf,
     register_new_reapable_session: crossbeam_channel::Sender<(String, Instant)>,
     hooks: Box<dyn hooks::Hooks>,
-    pub events_bus: Arc<events::EventBus>,
+    events_bus: Arc<events::EventBus>,
     daily_messenger: Arc<show_motd::DailyMessenger>,
     log_level_handle: tracing_subscriber::reload::Handle<
         tracing_subscriber::filter::LevelFilter,
@@ -132,9 +132,7 @@ impl Server {
         socket_path: PathBuf,
     ) -> anyhow::Result<events::ListenerGuard> {
         let server = Arc::clone(self);
-        events::start_listener(socket_path, move |stream| {
-            server.handle_events_subscriber(stream)
-        })
+        events::start_listener(socket_path, move |stream| server.handle_events_subscriber(stream))
     }
 
     fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> {
@@ -363,11 +361,14 @@ impl Server {
         {
             let _s = span!(Level::INFO, "2_lock(shells)").entered();
             let mut shells = self.shells.lock();
-            shells.remove(&header.name);
-            self.events_bus.publish(&events::Event::SessionRemoved {
-                name: header.name.clone(),
-                reason: events::RemovedReason::Exited,
-            });
+            // Gated: a concurrent kill or reaper may have already
+            // removed the entry and published its own removal.
+            if shells.remove(&header.name).is_some() {
+                self.events_bus.publish(&events::Event::SessionRemoved {
+                    name: header.name.clone(),
+                    reason: events::RemovedReason::Exited,
+                });
+            }
         }
 
         // The child shell has exited, so the shell->client thread should
@@ -456,10 +457,6 @@ impl Server {
                     inner.client_stream = Some(stream.try_clone()?);
                     let now = time::SystemTime::now();
                     session.lifecycle_timestamps.lock().last_connected_at = Some(now);
-                    self.events_bus.publish(&events::Event::SessionAttached {
-                        name: header.name.clone(),
-                        last_connected_at_unix_ms: unix_ms(now),
-                    });
 
                     if inner
                         .shell_to_client_join_h
@@ -471,6 +468,12 @@ impl Server {
                             "child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell"
                         );
                     } else {
+                        // Reattach confirmed; the create path won't run
+                        // and clobber the entry, so it's safe to publish.
+                        self.events_bus.publish(&events::Event::SessionAttached {
+                            name: header.name.clone(),
+                            last_connected_at_unix_ms: unix_ms(now),
+                        });
                         if let Err(err) = self.hooks.on_reattach(&header.name) {
                             warn!("reattach hook: {:?}", err);
                         }
@@ -487,8 +490,6 @@ impl Server {
                             AttachStatus::Attached { warnings },
                         ));
                     }
-
-                    // status is already attached
                 }
                 Some(exit_status) => {
                     // the channel is closed so we know the subshell exited
@@ -549,7 +550,17 @@ impl Server {
         {
             let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered();
             let mut shells = self.shells.lock();
+            // If we're replacing a stale entry whose shell process is
+            // gone, surface that to subscribers before announcing the
+            // replacement.
+            let clobbered = shells.contains_key(&header.name);
             shells.insert(header.name.clone(), Box::new(session));
+            if clobbered {
+                self.events_bus.publish(&events::Event::SessionRemoved {
+                    name: header.name.clone(),
+                    reason: events::RemovedReason::Exited,
+                });
+            }
             self.events_bus.publish(&events::Event::SessionCreated {
                 name: header.name.clone(),
                 started_at_unix_ms: unix_ms(started_at),
@@ -655,12 +666,12 @@ impl Server {
                     if let shell::ClientConnectionStatus::DetachNone = status {
                         not_attached_sessions.push(session);
                     } else {
-                        let now = time::SystemTime::now();
-                        s.lifecycle_timestamps.lock().last_disconnected_at = Some(now);
-                        self.events_bus.publish(&events::Event::SessionDetached {
-                            name: session.clone(),
-                            last_disconnected_at_unix_ms: unix_ms(now),
-                        });
+                        // The bidi-loop unwind in handle_attach owns the
+                        // SessionDetached publish (with its own timestamp);
+                        // we just update last_disconnected_at eagerly so a
+                        // concurrent list() reflects the detach immediately.
+                        s.lifecycle_timestamps.lock().last_disconnected_at =
+                            Some(time::SystemTime::now());
                     }
                 } else {
                     not_found_sessions.push(session);
@@ -1287,9 +1298,7 @@ fn unix_ms(t: time::SystemTime) -> i64 {
 /// Collect a snapshot of the session table for `list` replies and event
 /// snapshots. The caller must hold the shells lock for the duration of
 /// the call so the resulting list is consistent with concurrent mutators.
-fn collect_sessions(
-    shells: &HashMap<String, Box<shell::Session>>,
-) -> anyhow::Result<Vec<Session>> {
+fn collect_sessions(shells: &HashMap<String, Box<shell::Session>>) -> anyhow::Result<Vec<Session>> {
     shells
         .iter()
         .map(|(k, v)| {
diff --git a/libshpool/src/daemon/signals.rs b/libshpool/src/daemon/signals.rs
index 3359883d..5e533f04 100644
--- a/libshpool/src/daemon/signals.rs
+++ b/libshpool/src/daemon/signals.rs
@@ -23,11 +23,11 @@ use signal_hook::{consts::TERM_SIGNALS, flag, iterator::Signals};
 use tracing::{error, info};
 
 pub struct Handler {
-    sock: Option<PathBuf>,
+    socks: Vec<PathBuf>,
 }
 
 impl Handler {
-    pub fn new(sock: Option<PathBuf>) -> Self {
-        Handler { sock }
+    pub fn new(socks: Vec<PathBuf>) -> Self {
+        Handler { socks }
     }
 
     pub fn spawn(self) -> anyhow::Result<()> {
@@ -57,10 +57,12 @@ impl Handler {
         for signal in &mut signals {
             assert!(TERM_SIGNALS.contains(&signal));
 
-            info!("term sig handler: cleaning up socket");
-            if let Some(sock) = self.sock {
-                if let Err(e) = std::fs::remove_file(sock).context("cleaning up socket") {
-                    error!("error cleaning up socket file: {}", e);
+            info!("term sig handler: cleaning up sockets");
+            for sock in &self.socks {
+                match std::fs::remove_file(sock) {
+                    Ok(()) => {}
+                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+                    Err(e) => error!("error cleaning up socket {:?}: {}", sock, e),
                 }
             }
 
diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index 5146dc7a..23a36c97 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -177,10 +177,7 @@ impl Drop for ListenerGuard {
 /// connection, `on_accept` is invoked with the stream; it is expected to
 /// register the subscriber with the bus and spawn a writer thread (see
 /// [`spawn_writer`]). The returned guard unlinks the socket file on drop.
-pub fn start_listener<F>(
-    socket_path: PathBuf,
-    on_accept: F,
-) -> anyhow::Result<ListenerGuard>
+pub fn start_listener<F>(socket_path: PathBuf, on_accept: F) -> anyhow::Result<ListenerGuard>
 where
     F: Fn(UnixStream) -> anyhow::Result<()> + Send + 'static,
 {
@@ -266,13 +263,15 @@ mod tests {
     #[test]
     fn session_created_serializes_flat() {
         let event = Event::SessionCreated { name: "main".into(), started_at_unix_ms: 42 };
-        assert_eq!(json(&event), r#"{"type":"session.created","name":"main","started_at_unix_ms":42}"#);
+        assert_eq!(
+            json(&event),
+            r#"{"type":"session.created","name":"main","started_at_unix_ms":42}"#
+        );
     }
 
     #[test]
     fn session_attached_serializes_flat() {
-        let event =
-            Event::SessionAttached { name: "main".into(), last_connected_at_unix_ms: 42 };
+        let event = Event::SessionAttached { name: "main".into(), last_connected_at_unix_ms: 42 };
         assert_eq!(
             json(&event),
             r#"{"type":"session.attached","name":"main","last_connected_at_unix_ms":42}"#
@@ -396,17 +395,9 @@ mod tests {
 
     #[test]
     fn session_removed_serializes_with_reason() {
-        let exited =
-            Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Exited };
-        assert_eq!(
-            json(&exited),
-            r#"{"type":"session.removed","name":"main","reason":"exited"}"#
-        );
-        let killed =
-            Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Killed };
-        assert_eq!(
-            json(&killed),
-            r#"{"type":"session.removed","name":"main","reason":"killed"}"#
-        );
+        let exited = Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Exited };
+        assert_eq!(json(&exited), r#"{"type":"session.removed","name":"main","reason":"exited"}"#);
+        let killed = Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Killed };
+        assert_eq!(json(&killed), r#"{"type":"session.removed","name":"main","reason":"killed"}"#);
     }
 }
diff --git a/shpool/tests/events.rs b/shpool/tests/events.rs
index aac6fbc6..cbda498d 100644
--- a/shpool/tests/events.rs
+++ b/shpool/tests/events.rs
@@ -42,11 +42,9 @@ fn next_event(reader: &mut BufReader<UnixStream>) -> anyhow::Result<Value> {
 #[test]
 #[timeout(30000)]
 fn snapshot_then_lifecycle() -> anyhow::Result<()> {
-    let mut daemon = Proc::new(
-        "norc.toml",
-        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
-    )
-    .context("starting daemon proc")?;
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
     let mut sub = connect_events(&daemon)?;
 
     let snap = next_event(&mut sub)?;
@@ -56,10 +54,7 @@ fn snapshot_then_lifecycle() -> anyhow::Result<()> {
     // Background attach: client connects, daemon publishes created+attached;
     // the client immediately detaches, triggering the detached event.
     let _attach = daemon
-        .attach(
-            "s1",
-            AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() },
-        )
+        .attach("s1", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() })
         .context("starting attach proc")?;
 
     let created = next_event(&mut sub)?;
@@ -89,11 +84,9 @@ fn snapshot_then_lifecycle() -> anyhow::Result<()> {
 #[test]
 #[timeout(30000)]
 fn snapshot_includes_existing_sessions() -> anyhow::Result<()> {
-    let mut daemon = Proc::new(
-        "norc.toml",
-        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
-    )
-    .context("starting daemon proc")?;
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
 
     let _attach = daemon
         .attach(
@@ -115,14 +108,121 @@ fn snapshot_includes_existing_sessions() -> anyhow::Result<()> {
     Ok(())
 }
 
+// `shpool detach` triggers two code paths that both updated the session:
+// the explicit handler and, asynchronously, the bidi-loop unwind in the
+// attach worker. An earlier version emitted SessionDetached from both,
+// producing a duplicate event with two different timestamps. This test
+// pins exactly one detached event per detach by using a kill as a
+// known-next-event fence — if a duplicate detached were buffered, the
+// next read would return it instead of `session.removed`.
+#[test]
+#[timeout(30000)]
+fn explicit_detach_publishes_one_event() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let mut sub = connect_events(&daemon)?;
+    let _snap = next_event(&mut sub)?;
+
+    // Foreground attach (no `background`) keeps the session attached.
+    let _attach = daemon
+        .attach("s", AttachArgs { null_stdin: true, ..AttachArgs::default() })
+        .context("starting attach proc")?;
+    assert_eq!(next_event(&mut sub)?["type"], "session.created");
+    assert_eq!(next_event(&mut sub)?["type"], "session.attached");
+
+    let detach_out = daemon.detach(vec!["s".into()]).context("running detach")?;
+    assert!(detach_out.status.success(), "detach failed: {:?}", detach_out);
+
+    let detached = next_event(&mut sub)?;
+    assert_eq!(detached["type"], "session.detached");
+    assert_eq!(detached["name"], "s");
+
+    // Wait for the unwind path to complete (session shows disconnected in
+    // the list output) so any duplicate detached event would already be
+    // queued by the time we issue the kill below.
+    daemon.wait_until_list_matches(|out| out.contains("disconnected"))?;
+
+    let kill_out = daemon.kill(vec!["s".into()]).context("running kill")?;
+    assert!(kill_out.status.success(), "kill failed: {:?}", kill_out);
+
+    let next = next_event(&mut sub)?;
+    assert_eq!(
+        next["type"], "session.removed",
+        "expected next event to be removed, got {next} — possible duplicate detached"
+    );
+    assert_eq!(next["reason"], "killed");
+
+    Ok(())
+}
+
+// Reattach should produce a single `session.attached` for the existing
+// session, with no `session.created`. Catches regressions where the
+// reattach path accidentally falls through to the create path.
+#[test]
+#[timeout(30000)]
+fn reattach_emits_attached_only() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let mut sub = connect_events(&daemon)?;
+    let _snap = next_event(&mut sub)?;
+
+    let _attach1 = daemon
+        .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() })
+        .context("first attach")?;
+    assert_eq!(next_event(&mut sub)?["type"], "session.created");
+    assert_eq!(next_event(&mut sub)?["type"], "session.attached");
+    assert_eq!(next_event(&mut sub)?["type"], "session.detached");
+
+    daemon.wait_until_list_matches(|out| out.contains("disconnected"))?;
+
+    let _attach2 = daemon
+        .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() })
+        .context("reattach")?;
+
+    let attached = next_event(&mut sub)?;
+    assert_eq!(
+        attached["type"], "session.attached",
+        "expected attached on reattach, got {attached}"
+    );
+    assert_eq!(attached["name"], "s");
+    assert_eq!(next_event(&mut sub)?["type"], "session.detached");
+
+    Ok(())
+}
+
+// SIGTERM should clean up both sockets via the signal handler, since
+// process::exit bypasses any RAII guard.
+#[test]
+#[timeout(30000)]
+fn signal_exit_unlinks_sockets() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let main_sock = daemon.socket_path.clone();
+    let events_sock = events_socket_path(&daemon);
+    assert!(main_sock.exists(), "main socket should exist while daemon runs");
+    assert!(events_sock.exists(), "events socket should exist while daemon runs");
+
+    let pid = nix::unistd::Pid::from_raw(
+        daemon.proc.as_ref().expect("daemon process handle").id() as i32
+    );
+    nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM).context("sending SIGTERM")?;
+    daemon.proc_wait().context("waiting for daemon to exit")?;
+
+    assert!(!main_sock.exists(), "main socket should be unlinked on signal exit");
+    assert!(!events_sock.exists(), "events socket should be unlinked on signal exit");
+
+    Ok(())
+}
+
 #[test]
 #[timeout(30000)]
 fn multiple_subscribers_each_get_independent_streams() -> anyhow::Result<()> {
-    let mut daemon = Proc::new(
-        "norc.toml",
-        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
-    )
-    .context("starting daemon proc")?;
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
     let mut sub_a = connect_events(&daemon)?;
     let mut sub_b = connect_events(&daemon)?;
 

From 3f38bb79d5d10c7918735232c13c8a64fe9d7f0d Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Mon, 27 Apr 2026 21:40:30 -0400
Subject: [PATCH 07/12] refactor(events): trim event payloads to type fields
 only

Each event now serializes as `{"type":"session.<kind>"}` with no other
fields. To learn what the event refers to, subscribers follow up with
`shpool list` (or send `ConnectHeader::List` over the main socket).

- Drop the welcome `snapshot` event; subscribers do their own bootstrap
  list call after connecting.
- Drop `name`, timestamps, and `reason` from the lifecycle events.
- Remove `RemovedReason`; reaped/killed/exited share `session.removed`.
- Inline `collect_sessions` back into `handle_list` (the snapshot was
  its only other consumer) and drop the now-unused `unix_ms` helper.
---
 libshpool/src/daemon/server.rs     | 133 ++++++++---------------
 libshpool/src/daemon/ttl_reaper.rs |   8 +-
 libshpool/src/events.rs            | 166 +++++++----------------------
 shpool/tests/events.rs             |  72 ++-----------
 4 files changed, 96 insertions(+), 283 deletions(-)

diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index 5111243c..397ea443 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -123,10 +123,8 @@ impl Server {
         }))
     }
 
-    /// Bind the events socket and spawn the accept thread. Each accepted
-    /// connection is handed a snapshot built under the session-table lock
-    /// so subsequent deltas (published under the same lock) cannot race.
-    /// The returned guard unlinks the socket file on drop.
+    /// Bind the events socket and spawn the accept thread. The returned
+    /// guard unlinks the socket file on drop.
     pub fn start_events_listener(
         self: &Arc<Self>,
         socket_path: PathBuf,
     ) -> anyhow::Result<events::ListenerGuard> {
@@ -136,14 +134,7 @@ impl Server {
     }
 
     fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> {
-        let receiver = {
-            let _s = span!(Level::INFO, "events_subscribe_lock(shells)").entered();
-            let shells = self.shells.lock();
-            let sessions =
-                collect_sessions(&shells).context("collecting snapshot for events subscriber")?;
-            let snapshot = events::Event::Snapshot { sessions };
-            self.events_bus.register(&snapshot)
-        };
+        let receiver = self.events_bus.register();
         events::spawn_writer(stream, receiver)
     }
 
@@ -364,10 +355,7 @@ impl Server {
             // Gated: a concurrent kill or reaper may have already
             // removed the entry and published its own removal.
             if shells.remove(&header.name).is_some() {
-                self.events_bus.publish(&events::Event::SessionRemoved {
-                    name: header.name.clone(),
-                    reason: events::RemovedReason::Exited,
-                });
+                self.events_bus.publish(&events::Event::SessionRemoved);
             }
         }
 
@@ -387,12 +375,9 @@ impl Server {
         let _s = span!(Level::INFO, "disconnect_lock(shells)").entered();
         let shells = self.shells.lock();
         if let Some(session) = shells.get(&header.name) {
-            let now = time::SystemTime::now();
-            session.lifecycle_timestamps.lock().last_disconnected_at = Some(now);
-            self.events_bus.publish(&events::Event::SessionDetached {
-                name: header.name.clone(),
-                last_disconnected_at_unix_ms: unix_ms(now),
-            });
+            session.lifecycle_timestamps.lock().last_disconnected_at =
+                Some(time::SystemTime::now());
+            self.events_bus.publish(&events::Event::SessionDetached);
         }
     }
     if let Err(err) = self.hooks.on_client_disconnect(&header.name) {
@@ -470,10 +455,7 @@ impl Server {
                     } else {
                         // Reattach confirmed; the create path won't run
                         // and clobber the entry, so it's safe to publish.
-                        self.events_bus.publish(&events::Event::SessionAttached {
-                            name: header.name.clone(),
-                            last_connected_at_unix_ms: unix_ms(now),
-                        });
+                        self.events_bus.publish(&events::Event::SessionAttached);
                         if let Err(err) = self.hooks.on_reattach(&header.name) {
                             warn!("reattach hook: {:?}", err);
                         }
@@ -544,9 +526,7 @@ impl Server {
             matches!(motd, MotdDisplayMode::Dump),
         )?;
 
-        let now = time::SystemTime::now();
-        session.lifecycle_timestamps.lock().last_connected_at = Some(now);
-        let started_at = session.started_at;
+        session.lifecycle_timestamps.lock().last_connected_at = Some(time::SystemTime::now());
         {
             let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered();
             let mut shells = self.shells.lock();
@@ -556,19 +536,10 @@ impl Server {
             // If we're replacing a stale entry whose shell process is
             // gone, surface that to subscribers before announcing the
            // replacement.
             let clobbered = shells.contains_key(&header.name);
             shells.insert(header.name.clone(), Box::new(session));
             if clobbered {
-                self.events_bus.publish(&events::Event::SessionRemoved {
-                    name: header.name.clone(),
-                    reason: events::RemovedReason::Exited,
-                });
+                self.events_bus.publish(&events::Event::SessionRemoved);
             }
-            self.events_bus.publish(&events::Event::SessionCreated {
-                name: header.name.clone(),
-                started_at_unix_ms: unix_ms(started_at),
-            });
-            self.events_bus.publish(&events::Event::SessionAttached {
-                name: header.name.clone(),
-                last_connected_at_unix_ms: unix_ms(now),
-            });
+            self.events_bus.publish(&events::Event::SessionCreated);
+            self.events_bus.publish(&events::Event::SessionAttached);
         }
 
         // we unwrap to propagate the poison as an unwind
@@ -667,12 +638,12 @@ impl Server {
                         not_attached_sessions.push(session);
                     } else {
                         // The bidi-loop unwind in handle_attach owns the
-                        // SessionDetached publish (with its own timestamp);
-                        // we just update last_disconnected_at eagerly so a
-                        // concurrent list() reflects the detach immediately.
+                        // SessionDetached publish; we just update
+                        // last_disconnected_at eagerly so a concurrent list()
reflects the detach immediately.
+                    // SessionDetached publish; we just update
+                    // last_disconnected_at eagerly so a concurrent list()
+                    // reflects the detach immediately.
                     s.lifecycle_timestamps.lock().last_disconnected_at =
                         Some(time::SystemTime::now());
                 }
@@ -780,10 +751,7 @@ impl Server {
 
         for session in to_remove.iter() {
             shells.remove(session);
-            self.events_bus.publish(&events::Event::SessionRemoved {
-                name: session.clone(),
-                reason: events::RemovedReason::Killed,
-            });
+            self.events_bus.publish(&events::Event::SessionRemoved);
         }
         if !to_remove.is_empty() {
             test_hooks::emit("daemon-handle-kill-removed-shells");
@@ -799,7 +767,35 @@ impl Server {
     fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> {
         let _s = span!(Level::INFO, "lock(shells)").entered();
         let shells = self.shells.lock();
-        let sessions = collect_sessions(&shells)?;
+        let sessions = shells
+            .iter()
+            .map(|(k, v)| {
+                let status = match v.inner.try_lock() {
+                    Some(_) => SessionStatus::Disconnected,
+                    None => SessionStatus::Attached,
+                };
+
+                let timestamps = v.lifecycle_timestamps.lock();
+                let last_connected_at_unix_ms = timestamps
+                    .last_connected_at
+                    .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
+                    .transpose()?;
+                let last_disconnected_at_unix_ms = timestamps
+                    .last_disconnected_at
+                    .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
+                    .transpose()?;
+
+                Ok(Session {
+                    name: k.to_string(),
+                    started_at_unix_ms: v.started_at.duration_since(time::UNIX_EPOCH)?.as_millis()
+                        as i64,
+                    last_connected_at_unix_ms,
+                    last_disconnected_at_unix_ms,
+                    status,
+                })
+            })
+            .collect::<anyhow::Result<Vec<Session>>>()
+            .context("collecting running session metadata")?;
         write_reply(&mut stream, ListReply { sessions })?;
         Ok(())
     }
@@ -1291,45 +1287,6 @@ impl Server {
     }
 }
 
-fn unix_ms(t: time::SystemTime) -> i64 {
-    t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64).unwrap_or(0)
-}
-
-/// Collect a snapshot of the session table for `list` replies and event
-/// snapshots. The caller must hold the shells lock for the duration of
-/// the call so the resulting list is consistent with concurrent mutators.
-fn collect_sessions(shells: &HashMap<String, Box<shell::Session>>) -> anyhow::Result<Vec<Session>> {
-    shells
-        .iter()
-        .map(|(k, v)| {
-            let status = match v.inner.try_lock() {
-                Some(_) => SessionStatus::Disconnected,
-                None => SessionStatus::Attached,
-            };
-
-            let timestamps = v.lifecycle_timestamps.lock();
-            let last_connected_at_unix_ms = timestamps
-                .last_connected_at
-                .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
-                .transpose()?;
-            let last_disconnected_at_unix_ms = timestamps
-                .last_disconnected_at
-                .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
-                .transpose()?;
-
-            Ok(Session {
-                name: k.to_string(),
-                started_at_unix_ms: v.started_at.duration_since(time::UNIX_EPOCH)?.as_millis()
-                    as i64,
-                last_connected_at_unix_ms,
-                last_disconnected_at_unix_ms,
-                status,
-            })
-        })
-        .collect::<anyhow::Result<Vec<Session>>>()
-        .context("collecting running session metadata")
-}
-
 // HACK: this is not a good way to detect shells that don't support our
 // sentinel injection approach, but it is better than just hanging when a
 // user tries to start one.
diff --git a/libshpool/src/daemon/ttl_reaper.rs b/libshpool/src/daemon/ttl_reaper.rs
index a42ab0f2..b94ba5a6 100644
--- a/libshpool/src/daemon/ttl_reaper.rs
+++ b/libshpool/src/daemon/ttl_reaper.rs
@@ -118,13 +118,7 @@ pub fn run(
                     continue;
                 }
                 shells.remove(&reapable.session_name);
-                // Reaping is a daemon-initiated termination; surfaced to
-                // subscribers as `killed` until we have a use case for a
-                // dedicated reason.
-                events_bus.publish(&events::Event::SessionRemoved {
-                    name: reapable.session_name.clone(),
-                    reason: events::RemovedReason::Killed,
-                });
+                events_bus.publish(&events::Event::SessionRemoved);
             }
         }
     }
diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index 23a36c97..335b4dcc 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -5,15 +5,10 @@
 //! line (newline-delimited; aka JSONL). Non-Rust clients only need a Unix
 //! socket and a JSON parser to consume the stream.
 //!
-//! On connect, a subscriber receives a `snapshot` event reflecting the
-//! current session table, atomically with respect to the table mutations
-//! that produce subsequent delta events. After the snapshot, the subscriber
-//! receives delta events as the session table changes. To force a re-sync,
-//! a subscriber may simply reconnect.
-//!
-//! The `sessions` field of a snapshot event uses the same schema as the
-//! `sessions` field of `shpool list --json`, so the two surfaces stay in
-//! sync by construction.
+//! Events carry no payload beyond their type — they signal that *something*
+//! changed in the session table. Subscribers learn the new state by calling
+//! `shpool list` (or the equivalent over the main socket). Subscribers that
+//! fall too far behind are dropped and may simply reconnect.
 
 use std::{
     io::{BufRead, BufReader, Write},
@@ -30,11 +25,10 @@ use std::{
 use anyhow::Context;
 use parking_lot::Mutex;
 use serde_derive::Serialize;
-use shpool_protocol::Session;
 use tracing::{error, info, warn};
 
 /// Per-subscriber outbound queue depth. Subscribers that fall this far
-/// behind are dropped; reconnection re-syncs them via a fresh snapshot.
+/// behind are dropped and must reconnect.
 const SUBSCRIBER_QUEUE_DEPTH: usize = 64;
 
 /// Write timeout for stuck subscribers (e.g. suspended via Ctrl-Z). After
@@ -45,45 +39,24 @@ const WRITE_TIMEOUT: Duration = Duration::from_secs(5);
 /// An event published on the events socket.
 #[derive(Serialize, Debug)]
 #[serde(tag = "type")]
+#[allow(clippy::enum_variant_names)]
 pub enum Event {
-    /// Sent as the first message after a subscriber connects, reflecting
-    /// the current session table.
-    #[serde(rename = "snapshot")]
-    Snapshot { sessions: Vec<Session> },
-
-    /// A new session was created.
     #[serde(rename = "session.created")]
-    SessionCreated { name: String, started_at_unix_ms: i64 },
-
-    /// A client attached to an existing session.
+    SessionCreated,
     #[serde(rename = "session.attached")]
-    SessionAttached { name: String, last_connected_at_unix_ms: i64 },
-
-    /// A client detached from a session that is still alive.
+    SessionAttached,
     #[serde(rename = "session.detached")]
-    SessionDetached { name: String, last_disconnected_at_unix_ms: i64 },
-
-    /// A session was removed from the session table.
+    SessionDetached,
     #[serde(rename = "session.removed")]
-    SessionRemoved { name: String, reason: RemovedReason },
-}
-
-#[derive(Serialize, Debug)]
-#[serde(rename_all = "lowercase")]
-pub enum RemovedReason {
-    /// The shell process exited on its own.
-    Exited,
-    /// The session was killed by an explicit `shpool kill` request.
-    Killed,
+    SessionRemoved,
 }
 
 /// Fans out events to all connected subscribers.
 ///
 /// Lock ordering: callers that publish under another lock (e.g. the session
 /// table) must take that lock before [`EventBus::publish`] takes its own
-/// internal lock. [`EventBus::register`] follows the same order, so a
-/// subscriber registered while the session-table lock is held cannot miss
-/// a delta from a mutation that committed under that lock.
+/// internal lock. Publishing under the table lock keeps wire-order =
+/// causal-order across mutators.
 pub struct EventBus {
     subscribers: Mutex<Vec<SyncSender<Arc<str>>>>,
 }
@@ -111,12 +84,10 @@ impl EventBus {
         });
     }
 
-    /// Register a new subscriber with `snapshot` as the first message in
-    /// its queue. Returns the receiver to be handed to a writer thread.
-    pub fn register(&self, snapshot: &Event) -> Receiver<Arc<str>> {
-        let line = serialize_line(snapshot).expect("snapshot serialization");
+    /// Register a new subscriber. Returns the receiver to be handed to a
+    /// writer thread.
+    pub fn register(&self) -> Receiver<Arc<str>> {
         let (tx, rx) = mpsc::sync_channel(SUBSCRIBER_QUEUE_DEPTH);
-        tx.try_send(line).expect("seeding empty channel cannot fail");
         self.subscribers.lock().push(tx);
         rx
     }
@@ -237,126 +208,77 @@ fn run_writer(mut stream: UnixStream, receiver: Receiver<Arc<str>>) {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use shpool_protocol::SessionStatus;
 
     fn json(event: &Event) -> String {
         serde_json::to_string(event).unwrap()
     }
 
     #[test]
-    fn snapshot_serializes_with_sessions_array() {
-        let event = Event::Snapshot {
-            sessions: vec![Session {
-                name: "main".into(),
-                started_at_unix_ms: 100,
-                last_connected_at_unix_ms: Some(200),
-                last_disconnected_at_unix_ms: None,
-                status: SessionStatus::Attached,
-            }],
-        };
-        assert_eq!(
-            json(&event),
-            r#"{"type":"snapshot","sessions":[{"name":"main","started_at_unix_ms":100,"last_connected_at_unix_ms":200,"last_disconnected_at_unix_ms":null,"status":"Attached"}]}"#
-        );
+    fn session_created_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionCreated), r#"{"type":"session.created"}"#);
     }
 
     #[test]
-    fn session_created_serializes_flat() {
-        let event = Event::SessionCreated { name: "main".into(), started_at_unix_ms: 42 };
-        assert_eq!(
-            json(&event),
-            r#"{"type":"session.created","name":"main","started_at_unix_ms":42}"#
-        );
+    fn session_attached_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionAttached), r#"{"type":"session.attached"}"#);
     }
 
     #[test]
-    fn session_attached_serializes_flat() {
-        let event = Event::SessionAttached { name: "main".into(), last_connected_at_unix_ms: 42 };
-        assert_eq!(
-            json(&event),
-            r#"{"type":"session.attached","name":"main","last_connected_at_unix_ms":42}"#
-        );
+    fn session_detached_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionDetached), r#"{"type":"session.detached"}"#);
     }
 
     #[test]
-    fn session_detached_serializes_flat() {
-        let event =
-            Event::SessionDetached { name: "main".into(), last_disconnected_at_unix_ms: 42 };
-        assert_eq!(
-            json(&event),
-            r#"{"type":"session.detached","name":"main","last_disconnected_at_unix_ms":42}"#
-        );
+    fn session_removed_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionRemoved), r#"{"type":"session.removed"}"#);
     }
 
     #[test]
     fn bus_publish_with_no_subscribers_is_a_noop() {
         let bus = EventBus::new();
-        bus.publish(&Event::SessionCreated { name: "x".into(), started_at_unix_ms: 1 });
-    }
-
-    #[test]
-    fn bus_register_seeds_receiver_with_snapshot() {
-        let bus = EventBus::new();
-        let snapshot = Event::Snapshot { sessions: vec![] };
-        let
rx = bus.register(&snapshot); - let line = rx.try_recv().unwrap(); - assert_eq!(&*line, "{\"type\":\"snapshot\",\"sessions\":[]}\n"); + bus.publish(&Event::SessionCreated); } #[test] - fn bus_publish_reaches_subscriber_after_snapshot() { + fn bus_publish_reaches_subscriber() { let bus = EventBus::new(); - let rx = bus.register(&Event::Snapshot { sessions: vec![] }); - bus.publish(&Event::SessionCreated { name: "main".into(), started_at_unix_ms: 7 }); - let snapshot_line = rx.recv().unwrap(); - let delta_line = rx.recv().unwrap(); - assert_eq!(&*snapshot_line, "{\"type\":\"snapshot\",\"sessions\":[]}\n"); - assert_eq!( - &*delta_line, - "{\"type\":\"session.created\",\"name\":\"main\",\"started_at_unix_ms\":7}\n" - ); + let rx = bus.register(); + bus.publish(&Event::SessionCreated); + let line = rx.recv().unwrap(); + assert_eq!(&*line, "{\"type\":\"session.created\"}\n"); } #[test] fn bus_drops_subscriber_whose_queue_is_full() { let bus = EventBus::new(); - let rx = bus.register(&Event::Snapshot { sessions: vec![] }); - // Fill the channel to capacity (the snapshot already used 1 slot). - for i in 0..(SUBSCRIBER_QUEUE_DEPTH - 1) { - bus.publish(&Event::SessionCreated { - name: format!("s{i}"), - started_at_unix_ms: i as i64, - }); + let rx = bus.register(); + for _ in 0..SUBSCRIBER_QUEUE_DEPTH { + bus.publish(&Event::SessionCreated); } assert_eq!(bus.subscribers.lock().len(), 1); - // One more publish overflows and the subscriber is dropped. - bus.publish(&Event::SessionCreated { name: "overflow".into(), started_at_unix_ms: 0 }); + bus.publish(&Event::SessionCreated); assert_eq!(bus.subscribers.lock().len(), 0); - // The receiver still has the buffered events; the channel is not - // closed for it from the receiving side. drop(rx); } #[test] fn bus_drops_subscriber_whose_receiver_hung_up() { let bus = EventBus::new(); - let rx = bus.register(&Event::Snapshot { sessions: vec![] }); + let rx = bus.register(); drop(rx); - bus.publish(&Event::SessionCreated { name: "x".into(), started_at_unix_ms: 0 }); + bus.publish(&Event::SessionCreated); assert_eq!(bus.subscribers.lock().len(), 0); } #[test] fn bus_publish_reaches_every_subscriber() { let bus = EventBus::new(); - let rx_a = bus.register(&Event::Snapshot { sessions: vec![] }); - let rx_b = bus.register(&Event::Snapshot { sessions: vec![] }); - bus.publish(&Event::SessionCreated { name: "main".into(), started_at_unix_ms: 1 }); + let rx_a = bus.register(); + let rx_b = bus.register(); + bus.publish(&Event::SessionCreated); for rx in [&rx_a, &rx_b] { - let _snapshot = rx.recv().unwrap(); - let delta = rx.recv().unwrap(); - assert!(delta.contains(r#""type":"session.created""#)); - assert!(delta.contains(r#""name":"main""#)); + let line = rx.recv().unwrap(); + assert_eq!(&*line, "{\"type\":\"session.created\"}\n"); } } @@ -392,12 +314,4 @@ mod tests { drop(guard); assert!(!path.exists(), "socket file should be unlinked on guard drop"); } - - #[test] - fn session_removed_serializes_with_reason() { - let exited = Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Exited }; - assert_eq!(json(&exited), r#"{"type":"session.removed","name":"main","reason":"exited"}"#); - let killed = Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Killed }; - assert_eq!(json(&killed), r#"{"type":"session.removed","name":"main","reason":"killed"}"#); - } } diff --git a/shpool/tests/events.rs b/shpool/tests/events.rs index cbda498d..7c61f2cc 100644 --- a/shpool/tests/events.rs +++ b/shpool/tests/events.rs @@ -41,69 +41,26 @@ fn 
next_event(reader: &mut BufReader) -> anyhow::Result { #[test] #[timeout(30000)] -fn snapshot_then_lifecycle() -> anyhow::Result<()> { +fn lifecycle() -> anyhow::Result<()> { let mut daemon = Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) .context("starting daemon proc")?; let mut sub = connect_events(&daemon)?; - let snap = next_event(&mut sub)?; - assert_eq!(snap["type"], "snapshot"); - assert_eq!(snap["sessions"].as_array().unwrap().len(), 0); - // Background attach: client connects, daemon publishes created+attached; // the client immediately detaches, triggering the detached event. let _attach = daemon .attach("s1", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }) .context("starting attach proc")?; - let created = next_event(&mut sub)?; - assert_eq!(created["type"], "session.created"); - assert_eq!(created["name"], "s1"); - assert!(created["started_at_unix_ms"].is_number()); - - let attached = next_event(&mut sub)?; - assert_eq!(attached["type"], "session.attached"); - assert_eq!(attached["name"], "s1"); - - let detached = next_event(&mut sub)?; - assert_eq!(detached["type"], "session.detached"); - assert_eq!(detached["name"], "s1"); + assert_eq!(next_event(&mut sub)?["type"], "session.created"); + assert_eq!(next_event(&mut sub)?["type"], "session.attached"); + assert_eq!(next_event(&mut sub)?["type"], "session.detached"); let kill_out = daemon.kill(vec!["s1".into()]).context("running kill")?; assert!(kill_out.status.success(), "kill failed: {:?}", kill_out); - let removed = next_event(&mut sub)?; - assert_eq!(removed["type"], "session.removed"); - assert_eq!(removed["name"], "s1"); - assert_eq!(removed["reason"], "killed"); - - Ok(()) -} - -#[test] -#[timeout(30000)] -fn snapshot_includes_existing_sessions() -> anyhow::Result<()> { - let mut daemon = - Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) - .context("starting daemon proc")?; - - let _attach = daemon - .attach( - "pre-existing", - AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }, - ) - .context("starting attach proc")?; - - // Wait for the session to land in the table before subscribing. - daemon.wait_until_list_matches(|out| out.contains("pre-existing"))?; - - let mut sub = connect_events(&daemon)?; - let snap = next_event(&mut sub)?; - assert_eq!(snap["type"], "snapshot"); - let sessions = snap["sessions"].as_array().unwrap(); - assert_eq!(sessions.len(), 1); - assert_eq!(sessions[0]["name"], "pre-existing"); + assert_eq!(next_event(&mut sub)?["type"], "session.removed"); Ok(()) } @@ -111,10 +68,10 @@ fn snapshot_includes_existing_sessions() -> anyhow::Result<()> { // `shpool detach` triggers two code paths that both updated the session: // the explicit handler and, asynchronously, the bidi-loop unwind in the // attach worker. An earlier version emitted SessionDetached from both, -// producing a duplicate event with two different timestamps. This test -// pins exactly one detached event per detach by using a kill as a -// known-next-event fence — if a duplicate detached were buffered, the -// next read would return it instead of `session.removed`. +// producing a duplicate event. This test pins exactly one detached event +// per detach by using a kill as a known-next-event fence — if a duplicate +// detached were buffered, the next read would return it instead of +// `session.removed`. 
#[test] #[timeout(30000)] fn explicit_detach_publishes_one_event() -> anyhow::Result<()> { @@ -122,7 +79,6 @@ fn explicit_detach_publishes_one_event() -> anyhow::Result<()> { Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) .context("starting daemon proc")?; let mut sub = connect_events(&daemon)?; - let _snap = next_event(&mut sub)?; // Foreground attach (no `background`) keeps the session attached. let _attach = daemon @@ -134,9 +90,7 @@ fn explicit_detach_publishes_one_event() -> anyhow::Result<()> { let detach_out = daemon.detach(vec!["s".into()]).context("running detach")?; assert!(detach_out.status.success(), "detach failed: {:?}", detach_out); - let detached = next_event(&mut sub)?; - assert_eq!(detached["type"], "session.detached"); - assert_eq!(detached["name"], "s"); + assert_eq!(next_event(&mut sub)?["type"], "session.detached"); // Wait for the unwind path to complete (session shows disconnected in // the list output) so any duplicate detached event would already be @@ -151,7 +105,6 @@ fn explicit_detach_publishes_one_event() -> anyhow::Result<()> { next["type"], "session.removed", "expected next event to be removed, got {next} — possible duplicate detached" ); - assert_eq!(next["reason"], "killed"); Ok(()) } @@ -166,7 +119,6 @@ fn reattach_emits_attached_only() -> anyhow::Result<()> { Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) .context("starting daemon proc")?; let mut sub = connect_events(&daemon)?; - let _snap = next_event(&mut sub)?; let _attach1 = daemon .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }) @@ -186,7 +138,6 @@ fn reattach_emits_attached_only() -> anyhow::Result<()> { attached["type"], "session.attached", "expected attached on reattach, got {attached}" ); - assert_eq!(attached["name"], "s"); assert_eq!(next_event(&mut sub)?["type"], "session.detached"); Ok(()) @@ -226,9 +177,6 @@ fn multiple_subscribers_each_get_independent_streams() -> anyhow::Result<()> { let mut sub_a = connect_events(&daemon)?; let mut sub_b = connect_events(&daemon)?; - assert_eq!(next_event(&mut sub_a)?["type"], "snapshot"); - assert_eq!(next_event(&mut sub_b)?["type"], "snapshot"); - let _attach = daemon .attach( "shared", From 0b99ca0841bd07b1dfe67d6719fc566f457eb21e Mon Sep 17 00:00:00 2001 From: Geoffrey Churchill Date: Mon, 27 Apr 2026 21:40:55 -0400 Subject: [PATCH 08/12] docs(events): add EVENTS.md describing the events protocol Cover the sibling-socket transport, the four event types, the JSONL wire format, the `shpool events` CLI helper plus direct-socket use for heavier-duty consumers, the ordering guarantee (publish under the session-table lock so wire-order matches causal-order), and the slow-subscriber drop policy. --- EVENTS.md | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 EVENTS.md diff --git a/EVENTS.md b/EVENTS.md new file mode 100644 index 00000000..58379280 --- /dev/null +++ b/EVENTS.md @@ -0,0 +1,67 @@ +# Events + +`shpool` exposes an event stream so that external programs can react to changes +without polling. This way, a program (e.g. a TUI) can call `shpool list` (or the +equivalent `ConnectHeader::List` request over the main socket; see the +[`shpool-protocol`](./shpool-protocol) crate) after each event so that its model +is always consistent with shpool's state. 
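+
+For heavier-duty consumers, reading the socket directly is enough. A
+minimal Rust sketch (illustrative only, not part of this patch; the socket
+path shown and the `serde_json` dependency are assumptions, and any JSON
+parser works):
+
+```rust
+use std::io::{BufRead, BufReader};
+use std::os::unix::net::UnixStream;
+
+fn main() -> std::io::Result<()> {
+    // Hypothetical path; see "The events socket" below.
+    let sock = UnixStream::connect("/run/user/1000/shpool/events.socket")?;
+    for line in BufReader::new(sock).lines() {
+        let event: serde_json::Value =
+            serde_json::from_str(&line?).expect("daemon writes valid JSON");
+        // Events are change signals only; refresh real state via `shpool list`.
+        println!("event type: {}", event["type"]);
+    }
+    Ok(())
+}
+```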
+
+## The events socket
+
+The daemon binds a sibling Unix socket next to the main shpool socket:
+
+```bash
+<runtime-dir>/shpool/shpool.socket # main RPC socket
+<runtime-dir>/shpool/events.socket # events socket (this protocol)
+```
+
+A subscriber connects to `events.socket` and reads events. The daemon
+ignores anything written to the events socket, so for subscribers it's
+effectively read-only.
+
+## Event types
+
+| `type`             | Meaning                                                  |
+| ------------------ | -------------------------------------------------------- |
+| `session.created`  | A new session was added to the table.                    |
+| `session.attached` | A client attached or reattached to a session.            |
+| `session.detached` | A client disconnected from a still-running session.      |
+| `session.removed`  | A session was removed (shell exited, killed, or reaped). |
+
+Subscribers should ignore unknown `type` values so that future event types do
+not break older consumers.
+
+## Wire format
+
+The daemon writes one JSON object per line (JSONL). Each event looks like:
+
+```json
+{"type":"<event-type>"}
+```
+
+There are no other fields. To learn what the event refers to (which session,
+when, etc.), call `shpool list` (or use `ConnectHeader::List`).
+
+## Subscribing
+
+For ad-hoc use, `shpool events` connects to the events socket and prints each
+event line to stdout, flushing after each line:
+
+```bash
+shpool events | while read -r ev; do
+  echo "got: $ev"
+  shpool list
+done
+
+shpool events | jq .
+```
+
+## Ordering
+
+Events are published while the daemon holds its session-table lock, so the order
+events appear on the wire matches the order in which they happened.
+
+## Slow subscribers
+
+Each subscriber has a bounded outbound queue. A subscriber that falls too far
+behind is dropped by the daemon (in which case the subscriber can always
+reconnect). There is no replay, so events that fired while a subscriber was
+disconnected are lost.

From 47ac2d84f93c0b2bb36a60ac9483091cdcdefebc Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Wed, 29 Apr 2026 22:52:54 -0400
Subject: [PATCH 09/12] EVENTS.md: fix main socket description and remove
 "Ordering" section

Per https://github.com/shell-pool/shpool/pull/352#pullrequestreview-4198152162
---
 EVENTS.md | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/EVENTS.md b/EVENTS.md
index 58379280..4d122561 100644
--- a/EVENTS.md
+++ b/EVENTS.md
@@ -11,7 +11,7 @@ is always consistent with shpool's state.
 The daemon binds a sibling Unix socket next to the main shpool socket:
 
 ```bash
-<runtime-dir>/shpool/shpool.socket # main RPC socket
+<runtime-dir>/shpool/shpool.socket # main socket
 <runtime-dir>/shpool/events.socket # events socket (this protocol)
 ```
 
@@ -54,11 +54,6 @@ done
 shpool events | jq .
 ```
 
-## Ordering
-
-Events are published while the daemon holds its session-table lock, so the order
-events appear on the wire matches the order in which they happened.
-
 ## Slow subscribers
 
 Each subscriber has a bounded outbound queue. A subscriber that falls too far

From aff52798e6f0bfbe849e4ffdb05bdc6f6f815451 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Tue, 5 May 2026 13:45:05 -0400
Subject: [PATCH 10/12] events: polish error contexts, naming, and module
 boundary

- Add .context() on the signal-handler spawn and events-listener startup
  paths in daemon::run.

- Move the ttl-reaper thread::spawn's capture-clones (shells, events_bus)
  inside a block expression. The cloned Arcs shadow the outer names
  rather than introducing parallel shells_tab / reaper_bus bindings in
  the function scope.
- Reword the comment on the shells-remove + SessionRemoved publish to explain why the publish is gated on is_some(): a concurrent kill or reaper may have removed the entry (and published) while we were waiting for the lock. - Have start_listener take Arc directly instead of a closure that registers + spawns a writer. EventBus::register and spawn_writer drop their pub modifiers now that the only call site is within events.rs. - Restore the let-binding type annotation on the collected sessions Result in handle_list. - Document JSONL newline handling in EVENTS.md. --- EVENTS.md | 4 ++++ libshpool/src/daemon/mod.rs | 5 +++-- libshpool/src/daemon/server.rs | 33 +++++++++++++++------------------ libshpool/src/events.rs | 27 +++++++++++---------------- 4 files changed, 33 insertions(+), 36 deletions(-) diff --git a/EVENTS.md b/EVENTS.md index 4d122561..481ead59 100644 --- a/EVENTS.md +++ b/EVENTS.md @@ -40,6 +40,10 @@ The daemon writes one JSON object per line (JSONL). Each event looks like: There are no other fields. To learn what the event refers to (which session, when, etc.), call `shpool list` (or use `ConnectHeader::List`). +The format is robust: literal newline characters only appear as delimiters +between events. Any newlines within JSON string values are automatically +escaped (as `\n`) by the daemon. + ## Subscribing For ad-hoc use, `shpool events` connects to the events socket and prints each diff --git a/libshpool/src/daemon/mod.rs b/libshpool/src/daemon/mod.rs index d23e8357..82ce5b7c 100644 --- a/libshpool/src/daemon/mod.rs +++ b/libshpool/src/daemon/mod.rs @@ -88,9 +88,10 @@ pub fn run( // RAII guard can run. let mut socks_to_clean: Vec = cleanup_socket.iter().cloned().collect(); socks_to_clean.push(events_socket.clone()); - signals::Handler::new(socks_to_clean).spawn()?; + signals::Handler::new(socks_to_clean).spawn().context("spawning signal handler")?; - let _events_guard = server.start_events_listener(events_socket)?; + let _events_guard = + server.start_events_listener(events_socket).context("starting events listener")?; server::Server::serve(server, listener)?; diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs index 397ea443..35e58fff 100644 --- a/libshpool/src/daemon/server.rs +++ b/libshpool/src/daemon/server.rs @@ -101,11 +101,13 @@ impl Server { // buffered so that we are unlikely to block when setting up a // new session let (new_sess_tx, new_sess_rx) = crossbeam_channel::bounded(10); - let shells_tab = Arc::clone(&shells); - let reaper_bus = Arc::clone(&events_bus); - thread::spawn(move || { - if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab, reaper_bus) { - warn!("ttl reaper exited with error: {:?}", e); + thread::spawn({ + let shells = Arc::clone(&shells); + let events_bus = Arc::clone(&events_bus); + move || { + if let Err(e) = ttl_reaper::run(new_sess_rx, shells, events_bus) { + warn!("ttl reaper exited with error: {:?}", e); + } } }); @@ -129,13 +131,7 @@ impl Server { self: &Arc, socket_path: PathBuf, ) -> anyhow::Result { - let server = Arc::clone(self); - events::start_listener(socket_path, move |stream| server.handle_events_subscriber(stream)) - } - - fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> { - let receiver = self.events_bus.register(); - events::spawn_writer(stream, receiver) + events::start_listener(socket_path, Arc::clone(&self.events_bus)) } #[instrument(skip_all)] @@ -352,8 +348,9 @@ impl Server { { let _s = span!(Level::INFO, "2_lock(shells)").entered(); let mut shells = 
self.shells.lock();
-                    // Gated: a concurrent kill or reaper may have already
-                    // removed the entry and published its own removal.
+                    // The publish below is gated on `is_some()` because a
+                    // concurrent kill or reaper may have already removed the
+                    // entry (and published) while we were waiting for the lock.
                     if shells.remove(&header.name).is_some() {
                         self.events_bus.publish(&events::Event::SessionRemoved);
                     }
@@ -440,8 +437,8 @@ impl Server {
                 // the channel is still open so the subshell is still running
                 info!("taking over existing session inner");
                 inner.client_stream = Some(stream.try_clone()?);
-                let now = time::SystemTime::now();
-                session.lifecycle_timestamps.lock().last_connected_at = Some(now);
+                session.lifecycle_timestamps.lock().last_connected_at =
+                    Some(time::SystemTime::now());
 
                 if inner
                     .shell_to_client_join_h
@@ -767,7 +764,7 @@ impl Server {
     fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> {
         let _s = span!(Level::INFO, "lock(shells)").entered();
         let shells = self.shells.lock();
-        let sessions = shells
+        let sessions: Vec<Session> = shells
             .iter()
             .map(|(k, v)| {
                 let status = match v.inner.try_lock() {
@@ -794,7 +791,7 @@ impl Server {
                     status,
                 })
             })
-            .collect::<anyhow::Result<Vec<Session>>>()
+            .collect::<anyhow::Result<_>>()
             .context("collecting running session metadata")?;
         write_reply(&mut stream, ListReply { sessions })?;
         Ok(())
diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index 335b4dcc..76e94172 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -86,7 +86,7 @@ impl EventBus {
 
     /// Register a new subscriber. Returns the receiver to be handed to a
     /// writer thread.
-    pub fn register(&self) -> Receiver<Arc<str>> {
+    fn register(&self) -> Receiver<Arc<str>> {
         let (tx, rx) = mpsc::sync_channel(SUBSCRIBER_QUEUE_DEPTH);
         self.subscribers.lock().push(tx);
         rx
@@ -145,13 +145,10 @@ impl Drop for ListenerGuard {
 }
 
 /// Bind the events socket and spawn the accept thread. For each accepted
-/// connection, `on_accept` is invoked with the stream; it is expected to
-/// register the subscriber with the bus and spawn a writer thread (see
-/// [`spawn_writer`]). The returned guard unlinks the socket file on drop.
-pub fn start_listener<F>(socket_path: PathBuf, on_accept: F) -> anyhow::Result<ListenerGuard>
-where
-    F: Fn(UnixStream) -> anyhow::Result<()> + Send + 'static,
-{
+/// connection, the subscriber is registered with `bus` and a writer
+/// thread is spawned to drain its queue. The returned guard unlinks the
+/// socket file on drop.
+pub fn start_listener(socket_path: PathBuf, bus: Arc<EventBus>) -> anyhow::Result<ListenerGuard> {
     if socket_path.exists() {
         std::fs::remove_file(&socket_path)
             .with_context(|| format!("removing stale events socket {:?}", socket_path))?;
@@ -161,19 +158,17 @@ where
     info!("events socket listening at {:?}", socket_path);
     thread::Builder::new()
         .name("events-accept".into())
-        .spawn(move || run_accept_loop(listener, on_accept))
+        .spawn(move || run_accept_loop(listener, bus))
         .context("spawning events accept thread")?;
     Ok(ListenerGuard { path: socket_path })
 }
 
-fn run_accept_loop<F>(listener: UnixListener, on_accept: F)
-where
-    F: Fn(UnixStream) -> anyhow::Result<()>,
-{
+fn run_accept_loop(listener: UnixListener, bus: Arc<EventBus>) {
     for stream in listener.incoming() {
         match stream {
             Ok(stream) => {
-                if let Err(e) = on_accept(stream) {
+                let receiver = bus.register();
+                if let Err(e) = spawn_writer(stream, receiver) {
                     warn!("accepting events subscriber: {:?}", e);
                 }
             }
@@ -187,7 +182,7 @@ where
 
 /// Set the write timeout and spawn a thread that drains `receiver` to
 /// `stream` until either side closes or a write times out.
-pub fn spawn_writer(stream: UnixStream, receiver: Receiver<Arc<str>>) -> anyhow::Result<()> {
+fn spawn_writer(stream: UnixStream, receiver: Receiver<Arc<str>>) -> anyhow::Result<()> {
     stream.set_write_timeout(Some(WRITE_TIMEOUT)).context("setting write timeout")?;
     thread::Builder::new()
         .name("events-writer".into())
@@ -309,7 +304,7 @@ mod tests {
     fn listener_guard_unlinks_socket_on_drop() {
         let dir = tempfile::tempdir().unwrap();
         let path = dir.path().join("events.socket");
-        let guard = start_listener(path.clone(), |_| Ok(())).unwrap();
+        let guard = start_listener(path.clone(), EventBus::new()).unwrap();
         assert!(path.exists(), "socket file should exist while guard is alive");
         drop(guard);
         assert!(!path.exists(), "socket file should be unlinked on guard drop");

From 4865abd04db92db2f39f33324d2b70f846c01a7a Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Tue, 5 May 2026 14:45:09 -0400
Subject: [PATCH 11/12] events: add tests for bus isolation, latency, and lock
 ordering

- bus_publish_with_many_subscribers_is_not_quadratic: bound a single
  publish to 10K subscribers under 50 ms (~4 ms on a 2.1 GHz CPU; 50 ms
  absorbs CI tail latency while still catching quadratic regressions,
  which would be on the order of seconds at N=10K).

- bus_drops_slow_subscriber_on_overflow_without_affecting_fast: a fast
  subscriber keeps receiving every event while a slow one's queue fills
  and is eventually dropped on overflow. Asserts SUBSCRIBER_QUEUE_DEPTH
  stays small since the test scales with it.

- accept_loop_registers_concurrent_subscribers: dial 20 connections in
  parallel and confirm all are registered and each receives a published
  event.

- bus_concurrent_publish_under_outer_lock_delivers_all_events: publish
  from multiple threads while each holds an outer mutex; every event is
  delivered.
---
 libshpool/src/events.rs | 120 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 120 insertions(+)

diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index 76e94172..c88bc330 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -309,4 +309,124 @@ mod tests {
         drop(guard);
         assert!(!path.exists(), "socket file should be unlinked on guard drop");
     }
+
+    #[test]
+    fn bus_publish_with_many_subscribers_is_not_quadratic() {
+        let bus = EventBus::new();
+        let n = 10_000;
+        let mut rxs = Vec::with_capacity(n);
+        for _ in 0..n {
+            rxs.push(bus.register());
+        }
+        let start = std::time::Instant::now();
+        bus.publish(&Event::SessionCreated);
+        let elapsed = start.elapsed();
+        // Measured ~4 ms on a 2.1 GHz CPU; the 50 ms bound absorbs CI
+        // tail latency while still catching gross regressions (e.g. an
+        // accidental O(N^2) would be on the order of seconds at N=10K).
+        assert!(elapsed < Duration::from_millis(50), "publish to {n} subscribers took {elapsed:?}");
+    }
+
+    #[test]
+    fn bus_drops_slow_subscriber_on_overflow_without_affecting_fast() {
+        // This test is O(SUBSCRIBER_QUEUE_DEPTH); cap the constant so a
+        // future bump (e.g. after moving to a dynamically-grown queue)
+        // doesn't make this test unboundedly slow.
+        assert!(SUBSCRIBER_QUEUE_DEPTH < 1024);
+
+        let bus = EventBus::new();
+        let slow_rx = bus.register();
+        let fast_rx = bus.register();
+
+        // Fill slow's queue while draining fast each iteration so only slow
+        // accumulates a backlog.
+        for _ in 0..SUBSCRIBER_QUEUE_DEPTH {
+            bus.publish(&Event::SessionCreated);
+            assert_eq!(&*fast_rx.recv().unwrap(), "{\"type\":\"session.created\"}\n");
+        }
+        assert_eq!(bus.subscribers.lock().len(), 2);
+
+        // The next publish overflows slow's queue and drops it. Fast's queue
+        // is empty so it keeps receiving.
+        bus.publish(&Event::SessionCreated);
+        assert_eq!(&*fast_rx.recv().unwrap(), "{\"type\":\"session.created\"}\n");
+        assert_eq!(bus.subscribers.lock().len(), 1);
+
+        drop(slow_rx);
+    }
+
+    #[test]
+    fn accept_loop_registers_concurrent_subscribers() {
+        use std::io::Read;
+
+        let dir = tempfile::tempdir().unwrap();
+        let path = dir.path().join("events.socket");
+        let bus = EventBus::new();
+        let _guard = start_listener(path.clone(), Arc::clone(&bus)).unwrap();
+
+        let n = 20;
+        let dial_handles: Vec<_> = (0..n)
+            .map(|_| {
+                let path = path.clone();
+                thread::spawn(move || UnixStream::connect(&path).unwrap())
+            })
+            .collect();
+        let streams: Vec<UnixStream> =
+            dial_handles.into_iter().map(|h| h.join().unwrap()).collect();
+
+        // Wait for the accept thread to register all subscribers.
+        let deadline = std::time::Instant::now() + Duration::from_secs(2);
+        loop {
+            let count = bus.subscribers.lock().len();
+            if count >= n {
+                break;
+            }
+            assert!(
+                std::time::Instant::now() < deadline,
+                "only {count}/{n} subscribers registered before timeout"
+            );
+            thread::sleep(Duration::from_millis(10));
+        }
+
+        bus.publish(&Event::SessionCreated);
+        let expected = b"{\"type\":\"session.created\"}\n";
+        for mut stream in streams {
+            stream.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
+            let mut buf = vec![0u8; expected.len()];
+            stream.read_exact(&mut buf).unwrap();
+            assert_eq!(buf.as_slice(), expected);
+        }
+    }
+
+    #[test]
+    fn bus_concurrent_publish_under_outer_lock_delivers_all_events() {
+        let bus = EventBus::new();
+        let rx = bus.register();
+        let outer: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
+
+        let n_threads = 4;
+        let n_per_thread = 8;
+        let total = n_threads * n_per_thread;
+
+        let handles: Vec<_> = (0..n_threads)
+            .map(|_| {
+                let bus = Arc::clone(&bus);
+                let outer = Arc::clone(&outer);
+                thread::spawn(move || {
+                    for _ in 0..n_per_thread {
+                        let _g = outer.lock();
+                        bus.publish(&Event::SessionCreated);
+                    }
+                })
+            })
+            .collect();
+        for h in handles {
+            h.join().unwrap();
+        }
+
+        for _ in 0..total {
+            rx.recv().unwrap();
+        }
+        assert!(rx.try_recv().is_err());
+    }
 }

From 8f5e08cddc4d882428ddd57d49ab4fbb89c07c6a Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Thu, 7 May 2026 17:39:20 -0400
Subject: [PATCH 12/12] events: replace thread-per-subscriber writers with a
 single-sink poll loop

Move all subscriber I/O onto one events-sink thread driven by poll(2).
publish() becomes O(1) on the daemon's hot path: a try_send on a bounded
mpsc channel + a 1-byte write to a self-pipe to wake the sink.

The sink owns the listener, accepts connections, and drives non-blocking
writes per subscriber via a VecDeque pending-queue plus a front-line
offset for partial writes. A subscriber that falls SUBSCRIBER_QUEUE_DEPTH
events behind is marked dropped and removed at end-of-iteration without
affecting the others.

publish takes no internal lock, so it is safe under arbitrary outer
locks; publishing inside the lock that protects the state being announced
preserves wire-order = causal-order.
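
In outline (abridged from the events.rs changes below; error handling
and the dead-sink log are elided), the hot path is:

    pub fn publish(&self, event: &Event) {
        let line = serialize_line(event);            // cached Arc<str>, no alloc
        let _ = self.event_tx.try_send(line);        // bounded, never blocks
        let _ = unistd::write(&self.wake_tx, b"\0"); // 1-byte self-pipe wake
    }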
Sink loop: - Drain pending accepts unconditionally each iteration; the listener stays in the poll set so its POLLIN wakes us, but we don't trust the revent as a gate (listener POLLIN can lag behind connect(2) returning under load, leaving a queued connection unaccepted while the same iteration's broadcast lands on the pre-connect sub set). - Treat any wake-fd revent as "drain it"; POLLHUP-on-EOF no longer busy-loops poll(2). - Skip already-dropped subs in the POLLOUT-driven drive pass. - panic! on non-EINTR poll(2) errors and non-EAGAIN/EINTR wake-fd read errors (programmer-error categories on fds the sink owns). Publish: - Pre-cache each Event variant's wire form as Arc via LazyLock; the hot path is one try_send + 1-byte wake + one Arc::clone, no per-publish allocation. - Log "events sink died" once via AtomicBool when try_send returns Disconnected. - EventBus::new returns io::Result so pipe2(2) failure propagates through Server::new instead of panicking. Pipe creation: - pipe2(O_NONBLOCK | O_CLOEXEC) instead of pipe + 4 fcntl calls. Tests: - harness(), connect_registered, connect_n_registered, read_n_lines: per-test scaffolding around tempdir + bus + listener + round-trip registration probe. The probe replaces the subscriber_count hook the bus previously exposed for tests; the field, method, and per-iteration atomic store are gone. - Slow-subscriber drop test uses SO_RCVBUF on the slow client (SO_SNDBUF on the listener does not reliably inherit to AF_UNIX accepted sockets). - New: events_arrive_in_publish_order, drive_pending property fuzz, subscriber_writer_resumes_after_peer_drains, sink-level slow_subscriber_drop_does_not_affect_fast_through_sink. - Removed sink_thread_is_named_events_sink (didn't observe the actual sink) and bus_publish_reaches_every_subscriber (subsumed by accept_loop_registers_concurrent_subscribers). Test support: - Wait for sibling events.socket to exist alongside the main socket before declaring the daemon ready. --- libshpool/src/daemon/server.rs | 2 +- libshpool/src/events.rs | 1113 +++++++++++++++++++++++++------- shpool/tests/support/daemon.rs | 6 +- 3 files changed, 894 insertions(+), 227 deletions(-) diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs index 35e58fff..3840f539 100644 --- a/libshpool/src/daemon/server.rs +++ b/libshpool/src/daemon/server.rs @@ -97,7 +97,7 @@ impl Server { >, ) -> anyhow::Result> { let shells = Arc::new(Mutex::new(HashMap::new())); - let events_bus = events::EventBus::new(); + let events_bus = events::EventBus::new().context("creating events bus")?; // buffered so that we are unlikely to block when setting up a // new session let (new_sess_tx, new_sess_rx) = crossbeam_channel::bounded(10); diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs index c88bc330..253a9ed8 100644 --- a/libshpool/src/events.rs +++ b/libshpool/src/events.rs @@ -9,32 +9,46 @@ //! changed in the session table. Subscribers learn the new state by calling //! `shpool list` (or the equivalent over the main socket). Subscribers that //! fall too far behind are dropped and may simply reconnect. +//! +//! Architecture: a single `events-sink` thread owns all subscriber state and +//! does all I/O via non-blocking `poll(2)`. `publish()` is O(1) on the +//! daemon's hot path: a `try_send` on a bounded channel + a 1-byte write to +//! a self-pipe to wake the sink. 
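+//!
+//! In sketch form, wiring this up on the daemon side looks like
+//! (`sock_path` stands in for the events socket path; see [`socket_path`]):
+//!
+//! ```ignore
+//! let bus = EventBus::new()?;          // io::Result<Arc<EventBus>>
+//! let _guard = start_listener(sock_path, Arc::clone(&bus))?;
+//! bus.publish(&Event::SessionCreated); // O(1); safe under outer locks
+//! ```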
 use std::{
-    io::{BufRead, BufReader, Write},
-    os::unix::net::{UnixListener, UnixStream},
+    collections::VecDeque,
+    io::{self, BufRead, BufReader, Write},
+    os::{
+        fd::{AsFd, BorrowedFd, OwnedFd},
+        unix::net::{UnixListener, UnixStream},
+    },
     path::{Path, PathBuf},
     sync::{
+        atomic::{AtomicBool, Ordering},
         mpsc::{self, Receiver, SyncSender, TrySendError},
-        Arc,
+        Arc, LazyLock,
     },
     thread,
-    time::Duration,
 };
 
 use anyhow::Context;
+use nix::{
+    errno::Errno,
+    fcntl::OFlag,
+    poll::{self, PollFd, PollFlags, PollTimeout},
+    unistd,
+};
 use parking_lot::Mutex;
 use serde_derive::Serialize;
 use tracing::{error, info, warn};
 
-/// Per-subscriber outbound queue depth. Subscribers that fall this far
-/// behind are dropped and must reconnect.
+/// Per-subscriber outbound queue depth (events). Subscribers that fall this
+/// far behind are dropped and must reconnect.
 const SUBSCRIBER_QUEUE_DEPTH: usize = 64;
 
-/// Write timeout for stuck subscribers (e.g. suspended via Ctrl-Z). After
-/// this elapses on a blocked write, the writer thread exits and the
-/// subscriber is implicitly dropped on the next publish.
-const WRITE_TIMEOUT: Duration = Duration::from_secs(5);
+/// Capacity of the publish→sink channel. Reaching it means the sink is
+/// wedged — a real bug, not a tunable.
+const EVENT_CHANNEL_CAP: usize = 4096;
 
 /// An event published on the events socket.
 #[derive(Serialize, Debug)]
 #[serde(tag = "type")]
@@ -51,55 +65,59 @@ pub enum Event {
     SessionRemoved,
 }
 
-/// Fans out events to all connected subscribers.
-///
-/// Lock ordering: callers that publish under another lock (e.g. the session
-/// table) must take that lock before [`EventBus::publish`] takes its own
-/// internal lock. Publishing under the table lock keeps wire-order =
-/// causal-order across mutators.
+/// Fans out events to all connected subscribers via a single sink thread
+/// doing non-blocking I/O.
 pub struct EventBus {
-    subscribers: Mutex<Vec<SyncSender<Arc<str>>>>,
+    event_tx: SyncSender<Arc<str>>,
+    wake_tx: OwnedFd,
+    sink_handles: Mutex<Option<SinkHandles>>,
+    sink_dead_logged: AtomicBool,
 }
 
 impl EventBus {
-    pub fn new() -> Arc<Self> {
-        Arc::new(Self { subscribers: Mutex::new(Vec::new()) })
+    pub fn new() -> io::Result<Arc<Self>> {
+        let (event_tx, event_rx) = mpsc::sync_channel(EVENT_CHANNEL_CAP);
+        let (wake_rx, wake_tx) = make_self_pipe()?;
+        Ok(Arc::new(Self {
+            event_tx,
+            wake_tx,
+            sink_handles: Mutex::new(Some(SinkHandles { event_rx, wake_rx })),
+            sink_dead_logged: AtomicBool::new(false),
+        }))
     }
 
-    /// Broadcast `event` to all current subscribers. Subscribers whose
-    /// queues are full or whose receivers have hung up are dropped.
+    /// Broadcast `event` to all current subscribers. Non-blocking: a
+    /// `try_send` on the publish→sink channel + a 1-byte wake. Takes no
+    /// internal lock, so it is safe to call under arbitrary outer locks.
+    /// Publishing under the lock that protects the state being announced
+    /// keeps wire-order = causal-order across mutators.
     pub fn publish(&self, event: &Event) {
-        let line = match serialize_line(event) {
-            Some(line) => line,
-            None => return,
-        };
-        let mut subs = self.subscribers.lock();
-        subs.retain(|tx| match tx.try_send(line.clone()) {
-            Ok(()) => true,
+        match self.event_tx.try_send(serialize_line(event)) {
+            Ok(()) => {}
             Err(TrySendError::Full(_)) => {
-                warn!("dropping events subscriber: queue full");
-                false
+                warn!("events channel full; sink is wedged");
+                return;
             }
-            Err(TrySendError::Disconnected(_)) => false,
-        });
-    }
+            Err(TrySendError::Disconnected(_)) => {
+                // Relaxed is sufficient: atomic RMW on a single location
+                // is linearizable regardless of ordering, so racing swaps
+                // already see distinct previous values — exactly one
+                // observes `false` and logs. Stronger ordering (e.g.
+                // AcqRel) would only matter if we needed happens-before
+                // with surrounding non-atomic memory, which we don't.
+                if !self.sink_dead_logged.swap(true, Ordering::Relaxed) {
+                    error!("events sink died; subsequent events will be dropped");
+                }
+                return;
+            }
+        }
+        // Wake the sink. If the pipe's kernel buffer is full (sink is
+        // already buried) our 1-byte signal is redundant — drop it.
+        let _ = unistd::write(&self.wake_tx, b"\0");
+    }
 
-    /// Register a new subscriber. Returns the receiver to be handed to a
-    /// writer thread.
-    fn register(&self) -> Receiver<Arc<str>> {
-        let (tx, rx) = mpsc::sync_channel(SUBSCRIBER_QUEUE_DEPTH);
-        self.subscribers.lock().push(tx);
-        rx
+    fn take_sink_handles(&self) -> Option<SinkHandles> {
+        self.sink_handles.lock().take()
     }
 }
@@ -110,7 +128,7 @@ pub fn subscribe_to_stdout(socket_path: &Path) -> anyhow::Result<()> {
     let stream = UnixStream::connect(socket_path)
         .with_context(|| format!("connecting to events socket {:?}", socket_path))?;
     let reader = BufReader::new(stream);
-    let stdout = std::io::stdout();
+    let stdout = io::stdout();
    let mut out = stdout.lock();
     for line in reader.lines() {
         let line = line.context("reading event")?;
@@ -128,8 +146,8 @@ pub fn socket_path(main_socket: &Path) -> PathBuf {
 }
 
 /// Owns the events socket file. Dropping the guard unlinks the socket
-/// path so a fresh daemon doesn't trip on stale files. The accept thread
-/// is not stopped — daemon shutdown takes the process down.
+/// path so a fresh daemon doesn't trip on stale files. The sink thread is
+/// not stopped — daemon shutdown takes the process down.
 pub struct ListenerGuard {
     path: PathBuf,
 }
@@ -138,15 +156,15 @@ impl Drop for ListenerGuard {
     fn drop(&mut self) {
         match std::fs::remove_file(&self.path) {
             Ok(()) => {}
-            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
             Err(e) => warn!("removing events socket {:?}: {:?}", self.path, e),
         }
     }
 }
 
-/// Bind the events socket and spawn the accept thread. For each accepted
-/// connection, the subscriber is registered with `bus` and a writer
-/// thread is spawned to drain its queue. The returned guard unlinks the
+/// Bind the events socket and spawn the sink thread. The sink owns the
+/// listener fd and all subscriber state; it accepts connections and drives
+/// non-blocking writes via `poll(2)`. The returned guard unlinks the
 /// socket file on drop.
 pub fn start_listener(socket_path: PathBuf, bus: Arc<EventBus>) -> anyhow::Result<ListenerGuard> {
     if socket_path.exists() {
         std::fs::remove_file(&socket_path)
             .with_context(|| format!("removing stale events socket {:?}", socket_path))?;
     }
     let listener = UnixListener::bind(&socket_path)
         .with_context(|| format!("binding events socket {:?}", socket_path))?;
+    listener.set_nonblocking(true).context("setting events listener non-blocking")?;
     info!("events socket listening at {:?}", socket_path);
+
+    let handles = bus.take_sink_handles().context("events sink already started")?;
     thread::Builder::new()
-        .name("events-accept".into())
-        .spawn(move || run_accept_loop(listener, bus))
-        .context("spawning events accept thread")?;
+        .name("events-sink".into())
+        .spawn(move || run_sink(listener, handles))
+        .context("spawning events sink thread")?;
     Ok(ListenerGuard { path: socket_path })
 }
 
-fn run_accept_loop(listener: UnixListener, bus: Arc<EventBus>) {
-    for stream in listener.incoming() {
-        match stream {
-            Ok(stream) => {
-                let receiver = bus.register();
-                if let Err(e) = spawn_writer(stream, receiver) {
-                    warn!("accepting events subscriber: {:?}", e);
-                }
-            }
-            Err(e) => {
-                error!("events listener accept failed: {:?}", e);
-                break;
-            }
-        }
-    }
-}
+struct SinkHandles {
+    event_rx: Receiver<Arc<str>>,
+    wake_rx: OwnedFd,
+}
+
+fn run_sink(listener: UnixListener, handles: SinkHandles) {
+    let SinkHandles { event_rx, wake_rx } = handles;
+    let mut subs: Vec<SubscriberWriter> = Vec::new();
+    // Page-sized drain buffer; the bytes are signal-only and discarded.
+    let mut wake_buf = [0u8; 4096];
+
+    // Fixed positions in the poll set: 0 = wake fd, 1 = listener fd
+    // (revents ignored — see below), 2.. = subscribers wanting POLLOUT.
+    const WAKE_FD_IDX: usize = 0;
+    const SUB_FDS_START: usize = 2;
+
+    loop {
+        // Build the poll set fresh each iteration: wake fd (POLLIN),
+        // listener fd (POLLIN), each subscriber that wants POLLOUT.
+        let mut fds: Vec<PollFd> = Vec::with_capacity(SUB_FDS_START + subs.len());
+        fds.push(PollFd::new(wake_rx.as_fd(), PollFlags::POLLIN));
+        fds.push(PollFd::new(listener.as_fd(), PollFlags::POLLIN));
+        let mut sub_pollfd_idx: Vec<usize> = Vec::with_capacity(subs.len());
+        for (i, sub) in subs.iter().enumerate() {
+            if sub.wants_pollout() {
+                fds.push(PollFd::new(sub.as_fd(), PollFlags::POLLOUT));
+                sub_pollfd_idx.push(i);
+            }
+        }
+
+        match poll::poll(&mut fds, PollTimeout::NONE) {
+            Ok(_) => {}
+            Err(Errno::EINTR) => continue,
+            Err(e) => panic!("events sink poll: {:?}", e),
+        }
+
+        // Extract revents into owned values before any mutation: each
+        // `PollFd<'fd>` borrows from the fd source (including `subs`), so
+        // we drop `fds` before accept / broadcast / drive can run. We
+        // ignore the listener fd's revents deliberately; see the
+        // always-accept comment below.
+        let wake_revents = fds[WAKE_FD_IDX].revents().unwrap_or(PollFlags::empty());
+        let sub_revents: Vec<PollFlags> = (0..sub_pollfd_idx.len())
+            .map(|k| fds[SUB_FDS_START + k].revents().unwrap_or(PollFlags::empty()))
+            .collect();
+        drop(fds);
+
+        // POLLHUP on wake_rx fires when the bus is dropped (write end
+        // closes); falling through without entering the drain branch
+        // would leave POLLHUP latched and `poll()` returning immediately
+        // forever. Treat any wake-fd revent as "go read it" — read will
+        // return Ok(0) on EOF and we exit cleanly via that path.
+        let wake_ready = !wake_revents.is_empty();
+
+        // Always drain pending accepts before processing wake/broadcast.
+ // The listener is in the poll set so its POLLIN wakes us, but we + // don't trust the revent for *whether* to accept: under load, + // listener POLLIN can lag behind `connect(2)` returning, and if + // we were woken by the wake-fd alone we still want to catch any + // queued connections so the same iteration's broadcast reaches + // them. The accept syscall is cheap (returns WouldBlock + // immediately when the queue is empty). + loop { + match listener.accept() { + Ok((stream, _addr)) => match SubscriberWriter::new(stream) { + Ok(sub) => subs.push(sub), + Err(e) => warn!("registering events subscriber: {:?}", e), + }, + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => { + error!("events listener accept: {:?}", e); + break; } } - Err(e) => { - error!("events listener accept failed: {:?}", e); - break; + } + + if wake_ready { + // Drain the wake pipe. Ok(0) means the EventBus was dropped; + // no more events will ever arrive — exit cleanly. + loop { + match unistd::read(&wake_rx, &mut wake_buf) { + Ok(0) => return, + Ok(_) => continue, + Err(Errno::EAGAIN) => break, + Err(Errno::EINTR) => continue, + Err(e) => panic!("events sink reading wake fd: {:?}", e), + } + } + // Drain event channel and broadcast. After each enqueue, if + // the sub's pending was empty before, drive opportunistically: + // a fast consumer's kernel buffer is likely ready, so the + // event flushes immediately and the next enqueue starts from + // empty. Without this, a burst larger than SUBSCRIBER_QUEUE_DEPTH + // would overflow even healthy subs because broadcast enqueues + // every event before any drive runs. + while let Ok(line) = event_rx.try_recv() { + for sub in subs.iter_mut() { + if sub.dropped { + continue; + } + let was_empty = !sub.wants_pollout(); + if let Err(Overflow::CapExceeded) = sub.enqueue(Arc::clone(&line)) { + warn!("dropping events subscriber: queue full"); + sub.dropped = true; + continue; + } + if was_empty { + if let Err(e) = sub.drive() { + info!("events subscriber gone: {:?}", e); + sub.dropped = true; + } + } + } + } + } + + // Drive writes for subs whose POLLOUT (or error) fired. + for (k, &i) in sub_pollfd_idx.iter().enumerate() { + // Short-circuit: a sub marked dropped during the broadcast + // above (overflow or opportunistic-drive failure) is removed + // by `subs.retain` at the end of this iteration anyway, but + // driving it again here would cost a needless `write(2)` on + // a likely-broken stream. + if subs[i].dropped { + continue; + } + let revents = sub_revents[k]; + if revents.intersects(PollFlags::POLLERR | PollFlags::POLLHUP | PollFlags::POLLNVAL) { + info!("events subscriber gone: peer error/hangup"); + subs[i].dropped = true; + } else if revents.contains(PollFlags::POLLOUT) { + if let Err(e) = subs[i].drive() { + info!("events subscriber gone: {:?}", e); + subs[i].dropped = true; + } } } + + subs.retain(|sub| !sub.dropped); } } -/// Set the write timeout and spawn a thread that drains `receiver` to -/// `stream` until either side closes or a write times out. -fn spawn_writer(stream: UnixStream, receiver: Receiver>) -> anyhow::Result<()> { - stream.set_write_timeout(Some(WRITE_TIMEOUT)).context("setting write timeout")?; - thread::Builder::new() - .name("events-writer".into()) - .spawn(move || run_writer(stream, receiver)) - .context("spawning events writer thread")?; - Ok(()) +/// Per-subscriber state owned by the sink. 
Exposes an event-shaped surface
+/// (`enqueue`, `drive`, `wants_pollout`); the partial-write state machine
+/// (offset into `pending.front()`) is hidden from the sink loop.
+struct SubscriberWriter {
+    stream: UnixStream,
+    pending: VecDeque<Arc<str>>,
+    front_offset: usize,
+    /// Set when the sink decides this sub should be removed; the actual
+    /// `Vec` removal happens at end-of-iteration via `subs.retain`. We
+    /// can't remove mid-iteration because `sub_pollfd_idx` holds indices
+    /// into `subs` and would shift.
+    dropped: bool,
+}
+
+#[derive(Debug)]
+enum Overflow {
+    CapExceeded,
+}
+
+#[derive(Debug, Eq, PartialEq)]
+enum DriveOutcome {
+    AllFlushed,
+    WouldBlock,
+}
+
+impl SubscriberWriter {
+    fn new(stream: UnixStream) -> io::Result<Self> {
+        stream.set_nonblocking(true)?;
+        Ok(Self { stream, pending: VecDeque::new(), front_offset: 0, dropped: false })
+    }
+
+    fn enqueue(&mut self, line: Arc<str>) -> Result<(), Overflow> {
+        if self.pending.len() >= SUBSCRIBER_QUEUE_DEPTH {
+            return Err(Overflow::CapExceeded);
+        }
+        self.pending.push_back(line);
+        Ok(())
+    }
+
+    fn drive(&mut self) -> io::Result<DriveOutcome> {
+        drive_pending(&mut self.stream, &mut self.pending, &mut self.front_offset)
+    }
+
+    fn wants_pollout(&self) -> bool {
+        !self.pending.is_empty()
+    }
+}
+
+impl AsFd for SubscriberWriter {
+    fn as_fd(&self) -> BorrowedFd<'_> {
+        self.stream.as_fd()
+    }
+}
+
+/// Drains `pending` into `stream` via non-blocking writes, advancing
+/// `front_offset` for short writes. Generic over `Write` so the partial-
+/// write state machine is unit-testable in isolation against a fake stream.
+fn drive_pending<W: Write>(
+    stream: &mut W,
+    pending: &mut VecDeque<Arc<str>>,
+    front_offset: &mut usize,
+) -> io::Result<DriveOutcome> {
+    while let Some(front) = pending.front() {
+        let bytes = &front.as_bytes()[*front_offset..];
+        match stream.write(bytes) {
+            Ok(0) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::WriteZero,
+                    "events subscriber stream returned 0 bytes",
+                ));
+            }
+            Ok(n) => {
+                *front_offset += n;
+                if *front_offset >= front.len() {
+                    pending.pop_front();
+                    *front_offset = 0;
+                }
+            }
+            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+                return Ok(DriveOutcome::WouldBlock);
+            }
+            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
+            Err(e) => return Err(e),
+        }
+    }
+    Ok(DriveOutcome::AllFlushed)
+}
+
+/// Wire-form `Arc<str>` per `Event` variant, lazily built once via serde
+/// (so the wire format stays driven by the `#[serde(rename = ...)]`
+/// annotations) and cloned cheaply on every publish thereafter. Each
+/// `Arc::clone` on the hot path is one atomic increment — no allocation,
+/// no serialization.
+/// Wire-form `Arc<str>` per `Event` variant, lazily built once via serde
+/// (so the wire format stays driven by the `#[serde(rename = ...)]`
+/// annotations) and cloned cheaply on every publish thereafter. Each
+/// `Arc::clone` on the hot path is one atomic increment — no allocation,
+/// no serialization.
+fn serialize_line(event: &Event) -> Arc<str> {
+    fn build(e: Event) -> Arc<str> {
+        let s = serde_json::to_string(&e).expect("Event variants are infallible to serialize");
+        Arc::from(format!("{s}\n"))
+    }
+    static CREATED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionCreated));
+    static ATTACHED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionAttached));
+    static DETACHED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionDetached));
+    static REMOVED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionRemoved));
+    match event {
+        Event::SessionCreated => Arc::clone(&CREATED),
+        Event::SessionAttached => Arc::clone(&ATTACHED),
+        Event::SessionDetached => Arc::clone(&DETACHED),
+        Event::SessionRemoved => Arc::clone(&REMOVED),
+    }
+}
+
+fn make_self_pipe() -> io::Result<(OwnedFd, OwnedFd)> {
+    // pipe2(2) atomically creates an anonymous pipe with both flags set
+    // on each end:
+    // - O_NONBLOCK: the publisher's wake-byte write must never stall, and the sink
+    //   detects "drained" via EAGAIN on read instead of blocking.
+    // - O_CLOEXEC: forked children (shells) shouldn't inherit these fds and hold
+    //   the wake pipe open past the daemon exiting.
+    // map_err converts nix's Errno into std::io::Error to match the
+    // module's io::Result idiom.
+    unistd::pipe2(OFlag::O_NONBLOCK | OFlag::O_CLOEXEC).map_err(io::Error::from)
 }
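// [Editor's sketch, not part of this patch] What consuming the stream looks
// like from outside the daemon, in Rust for symmetry; any language with a
// Unix-socket client and a JSON parser works the same way. The path is an
// assumption: the sibling `events.socket` next to the main shpool socket.
fn tail_events(path: &std::path::Path) -> io::Result<()> {
    use std::io::BufRead;
    let stream = UnixStream::connect(path)?;
    let reader = BufReader::new(stream);
    for line in reader.lines() {
        // One JSON object per line; the first line is always the snapshot.
        println!("{}", line?);
    }
    // EOF means the daemon side closed; reconnect to re-sync.
    Ok(())
}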
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::{
+        io::Read,
+        time::{Duration, Instant},
+    };
 
     fn json(event: &Event) -> String {
         serde_json::to_string(event).unwrap()
     }
 
+    /// Per-test scaffolding: tempdir + socket path + bus + listener guard.
+    /// Holds RAII handles so the socket file is unlinked and the dir is
+    /// cleaned up at end-of-scope.
+    struct Harness {
+        _dir: tempfile::TempDir,
+        _guard: ListenerGuard,
+        path: PathBuf,
+        bus: Arc<EventBus>,
+    }
+
+    fn harness() -> Harness {
+        let dir = tempfile::tempdir().unwrap();
+        let path = dir.path().join("events.socket");
+        let bus = EventBus::new().unwrap();
+        let _guard = start_listener(path.clone(), Arc::clone(&bus)).unwrap();
+        Harness { _dir: dir, _guard, path, bus }
+    }
+
+    /// Read timeout used for all blocking reads in tests. Generous so
+    /// loaded CI machines don't trip the assertion before the sink has a
+    /// chance to broadcast.
+    const READ_TIMEOUT: Duration = Duration::from_secs(10);
+
+    /// Wire form of `Event::SessionCreated` — the most-published event in
+    /// the test suite.
+    const CREATED_LINE: &str = "{\"type\":\"session.created\"}\n";
+
+    fn read_line(stream: &mut UnixStream) -> String {
+        stream.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
+        let mut reader = BufReader::new(stream);
+        let mut line = String::new();
+        reader.read_line(&mut line).unwrap();
+        line
+    }
+
+    fn read_n_lines(stream: &mut UnixStream, n: usize) -> Vec<String> {
+        stream.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
+        let mut reader = BufReader::new(stream);
+        (0..n)
+            .map(|_| {
+                let mut line = String::new();
+                reader.read_line(&mut line).unwrap();
+                line
+            })
+            .collect()
+    }
+
+    /// Connect a fresh subscriber, then publish a probe event and consume
+    /// it. Returning means the sink has accepted the connection and
+    /// broadcast at least one event to it. Probe is `SessionCreated`;
+    /// callers continue with their own publishes from a clean stream.
+    fn connect_registered(path: &Path, bus: &EventBus) -> UnixStream {
+        let mut stream = UnixStream::connect(path).unwrap();
+        // Sleep briefly so the OS schedules the sink thread to accept
+        // this connection before the publisher's wake byte arrives.
+        // `thread::yield_now()` is only a scheduler hint; under heavy
+        // parallel-test load it isn't reliable. A 1ms sleep reliably
+        // forces a context switch.
+        thread::sleep(Duration::from_millis(1));
+        bus.publish(&Event::SessionCreated);
+        let _ = read_line(&mut stream);
+        stream
+    }
+
+    /// Like `connect_registered` but for `n` subscribers in one round-trip:
+    /// connect them all (queued in the listener backlog), publish one
+    /// probe, read it from each. The sink's accept-before-wake order
+    /// guarantees all queued subs are registered before the probe is
+    /// broadcast.
+    fn connect_n_registered(path: &Path, bus: &EventBus, n: usize) -> Vec<UnixStream> {
+        let mut streams: Vec<UnixStream> =
+            (0..n).map(|_| UnixStream::connect(path).unwrap()).collect();
+        thread::sleep(Duration::from_millis(1));
+        bus.publish(&Event::SessionCreated);
+        for s in streams.iter_mut() {
+            let _ = read_line(s);
+        }
+        streams
+    }
+
     #[test]
     fn session_created_serializes_with_only_type() {
         assert_eq!(json(&Event::SessionCreated), r#"{"type":"session.created"}"#);
@@ -230,203 +562,536 @@ mod tests {
 
     #[test]
     fn bus_publish_with_no_subscribers_is_a_noop() {
-        let bus = EventBus::new();
+        let bus = EventBus::new().unwrap();
         bus.publish(&Event::SessionCreated);
     }
 
     #[test]
     fn bus_publish_reaches_subscriber() {
-        let bus = EventBus::new();
-        let rx = bus.register();
-        bus.publish(&Event::SessionCreated);
-        let line = rx.recv().unwrap();
-        assert_eq!(&*line, "{\"type\":\"session.created\"}\n");
+        let h = harness();
+        let mut stream = connect_registered(&h.path, &h.bus);
+        h.bus.publish(&Event::SessionCreated);
+        assert_eq!(read_line(&mut stream), CREATED_LINE);
     }
 
     #[test]
-    fn bus_drops_subscriber_whose_queue_is_full() {
-        let bus = EventBus::new();
-        let rx = bus.register();
-        for _ in 0..SUBSCRIBER_QUEUE_DEPTH {
-            bus.publish(&Event::SessionCreated);
-        }
-        assert_eq!(bus.subscribers.lock().len(), 1);
-        bus.publish(&Event::SessionCreated);
-        assert_eq!(bus.subscribers.lock().len(), 0);
-        drop(rx);
-    }
+    fn bus_drops_subscriber_whose_peer_closed() {
+        let h = harness();
+        let victim = connect_registered(&h.path, &h.bus);
+        let mut probe = connect_registered(&h.path, &h.bus);
 
-    #[test]
-    fn bus_drops_subscriber_whose_receiver_hung_up() {
-        let bus = EventBus::new();
-        let rx = bus.register();
-        drop(rx);
-        bus.publish(&Event::SessionCreated);
-        assert_eq!(bus.subscribers.lock().len(), 0);
+        drop(victim);
+        h.bus.publish(&Event::SessionAttached);
+        assert_eq!(read_line(&mut probe), "{\"type\":\"session.attached\"}\n");
     }
 
     #[test]
-    fn bus_publish_reaches_every_subscriber() {
-        let bus = EventBus::new();
-        let rx_a = bus.register();
-        let rx_b = bus.register();
-        bus.publish(&Event::SessionCreated);
-        for rx in [&rx_a, &rx_b] {
-            let line = rx.recv().unwrap();
-            assert_eq!(&*line, "{\"type\":\"session.created\"}\n");
+    fn bus_publish_with_many_subscribers_is_not_quadratic() {
+        // Publish is `try_send` + 1-byte wake — independent of N. A
+        // regression that re-introduced per-sub work in publish would
+        // make these timings explode.
+        let h = harness();
+        let n = 200;
+        let _streams = connect_n_registered(&h.path, &h.bus, n);
+
+        let start = Instant::now();
+        for _ in 0..1000 {
+            h.bus.publish(&Event::SessionCreated);
         }
+        let elapsed = start.elapsed();
+        assert!(
+            elapsed < Duration::from_millis(100),
+            "1000 publishes with {n} subs took {elapsed:?}"
+        );
     }
 
     #[test]
-    fn writer_exits_when_peer_closes_stream() {
-        let (a, b) = UnixStream::pair().unwrap();
-        let (tx, rx) = mpsc::sync_channel::<Arc<str>>(8);
-        let handle = thread::spawn(move || run_writer(a, rx));
-        drop(b);
-        // The send may succeed (kernel buffered) or fail; what matters is
-        // that closing the channel unblocks the writer thread on the next
-        // recv, regardless of write outcome.
-        let _ = tx.try_send("ignored\n".into());
-        drop(tx);
-        handle.join().unwrap();
-    }
+    fn bus_concurrent_publish_under_outer_lock_delivers_all_events() {
+        // Publish takes no internal lock, so an outer lock can't deadlock
+        // with anything inside the bus.
+        let h = harness();
+        let mut stream = connect_registered(&h.path, &h.bus);
 
-    #[test]
-    fn spawn_writer_sets_write_timeout() {
-        let (a, _b) = UnixStream::pair().unwrap();
-        let probe = a.try_clone().unwrap();
-        let (_tx, rx) = mpsc::sync_channel(1);
-        spawn_writer(a, rx).unwrap();
-        assert_eq!(probe.write_timeout().unwrap(), Some(WRITE_TIMEOUT))
+        let outer: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
+        let n_threads = 4;
+        let n_per_thread = 8;
+        let total = n_threads * n_per_thread;
+
+        let handles: Vec<_> = (0..n_threads)
+            .map(|_| {
+                let bus = Arc::clone(&h.bus);
+                let outer = Arc::clone(&outer);
+                thread::spawn(move || {
+                    for _ in 0..n_per_thread {
+                        let _g = outer.lock();
+                        bus.publish(&Event::SessionCreated);
+                    }
+                })
+            })
+            .collect();
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        for line in read_n_lines(&mut stream, total) {
+            assert_eq!(line, CREATED_LINE);
+        }
     }
 
     #[test]
     fn listener_guard_unlinks_socket_on_drop() {
         let dir = tempfile::tempdir().unwrap();
         let path = dir.path().join("events.socket");
-        let guard = start_listener(path.clone(), EventBus::new()).unwrap();
+        let guard = start_listener(path.clone(), EventBus::new().unwrap()).unwrap();
         assert!(path.exists(), "socket file should exist while guard is alive");
         drop(guard);
         assert!(!path.exists(), "socket file should be unlinked on guard drop");
     }
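// [Editor's sketch, not part of this patch] What the "try_send + 1-byte
// wake" claim above assumes about the publish hot path; the real
// `EventBus::publish` lives in an earlier hunk. The field names `event_tx`
// and `wake_tx` are hypothetical, and the fd-based `unistd::write`
// signature assumes a recent nix release.
fn publish_sketch(event_tx: &std::sync::mpsc::SyncSender<Arc<str>>, wake_tx: &OwnedFd, event: &Event) {
    // One clone of the cached wire line plus one try_send: no per-subscriber work.
    let _ = event_tx.try_send(serialize_line(event));
    // Nonblocking 1-byte wake; a full pipe (EAGAIN) already implies a pending wakeup.
    let _ = unistd::write(wake_tx, &[0u8]);
}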
     #[test]
-    fn bus_publish_with_many_subscribers_is_not_quadratic() {
-        let bus = EventBus::new();
-        let n = 10_000;
-        let mut rxs = Vec::with_capacity(n);
-        for _ in 0..n {
-            rxs.push(bus.register());
+    fn accept_loop_registers_concurrent_subscribers() {
+        let h = harness();
+        let n = 20;
+        let mut streams: Vec<UnixStream> = (0..n)
+            .map(|_| {
+                let path = h.path.clone();
+                thread::spawn(move || UnixStream::connect(&path).unwrap())
+            })
+            .collect::<Vec<_>>()
+            .into_iter()
+            .map(|jh| jh.join().unwrap())
+            .collect();
+
+        // Probe to verify each concurrently-dialed sub is registered: the
+        // sink's accept-before-wake order ensures all queued connects
+        // join `subs` before this publish broadcasts.
+        h.bus.publish(&Event::SessionCreated);
+        for stream in streams.iter_mut() {
+            assert_eq!(read_line(stream), CREATED_LINE);
         }
-        let start = std::time::Instant::now();
-        bus.publish(&Event::SessionCreated);
-        let elapsed = start.elapsed();
-        // Measured ~4 ms on a 2.1 GHz CPU; the 50 ms bound absorbs CI
-        // tail latency while still catching gross regressions (e.g. an
-        // accidental O(N^2) would be on the order of seconds at N=10K).
-        assert!(elapsed < Duration::from_millis(50), "publish to {n} subscribers took {elapsed:?}");
     }
 
     #[test]
-    fn bus_drops_slow_subscriber_on_overflow_without_affecting_fast() {
-        // This test is O(SUBSCRIBER_QUEUE_DEPTH); cap the constant so a
-        // future bump (e.g. after moving to a dynamically-grown queue) doesn't make
-        // this test unboundedly slow.
-        assert!(SUBSCRIBER_QUEUE_DEPTH < 1024);
-
-        let bus = EventBus::new();
-        let slow_rx = bus.register();
-        let fast_rx = bus.register();
-
-        // Fill slow's queue while draining fast each iteration so only slow
-        // accumulates a backlog.
-        for _ in 0..SUBSCRIBER_QUEUE_DEPTH {
-            bus.publish(&Event::SessionCreated);
-            assert_eq!(&*fast_rx.recv().unwrap(), "{\"type\":\"session.created\"}\n");
+    fn burst_load_within_capacity_reaches_every_subscriber() {
+        let h = harness();
+        let m = 4;
+        let mut streams = connect_n_registered(&h.path, &h.bus, m);
+
+        // Stay under SUBSCRIBER_QUEUE_DEPTH with margin so no sub is dropped.
+        let n_events = 32;
+        for _ in 0..n_events {
+            h.bus.publish(&Event::SessionCreated);
+        }
+        let expected = CREATED_LINE;
+        for stream in streams.iter_mut() {
+            for line in read_n_lines(stream, n_events) {
+                assert_eq!(line, expected);
+            }
         }
-        assert_eq!(bus.subscribers.lock().unwrap().len(), 2);
+    }
 
-        // The next publish overflows slow's queue and drops it. Fast's queue
-        // is empty so it keeps receiving.
-        bus.publish(&Event::SessionCreated);
-        assert_eq!(&*fast_rx.recv().unwrap(), "{\"type\":\"session.created\"}\n");
-        assert_eq!(bus.subscribers.lock().unwrap().len(), 1);
+    #[test]
+    fn events_arrive_in_publish_order() {
+        let h = harness();
+        let mut stream = connect_registered(&h.path, &h.bus);
+
+        h.bus.publish(&Event::SessionCreated);
+        h.bus.publish(&Event::SessionAttached);
+        h.bus.publish(&Event::SessionDetached);
+        h.bus.publish(&Event::SessionRemoved);
 
-        drop(slow_rx);
+        let lines = read_n_lines(&mut stream, 4);
+        assert_eq!(
+            lines,
+            [
+                CREATED_LINE,
+                "{\"type\":\"session.attached\"}\n",
+                "{\"type\":\"session.detached\"}\n",
+                "{\"type\":\"session.removed\"}\n",
+            ]
+        );
     }
 
     #[test]
-    fn accept_loop_registers_concurrent_subscribers() {
-        use std::io::Read;
-
+    fn slow_subscriber_drop_does_not_affect_fast_through_sink() {
+        // The harness uses `start_listener`; this test bypasses it so the
+        // listener is bound directly with the desired socket options.
         let dir = tempfile::tempdir().unwrap();
         let path = dir.path().join("events.socket");
-        let bus = EventBus::new();
-        let _guard = start_listener(path.clone(), Arc::clone(&bus)).unwrap();
+        let bus = EventBus::new().unwrap();
 
-        let n = 20;
-        let dial_handles: Vec<_> = (0..n)
-            .map(|_| {
-                let path = path.clone();
-                thread::spawn(move || UnixStream::connect(&path).unwrap())
-            })
-            .collect();
-        let streams: Vec<UnixStream> =
-            dial_handles.into_iter().map(|h| h.join().unwrap()).collect();
+        let listener = UnixListener::bind(&path).unwrap();
+        listener.set_nonblocking(true).unwrap();
+        let handles = bus.take_sink_handles().unwrap();
+        thread::Builder::new()
+            .name("events-sink".into())
+            .spawn(move || run_sink(listener, handles))
+            .unwrap();
 
-        // Wait for the accept thread to register all subscribers.
-        let deadline = std::time::Instant::now() + Duration::from_secs(2);
-        loop {
-            let count = bus.subscribers.lock().unwrap().len();
-            if count >= n {
-                break;
+        // Small SO_RCVBUF on slow caps how much un-read data the kernel
+        // will buffer en route to it; the sink's writes start returning
+        // EAGAIN, slow's pending grows to cap, slow is dropped.
+        let _slow = UnixStream::connect(&path).unwrap();
+        nix::sys::socket::setsockopt(&_slow, nix::sys::socket::sockopt::RcvBuf, &1024).unwrap();
+        let mut fast = UnixStream::connect(&path).unwrap();
+
+        // Probe both subs (fast reads it; slow's tiny buffer fits one
+        // event of ~30 bytes).
+        bus.publish(&Event::SessionCreated);
+        let _ = read_line(&mut fast);
+
+        // Interleave publish + read so fast's kernel buffer never fills
+        // (the default AF_UNIX RcvBuf is small enough that buffering K
+        // events without reading would overflow fast's pending and cause
+        // the sink to drop fast). Slow gets enqueued every iteration too;
+        // its small RcvBuf forces drive into EAGAIN within a few dozen
+        // events, then pending overflows and the sink drops slow. The
+        // test passes only if fast continues receiving while slow is
+        // overflowing or after slow is dropped.
+        let expected = CREATED_LINE;
+        fast.set_read_timeout(Some(Duration::from_secs(10))).unwrap();
+        let mut reader = BufReader::new(&mut fast);
+        for _ in 0..1000 {
+            bus.publish(&Event::SessionCreated);
+            let mut line = String::new();
+            reader.read_line(&mut line).unwrap();
+            assert_eq!(line, expected);
+        }
+    }
+
+    /// Fake writer: per-call accept counts. Each `write` consumes the next
+    /// entry: `Some(n)` accepts `min(n, buf.len())` bytes; `None` (or 0)
+    /// returns WouldBlock; `Err` returns the given error.
+    struct FakeWriter {
+        plan: VecDeque<FakeWrite>,
+        written: Vec<u8>,
+    }
+
+    enum FakeWrite {
+        Accept(usize),
+        WouldBlock,
+        Interrupted,
+        Err(io::ErrorKind),
+    }
+
+    impl Write for FakeWriter {
+        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+            match self.plan.pop_front() {
+                Some(FakeWrite::Accept(n)) => {
+                    let take = n.min(buf.len());
+                    self.written.extend_from_slice(&buf[..take]);
+                    Ok(take)
+                }
+                Some(FakeWrite::WouldBlock) | None => {
+                    Err(io::Error::new(io::ErrorKind::WouldBlock, "fake EAGAIN"))
+                }
+                Some(FakeWrite::Interrupted) => {
+                    Err(io::Error::new(io::ErrorKind::Interrupted, "fake EINTR"))
+                }
+                Some(FakeWrite::Err(kind)) => Err(io::Error::new(kind, "fake error")),
+            }
+        }
+        fn flush(&mut self) -> io::Result<()> {
+            Ok(())
+        }
+    }
+
+    #[test]
+    fn drive_pending_flushes_a_single_complete_write() {
+        let mut w = FakeWriter { plan: VecDeque::from([FakeWrite::Accept(100)]), written: vec![] };
+        let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("hello\n")]);
+        let mut offset = 0;
+        let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
+        assert_eq!(outcome, DriveOutcome::AllFlushed);
+        assert_eq!(w.written, b"hello\n");
+        assert!(pending.is_empty());
+        assert_eq!(offset, 0);
+    }
+
+    #[test]
+    fn drive_pending_resumes_after_partial_then_wouldblock() {
+        // Accept 3, then EAGAIN; resume across two further drives.
+        let mut w = FakeWriter {
+            plan: VecDeque::from([
+                FakeWrite::Accept(3),
+                FakeWrite::WouldBlock,
+                FakeWrite::Accept(3),
+                FakeWrite::WouldBlock,
+            ]),
+            written: vec![],
+        };
+        let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("hello\n")]);
+        let mut offset = 0;
+
+        // 1st: writes "hel", then WouldBlock.
+        let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
+        assert_eq!(outcome, DriveOutcome::WouldBlock);
+        assert_eq!(w.written, b"hel");
+        assert_eq!(offset, 3);
+        assert_eq!(pending.len(), 1);
+
+        // 2nd: writes "lo\n" (the remainder) and returns AllFlushed; the
+        // trailing WouldBlock plan entry is never consumed because pending
+        // is empty after the pop.
+        let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
+        assert_eq!(outcome, DriveOutcome::AllFlushed);
+        assert_eq!(w.written, b"hello\n");
+        assert_eq!(offset, 0);
+        assert!(pending.is_empty());
+    }
+
+    #[test]
+    fn drive_pending_retries_on_eintr() {
+        let mut w = FakeWriter {
+            plan: VecDeque::from([FakeWrite::Interrupted, FakeWrite::Accept(100)]),
+            written: vec![],
+        };
+        let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("ok\n")]);
+        let mut offset = 0;
+        let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
+        assert_eq!(outcome, DriveOutcome::AllFlushed);
+        assert_eq!(w.written, b"ok\n");
+    }
+
+    #[test]
+    fn drive_pending_propagates_other_errors() {
+        let mut w = FakeWriter {
+            plan: VecDeque::from([FakeWrite::Err(io::ErrorKind::BrokenPipe)]),
+            written: vec![],
+        };
+        let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("x\n")]);
+        let mut offset = 0;
+        let err = drive_pending(&mut w, &mut pending, &mut offset).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
+    }
+
+    #[test]
+    fn drive_pending_treats_zero_byte_write_as_error() {
+        let mut w = FakeWriter { plan: VecDeque::from([FakeWrite::Accept(0)]), written: vec![] };
+        let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("x\n")]);
+        let mut offset = 0;
+        // Note: FakeWrite::Accept(0) returns Ok(0) (not WouldBlock) because
+        // we haven't written WouldBlock to the plan; this is a write-zero
+        // signal which `drive_pending` must treat as an error.
+        let err = drive_pending(&mut w, &mut pending, &mut offset).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::WriteZero);
+    }
+
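// [Editor's note, not part of the patch] std::io::Write documents Ok(0) as
// "this writer can no longer accept bytes". For a connected socket that
// state never resolves on its own, so surfacing WriteZero here, rather
// than retrying in a busy loop, is the safe translation.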
+    /// Property-style test: an arbitrary-shaped sequence of partial /
+    /// EAGAIN / EINTR / full-accept responses, paired with new enqueues
+    /// interleaved between drives, must eventually flush every enqueued
+    /// byte once the plan ends with enough accepts to drain.
+    #[test]
+    fn drive_pending_fuzz_eventually_flushes_everything() {
+        // Hand-rolled deterministic interleavings covering the
+        // "queue-state-changed / POLLOUT-requested" atomicity: each row is
+        // a sequence of (op, write-plan-entries) where op is "drive once"
+        // or "enqueue X". After running the whole script, drive to
+        // completion with a generous accept-all and assert every enqueued
+        // byte was written, in order.
+        enum Op {
+            Enqueue(&'static str),
+            Drive(Vec<FakeWrite>),
+        }
+        use FakeWrite::*;
+
+        let scripts: Vec<Vec<Op>> = vec![
+            vec![Op::Enqueue("a\n"), Op::Drive(vec![Accept(1), WouldBlock])],
+            vec![
+                Op::Enqueue("a\n"),
+                Op::Drive(vec![Accept(1)]),
+                Op::Enqueue("b\n"),
+                Op::Drive(vec![Accept(1), Interrupted, Accept(2), Accept(2)]),
+            ],
+            vec![
+                Op::Enqueue("hello\n"),
+                Op::Drive(vec![WouldBlock]),
+                Op::Enqueue("world\n"),
+                Op::Drive(vec![Accept(2), Interrupted, WouldBlock]),
+                Op::Enqueue("again\n"),
+            ],
+            vec![
+                Op::Enqueue("x\n"),
+                Op::Enqueue("y\n"),
+                Op::Enqueue("z\n"),
+                Op::Drive(vec![Accept(1), Accept(1), WouldBlock]),
+                Op::Drive(vec![Accept(4), Accept(2)]),
+            ],
+        ];
+
+        for (i, script) in scripts.into_iter().enumerate() {
+            let mut pending: VecDeque<Arc<str>> = VecDeque::new();
+            let mut offset = 0;
+            let mut all_written = Vec::new();
+            let mut expected = Vec::new();
+
+            for op in script {
+                match op {
+                    Op::Enqueue(s) => {
+                        pending.push_back(Arc::from(s));
+                        expected.extend_from_slice(s.as_bytes());
+                    }
+                    Op::Drive(plan) => {
+                        let mut w = FakeWriter { plan: plan.into(), written: vec![] };
+                        let _ = drive_pending(&mut w, &mut pending, &mut offset);
+                        all_written.extend_from_slice(&w.written);
+                    }
+                }
+            }
+
+            // Final drive with unlimited accept across many calls;
+            // drive_pending invokes `write` once per pending entry.
+            let mut w = FakeWriter {
+                plan: std::iter::repeat_with(|| FakeWrite::Accept(usize::MAX)).take(64).collect(),
+                written: vec![],
+            };
+            let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
+            assert_eq!(outcome, DriveOutcome::AllFlushed, "script {i}");
+            all_written.extend_from_slice(&w.written);
+
+            assert_eq!(all_written, expected, "script {i}: bytes lost or reordered");
+            assert!(pending.is_empty(), "script {i}: pending not drained");
+            assert_eq!(offset, 0, "script {i}: offset not reset");
+        }
+    }
 
     #[test]
-    fn bus_concurrent_publish_under_outer_lock_delivers_all_events() {
-        let bus = EventBus::new();
-        let rx = bus.register();
-        let outer: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
+    fn enqueue_overflows_at_cap() {
+        let (a, _b) = UnixStream::pair().unwrap();
+        let mut sub = SubscriberWriter::new(a).unwrap();
+        for i in 0..SUBSCRIBER_QUEUE_DEPTH {
+            sub.enqueue(format!("event-{i}\n").into()).unwrap();
+        }
+        let err = sub.enqueue("one-too-many\n".into());
+        assert!(matches!(err, Err(Overflow::CapExceeded)));
+    }
 
-        let n_threads = 4;
-        let n_per_thread = 8;
-        let total = n_threads * n_per_thread;
+    #[test]
+    fn subscriber_writer_overflows_when_peer_blocks() {
+        // Use a socket pair so we can shrink the *server-side* send buffer
+        // (a listener-accepted socket isn't reachable from outside the
+        // sink). With the peer never reading, drive() eventually returns
+        // WouldBlock; pending grows past the cap and enqueue fails.
+        let (server, _client) = UnixStream::pair().unwrap();
+        nix::sys::socket::setsockopt(&server, nix::sys::socket::sockopt::SndBuf, &1024).unwrap();
+        let mut sub = SubscriberWriter::new(server).unwrap();
 
-        let handles: Vec<_> = (0..n_threads)
-            .map(|_| {
-                let bus = Arc::clone(&bus);
-                let outer = Arc::clone(&outer);
-                thread::spawn(move || {
-                    for _ in 0..n_per_thread {
-                        let _g = outer.lock().unwrap();
-                        bus.publish(&Event::SessionCreated);
+        let line: Arc<str> = CREATED_LINE.into();
+        let mut overflowed = false;
+        for _ in 0..(SUBSCRIBER_QUEUE_DEPTH * 1000) {
+            let was_empty = !sub.wants_pollout();
+            match sub.enqueue(Arc::clone(&line)) {
+                Ok(()) => {
+                    if was_empty {
+                        let _ = sub.drive();
                     }
-                })
-            })
-            .collect();
-        for h in handles {
-            h.join().unwrap();
+                }
+                Err(Overflow::CapExceeded) => {
+                    overflowed = true;
+                    break;
+                }
+            }
         }
+        assert!(overflowed, "expected SubscriberWriter to overflow");
+    }
+
+    #[test]
+    fn subscriber_writer_resumes_after_peer_drains() {
+        // After a SubscriberWriter has piled up pending against a full
+        // kernel buffer, the next `drive()` must make progress once the
+        // peer drains. Pairs the fake-stream resume property
+        // (`drive_pending_resumes_after_partial_then_wouldblock`) with a
+        // real socket.
+        let (server, client) = UnixStream::pair().unwrap();
+        nix::sys::socket::setsockopt(&server, nix::sys::socket::sockopt::SndBuf, &1024).unwrap();
+        client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
+        let mut sub = SubscriberWriter::new(server).unwrap();
 
-        for _ in 0..total {
-            rx.recv().unwrap();
+        let line: Arc<str> = CREATED_LINE.into();
+        for _ in 0..SUBSCRIBER_QUEUE_DEPTH {
+            sub.enqueue(Arc::clone(&line)).unwrap();
         }
-        assert!(rx.try_recv().is_err());
+
+        // First drive: kernel buffer fills, returns WouldBlock with
+        // pending non-empty.
+        assert_eq!(sub.drive().unwrap(), DriveOutcome::WouldBlock);
+        let pending_after_first = sub.pending.len();
+        assert!(pending_after_first > 0);
+
+        // Drain whatever the peer can read in one shot.
+        let mut buf = vec![0u8; 4096];
+        let drained = (&client).read(&mut buf).unwrap();
+        assert!(drained > 0);
+
+        // Second drive: must shrink pending (proves resume).
+        let _ = sub.drive().unwrap();
+        assert!(
+            sub.pending.len() < pending_after_first,
+            "drive must advance pending after peer drains: {} -> {}",
+            pending_after_first,
+            sub.pending.len()
+        );
+    }
+
+    #[test]
+    fn fast_writer_unaffected_when_slow_overflows() {
+        // Slow: small SndBuf, peer never reads → drive blocks → pending
+        // grows to cap → enqueue fails.
+        // Fast: default SndBuf, peer drains → drive flushes → pending
+        // stays empty → enqueue never fails.
+        let (slow_server, _slow_client) = UnixStream::pair().unwrap();
+        nix::sys::socket::setsockopt(&slow_server, nix::sys::socket::sockopt::SndBuf, &1024)
+            .unwrap();
+        let mut slow = SubscriberWriter::new(slow_server).unwrap();
+
+        let (fast_server, fast_client) = UnixStream::pair().unwrap();
+        let mut fast = SubscriberWriter::new(fast_server).unwrap();
+
+        let drainer = thread::spawn(move || {
+            let mut buf = [0u8; 4096];
+            let mut total = 0usize;
+            loop {
+                match (&fast_client).read(&mut buf) {
+                    Ok(0) => break,
+                    Ok(n) => total += n,
+                    Err(_) => break,
+                }
+            }
+            total
+        });
+
+        let line: Arc<str> = CREATED_LINE.into();
+        let mut slow_dropped = false;
+
+        // Mimic the sink's enqueue+opportunistic-drive pattern across both
+        // subs for each event.
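// [Editor's note, not part of the patch] "Enqueue, then drive only if the
// queue was empty beforehand" is exactly the sink's broadcast step above:
// a healthy peer flushes inline and stays at depth zero or one, while a
// stuck peer degrades to pure enqueues until the cap drops it.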
+ for _ in 0..(SUBSCRIBER_QUEUE_DEPTH * 100) { + if !slow_dropped { + let was_empty = !slow.wants_pollout(); + match slow.enqueue(Arc::clone(&line)) { + Ok(()) => { + if was_empty { + let _ = slow.drive(); + } + } + Err(Overflow::CapExceeded) => { + slow_dropped = true; + } + } + } + let was_empty = !fast.wants_pollout(); + fast.enqueue(Arc::clone(&line)).expect("fast must not overflow"); + if was_empty { + let _ = fast.drive(); + } + if slow_dropped { + break; + } + } + + assert!(slow_dropped, "slow should have overflowed"); + + // Closing fast's server end lets the drainer thread exit on EOF. + drop(fast); + let bytes_received = drainer.join().unwrap(); + assert!(bytes_received > 0, "fast should have received events"); } } diff --git a/shpool/tests/support/daemon.rs b/shpool/tests/support/daemon.rs index 102761b2..7c04c582 100644 --- a/shpool/tests/support/daemon.rs +++ b/shpool/tests/support/daemon.rs @@ -154,9 +154,10 @@ impl Proc { if args.listen_events { Some(Events::new(&test_hook_socket_path)?) } else { None }; // spin until we can dial the socket successfully + let events_socket_path = socket_path.with_file_name("events.socket"); let mut sleep_dur = time::Duration::from_millis(5); for _ in 0..12 { - if UnixStream::connect(&socket_path).is_ok() { + if UnixStream::connect(&socket_path).is_ok() && events_socket_path.exists() { break; } else { std::thread::sleep(sleep_dur); @@ -245,9 +246,10 @@ impl Proc { }); // spin until we can dial the socket successfully + let events_socket_path = socket_path.with_file_name("events.socket"); let mut sleep_dur = time::Duration::from_millis(5); for _ in 0..12 { - if UnixStream::connect(&socket_path).is_ok() { + if UnixStream::connect(&socket_path).is_ok() && events_socket_path.exists() { break; } else { std::thread::sleep(sleep_dur);