To reduce the overall load of the query, we now peek before pulling. When there are no changes, this avoids running the more expensive query and gives faster feedback.
14df369 to d66e39c
📝 Walkthrough
This pull request introduces a new PostgreSQL CDC feature that enables peeking into replication slots to detect empty changes without advancing the log sequence number. It includes a new migration creating the …
Changes
Sequence Diagram
sequenceDiagram
    participant Poller as Replication Poller
    participant PG as PostgreSQL
    participant Sub as Subscription Handler
    loop Poll Cycle
        Poller->>Poller: Start poll timer
        Poller->>PG: Call realtime.list_changes()
        alt Changes Available
            PG-->>Poller: Return change rows
            Poller->>Sub: Process change rows
            Poller->>Poller: Reset backoff
        else No Changes (Peek-Empty)
            PG-->>Poller: Return peek_empty sentinel
            Poller->>Poller: Detect peek_empty condition
            Poller->>Poller: Reset backoff (no slot advance)
            Poller->>Poller: Reschedule immediately
        end
    end
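The two branches of the diagram can be sketched as a small classification step on the query result. This is only an illustration, not the PR's implementation: the module name `PollSketch` and the return atoms are hypothetical, and the sentinel shape (a single row whose column 8 equals `["peek_empty"]`) is taken from the integration test in this PR. The clauses match on plain maps, so a `%Postgrex.Result{}` struct would match as well.

```elixir
defmodule PollSketch do
  # Hypothetical helper: classifies the result of a list_changes call.
  # Column index 8 and the ["peek_empty"] value come from the test in this PR;
  # the return atoms are assumptions made for illustration.
  @sentinel_index 8

  # A single row may be either the peek_empty sentinel or one real change.
  def classify({:ok, %{num_rows: 1, rows: [row]}}) do
    if Enum.at(row, @sentinel_index) == ["peek_empty"] do
      :peek_empty
    else
      {:changes, [row]}
    end
  end

  # One or more rows that are not a lone sentinel are real changes.
  def classify({:ok, %{rows: [_ | _] = rows}}), do: {:changes, rows}

  # No rows at all.
  def classify({:ok, %{rows: []}}), do: :empty

  # Pass query errors through unchanged.
  def classify({:error, reason}), do: {:error, reason}
end
```

A poller could then reset its backoff on either `:peek_empty` or `{:changes, rows}`, and only hand rows to the subscription handler in the second case.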
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 3 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@test/integration/replications_test.exs`:
- Around line 43-75: The test's strict timing assertion for peek short-circuit
(assert time < 50_000) is flaky; update the assertion in the "prepare, poll,
consume full cycle" test that measures :timer.tc around
Replications.list_changes to use a relaxed threshold (e.g., assert time <
200_000) or remove the wall-clock assert entirely and instead assert functional
behavior only (presence of sentinel and its peek_empty value via
Enum.at(sentinel, 8)), referencing the measured variable time and the call to
Replications.list_changes so you change the correct assertion.
test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do
  {time, result} =
    :timer.tc(fn ->
      Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
    end)

  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = result
  assert Enum.at(sentinel, 8) == ["peek_empty"]
  assert time < 50_000, "Expected peek short-circuit under 50ms, took #{div(time, 1000)}ms"

  Process.sleep(@poll_interval)

  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_1')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_2')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_3')", [])

  Process.sleep(@poll_interval)

  {:ok, %Postgrex.Result{num_rows: 3, rows: rows}} =
    Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  [row | _] = rows
  assert Enum.at(row, 0) == "INSERT"
  assert Enum.at(row, 1) == "public"
  assert Enum.at(row, 2) == "test"

  Process.sleep(@poll_interval)

  {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
    Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(sentinel, 8) == ["peek_empty"]
end
Avoid a hard 50ms timing assertion to prevent flaky CI.
The strict bound is environment-sensitive and can intermittently fail even when behavior is correct. Consider relaxing the threshold or asserting functional behavior only.
🧪 Suggested relaxation
- assert time < 50_000, "Expected peek short-circuit under 50ms, took #{div(time, 1000)}ms"
+ assert time < 500_000, "Expected peek short-circuit under 500ms, took #{div(time, 1000)}ms"
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do
  {time, result} =
    :timer.tc(fn ->
      Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
    end)

  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = result
  assert Enum.at(sentinel, 8) == ["peek_empty"]
  assert time < 500_000, "Expected peek short-circuit under 500ms, took #{div(time, 1000)}ms"

  Process.sleep(@poll_interval)

  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_1')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_2')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_3')", [])

  Process.sleep(@poll_interval)

  {:ok, %Postgrex.Result{num_rows: 3, rows: rows}} =
    Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  [row | _] = rows
  assert Enum.at(row, 0) == "INSERT"
  assert Enum.at(row, 1) == "public"
  assert Enum.at(row, 2) == "test"

  Process.sleep(@poll_interval)

  {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
    Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(sentinel, 8) == ["peek_empty"]
end
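If a timing check is kept at all, one possible way to make it less sensitive to a single slow run is to time the call several times and compare only the fastest run against the threshold. The sketch below is an assumption, not something proposed in this review: `TimingSketch.best_of/2` is a hypothetical helper, and repeating the call is only reasonable here because, per the walkthrough, a peek on an empty slot does not advance it.

```elixir
defmodule TimingSketch do
  # Hypothetical helper: runs `fun` `runs` times with :timer.tc/1 and returns
  # the {microseconds, result} pair of the fastest run.
  def best_of(fun, runs) when is_function(fun, 0) and is_integer(runs) and runs > 0 do
    1..runs
    |> Enum.map(fn _ -> :timer.tc(fun) end)
    |> Enum.min_by(fn {micros, _result} -> micros end)
  end
end
```

In the test above, `{time, result} = TimingSketch.best_of(fn -> Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) end, 5)` could stand in for the single `:timer.tc` call, with the functional assertions on `result` left unchanged.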