66 changes: 66 additions & 0 deletions EVENTS.md
@@ -0,0 +1,66 @@
# Events

`shpool` exposes an event stream so that external programs can react to changes
without polling. This way, a program (e.g. a TUI) can call `shpool list` (or the
equivalent `ConnectHeader::List` request over the main socket; see the
[`shpool-protocol`](./shpool-protocol) crate) after each event so that its model
is always consistent with shpool's state.

## The events socket

The daemon binds a sibling Unix socket next to the main shpool socket:

```text
<runtime_dir>/shpool/shpool.socket # main socket
<runtime_dir>/shpool/events.socket # events socket (this protocol)
```

A subscriber connects to `events.socket` and reads events. The daemon ignores anything written to the events socket, so for subscribers it's effectively read-only.

## Event types

| `type` | Meaning |
| ------------------ | -------------------------------------------------------- |
| `session.created` | A new session was added to the table. |
| `session.attached` | A client attached or reattached to a session. |
| `session.detached` | A client disconnected from a still-running session. |
| `session.removed` | A session was removed (shell exited, killed, or reaped). |

Subscribers should ignore unknown `type` values so that future event types do
not break older consumers.
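A subscriber's dispatch loop can treat `type` as an open-ended set. The sketch below is illustrative, not part of shpool: it pulls the type out of an event line with a plain string scan (a real consumer would use a JSON parser) and silently skips anything it does not recognize.

```rust
// Sketch only: extract the `type` field from one event line. This naive
// string scan works because `type` is currently the only field; a real
// consumer should use a proper JSON parser.
fn event_type(line: &str) -> Option<&str> {
    let rest = line.split("\"type\":\"").nth(1)?;
    rest.split('"').next()
}

fn dispatch(line: &str) {
    match event_type(line) {
        Some("session.created") => println!("refresh: session created"),
        Some("session.attached") => println!("refresh: attach"),
        Some("session.detached") => println!("refresh: detach"),
        Some("session.removed") => println!("refresh: session removed"),
        // Unknown or malformed: ignore, for forward compatibility.
        _ => {}
    }
}

fn main() {
    dispatch("{\"type\":\"session.created\"}");
    dispatch("{\"type\":\"session.future-thing\"}"); // silently ignored
}
```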

## Wire format

The daemon writes one JSON object per line (JSONL). Each event looks like:

```json
{"type":"<event-type>"}
```

There are no other fields. To learn what the event refers to (which session,
when, etc.), call `shpool list` (or use `ConnectHeader::List`).

The framing is robust: a literal newline appears only as the delimiter
between events. Any newline inside a JSON string value is escaped (as `\n`)
by the daemon before the event is written.
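Because of that escaping guarantee, splitting the byte stream on `'\n'` is a correct framing strategy. A minimal sketch (the helper name is illustrative, not part of shpool):

```rust
// Sketch: a raw '\n' in the stream is always an event delimiter, because
// any newline inside a JSON string value arrives as the two characters
// '\' 'n'. Splitting on '\n' therefore reconstructs event boundaries.
fn frames(stream: &str) -> Vec<&str> {
    stream.split('\n').filter(|l| !l.is_empty()).collect()
}

fn main() {
    // Two events; the second has an escaped newline inside a value.
    let stream = "{\"type\":\"session.created\"}\n{\"type\":\"a\\nb\"}\n";
    assert_eq!(frames(stream).len(), 2);
    println!("{:?}", frames(stream));
}
```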

## Subscribing

For ad-hoc use, `shpool events` connects to the events socket and prints each
event line to stdout, flushing after each line:

```bash
shpool events | while read -r ev; do
echo "got: $ev"
shpool list
done

shpool events | jq .
```
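A program can also subscribe directly instead of shelling out to `shpool events`. The sketch below cannot assume a running daemon, so it stands up a throwaway in-process Unix socket server that plays the daemon's role (one JSON object per line, then EOF); against a real daemon you would connect to `events.socket` and the read loop would block, reacting as events arrive.

```rust
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::thread;

// Read newline-delimited events until the daemon hangs up.
fn collect_events<R: BufRead>(r: R) -> Vec<String> {
    r.lines().filter_map(|l| l.ok()).collect()
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("shpool-events-demo.socket");
    let _ = std::fs::remove_file(&path);
    let listener = UnixListener::bind(&path)?;

    // Fake daemon: accept one subscriber, push two events, hang up.
    let server = thread::spawn(move || {
        let (mut conn, _) = listener.accept().unwrap();
        conn.write_all(b"{\"type\":\"session.created\"}\n").unwrap();
        conn.write_all(b"{\"type\":\"session.attached\"}\n").unwrap();
    });

    // Subscriber side: connect and read events line by line.
    let stream = UnixStream::connect(&path)?;
    let events = collect_events(BufReader::new(stream));
    for ev in &events {
        println!("event: {}", ev); // e.g. re-run `shpool list` here
    }
    assert_eq!(events.len(), 2);

    server.join().unwrap();
    std::fs::remove_file(&path)?;
    Ok(())
}
```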

## Slow subscribers

Each subscriber has a bounded outbound queue. A subscriber that falls too far
behind is dropped by the daemon; a dropped subscriber can simply reconnect.
There is no replay, so events that fired while a subscriber was disconnected
are lost.
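The drop policy can be pictured with a bounded channel per subscriber. This is a toy model of the policy only, not shpool's actual `EventBus` internals; the capacity and names are made up.

```rust
use std::sync::mpsc::sync_channel;

// Toy model: publish `publishes` events into a bounded queue of size
// `capacity` with no consumer draining it. Returns true if any publish
// overflowed, which is the point at which the daemon would drop the
// subscriber instead of blocking.
fn overflows(capacity: usize, publishes: usize) -> bool {
    let (tx, _rx) = sync_channel::<usize>(capacity);
    (0..publishes).any(|i| tx.try_send(i).is_err())
}

fn main() {
    assert!(overflows(2, 3)); // third publish finds the queue full
    assert!(!overflows(2, 2)); // within capacity: subscriber survives
    println!("publish past capacity drops the subscriber");
}
```

Blocking the daemon on a slow reader would let one subscriber stall event delivery for everyone, which is why dropping (with reconnect as the recovery path) is the simpler policy.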
15 changes: 12 additions & 3 deletions libshpool/src/daemon/mod.rs
@@ -17,7 +17,7 @@ use std::{env, os::unix::net::UnixListener, path::PathBuf};
use anyhow::Context;
use tracing::{info, instrument};

use crate::{config, consts, hooks};
use crate::{config, consts, events, hooks};

mod etc_environment;
mod exit_notify;
@@ -81,8 +81,17 @@ pub fn run(
(Some(socket.clone()), UnixListener::bind(&socket).context("binding to socket")?)
}
};
// spawn the signal handler thread in the background
signals::Handler::new(cleanup_socket.clone()).spawn()?;
let events_socket = events::socket_path(&socket);

// spawn the signal handler thread in the background. Both sockets need
// explicit cleanup on signal exit because the process exits before any
// RAII guard can run.
let mut socks_to_clean: Vec<PathBuf> = cleanup_socket.iter().cloned().collect();
socks_to_clean.push(events_socket.clone());
signals::Handler::new(socks_to_clean).spawn().context("spawning signal handler")?;

let _events_guard =
server.start_events_listener(events_socket).context("starting events listener")?;

server::Server::serve(server, listener)?;

63 changes: 47 additions & 16 deletions libshpool/src/daemon/server.rs
@@ -51,7 +51,7 @@ use crate::{
etc_environment, exit_notify::ExitNotifier, hooks, pager, pager::PagerError, prompt, shell,
show_motd, ttl_reaper,
},
protocol, test_hooks, tty, user,
events, protocol, test_hooks, tty, user,
};

const DEFAULT_INITIAL_SHELL_PATH: &str = "/usr/bin:/bin:/usr/sbin:/sbin";
@@ -76,6 +76,7 @@ pub struct Server {
runtime_dir: PathBuf,
register_new_reapable_session: crossbeam_channel::Sender<(String, Instant)>,
hooks: Box<dyn hooks::Hooks + Send + Sync>,
events_bus: Arc<events::EventBus>,
daily_messenger: Arc<show_motd::DailyMessenger>,
log_level_handle: tracing_subscriber::reload::Handle<
tracing_subscriber::filter::LevelFilter,
@@ -96,13 +97,17 @@ impl Server {
>,
) -> anyhow::Result<Arc<Self>> {
let shells = Arc::new(Mutex::new(HashMap::new()));
let events_bus = events::EventBus::new().context("creating events bus")?;
// buffered so that we are unlikely to block when setting up a
// new session
let (new_sess_tx, new_sess_rx) = crossbeam_channel::bounded(10);
let shells_tab = Arc::clone(&shells);
thread::spawn(move || {
if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab) {
warn!("ttl reaper exited with error: {:?}", e);
thread::spawn({
let shells = Arc::clone(&shells);
let events_bus = Arc::clone(&events_bus);
move || {
if let Err(e) = ttl_reaper::run(new_sess_rx, shells, events_bus) {
warn!("ttl reaper exited with error: {:?}", e);
}
}
});

@@ -113,12 +118,22 @@
runtime_dir,
register_new_reapable_session: new_sess_tx,
hooks,
events_bus,
daily_messenger,
log_level_handle,
vars: HashMap::new().into(),
}))
}

/// Bind the events socket and spawn the accept thread. The returned
/// guard unlinks the socket file on drop.
pub fn start_events_listener(
self: &Arc<Self>,
socket_path: PathBuf,
) -> anyhow::Result<events::ListenerGuard> {
events::start_listener(socket_path, Arc::clone(&self.events_bus))
}

#[instrument(skip_all)]
pub fn serve(server: Arc<Self>, listener: UnixListener) -> anyhow::Result<()> {
test_hooks::emit("daemon-about-to-listen");
@@ -333,7 +348,12 @@ impl Server {
{
let _s = span!(Level::INFO, "2_lock(shells)").entered();
let mut shells = self.shells.lock();
shells.remove(&header.name);
// The publish below is gated on `is_some()` because a
// concurrent kill or reaper may have already removed the
// entry (and published) while we were waiting for the lock.
if shells.remove(&header.name).is_some() {
self.events_bus.publish(&events::Event::SessionRemoved);
}
}

// The child shell has exited, so the shell->client thread should
@@ -354,6 +374,7 @@
if let Some(session) = shells.get(&header.name) {
session.lifecycle_timestamps.lock().last_disconnected_at =
Some(time::SystemTime::now());
self.events_bus.publish(&events::Event::SessionDetached);
}
}
if let Err(err) = self.hooks.on_client_disconnect(&header.name) {
@@ -429,6 +450,9 @@
"child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell"
);
} else {
// Reattach confirmed; the create path won't run
// and clobber the entry, so it's safe to publish.
self.events_bus.publish(&events::Event::SessionAttached);
if let Err(err) = self.hooks.on_reattach(&header.name) {
warn!("reattach hook: {:?}", err);
}
@@ -445,8 +469,6 @@
AttachStatus::Attached { warnings },
));
}

// status is already attached
}
Some(exit_status) => {
// the channel is closed so we know the subshell exited
@@ -503,10 +525,18 @@

session.lifecycle_timestamps.lock().last_connected_at = Some(time::SystemTime::now());
{
// we unwrap to propagate the poison as an unwind
let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered();
let mut shells = self.shells.lock();
// If we're replacing a stale entry whose shell process is
// gone, surface that to subscribers before announcing the
// replacement.
let clobbered = shells.contains_key(&header.name);
shells.insert(header.name.clone(), Box::new(session));
if clobbered {
self.events_bus.publish(&events::Event::SessionRemoved);
}
self.events_bus.publish(&events::Event::SessionCreated);
self.events_bus.publish(&events::Event::SessionAttached);
}

// we unwrap to propagate the poison as an unwind
@@ -604,6 +634,10 @@ impl Server {
if let shell::ClientConnectionStatus::DetachNone = status {
not_attached_sessions.push(session);
} else {
// The bidi-loop unwind in handle_attach owns the
// SessionDetached publish; we just update
// last_disconnected_at eagerly so a concurrent list()
// reflects the detach immediately.
s.lifecycle_timestamps.lock().last_disconnected_at =
Some(time::SystemTime::now());
}
@@ -714,6 +748,7 @@

for session in to_remove.iter() {
shells.remove(session);
self.events_bus.publish(&events::Event::SessionRemoved);
}
if !to_remove.is_empty() {
test_hooks::emit("daemon-handle-kill-removed-shells");
@@ -729,8 +764,7 @@
fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> {
let _s = span!(Level::INFO, "lock(shells)").entered();
let shells = self.shells.lock();

let sessions: anyhow::Result<Vec<Session>> = shells
let sessions: Vec<Session> = shells
.iter()
.map(|(k, v)| {
let status = match v.inner.try_lock() {
Expand All @@ -743,7 +777,6 @@ impl Server {
.last_connected_at
.map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
.transpose()?;

let last_disconnected_at_unix_ms = timestamps
.last_disconnected_at
.map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
Expand All @@ -758,11 +791,9 @@ impl Server {
status,
})
})
.collect();
let sessions = sessions.context("collecting running session metadata")?;

.collect::<anyhow::Result<_>>()
.context("collecting running session metadata")?;
write_reply(&mut stream, ListReply { sessions })?;

Ok(())
}

16 changes: 9 additions & 7 deletions libshpool/src/daemon/signals.rs
@@ -23,11 +23,11 @@ use signal_hook::{consts::TERM_SIGNALS, flag, iterator::Signals};
use tracing::{error, info};

pub struct Handler {
sock: Option<PathBuf>,
socks: Vec<PathBuf>,
}
impl Handler {
pub fn new(sock: Option<PathBuf>) -> Self {
Handler { sock }
pub fn new(socks: Vec<PathBuf>) -> Self {
Handler { socks }
}

pub fn spawn(self) -> anyhow::Result<()> {
@@ -57,10 +57,12 @@ impl Handler {
for signal in &mut signals {
assert!(TERM_SIGNALS.contains(&signal));

info!("term sig handler: cleaning up socket");
if let Some(sock) = self.sock {
if let Err(e) = std::fs::remove_file(sock).context("cleaning up socket") {
error!("error cleaning up socket file: {}", e);
info!("term sig handler: cleaning up sockets");
for sock in &self.socks {
match std::fs::remove_file(sock) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => error!("error cleaning up socket {:?}: {}", sock, e),
}
}

3 changes: 3 additions & 0 deletions libshpool/src/daemon/ttl_reaper.rs
@@ -31,12 +31,14 @@ use parking_lot::Mutex;
use tracing::{info, span, warn, Level};

use super::shell;
use crate::events;

/// Run the reaper thread loop. Should be invoked in a dedicated
/// thread.
pub fn run(
new_sess: crossbeam_channel::Receiver<(String, Instant)>,
shells: Arc<Mutex<HashMap<String, Box<shell::Session>>>>,
events_bus: Arc<events::EventBus>,
) -> anyhow::Result<()> {
let _s = span!(Level::INFO, "ttl_reaper").entered();

@@ -116,6 +118,7 @@
continue;
}
shells.remove(&reapable.session_name);
events_bus.publish(&events::Event::SessionRemoved);
}
}
}