feat: implement peek on walrus query by filipecabaco · Pull Request #1707 · supabase/realtime · GitHub

feat: implement peek on walrus query #1707

Open
filipecabaco wants to merge 2 commits into main from feat/peek-before-change

@filipecabaco (Member) commented Feb 9, 2026

What kind of change does this PR introduce?

To reduce the overall load of the query, we now peek before pulling. When the peek finds nothing to pull, we avoid running the more expensive query and get faster feedback.

Summary by CodeRabbit

  • New Features

    • Added PostgreSQL logical replication polling capability with optimized detection of empty changes
    • Enhanced replication polling behavior with immediate rescheduling and backoff reset for faster response times
    • Implemented Row-Level Security (RLS) support for replication change tracking
  • Tests

    • Added integration tests validating logical replication functionality, RLS filtering, and sentinel response behavior

@filipecabaco requested a review from a team on February 9, 2026 23:58
@filipecabaco force-pushed the feat/peek-before-change branch from 14df369 to d66e39c on February 13, 2026 18:05
@coderabbitai (bot) commented Feb 13, 2026

📝 Walkthrough

This pull request introduces a new PostgreSQL CDC feature that enables peeking into replication slots to detect empty changes without advancing the log sequence number. It includes a new migration creating the list_changes function, enhanced poller logic to handle peek-empty sentinel results, and integration tests validating the replication lifecycle with RLS considerations.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Migration Infrastructure: lib/realtime/tenants/migrations.ex, lib/realtime/tenants/repo/migrations/20260210000000_create_peek_and_list_changes_function.ex | Introduces migration registration and a new Postgres function realtime.list_changes that retrieves logical replication changes, applies RLS filters, and returns a sentinel row when no changes exist (peek_empty condition). |
| Replication Poller Enhancement: lib/extensions/postgres_cdc_rls/replication_poller.ex | Adds special-case handling for peek_empty sentinel results, resetting backoff and rescheduling polls immediately without advancing the replication slot position. |
| Integration Tests: test/integration/replications_test.exs | New comprehensive test suite covering replication lifecycle (prepare, list_changes, polling), change validation, RLS filtering behavior, and sentinel detection on empty polls. |

Sequence Diagram

sequenceDiagram
    participant Poller as Replication Poller
    participant PG as PostgreSQL
    participant Sub as Subscription Handler
    
    loop Poll Cycle
        Poller->>Poller: Start poll timer
        Poller->>PG: Call realtime.list_changes()
        
        alt Changes Available
            PG-->>Poller: Return change rows
            Poller->>Sub: Process change rows
            Poller->>Poller: Reset backoff
        else No Changes (Peek-Empty)
            PG-->>Poller: Return peek_empty sentinel
            Poller->>Poller: Detect peek_empty condition
            Poller->>Poller: Reset backoff (no slot advance)
            Poller->>Poller: Reschedule immediately
        end
    end
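
The branching in the diagram above can be sketched as a small pure function. This is only an illustration under stated assumptions, not the code in replication_poller.ex: the module name `PollerSketch` and its functions are hypothetical, and the sentinel position (index 8, value `["peek_empty"]`) is taken from the integration test quoted later in this PR.

```elixir
defmodule PollerSketch do
  # Hypothetical module for illustration; not part of the PR.

  # Column index carrying the sentinel marker, as asserted in the
  # integration test: Enum.at(sentinel, 8) == ["peek_empty"].
  @sentinel_index 8

  # True only when the result is exactly one row holding the sentinel.
  def peek_empty?([row]) when is_list(row) do
    Enum.at(row, @sentinel_index) == ["peek_empty"]
  end

  def peek_empty?(_rows), do: false

  # Classifies one poll result into the branch the poller would take.
  def classify(rows) do
    if peek_empty?(rows) do
      # Nothing to consume: the caller would reset backoff and poll again
      # right away, leaving the replication slot position untouched.
      :peek_empty
    else
      # Real change rows: the caller would hand them to the subscription
      # handler and reset backoff.
      {:changes, rows}
    end
  end
end
```

Keeping the classification separate from scheduling makes the sentinel check testable without a database connection; how the real poller wires this into its timers is not shown here.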

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (3 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped: CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title 'feat: implement peek on walrus query' directly describes the main change: introducing a peek optimization step before the walrus query pull to reduce load and improve feedback speed, which aligns with the PR objectives and file changes. |
| Merge Conflict Detection | ✅ Passed | No merge conflicts detected when merging into main. |


@coderabbitai (bot) left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@test/integration/replications_test.exs`:
- Around line 43-75: The test's strict timing assertion for peek short-circuit
(assert time < 50_000) is flaky; update the assertion in the "prepare, poll,
consume full cycle" test that measures :timer.tc around
Replications.list_changes to use a relaxed threshold (e.g., assert time <
200_000) or remove the wall-clock assert entirely and instead assert functional
behavior only (presence of sentinel and its peek_empty value via
Enum.at(sentinel, 8)), referencing the measured variable time and the call to
Replications.list_changes so you change the correct assertion.

Comment on lines +43 to +75
test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do
  {time, result} =
    :timer.tc(fn ->
      Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
    end)

  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = result
  assert Enum.at(sentinel, 8) == ["peek_empty"]
  assert time < 50_000, "Expected peek short-circuit under 50ms, took #{div(time, 1000)}ms"

  Process.sleep(@poll_interval)

  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_1')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_2')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_3')", [])

  Process.sleep(@poll_interval)

  {:ok, %Postgrex.Result{num_rows: 3, rows: rows}} =
    Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  [row | _] = rows
  assert Enum.at(row, 0) == "INSERT"
  assert Enum.at(row, 1) == "public"
  assert Enum.at(row, 2) == "test"

  Process.sleep(@poll_interval)

  {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
    Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(sentinel, 8) == ["peek_empty"]
end

⚠️ Potential issue | 🟠 Major

Avoid a hard 50ms timing assertion to prevent flaky CI.

The strict bound is environment-sensitive and can intermittently fail even when behavior is correct. Consider relaxing the threshold or asserting functional behavior only.
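
The "functional behavior only" option might look roughly like the sketch below. The module and function names (`PeekAssertSketch`, `check_peek_empty/1`) are hypothetical, and the stubbed result stands in for a real `Replications.list_changes/5` call. Elapsed time is measured for reporting but never asserted.

```elixir
defmodule PeekAssertSketch do
  # Hypothetical helper for illustration; not part of the PR.
  # Times the call for diagnostics only and checks the sentinel shape.
  def check_peek_empty(fun) when is_function(fun, 0) do
    {micros, result} = :timer.tc(fun)

    case result do
      # A plain map pattern also matches a %Postgrex.Result{} struct.
      {:ok, %{num_rows: 1, rows: [sentinel]}} ->
        if Enum.at(sentinel, 8) == ["peek_empty"] do
          {:ok, div(micros, 1000)}
        else
          {:error, :not_sentinel}
        end

      other ->
        {:error, {:unexpected_result, other}}
    end
  end
end

# Stubbed result in place of a real database call (assumption for illustration).
stub = fn ->
  {:ok, %{num_rows: 1, rows: [List.duplicate(nil, 8) ++ [["peek_empty"]]]}}
end

{:ok, elapsed_ms} = PeekAssertSketch.check_peek_empty(stub)
IO.puts("peek returned the sentinel in #{elapsed_ms}ms (informational only)")
```

Reporting the elapsed time instead of asserting on it keeps the performance signal visible in test output without making CI depend on machine speed.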

🧪 Suggested relaxation
-      assert time < 50_000, "Expected peek short-circuit under 50ms, took #{div(time, 1000)}ms"
+      assert time < 500_000, "Expected peek short-circuit under 500ms, took #{div(time, 1000)}ms"
📝 Committable suggestion

Suggested change
test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do
  {time, result} =
    :timer.tc(fn ->
      Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
    end)

  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = result
  assert Enum.at(sentinel, 8) == ["peek_empty"]
  assert time < 500_000, "Expected peek short-circuit under 500ms, took #{div(time, 1000)}ms"

  Process.sleep(@poll_interval)

  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_1')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_2')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_3')", [])

  Process.sleep(@poll_interval)

  {:ok, %Postgrex.Result{num_rows: 3, rows: rows}} =
    Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  [row | _] = rows
  assert Enum.at(row, 0) == "INSERT"
  assert Enum.at(row, 1) == "public"
  assert Enum.at(row, 2) == "test"

  Process.sleep(@poll_interval)

  {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
    Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(sentinel, 8) == ["peek_empty"]
end

@coveralls

Coverage Status

coverage: 90.29% (+0.2%) from 90.085% when pulling d66e39c on feat/peek-before-change into 2ff8182 on main.
