diff --git a/EVENTS.md b/EVENTS.md new file mode 100644 index 00000000..481ead59 --- /dev/null +++ b/EVENTS.md @@ -0,0 +1,66 @@ +# Events + +`shpool` exposes an event stream so that external programs can react to changes +without polling. This way, a program (e.g. a TUI) can call `shpool list` (or the +equivalent `ConnectHeader::List` request over the main socket; see the +[`shpool-protocol`](./shpool-protocol) crate) after each event so that its model +is always consistent with shpool's state. + +## The events socket + +The daemon binds a sibling Unix socket next to the main shpool socket: + +```bash +<runtime_dir>/shpool/shpool.socket # main socket +<runtime_dir>/shpool/events.socket # events socket (this protocol) +``` + +A subscriber connects to `events.socket` and reads events. The daemon ignores anything written to the events socket, so for subscribers it's effectively read-only. + +## Event types + +| `type` | Meaning | | ------------------ | -------------------------------------------------------- | | `session.created` | A new session was added to the table. | | `session.attached` | A client attached or reattached to a session. | | `session.detached` | A client disconnected from a still-running session. | | `session.removed` | A session was removed (shell exited, killed, or reaped). | + +Subscribers should ignore unknown `type` values so that future event types do +not break older consumers. + +## Wire format + +The daemon writes one JSON object per line (JSONL). Each event looks like: + +```json +{"type":"<event type>"} +``` + +There are no other fields. To learn what the event refers to (which session, +when, etc.), call `shpool list` (or use `ConnectHeader::List`). + +The format is robust: literal newline characters only appear as delimiters +between events. Any newlines within JSON string values are automatically +escaped (as `\n`) by the daemon. 
+ +## Subscribing + +For ad-hoc use, `shpool events` connects to the events socket and prints each +event line to stdout, flushing after each line: + +```bash +shpool events | while read -r ev; do + echo "got: $ev" + shpool list +done + +shpool events | jq . +``` + +## Slow subscribers + +Each subscriber has a bounded outbound queue. A subscriber that falls too far +behind is dropped by the daemon (in which case the subscriber can always reconnect). +There is no replay, so events that fired while a subscriber was disconnected are +lost. diff --git a/libshpool/src/daemon/mod.rs b/libshpool/src/daemon/mod.rs index 7b52ec71..82ce5b7c 100644 --- a/libshpool/src/daemon/mod.rs +++ b/libshpool/src/daemon/mod.rs @@ -17,7 +17,7 @@ use std::{env, os::unix::net::UnixListener, path::PathBuf}; use anyhow::Context; use tracing::{info, instrument}; -use crate::{config, consts, hooks}; +use crate::{config, consts, events, hooks}; mod etc_environment; mod exit_notify; @@ -81,8 +81,17 @@ pub fn run( (Some(socket.clone()), UnixListener::bind(&socket).context("binding to socket")?) } }; - // spawn the signal handler thread in the background - signals::Handler::new(cleanup_socket.clone()).spawn()?; + let events_socket = events::socket_path(&socket); + + // spawn the signal handler thread in the background. Both sockets need + // explicit cleanup on signal exit because the process exits before any + // RAII guard can run. 
+ let mut socks_to_clean: Vec = cleanup_socket.iter().cloned().collect(); + socks_to_clean.push(events_socket.clone()); + signals::Handler::new(socks_to_clean).spawn().context("spawning signal handler")?; + + let _events_guard = + server.start_events_listener(events_socket).context("starting events listener")?; server::Server::serve(server, listener)?; diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs index 5c8d9cfc..3840f539 100644 --- a/libshpool/src/daemon/server.rs +++ b/libshpool/src/daemon/server.rs @@ -51,7 +51,7 @@ use crate::{ etc_environment, exit_notify::ExitNotifier, hooks, pager, pager::PagerError, prompt, shell, show_motd, ttl_reaper, }, - protocol, test_hooks, tty, user, + events, protocol, test_hooks, tty, user, }; const DEFAULT_INITIAL_SHELL_PATH: &str = "/usr/bin:/bin:/usr/sbin:/sbin"; @@ -76,6 +76,7 @@ pub struct Server { runtime_dir: PathBuf, register_new_reapable_session: crossbeam_channel::Sender<(String, Instant)>, hooks: Box, + events_bus: Arc, daily_messenger: Arc, log_level_handle: tracing_subscriber::reload::Handle< tracing_subscriber::filter::LevelFilter, @@ -96,13 +97,17 @@ impl Server { >, ) -> anyhow::Result> { let shells = Arc::new(Mutex::new(HashMap::new())); + let events_bus = events::EventBus::new().context("creating events bus")?; // buffered so that we are unlikely to block when setting up a // new session let (new_sess_tx, new_sess_rx) = crossbeam_channel::bounded(10); - let shells_tab = Arc::clone(&shells); - thread::spawn(move || { - if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab) { - warn!("ttl reaper exited with error: {:?}", e); + thread::spawn({ + let shells = Arc::clone(&shells); + let events_bus = Arc::clone(&events_bus); + move || { + if let Err(e) = ttl_reaper::run(new_sess_rx, shells, events_bus) { + warn!("ttl reaper exited with error: {:?}", e); + } } }); @@ -113,12 +118,22 @@ impl Server { runtime_dir, register_new_reapable_session: new_sess_tx, hooks, + events_bus, 
daily_messenger, log_level_handle, vars: HashMap::new().into(), })) } + /// Bind the events socket and spawn the accept thread. The returned + /// guard unlinks the socket file on drop. + pub fn start_events_listener( + self: &Arc, + socket_path: PathBuf, + ) -> anyhow::Result { + events::start_listener(socket_path, Arc::clone(&self.events_bus)) + } + #[instrument(skip_all)] pub fn serve(server: Arc, listener: UnixListener) -> anyhow::Result<()> { test_hooks::emit("daemon-about-to-listen"); @@ -333,7 +348,12 @@ impl Server { { let _s = span!(Level::INFO, "2_lock(shells)").entered(); let mut shells = self.shells.lock(); - shells.remove(&header.name); + // The publish below is gated on `is_some()` because a + // concurrent kill or reaper may have already removed the + // entry (and published) while we were waiting for the lock. + if shells.remove(&header.name).is_some() { + self.events_bus.publish(&events::Event::SessionRemoved); + } } // The child shell has exited, so the shell->client thread should @@ -354,6 +374,7 @@ impl Server { if let Some(session) = shells.get(&header.name) { session.lifecycle_timestamps.lock().last_disconnected_at = Some(time::SystemTime::now()); + self.events_bus.publish(&events::Event::SessionDetached); } } if let Err(err) = self.hooks.on_client_disconnect(&header.name) { @@ -429,6 +450,9 @@ impl Server { "child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell" ); } else { + // Reattach confirmed; the create path won't run + // and clobber the entry, so it's safe to publish. 
+ self.events_bus.publish(&events::Event::SessionAttached); if let Err(err) = self.hooks.on_reattach(&header.name) { warn!("reattach hook: {:?}", err); } @@ -445,8 +469,6 @@ impl Server { AttachStatus::Attached { warnings }, )); } - - // status is already attached } Some(exit_status) => { // the channel is closed so we know the subshell exited @@ -503,10 +525,18 @@ impl Server { session.lifecycle_timestamps.lock().last_connected_at = Some(time::SystemTime::now()); { - // we unwrap to propagate the poison as an unwind let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered(); let mut shells = self.shells.lock(); + // If we're replacing a stale entry whose shell process is + // gone, surface that to subscribers before announcing the + // replacement. + let clobbered = shells.contains_key(&header.name); shells.insert(header.name.clone(), Box::new(session)); + if clobbered { + self.events_bus.publish(&events::Event::SessionRemoved); + } + self.events_bus.publish(&events::Event::SessionCreated); + self.events_bus.publish(&events::Event::SessionAttached); } // we unwrap to propagate the poison as an unwind @@ -604,6 +634,10 @@ impl Server { if let shell::ClientConnectionStatus::DetachNone = status { not_attached_sessions.push(session); } else { + // The bidi-loop unwind in handle_attach owns the + // SessionDetached publish; we just update + // last_disconnected_at eagerly so a concurrent list() + // reflects the detach immediately. 
s.lifecycle_timestamps.lock().last_disconnected_at = Some(time::SystemTime::now()); } @@ -714,6 +748,7 @@ impl Server { for session in to_remove.iter() { shells.remove(session); + self.events_bus.publish(&events::Event::SessionRemoved); } if !to_remove.is_empty() { test_hooks::emit("daemon-handle-kill-removed-shells"); @@ -729,8 +764,7 @@ impl Server { fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> { let _s = span!(Level::INFO, "lock(shells)").entered(); let shells = self.shells.lock(); - - let sessions: anyhow::Result> = shells + let sessions: Vec = shells .iter() .map(|(k, v)| { let status = match v.inner.try_lock() { @@ -743,7 +777,6 @@ impl Server { .last_connected_at .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64)) .transpose()?; - let last_disconnected_at_unix_ms = timestamps .last_disconnected_at .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64)) @@ -758,11 +791,9 @@ impl Server { status, }) }) - .collect(); - let sessions = sessions.context("collecting running session metadata")?; - + .collect::>() + .context("collecting running session metadata")?; write_reply(&mut stream, ListReply { sessions })?; - Ok(()) } diff --git a/libshpool/src/daemon/signals.rs b/libshpool/src/daemon/signals.rs index 3359883d..5e533f04 100644 --- a/libshpool/src/daemon/signals.rs +++ b/libshpool/src/daemon/signals.rs @@ -23,11 +23,11 @@ use signal_hook::{consts::TERM_SIGNALS, flag, iterator::Signals}; use tracing::{error, info}; pub struct Handler { - sock: Option, + socks: Vec, } impl Handler { - pub fn new(sock: Option) -> Self { - Handler { sock } + pub fn new(socks: Vec) -> Self { + Handler { socks } } pub fn spawn(self) -> anyhow::Result<()> { @@ -57,10 +57,12 @@ impl Handler { for signal in &mut signals { assert!(TERM_SIGNALS.contains(&signal)); - info!("term sig handler: cleaning up socket"); - if let Some(sock) = self.sock { - if let Err(e) = std::fs::remove_file(sock).context("cleaning up socket") { - 
error!("error cleaning up socket file: {}", e); + info!("term sig handler: cleaning up sockets"); + for sock in &self.socks { + match std::fs::remove_file(sock) { + Ok(()) => {} + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => error!("error cleaning up socket {:?}: {}", sock, e), + } } } diff --git a/libshpool/src/daemon/ttl_reaper.rs b/libshpool/src/daemon/ttl_reaper.rs index b85b8c21..b94ba5a6 100644 --- a/libshpool/src/daemon/ttl_reaper.rs +++ b/libshpool/src/daemon/ttl_reaper.rs @@ -31,12 +31,14 @@ use parking_lot::Mutex; use tracing::{info, span, warn, Level}; use super::shell; +use crate::events; /// Run the reaper thread loop. Should be invoked in a dedicated /// thread. pub fn run( new_sess: crossbeam_channel::Receiver<(String, Instant)>, shells: Arc<Mutex<HashMap<String, Box<shell::Session>>>>, + events_bus: Arc<events::EventBus>, ) -> anyhow::Result<()> { let _s = span!(Level::INFO, "ttl_reaper").entered(); @@ -116,6 +118,7 @@ continue; } shells.remove(&reapable.session_name); + events_bus.publish(&events::Event::SessionRemoved); } } } diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs new file mode 100644 index 00000000..253a9ed8 --- /dev/null +++ b/libshpool/src/events.rs @@ -0,0 +1,1097 @@ +//! Push-event protocol for the daemon. +//! +//! Events are published to subscribers connected to a sibling Unix socket +//! next to the main shpool socket. The wire format is JSON, one event per +//! line (newline-delimited; aka JSONL). Non-Rust clients only need a Unix +//! socket and a JSON parser to consume the stream. +//! +//! Events carry no payload beyond their type — they signal that *something* +//! changed in the session table. Subscribers learn the new state by calling +//! `shpool list` (or the equivalent over the main socket). Subscribers that +//! fall too far behind are dropped and may simply reconnect. +//! +//! Architecture: a single `events-sink` thread owns all subscriber state and +//! does all I/O via non-blocking `poll(2)`. `publish()` is O(1) on the +//! 
daemon's hot path: a `try_send` on a bounded channel + a 1-byte write to +//! a self-pipe to wake the sink. + +use std::{ + collections::VecDeque, + io::{self, BufRead, BufReader, Write}, + os::{ + fd::{AsFd, BorrowedFd, OwnedFd}, + unix::net::{UnixListener, UnixStream}, + }, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{self, Receiver, SyncSender, TrySendError}, + Arc, LazyLock, + }, + thread, +}; + +use anyhow::Context; +use nix::{ + errno::Errno, + fcntl::OFlag, + poll::{self, PollFd, PollFlags, PollTimeout}, + unistd, +}; + +use parking_lot::Mutex; +use serde_derive::Serialize; +use tracing::{error, info, warn}; + +/// Per-subscriber outbound queue depth (events). Subscribers that fall this +/// far behind are dropped and must reconnect. +const SUBSCRIBER_QUEUE_DEPTH: usize = 64; + +/// Capacity of the publish→sink channel. Reaching it means the sink is +/// wedged — a real bug, not a tunable. +const EVENT_CHANNEL_CAP: usize = 4096; + +/// An event published on the events socket. +#[derive(Serialize, Debug)] +#[serde(tag = "type")] +#[allow(clippy::enum_variant_names)] +pub enum Event { + #[serde(rename = "session.created")] + SessionCreated, + #[serde(rename = "session.attached")] + SessionAttached, + #[serde(rename = "session.detached")] + SessionDetached, + #[serde(rename = "session.removed")] + SessionRemoved, +} + +/// Fans out events to all connected subscribers via a single sink thread +/// doing non-blocking I/O. +pub struct EventBus { + event_tx: SyncSender<Arc<str>>, + wake_tx: OwnedFd, + sink_handles: Mutex<Option<SinkHandles>>, + sink_dead_logged: AtomicBool, +} + +impl EventBus { + pub fn new() -> io::Result<Arc<Self>> { + let (event_tx, event_rx) = mpsc::sync_channel(EVENT_CHANNEL_CAP); + let (wake_rx, wake_tx) = make_self_pipe()?; + Ok(Arc::new(Self { + event_tx, + wake_tx, + sink_handles: Mutex::new(Some(SinkHandles { event_rx, wake_rx })), + sink_dead_logged: AtomicBool::new(false), + })) + } + + /// Broadcast `event` to all current subscribers. 
Non-blocking: a + /// `try_send` on the publish→sink channel + a 1-byte wake. Takes no + /// internal lock, so it is safe to call under arbitrary outer locks. + /// Publishing under the lock that protects the state being announced + /// keeps wire-order = causal-order across mutators. + pub fn publish(&self, event: &Event) { + match self.event_tx.try_send(serialize_line(event)) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + warn!("events channel full; sink is wedged"); + return; + } + Err(TrySendError::Disconnected(_)) => { + // Relaxed is sufficient: atomic RMW on a single location + // is linearizable regardless of ordering, so racing swaps + // already see distinct previous values — exactly one + // observes `false` and logs. Stronger ordering (e.g. + // AcqRel) would only matter if we needed happens-before + // with surrounding non-atomic memory, which we don't. + if !self.sink_dead_logged.swap(true, Ordering::Relaxed) { + error!("events sink died; subsequent events will be dropped"); + } + return; + } + } + // Wake the sink. If the pipe's kernel buffer is full (sink is + // already buried) our 1-byte signal is redundant — drop it. + let _ = unistd::write(&self.wake_tx, b"\0"); + } + + fn take_sink_handles(&self) -> Option { + self.sink_handles.lock().take() + } +} + +/// Connect to the events socket, copy each line to stdout, and flush per +/// line so the stream is usable in pipes (`shpool events | jq`). Returns +/// when the daemon closes the connection. 
+pub fn subscribe_to_stdout(socket_path: &Path) -> anyhow::Result<()> { + let stream = UnixStream::connect(socket_path) + .with_context(|| format!("connecting to events socket {:?}", socket_path))?; + let reader = BufReader::new(stream); + let stdout = io::stdout(); + let mut out = stdout.lock(); + for line in reader.lines() { + let line = line.context("reading event")?; + writeln!(out, "{line}").context("writing event")?; + out.flush().context("flushing stdout")?; + } + Ok(()) +} + +/// Sibling events socket path next to the main shpool socket. +pub fn socket_path(main_socket: &Path) -> PathBuf { + let mut path = main_socket.to_path_buf(); + path.set_file_name("events.socket"); + path +} + +/// Owns the events socket file. Dropping the guard unlinks the socket +/// path so a fresh daemon doesn't trip on stale files. The sink thread is +/// not stopped — daemon shutdown takes the process down. +pub struct ListenerGuard { + path: PathBuf, +} + +impl Drop for ListenerGuard { + fn drop(&mut self) { + match std::fs::remove_file(&self.path) { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::NotFound => {} + Err(e) => warn!("removing events socket {:?}: {:?}", self.path, e), + } + } +} + +/// Bind the events socket and spawn the sink thread. The sink owns the +/// listener fd and all subscriber state; it accepts connections and drives +/// non-blocking writes via `poll(2)`. The returned guard unlinks the +/// socket file on drop. 
+pub fn start_listener(socket_path: PathBuf, bus: Arc) -> anyhow::Result { + if socket_path.exists() { + std::fs::remove_file(&socket_path) + .with_context(|| format!("removing stale events socket {:?}", socket_path))?; + } + let listener = UnixListener::bind(&socket_path) + .with_context(|| format!("binding events socket {:?}", socket_path))?; + listener.set_nonblocking(true).context("setting events listener non-blocking")?; + info!("events socket listening at {:?}", socket_path); + + let handles = bus.take_sink_handles().context("events sink already started")?; + thread::Builder::new() + .name("events-sink".into()) + .spawn(move || run_sink(listener, handles)) + .context("spawning events sink thread")?; + Ok(ListenerGuard { path: socket_path }) +} + +struct SinkHandles { + event_rx: Receiver>, + wake_rx: OwnedFd, +} + +fn run_sink(listener: UnixListener, handles: SinkHandles) { + let SinkHandles { event_rx, wake_rx } = handles; + let mut subs: Vec = Vec::new(); + // Page-sized drain buffer; the bytes are signal-only and discarded. + let mut wake_buf = [0u8; 4096]; + + // Fixed positions in the poll set: 0 = wake fd, 1 = listener fd + // (revents ignored — see below), 2.. = subscribers wanting POLLOUT. + const WAKE_FD_IDX: usize = 0; + const SUB_FDS_START: usize = 2; + + loop { + // Build the poll set fresh each iteration: wake fd (POLLIN), + // listener fd (POLLIN), each subscriber that wants POLLOUT. 
+ let mut fds: Vec<PollFd> = Vec::with_capacity(SUB_FDS_START + subs.len()); + fds.push(PollFd::new(wake_rx.as_fd(), PollFlags::POLLIN)); + fds.push(PollFd::new(listener.as_fd(), PollFlags::POLLIN)); + let mut sub_pollfd_idx: Vec<usize> = Vec::with_capacity(subs.len()); + for (i, sub) in subs.iter().enumerate() { + if sub.wants_pollout() { + fds.push(PollFd::new(sub.as_fd(), PollFlags::POLLOUT)); + sub_pollfd_idx.push(i); + } + } + + match poll::poll(&mut fds, PollTimeout::NONE) { + Ok(_) => {} + Err(Errno::EINTR) => continue, + Err(e) => panic!("events sink poll: {:?}", e), + } + + // Extract revents into owned values before any mutation: each + // `PollFd<'fd>` borrows from the fd source (including `subs`), so + // we drop `fds` before accept / broadcast / drive can run. We + // ignore the listener fd's revents deliberately; see the + // always-accept comment below. + let wake_revents = fds[WAKE_FD_IDX].revents().unwrap_or(PollFlags::empty()); + let sub_revents: Vec<PollFlags> = (0..sub_pollfd_idx.len()) + .map(|k| fds[SUB_FDS_START + k].revents().unwrap_or(PollFlags::empty())) + .collect(); + drop(fds); + + // POLLHUP on wake_rx fires when the bus is dropped (write end + // closes); falling through without entering the drain branch + // would leave POLLHUP latched and `poll()` returning immediately + // forever. Treat any wake-fd revent as "go read it" — read will + // return Ok(0) on EOF and we exit cleanly via that path. + let wake_ready = !wake_revents.is_empty(); + + // Always drain pending accepts before processing wake/broadcast. + // The listener is in the poll set so its POLLIN wakes us, but we + // don't trust the revent for *whether* to accept: under load, + // listener POLLIN can lag behind `connect(2)` returning, and if + // we were woken by the wake-fd alone we still want to catch any + // queued connections so the same iteration's broadcast reaches + // them. The accept syscall is cheap (returns WouldBlock + // immediately when the queue is empty). 
+ loop { + match listener.accept() { + Ok((stream, _addr)) => match SubscriberWriter::new(stream) { + Ok(sub) => subs.push(sub), + Err(e) => warn!("registering events subscriber: {:?}", e), + }, + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => { + error!("events listener accept: {:?}", e); + break; + } + } + } + + if wake_ready { + // Drain the wake pipe. Ok(0) means the EventBus was dropped; + // no more events will ever arrive — exit cleanly. + loop { + match unistd::read(&wake_rx, &mut wake_buf) { + Ok(0) => return, + Ok(_) => continue, + Err(Errno::EAGAIN) => break, + Err(Errno::EINTR) => continue, + Err(e) => panic!("events sink reading wake fd: {:?}", e), + } + } + // Drain event channel and broadcast. After each enqueue, if + // the sub's pending was empty before, drive opportunistically: + // a fast consumer's kernel buffer is likely ready, so the + // event flushes immediately and the next enqueue starts from + // empty. Without this, a burst larger than SUBSCRIBER_QUEUE_DEPTH + // would overflow even healthy subs because broadcast enqueues + // every event before any drive runs. + while let Ok(line) = event_rx.try_recv() { + for sub in subs.iter_mut() { + if sub.dropped { + continue; + } + let was_empty = !sub.wants_pollout(); + if let Err(Overflow::CapExceeded) = sub.enqueue(Arc::clone(&line)) { + warn!("dropping events subscriber: queue full"); + sub.dropped = true; + continue; + } + if was_empty { + if let Err(e) = sub.drive() { + info!("events subscriber gone: {:?}", e); + sub.dropped = true; + } + } + } + } + } + + // Drive writes for subs whose POLLOUT (or error) fired. + for (k, &i) in sub_pollfd_idx.iter().enumerate() { + // Short-circuit: a sub marked dropped during the broadcast + // above (overflow or opportunistic-drive failure) is removed + // by `subs.retain` at the end of this iteration anyway, but + // driving it again here would cost a needless `write(2)` on + // a likely-broken stream. 
+ if subs[i].dropped { + continue; + } + let revents = sub_revents[k]; + if revents.intersects(PollFlags::POLLERR | PollFlags::POLLHUP | PollFlags::POLLNVAL) { + info!("events subscriber gone: peer error/hangup"); + subs[i].dropped = true; + } else if revents.contains(PollFlags::POLLOUT) { + if let Err(e) = subs[i].drive() { + info!("events subscriber gone: {:?}", e); + subs[i].dropped = true; + } + } + } + + subs.retain(|sub| !sub.dropped); + } +} + +/// Per-subscriber state owned by the sink. Exposes an event-shaped surface +/// (`enqueue`, `drive`, `wants_pollout`); the partial-write state machine +/// (offset into `pending.front()`) is hidden from the sink loop. +struct SubscriberWriter { + stream: UnixStream, + pending: VecDeque<Arc<str>>, + front_offset: usize, + /// Set when the sink decides this sub should be removed; the actual + /// `Vec` removal happens at end-of-iteration via `subs.retain`. We + /// can't remove mid-iteration because `sub_pollfd_idx` holds indices + /// into `subs` and would shift. + dropped: bool, +} + +#[derive(Debug)] +enum Overflow { + CapExceeded, +} + +#[derive(Debug, Eq, PartialEq)] +enum DriveOutcome { + AllFlushed, + WouldBlock, +} + +impl SubscriberWriter { + fn new(stream: UnixStream) -> io::Result<Self> { + stream.set_nonblocking(true)?; + Ok(Self { stream, pending: VecDeque::new(), front_offset: 0, dropped: false }) + } + + fn enqueue(&mut self, line: Arc<str>) -> Result<(), Overflow> { + if self.pending.len() >= SUBSCRIBER_QUEUE_DEPTH { + return Err(Overflow::CapExceeded); + } + self.pending.push_back(line); + Ok(()) + } + + fn drive(&mut self) -> io::Result<DriveOutcome> { + drive_pending(&mut self.stream, &mut self.pending, &mut self.front_offset) + } + + fn wants_pollout(&self) -> bool { + !self.pending.is_empty() + } +} + +impl AsFd for SubscriberWriter { + fn as_fd(&self) -> BorrowedFd<'_> { + self.stream.as_fd() + } +} + +/// Drains `pending` into `stream` via non-blocking writes, advancing +/// `front_offset` for short writes. 
Generic over `Write` so the partial- +/// write state machine is unit-testable in isolation against a fake stream. +fn drive_pending<W: Write>( + stream: &mut W, + pending: &mut VecDeque<Arc<str>>, + front_offset: &mut usize, +) -> io::Result<DriveOutcome> { + while let Some(front) = pending.front() { + let bytes = &front.as_bytes()[*front_offset..]; + match stream.write(bytes) { + Ok(0) => { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "events subscriber stream returned 0 bytes", + )); + } + Ok(n) => { + *front_offset += n; + if *front_offset >= front.len() { + pending.pop_front(); + *front_offset = 0; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + return Ok(DriveOutcome::WouldBlock); + } + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } + } + Ok(DriveOutcome::AllFlushed) +} + +/// Wire-form `Arc<str>` per `Event` variant, lazily built once via serde +/// (so the wire format stays driven by the `#[serde(rename = ...)]` +/// annotations) and cloned cheaply on every publish thereafter. Each +/// `Arc::clone` on the hot path is one atomic increment — no allocation, +/// no serialization. 
+fn serialize_line(event: &Event) -> Arc<str> { + fn build(e: Event) -> Arc<str> { + let s = serde_json::to_string(&e).expect("Event variants are infallible to serialize"); + Arc::from(format!("{s}\n")) + } + static CREATED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionCreated)); + static ATTACHED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionAttached)); + static DETACHED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionDetached)); + static REMOVED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionRemoved)); + match event { + Event::SessionCreated => Arc::clone(&CREATED), + Event::SessionAttached => Arc::clone(&ATTACHED), + Event::SessionDetached => Arc::clone(&DETACHED), + Event::SessionRemoved => Arc::clone(&REMOVED), + } +} + +fn make_self_pipe() -> io::Result<(OwnedFd, OwnedFd)> { + // pipe2(2) atomically creates an anonymous pipe with both flags set + // on each end: + // - O_NONBLOCK: the publisher's wake-byte write must never stall, and the sink + // detects "drained" via EAGAIN on read instead of blocking. + // - O_CLOEXEC: forked children (shells) shouldn't inherit these fds and hold + // the wake pipe open past the daemon exiting. + // map_err transcodes nix's Errno into std::io::Error to match the + // module's io::Result idiom. + unistd::pipe2(OFlag::O_NONBLOCK | OFlag::O_CLOEXEC).map_err(io::Error::from) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{ + io::Read, + time::{Duration, Instant}, + }; + + fn json(event: &Event) -> String { + serde_json::to_string(event).unwrap() + } + + /// Per-test scaffolding: tempdir + socket path + bus + listener guard. + /// Holds RAII handles so the socket file is unlinked and the dir is + /// cleaned up at end-of-scope. 
+ struct Harness { + _dir: tempfile::TempDir, + _guard: ListenerGuard, + path: PathBuf, + bus: Arc, + } + + fn harness() -> Harness { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("events.socket"); + let bus = EventBus::new().unwrap(); + let _guard = start_listener(path.clone(), Arc::clone(&bus)).unwrap(); + Harness { _dir: dir, _guard, path, bus } + } + + /// Read timeout used for all blocking reads in tests. Generous so + /// loaded CI machines don't trip the assertion before the sink has a + /// chance to broadcast. + const READ_TIMEOUT: Duration = Duration::from_secs(10); + + /// Wire form of `Event::SessionCreated` — the most-published event in + /// the test suite. + const CREATED_LINE: &str = "{\"type\":\"session.created\"}\n"; + + fn read_line(stream: &mut UnixStream) -> String { + stream.set_read_timeout(Some(READ_TIMEOUT)).unwrap(); + let mut reader = BufReader::new(stream); + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + line + } + + fn read_n_lines(stream: &mut UnixStream, n: usize) -> Vec { + stream.set_read_timeout(Some(READ_TIMEOUT)).unwrap(); + let mut reader = BufReader::new(stream); + (0..n) + .map(|_| { + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + line + }) + .collect() + } + + /// Connect a fresh subscriber, then publish a probe event and consume + /// it. Returning means the sink has accepted the connection and + /// broadcast at least one event to it. Probe is `SessionCreated`; + /// callers continue with their own publishes from a clean stream. + fn connect_registered(path: &Path, bus: &EventBus) -> UnixStream { + let mut stream = UnixStream::connect(path).unwrap(); + // Sleep briefly so the OS schedules the sink thread to accept + // this connection before the publisher's wake byte arrives. + // `thread::yield_now()` is only a scheduler hint; under heavy + // parallel-test load it isn't reliable. A 1ms sleep guarantees + // a context switch. 
+ thread::sleep(Duration::from_millis(1)); + bus.publish(&Event::SessionCreated); + let _ = read_line(&mut stream); + stream + } + + /// Like `connect_registered` but for `n` subscribers in one round-trip: + /// connect them all (queued in the listener backlog), publish one + /// probe, read it from each. The sink's accept-before-wake order + /// guarantees all queued subs are registered before the probe is + /// broadcast. + fn connect_n_registered(path: &Path, bus: &EventBus, n: usize) -> Vec { + let mut streams: Vec = + (0..n).map(|_| UnixStream::connect(path).unwrap()).collect(); + thread::sleep(Duration::from_millis(1)); + bus.publish(&Event::SessionCreated); + for s in streams.iter_mut() { + let _ = read_line(s); + } + streams + } + + #[test] + fn session_created_serializes_with_only_type() { + assert_eq!(json(&Event::SessionCreated), r#"{"type":"session.created"}"#); + } + + #[test] + fn session_attached_serializes_with_only_type() { + assert_eq!(json(&Event::SessionAttached), r#"{"type":"session.attached"}"#); + } + + #[test] + fn session_detached_serializes_with_only_type() { + assert_eq!(json(&Event::SessionDetached), r#"{"type":"session.detached"}"#); + } + + #[test] + fn session_removed_serializes_with_only_type() { + assert_eq!(json(&Event::SessionRemoved), r#"{"type":"session.removed"}"#); + } + + #[test] + fn bus_publish_with_no_subscribers_is_a_noop() { + let bus = EventBus::new().unwrap(); + bus.publish(&Event::SessionCreated); + } + + #[test] + fn bus_publish_reaches_subscriber() { + let h = harness(); + let mut stream = connect_registered(&h.path, &h.bus); + h.bus.publish(&Event::SessionCreated); + assert_eq!(read_line(&mut stream), CREATED_LINE); + } + + #[test] + fn bus_drops_subscriber_whose_peer_closed() { + let h = harness(); + let victim = connect_registered(&h.path, &h.bus); + let mut probe = connect_registered(&h.path, &h.bus); + + drop(victim); + h.bus.publish(&Event::SessionAttached); + assert_eq!(read_line(&mut probe), 
"{\"type\":\"session.attached\"}\n"); + } + + #[test] + fn bus_publish_with_many_subscribers_is_not_quadratic() { + // Publish is `try_send` + 1-byte wake — independent of N. A + // regression that re-introduced per-sub work in publish would + // make these timings explode. + let h = harness(); + let n = 200; + let _streams = connect_n_registered(&h.path, &h.bus, n); + + let start = Instant::now(); + for _ in 0..1000 { + h.bus.publish(&Event::SessionCreated); + } + let elapsed = start.elapsed(); + assert!( + elapsed < Duration::from_millis(100), + "1000 publishes with {n} subs took {elapsed:?}" + ); + } + + #[test] + fn bus_concurrent_publish_under_outer_lock_delivers_all_events() { + // Publish takes no internal lock, so an outer lock can't deadlock + // with anything inside the bus. + let h = harness(); + let mut stream = connect_registered(&h.path, &h.bus); + + let outer: Arc> = Arc::new(Mutex::new(())); + let n_threads = 4; + let n_per_thread = 8; + let total = n_threads * n_per_thread; + + let handles: Vec<_> = (0..n_threads) + .map(|_| { + let bus = Arc::clone(&h.bus); + let outer = Arc::clone(&outer); + thread::spawn(move || { + for _ in 0..n_per_thread { + let _g = outer.lock(); + bus.publish(&Event::SessionCreated); + } + }) + }) + .collect(); + for handle in handles { + handle.join().unwrap(); + } + + for line in read_n_lines(&mut stream, total) { + assert_eq!(line, CREATED_LINE); + } + } + + #[test] + fn listener_guard_unlinks_socket_on_drop() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("events.socket"); + let guard = start_listener(path.clone(), EventBus::new().unwrap()).unwrap(); + assert!(path.exists(), "socket file should exist while guard is alive"); + drop(guard); + assert!(!path.exists(), "socket file should be unlinked on guard drop"); + } + + #[test] + fn accept_loop_registers_concurrent_subscribers() { + let h = harness(); + let n = 20; + let mut streams: Vec = (0..n) + .map(|_| { + let path = h.path.clone(); + 
thread::spawn(move || UnixStream::connect(&path).unwrap()) + }) + .collect::>() + .into_iter() + .map(|jh| jh.join().unwrap()) + .collect(); + + // Probe to verify each concurrently-dialed sub is registered: the + // sink's accept-before-wake order ensures all queued connects + // join `subs` before this publish broadcasts. + h.bus.publish(&Event::SessionCreated); + for stream in streams.iter_mut() { + assert_eq!(read_line(stream), CREATED_LINE); + } + } + + #[test] + fn burst_load_within_capacity_reaches_every_subscriber() { + let h = harness(); + let m = 4; + let mut streams = connect_n_registered(&h.path, &h.bus, m); + + // Stay under SUBSCRIBER_QUEUE_DEPTH with margin so no sub is dropped. + let n_events = 32; + for _ in 0..n_events { + h.bus.publish(&Event::SessionCreated); + } + let expected = CREATED_LINE; + for stream in streams.iter_mut() { + for line in read_n_lines(stream, n_events) { + assert_eq!(line, expected); + } + } + } + + #[test] + fn events_arrive_in_publish_order() { + let h = harness(); + let mut stream = connect_registered(&h.path, &h.bus); + + h.bus.publish(&Event::SessionCreated); + h.bus.publish(&Event::SessionAttached); + h.bus.publish(&Event::SessionDetached); + h.bus.publish(&Event::SessionRemoved); + + let lines = read_n_lines(&mut stream, 4); + assert_eq!( + lines, + [ + CREATED_LINE, + "{\"type\":\"session.attached\"}\n", + "{\"type\":\"session.detached\"}\n", + "{\"type\":\"session.removed\"}\n", + ] + ); + } + + #[test] + fn slow_subscriber_drop_does_not_affect_fast_through_sink() { + // The harness uses `start_listener`; this test bypasses it so the + // listener is bound directly with the desired socket options. 
+ let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("events.socket"); + let bus = EventBus::new().unwrap(); + + let listener = UnixListener::bind(&path).unwrap(); + listener.set_nonblocking(true).unwrap(); + let handles = bus.take_sink_handles().unwrap(); + thread::Builder::new() + .name("events-sink".into()) + .spawn(move || run_sink(listener, handles)) + .unwrap(); + + // Small SO_RCVBUF on slow caps how much un-read data the kernel + // will buffer en-route to it; the sink's writes start returning + // EAGAIN, slow's pending grows to cap, slow is dropped. + let _slow = UnixStream::connect(&path).unwrap(); + nix::sys::socket::setsockopt(&_slow, nix::sys::socket::sockopt::RcvBuf, &1024).unwrap(); + let mut fast = UnixStream::connect(&path).unwrap(); + + // Probe both subs (fast reads it; slow's tiny buffer fits one + // event of ~30 bytes). + bus.publish(&Event::SessionCreated); + let _ = read_line(&mut fast); + + // Interleave publish + read so fast's kernel buffer never fills + // (the default AF_UNIX RcvBuf is small enough that buffering K + // events without reading would overflow fast's pending and cause + // the sink to drop fast). Slow gets enqueued every iteration too; + // its small RcvBuf forces drive into EAGAIN within a few dozen + // events, then pending overflows and the sink drops slow. The + // test passes only if fast continues receiving while slow is + // overflowing or after slow is dropped. + let expected = CREATED_LINE; + fast.set_read_timeout(Some(Duration::from_secs(10))).unwrap(); + let mut reader = BufReader::new(&mut fast); + for _ in 0..1000 { + bus.publish(&Event::SessionCreated); + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + assert_eq!(line, expected); + } + } + + /// Fake writer: per-call accept counts. Each `write` consumes the next + /// entry: `Some(n)` accepts `min(n, buf.len())` bytes; `None` (or 0) + /// returns WouldBlock; `Err` returns the given error. 
+    struct FakeWriter {
+        plan: VecDeque<FakeWrite>,
+        written: Vec<u8>,
+    }
+
+    enum FakeWrite {
+        Accept(usize),
+        WouldBlock,
+        Interrupted,
+        Err(io::ErrorKind),
+    }
+
+    impl Write for FakeWriter {
+        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap(); + assert_eq!(outcome, DriveOutcome::AllFlushed); + assert_eq!(w.written, b"hello\n"); + assert_eq!(offset, 0); + assert!(pending.is_empty()); + } + + #[test] + fn drive_pending_retries_on_eintr() { + let mut w = FakeWriter { + plan: VecDeque::from([FakeWrite::Interrupted, FakeWrite::Accept(100)]), + written: vec![], + }; + let mut pending: VecDeque> = VecDeque::from([Arc::from("ok\n")]); + let mut offset = 0; + let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap(); + assert_eq!(outcome, DriveOutcome::AllFlushed); + assert_eq!(w.written, b"ok\n"); + } + + #[test] + fn drive_pending_propagates_other_errors() { + let mut w = FakeWriter { + plan: VecDeque::from([FakeWrite::Err(io::ErrorKind::BrokenPipe)]), + written: vec![], + }; + let mut pending: VecDeque> = VecDeque::from([Arc::from("x\n")]); + let mut offset = 0; + let err = drive_pending(&mut w, &mut pending, &mut offset).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); + } + + #[test] + fn drive_pending_treats_zero_byte_write_as_error() { + let mut w = FakeWriter { plan: VecDeque::from([FakeWrite::Accept(0)]), written: vec![] }; + let mut pending: VecDeque> = VecDeque::from([Arc::from("x\n")]); + let mut offset = 0; + // Note: FakeWrite::Accept(0) returns Ok(0) (not WouldBlock) because + // we haven't written WouldBlock to the plan; this is a write-zero + // signal which `drive_pending` must treat as an error. + let err = drive_pending(&mut w, &mut pending, &mut offset).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::WriteZero); + } + + /// Property-style test: an arbitrary-shaped sequence of partial / + /// EAGAIN / EINTR / full-accept responses, paired with new enqueues + /// interleaved between drives, must eventually flush every enqueued + /// byte once the plan ends with enough accepts to drain. 
+ #[test] + fn drive_pending_fuzz_eventually_flushes_everything() { + // Hand-rolled deterministic interleavings covering the + // "queue-state-changed / POLLOUT-requested" atomicity: each row is + // a sequence of (op, write-plan-entries) where op is "drive once" + // or "enqueue X". After running the whole script, drive to + // completion with a generous accept-all and assert every enqueued + // byte was written, in order. + enum Op { + Enqueue(&'static str), + Drive(Vec), + } + use FakeWrite::*; + + let scripts: Vec> = vec![ + vec![Op::Enqueue("a\n"), Op::Drive(vec![Accept(1), WouldBlock])], + vec![ + Op::Enqueue("a\n"), + Op::Drive(vec![Accept(1)]), + Op::Enqueue("b\n"), + Op::Drive(vec![Accept(1), Interrupted, Accept(2), Accept(2)]), + ], + vec![ + Op::Enqueue("hello\n"), + Op::Drive(vec![WouldBlock]), + Op::Enqueue("world\n"), + Op::Drive(vec![Accept(2), Interrupted, WouldBlock]), + Op::Enqueue("again\n"), + ], + vec![ + Op::Enqueue("x\n"), + Op::Enqueue("y\n"), + Op::Enqueue("z\n"), + Op::Drive(vec![Accept(1), Accept(1), WouldBlock]), + Op::Drive(vec![Accept(4), Accept(2)]), + ], + ]; + + for (i, script) in scripts.into_iter().enumerate() { + let mut pending: VecDeque> = VecDeque::new(); + let mut offset = 0; + let mut all_written = Vec::new(); + let mut expected = Vec::new(); + + for op in script { + match op { + Op::Enqueue(s) => { + pending.push_back(Arc::from(s)); + expected.extend_from_slice(s.as_bytes()); + } + Op::Drive(plan) => { + let mut w = FakeWriter { plan: plan.into(), written: vec![] }; + let _ = drive_pending(&mut w, &mut pending, &mut offset); + all_written.extend_from_slice(&w.written); + } + } + } + + // Final drive with unlimited accept across many calls; + // drive_pending invokes `write` once per pending entry. 
+ let mut w = FakeWriter { + plan: std::iter::repeat_with(|| FakeWrite::Accept(usize::MAX)).take(64).collect(), + written: vec![], + }; + let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap(); + assert_eq!(outcome, DriveOutcome::AllFlushed, "script {i}"); + all_written.extend_from_slice(&w.written); + + assert_eq!(all_written, expected, "script {i}: bytes lost or reordered"); + assert!(pending.is_empty(), "script {i}: pending not drained"); + assert_eq!(offset, 0, "script {i}: offset not reset"); + } + } + + #[test] + fn enqueue_overflows_at_cap() { + let (a, _b) = UnixStream::pair().unwrap(); + let mut sub = SubscriberWriter::new(a).unwrap(); + for i in 0..SUBSCRIBER_QUEUE_DEPTH { + sub.enqueue(format!("event-{i}\n").into()).unwrap(); + } + let err = sub.enqueue("one-too-many\n".into()); + assert!(matches!(err, Err(Overflow::CapExceeded))); + } + + #[test] + fn subscriber_writer_overflows_when_peer_blocks() { + // Use a socket pair so we can shrink the *server-side* send buffer + // (a listener-accepted socket isn't reachable from outside the + // sink). With the peer never reading, drive() eventually returns + // WouldBlock; pending grows past the cap and enqueue fails. 
+ let (server, _client) = UnixStream::pair().unwrap(); + nix::sys::socket::setsockopt(&server, nix::sys::socket::sockopt::SndBuf, &1024).unwrap(); + let mut sub = SubscriberWriter::new(server).unwrap(); + + let line: Arc = CREATED_LINE.into(); + let mut overflowed = false; + for _ in 0..(SUBSCRIBER_QUEUE_DEPTH * 1000) { + let was_empty = !sub.wants_pollout(); + match sub.enqueue(Arc::clone(&line)) { + Ok(()) => { + if was_empty { + let _ = sub.drive(); + } + } + Err(Overflow::CapExceeded) => { + overflowed = true; + break; + } + } + } + assert!(overflowed, "expected SubscriberWriter to overflow"); + } + + #[test] + fn subscriber_writer_resumes_after_peer_drains() { + // After a SubscriberWriter has piled up pending against a full + // kernel buffer, the next `drive()` must make progress once the + // peer drains. Pairs the fake-stream resume property + // (`drive_pending_resumes_after_partial_then_wouldblock`) with a + // real socket. + let (server, client) = UnixStream::pair().unwrap(); + nix::sys::socket::setsockopt(&server, nix::sys::socket::sockopt::SndBuf, &1024).unwrap(); + client.set_read_timeout(Some(Duration::from_secs(2))).unwrap(); + let mut sub = SubscriberWriter::new(server).unwrap(); + + let line: Arc = CREATED_LINE.into(); + for _ in 0..SUBSCRIBER_QUEUE_DEPTH { + sub.enqueue(Arc::clone(&line)).unwrap(); + } + + // First drive: kernel buffer fills, returns WouldBlock with + // pending non-empty. + assert_eq!(sub.drive().unwrap(), DriveOutcome::WouldBlock); + let pending_after_first = sub.pending.len(); + assert!(pending_after_first > 0); + + // Drain whatever the peer can read in one shot. + let mut buf = vec![0u8; 4096]; + let drained = (&client).read(&mut buf).unwrap(); + assert!(drained > 0); + + // Second drive: must shrink pending (proves resume). 
+ let _ = sub.drive().unwrap(); + assert!( + sub.pending.len() < pending_after_first, + "drive must advance pending after peer drains: {} -> {}", + pending_after_first, + sub.pending.len() + ); + } + + #[test] + fn fast_writer_unaffected_when_slow_overflows() { + // Slow: small SndBuf, peer never reads → drive blocks → pending + // grows to cap → enqueue fails. + // Fast: default SndBuf, peer drains → drive flushes → pending + // stays empty → enqueue never fails. + let (slow_server, _slow_client) = UnixStream::pair().unwrap(); + nix::sys::socket::setsockopt(&slow_server, nix::sys::socket::sockopt::SndBuf, &1024) + .unwrap(); + let mut slow = SubscriberWriter::new(slow_server).unwrap(); + + let (fast_server, fast_client) = UnixStream::pair().unwrap(); + let mut fast = SubscriberWriter::new(fast_server).unwrap(); + + let drainer = thread::spawn(move || { + let mut buf = [0u8; 4096]; + let mut total = 0usize; + loop { + match (&fast_client).read(&mut buf) { + Ok(0) => break, + Ok(n) => total += n, + Err(_) => break, + } + } + total + }); + + let line: Arc = CREATED_LINE.into(); + let mut slow_dropped = false; + + // Mimic the sink's enqueue+opportunistic-drive pattern across both + // subs for each event. + for _ in 0..(SUBSCRIBER_QUEUE_DEPTH * 100) { + if !slow_dropped { + let was_empty = !slow.wants_pollout(); + match slow.enqueue(Arc::clone(&line)) { + Ok(()) => { + if was_empty { + let _ = slow.drive(); + } + } + Err(Overflow::CapExceeded) => { + slow_dropped = true; + } + } + } + let was_empty = !fast.wants_pollout(); + fast.enqueue(Arc::clone(&line)).expect("fast must not overflow"); + if was_empty { + let _ = fast.drive(); + } + if slow_dropped { + break; + } + } + + assert!(slow_dropped, "slow should have overflowed"); + + // Closing fast's server end lets the drainer thread exit on EOF. 
+ drop(fast); + let bytes_received = drainer.join().unwrap(); + assert!(bytes_received > 0, "fast should have received events"); + } +} diff --git a/libshpool/src/lib.rs b/libshpool/src/lib.rs index aad548d6..e465df19 100644 --- a/libshpool/src/lib.rs +++ b/libshpool/src/lib.rs @@ -36,6 +36,7 @@ mod daemon; mod daemonize; mod detach; mod duration; +mod events; mod exe; mod hooks; mod kill; @@ -231,6 +232,15 @@ you could just do `shpool var set workspace key-bugfix`. #[clap(subcommand)] command: VarCommands, }, + + #[clap(about = "Subscribe to the daemon's push-event stream + +Connects to the events socket and writes each event (one JSON object +per line) to stdout, flushing after every line so the stream is +pipeline-friendly (e.g. `shpool events | jq`). The first line is a +snapshot of the current session table; subsequent lines are deltas. +Reconnect to force a fresh snapshot.")] + Events, } /// The subcommds of the var command. @@ -433,6 +443,7 @@ pub fn run(args: Args, hooks: Option>) -> an Commands::List { json } => list::run(socket, json), Commands::SetLogLevel { level } => set_log_level::run(level, socket), Commands::Var { command } => var::run(socket, command), + Commands::Events => events::subscribe_to_stdout(&events::socket_path(&socket)), }; if let Err(err) = res { diff --git a/shpool/tests/events.rs b/shpool/tests/events.rs new file mode 100644 index 00000000..7c61f2cc --- /dev/null +++ b/shpool/tests/events.rs @@ -0,0 +1,194 @@ +use std::{ + io::{BufRead, BufReader}, + os::unix::net::UnixStream, + path::PathBuf, + time::Duration, +}; + +use anyhow::{anyhow, Context}; +use ntest::timeout; +use serde_json::Value; + +mod support; + +use crate::support::daemon::{AttachArgs, DaemonArgs, Proc}; + +fn events_socket_path(daemon: &Proc) -> PathBuf { + daemon.socket_path.with_file_name("events.socket") +} + +fn connect_events(daemon: &Proc) -> anyhow::Result> { + let path = events_socket_path(daemon); + let mut sleep_dur = Duration::from_millis(5); + for _ in 
0..12 { + if let Ok(stream) = UnixStream::connect(&path) { + return Ok(BufReader::new(stream)); + } + std::thread::sleep(sleep_dur); + sleep_dur *= 2; + } + Err(anyhow!("events socket never became available at {:?}", path)) +} + +fn next_event(reader: &mut BufReader) -> anyhow::Result { + let mut line = String::new(); + let n = reader.read_line(&mut line).context("reading event line")?; + if n == 0 { + return Err(anyhow!("events socket closed unexpectedly")); + } + serde_json::from_str(&line).with_context(|| format!("parsing event JSON: {line:?}")) +} + +#[test] +#[timeout(30000)] +fn lifecycle() -> anyhow::Result<()> { + let mut daemon = + Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) + .context("starting daemon proc")?; + let mut sub = connect_events(&daemon)?; + + // Background attach: client connects, daemon publishes created+attached; + // the client immediately detaches, triggering the detached event. + let _attach = daemon + .attach("s1", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }) + .context("starting attach proc")?; + + assert_eq!(next_event(&mut sub)?["type"], "session.created"); + assert_eq!(next_event(&mut sub)?["type"], "session.attached"); + assert_eq!(next_event(&mut sub)?["type"], "session.detached"); + + let kill_out = daemon.kill(vec!["s1".into()]).context("running kill")?; + assert!(kill_out.status.success(), "kill failed: {:?}", kill_out); + + assert_eq!(next_event(&mut sub)?["type"], "session.removed"); + + Ok(()) +} + +// `shpool detach` triggers two code paths that both updated the session: +// the explicit handler and, asynchronously, the bidi-loop unwind in the +// attach worker. An earlier version emitted SessionDetached from both, +// producing a duplicate event. This test pins exactly one detached event +// per detach by using a kill as a known-next-event fence — if a duplicate +// detached were buffered, the next read would return it instead of +// `session.removed`. 
+#[test] +#[timeout(30000)] +fn explicit_detach_publishes_one_event() -> anyhow::Result<()> { + let mut daemon = + Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) + .context("starting daemon proc")?; + let mut sub = connect_events(&daemon)?; + + // Foreground attach (no `background`) keeps the session attached. + let _attach = daemon + .attach("s", AttachArgs { null_stdin: true, ..AttachArgs::default() }) + .context("starting attach proc")?; + assert_eq!(next_event(&mut sub)?["type"], "session.created"); + assert_eq!(next_event(&mut sub)?["type"], "session.attached"); + + let detach_out = daemon.detach(vec!["s".into()]).context("running detach")?; + assert!(detach_out.status.success(), "detach failed: {:?}", detach_out); + + assert_eq!(next_event(&mut sub)?["type"], "session.detached"); + + // Wait for the unwind path to complete (session shows disconnected in + // the list output) so any duplicate detached event would already be + // queued by the time we issue the kill below. + daemon.wait_until_list_matches(|out| out.contains("disconnected"))?; + + let kill_out = daemon.kill(vec!["s".into()]).context("running kill")?; + assert!(kill_out.status.success(), "kill failed: {:?}", kill_out); + + let next = next_event(&mut sub)?; + assert_eq!( + next["type"], "session.removed", + "expected next event to be removed, got {next} — possible duplicate detached" + ); + + Ok(()) +} + +// Reattach should produce a single `session.attached` for the existing +// session, with no `session.created`. Catches regressions where the +// reattach path accidentally falls through to the create path. 
+#[test] +#[timeout(30000)] +fn reattach_emits_attached_only() -> anyhow::Result<()> { + let mut daemon = + Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) + .context("starting daemon proc")?; + let mut sub = connect_events(&daemon)?; + + let _attach1 = daemon + .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }) + .context("first attach")?; + assert_eq!(next_event(&mut sub)?["type"], "session.created"); + assert_eq!(next_event(&mut sub)?["type"], "session.attached"); + assert_eq!(next_event(&mut sub)?["type"], "session.detached"); + + daemon.wait_until_list_matches(|out| out.contains("disconnected"))?; + + let _attach2 = daemon + .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }) + .context("reattach")?; + + let attached = next_event(&mut sub)?; + assert_eq!( + attached["type"], "session.attached", + "expected attached on reattach, got {attached}" + ); + assert_eq!(next_event(&mut sub)?["type"], "session.detached"); + + Ok(()) +} + +// SIGTERM should clean up both sockets via the signal handler, since +// process::exit bypasses any RAII guard. 
+#[test] +#[timeout(30000)] +fn signal_exit_unlinks_sockets() -> anyhow::Result<()> { + let mut daemon = + Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) + .context("starting daemon proc")?; + let main_sock = daemon.socket_path.clone(); + let events_sock = events_socket_path(&daemon); + assert!(main_sock.exists(), "main socket should exist while daemon runs"); + assert!(events_sock.exists(), "events socket should exist while daemon runs"); + + let pid = nix::unistd::Pid::from_raw( + daemon.proc.as_ref().expect("daemon process handle").id() as i32 + ); + nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM).context("sending SIGTERM")?; + daemon.proc_wait().context("waiting for daemon to exit")?; + + assert!(!main_sock.exists(), "main socket should be unlinked on signal exit"); + assert!(!events_sock.exists(), "events socket should be unlinked on signal exit"); + + Ok(()) +} + +#[test] +#[timeout(30000)] +fn multiple_subscribers_each_get_independent_streams() -> anyhow::Result<()> { + let mut daemon = + Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) + .context("starting daemon proc")?; + let mut sub_a = connect_events(&daemon)?; + let mut sub_b = connect_events(&daemon)?; + + let _attach = daemon + .attach( + "shared", + AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }, + ) + .context("starting attach proc")?; + + for sub in [&mut sub_a, &mut sub_b] { + assert_eq!(next_event(sub)?["type"], "session.created"); + assert_eq!(next_event(sub)?["type"], "session.attached"); + assert_eq!(next_event(sub)?["type"], "session.detached"); + } + + Ok(()) +} diff --git a/shpool/tests/support/daemon.rs b/shpool/tests/support/daemon.rs index 102761b2..7c04c582 100644 --- a/shpool/tests/support/daemon.rs +++ b/shpool/tests/support/daemon.rs @@ -154,9 +154,10 @@ impl Proc { if args.listen_events { Some(Events::new(&test_hook_socket_path)?) 
} else { None }; // spin until we can dial the socket successfully + let events_socket_path = socket_path.with_file_name("events.socket"); let mut sleep_dur = time::Duration::from_millis(5); for _ in 0..12 { - if UnixStream::connect(&socket_path).is_ok() { + if UnixStream::connect(&socket_path).is_ok() && events_socket_path.exists() { break; } else { std::thread::sleep(sleep_dur); @@ -245,9 +246,10 @@ impl Proc { }); // spin until we can dial the socket successfully + let events_socket_path = socket_path.with_file_name("events.socket"); let mut sleep_dur = time::Duration::from_millis(5); for _ in 0..12 { - if UnixStream::connect(&socket_path).is_ok() { + if UnixStream::connect(&socket_path).is_ok() && events_socket_path.exists() { break; } else { std::thread::sleep(sleep_dur);