diff --git a/c_src/CMakeLists.txt b/c_src/CMakeLists.txt index 948d79a..37782d5 100644 --- a/c_src/CMakeLists.txt +++ b/c_src/CMakeLists.txt @@ -55,6 +55,15 @@ option(ENABLE_ASAN "Enable AddressSanitizer" OFF) option(ENABLE_TSAN "Enable ThreadSanitizer" OFF) option(ENABLE_UBSAN "Enable UndefinedBehaviorSanitizer" OFF) +# Optional thread-callback I/O tracing. Compiled out by default; turn on +# locally to debug py_thread_callback_SUITE-style flakiness. Logs one +# line per send/receive on stderr. +option(ENABLE_PY_THREAD_CB_TRACE "Trace thread-callback I/O on stderr" OFF) +if(ENABLE_PY_THREAD_CB_TRACE) + message(STATUS "Thread-callback I/O tracing enabled") + add_compile_definitions(PY_THREAD_CB_TRACE=1) +endif() + if(ENABLE_ASAN) message(STATUS "AddressSanitizer enabled") add_compile_options(-fsanitize=address -fno-omit-frame-pointer -g -O1) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index ab91cb6..a8d0589 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2195,12 +2195,80 @@ extern ErlNifPid g_thread_coordinator_pid; extern bool g_has_thread_coordinator; /* Per-interpreter module state for async callbacks. - * Each subinterpreter gets its own pipe and futures dict. */ + * + * Each subinterpreter gets its own pipe and futures dict. The reader + * (process_async_callback_response) is event-driven from the asyncio + * loop and the read end is O_NONBLOCK; partial frames are buffered + * across invocations in hdr_buf/body_buf so the reader resumes on the + * next loop tick when more data arrives. + * + * == Recovery on hard read error == + * + * `pipe_broken` is set when the reader hits an unrecoverable error + * (EIO/EBADF/EOF mid-frame). The behaviour is fail-loud, no rebuild: + * + * 1. Pending futures are failed with RuntimeError("async callback + * pipe broken") (see async_pipe_break_and_fail_pending). + * 2. 
New erlang.async_call invocations short-circuit with the same + * error before reaching the pipe (see the gate in + * send_async_callback_message). + * 3. The reader keeps draining and discarding any bytes Erlang + * still writes so the asyncio loop's fd-readable callback does + * not busy-fire. Once Erlang's writer process hits its NIF + * write timeout (30s) and dies, the gen_server reaps it via + * 'DOWN'; from that point the fd is quiet. + * 4. The asyncio reader registration stays in place — there is no + * reference to the loop stored in this struct, no + * remove_reader call, and no `async_pipe_renewed` protocol. + * Recovery requires restarting the erlang_python application + * (or the interpreter) so a fresh state struct is created. + * + * No existing event-loop reference is mutated; there is no + * re-registration path that could fail. */ typedef struct { int async_callback_pipe[2]; /* [0]=read, [1]=write - per-interpreter pipe */ PyObject *async_pending_futures; /* Dict: callback_id -> Future */ + /* + * async_futures_mutex protects async_pending_futures. + * + * INVARIANT: never call a Future method (set_result, set_exception, + * cancel, ...) while holding this lock. set_exception in particular + * can run done-callbacks that re-enter user code, which may then + * call back into erlang.* — and any path that takes this same mutex + * would deadlock or expose a partially-mutated dict. + * + * The supported pattern at every call site is: + * 1. lock + * 2. allocate keys, snapshot/transfer ownership of futures + * (PyDict_Items + INCREF; or PyDict_GetItem + INCREF + DelItem), + * clear/modify the dict + * 3. unlock + * 4. THEN call set_result / set_exception on the snapshotted + * futures and release the snapshot's references. + * + * See process_async_callback_response and + * async_pipe_break_and_fail_pending for the two read sides; + * register_async_future is the write side and only does a dict + * insert under the lock. 
+ */ pthread_mutex_t async_futures_mutex; bool pipe_initialized; + + /* Set on hard read error (EIO/EBADF/EOF mid-frame). New + * erlang.async_call invocations short-circuit with a clear + * RuntimeError; pending futures are failed with the same error. */ + bool pipe_broken; + + /* Resumable frame parser state. The wire format is + * <> + * The header lives in a fixed buffer; the body is heap-allocated + * once the header arrives. EAGAIN preserves all of these so the + * next reader tick continues from the same offset. */ + uint8_t hdr_buf[12]; /* 8-byte id + 4-byte len */ + size_t hdr_have; /* 0..12 */ + char *body_buf; /* allocated once header parsed */ + uint32_t body_len; /* parsed from header */ + size_t body_have; /* 0..body_len */ } erlang_module_state_t; /* Forward declaration for module state accessor */ @@ -2254,10 +2322,17 @@ static int async_callback_init(void) { return -1; } - /* Set the read end to non-blocking for asyncio compatibility */ - int flags = fcntl(state->async_callback_pipe[0], F_GETFL, 0); - if (flags >= 0) { - fcntl(state->async_callback_pipe[0], F_SETFL, flags | O_NONBLOCK); + /* Set the read end non-blocking for asyncio compatibility, and + * the write end non-blocking so the dirty-IO NIF write path can + * timeout instead of pinning a scheduler on a stalled Python + * reader. */ + int rflags = fcntl(state->async_callback_pipe[0], F_GETFL, 0); + if (rflags >= 0) { + fcntl(state->async_callback_pipe[0], F_SETFL, rflags | O_NONBLOCK); + } + int wflags = fcntl(state->async_callback_pipe[1], F_GETFL, 0); + if (wflags >= 0) { + fcntl(state->async_callback_pipe[1], F_SETFL, wflags | O_NONBLOCK); } state->async_pending_futures = PyDict_New(); @@ -2276,119 +2351,245 @@ static int async_callback_init(void) { } /** - * Process a single async callback response from the pipe. - * Called by the asyncio reader callback. 
- * Returns: 1 if processed, 0 if no data, -1 on error + * Resolve a Future with a result or, if @p result is NULL, with the + * current Python exception (or a generic RuntimeError fallback). + * Borrows @p future; caller still owns its reference. */ -static int process_async_callback_response(void) { - erlang_module_state_t *state = get_erlang_module_state(); - if (state == NULL || !state->pipe_initialized) { - return -1; +static void resolve_future_with_result(PyObject *future, PyObject *result) { + if (result != NULL) { + PyObject *set_result = PyObject_GetAttrString(future, "set_result"); + if (set_result != NULL) { + PyObject *ret = PyObject_CallFunctionObjArgs(set_result, result, NULL); + Py_XDECREF(ret); + Py_DECREF(set_result); + } + return; } - /* Read callback_id (8 bytes) + response_len (4 bytes) + response_data */ - uint64_t callback_id; - uint32_t response_len; - ssize_t n; + /* Result was NULL: carry the Python exception over to the future. */ + PyObject *exc_type = NULL, *exc_value = NULL, *exc_tb = NULL; + PyErr_Fetch(&exc_type, &exc_value, &exc_tb); - n = read(state->async_callback_pipe[0], &callback_id, sizeof(callback_id)); - if (n < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return 0; /* No data available (non-blocking) */ + PyObject *set_exception = PyObject_GetAttrString(future, "set_exception"); + if (set_exception != NULL) { + if (exc_value != NULL) { + PyObject *ret = PyObject_CallFunctionObjArgs(set_exception, exc_value, NULL); + Py_XDECREF(ret); + } else { + PyObject *runtime_err = PyObject_CallFunction( + PyExc_RuntimeError, "s", "Erlang callback failed"); + PyObject *ret = PyObject_CallFunctionObjArgs( + set_exception, runtime_err, NULL); + Py_XDECREF(ret); + Py_XDECREF(runtime_err); } - return -1; /* Error */ + Py_DECREF(set_exception); } - if (n == 0) { - return 0; /* EOF / No data */ + + Py_XDECREF(exc_type); + Py_XDECREF(exc_value); + Py_XDECREF(exc_tb); + PyErr_Clear(); +} + +/** + * Mark the async pipe broken and fail 
every pending future with + * RuntimeError("async callback pipe broken"). + * + * Mutex policy: snapshot the futures dict under + * async_futures_mutex (incref each future, then clear the dict), + * release the mutex, then call set_exception on the snapshotted + * list. Calling Python methods while holding the mutex is forbidden + * because future callbacks may re-enter any code path that takes + * the same mutex. + */ +static void async_pipe_break_and_fail_pending(erlang_module_state_t *state) { + if (state->pipe_broken) { + return; + } + + /* Reset frame buffer; any in-flight body is unrecoverable. */ + state->hdr_have = 0; + if (state->body_buf != NULL) { + enif_free(state->body_buf); + state->body_buf = NULL; + } + state->body_have = 0; + state->body_len = 0; + state->pipe_broken = true; + + PyObject *items = NULL; + pthread_mutex_lock(&state->async_futures_mutex); + if (state->async_pending_futures != NULL) { + items = PyDict_Items(state->async_pending_futures); + if (items != NULL) { + Py_ssize_t n = PyList_GET_SIZE(items); + for (Py_ssize_t i = 0; i < n; i++) { + PyObject *pair = PyList_GET_ITEM(items, i); + PyObject *fut = PyTuple_GET_ITEM(pair, 1); + Py_INCREF(fut); /* survive the dict clear */ + } + PyDict_Clear(state->async_pending_futures); + } + } + pthread_mutex_unlock(&state->async_futures_mutex); + + if (items == NULL) { + return; + } + + Py_ssize_t n = PyList_GET_SIZE(items); + for (Py_ssize_t i = 0; i < n; i++) { + PyObject *pair = PyList_GET_ITEM(items, i); + PyObject *fut = PyTuple_GET_ITEM(pair, 1); /* borrowed */ + PyObject *exc = PyObject_CallFunction( + PyExc_RuntimeError, "s", "async callback pipe broken"); + PyObject *set_exception = PyObject_GetAttrString(fut, "set_exception"); + if (set_exception != NULL && exc != NULL) { + PyObject *ret = PyObject_CallFunctionObjArgs(set_exception, exc, NULL); + Py_XDECREF(ret); + } + Py_XDECREF(set_exception); + Py_XDECREF(exc); + Py_DECREF(fut); /* match the INCREF in the snapshot loop */ + } + 
Py_DECREF(items); +} + +/** + * Resumable nonblocking parser of the async-callback pipe. + * + * Wire format (matches nif_async_callback_response in py_thread_worker.c): + * <> + * + * Each invocation reads what the kernel will give without blocking and + * advances the parser state held in @c erlang_module_state_t. EAGAIN + * preserves the state for the next reader tick from the asyncio loop. + * + * Returns 1 when a frame was completed, 0 when no progress was made + * (EAGAIN at a frame boundary), -1 on hard error after marking the + * pipe broken and failing pending futures. + */ +static int process_async_callback_response(void) { + erlang_module_state_t *state = get_erlang_module_state(); + if (state == NULL || !state->pipe_initialized) { + return -1; } - if (n != sizeof(callback_id)) { - return -1; /* Partial read - error */ + if (state->pipe_broken) { + /* Drain and discard whatever Erlang still writes so the + * asyncio loop's fd-readable callback stops firing once the + * writer process times out and dies. Returning 0 (instead of + * -1) lets async_callback_reader's `while(... > 0)` loop + * exit cleanly without flagging an error. */ + char scratch[256]; + for (;;) { + ssize_t n = read(state->async_callback_pipe[0], + scratch, sizeof(scratch)); + if (n > 0) continue; + if (n < 0 && errno == EINTR) continue; + break; + } + return 0; } - n = read(state->async_callback_pipe[0], &response_len, sizeof(response_len)); - if (n != sizeof(response_len)) { + /* Stage 1: header (12 bytes). */ + while (state->hdr_have < sizeof(state->hdr_buf)) { + ssize_t n = read(state->async_callback_pipe[0], + state->hdr_buf + state->hdr_have, + sizeof(state->hdr_buf) - state->hdr_have); + if (n > 0) { + state->hdr_have += (size_t)n; + continue; + } + if (n < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) return 0; + async_pipe_break_and_fail_pending(state); + return -1; + } + /* n == 0 (EOF). Clean only at a frame boundary. 
*/ + if (state->hdr_have == 0) return 0; + async_pipe_break_and_fail_pending(state); return -1; } - char *response_data = NULL; - if (response_len > 0) { - response_data = enif_alloc(response_len); - if (response_data == NULL) { + uint64_t callback_id; + uint32_t body_len; + memcpy(&callback_id, state->hdr_buf, sizeof(callback_id)); + memcpy(&body_len, state->hdr_buf + sizeof(callback_id), sizeof(body_len)); + + if (state->body_buf == NULL && body_len > 0) { + state->body_buf = enif_alloc(body_len); + if (state->body_buf == NULL) { + async_pipe_break_and_fail_pending(state); return -1; } - n = read(state->async_callback_pipe[0], response_data, response_len); - if (n != (ssize_t)response_len) { - enif_free(response_data); + state->body_have = 0; + } + state->body_len = body_len; + + /* Stage 2: body (body_len bytes). */ + while (state->body_have < state->body_len) { + ssize_t n = read(state->async_callback_pipe[0], + state->body_buf + state->body_have, + state->body_len - state->body_have); + if (n > 0) { + state->body_have += (size_t)n; + continue; + } + if (n < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) return 0; + async_pipe_break_and_fail_pending(state); return -1; } + /* n == 0 mid-body is always an error. */ + async_pipe_break_and_fail_pending(state); + return -1; } - /* Look up and resolve the Future */ - pthread_mutex_lock(&state->async_futures_mutex); + /* Detach frame buffers and reset parser state BEFORE any Python + * re-entry. A recursive call from set_result / GC will then see a + * clean parser. */ + char *body = state->body_buf; + uint32_t blen = state->body_len; + state->hdr_have = 0; + state->body_buf = NULL; + state->body_have = 0; + state->body_len = 0; + /* Look up the future; copy it out under the mutex, resolve + * outside (review #4). 
*/ + PyObject *future = NULL; + pthread_mutex_lock(&state->async_futures_mutex); PyObject *key = PyLong_FromUnsignedLongLong(callback_id); - PyObject *future = PyDict_GetItem(state->async_pending_futures, key); - - if (future != NULL) { - Py_INCREF(future); /* Keep reference while we use it */ - PyDict_DelItem(state->async_pending_futures, key); + if (key != NULL && state->async_pending_futures != NULL) { + PyObject *found = PyDict_GetItem(state->async_pending_futures, key); + if (found != NULL) { + Py_INCREF(found); + PyDict_DelItem(state->async_pending_futures, key); + future = found; + } } - Py_DECREF(key); - + Py_XDECREF(key); pthread_mutex_unlock(&state->async_futures_mutex); if (future != NULL) { - /* Parse response and resolve Future */ PyObject *result = NULL; - if (response_data != NULL) { - result = parse_callback_response((unsigned char *)response_data, response_len); + if (blen > 0) { + result = parse_callback_response((unsigned char *)body, blen); } else { Py_INCREF(Py_None); result = Py_None; } - - if (result != NULL) { - /* Call future.set_result(result) */ - PyObject *set_result = PyObject_GetAttrString(future, "set_result"); - if (set_result != NULL) { - PyObject *ret = PyObject_CallFunctionObjArgs(set_result, result, NULL); - Py_XDECREF(ret); - Py_DECREF(set_result); - } - Py_DECREF(result); - } else { - /* Error occurred - set exception on Future */ - PyObject *exc_type, *exc_value, *exc_tb; - PyErr_Fetch(&exc_type, &exc_value, &exc_tb); - - PyObject *set_exception = PyObject_GetAttrString(future, "set_exception"); - if (set_exception != NULL) { - if (exc_value != NULL) { - PyObject *ret = PyObject_CallFunctionObjArgs(set_exception, exc_value, NULL); - Py_XDECREF(ret); - } else { - PyObject *runtime_err = PyObject_CallFunction(PyExc_RuntimeError, - "s", "Erlang callback failed"); - PyObject *ret = PyObject_CallFunctionObjArgs(set_exception, runtime_err, NULL); - Py_XDECREF(ret); - Py_XDECREF(runtime_err); - } - Py_DECREF(set_exception); - } - - 
Py_XDECREF(exc_type); - Py_XDECREF(exc_value); - Py_XDECREF(exc_tb); - PyErr_Clear(); - } - + resolve_future_with_result(future, result); + Py_XDECREF(result); Py_DECREF(future); } - if (response_data != NULL) { - enif_free(response_data); + if (body != NULL) { + enif_free(body); } - return 1; } @@ -2474,6 +2675,12 @@ static PyObject *send_async_callback_request(PyObject *self, PyObject *args) { PyErr_SetString(PyExc_RuntimeError, "Async callback system not initialized"); return NULL; } + if (state->pipe_broken) { + PyErr_SetString(PyExc_RuntimeError, + "async callback pipe broken; restart the erlang_python " + "application to recover"); + return NULL; + } /* Generate callback ID */ uint64_t callback_id = atomic_fetch_add(&g_callback_id_counter, 1); diff --git a/c_src/py_nif.c b/c_src/py_nif.c index f81fd16..a2e638c 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -7801,13 +7801,19 @@ static ErlNifFunc nif_funcs[] = { /* Execution mode info */ {"execution_mode", 0, nif_execution_mode, 0}, - /* Thread worker support (ThreadPoolExecutor) */ + /* Thread worker support (ThreadPoolExecutor). + * Writes are ERL_NIF_DIRTY_JOB_IO_BOUND because the response pipe + * has a non-blocking write end and the looped write may briefly + * wait for write-readiness when the Python reader is slow. */ {"thread_worker_set_coordinator", 1, nif_thread_worker_set_coordinator, 0}, - {"thread_worker_write", 2, nif_thread_worker_write, 0}, - {"thread_worker_signal_ready", 1, nif_thread_worker_signal_ready, 0}, - - /* Async callback support (for erlang.async_call) */ - {"async_callback_response", 3, nif_async_callback_response, 0}, + {"thread_worker_write_with_id", 3, nif_thread_worker_write_with_id, + ERL_NIF_DIRTY_JOB_IO_BOUND}, + {"thread_worker_signal_ready", 1, nif_thread_worker_signal_ready, 0}, + + /* Async callback support (for erlang.async_call). Same dirty-IO + * rationale as thread_worker_write_with_id above. 
*/ + {"async_callback_response", 3, nif_async_callback_response, + ERL_NIF_DIRTY_JOB_IO_BOUND}, /* Callback name registry (prevents torch introspection issues) */ {"register_callback_name", 1, nif_register_callback_name, 0}, diff --git a/c_src/py_nif.h b/c_src/py_nif.h index e353fd5..066024e 100644 --- a/c_src/py_nif.h +++ b/c_src/py_nif.h @@ -1683,62 +1683,108 @@ static char *binary_to_string(const ErlNifBinary *bin) { } /** - * @brief Read from a file descriptor with optional timeout + * @brief Read exactly @p count bytes from a file descriptor with a deadline. * - * Uses select() to implement a timeout on blocking read operations. - * This prevents indefinite blocking on pipe reads. + * Loops on partial reads and EINTR until @p count bytes are received or + * the deadline expires. Uses a monotonic deadline (not a per-call + * timeout) so retries cannot extend the wait indefinitely. * * @param fd File descriptor to read from * @param buf Buffer to read into * @param count Number of bytes to read - * @param timeout_ms Timeout in milliseconds (0 = no timeout, wait indefinitely) + * @param timeout_ms Total timeout in milliseconds (0 = no timeout) * - * @return Number of bytes read on success, 0 on timeout, -1 on error + * @return Bytes read so far on timeout/EOF (0 to count), -1 on hard error. + * A return value < count with errno == ETIMEDOUT signals timeout; + * < count with errno == 0 signals clean EOF. * - * @note On timeout, errno is set to ETIMEDOUT + * @note Callers MUST treat any return < count as a desynchronised pipe. + * There is no in-band recovery: the leftover bytes (if any) of a + * partial frame stay in the pipe. 
*/ static ssize_t read_with_timeout(int fd, void *buf, size_t count, int timeout_ms) { - if (timeout_ms > 0) { - struct timeval tv; - tv.tv_sec = timeout_ms / 1000; - tv.tv_usec = (timeout_ms % 1000) * 1000; - - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - - int ret = select(fd + 1, &fds, NULL, NULL, &tv); - if (ret < 0) { - return -1; /* select error */ - } - if (ret == 0) { - errno = ETIMEDOUT; - return 0; /* timeout */ + char *p = buf; + size_t got = 0; + struct timespec deadline = {0}; + bool have_deadline = (timeout_ms > 0); + + if (have_deadline) { + clock_gettime(CLOCK_MONOTONIC, &deadline); + deadline.tv_sec += timeout_ms / 1000; + deadline.tv_nsec += (long)(timeout_ms % 1000) * 1000000L; + if (deadline.tv_nsec >= 1000000000L) { + deadline.tv_sec += 1; + deadline.tv_nsec -= 1000000000L; } } - return read(fd, buf, count); + while (got < count) { + if (have_deadline) { + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + long remain_ms = + (deadline.tv_sec - now.tv_sec) * 1000L + + (deadline.tv_nsec - now.tv_nsec) / 1000000L; + if (remain_ms <= 0) { + errno = ETIMEDOUT; + return (ssize_t)got; + } + struct timeval tv; + tv.tv_sec = remain_ms / 1000; + tv.tv_usec = (remain_ms % 1000) * 1000; + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + int s = select(fd + 1, &fds, NULL, NULL, &tv); + if (s < 0) { + if (errno == EINTR) continue; + return -1; + } + if (s == 0) { + errno = ETIMEDOUT; + return (ssize_t)got; + } + } + ssize_t n = read(fd, p + got, count - got); + if (n > 0) { + got += (size_t)n; + continue; + } + if (n < 0) { + if (errno == EINTR) continue; + return -1; + } + /* n == 0: writer closed pipe (clean EOF) */ + errno = 0; + return (ssize_t)got; + } + return (ssize_t)got; } /** - * @brief Read length-prefixed data from a file descriptor + * @brief Read length-prefixed data from a file descriptor. * - * Reads a 4-byte length prefix followed by the data payload. - * Uses read_with_timeout for optional timeout support. 
+ * Wire format: 4-byte length prefix (native endianness) followed by + * @p len bytes of payload. Built on top of read_with_timeout, so + * partial reads / EINTR are handled transparently. * * @param fd File descriptor to read from - * @param data_out Pointer to store allocated data buffer (caller must free with enif_free) + * @param data_out Pointer to store allocated data buffer * @param len_out Pointer to store data length - * @param timeout_ms Timeout in milliseconds (0 = no timeout) + * @param timeout_ms Total timeout in milliseconds (0 = no timeout) * * @return 0 on success, -1 on read error/timeout, -2 on allocation failure * - * @note On success with len > 0, caller must call enif_free(*data_out) + * @note Updated contract: on -1, the calling layer MUST treat the pipe as + * desynchronised. There is no resync attempt here. Callers (sync + * thread-callback, suspended-callback) react: poison the worker or + * destroy the context. On success with len > 0, caller must free + * *data_out via enif_free(). */ static int read_length_prefixed_data(int fd, char **data_out, uint32_t *len_out, int timeout_ms) { uint32_t len; ssize_t n = read_with_timeout(fd, &len, sizeof(len), timeout_ms); - if (n != sizeof(len)) { + if (n != (ssize_t)sizeof(len)) { return -1; } @@ -1762,6 +1808,91 @@ static int read_length_prefixed_data(int fd, char **data_out, uint32_t *len_out, return 0; } +/** + * @brief Write status codes for write_all_with_deadline(). + */ +typedef enum { + WRITE_OK = 0, + WRITE_TIMEOUT = -1, + WRITE_ERROR = -2 +} write_result_t; + +/** + * @brief Write exactly @p count bytes to a (typically non-blocking) fd + * with a deadline. + * + * Loops on partial writes / EINTR / EAGAIN. On EAGAIN, uses select() for + * write-readiness with the remaining deadline. Used by the thread-worker + * write path to avoid pinning a dirty I/O scheduler thread on a stalled + * Python reader. 
+ * + * @param fd File descriptor to write to + * @param buf Buffer to write + * @param count Number of bytes to write + * @param timeout_ms Total timeout in milliseconds (0 = no timeout) + * + * @return WRITE_OK on full write, WRITE_TIMEOUT on deadline expiry, + * WRITE_ERROR on hard error (errno preserved). + */ +static write_result_t write_all_with_deadline(int fd, const void *buf, + size_t count, int timeout_ms) { + const char *p = buf; + size_t sent = 0; + struct timespec deadline = {0}; + bool have_deadline = (timeout_ms > 0); + + if (have_deadline) { + clock_gettime(CLOCK_MONOTONIC, &deadline); + deadline.tv_sec += timeout_ms / 1000; + deadline.tv_nsec += (long)(timeout_ms % 1000) * 1000000L; + if (deadline.tv_nsec >= 1000000000L) { + deadline.tv_sec += 1; + deadline.tv_nsec -= 1000000000L; + } + } + + while (sent < count) { + ssize_t n = write(fd, p + sent, count - sent); + if (n > 0) { + sent += (size_t)n; + continue; + } + if (n < 0) { + if (errno == EINTR) continue; + if (errno != EAGAIN && errno != EWOULDBLOCK) { + return WRITE_ERROR; + } + /* Pipe full: wait for write-readiness within the deadline. */ + if (!have_deadline) { + return WRITE_ERROR; /* would block, no deadline */ + } + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + long remain_ms = + (deadline.tv_sec - now.tv_sec) * 1000L + + (deadline.tv_nsec - now.tv_nsec) / 1000000L; + if (remain_ms <= 0) return WRITE_TIMEOUT; + struct timeval tv; + tv.tv_sec = remain_ms / 1000; + tv.tv_usec = (remain_ms % 1000) * 1000; + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + int s = select(fd + 1, NULL, &fds, NULL, &tv); + if (s < 0) { + if (errno == EINTR) continue; + return WRITE_ERROR; + } + if (s == 0) return WRITE_TIMEOUT; + /* fd writable; loop back. */ + continue; + } + /* n == 0: shouldn't happen for write(); treat as error. 
*/ + return WRITE_ERROR; + } + return WRITE_OK; +} + /** @} */ /* ============================================================================ diff --git a/c_src/py_thread_worker.c b/c_src/py_thread_worker.c index 5e0aed7..d8a3e37 100644 --- a/c_src/py_thread_worker.c +++ b/c_src/py_thread_worker.c @@ -60,8 +60,8 @@ typedef struct thread_worker { /** * @brief Response pipe file descriptors - * - [0] = read end (Python reads responses) - * - [1] = write end (Erlang writes responses) + * - [0] = read end (Python reads responses, blocking) + * - [1] = write end (Erlang writes responses, O_NONBLOCK) */ int response_pipe[2]; @@ -71,6 +71,13 @@ typedef struct thread_worker { /** @brief Flag: worker is currently in use */ bool in_use; + /** + * @brief Flag: worker is permanently retired due to a callback + * synchronisation failure. Skipped by acquire_thread_worker; not + * recycled until NIF unload. + */ + bool poisoned; + /** @brief PID of the Erlang handler process for this worker */ ErlNifPid handler_pid; @@ -109,6 +116,45 @@ ErlNifPid g_thread_coordinator_pid; /** @brief Flag: coordinator PID has been set */ bool g_has_thread_coordinator = false; +/** + * @brief Counter of poisoned (permanently retired) workers. + * + * Bounded diagnostic ceiling: when this exceeds MAX_POISONED_WORKERS we + * surface a loud error to the user instead of silently leaking pipe pairs. + * Reaching this threshold means the callback subsystem is genuinely broken + * and warrants a bug report. + */ +static _Atomic uint64_t g_poisoned_workers_count = 0; + +/** @brief Diagnostic ceiling on poisoned-worker accumulation. */ +#define MAX_POISONED_WORKERS 64 + +/** @brief Read/write timeout for thread-callback pipe traffic (30s). */ +#define THREAD_WORKER_IO_TIMEOUT_MS 30000 + +/** @brief Stack-buffer threshold for combined-frame writes. */ +#define THREAD_WORKER_STACK_FRAME_LIMIT 1024 + +/* PY_THREAD_CB_TRACE: optional stderr tracing for callback I/O. + * Compile-time gate, default OFF. 
Enable with CMake + * -DENABLE_PY_THREAD_CB_TRACE=ON + * which sets PY_THREAD_CB_TRACE=1. + */ +#ifdef PY_THREAD_CB_TRACE +# define PY_THREAD_CB_LOG(fmt, ...) \ + do { \ + struct timespec _ts; \ + clock_gettime(CLOCK_MONOTONIC, &_ts); \ + enif_fprintf(stderr, \ + "[py_thread_cb %ld.%09ld tid=0x%08x] " fmt "\n", \ + (long)_ts.tv_sec, (long)_ts.tv_nsec, \ + (unsigned)((uintptr_t)pthread_self() & 0xffffffff), \ + ##__VA_ARGS__); \ + } while (0) +#else +# define PY_THREAD_CB_LOG(fmt, ...) ((void)0) +#endif + /* ============================================================================ * Thread Worker Destructor (pthread_key cleanup) * ============================================================================ */ @@ -247,6 +293,7 @@ static thread_worker_t *create_thread_worker(void) { tw->response_pipe[1] = -1; tw->in_use = true; tw->has_handler = false; + tw->poisoned = false; /* Create response pipe */ if (pipe(tw->response_pipe) < 0) { @@ -254,6 +301,15 @@ static thread_worker_t *create_thread_worker(void) { return NULL; } + /* Set the WRITE end non-blocking. The Erlang handler runs on a dirty + * I/O scheduler thread (see py_nif.c registration); making the write + * end non-blocking lets write_all_with_deadline() return on a stalled + * Python reader instead of pinning a dirty thread indefinitely. */ + int wflags = fcntl(tw->response_pipe[1], F_GETFL, 0); + if (wflags >= 0) { + (void)fcntl(tw->response_pipe[1], F_SETFL, wflags | O_NONBLOCK); + } + pthread_mutex_init(&tw->mutex, NULL); /* Add to pool */ @@ -263,25 +319,51 @@ static thread_worker_t *create_thread_worker(void) { return tw; } +/** + * @brief Drain any leftover bytes from a pipe read end (non-blocking). + * + * Called when reusing a worker so the new owner does not inherit stale + * bytes from a previous owner that exited mid-frame. Always restores + * the original flags, even on intermediate read errors. 
+ */ +static void drain_pipe(int fd) { + int orig = fcntl(fd, F_GETFL, 0); + if (orig < 0) return; + if (fcntl(fd, F_SETFL, orig | O_NONBLOCK) < 0) return; + + char scratch[256]; + for (;;) { + ssize_t n = read(fd, scratch, sizeof(scratch)); + if (n > 0) continue; + if (n < 0 && errno == EINTR) continue; + break; /* EAGAIN, 0, or hard error */ + } + (void)fcntl(fd, F_SETFL, orig); +} + /** * @brief Acquire a thread worker from the pool * - * First tries to find an existing unused worker in the pool. - * If none available, creates a new one. + * First tries to find an existing unused, non-poisoned worker in the + * pool. If none available, creates a new one. Poisoned workers stay in + * the linked list but are never returned. * * @return Pointer to acquired worker, or NULL on failure */ static thread_worker_t *acquire_thread_worker(void) { pthread_mutex_lock(&g_thread_pool_mutex); - /* Look for unused worker */ + /* Look for unused, non-poisoned worker */ thread_worker_t *tw = g_thread_pool_head; while (tw != NULL) { pthread_mutex_lock(&tw->mutex); - if (!tw->in_use) { + if (!tw->in_use && !tw->poisoned) { tw->in_use = true; + int read_fd = tw->response_pipe[0]; pthread_mutex_unlock(&tw->mutex); pthread_mutex_unlock(&g_thread_pool_mutex); + /* Drain any stale bytes the previous owner left behind. */ + if (read_fd >= 0) drain_pipe(read_fd); return tw; } pthread_mutex_unlock(&tw->mutex); @@ -295,6 +377,78 @@ static thread_worker_t *acquire_thread_worker(void) { return tw; } +/** + * @brief Permanently retire a worker after a callback synchronisation + * failure. + * + * Called by thread_worker_call when the read path cannot guarantee + * frame integrity (id mismatch + corrupted payload, partial-frame + * timeout, EOF mid-frame). 
The worker is unlinked from the pool, + * its pipes are closed (so the Erlang handler's next write fails and + * the gen_server's trap_exit clause reaps it), its mutex is + * destroyed, and the struct itself is freed — so a hot loop of + * synchronisation failures cannot grow runtime memory until NIF + * unload. + * + * The lifetime counter g_poisoned_workers_count still increments + * monotonically so the diagnostic ceiling + * (MAX_POISONED_WORKERS) and stderr warning still fire even when + * structs are reclaimed. + * + * @note Caller MUST clear tl_thread_worker (and the pthread-key + * binding) BEFORE calling this function. Once the worker is + * unlinked it cannot be referenced again — the pointer is + * freed and any later access is use-after-free. + */ +static void poison_thread_worker(thread_worker_t *tw) { + /* Mark + unlink atomically under the pool lock so a racing + * acquire_thread_worker either runs before us (sees a healthy + * worker that we then unlink, but never returns it again) or + * after us (cannot find tw in the list). */ + pthread_mutex_lock(&g_thread_pool_mutex); + pthread_mutex_lock(&tw->mutex); + tw->poisoned = true; + tw->in_use = false; + int read_fd = tw->response_pipe[0]; + int write_fd = tw->response_pipe[1]; + tw->response_pipe[0] = -1; + tw->response_pipe[1] = -1; + pthread_mutex_unlock(&tw->mutex); + + if (g_thread_pool_head == tw) { + g_thread_pool_head = tw->next; + } else { + thread_worker_t *prev = g_thread_pool_head; + while (prev != NULL && prev->next != tw) { + prev = prev->next; + } + if (prev != NULL) { + prev->next = tw->next; + } + } + pthread_mutex_unlock(&g_thread_pool_mutex); + + /* No other thread can reach tw now: not via the pool list, not + * via tl_thread_worker (caller cleared it), not via the pthread + * key (caller cleared it). Safe to destroy and free. 
*/ + if (read_fd >= 0) close(read_fd); + if (write_fd >= 0) close(write_fd); + pthread_mutex_destroy(&tw->mutex); + + uint64_t prev_count = atomic_fetch_add(&g_poisoned_workers_count, 1); + if (prev_count + 1 >= MAX_POISONED_WORKERS) { + enif_fprintf(stderr, + "[erlang_python] WARNING: %llu thread-callback workers have been " + "poisoned. The thread-callback subsystem is unhealthy; please " + "file a bug report with the failing test case.\n", + (unsigned long long)(prev_count + 1)); + } + PY_THREAD_CB_LOG("poison tw_id=%llu count=%llu", + (unsigned long long)tw->id, (unsigned long long)(prev_count + 1)); + + enif_free(tw); +} + /* ============================================================================ * Thread Worker Call Implementation * ============================================================================ */ @@ -330,26 +484,15 @@ static int thread_worker_spawn_handler(thread_worker_t *tw) { } enif_free_env(msg_env); - /* Read handler PID from pipe (Erlang writes it after spawning) - * Use 10 second timeout for handler spawn */ + /* Read readiness signal: 4 bytes of length == 0. Strict success + * condition (Defect 5): both the byte count must match AND the + * value must be zero. Short reads are not silently accepted. 
*/ uint32_t response_len = 0; - ssize_t n = read_with_timeout(tw->response_pipe[0], &response_len, sizeof(response_len), 10000); - if (n <= 0) { - /* Timeout or error - handler spawn failed */ + ssize_t n = read_with_timeout(tw->response_pipe[0], &response_len, + sizeof(response_len), 10000); + if (n != (ssize_t)sizeof(response_len) || response_len != 0) { return -1; } - if (n != sizeof(response_len) || response_len == 0) { - /* Handler spawned successfully - pid info is in the length as a signal */ - tw->has_handler = true; - return 0; - } - - /* If response_len > 0, there might be error data */ - if (response_len > 1000) { - /* Sanity check - something went wrong */ - return -1; - } - tw->has_handler = true; return 0; } @@ -360,6 +503,15 @@ static int thread_worker_spawn_handler(thread_worker_t *tw) { * This is called when a Python thread that is NOT an executor thread * (i.e., tl_current_worker == NULL) tries to call erlang.call(). * + * Wire format (response from Erlang handler): + * <> + * + * The callback_id is matched against the request's expected id; on + * mismatch, the rest of the frame is consumed and the loop continues. + * Any partial-frame failure poisons the worker (see + * poison_thread_worker) so subsequent calls from this thread acquire a + * fresh one. + * * @param func_name Name of the Erlang function to call * @param func_name_len Length of function name * @param call_args Python tuple of arguments @@ -367,6 +519,14 @@ static int thread_worker_spawn_handler(thread_worker_t *tw) { */ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, PyObject *call_args) { + /* Diagnostic ceiling: surface the unhealthy subsystem to the user. 
*/ + if (atomic_load(&g_poisoned_workers_count) >= MAX_POISONED_WORKERS) { + PyErr_SetString(PyExc_RuntimeError, + "thread-callback subsystem unhealthy " + "(too many poisoned workers; see stderr)"); + return NULL; + } + /* Get or create sticky worker for this thread */ if (tl_thread_worker == NULL) { tl_thread_worker = acquire_thread_worker(); @@ -412,8 +572,8 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, ERL_NIF_TERM args_term = py_to_term(msg_env, call_args); /* Generate callback ID */ - uint64_t callback_id = atomic_fetch_add(&g_callback_id_counter, 1); - ERL_NIF_TERM id_term = enif_make_uint64(msg_env, callback_id); + uint64_t expected_id = atomic_fetch_add(&g_callback_id_counter, 1); + ERL_NIF_TERM id_term = enif_make_uint64(msg_env, expected_id); /* Send message: {thread_callback, WorkerId, CallbackId, FuncName, Args} */ ERL_NIF_TERM msg = enif_make_tuple5(msg_env, @@ -423,10 +583,6 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, func_term, args_term); - char *response_data = NULL; - uint32_t response_len = 0; - int read_result; - /* Send message to coordinator (can be done with GIL held) */ if (!enif_send(NULL, &g_thread_coordinator_pid, msg_env, msg)) { enif_free_env(msg_env); @@ -434,33 +590,72 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, return NULL; } enif_free_env(msg_env); + PY_THREAD_CB_LOG("send tw_id=%llu cb_id=%llu", + (unsigned long long)tw->id, (unsigned long long)expected_id); + + /* Read frames until we get the one matching expected_id, or fail. 
*/ + char *response_data = NULL; + uint32_t response_len = 0; + int desync = 0; /* 1 = poison the worker before returning */ - /* Release GIL while waiting for response */ Py_BEGIN_ALLOW_THREADS - /* Use 30 second timeout to prevent indefinite blocking */ - read_result = read_length_prefixed_data( - tw->response_pipe[0], &response_data, &response_len, 30000); + while (1) { + uint64_t got_id = 0; + ssize_t nid = read_with_timeout(tw->response_pipe[0], + &got_id, sizeof(got_id), + THREAD_WORKER_IO_TIMEOUT_MS); + if (nid != (ssize_t)sizeof(got_id)) { + desync = 1; + break; + } + int r = read_length_prefixed_data( + tw->response_pipe[0], &response_data, &response_len, + THREAD_WORKER_IO_TIMEOUT_MS); + if (r != 0) { + desync = 1; + break; + } + if (got_id == expected_id) { + break; /* response_data + response_len ready */ + } + /* Stale frame: discard and continue draining. */ + if (response_data != NULL) { + enif_free(response_data); + response_data = NULL; + response_len = 0; + } + } Py_END_ALLOW_THREADS - if (read_result == -1) { - if (errno == ETIMEDOUT) { - PyErr_SetString(PyExc_TimeoutError, "Callback response timed out"); - } else { - PyErr_SetString(PyExc_RuntimeError, "Failed to read callback response"); + if (desync) { + if (response_data != NULL) { + enif_free(response_data); } + PY_THREAD_CB_LOG("desync tw_id=%llu cb_id=%llu errno=%d", + (unsigned long long)tw->id, + (unsigned long long)expected_id, errno); + /* poison_thread_worker frees `tw`; clear every reference to + * it BEFORE calling so the pthread-key destructor cannot + * later observe a dangling pointer. 
*/ + tl_thread_worker = NULL; + if (g_thread_worker_key_created) { + pthread_setspecific(g_thread_worker_key, NULL); + } + poison_thread_worker(tw); + PyErr_SetString(PyExc_RuntimeError, + "callback synchronisation lost; retry"); return NULL; } - if (read_result == -2) { - PyErr_SetString(PyExc_MemoryError, "Failed to allocate response buffer"); - return NULL; - } - /* Parse response using existing function */ - PyObject *result = parse_callback_response((unsigned char *)response_data, response_len); + PY_THREAD_CB_LOG("recv tw_id=%llu cb_id=%llu len=%u", + (unsigned long long)tw->id, (unsigned long long)expected_id, + (unsigned)response_len); + + PyObject *result = parse_callback_response( + (unsigned char *)response_data, response_len); if (response_data != NULL) { enif_free(response_data); } - return result; } @@ -488,38 +683,67 @@ static ERL_NIF_TERM nif_thread_worker_set_coordinator(ErlNifEnv *env, int argc, } /** - * @brief NIF to write response to a thread worker's pipe + * @brief NIF to write an id-prefixed response to a thread worker's pipe. + * + * Args: WriteFd, CallbackId, ResponseBinary * - * Args: WriteFd, ResponseBinary - * Called by Erlang handler to send callback result back to Python thread. + * Wire format (matches thread_worker_call's reader): + * <> + * + * Combining the three pieces into one looped write closes the + * length-then-data race (Defect 3) and lets the reader correlate + * responses by callback_id. The write end is non-blocking and the NIF + * is registered as ERL_NIF_DIRTY_JOB_IO_BOUND, so a stalled Python + * reader produces a write_timeout instead of pinning a scheduler. 
*/ -static ERL_NIF_TERM nif_thread_worker_write(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]) { +static ERL_NIF_TERM nif_thread_worker_write_with_id(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { (void)argc; int fd; + ErlNifUInt64 callback_id; ErlNifBinary response; if (!enif_get_int(env, argv[0], &fd)) { return make_error(env, "invalid_fd"); } - - if (!enif_inspect_binary(env, argv[1], &response)) { + if (!enif_get_uint64(env, argv[1], &callback_id)) { + return make_error(env, "invalid_callback_id"); + } + if (!enif_inspect_binary(env, argv[2], &response)) { return make_error(env, "invalid_response"); } - /* Write length then data */ - uint32_t len = (uint32_t)response.size; - ssize_t n = write(fd, &len, sizeof(len)); - if (n != sizeof(len)) { - return make_error(env, "write_length_failed"); + size_t total = sizeof(callback_id) + sizeof(uint32_t) + response.size; + char stack_buf[THREAD_WORKER_STACK_FRAME_LIMIT]; + char *buf = (total <= sizeof(stack_buf)) + ? stack_buf + : enif_alloc(total); + if (buf == NULL) { + return make_error(env, "alloc_failed"); } - n = write(fd, response.data, response.size); - if (n != (ssize_t)response.size) { - return make_error(env, "write_data_failed"); + uint64_t cb_id = (uint64_t)callback_id; + uint32_t len = (uint32_t)response.size; + memcpy(buf, &cb_id, sizeof(cb_id)); + memcpy(buf + sizeof(cb_id), &len, sizeof(len)); + if (response.size > 0) { + memcpy(buf + sizeof(cb_id) + sizeof(len), + response.data, response.size); } - return ATOM_OK; + write_result_t r = write_all_with_deadline( + fd, buf, total, THREAD_WORKER_IO_TIMEOUT_MS); + + if (buf != stack_buf) { + enif_free(buf); + } + + switch (r) { + case WRITE_OK: return ATOM_OK; + case WRITE_TIMEOUT: return make_error(env, "write_timeout"); + case WRITE_ERROR: return make_error(env, "write_failed"); + } + return make_error(env, "write_failed"); } /** @@ -548,12 +772,24 @@ static ERL_NIF_TERM nif_thread_worker_signal_ready(ErlNifEnv *env, int argc, } /** - * 
@brief NIF to write an async callback response + * @brief NIF to write an async callback response. * * Args: WriteFd, CallbackId, ResponseBinary - * Called by Erlang to send the result of an async callback back to Python. - * The response is written to the async callback pipe in the format: - * callback_id (8 bytes) + response_len (4 bytes) + response_data + * + * Wire format (matches process_async_callback_response in py_callback.c): + * <> + * + * Atomicity (review #4): the kernel does NOT guarantee per-write() + * atomicity beyond PIPE_BUF, so frame-on-the-wire integrity is + * guaranteed by the *single-writer process invariant* on the Erlang + * side (one async_writer_loop per WriteFd; see py_thread_handler.erl). + * This NIF does one looped non-blocking write of the combined buffer; + * even when the kernel chunks it, no other writer can interleave. + * + * Registered as ERL_NIF_DIRTY_JOB_IO_BOUND (see py_nif.c). The write + * end of async_callback_pipe is set non-blocking on the C side + * (py_callback.c init); a stalled Python reader produces + * write_timeout instead of holding a scheduler thread. */ static ERL_NIF_TERM nif_async_callback_response(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { @@ -565,35 +801,42 @@ static ERL_NIF_TERM nif_async_callback_response(ErlNifEnv *env, int argc, if (!enif_get_int(env, argv[0], &fd)) { return make_error(env, "invalid_fd"); } - if (!enif_get_uint64(env, argv[1], &callback_id)) { return make_error(env, "invalid_callback_id"); } - if (!enif_inspect_binary(env, argv[2], &response)) { return make_error(env, "invalid_response"); } - /* Write callback_id (8 bytes) */ - ssize_t n = write(fd, &callback_id, sizeof(callback_id)); - if (n != sizeof(callback_id)) { - return make_error(env, "write_callback_id_failed"); + size_t total = sizeof(callback_id) + sizeof(uint32_t) + response.size; + char stack_buf[THREAD_WORKER_STACK_FRAME_LIMIT]; + char *buf = (total <= sizeof(stack_buf)) + ? 
stack_buf + : enif_alloc(total); + if (buf == NULL) { + return make_error(env, "alloc_failed"); } - /* Write response_len (4 bytes) */ - uint32_t len = (uint32_t)response.size; - n = write(fd, &len, sizeof(len)); - if (n != sizeof(len)) { - return make_error(env, "write_length_failed"); + uint64_t cb_id = (uint64_t)callback_id; + uint32_t len = (uint32_t)response.size; + memcpy(buf, &cb_id, sizeof(cb_id)); + memcpy(buf + sizeof(cb_id), &len, sizeof(len)); + if (response.size > 0) { + memcpy(buf + sizeof(cb_id) + sizeof(len), + response.data, response.size); } - /* Write response_data */ - if (len > 0) { - n = write(fd, response.data, response.size); - if (n != (ssize_t)response.size) { - return make_error(env, "write_data_failed"); - } + write_result_t r = write_all_with_deadline( + fd, buf, total, THREAD_WORKER_IO_TIMEOUT_MS); + + if (buf != stack_buf) { + enif_free(buf); } - return ATOM_OK; + switch (r) { + case WRITE_OK: return ATOM_OK; + case WRITE_TIMEOUT: return make_error(env, "write_timeout"); + case WRITE_ERROR: return make_error(env, "write_failed"); + } + return make_error(env, "write_failed"); } diff --git a/scripts/stress_thread_callback.sh b/scripts/stress_thread_callback.sh new file mode 100755 index 0000000..68e026e --- /dev/null +++ b/scripts/stress_thread_callback.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +# Stress harness for py_thread_callback_SUITE. Runs the suite repeatedly +# and aborts on the first failure. Use under CPU pressure to widen race +# windows: +# +# for i in 1 2 3 4; do yes > /dev/null & done +# ENABLE_PY_THREAD_CB_TRACE=ON ./scripts/stress_thread_callback.sh 50 +# kill %1 %2 %3 %4 +# +# A clean run prints "PASS " on every iteration and exits 0. + +set -euo pipefail + +ITERATIONS="${1:-50}" + +cd "$(dirname "$0")/.." + +if [[ "${ENABLE_PY_THREAD_CB_TRACE:-OFF}" == "ON" ]]; then + echo "Rebuilding with PY_THREAD_CB_TRACE on..." 
+ rm -rf _build/cmake _build/default/lib/erlang_python/priv/erlang_python_nif* + mkdir -p _build/cmake + (cd _build/cmake && cmake ../../c_src -DENABLE_PY_THREAD_CB_TRACE=ON \ + && cmake --build . -- -j "$(nproc 2>/dev/null || sysctl -n hw.ncpu)") +fi + +rebar3 compile + +for i in $(seq 1 "$ITERATIONS"); do + if ! rebar3 ct --suite=test/py_thread_callback_SUITE --readable=compact; then + echo "FAIL on iteration $i" + exit 1 + fi + echo "PASS $i" +done + +echo "All $ITERATIONS iterations passed." diff --git a/src/py_nif.erl b/src/py_nif.erl index 55b8b17..4d07a6d 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -71,7 +71,7 @@ execution_mode/0, %% Thread worker support (ThreadPoolExecutor) thread_worker_set_coordinator/1, - thread_worker_write/2, + thread_worker_write_with_id/3, thread_worker_signal_ready/1, %% Async callback support (for erlang.async_call) async_callback_response/3, @@ -593,11 +593,18 @@ execution_mode() -> thread_worker_set_coordinator(_Pid) -> ?NIF_STUB. -%% @doc Write a callback response to a thread worker's pipe. -%% Fd is the write end of the response pipe. -%% Response is the result binary (status byte + python repr). --spec thread_worker_write(integer(), binary()) -> ok | {error, term()}. -thread_worker_write(_Fd, _Response) -> +%% @doc Write a callback response to a thread worker's pipe, prefixed +%% with the callback id so the Python reader can correlate responses +%% to outstanding requests. +%% +%% Wire format: `<>'. +%% The whole frame is written in one looped non-blocking write; this +%% NIF is registered as `ERL_NIF_DIRTY_JOB_IO_BOUND' so a stalled +%% Python reader produces `{error, write_timeout}' rather than pinning +%% a scheduler. +-spec thread_worker_write_with_id(integer(), non_neg_integer(), binary()) -> + ok | {error, term()}. +thread_worker_write_with_id(_Fd, _CallbackId, _Response) -> ?NIF_STUB. %% @doc Signal that a thread worker handler is ready. 
diff --git a/src/py_thread_handler.erl b/src/py_thread_handler.erl index cd463d6..343e4df 100644 --- a/src/py_thread_handler.erl +++ b/src/py_thread_handler.erl @@ -48,11 +48,28 @@ terminate/2 ]). -%% State: map of WorkerId => {HandlerPid, WriteFd} +%% State: +%% handlers — sync thread workers: WorkerId => {HandlerPid, WriteFd} +%% async_writers — async pipes: WriteFd => {WriterPid, Counter} where +%% Counter is an atomics ref tracking in-flight async +%% callbacks (incremented at submission, decremented +%% after the writer hands the frame to the NIF). +%% writer_fds — reverse map: WriterPid => WriteFd (fast lookup on +%% 'DOWN' messages). -record(state, { - handlers = #{} :: #{non_neg_integer() => {pid(), integer()}} + handlers = #{} :: #{non_neg_integer() => {pid(), integer()}}, + async_writers = #{} :: #{integer() => {pid(), atomics:atomics_ref()}}, + writer_fds = #{} :: #{pid() => integer()} }). +%% Bound on combined async in-flight + writer-mailbox depth. The +%% atomics counter covers BOTH executors that have spawned but not +%% yet enqueued AND messages already sitting in the writer's mailbox, +%% so the bound holds even when many parallel callbacks complete +%% before the writer drains. Submissions beyond this point are failed +%% inline with an "async callback queue full" error frame. +-define(ASYNC_WRITER_MAX_QUEUE, 10000). + %%% ============================================================================ %%% API %%% ============================================================================ @@ -66,6 +83,11 @@ start_link() -> %%% ============================================================================ init([]) -> + %% Trap exits so a dying handler / async writer is reaped via + %% handle_info({'EXIT', ...}) instead of taking the coordinator + %% down. (Also enables the 'DOWN' clause for monitored writers, + %% which we route through the same cleanup logic.) 
+ process_flag(trap_exit, true), %% Register ourselves as the thread worker coordinator case py_nif:thread_worker_set_coordinator(self()) of ok -> @@ -105,24 +127,58 @@ handle_info({thread_callback, WorkerId, CallbackId, FuncName, Args}, end, {noreply, State}; -%% Handle async callback request from Python async_call() -%% Unlike thread_callback, this uses a global pipe for all async callbacks. -%% Each response includes the callback_id so Python can match it to the right Future. -handle_info({async_callback, CallbackId, FuncName, Args, WriteFd}, State) -> - %% Spawn a process to handle this callback asynchronously - %% This allows multiple async callbacks to be processed concurrently - spawn_link(fun() -> - handle_async_callback(WriteFd, CallbackId, FuncName, Args) - end), - {noreply, State}; - -%% Handle handler process exit -handle_info({'EXIT', Pid, _Reason}, #state{handlers = Handlers} = State) -> - %% Remove handler from map +%% Handle async callback request from Python async_call(). +%% +%% Unlike thread_callback, this uses one shared per-interpreter pipe +%% for all async callbacks. To prevent frame-on-the-wire interleaving +%% from concurrent writers (issue #63), every response is funnelled +%% through a single writer process per WriteFd: callback execution +%% stays parallel, only the pipe write is serialised. +handle_info({async_callback, CallbackId, FuncName, Args, WriteFd}, State0) -> + {Writer, Counter, State1} = ensure_async_writer(WriteFd, State0), + %% Reserve a slot in the in-flight counter. atomics:add_get is + %% atomic and gives us the post-increment value, so the check is + %% race-free across submissions on different schedulers. + case atomics:add_get(Counter, 1, 1) of + N when N > ?ASYNC_WRITER_MAX_QUEUE -> + %% Already at the bound: undo the reservation and fail + %% this callback with a queue-full error frame. 
The
+            %% writer still serialises the error frame on the pipe
+            %% (so the Future resolves with a clear RuntimeError
+            %% rather than hanging). The reservation was already
+            %% undone above, so the overflow frame itself is not
+            %% counted against the in-flight bound.
+            atomics:sub(Counter, 1, 1),
+            Writer ! {overflow, CallbackId};
+        _ ->
+            spawn(fun() ->
+                Response = try run_async_callback(FuncName, Args) catch C:R -> iolist_to_binary([<<1>>, io_lib:format("~p: ~p", [C, R])]) end,
+                Writer ! {respond, CallbackId, Response, Counter}
+            end)
+    end,
+    {noreply, State1};
+
+%% Linked-process exit: covers both sync handlers and async writers.
+%% Both are spawn_link'd so a coordinator crash takes them with it
+%% (no leaked writer holding the async pipe fd after the gen_server
+%% restarts). When THEY die independently we trap the EXIT here and
+%% remove the dead pid from whichever map it lives in.
+handle_info({'EXIT', Pid, _Reason},
+            #state{handlers = Handlers,
+                   async_writers = Writers,
+                   writer_fds = WriterFds} = State) ->
     NewHandlers = maps:filter(
         fun(_WorkerId, {HandlerPid, _Fd}) -> HandlerPid =/= Pid end,
         Handlers),
-    {noreply, State#state{handlers = NewHandlers}};
+    case maps:take(Pid, WriterFds) of
+        {WriteFd, NewWriterFds} ->
+            NewWriters = maps:remove(WriteFd, Writers),
+            {noreply, State#state{handlers = NewHandlers,
+                                  async_writers = NewWriters,
+                                  writer_fds = NewWriterFds}};
+        error ->
+            {noreply, State#state{handlers = NewHandlers}}
+    end;
 
 handle_info(_Info, State) ->
     {noreply, State}.
@@ -138,9 +194,12 @@ terminate(_Reason, _State) ->
 %% Each handler is dedicated to one Python thread and has its own response pipe.
 handler_loop(WorkerId, WriteFd) ->
     receive
-        {thread_callback, _CallbackId, FuncName, Args} ->
-            %% Execute the callback and send response
-            handle_thread_callback(WriteFd, FuncName, Args),
+        {thread_callback, CallbackId, FuncName, Args} ->
+            %% Execute the callback and send response. 
If the write fails + %% (pipe closed by the C side after worker poisoning, or + %% write_timeout) we exit so the coordinator's trap_exit + %% clause removes us from its handler map. + handle_thread_callback(WriteFd, CallbackId, FuncName, Args), handler_loop(WorkerId, WriteFd); {pipe_closed} -> @@ -154,8 +213,8 @@ handler_loop(WorkerId, WriteFd) -> handler_loop(WorkerId, WriteFd) end. -%% Execute a callback and write response to the pipe -handle_thread_callback(WriteFd, FuncName, Args) -> +%% Execute a callback and write the id-prefixed response to the pipe. +handle_thread_callback(WriteFd, CallbackId, FuncName, Args) -> %% Convert Args from tuple to list if needed ArgsList = case Args of T when is_tuple(T) -> tuple_to_list(T); @@ -180,25 +239,26 @@ handle_thread_callback(WriteFd, FuncName, Args) -> <<1, ErrMsg/binary>> end, - %% Write response to pipe - py_nif:thread_worker_write(WriteFd, Response). + %% Write response to pipe; exit on failure so we are reaped. + case py_nif:thread_worker_write_with_id(WriteFd, CallbackId, Response) of + ok -> ok; + {error, Reason1} -> + exit({thread_worker_write_failed, Reason1}) + end. -%% Execute an async callback and write response to the async callback pipe. -%% Unlike handle_thread_callback, this includes the callback_id in the response -%% so Python can match it to the correct Future. -handle_async_callback(WriteFd, CallbackId, FuncName, Args) -> - %% Convert Args from tuple to list if needed +%% Execute the user's registered function and encode the result as a +%% wire-format response body (`<>`). +%% Used by async_writer_loop (and indirectly by handle_thread_callback +%% via the same encoding shape — the sync path keeps its own copy +%% close to the write call site). 
+run_async_callback(FuncName, Args) -> ArgsList = case Args of T when is_tuple(T) -> tuple_to_list(T); L when is_list(L) -> L; _ -> [Args] end, - - %% Execute the registered function - Response = case py_callback:execute(FuncName, ArgsList) of + case py_callback:execute(FuncName, ArgsList) of {ok, Result} -> - %% Encode result as Python-parseable string - %% Format: status_byte (0=ok) + python_repr ResultStr = term_to_python_repr(Result), <<0, ResultStr/binary>>; {error, {not_found, Name}} -> @@ -209,10 +269,76 @@ handle_async_callback(WriteFd, CallbackId, FuncName, Args) -> ErrMsg = iolist_to_binary( io_lib:format("~p: ~p", [Class, Reason])), <<1, ErrMsg/binary>> - end, + end. - %% Write response to async callback pipe (includes callback_id) - py_nif:async_callback_response(WriteFd, CallbackId, Response). +%% Look up or spawn the async writer for a given WriteFd. Writers are +%% spawn_link'd from the coordinator (which traps exits): a writer +%% death surfaces as `{'EXIT', WriterPid, _}' for cleanup, and a +%% coordinator crash propagates the EXIT signal back to the writer +%% (which does not trap) so it dies with the gen_server instead of +%% leaking the pipe fd. Supervision then restarts the gen_server with +%% an empty `async_writers' map and writers are respawned lazily on +%% the next callback. Each writer gets its own atomics counter that +%% bounds total in-flight async callbacks (executors + queued +%% responses) on that pipe. +ensure_async_writer(WriteFd, + #state{async_writers = Writers, + writer_fds = WriterFds} = State) -> + case maps:get(WriteFd, Writers, undefined) of + undefined -> + Counter = atomics:new(1, [{signed, false}]), + Writer = spawn_link(fun() -> async_writer_loop(WriteFd) end), + {Writer, Counter, State#state{ + async_writers = Writers#{WriteFd => {Writer, Counter}}, + writer_fds = WriterFds#{Writer => WriteFd}}}; + {Writer, Counter} -> + {Writer, Counter, State} + end. + +%% Per-fd writer process. 
Owns one async pipe write fd and is the only +%% process that calls py_nif:async_callback_response/3 for it; its +%% mailbox is the serialisation point for all responses on that fd. +%% +%% The {respond, ...} message carries the per-pipe atomics counter +%% from ensure_async_writer/2. The writer ALWAYS decrements after +%% handing the frame to the NIF (success or error) so the in-flight +%% bound is honoured even when the underlying write fails. {overflow, +%% ...} messages do not carry the counter — the gen_server already +%% un-reserved before sending — and the writer just emits the +%% error frame. +%% +%% Lifecycle: +%% - spawn_link'd from the coordinator: a coordinator crash +%% propagates an unhandled EXIT here (we do NOT trap_exit) and +%% the writer dies with it, releasing the fd before supervision +%% restarts the gen_server. +%% - On hard write errors the writer exits normally. The +%% coordinator's trap_exit clause catches the EXIT, drops the +%% map entry, and the next callback respawns the writer with a +%% fresh atomics counter so any leaked count from the previous +%% instance is discarded with the dead writer. +async_writer_loop(WriteFd) -> + receive + {respond, CallbackId, Response, Counter} -> + R = py_nif:async_callback_response(WriteFd, CallbackId, Response), + atomics:sub(Counter, 1, 1), + case R of + ok -> async_writer_loop(WriteFd); + {error, _} = Err -> + exit({async_callback_response_failed, Err}) + end; + {overflow, CallbackId} -> + Response = <<1, "async callback queue full">>, + case py_nif:async_callback_response(WriteFd, CallbackId, Response) of + ok -> async_writer_loop(WriteFd); + {error, _} = Err -> + exit({async_callback_response_failed, Err}) + end; + shutdown -> + ok; + _Other -> + async_writer_loop(WriteFd) + end. 
%%% ============================================================================ %%% Term to Python repr conversion diff --git a/test/py_thread_callback_SUITE.erl b/test/py_thread_callback_SUITE.erl index adce52b..a656bc3 100644 --- a/test/py_thread_callback_SUITE.erl +++ b/test/py_thread_callback_SUITE.erl @@ -23,7 +23,10 @@ threadpool_thread_reuse_test/1, simple_thread_basic_test/1, simple_thread_multiple_calls_test/1, - simple_thread_concurrent_test/1 + simple_thread_concurrent_test/1, + threadpool_high_concurrency_test/1, + async_callback_concurrent_test/1, + async_callback_large_payload_test/1 ]). all() -> @@ -36,7 +39,10 @@ all() -> threadpool_thread_reuse_test, simple_thread_basic_test, simple_thread_multiple_calls_test, - simple_thread_concurrent_test + simple_thread_concurrent_test, + threadpool_high_concurrency_test, + async_callback_concurrent_test, + async_callback_large_payload_test ]. init_per_suite(Config) -> @@ -63,6 +69,8 @@ end_per_testcase(_TestCase, _Config) -> catch py:unregister_function(square_in_erlang), catch py:unregister_function(maybe_fail), catch py:unregister_function(get_id), + catch py:unregister_function(async_double), + catch py:unregister_function(async_blob), ok. %%% ============================================================================ @@ -204,3 +212,104 @@ simple_thread_concurrent_test(_Config) -> {ok, Results} = py:eval(Code), [0, 2, 4, 6, 8] = Results, ok. + +%%% ============================================================================ +%%% Regression tests for issue #63 (sync + async pipe race / interleave) +%%% ============================================================================ + +%% @doc Stress the sync thread-callback path: +%% - 8 worker threads, +%% - 200 tasks per repetition, each task makes 5 sequential +%% erl.call('add_one', x) invocations, +%% - 10 repetitions per case. 
+%% +%% Without the Phase 2-5 fixes (looped reads, combined write, +%% callback_id correlation, worker poisoning), short-read or +%% length-then-data races deliver wrong values to the wrong caller — +%% the symptom captured in issue #63. With the fixes, every result +%% must equal x + 5. +threadpool_high_concurrency_test(_Config) -> + py:register_function(add_one, fun([X]) -> X + 1 end), + Code = << + "(lambda cf, erl: list(" + "cf.ThreadPoolExecutor(max_workers=8).__enter__().map(" + "lambda x: erl.call('add_one', erl.call('add_one', " + "erl.call('add_one', erl.call('add_one', erl.call('add_one', x))))), " + "range(200))))(__import__('concurrent.futures', " + "fromlist=['ThreadPoolExecutor']), __import__('erlang'))" + >>, + Expected = [X + 5 || X <- lists:seq(0, 199)], + lists:foreach( + fun(I) -> + {ok, Results} = py:eval(Code), + case Results of + Expected -> ok; + _ -> ct:fail({wrong_results_iter, I, Results}) + end + end, + lists:seq(1, 10)), + ok. + +%% @doc Stress the async thread-callback path (erlang.async_call from +%% Python). 50 concurrent invocations through a single +%% asyncio.gather; without the Phase 4a per-fd writer process or +%% Phase 4c resumable parser, frame-on-the-wire interleaving on the +%% shared async_callback_pipe corrupts results. +async_callback_concurrent_test(_Config) -> + py:register_function(async_double, fun([X]) -> X * 2 end), + %% Add the existing test/ directory to sys.path so the Python + %% helper module is importable. + TestDir = filename:join(code:lib_dir(erlang_python), "test"), + ok = py:exec(iolist_to_binary(io_lib:format( + "import sys; sys.path.insert(0, '~s')", [TestDir]))), + %% Define a Python coroutine that gathers 50 erlang.async_call + %% invocations and returns the result list. 
+ ok = py:exec(<< + "import asyncio, erlang\n" + "async def _gather50():\n" + " return await asyncio.gather(*[\n" + " erlang.async_call('async_double', i)\n" + " for i in range(50)\n" + " ])\n" + "def _run_gather50():\n" + " return asyncio.run(_gather50())\n" + >>), + {ok, Results} = py:call('__main__', '_run_gather50', []), + Expected = [I * 2 || I <- lists:seq(0, 49)], + Expected = Results, + py:unregister_function(async_double), + ok. + +%% @doc Async callback with a payload larger than PIPE_BUF (4 KiB on +%% Linux, 512 on FreeBSD per POSIX minimum). Combined with concurrent +%% submissions, this exercises the looped non-blocking write under +%% the single-writer-process invariant: the kernel may chunk the +%% write but no other writer can interleave, so the resumable parser +%% on the Python side reassembles the frame intact. +async_callback_large_payload_test(_Config) -> + Size = 64 * 1024, + %% Printable ASCII only — the goal is wire-frame integrity across + %% the chunked write boundary, not arbitrary-byte encoding through + %% term_to_python_repr (which double-quotes binaries as Python + %% string literals). + Payload = list_to_binary([($a + (I rem 26)) || I <- lists:seq(1, Size)]), + py:register_function(async_blob, fun([_]) -> Payload end), + TestDir = filename:join(code:lib_dir(erlang_python), "test"), + ok = py:exec(iolist_to_binary(io_lib:format( + "import sys; sys.path.insert(0, '~s')", [TestDir]))), + ok = py:exec(<< + "import asyncio, erlang\n" + "async def _gather_blobs():\n" + " return await asyncio.gather(*[\n" + " erlang.async_call('async_blob', i)\n" + " for i in range(8)\n" + " ])\n" + "def _run_gather_blobs():\n" + " blobs = asyncio.run(_gather_blobs())\n" + " return [len(b) for b in blobs]\n" + >>), + {ok, Lengths} = py:call('__main__', '_run_gather_blobs', []), + Expected = lists:duplicate(8, Size), + Expected = Lengths, + py:unregister_function(async_blob), + ok.