runnable is a Go package that provides a Runnable interface for functions or objects that can be started and stopped.
It provides a simple way to run a function or object in a goroutine and stop it when needed. It also provides a way to run a
function with retry and statistics number of restarts, when started and stopped, if returned error, etc.
fmt.Println("Simple function...")
err := runnable.New(func(ctx context.Context) error {
fmt.Println("Starting...")
defer fmt.Println("Stopping...")
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
return nil
default:
}
time.Sleep(1 * time.Second)
fmt.Println("Running...")
}
return nil
}).Run(context.Background())
if err != nil {
fmt.Println(err)
}fmt.Println("Simple function with stop...")
r := runnable.New(func(ctx context.Context) error {
fmt.Println("Starting...")
defer fmt.Println("Stopping...")
for {
select {
case <-ctx.Done():
return nil
case <-time.After(time.Second):
}
fmt.Println("Running...")
}
})
go func() {
time.Sleep(5 * time.Second)
fmt.Println("Calling Stop...")
err := r.Stop(context.Background())
if err != nil {
fmt.Println(err)
}
}()
err = r.Run(context.Background())
if err != nil {
fmt.Println(err)
}fmt.Println("Simple function with timeout...")
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = runnable.New(func(ctx context.Context) error {
fmt.Println("Starting...")
defer fmt.Println("Stopping...")
for {
select {
case <-ctx.Done():
return nil
case <-time.After(time.Second):
}
fmt.Println("Running...")
}
}).Run(ctxWithTimeout)
if err != nil {
fmt.Println(err)
}Cross-cutting behaviors that aren't part of the core lifecycle live in
the runnable/adapters subpackage as chi-style middleware: each
runnable.Adapter has the shape func(next RunFunc) RunFunc. Apply
them with runnable.WithAdapters (left-to-right = outermost-to-innermost):
r := runnable.New(reconcile, runnable.WithAdapters(
adapters.Draining(10*time.Second),
adapters.Recovering(),
adapters.Retry(3, time.Minute),
adapters.Ticker(30*time.Second),
))Draining — graceful shutdown with a grace window. When the outer
ctx is cancelled, the wrapped work has timeout to return via
adapters.Stopping(ctx) before its ctx is force-cancelled and
adapters.ErrDrainTimedOut is returned.
Ticker — calls the wrapped work once per interval until ctx is cancelled or the work returns an error. Composes with Draining: an in-flight tick is allowed to finish before the loop exits.
Recovering — turns panics in the wrapped work into errors and
emits a runnable.PanicRecoveredEvent to the Publisher on ctx. Place
inside Draining when both are in use.
Retry — re-invokes the wrapped work up to maxRetries times on
non-context errors. If resetAfter is non-zero and at least that long
has passed since the previous attempt, the retry budget resets. Emits
a runnable.RetryEvent after each failed attempt.
Inside long-running work, always select on both ctx.Done() and
adapters.Stopping(ctx) — Stopping signals drain start, ctx.Done()
fires only when the drain timer expires.
A full SIGTERM-safe service shape lives in
examples/ticker-with-drain.
Adapters emit typed events to a runnable.Publisher installed on the
runnable's ctx. Use runnable.WithPublisher to register one (or many —
multiple WithPublisher calls fan out):
type log struct{}
func (log) Publish(event any) {
switch ev := event.(type) {
case runnable.RetryEvent:
fmt.Printf("retry attempt %d: %v\n", ev.Attempt, ev.Err)
case runnable.DrainStartedEvent:
fmt.Printf("drain started, %s window\n", ev.Timeout)
case runnable.PanicRecoveredEvent:
fmt.Fprintf(os.Stderr, "panic: %v\n%s", ev.Recovered, ev.Stack)
}
}
r := runnable.New(work,
runnable.WithPublisher(log{}),
runnable.WithAdapters(adapters.Retry(3, time.Minute), adapters.Recovering()),
)StatusStore is a Publisher too — WithStatus(id, store) wires it
automatically and counts RetryEvents into Status.Restarts.
Publisher.Publish runs on the caller's goroutine, so subscribers must
not block. Buffer internally if you need async dispatch.
v0.1.0 moves retry and panic recovery out of the core package, and
introduces drain-on-shutdown and periodic execution as new adapters.
The Option-based WithRetry and WithRecoverer are removed; their
replacements live at runnable/adapters as chi-style middleware
applied via runnable.WithAdapters.
Before (v0.0.x):
r := runnable.New(doWork,
runnable.WithRecoverer(reporter, nil),
runnable.WithRetry(3, time.Minute),
)
After (v0.1.0):
r := runnable.New(doWork, runnable.WithAdapters(
adapters.Recovering(),
adapters.Retry(3, time.Minute),
))
Symbol mapping:
runnable.WithRetry/runnable.ResetNever→adapters.Retry/adapters.ResetNever.runnable.WithRecoverer→adapters.Recovering()plus arunnable.WithPublishersubscriber listening forrunnable.PanicRecoveredEvent(the two-interfaceRecoveryReporter/StackPrintercallback split is gone).
Status.Restarts is event-driven. The Restarts field on Status
is unchanged from a caller's perspective, but it now counts
runnable.RetryEvents published by adapters.Retry (or any other
Publisher source) rather than being incremented by an onStart
side-channel from WithRetry. No call-site change required when
using WithStatus + adapters.Retry.
New in v0.1.0: adapters.Draining for graceful shutdown,
adapters.Ticker for periodic execution, adapters.Stopping(ctx) to
observe drain start, adapters.ErrDrainTimedOut. See the Adapters
section above.
package main
import (
"time"
"github.com/0xsequence/runnable"
)
type Monitor struct {
runnable.Runnable
}
func NewMonitor() *Monitor {
m := &Monitor{}
m.Runnable = runnable.New(m.run)
return m
}
func (m *Monitor) run(ctx context.Context) error {
fmt.Println("Starting...")
defer fmt.Println("Stopping...")
// Start monitoring
for {
select {
case <-ctx.Done():
return nil
default:
}
time.Sleep(1 * time.Second)
fmt.Println("Monitoring...")
}
return nil
}
func main() {
fmt.Println("Runnable object(Monitor)...")
m := NewMonitor()
go func() {
time.Sleep(5 * time.Second)
fmt.Println("Calling Stop...")
err := m.Stop(context.Background())
if err != nil {
fmt.Println(err)
}
}()
err = m.Run(context.Background())
if err != nil {
fmt.Println(err)
}
}