
apply: handle upstream connection loss without origin-advance leak #466

Draft
mason-sharp wants to merge 1 commit into main from fix/SPOC-546/connection-handling-main

Conversation

@mason-sharp
Member

When the upstream TCP connection dies (firewall reload, walsender RST, keepalive timeout) the apply worker had two interacting problems:

  1. fd = PQsocket(applyconn) was captured once before stream_replay: and reused by every WaitLatchOrSocket() call. After libpq closed the socket the cached fd was stale (possibly reused by the OS), producing epoll_ctl(EINVAL) on Linux when WaitLatchOrSocket re-armed it. The follow-on exception was caught with use_try_block=true and PG_RE_THROW'd, producing a noisy two-error cascade per disconnect.

  2. PG_CATCH treated every error the same way -- AbortOutOfAnyTransaction then goto stream_replay. For a connection-class error this is meaningless: the upstream is gone, replay cannot succeed. Worse, replorigin_session_origin_lsn was still set (by handle_begin) to the in-flight remote transaction's final_lsn, and RecordTransactionAbort advances the replication origin to that stale LSN unless the session origin variable is cleared first. spock_apply_main's on-exit flush_progress_if_needed(true) commit has the same gate. Either path can silently advance origin past an aborted remote transaction so it gets skipped on reconnect -- the loss mode PG core defends against in start_apply (worker.c) with replorigin_reset.

Three changes:

  • Tag the disconnect/timeout/EOF elog(ERROR) sites with ERRCODE_CONNECTION_FAILURE via ereport so the discriminator below can classify them deterministically.

  • Refresh fd = PQsocket(applyconn) at the top of every wait-loop iteration, after the previous iteration's PQconsumeInput has had a chance to update libpq state. If the connection has gone bad, raise a tagged ERROR rather than passing a closed fd to WaitLatchOrSocket. Mirrors the libpqrcv_receive pattern in PG core.

  • In apply_work's PG_CATCH, detect connection-class errors (sqlerrcode CONNECTION_FAILURE / CONNECTION_EXCEPTION / CONNECTION_DOES_NOT_EXIST / ADMIN_SHUTDOWN, or PQstatus == CONNECTION_BAD as a libpq fallback) and PG_RE_THROW. apply_work is the only PG_TRY in this call stack, so the rethrow propagates to the bgworker error handler. Origin is zeroed by spock_apply_worker_shmem_exit (before_shmem_exit) before ShutdownPostgres calls AbortOutOfAnyTransaction, so neither the abort nor the bypassed on-exit flush_progress can advance origin to the stale LSN. Apply-side errors (constraint violations etc.) do NOT take this branch -- they continue through the existing exception_log replay path.

Documents the before_shmem_exit LIFO ordering dependency in spock_apply_worker_shmem_exit so the invariant survives future edits.

Adds two TAP tests (not in the schedule, run manually):

  • 101_clean_exit_on_conn_loss.pl asserts the new "exiting via rethrow" LOG line is emitted. Fails without the fix.
  • 102_no_origin_leak_on_disconnect.pl exercises the apply_delay / SIGKILL path and asserts no assertion failure, no SIGABRT, and no row loss across the disconnect.

(cherry picked from commit b75cb4f)

@coderabbitai

coderabbitai Bot commented May 10, 2026


📝 Walkthrough

This PR refines apply-side error handling to safely detect and exit on connection failures. Socket liveness is re-validated per loop iteration, COPY errors use tagged error codes, and connection-class exceptions trigger immediate rethrow to avoid stale state replay. Two TAP tests validate the rethrow path engages on connection loss and that origin state is safely reset before reconnect.

Changes

Apply-side connection error handling and origin state

| Layer / File(s) | Summary |
| --- | --- |
| Shutdown ordering and origin state (src/spock_apply.c) | Documentation strengthened in spock_apply_worker_shmem_exit() to enforce replication session/origin reset before ShutdownPostgres() to prevent stale origin advancement. |
| Socket liveness validation (src/spock_apply.c) | Loop now refreshes the socket descriptor from PQsocket() at each iteration and raises a tagged ERRCODE_CONNECTION_FAILURE if the connection is bad or the socket invalid. |
| COPY stream error handling (src/spock_apply.c) | COPY read failures switch from elog(ERROR) to a tagged ereport(ERROR) with ERRCODE_CONNECTION_FAILURE and PQerrorMessage() context. |
| Connection-class error detection and rethrow (src/spock_apply.c) | Outer PG_CATCH detects connection-class errors via edata->sqlerrcode and PQstatus() == CONNECTION_BAD, logs, and rethrows to bypass the replay path. |
| Connection loss exit validation (tests/tap/t/101_clean_exit_on_conn_loss.pl) | TAP test confirms the rethrow discriminator message appears on forced walsender disconnect, the exception cascade is absent, and the subscriber recovers cleanly with row alignment intact. |
| Origin state safety on delayed apply (tests/tap/t/102_no_origin_leak_on_disconnect.pl) | TAP test verifies origin state is not leaked when the apply worker disconnects mid-apply_delay; confirms the rethrow log, no assertion failure, no SIGABRT, and that in-flight rows survive. |

Poem

🐰 With sockets checked at every hop,
And connection losses caught before a flop,
The origin state stays clean and bright,
No stale replays in the night—
Safe reconnect sets things right!

🚥 Pre-merge checks | ✅ 5

✅ Passed checks (5 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately summarizes the main change: fixing connection loss handling in the apply worker to prevent origin-advance leaks. |
| Description check | ✅ Passed | The description is comprehensive and directly related to the changeset, explaining the problem, the solution, and the tests added. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |




@mason-sharp mason-sharp marked this pull request as draft May 10, 2026 18:16
@mason-sharp mason-sharp requested a review from rasifr May 10, 2026 18:16
@codacy-production

Up to standards ✅

🟢 Issues 0 issues

Results:
0 new issues

View in Codacy

🟢 Metrics

| Metric | Result |
| --- | --- |
| Duplication | 0 |

View in Codacy



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/spock_apply.c (1)

3050-3062: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Tag the idle-timeout path as a connection failure.

This branch still throws a plain elog(ERROR). If the sender is hung but the TCP session stays established, PQstatus(applyconn) can remain CONNECTION_OK, so the outer PG_CATCH will miss the new connection-class branch and can still enter apply-side replay/exception handling.

Suggested fix
-					elog(ERROR, "SPOCK %s: no data received for %d seconds, "
-						 "reconnecting (spock.apply_idle_timeout)",
-						 MySubscription->name, spock_apply_idle_timeout);
+					ereport(ERROR,
+							(errcode(ERRCODE_CONNECTION_FAILURE),
+							 errmsg("SPOCK %s: no data received for %d seconds, "
+									"reconnecting (spock.apply_idle_timeout)",
+									MySubscription->name,
+									spock_apply_idle_timeout)));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/spock_apply.c` around lines 3050 - 3062, The idle-timeout branch
currently raises a plain elog(ERROR) which isn't classified as a connection
failure; change the error to a connection-class error so outer PG_CATCH will
treat it as a connection failure. In the block that checks (rc & WL_TIMEOUT &&
spock_apply_idle_timeout > 0) (using last_receive_timestamp,
GetCurrentTimestamp, MySpockWorker->worker_status and
SPOCK_WORKER_STATUS_STOPPED), replace the elog(ERROR, ...) call with an
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg(...))) (preserving
the existing message and MySubscription->name and spock_apply_idle_timeout) so
the error is tagged as a connection failure (similar to errors from
PQstatus(applyconn) checks) and will be handled by the connection-reconnect
branch in PG_CATCH.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 65c13976-df05-4dda-b8c8-f4569c1793f2

📥 Commits

Reviewing files that changed from the base of the PR and between 8ba18fe and eda3f0d.

📒 Files selected for processing (3)
  • src/spock_apply.c
  • tests/tap/t/101_clean_exit_on_conn_loss.pl
  • tests/tap/t/102_no_origin_leak_on_disconnect.pl

Comment on lines +81 to +88
# Drive a series of small commits so the apply worker has plenty of in-flight
# state when we kill the walsender. Each statement in psql is its own
# implicit transaction, which is what we want here.
my $batch = '';
for my $i (1 .. 100) {
$batch .= "INSERT INTO test_clean_exit (val) VALUES ('pre_$i');\n";
}
psql_or_bail(1, $batch);


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Inspect SpockTest::psql_or_bail implementation =="
fd -i '^SpockTest\.pm$' . -x sh -c '
  echo "--- {} ---"
  rg -n -C4 "sub psql_or_bail|\\-c|system_maybe|open3|IPC::Run" "{}"
'

echo
echo "== Show the batched call sites relying on commit boundaries =="
rg -n -C2 'psql_or_bail\(1,\s*\$batch|psql_or_bail\(1,\s*\$batch2' tests/tap/t/101_clean_exit_on_conn_loss.pl tests/tap/t/102_no_origin_leak_on_disconnect.pl

Repository: pgEdge/spock

Length of output: 6418


This batch executes all 100 inserts within a single transaction, not 100 separate commits.

When psql_or_bail(1, $batch) is called, it passes the concatenated batch string to psql -c, which executes all semicolon-separated statements within one implicit transaction. The test's comment claims "each statement in psql is its own implicit transaction," but that only applies when statements are submitted separately. As written, all 100 inserts are committed together in a single transaction, defeating the test's intent to exercise the apply worker's in-flight state across many transaction boundaries.

To fix, either call psql_or_bail once per insert, or wrap each insert in an explicit BEGIN; ... COMMIT; block within the batch string.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/tap/t/101_clean_exit_on_conn_loss.pl` around lines 81 - 88, The test
builds $batch and calls psql_or_bail(1, $batch) which currently sends all 100
INSERTs in one call and thus executes them in a single transaction; change the
test so each INSERT is submitted as its own commit—either by invoking
psql_or_bail(1, ...) for each INSERT in the loop (calling psql_or_bail inside
the for loop) or by modifying $batch to wrap each INSERT in its own explicit
BEGIN; ... COMMIT; block—so that test_clean_exit exercises 100 separate
transactions as intended.
