apply: handle upstream connection loss without origin-advance leak #466
mason-sharp wants to merge 1 commit into main
Conversation
When the upstream TCP connection dies (firewall reload, walsender RST,
keepalive timeout) the apply worker had two interacting problems:
1. fd = PQsocket(applyconn) was captured once before stream_replay: and
reused by every WaitLatchOrSocket() call. After libpq closed the
socket the cached fd was stale (possibly reused by the OS), producing
epoll_ctl(EINVAL) on Linux when WaitLatchOrSocket re-armed it. The
follow-on exception was caught with use_try_block=true and
PG_RE_THROW'd, producing a noisy two-error cascade per disconnect.
2. PG_CATCH treated every error the same way -- AbortOutOfAnyTransaction
then goto stream_replay. For a connection-class error this is
meaningless: the upstream is gone, replay cannot succeed. Worse,
replorigin_session_origin_lsn was still set (by handle_begin) to the
in-flight remote transaction's final_lsn, and RecordTransactionAbort
advances the replication origin to that stale LSN unless the session
origin variable is cleared first. spock_apply_main's on-exit
flush_progress_if_needed(true) commit has the same gate. Either path
can silently advance origin past an aborted remote transaction so it
gets skipped on reconnect -- the loss mode PG core defends against in
start_apply (worker.c) with replorigin_reset.
Three changes:
* Tag the disconnect/timeout/EOF elog(ERROR) sites with
ERRCODE_CONNECTION_FAILURE via ereport so the discriminator below can
classify them deterministically.
* Refresh fd = PQsocket(applyconn) at the top of every wait-loop
iteration, after the previous iteration's PQconsumeInput has had a
chance to update libpq state. If the connection has gone bad, raise
a tagged ERROR rather than passing a closed fd to WaitLatchOrSocket.
Mirrors the libpqrcv_receive pattern in PG core.
* In apply_work's PG_CATCH, detect connection-class errors (sqlerrcode
CONNECTION_FAILURE / CONNECTION_EXCEPTION / CONNECTION_DOES_NOT_EXIST
/ ADMIN_SHUTDOWN, or PQstatus == CONNECTION_BAD as a libpq fallback)
and PG_RE_THROW. apply_work is the only PG_TRY in this call stack,
so the rethrow propagates to the bgworker error handler. Origin is
zeroed by spock_apply_worker_shmem_exit (before_shmem_exit) before
ShutdownPostgres calls AbortOutOfAnyTransaction, so neither the abort
nor the bypassed on-exit flush_progress can advance origin to the
stale LSN. Apply-side errors (constraint violations etc.) do NOT
take this branch -- they continue through the existing exception_log
replay path.
Documents the before_shmem_exit LIFO ordering dependency in
spock_apply_worker_shmem_exit so the invariant survives future edits.
Adds two TAP tests (not in the schedule, run manually):
- 101_clean_exit_on_conn_loss.pl asserts the new "exiting via
rethrow" LOG line is emitted. Fails without the fix.
- 102_no_origin_leak_on_disconnect.pl exercises the apply_delay /
SIGKILL path and asserts no assertion failure, no SIGABRT, and
no row loss across the disconnect.
(cherry picked from commit b75cb4f)
Actionable comments posted: 1
⚠️ Outside diff range comments (1)
src/spock_apply.c, lines 3050-3062 (⚠️ Potential issue | 🟠 Major | ⚡ Quick win):

Tag the idle-timeout path as a connection failure.
This branch still throws a plain elog(ERROR). If the sender is hung but the TCP session stays established, PQstatus(applyconn) can remain CONNECTION_OK, so the outer PG_CATCH will miss the new connection-class branch and can still enter apply-side replay/exception handling.

Suggested fix:

```diff
-	elog(ERROR, "SPOCK %s: no data received for %d seconds, "
-		 "reconnecting (spock.apply_idle_timeout)",
-		 MySubscription->name, spock_apply_idle_timeout);
+	ereport(ERROR,
+			(errcode(ERRCODE_CONNECTION_FAILURE),
+			 errmsg("SPOCK %s: no data received for %d seconds, "
+					"reconnecting (spock.apply_idle_timeout)",
+					MySubscription->name,
+					spock_apply_idle_timeout)));
```
In tests/tap/t/101_clean_exit_on_conn_loss.pl:

```perl
# Drive a series of small commits so the apply worker has plenty of in-flight
# state when we kill the walsender. Each statement in psql is its own
# implicit transaction, which is what we want here.
my $batch = '';
for my $i (1 .. 100) {
    $batch .= "INSERT INTO test_clean_exit (val) VALUES ('pre_$i');\n";
}
psql_or_bail(1, $batch);
```
This batch executes all 100 inserts within a single transaction, not 100 separate commits.
When psql_or_bail(1, $batch) is called, it passes the concatenated batch string to psql -c, which executes all semicolon-separated statements within one implicit transaction. The test's comment claims "each statement in psql is its own implicit transaction," but that only applies when statements are submitted separately. As written, all 100 inserts are committed together in a single transaction, defeating the test's intent to exercise the apply worker's in-flight state across many transaction boundaries.
To fix, either call psql_or_bail once per insert, or wrap each insert in an explicit BEGIN; ... COMMIT; block within the batch string.