From 4a746f1788ffbae65c608abc5c74f487bcaf1999 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 3 May 2026 12:14:01 +0200 Subject: [PATCH 1/6] Fix py_thread_callback_SUITE flake on slow CI hosts (#63) Pipe traffic between Python threads and the Erlang coordinator was vulnerable to short reads, two-write length+data races, and frame interleaving on the shared async pipe. Loop reads with a monotonic deadline, send each response as a single id-prefixed frame written under a non-blocking dirty-IO NIF, and serialise async writes through one Erlang process per pipe so chunked kernel writes can no longer interleave. Sync workers self-poison and the async pipe fails loud on unrecoverable read errors. Adds high-concurrency, async-concurrent and large-payload regression tests plus a local stress harness. --- c_src/CMakeLists.txt | 9 + c_src/py_callback.c | 317 ++++++++++++++++++------- c_src/py_nif.c | 18 +- c_src/py_nif.h | 191 ++++++++++++--- c_src/py_thread_worker.c | 376 +++++++++++++++++++++++------- scripts/stress_thread_callback.sh | 36 +++ src/py_nif.erl | 19 +- src/py_thread_handler.erl | 162 ++++++++++--- test/py_thread_callback_SUITE.erl | 113 ++++++++- 9 files changed, 995 insertions(+), 246 deletions(-) create mode 100755 scripts/stress_thread_callback.sh diff --git a/c_src/CMakeLists.txt b/c_src/CMakeLists.txt index 948d79a..37782d5 100644 --- a/c_src/CMakeLists.txt +++ b/c_src/CMakeLists.txt @@ -55,6 +55,15 @@ option(ENABLE_ASAN "Enable AddressSanitizer" OFF) option(ENABLE_TSAN "Enable ThreadSanitizer" OFF) option(ENABLE_UBSAN "Enable UndefinedBehaviorSanitizer" OFF) +# Optional thread-callback I/O tracing. Compiled out by default; turn on +# locally to debug py_thread_callback_SUITE-style flakiness. Logs one +# line per send/receive on stderr. +option(ENABLE_PY_THREAD_CB_TRACE "Trace thread-callback I/O on stderr" OFF) +if(ENABLE_PY_THREAD_CB_TRACE) + message(STATUS "Thread-callback I/O tracing enabled") + add_compile_definitions(PY_THREAD_CB_TRACE=1) +endif() + if(ENABLE_ASAN) message(STATUS "AddressSanitizer enabled") add_compile_options(-fsanitize=address -fno-omit-frame-pointer -g -O1) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index ab91cb6..20bcc8e 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2195,12 +2195,33 @@ extern ErlNifPid g_thread_coordinator_pid; extern bool g_has_thread_coordinator; /* Per-interpreter module state for async callbacks. - * Each subinterpreter gets its own pipe and futures dict. */ + * + * Each subinterpreter gets its own pipe and futures dict. The reader + * (process_async_callback_response) is event-driven from the asyncio + * loop and the read end is O_NONBLOCK; partial frames are buffered + * across invocations in hdr_buf/body_buf so the reader resumes on the + * next loop tick when more data arrives. */ typedef struct { int async_callback_pipe[2]; /* [0]=read, [1]=write - per-interpreter pipe */ PyObject *async_pending_futures; /* Dict: callback_id -> Future */ pthread_mutex_t async_futures_mutex; bool pipe_initialized; + + /* Set on hard read error (EIO/EBADF/EOF mid-frame). New + * erlang.async_call invocations short-circuit with a clear + * RuntimeError; pending futures are failed with the same error. */ + bool pipe_broken; + + /* Resumable frame parser state. The wire format is + * <> + * The header lives in a fixed buffer; the body is heap-allocated + * once the header arrives. EAGAIN preserves all of these so the + * next reader tick continues from the same offset. */ + uint8_t hdr_buf[12]; /* 8-byte id + 4-byte len */ + size_t hdr_have; /* 0..12 */ + char *body_buf; /* allocated once header parsed */ + uint32_t body_len; /* parsed from header */ + size_t body_have; /* 0..body_len */ } erlang_module_state_t; /* Forward declaration for module state accessor */ @@ -2254,10 +2275,17 @@ static int async_callback_init(void) { return -1; } - /* Set the read end to non-blocking for asyncio compatibility */ - int flags = fcntl(state->async_callback_pipe[0], F_GETFL, 0); - if (flags >= 0) { - fcntl(state->async_callback_pipe[0], F_SETFL, flags | O_NONBLOCK); + /* Set the read end non-blocking for asyncio compatibility, and + * the write end non-blocking so the dirty-IO NIF write path can + * timeout instead of pinning a scheduler on a stalled Python + * reader. */ + int rflags = fcntl(state->async_callback_pipe[0], F_GETFL, 0); + if (rflags >= 0) { + fcntl(state->async_callback_pipe[0], F_SETFL, rflags | O_NONBLOCK); + } + int wflags = fcntl(state->async_callback_pipe[1], F_GETFL, 0); + if (wflags >= 0) { + fcntl(state->async_callback_pipe[1], F_SETFL, wflags | O_NONBLOCK); } state->async_pending_futures = PyDict_New(); @@ -2276,119 +2304,232 @@ static int async_callback_init(void) { } /** - * Process a single async callback response from the pipe. - * Called by the asyncio reader callback. - * Returns: 1 if processed, 0 if no data, -1 on error + * Resolve a Future with a result or, if @p result is NULL, with the + * current Python exception (or a generic RuntimeError fallback). + * Borrows @p future; caller still owns its reference. */ -static int process_async_callback_response(void) { - erlang_module_state_t *state = get_erlang_module_state(); - if (state == NULL || !state->pipe_initialized) { - return -1; +static void resolve_future_with_result(PyObject *future, PyObject *result) { + if (result != NULL) { + PyObject *set_result = PyObject_GetAttrString(future, "set_result"); + if (set_result != NULL) { + PyObject *ret = PyObject_CallFunctionObjArgs(set_result, result, NULL); + Py_XDECREF(ret); + Py_DECREF(set_result); + } + return; } - /* Read callback_id (8 bytes) + response_len (4 bytes) + response_data */ - uint64_t callback_id; - uint32_t response_len; - ssize_t n; + /* Result was NULL: carry the Python exception over to the future. */ + PyObject *exc_type = NULL, *exc_value = NULL, *exc_tb = NULL; + PyErr_Fetch(&exc_type, &exc_value, &exc_tb); + + PyObject *set_exception = PyObject_GetAttrString(future, "set_exception"); + if (set_exception != NULL) { + if (exc_value != NULL) { + PyObject *ret = PyObject_CallFunctionObjArgs(set_exception, exc_value, NULL); + Py_XDECREF(ret); + } else { + PyObject *runtime_err = PyObject_CallFunction( + PyExc_RuntimeError, "s", "Erlang callback failed"); + PyObject *ret = PyObject_CallFunctionObjArgs( + set_exception, runtime_err, NULL); + Py_XDECREF(ret); + Py_XDECREF(runtime_err); + } + Py_DECREF(set_exception); + } + + Py_XDECREF(exc_type); + Py_XDECREF(exc_value); + Py_XDECREF(exc_tb); + PyErr_Clear(); +} + +/** + * Mark the async pipe broken and fail every pending future with + * RuntimeError("async callback pipe broken"). + * + * Mutex policy: snapshot the futures dict under + * async_futures_mutex (incref each future, then clear the dict), + * release the mutex, then call set_exception on the snapshotted + * list. Calling Python methods while holding the mutex is forbidden + * because future callbacks may re-enter any code path that takes + * the same mutex. + */ +static void async_pipe_break_and_fail_pending(erlang_module_state_t *state) { + if (state->pipe_broken) { + return; + } + + /* Reset frame buffer; any in-flight body is unrecoverable. */ + state->hdr_have = 0; + if (state->body_buf != NULL) { + enif_free(state->body_buf); + state->body_buf = NULL; + } + state->body_have = 0; + state->body_len = 0; + state->pipe_broken = true; - n = read(state->async_callback_pipe[0], &callback_id, sizeof(callback_id)); - if (n < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return 0; /* No data available (non-blocking) */ + PyObject *items = NULL; + pthread_mutex_lock(&state->async_futures_mutex); + if (state->async_pending_futures != NULL) { + items = PyDict_Items(state->async_pending_futures); + if (items != NULL) { + Py_ssize_t n = PyList_GET_SIZE(items); + for (Py_ssize_t i = 0; i < n; i++) { + PyObject *pair = PyList_GET_ITEM(items, i); + PyObject *fut = PyTuple_GET_ITEM(pair, 1); + Py_INCREF(fut); /* survive the dict clear */ + } + PyDict_Clear(state->async_pending_futures); } - return -1; /* Error */ } - if (n == 0) { - return 0; /* EOF / No data */ + pthread_mutex_unlock(&state->async_futures_mutex); + + if (items == NULL) { + return; } - if (n != sizeof(callback_id)) { - return -1; /* Partial read - error */ + + Py_ssize_t n = PyList_GET_SIZE(items); + for (Py_ssize_t i = 0; i < n; i++) { + PyObject *pair = PyList_GET_ITEM(items, i); + PyObject *fut = PyTuple_GET_ITEM(pair, 1); /* borrowed */ + PyObject *exc = PyObject_CallFunction( + PyExc_RuntimeError, "s", "async callback pipe broken"); + PyObject *set_exception = PyObject_GetAttrString(fut, "set_exception"); + if (set_exception != NULL && exc != NULL) { + PyObject *ret = PyObject_CallFunctionObjArgs(set_exception, exc, NULL); + Py_XDECREF(ret); + } + Py_XDECREF(set_exception); + Py_XDECREF(exc); + Py_DECREF(fut); /* match the INCREF in the snapshot loop */ } + Py_DECREF(items); +} - n = read(state->async_callback_pipe[0], &response_len, sizeof(response_len)); - if (n != sizeof(response_len)) { +/** + * Resumable nonblocking parser of the async-callback pipe. + * + * Wire format (matches nif_async_callback_response in py_thread_worker.c): + * <> + * + * Each invocation reads what the kernel will give without blocking and + * advances the parser state held in @c erlang_module_state_t. EAGAIN + * preserves the state for the next reader tick from the asyncio loop. + * + * Returns 1 when a frame was completed, 0 when no progress was made + * (EAGAIN at a frame boundary), -1 on hard error after marking the + * pipe broken and failing pending futures. + */ +static int process_async_callback_response(void) { + erlang_module_state_t *state = get_erlang_module_state(); + if (state == NULL || !state->pipe_initialized) { + return -1; + } + if (state->pipe_broken) { return -1; } - char *response_data = NULL; - if (response_len > 0) { - response_data = enif_alloc(response_len); - if (response_data == NULL) { + /* Stage 1: header (12 bytes). */ + while (state->hdr_have < sizeof(state->hdr_buf)) { + ssize_t n = read(state->async_callback_pipe[0], + state->hdr_buf + state->hdr_have, + sizeof(state->hdr_buf) - state->hdr_have); + if (n > 0) { + state->hdr_have += (size_t)n; + continue; + } + if (n < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) return 0; + async_pipe_break_and_fail_pending(state); return -1; } - n = read(state->async_callback_pipe[0], response_data, response_len); - if (n != (ssize_t)response_len) { - enif_free(response_data); + /* n == 0 (EOF). Clean only at a frame boundary. */ + if (state->hdr_have == 0) return 0; + async_pipe_break_and_fail_pending(state); + return -1; + } + + uint64_t callback_id; + uint32_t body_len; + memcpy(&callback_id, state->hdr_buf, sizeof(callback_id)); + memcpy(&body_len, state->hdr_buf + sizeof(callback_id), sizeof(body_len)); + + if (state->body_buf == NULL && body_len > 0) { + state->body_buf = enif_alloc(body_len); + if (state->body_buf == NULL) { + async_pipe_break_and_fail_pending(state); return -1; } + state->body_have = 0; + } + state->body_len = body_len; + + /* Stage 2: body (body_len bytes). */ + while (state->body_have < state->body_len) { + ssize_t n = read(state->async_callback_pipe[0], + state->body_buf + state->body_have, + state->body_len - state->body_have); + if (n > 0) { + state->body_have += (size_t)n; + continue; + } + if (n < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) return 0; + async_pipe_break_and_fail_pending(state); + return -1; + } + /* n == 0 mid-body is always an error. */ + async_pipe_break_and_fail_pending(state); + return -1; } - /* Look up and resolve the Future */ - pthread_mutex_lock(&state->async_futures_mutex); + /* Detach frame buffers and reset parser state BEFORE any Python + * re-entry. A recursive call from set_result / GC will then see a + * clean parser. */ + char *body = state->body_buf; + uint32_t blen = state->body_len; + state->hdr_have = 0; + state->body_buf = NULL; + state->body_have = 0; + state->body_len = 0; + /* Look up the future; copy it out under the mutex, resolve + * outside (review #4). */ + PyObject *future = NULL; + pthread_mutex_lock(&state->async_futures_mutex); PyObject *key = PyLong_FromUnsignedLongLong(callback_id); - PyObject *future = PyDict_GetItem(state->async_pending_futures, key); - - if (future != NULL) { - Py_INCREF(future); /* Keep reference while we use it */ - PyDict_DelItem(state->async_pending_futures, key); + if (key != NULL && state->async_pending_futures != NULL) { + PyObject *found = PyDict_GetItem(state->async_pending_futures, key); + if (found != NULL) { + Py_INCREF(found); + PyDict_DelItem(state->async_pending_futures, key); + future = found; + } } - Py_DECREF(key); - + Py_XDECREF(key); pthread_mutex_unlock(&state->async_futures_mutex); if (future != NULL) { - /* Parse response and resolve Future */ PyObject *result = NULL; - if (response_data != NULL) { - result = parse_callback_response((unsigned char *)response_data, response_len); + if (blen > 0) { + result = parse_callback_response((unsigned char *)body, blen); } else { Py_INCREF(Py_None); result = Py_None; } - - if (result != NULL) { - /* Call future.set_result(result) */ - PyObject *set_result = PyObject_GetAttrString(future, "set_result"); - if (set_result != NULL) { - PyObject *ret = PyObject_CallFunctionObjArgs(set_result, result, NULL); - Py_XDECREF(ret); - Py_DECREF(set_result); - } - Py_DECREF(result); - } else { - /* Error occurred - set exception on Future */ - PyObject *exc_type, *exc_value, *exc_tb; - PyErr_Fetch(&exc_type, &exc_value, &exc_tb); - - PyObject *set_exception = PyObject_GetAttrString(future, "set_exception"); - if (set_exception != NULL) { - if (exc_value != NULL) { - PyObject *ret = PyObject_CallFunctionObjArgs(set_exception, exc_value, NULL); - Py_XDECREF(ret); - } else { - PyObject *runtime_err = PyObject_CallFunction(PyExc_RuntimeError, - "s", "Erlang callback failed"); - PyObject *ret = PyObject_CallFunctionObjArgs(set_exception, runtime_err, NULL); - Py_XDECREF(ret); - Py_XDECREF(runtime_err); - } - Py_DECREF(set_exception); - } - - Py_XDECREF(exc_type); - Py_XDECREF(exc_value); - Py_XDECREF(exc_tb); - PyErr_Clear(); - } - + resolve_future_with_result(future, result); + Py_XDECREF(result); Py_DECREF(future); } - if (response_data != NULL) { - enif_free(response_data); + if (body != NULL) { + enif_free(body); } - return 1; } @@ -2474,6 +2615,12 @@ static PyObject *send_async_callback_request(PyObject *self, PyObject *args) { PyErr_SetString(PyExc_RuntimeError, "Async callback system not initialized"); return NULL; } + if (state->pipe_broken) { + PyErr_SetString(PyExc_RuntimeError, + "async callback pipe broken; restart the erlang_python " + "application to recover"); + return NULL; + } /* Generate callback ID */ uint64_t callback_id = atomic_fetch_add(&g_callback_id_counter, 1); diff --git a/c_src/py_nif.c b/c_src/py_nif.c index f81fd16..a2e638c 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -7801,13 +7801,19 @@ static ErlNifFunc nif_funcs[] = { /* Execution mode info */ {"execution_mode", 0, nif_execution_mode, 0}, - /* Thread worker support (ThreadPoolExecutor) */ + /* Thread worker support (ThreadPoolExecutor). + * Writes are ERL_NIF_DIRTY_JOB_IO_BOUND because the response pipe + * has a non-blocking write end and the looped write may briefly + * wait for write-readiness when the Python reader is slow. */ {"thread_worker_set_coordinator", 1, nif_thread_worker_set_coordinator, 0}, - {"thread_worker_write", 2, nif_thread_worker_write, 0}, - {"thread_worker_signal_ready", 1, nif_thread_worker_signal_ready, 0}, - - /* Async callback support (for erlang.async_call) */ - {"async_callback_response", 3, nif_async_callback_response, 0}, + {"thread_worker_write_with_id", 3, nif_thread_worker_write_with_id, + ERL_NIF_DIRTY_JOB_IO_BOUND}, + {"thread_worker_signal_ready", 1, nif_thread_worker_signal_ready, 0}, + + /* Async callback support (for erlang.async_call). Same dirty-IO + * rationale as thread_worker_write_with_id above. */ + {"async_callback_response", 3, nif_async_callback_response, + ERL_NIF_DIRTY_JOB_IO_BOUND}, /* Callback name registry (prevents torch introspection issues) */ {"register_callback_name", 1, nif_register_callback_name, 0}, diff --git a/c_src/py_nif.h b/c_src/py_nif.h index e353fd5..066024e 100644 --- a/c_src/py_nif.h +++ b/c_src/py_nif.h @@ -1683,62 +1683,108 @@ static char *binary_to_string(const ErlNifBinary *bin) { } /** - * @brief Read from a file descriptor with optional timeout + * @brief Read exactly @p count bytes from a file descriptor with a deadline. * - * Uses select() to implement a timeout on blocking read operations. - * This prevents indefinite blocking on pipe reads. + * Loops on partial reads and EINTR until @p count bytes are received or + * the deadline expires. Uses a monotonic deadline (not a per-call + * timeout) so retries cannot extend the wait indefinitely. * * @param fd File descriptor to read from * @param buf Buffer to read into * @param count Number of bytes to read - * @param timeout_ms Timeout in milliseconds (0 = no timeout, wait indefinitely) + * @param timeout_ms Total timeout in milliseconds (0 = no timeout) * - * @return Number of bytes read on success, 0 on timeout, -1 on error + * @return Bytes read so far on timeout/EOF (0 to count), -1 on hard error. + * A return value < count with errno == ETIMEDOUT signals timeout; + * < count with errno == 0 signals clean EOF. * - * @note On timeout, errno is set to ETIMEDOUT + * @note Callers MUST treat any return < count as a desynchronised pipe. + * There is no in-band recovery: the leftover bytes (if any) of a + * partial frame stay in the pipe. */ static ssize_t read_with_timeout(int fd, void *buf, size_t count, int timeout_ms) { - if (timeout_ms > 0) { - struct timeval tv; - tv.tv_sec = timeout_ms / 1000; - tv.tv_usec = (timeout_ms % 1000) * 1000; - - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - - int ret = select(fd + 1, &fds, NULL, NULL, &tv); - if (ret < 0) { - return -1; /* select error */ - } - if (ret == 0) { - errno = ETIMEDOUT; - return 0; /* timeout */ + char *p = buf; + size_t got = 0; + struct timespec deadline = {0}; + bool have_deadline = (timeout_ms > 0); + + if (have_deadline) { + clock_gettime(CLOCK_MONOTONIC, &deadline); + deadline.tv_sec += timeout_ms / 1000; + deadline.tv_nsec += (long)(timeout_ms % 1000) * 1000000L; + if (deadline.tv_nsec >= 1000000000L) { + deadline.tv_sec += 1; + deadline.tv_nsec -= 1000000000L; } } - return read(fd, buf, count); + while (got < count) { + if (have_deadline) { + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + long remain_ms = + (deadline.tv_sec - now.tv_sec) * 1000L + + (deadline.tv_nsec - now.tv_nsec) / 1000000L; + if (remain_ms <= 0) { + errno = ETIMEDOUT; + return (ssize_t)got; + } + struct timeval tv; + tv.tv_sec = remain_ms / 1000; + tv.tv_usec = (remain_ms % 1000) * 1000; + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + int s = select(fd + 1, &fds, NULL, NULL, &tv); + if (s < 0) { + if (errno == EINTR) continue; + return -1; + } + if (s == 0) { + errno = ETIMEDOUT; + return (ssize_t)got; + } + } + ssize_t n = read(fd, p + got, count - got); + if (n > 0) { + got += (size_t)n; + continue; + } + if (n < 0) { + if (errno == EINTR) continue; + return -1; + } + /* n == 0: writer closed pipe (clean EOF) */ + errno = 0; + return (ssize_t)got; + } + return (ssize_t)got; } /** - * @brief Read length-prefixed data from a file descriptor + * @brief Read length-prefixed data from a file descriptor. * - * Reads a 4-byte length prefix followed by the data payload. - * Uses read_with_timeout for optional timeout support. + * Wire format: 4-byte length prefix (native endianness) followed by + * @p len bytes of payload. Built on top of read_with_timeout, so + * partial reads / EINTR are handled transparently. * * @param fd File descriptor to read from - * @param data_out Pointer to store allocated data buffer (caller must free with enif_free) + * @param data_out Pointer to store allocated data buffer * @param len_out Pointer to store data length - * @param timeout_ms Timeout in milliseconds (0 = no timeout) + * @param timeout_ms Total timeout in milliseconds (0 = no timeout) * * @return 0 on success, -1 on read error/timeout, -2 on allocation failure * - * @note On success with len > 0, caller must call enif_free(*data_out) + * @note Updated contract: on -1, the calling layer MUST treat the pipe as + * desynchronised. There is no resync attempt here. Callers (sync + * thread-callback, suspended-callback) react: poison the worker or + * destroy the context. On success with len > 0, caller must free + * *data_out via enif_free(). */ static int read_length_prefixed_data(int fd, char **data_out, uint32_t *len_out, int timeout_ms) { uint32_t len; ssize_t n = read_with_timeout(fd, &len, sizeof(len), timeout_ms); - if (n != sizeof(len)) { + if (n != (ssize_t)sizeof(len)) { return -1; } @@ -1762,6 +1808,91 @@ static int read_length_prefixed_data(int fd, char **data_out, uint32_t *len_out, return 0; } +/** + * @brief Write status codes for write_all_with_deadline(). + */ +typedef enum { + WRITE_OK = 0, + WRITE_TIMEOUT = -1, + WRITE_ERROR = -2 +} write_result_t; + +/** + * @brief Write exactly @p count bytes to a (typically non-blocking) fd + * with a deadline. + * + * Loops on partial writes / EINTR / EAGAIN. On EAGAIN, uses select() for + * write-readiness with the remaining deadline. Used by the thread-worker + * write path to avoid pinning a dirty I/O scheduler thread on a stalled + * Python reader. + * + * @param fd File descriptor to write to + * @param buf Buffer to write + * @param count Number of bytes to write + * @param timeout_ms Total timeout in milliseconds (0 = no timeout) + * + * @return WRITE_OK on full write, WRITE_TIMEOUT on deadline expiry, + * WRITE_ERROR on hard error (errno preserved). + */ +static write_result_t write_all_with_deadline(int fd, const void *buf, + size_t count, int timeout_ms) { + const char *p = buf; + size_t sent = 0; + struct timespec deadline = {0}; + bool have_deadline = (timeout_ms > 0); + + if (have_deadline) { + clock_gettime(CLOCK_MONOTONIC, &deadline); + deadline.tv_sec += timeout_ms / 1000; + deadline.tv_nsec += (long)(timeout_ms % 1000) * 1000000L; + if (deadline.tv_nsec >= 1000000000L) { + deadline.tv_sec += 1; + deadline.tv_nsec -= 1000000000L; + } + } + + while (sent < count) { + ssize_t n = write(fd, p + sent, count - sent); + if (n > 0) { + sent += (size_t)n; + continue; + } + if (n < 0) { + if (errno == EINTR) continue; + if (errno != EAGAIN && errno != EWOULDBLOCK) { + return WRITE_ERROR; + } + /* Pipe full: wait for write-readiness within the deadline. */ + if (!have_deadline) { + return WRITE_ERROR; /* would block, no deadline */ + } + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + long remain_ms = + (deadline.tv_sec - now.tv_sec) * 1000L + + (deadline.tv_nsec - now.tv_nsec) / 1000000L; + if (remain_ms <= 0) return WRITE_TIMEOUT; + struct timeval tv; + tv.tv_sec = remain_ms / 1000; + tv.tv_usec = (remain_ms % 1000) * 1000; + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + int s = select(fd + 1, NULL, &fds, NULL, &tv); + if (s < 0) { + if (errno == EINTR) continue; + return WRITE_ERROR; + } + if (s == 0) return WRITE_TIMEOUT; + /* fd writable; loop back. */ + continue; + } + /* n == 0: shouldn't happen for write(); treat as error. */ + return WRITE_ERROR; + } + return WRITE_OK; +} + /** @} */ /* ============================================================================ diff --git a/c_src/py_thread_worker.c b/c_src/py_thread_worker.c index 5e0aed7..c099b62 100644 --- a/c_src/py_thread_worker.c +++ b/c_src/py_thread_worker.c @@ -60,8 +60,8 @@ typedef struct thread_worker { /** * @brief Response pipe file descriptors - * - [0] = read end (Python reads responses) - * - [1] = write end (Erlang writes responses) + * - [0] = read end (Python reads responses, blocking) + * - [1] = write end (Erlang writes responses, O_NONBLOCK) */ int response_pipe[2]; @@ -71,6 +71,13 @@ typedef struct thread_worker { /** @brief Flag: worker is currently in use */ bool in_use; + /** + * @brief Flag: worker is permanently retired due to a callback + * synchronisation failure. Skipped by acquire_thread_worker; not + * recycled until NIF unload. + */ + bool poisoned; + /** @brief PID of the Erlang handler process for this worker */ ErlNifPid handler_pid; @@ -109,6 +116,45 @@ ErlNifPid g_thread_coordinator_pid; /** @brief Flag: coordinator PID has been set */ bool g_has_thread_coordinator = false; +/** + * @brief Counter of poisoned (permanently retired) workers. + * + * Bounded diagnostic ceiling: when this exceeds MAX_POISONED_WORKERS we + * surface a loud error to the user instead of silently leaking pipe pairs. + * Reaching this threshold means the callback subsystem is genuinely broken + * and warrants a bug report. + */ +static _Atomic uint64_t g_poisoned_workers_count = 0; + +/** @brief Diagnostic ceiling on poisoned-worker accumulation. */ +#define MAX_POISONED_WORKERS 64 + +/** @brief Read/write timeout for thread-callback pipe traffic (30s). */ +#define THREAD_WORKER_IO_TIMEOUT_MS 30000 + +/** @brief Stack-buffer threshold for combined-frame writes. */ +#define THREAD_WORKER_STACK_FRAME_LIMIT 1024 + +/* PY_THREAD_CB_TRACE: optional stderr tracing for callback I/O. + * Compile-time gate, default OFF. Enable with CMake + * -DENABLE_PY_THREAD_CB_TRACE=ON + * which sets PY_THREAD_CB_TRACE=1. + */ +#ifdef PY_THREAD_CB_TRACE +# define PY_THREAD_CB_LOG(fmt, ...) \ + do { \ + struct timespec _ts; \ + clock_gettime(CLOCK_MONOTONIC, &_ts); \ + enif_fprintf(stderr, \ + "[py_thread_cb %ld.%09ld tid=0x%08x] " fmt "\n", \ + (long)_ts.tv_sec, (long)_ts.tv_nsec, \ + (unsigned)((uintptr_t)pthread_self() & 0xffffffff), \ + ##__VA_ARGS__); \ + } while (0) +#else +# define PY_THREAD_CB_LOG(fmt, ...) ((void)0) +#endif + /* ============================================================================ * Thread Worker Destructor (pthread_key cleanup) * ============================================================================ */ @@ -247,6 +293,7 @@ static thread_worker_t *create_thread_worker(void) { tw->response_pipe[1] = -1; tw->in_use = true; tw->has_handler = false; + tw->poisoned = false; /* Create response pipe */ if (pipe(tw->response_pipe) < 0) { @@ -254,6 +301,15 @@ static thread_worker_t *create_thread_worker(void) { return NULL; } + /* Set the WRITE end non-blocking. The Erlang handler runs on a dirty + * I/O scheduler thread (see py_nif.c registration); making the write + * end non-blocking lets write_all_with_deadline() return on a stalled + * Python reader instead of pinning a dirty thread indefinitely. */ + int wflags = fcntl(tw->response_pipe[1], F_GETFL, 0); + if (wflags >= 0) { + (void)fcntl(tw->response_pipe[1], F_SETFL, wflags | O_NONBLOCK); + } + pthread_mutex_init(&tw->mutex, NULL); /* Add to pool */ @@ -263,25 +319,51 @@ static thread_worker_t *create_thread_worker(void) { return tw; } +/** + * @brief Drain any leftover bytes from a pipe read end (non-blocking). + * + * Called when reusing a worker so the new owner does not inherit stale + * bytes from a previous owner that exited mid-frame. Always restores + * the original flags, even on intermediate read errors. + */ +static void drain_pipe(int fd) { + int orig = fcntl(fd, F_GETFL, 0); + if (orig < 0) return; + if (fcntl(fd, F_SETFL, orig | O_NONBLOCK) < 0) return; + + char scratch[256]; + for (;;) { + ssize_t n = read(fd, scratch, sizeof(scratch)); + if (n > 0) continue; + if (n < 0 && errno == EINTR) continue; + break; /* EAGAIN, 0, or hard error */ + } + (void)fcntl(fd, F_SETFL, orig); +} + /** * @brief Acquire a thread worker from the pool * - * First tries to find an existing unused worker in the pool. - * If none available, creates a new one. + * First tries to find an existing unused, non-poisoned worker in the + * pool. If none available, creates a new one. Poisoned workers stay in + * the linked list but are never returned. * * @return Pointer to acquired worker, or NULL on failure */ static thread_worker_t *acquire_thread_worker(void) { pthread_mutex_lock(&g_thread_pool_mutex); - /* Look for unused worker */ + /* Look for unused, non-poisoned worker */ thread_worker_t *tw = g_thread_pool_head; while (tw != NULL) { pthread_mutex_lock(&tw->mutex); - if (!tw->in_use) { + if (!tw->in_use && !tw->poisoned) { tw->in_use = true; + int read_fd = tw->response_pipe[0]; pthread_mutex_unlock(&tw->mutex); pthread_mutex_unlock(&g_thread_pool_mutex); + /* Drain any stale bytes the previous owner left behind. */ + if (read_fd >= 0) drain_pipe(read_fd); return tw; } pthread_mutex_unlock(&tw->mutex); @@ -295,6 +377,48 @@ static thread_worker_t *acquire_thread_worker(void) { return tw; } +/** + * @brief Permanently retire a worker after a callback synchronisation + * failure. + * + * Called by thread_worker_call when the read path cannot guarantee + * frame integrity (id mismatch + corrupted payload, partial-frame + * timeout, EOF mid-frame). Closes both pipe ends so any further write + * by the existing handler fails loudly, marks the worker as out of + * service, and bumps the global counter for diagnostics. + * + * @note Caller must clear tl_thread_worker before returning to Python so + * the next call from this thread acquires a fresh worker. + */ +static void poison_thread_worker(thread_worker_t *tw) { + pthread_mutex_lock(&tw->mutex); + if (tw->poisoned) { + pthread_mutex_unlock(&tw->mutex); + return; + } + tw->poisoned = true; + tw->in_use = false; + int read_fd = tw->response_pipe[0]; + int write_fd = tw->response_pipe[1]; + tw->response_pipe[0] = -1; + tw->response_pipe[1] = -1; + pthread_mutex_unlock(&tw->mutex); + + if (read_fd >= 0) close(read_fd); + if (write_fd >= 0) close(write_fd); + + uint64_t prev = atomic_fetch_add(&g_poisoned_workers_count, 1); + if (prev + 1 >= MAX_POISONED_WORKERS) { + enif_fprintf(stderr, + "[erlang_python] WARNING: %llu thread-callback workers have been " + "poisoned. The thread-callback subsystem is unhealthy; please " + "file a bug report with the failing test case.\n", + (unsigned long long)(prev + 1)); + } + PY_THREAD_CB_LOG("poison tw_id=%llu count=%llu", + (unsigned long long)tw->id, (unsigned long long)(prev + 1)); +} + /* ============================================================================ * Thread Worker Call Implementation * ============================================================================ */ @@ -330,26 +454,15 @@ static int thread_worker_spawn_handler(thread_worker_t *tw) { } enif_free_env(msg_env); - /* Read handler PID from pipe (Erlang writes it after spawning) - * Use 10 second timeout for handler spawn */ + /* Read readiness signal: 4 bytes of length == 0. Strict success + * condition (Defect 5): both the byte count must match AND the + * value must be zero. Short reads are not silently accepted. */ uint32_t response_len = 0; - ssize_t n = read_with_timeout(tw->response_pipe[0], &response_len, sizeof(response_len), 10000); - if (n <= 0) { - /* Timeout or error - handler spawn failed */ - return -1; - } - if (n != sizeof(response_len) || response_len == 0) { - /* Handler spawned successfully - pid info is in the length as a signal */ - tw->has_handler = true; - return 0; - } - - /* If response_len > 0, there might be error data */ - if (response_len > 1000) { - /* Sanity check - something went wrong */ + ssize_t n = read_with_timeout(tw->response_pipe[0], &response_len, + sizeof(response_len), 10000); + if (n != (ssize_t)sizeof(response_len) || response_len != 0) { return -1; } - tw->has_handler = true; return 0; } @@ -360,6 +473,15 @@ static int thread_worker_spawn_handler(thread_worker_t *tw) { * This is called when a Python thread that is NOT an executor thread * (i.e., tl_current_worker == NULL) tries to call erlang.call(). * + * Wire format (response from Erlang handler): + * <> + * + * The callback_id is matched against the request's expected id; on + * mismatch, the rest of the frame is consumed and the loop continues. + * Any partial-frame failure poisons the worker (see + * poison_thread_worker) so subsequent calls from this thread acquire a + * fresh one. + * * @param func_name Name of the Erlang function to call * @param func_name_len Length of function name * @param call_args Python tuple of arguments @@ -367,6 +489,14 @@ static int thread_worker_spawn_handler(thread_worker_t *tw) { */ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, PyObject *call_args) { + /* Diagnostic ceiling: surface the unhealthy subsystem to the user. */ + if (atomic_load(&g_poisoned_workers_count) >= MAX_POISONED_WORKERS) { + PyErr_SetString(PyExc_RuntimeError, + "thread-callback subsystem unhealthy " + "(too many poisoned workers; see stderr)"); + return NULL; + } + /* Get or create sticky worker for this thread */ if (tl_thread_worker == NULL) { tl_thread_worker = acquire_thread_worker(); @@ -412,8 +542,8 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, ERL_NIF_TERM args_term = py_to_term(msg_env, call_args); /* Generate callback ID */ - uint64_t callback_id = atomic_fetch_add(&g_callback_id_counter, 1); - ERL_NIF_TERM id_term = enif_make_uint64(msg_env, callback_id); + uint64_t expected_id = atomic_fetch_add(&g_callback_id_counter, 1); + ERL_NIF_TERM id_term = enif_make_uint64(msg_env, expected_id); /* Send message: {thread_callback, WorkerId, CallbackId, FuncName, Args} */ ERL_NIF_TERM msg = enif_make_tuple5(msg_env, @@ -423,10 +553,6 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, func_term, args_term); - char *response_data = NULL; - uint32_t response_len = 0; - int read_result; - /* Send message to coordinator (can be done with GIL held) */ if (!enif_send(NULL, &g_thread_coordinator_pid, msg_env, msg)) { enif_free_env(msg_env); @@ -434,33 +560,69 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, return NULL; } enif_free_env(msg_env); + PY_THREAD_CB_LOG("send tw_id=%llu cb_id=%llu", + (unsigned long long)tw->id, (unsigned long long)expected_id); + + /* Read frames until we get the one matching expected_id, or fail. */ + char *response_data = NULL; + uint32_t response_len = 0; + int desync = 0; /* 1 = poison the worker before returning */ - /* Release GIL while waiting for response */ Py_BEGIN_ALLOW_THREADS - /* Use 30 second timeout to prevent indefinite blocking */ - read_result = read_length_prefixed_data( - tw->response_pipe[0], &response_data, &response_len, 30000); + while (1) { + uint64_t got_id = 0; + ssize_t nid = read_with_timeout(tw->response_pipe[0], + &got_id, sizeof(got_id), + THREAD_WORKER_IO_TIMEOUT_MS); + if (nid != (ssize_t)sizeof(got_id)) { + desync = 1; + break; + } + int r = read_length_prefixed_data( + tw->response_pipe[0], &response_data, &response_len, + THREAD_WORKER_IO_TIMEOUT_MS); + if (r != 0) { + desync = 1; + break; + } + if (got_id == expected_id) { + break; /* response_data + response_len ready */ + } + /* Stale frame: discard and continue draining. */ + if (response_data != NULL) { + enif_free(response_data); + response_data = NULL; + response_len = 0; + } + } Py_END_ALLOW_THREADS - if (read_result == -1) { - if (errno == ETIMEDOUT) { - PyErr_SetString(PyExc_TimeoutError, "Callback response timed out"); - } else { - PyErr_SetString(PyExc_RuntimeError, "Failed to read callback response"); + if (desync) { + if (response_data != NULL) { + enif_free(response_data); } + PY_THREAD_CB_LOG("desync tw_id=%llu cb_id=%llu errno=%d", + (unsigned long long)tw->id, + (unsigned long long)expected_id, errno); + poison_thread_worker(tw); + tl_thread_worker = NULL; + if (g_thread_worker_key_created) { + pthread_setspecific(g_thread_worker_key, NULL); + } + PyErr_SetString(PyExc_RuntimeError, + "callback synchronisation lost; retry"); return NULL; } - if (read_result == -2) { - PyErr_SetString(PyExc_MemoryError, "Failed to allocate response buffer"); - return NULL; - } - /* Parse response using existing function */ - PyObject *result = parse_callback_response((unsigned char *)response_data, response_len); + PY_THREAD_CB_LOG("recv tw_id=%llu cb_id=%llu len=%u", + (unsigned long long)tw->id, (unsigned long long)expected_id, + (unsigned)response_len); + + PyObject *result = parse_callback_response( + (unsigned char *)response_data, response_len); if (response_data != NULL) { enif_free(response_data); } - return result; } @@ -488,38 +650,67 @@ static ERL_NIF_TERM nif_thread_worker_set_coordinator(ErlNifEnv *env, int argc, } /** - * @brief NIF to write response to a thread worker's pipe + * @brief NIF to write an id-prefixed response to a thread worker's pipe. * - * Args: WriteFd, ResponseBinary - * Called by Erlang handler to send callback result back to Python thread. + * Args: WriteFd, CallbackId, ResponseBinary + * + * Wire format (matches thread_worker_call's reader): + * <> + * + * Combining the three pieces into one looped write closes the + * length-then-data race (Defect 3) and lets the reader correlate + * responses by callback_id. The write end is non-blocking and the NIF + * is registered as ERL_NIF_DIRTY_JOB_IO_BOUND, so a stalled Python + * reader produces a write_timeout instead of pinning a scheduler. */ -static ERL_NIF_TERM nif_thread_worker_write(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]) { +static ERL_NIF_TERM nif_thread_worker_write_with_id(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { (void)argc; int fd; + ErlNifUInt64 callback_id; ErlNifBinary response; if (!enif_get_int(env, argv[0], &fd)) { return make_error(env, "invalid_fd"); } - - if (!enif_inspect_binary(env, argv[1], &response)) { + if (!enif_get_uint64(env, argv[1], &callback_id)) { + return make_error(env, "invalid_callback_id"); + } + if (!enif_inspect_binary(env, argv[2], &response)) { return make_error(env, "invalid_response"); } - /* Write length then data */ - uint32_t len = (uint32_t)response.size; - ssize_t n = write(fd, &len, sizeof(len)); - if (n != sizeof(len)) { - return make_error(env, "write_length_failed"); + size_t total = sizeof(callback_id) + sizeof(uint32_t) + response.size; + char stack_buf[THREAD_WORKER_STACK_FRAME_LIMIT]; + char *buf = (total <= sizeof(stack_buf)) + ? stack_buf + : enif_alloc(total); + if (buf == NULL) { + return make_error(env, "alloc_failed"); } - n = write(fd, response.data, response.size); - if (n != (ssize_t)response.size) { - return make_error(env, "write_data_failed"); + uint64_t cb_id = (uint64_t)callback_id; + uint32_t len = (uint32_t)response.size; + memcpy(buf, &cb_id, sizeof(cb_id)); + memcpy(buf + sizeof(cb_id), &len, sizeof(len)); + if (response.size > 0) { + memcpy(buf + sizeof(cb_id) + sizeof(len), + response.data, response.size); } - return ATOM_OK; + write_result_t r = write_all_with_deadline( + fd, buf, total, THREAD_WORKER_IO_TIMEOUT_MS); + + if (buf != stack_buf) { + enif_free(buf); + } + + switch (r) { + case WRITE_OK: return ATOM_OK; + case WRITE_TIMEOUT: return make_error(env, "write_timeout"); + case WRITE_ERROR: return make_error(env, "write_failed"); + } + return make_error(env, "write_failed"); } /** @@ -548,12 +739,24 @@ static ERL_NIF_TERM nif_thread_worker_signal_ready(ErlNifEnv *env, int argc, } /** - * @brief NIF to write an async callback response + * @brief NIF to write an async callback response. * * Args: WriteFd, CallbackId, ResponseBinary - * Called by Erlang to send the result of an async callback back to Python. - * The response is written to the async callback pipe in the format: - * callback_id (8 bytes) + response_len (4 bytes) + response_data + * + * Wire format (matches process_async_callback_response in py_callback.c): + * <> + * + * Atomicity (review #4): the kernel does NOT guarantee per-write() + * atomicity beyond PIPE_BUF, so frame-on-the-wire integrity is + * guaranteed by the *single-writer process invariant* on the Erlang + * side (one async_writer_loop per WriteFd; see py_thread_handler.erl). + * This NIF does one looped non-blocking write of the combined buffer; + * even when the kernel chunks it, no other writer can interleave. + * + * Registered as ERL_NIF_DIRTY_JOB_IO_BOUND (see py_nif.c). The write + * end of async_callback_pipe is set non-blocking on the C side + * (py_callback.c init); a stalled Python reader produces + * write_timeout instead of holding a scheduler thread. */ static ERL_NIF_TERM nif_async_callback_response(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { @@ -565,35 +768,42 @@ static ERL_NIF_TERM nif_async_callback_response(ErlNifEnv *env, int argc, if (!enif_get_int(env, argv[0], &fd)) { return make_error(env, "invalid_fd"); } - if (!enif_get_uint64(env, argv[1], &callback_id)) { return make_error(env, "invalid_callback_id"); } - if (!enif_inspect_binary(env, argv[2], &response)) { return make_error(env, "invalid_response"); } - /* Write callback_id (8 bytes) */ - ssize_t n = write(fd, &callback_id, sizeof(callback_id)); - if (n != sizeof(callback_id)) { - return make_error(env, "write_callback_id_failed"); + size_t total = sizeof(callback_id) + sizeof(uint32_t) + response.size; + char stack_buf[THREAD_WORKER_STACK_FRAME_LIMIT]; + char *buf = (total <= sizeof(stack_buf)) + ? stack_buf + : enif_alloc(total); + if (buf == NULL) { + return make_error(env, "alloc_failed"); } - /* Write response_len (4 bytes) */ - uint32_t len = (uint32_t)response.size; - n = write(fd, &len, sizeof(len)); - if (n != sizeof(len)) { - return make_error(env, "write_length_failed"); + uint64_t cb_id = (uint64_t)callback_id; + uint32_t len = (uint32_t)response.size; + memcpy(buf, &cb_id, sizeof(cb_id)); + memcpy(buf + sizeof(cb_id), &len, sizeof(len)); + if (response.size > 0) { + memcpy(buf + sizeof(cb_id) + sizeof(len), + response.data, response.size); } - /* Write response_data */ - if (len > 0) { - n = write(fd, response.data, response.size); - if (n != (ssize_t)response.size) { - return make_error(env, "write_data_failed"); - } + write_result_t r = write_all_with_deadline( + fd, buf, total, THREAD_WORKER_IO_TIMEOUT_MS); + + if (buf != stack_buf) { + enif_free(buf); } - return ATOM_OK; + switch (r) { + case WRITE_OK: return ATOM_OK; + case WRITE_TIMEOUT: return make_error(env, "write_timeout"); + case WRITE_ERROR: return make_error(env, "write_failed"); + } + return make_error(env, "write_failed"); } diff --git a/scripts/stress_thread_callback.sh b/scripts/stress_thread_callback.sh new file mode 100755 index 0000000..68e026e --- /dev/null +++ b/scripts/stress_thread_callback.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +# Stress harness for py_thread_callback_SUITE. Runs the suite repeatedly +# and aborts on the first failure. Use under CPU pressure to widen race +# windows: +# +# for i in 1 2 3 4; do yes > /dev/null & done +# ENABLE_PY_THREAD_CB_TRACE=ON ./scripts/stress_thread_callback.sh 50 +# kill %1 %2 %3 %4 +# +# A clean run prints "PASS " on every iteration and exits 0. + +set -euo pipefail + +ITERATIONS="${1:-50}" + +cd "$(dirname "$0")/.." + +if [[ "${ENABLE_PY_THREAD_CB_TRACE:-OFF}" == "ON" ]]; then + echo "Rebuilding with PY_THREAD_CB_TRACE on..." + rm -rf _build/cmake _build/default/lib/erlang_python/priv/erlang_python_nif* + mkdir -p _build/cmake + (cd _build/cmake && cmake ../../c_src -DENABLE_PY_THREAD_CB_TRACE=ON \ + && cmake --build . -- -j "$(nproc 2>/dev/null || sysctl -n hw.ncpu)") +fi + +rebar3 compile + +for i in $(seq 1 "$ITERATIONS"); do + if ! rebar3 ct --suite=test/py_thread_callback_SUITE --readable=compact; then + echo "FAIL on iteration $i" + exit 1 + fi + echo "PASS $i" +done + +echo "All $ITERATIONS iterations passed." diff --git a/src/py_nif.erl b/src/py_nif.erl index 55b8b17..4d07a6d 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -71,7 +71,7 @@ execution_mode/0, %% Thread worker support (ThreadPoolExecutor) thread_worker_set_coordinator/1, - thread_worker_write/2, + thread_worker_write_with_id/3, thread_worker_signal_ready/1, %% Async callback support (for erlang.async_call) async_callback_response/3, @@ -593,11 +593,18 @@ execution_mode() -> thread_worker_set_coordinator(_Pid) -> ?NIF_STUB. -%% @doc Write a callback response to a thread worker's pipe. -%% Fd is the write end of the response pipe. -%% Response is the result binary (status byte + python repr). --spec thread_worker_write(integer(), binary()) -> ok | {error, term()}. -thread_worker_write(_Fd, _Response) -> +%% @doc Write a callback response to a thread worker's pipe, prefixed +%% with the callback id so the Python reader can correlate responses +%% to outstanding requests. +%% +%% Wire format: `<>'. +%% The whole frame is written in one looped non-blocking write; this +%% NIF is registered as `ERL_NIF_DIRTY_JOB_IO_BOUND' so a stalled +%% Python reader produces `{error, write_timeout}' rather than pinning +%% a scheduler. +-spec thread_worker_write_with_id(integer(), non_neg_integer(), binary()) -> + ok | {error, term()}. +thread_worker_write_with_id(_Fd, _CallbackId, _Response) -> ?NIF_STUB. %% @doc Signal that a thread worker handler is ready. diff --git a/src/py_thread_handler.erl b/src/py_thread_handler.erl index cd463d6..51f7235 100644 --- a/src/py_thread_handler.erl +++ b/src/py_thread_handler.erl @@ -48,11 +48,25 @@ terminate/2 ]). -%% State: map of WorkerId => {HandlerPid, WriteFd} +%% State: +%% handlers — sync thread workers: WorkerId => {HandlerPid, WriteFd} +%% async_writers — async pipes: WriteFd => WriterPid (one writer process +%% per pipe; serialises py_nif:async_callback_response/3 +%% so frames cannot interleave on the wire). +%% writer_fds — reverse map: WriterPid => WriteFd (fast lookup on +%% 'DOWN' messages). -record(state, { - handlers = #{} :: #{non_neg_integer() => {pid(), integer()}} + handlers = #{} :: #{non_neg_integer() => {pid(), integer()}}, + async_writers = #{} :: #{integer() => pid()}, + writer_fds = #{} :: #{pid() => integer()} }). +%% Bound on async-writer mailbox length. Submissions beyond this point +%% are failed inline (the writer serialises an error frame for the +%% callback) so a stalled Python event loop cannot grow the writer's +%% mailbox without bound. +-define(ASYNC_WRITER_MAX_QUEUE, 10000). + %%% ============================================================================ %%% API %%% ============================================================================ @@ -66,6 +80,11 @@ start_link() -> %%% ============================================================================ init([]) -> + %% Trap exits so a dying handler / async writer is reaped via + %% handle_info({'EXIT', ...}) instead of taking the coordinator + %% down. (Also enables the 'DOWN' clause for monitored writers, + %% which we route through the same cleanup logic.) + process_flag(trap_exit, true), %% Register ourselves as the thread worker coordinator case py_nif:thread_worker_set_coordinator(self()) of ok -> @@ -105,25 +124,50 @@ handle_info({thread_callback, WorkerId, CallbackId, FuncName, Args}, end, {noreply, State}; -%% Handle async callback request from Python async_call() -%% Unlike thread_callback, this uses a global pipe for all async callbacks. -%% Each response includes the callback_id so Python can match it to the right Future. -handle_info({async_callback, CallbackId, FuncName, Args, WriteFd}, State) -> - %% Spawn a process to handle this callback asynchronously - %% This allows multiple async callbacks to be processed concurrently - spawn_link(fun() -> - handle_async_callback(WriteFd, CallbackId, FuncName, Args) - end), - {noreply, State}; +%% Handle async callback request from Python async_call(). +%% +%% Unlike thread_callback, this uses one shared per-interpreter pipe +%% for all async callbacks. To prevent frame-on-the-wire interleaving +%% from concurrent writers (issue #63), every response is funnelled +%% through a single writer process per WriteFd: callback execution +%% stays parallel, only the pipe write is serialised. +handle_info({async_callback, CallbackId, FuncName, Args, WriteFd}, State0) -> + {Writer, State1} = ensure_async_writer(WriteFd, State0), + case writer_overload(Writer) of + true -> + %% Backpressure: surface "queue full" as a normal error + %% frame on the pipe so the Future resolves with a clear + %% RuntimeError instead of growing the writer's mailbox + %% indefinitely. + Writer ! {overflow, CallbackId}; + false -> + spawn(fun() -> + Response = run_async_callback(FuncName, Args), + Writer ! {respond, CallbackId, Response} + end) + end, + {noreply, State1}; -%% Handle handler process exit +%% Handle linked sync handler exit. handle_info({'EXIT', Pid, _Reason}, #state{handlers = Handlers} = State) -> - %% Remove handler from map NewHandlers = maps:filter( fun(_WorkerId, {HandlerPid, _Fd}) -> HandlerPid =/= Pid end, Handlers), {noreply, State#state{handlers = NewHandlers}}; +%% Handle monitored async writer exit (writer hits a hard write error). +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + #state{async_writers = Writers, + writer_fds = WriterFds} = State) -> + case maps:take(Pid, WriterFds) of + {WriteFd, NewWriterFds} -> + NewWriters = maps:remove(WriteFd, Writers), + {noreply, State#state{async_writers = NewWriters, + writer_fds = NewWriterFds}}; + error -> + {noreply, State} + end; + handle_info(_Info, State) -> {noreply, State}. @@ -138,9 +182,12 @@ terminate(_Reason, _State) -> %% Each handler is dedicated to one Python thread and has its own response pipe. handler_loop(WorkerId, WriteFd) -> receive - {thread_callback, _CallbackId, FuncName, Args} -> - %% Execute the callback and send response - handle_thread_callback(WriteFd, FuncName, Args), + {thread_callback, CallbackId, FuncName, Args} -> + %% Execute the callback and send response. If the write fails + %% (pipe closed by the C side after worker poisoning, or + %% write_timeout) we exit so the coordinator's trap_exit + %% clause removes us from its handler map. + handle_thread_callback(WriteFd, CallbackId, FuncName, Args), handler_loop(WorkerId, WriteFd); {pipe_closed} -> @@ -154,8 +201,8 @@ handler_loop(WorkerId, WriteFd) -> handler_loop(WorkerId, WriteFd) end. -%% Execute a callback and write response to the pipe -handle_thread_callback(WriteFd, FuncName, Args) -> +%% Execute a callback and write the id-prefixed response to the pipe. +handle_thread_callback(WriteFd, CallbackId, FuncName, Args) -> %% Convert Args from tuple to list if needed ArgsList = case Args of T when is_tuple(T) -> tuple_to_list(T); @@ -180,25 +227,26 @@ handle_thread_callback(WriteFd, FuncName, Args) -> <<1, ErrMsg/binary>> end, - %% Write response to pipe - py_nif:thread_worker_write(WriteFd, Response). + %% Write response to pipe; exit on failure so we are reaped. + case py_nif:thread_worker_write_with_id(WriteFd, CallbackId, Response) of + ok -> ok; + {error, Reason1} -> + exit({thread_worker_write_failed, Reason1}) + end. -%% Execute an async callback and write response to the async callback pipe. -%% Unlike handle_thread_callback, this includes the callback_id in the response -%% so Python can match it to the correct Future. -handle_async_callback(WriteFd, CallbackId, FuncName, Args) -> - %% Convert Args from tuple to list if needed +%% Execute the user's registered function and encode the result as a +%% wire-format response body (`<>`). +%% Used by async_writer_loop (and indirectly by handle_thread_callback +%% via the same encoding shape — the sync path keeps its own copy +%% close to the write call site). +run_async_callback(FuncName, Args) -> ArgsList = case Args of T when is_tuple(T) -> tuple_to_list(T); L when is_list(L) -> L; _ -> [Args] end, - - %% Execute the registered function - Response = case py_callback:execute(FuncName, ArgsList) of + case py_callback:execute(FuncName, ArgsList) of {ok, Result} -> - %% Encode result as Python-parseable string - %% Format: status_byte (0=ok) + python_repr ResultStr = term_to_python_repr(Result), <<0, ResultStr/binary>>; {error, {not_found, Name}} -> @@ -209,10 +257,56 @@ handle_async_callback(WriteFd, CallbackId, FuncName, Args) -> ErrMsg = iolist_to_binary( io_lib:format("~p: ~p", [Class, Reason])), <<1, ErrMsg/binary>> - end, + end. - %% Write response to async callback pipe (includes callback_id) - py_nif:async_callback_response(WriteFd, CallbackId, Response). +%% Look up or spawn the async writer for a given WriteFd. Writers are +%% monitored (not linked) so a writer death does not take the +%% coordinator down; the 'DOWN' clause cleans up the maps. +ensure_async_writer(WriteFd, + #state{async_writers = Writers, + writer_fds = WriterFds} = State) -> + case maps:get(WriteFd, Writers, undefined) of + undefined -> + Writer = spawn(fun() -> async_writer_loop(WriteFd) end), + erlang:monitor(process, Writer), + {Writer, State#state{ + async_writers = Writers#{WriteFd => Writer}, + writer_fds = WriterFds#{Writer => WriteFd}}}; + Existing -> + {Existing, State} + end. + +%% Backpressure check: bound the writer's mailbox. +writer_overload(Writer) -> + case erlang:process_info(Writer, message_queue_len) of + {message_queue_len, N} when N >= ?ASYNC_WRITER_MAX_QUEUE -> true; + _ -> false + end. + +%% Per-fd writer process. Owns one async pipe write fd; its mailbox is +%% the implicit serialisation point for all responses on that fd. +%% Exits on hard write errors so the coordinator's 'DOWN' clause can +%% drop the entry; the next callback respawns it. +async_writer_loop(WriteFd) -> + receive + {respond, CallbackId, Response} -> + case py_nif:async_callback_response(WriteFd, CallbackId, Response) of + ok -> async_writer_loop(WriteFd); + {error, _} = Err -> + exit({async_callback_response_failed, Err}) + end; + {overflow, CallbackId} -> + Response = <<1, "async callback queue full">>, + case py_nif:async_callback_response(WriteFd, CallbackId, Response) of + ok -> async_writer_loop(WriteFd); + {error, _} = Err -> + exit({async_callback_response_failed, Err}) + end; + shutdown -> + ok; + _Other -> + async_writer_loop(WriteFd) + end. %%% ============================================================================ %%% Term to Python repr conversion diff --git a/test/py_thread_callback_SUITE.erl b/test/py_thread_callback_SUITE.erl index adce52b..a656bc3 100644 --- a/test/py_thread_callback_SUITE.erl +++ b/test/py_thread_callback_SUITE.erl @@ -23,7 +23,10 @@ threadpool_thread_reuse_test/1, simple_thread_basic_test/1, simple_thread_multiple_calls_test/1, - simple_thread_concurrent_test/1 + simple_thread_concurrent_test/1, + threadpool_high_concurrency_test/1, + async_callback_concurrent_test/1, + async_callback_large_payload_test/1 ]). all() -> @@ -36,7 +39,10 @@ all() -> threadpool_thread_reuse_test, simple_thread_basic_test, simple_thread_multiple_calls_test, - simple_thread_concurrent_test + simple_thread_concurrent_test, + threadpool_high_concurrency_test, + async_callback_concurrent_test, + async_callback_large_payload_test ]. init_per_suite(Config) -> @@ -63,6 +69,8 @@ end_per_testcase(_TestCase, _Config) -> catch py:unregister_function(square_in_erlang), catch py:unregister_function(maybe_fail), catch py:unregister_function(get_id), + catch py:unregister_function(async_double), + catch py:unregister_function(async_blob), ok. %%% ============================================================================ @@ -204,3 +212,104 @@ simple_thread_concurrent_test(_Config) -> {ok, Results} = py:eval(Code), [0, 2, 4, 6, 8] = Results, ok. + +%%% ============================================================================ +%%% Regression tests for issue #63 (sync + async pipe race / interleave) +%%% ============================================================================ + +%% @doc Stress the sync thread-callback path: +%% - 8 worker threads, +%% - 200 tasks per repetition, each task makes 5 sequential +%% erl.call('add_one', x) invocations, +%% - 10 repetitions per case. +%% +%% Without the Phase 2-5 fixes (looped reads, combined write, +%% callback_id correlation, worker poisoning), short-read or +%% length-then-data races deliver wrong values to the wrong caller — +%% the symptom captured in issue #63. With the fixes, every result +%% must equal x + 5. +threadpool_high_concurrency_test(_Config) -> + py:register_function(add_one, fun([X]) -> X + 1 end), + Code = << + "(lambda cf, erl: list(" + "cf.ThreadPoolExecutor(max_workers=8).__enter__().map(" + "lambda x: erl.call('add_one', erl.call('add_one', " + "erl.call('add_one', erl.call('add_one', erl.call('add_one', x))))), " + "range(200))))(__import__('concurrent.futures', " + "fromlist=['ThreadPoolExecutor']), __import__('erlang'))" + >>, + Expected = [X + 5 || X <- lists:seq(0, 199)], + lists:foreach( + fun(I) -> + {ok, Results} = py:eval(Code), + case Results of + Expected -> ok; + _ -> ct:fail({wrong_results_iter, I, Results}) + end + end, + lists:seq(1, 10)), + ok. + +%% @doc Stress the async thread-callback path (erlang.async_call from +%% Python). 50 concurrent invocations through a single +%% asyncio.gather; without the Phase 4a per-fd writer process or +%% Phase 4c resumable parser, frame-on-the-wire interleaving on the +%% shared async_callback_pipe corrupts results. +async_callback_concurrent_test(_Config) -> + py:register_function(async_double, fun([X]) -> X * 2 end), + %% Add the existing test/ directory to sys.path so the Python + %% helper module is importable. + TestDir = filename:join(code:lib_dir(erlang_python), "test"), + ok = py:exec(iolist_to_binary(io_lib:format( + "import sys; sys.path.insert(0, '~s')", [TestDir]))), + %% Define a Python coroutine that gathers 50 erlang.async_call + %% invocations and returns the result list. + ok = py:exec(<< + "import asyncio, erlang\n" + "async def _gather50():\n" + " return await asyncio.gather(*[\n" + " erlang.async_call('async_double', i)\n" + " for i in range(50)\n" + " ])\n" + "def _run_gather50():\n" + " return asyncio.run(_gather50())\n" + >>), + {ok, Results} = py:call('__main__', '_run_gather50', []), + Expected = [I * 2 || I <- lists:seq(0, 49)], + Expected = Results, + py:unregister_function(async_double), + ok. + +%% @doc Async callback with a payload larger than PIPE_BUF (4 KiB on +%% Linux, 512 on FreeBSD per POSIX minimum). Combined with concurrent +%% submissions, this exercises the looped non-blocking write under +%% the single-writer-process invariant: the kernel may chunk the +%% write but no other writer can interleave, so the resumable parser +%% on the Python side reassembles the frame intact. +async_callback_large_payload_test(_Config) -> + Size = 64 * 1024, + %% Printable ASCII only — the goal is wire-frame integrity across + %% the chunked write boundary, not arbitrary-byte encoding through + %% term_to_python_repr (which double-quotes binaries as Python + %% string literals). + Payload = list_to_binary([($a + (I rem 26)) || I <- lists:seq(1, Size)]), + py:register_function(async_blob, fun([_]) -> Payload end), + TestDir = filename:join(code:lib_dir(erlang_python), "test"), + ok = py:exec(iolist_to_binary(io_lib:format( + "import sys; sys.path.insert(0, '~s')", [TestDir]))), + ok = py:exec(<< + "import asyncio, erlang\n" + "async def _gather_blobs():\n" + " return await asyncio.gather(*[\n" + " erlang.async_call('async_blob', i)\n" + " for i in range(8)\n" + " ])\n" + "def _run_gather_blobs():\n" + " blobs = asyncio.run(_gather_blobs())\n" + " return [len(b) for b in blobs]\n" + >>), + {ok, Lengths} = py:call('__main__', '_run_gather_blobs', []), + Expected = lists:duplicate(8, Size), + Expected = Lengths, + py:unregister_function(async_blob), + ok. From 10cd7000a0e1e548045707a0c08ab87073e219f7 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 3 May 2026 12:19:53 +0200 Subject: [PATCH 2/6] Bound async-callback in-flight via per-pipe atomics counter The previous backpressure check looked only at the writer's mailbox length at submission time, so concurrent executors could complete after the check and still pile {respond,...} into the writer unbounded. The counter now tracks executors-plus-queued together: incremented at submission, decremented after the writer hands the frame to the NIF. --- src/py_thread_handler.erl | 90 ++++++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 35 deletions(-) diff --git a/src/py_thread_handler.erl b/src/py_thread_handler.erl index 51f7235..0bde98b 100644 --- a/src/py_thread_handler.erl +++ b/src/py_thread_handler.erl @@ -50,21 +50,24 @@ %% State: %% handlers — sync thread workers: WorkerId => {HandlerPid, WriteFd} -%% async_writers — async pipes: WriteFd => WriterPid (one writer process -%% per pipe; serialises py_nif:async_callback_response/3 -%% so frames cannot interleave on the wire). +%% async_writers — async pipes: WriteFd => {WriterPid, Counter} where +%% Counter is an atomics ref tracking in-flight async +%% callbacks (incremented at submission, decremented +%% after the writer hands the frame to the NIF). %% writer_fds — reverse map: WriterPid => WriteFd (fast lookup on %% 'DOWN' messages). -record(state, { handlers = #{} :: #{non_neg_integer() => {pid(), integer()}}, - async_writers = #{} :: #{integer() => pid()}, + async_writers = #{} :: #{integer() => {pid(), atomics:atomics_ref()}}, writer_fds = #{} :: #{pid() => integer()} }). -%% Bound on async-writer mailbox length. Submissions beyond this point -%% are failed inline (the writer serialises an error frame for the -%% callback) so a stalled Python event loop cannot grow the writer's -%% mailbox without bound. +%% Bound on combined async in-flight + writer-mailbox depth. The +%% atomics counter covers BOTH executors that have spawned but not +%% yet enqueued AND messages already sitting in the writer's mailbox, +%% so the bound holds even when many parallel callbacks complete +%% before the writer drains. Submissions beyond this point are failed +%% inline with an "async callback queue full" error frame. -define(ASYNC_WRITER_MAX_QUEUE, 10000). %%% ============================================================================ @@ -132,18 +135,25 @@ handle_info({thread_callback, WorkerId, CallbackId, FuncName, Args}, %% through a single writer process per WriteFd: callback execution %% stays parallel, only the pipe write is serialised. handle_info({async_callback, CallbackId, FuncName, Args, WriteFd}, State0) -> - {Writer, State1} = ensure_async_writer(WriteFd, State0), - case writer_overload(Writer) of - true -> - %% Backpressure: surface "queue full" as a normal error - %% frame on the pipe so the Future resolves with a clear - %% RuntimeError instead of growing the writer's mailbox - %% indefinitely. - Writer ! {overflow, CallbackId}; - false -> + {Writer, Counter, State1} = ensure_async_writer(WriteFd, State0), + %% Reserve a slot in the in-flight counter. atomics:add_get is + %% atomic and gives us the post-increment value, so the check is + %% race-free across submissions on different schedulers. + case atomics:add_get(Counter, 1, 1) of + N when N > ?ASYNC_WRITER_MAX_QUEUE -> + %% Already at the bound: undo the reservation and fail + %% this callback with a queue-full error frame. The + %% writer still serialises the error frame on the pipe + %% (so the Future resolves with a clear RuntimeError + %% rather than hanging), and we use the same atomics ref + %% so the queued overflow message stays accounted for + %% until the writer drains it. + atomics:sub(Counter, 1, 1), + Writer ! {overflow, CallbackId, Counter}; + _ -> spawn(fun() -> Response = run_async_callback(FuncName, Args), - Writer ! {respond, CallbackId, Response} + Writer ! {respond, CallbackId, Response, Counter} end) end, {noreply, State1}; @@ -261,36 +271,46 @@ run_async_callback(FuncName, Args) -> %% Look up or spawn the async writer for a given WriteFd. Writers are %% monitored (not linked) so a writer death does not take the -%% coordinator down; the 'DOWN' clause cleans up the maps. +%% coordinator down; the 'DOWN' clause cleans up the maps. Each +%% writer gets its own atomics counter that bounds total in-flight +%% async callbacks (executors + queued responses) on that pipe. ensure_async_writer(WriteFd, #state{async_writers = Writers, writer_fds = WriterFds} = State) -> case maps:get(WriteFd, Writers, undefined) of undefined -> + Counter = atomics:new(1, [{signed, false}]), Writer = spawn(fun() -> async_writer_loop(WriteFd) end), erlang:monitor(process, Writer), - {Writer, State#state{ - async_writers = Writers#{WriteFd => Writer}, + {Writer, Counter, State#state{ + async_writers = Writers#{WriteFd => {Writer, Counter}}, writer_fds = WriterFds#{Writer => WriteFd}}}; - Existing -> - {Existing, State} - end. - -%% Backpressure check: bound the writer's mailbox. -writer_overload(Writer) -> - case erlang:process_info(Writer, message_queue_len) of - {message_queue_len, N} when N >= ?ASYNC_WRITER_MAX_QUEUE -> true; - _ -> false + {Writer, Counter} -> + {Writer, Counter, State} end. -%% Per-fd writer process. Owns one async pipe write fd; its mailbox is -%% the implicit serialisation point for all responses on that fd. +%% Per-fd writer process. Owns one async pipe write fd and is the only +%% process that calls py_nif:async_callback_response/3 for it; its +%% mailbox is the serialisation point for all responses on that fd. +%% +%% The {respond, ...} message carries the per-pipe atomics counter +%% from ensure_async_writer/2. The writer ALWAYS decrements after +%% handing the frame to the NIF (success or error) so the in-flight +%% bound is honoured even when the underlying write fails. {overflow, +%% ...} messages do not carry the counter — the gen_server already +%% un-reserved before sending — and the writer just emits the +%% error frame. +%% %% Exits on hard write errors so the coordinator's 'DOWN' clause can -%% drop the entry; the next callback respawns it. +%% drop the entry; the next callback respawns the writer (with a +%% fresh counter, so any leaked count from the previous instance is +%% discarded with the dead writer). async_writer_loop(WriteFd) -> receive - {respond, CallbackId, Response} -> - case py_nif:async_callback_response(WriteFd, CallbackId, Response) of + {respond, CallbackId, Response, Counter} -> + R = py_nif:async_callback_response(WriteFd, CallbackId, Response), + atomics:sub(Counter, 1, 1), + case R of ok -> async_writer_loop(WriteFd); {error, _} = Err -> exit({async_callback_response_failed, Err}) From 39c14d257101a86f84715e7f5ed20b36a63b9943 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 3 May 2026 12:23:25 +0200 Subject: [PATCH 3/6] Drain async pipe after pipe_broken to stop reader busy-fire MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reader returned -1 immediately when pipe_broken was set, but left bytes in the fd, so asyncio kept firing the callback while the Erlang writer was still draining its mailbox into the pipe. Discard the bytes and return 0 so the wrapper's while loop exits cleanly and the fd goes quiet once the writer times out. Also document that recovery is fail-loud only — there is no event-loop reference stored and no re-registration path to fail. --- c_src/py_callback.c | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index 20bcc8e..8e981fa 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2200,7 +2200,31 @@ extern bool g_has_thread_coordinator; * (process_async_callback_response) is event-driven from the asyncio * loop and the read end is O_NONBLOCK; partial frames are buffered * across invocations in hdr_buf/body_buf so the reader resumes on the - * next loop tick when more data arrives. */ + * next loop tick when more data arrives. + * + * == Recovery on hard read error == + * + * `pipe_broken` is set when the reader hits an unrecoverable error + * (EIO/EBADF/EOF mid-frame). The behaviour is fail-loud, no rebuild: + * + * 1. Pending futures are failed with RuntimeError("async callback + * pipe broken") (see async_pipe_break_and_fail_pending). + * 2. New erlang.async_call invocations short-circuit with the same + * error before reaching the pipe (see the gate in + * send_async_callback_message). + * 3. The reader keeps draining and discarding any bytes Erlang + * still writes so the asyncio loop's fd-readable callback does + * not busy-fire. Once Erlang's writer process hits its NIF + * write timeout (30s) and dies, the gen_server reaps it via + * 'DOWN'; from that point the fd is quiet. + * 4. The asyncio reader registration stays in place — there is no + * reference to the loop stored in this struct, no + * remove_reader call, and no `async_pipe_renewed` protocol. + * Recovery requires restarting the erlang_python application + * (or the interpreter) so a fresh state struct is created. + * + * No existing event-loop reference is mutated; there is no + * re-registration path that could fail. */ typedef struct { int async_callback_pipe[2]; /* [0]=read, [1]=write - per-interpreter pipe */ PyObject *async_pending_futures; /* Dict: callback_id -> Future */ @@ -2429,7 +2453,20 @@ static int process_async_callback_response(void) { return -1; } if (state->pipe_broken) { - return -1; + /* Drain and discard whatever Erlang still writes so the + * asyncio loop's fd-readable callback stops firing once the + * writer process times out and dies. Returning 0 (instead of + * -1) lets async_callback_reader's `while(... > 0)` loop + * exit cleanly without flagging an error. */ + char scratch[256]; + for (;;) { + ssize_t n = read(state->async_callback_pipe[0], + scratch, sizeof(scratch)); + if (n > 0) continue; + if (n < 0 && errno == EINTR) continue; + break; + } + return 0; } /* Stage 1: header (12 bytes). */ From 3c56eceee85e78400cb9caf50b137747fbd5dcb1 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 3 May 2026 12:33:58 +0200 Subject: [PATCH 4/6] Link async writers to coordinator instead of monitoring Monitored writers survived a coordinator crash, leaking the async pipe fd until the OS reclaimed it. spawn_link from the coordinator (which traps exits) propagates the EXIT signal in both directions: a writer death surfaces in the existing trap_exit clause for cleanup, and a coordinator crash now takes the writer with it so supervision can restart from a clean slate. --- src/py_thread_handler.erl | 52 ++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/src/py_thread_handler.erl b/src/py_thread_handler.erl index 0bde98b..343e4df 100644 --- a/src/py_thread_handler.erl +++ b/src/py_thread_handler.erl @@ -158,24 +158,26 @@ handle_info({async_callback, CallbackId, FuncName, Args, WriteFd}, State0) -> end, {noreply, State1}; -%% Handle linked sync handler exit. -handle_info({'EXIT', Pid, _Reason}, #state{handlers = Handlers} = State) -> +%% Linked-process exit: covers both sync handlers and async writers. +%% Both are spawn_link'd so a coordinator crash takes them with it +%% (no leaked writer holding the async pipe fd after the gen_server +%% restarts). When THEY die independently we trap the EXIT here and +%% remove the dead pid from whichever map it lives in. +handle_info({'EXIT', Pid, _Reason}, + #state{handlers = Handlers, + async_writers = Writers, + writer_fds = WriterFds} = State) -> NewHandlers = maps:filter( fun(_WorkerId, {HandlerPid, _Fd}) -> HandlerPid =/= Pid end, Handlers), - {noreply, State#state{handlers = NewHandlers}}; - -%% Handle monitored async writer exit (writer hits a hard write error). -handle_info({'DOWN', _MRef, process, Pid, _Reason}, - #state{async_writers = Writers, - writer_fds = WriterFds} = State) -> case maps:take(Pid, WriterFds) of {WriteFd, NewWriterFds} -> NewWriters = maps:remove(WriteFd, Writers), - {noreply, State#state{async_writers = NewWriters, + {noreply, State#state{handlers = NewHandlers, + async_writers = NewWriters, writer_fds = NewWriterFds}}; error -> - {noreply, State} + {noreply, State#state{handlers = NewHandlers}} end; handle_info(_Info, State) -> @@ -270,18 +272,22 @@ run_async_callback(FuncName, Args) -> end. %% Look up or spawn the async writer for a given WriteFd. Writers are -%% monitored (not linked) so a writer death does not take the -%% coordinator down; the 'DOWN' clause cleans up the maps. Each -%% writer gets its own atomics counter that bounds total in-flight -%% async callbacks (executors + queued responses) on that pipe. +%% spawn_link'd from the coordinator (which traps exits): a writer +%% death surfaces as `{'EXIT', WriterPid, _}' for cleanup, and a +%% coordinator crash propagates the EXIT signal back to the writer +%% (which does not trap) so it dies with the gen_server instead of +%% leaking the pipe fd. Supervision then restarts the gen_server with +%% an empty `async_writers' map and writers are respawned lazily on +%% the next callback. Each writer gets its own atomics counter that +%% bounds total in-flight async callbacks (executors + queued +%% responses) on that pipe. ensure_async_writer(WriteFd, #state{async_writers = Writers, writer_fds = WriterFds} = State) -> case maps:get(WriteFd, Writers, undefined) of undefined -> Counter = atomics:new(1, [{signed, false}]), - Writer = spawn(fun() -> async_writer_loop(WriteFd) end), - erlang:monitor(process, Writer), + Writer = spawn_link(fun() -> async_writer_loop(WriteFd) end), {Writer, Counter, State#state{ async_writers = Writers#{WriteFd => {Writer, Counter}}, writer_fds = WriterFds#{Writer => WriteFd}}}; @@ -301,10 +307,16 @@ ensure_async_writer(WriteFd, %% un-reserved before sending — and the writer just emits the %% error frame. %% -%% Exits on hard write errors so the coordinator's 'DOWN' clause can -%% drop the entry; the next callback respawns the writer (with a -%% fresh counter, so any leaked count from the previous instance is -%% discarded with the dead writer). +%% Lifecycle: +%% - spawn_link'd from the coordinator: a coordinator crash +%% propagates an unhandled EXIT here (we do NOT trap_exit) and +%% the writer dies with it, releasing the fd before supervision +%% restarts the gen_server. +%% - On hard write errors the writer exits normally. The +%% coordinator's trap_exit clause catches the EXIT, drops the +%% map entry, and the next callback respawns the writer with a +%% fresh atomics counter so any leaked count from the previous +%% instance is discarded with the dead writer. async_writer_loop(WriteFd) -> receive {respond, CallbackId, Response, Counter} -> From e368ab8cac952c63a424545f727b761a2ae5392e Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 3 May 2026 12:39:35 +0200 Subject: [PATCH 5/6] Document the async_futures_mutex no-Python-call invariant All three callsites already snapshot or modify the dict under the lock and then call set_result / set_exception with the lock released, but the rule was only spelled out next to one of them. Pin it on the struct definition so the next contributor cannot put a future method back under the mutex and reintroduce the deadlock / re-entry hazard. --- c_src/py_callback.c | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index 8e981fa..a8d0589 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2228,6 +2228,29 @@ extern bool g_has_thread_coordinator; typedef struct { int async_callback_pipe[2]; /* [0]=read, [1]=write - per-interpreter pipe */ PyObject *async_pending_futures; /* Dict: callback_id -> Future */ + /* + * async_futures_mutex protects async_pending_futures. + * + * INVARIANT: never call a Future method (set_result, set_exception, + * cancel, ...) while holding this lock. set_exception in particular + * can run done-callbacks that re-enter user code, which may then + * call back into erlang.* — and any path that takes this same mutex + * would deadlock or expose a partially-mutated dict. + * + * The supported pattern at every call site is: + * 1. lock + * 2. allocate keys, snapshot/transfer ownership of futures + * (PyDict_Items + INCREF; or PyDict_GetItem + INCREF + DelItem), + * clear/modify the dict + * 3. unlock + * 4. THEN call set_result / set_exception on the snapshotted + * futures and release the snapshot's references. + * + * See process_async_callback_response and + * async_pipe_break_and_fail_pending for the two read sides; + * register_async_future is the write side and only does a dict + * insert under the lock. + */ pthread_mutex_t async_futures_mutex; bool pipe_initialized; From f4e7194bc29d457d62b8845ea8c9f183a12fc9fe Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 3 May 2026 12:57:56 +0200 Subject: [PATCH 6/6] Free poisoned thread workers instead of leaking them The previous code marked workers poisoned and skipped them in acquire_thread_worker but kept the struct on g_thread_pool_head until NIF unload, so a hot loop of synchronisation failures grew runtime memory linearly. Unlink under g_thread_pool_mutex and free immediately; the lifetime counter still bumps so the diagnostic ceiling and stderr warning fire. Caller now clears tl_thread_worker and the pthread key BEFORE poisoning so no dangling references can survive the free. --- c_src/py_thread_worker.c | 63 ++++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/c_src/py_thread_worker.c b/c_src/py_thread_worker.c index c099b62..d8a3e37 100644 --- a/c_src/py_thread_worker.c +++ b/c_src/py_thread_worker.c @@ -383,19 +383,30 @@ static thread_worker_t *acquire_thread_worker(void) { * * Called by thread_worker_call when the read path cannot guarantee * frame integrity (id mismatch + corrupted payload, partial-frame - * timeout, EOF mid-frame). Closes both pipe ends so any further write - * by the existing handler fails loudly, marks the worker as out of - * service, and bumps the global counter for diagnostics. - * - * @note Caller must clear tl_thread_worker before returning to Python so - * the next call from this thread acquires a fresh worker. + * timeout, EOF mid-frame). The worker is unlinked from the pool, + * its pipes are closed (so the Erlang handler's next write fails and + * the gen_server's trap_exit clause reaps it), its mutex is + * destroyed, and the struct itself is freed — so a hot loop of + * synchronisation failures cannot grow runtime memory until NIF + * unload. + * + * The lifetime counter g_poisoned_workers_count still increments + * monotonically so the diagnostic ceiling + * (MAX_POISONED_WORKERS) and stderr warning still fire even when + * structs are reclaimed. + * + * @note Caller MUST clear tl_thread_worker (and the pthread-key + * binding) BEFORE calling this function. Once the worker is + * unlinked it cannot be referenced again — the pointer is + * freed and any later access is use-after-free. */ static void poison_thread_worker(thread_worker_t *tw) { + /* Mark + unlink atomically under the pool lock so a racing + * acquire_thread_worker either runs before us (sees a healthy + * worker that we then unlink, but never returns it again) or + * after us (cannot find tw in the list). */ + pthread_mutex_lock(&g_thread_pool_mutex); pthread_mutex_lock(&tw->mutex); - if (tw->poisoned) { - pthread_mutex_unlock(&tw->mutex); - return; - } tw->poisoned = true; tw->in_use = false; int read_fd = tw->response_pipe[0]; @@ -404,19 +415,38 @@ static void poison_thread_worker(thread_worker_t *tw) { tw->response_pipe[1] = -1; pthread_mutex_unlock(&tw->mutex); + if (g_thread_pool_head == tw) { + g_thread_pool_head = tw->next; + } else { + thread_worker_t *prev = g_thread_pool_head; + while (prev != NULL && prev->next != tw) { + prev = prev->next; + } + if (prev != NULL) { + prev->next = tw->next; + } + } + pthread_mutex_unlock(&g_thread_pool_mutex); + + /* No other thread can reach tw now: not via the pool list, not + * via tl_thread_worker (caller cleared it), not via the pthread + * key (caller cleared it). Safe to destroy and free. */ if (read_fd >= 0) close(read_fd); if (write_fd >= 0) close(write_fd); + pthread_mutex_destroy(&tw->mutex); - uint64_t prev = atomic_fetch_add(&g_poisoned_workers_count, 1); - if (prev + 1 >= MAX_POISONED_WORKERS) { + uint64_t prev_count = atomic_fetch_add(&g_poisoned_workers_count, 1); + if (prev_count + 1 >= MAX_POISONED_WORKERS) { enif_fprintf(stderr, "[erlang_python] WARNING: %llu thread-callback workers have been " "poisoned. The thread-callback subsystem is unhealthy; please " "file a bug report with the failing test case.\n", - (unsigned long long)(prev + 1)); + (unsigned long long)(prev_count + 1)); } PY_THREAD_CB_LOG("poison tw_id=%llu count=%llu", - (unsigned long long)tw->id, (unsigned long long)(prev + 1)); + (unsigned long long)tw->id, (unsigned long long)(prev_count + 1)); + + enif_free(tw); } /* ============================================================================ @@ -604,11 +634,14 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, PY_THREAD_CB_LOG("desync tw_id=%llu cb_id=%llu errno=%d", (unsigned long long)tw->id, (unsigned long long)expected_id, errno); - poison_thread_worker(tw); + /* poison_thread_worker frees `tw`; clear every reference to + * it BEFORE calling so the pthread-key destructor cannot + * later observe a dangling pointer. */ tl_thread_worker = NULL; if (g_thread_worker_key_created) { pthread_setspecific(g_thread_worker_key, NULL); } + poison_thread_worker(tw); PyErr_SetString(PyExc_RuntimeError, "callback synchronisation lost; retry"); return NULL;