8000 Logical decoding of TRUNCATE · postgres/postgres@5dfd1e5 · GitHub
[go: up one dir, main page]

Skip to content

Commit 5dfd1e5

Browse files
committed
Logical decoding of TRUNCATE
Add a new WAL record type for TRUNCATE, which is only used when wal_level >= logical. (For physical replication, TRUNCATE is already replicated via SMGR records.) Add new callback for logical decoding output plugins to receive TRUNCATE actions. Author: Simon Riggs <simon@2ndquadrant.com> Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it> Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com> Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
1 parent b508a56 commit 5dfd1e5

File tree

15 files changed

+414
-13
lines changed
  • commands
  • replication/logical
  • include
  • 15 files changed

    +414
    -13
    lines changed

    contrib/test_decoding/Makefile

    Lines changed: 1 addition & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -39,7 +39,7 @@ submake-test_decoding:
    3939

    4040
    REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
    4141
    decoding_into_rel binary prepared replorigin time messages \
    42-
    spill slot
    42+
    spill slot truncate
    4343

    4444
    regresscheck: | submake-regress submake-test_decoding temp-install
    4545
    $(pg_regress_check) \
    Lines changed: 25 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,25 @@
    1+
    SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    2+
    ?column?
    3+
    ----------
    4+
    init
    5+
    (1 row)
    6+
    7+
    CREATE TABLE tab1 (id serial unique, data int);
    8+
    CREATE TABLE tab2 (a int primary key, b int);
    9+
    TRUNCATE tab1;
    10+
    TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
    11+
    TRUNCATE tab1, tab2;
    12+
    SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
    13+
    data
    14+
    ------------------------------------------------------
    15+
    BEGIN
    16+
    table public.tab1: TRUNCATE: (no-flags)
    17+
    COMMIT
    18+
    BEGIN
    19+
    table public.tab1: TRUNCATE: restart_seqs cascade
    20+
    COMMIT
    21+
    BEGIN
    22+
    table public.tab1, public.tab2: TRUNCATE: (no-flags)
    23+
    COMMIT
    24+
    (9 rows)
    25+
    Lines changed: 10 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,10 @@
    1+
    SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    2+
    3+
    CREATE TABLE tab1 (id serial unique, data int);
    4+
    CREATE TABLE tab2 (a int primary key, b int);
    5+
    6+
    TRUNCATE tab1;
    7+
    TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
    8+
    TRUNCATE tab1, tab2;
    9+
    10+
    SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');

    contrib/test_decoding/test_decoding.c

    Lines changed: 58 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -52,6 +52,10 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
    5252
    static void pg_decode_change(LogicalDecodingContext *ctx,
    5353
    ReorderBufferTXN *txn, Relation rel,
    5454
    ReorderBufferChange *change);
    55+
    static void pg_decode_truncate(LogicalDecodingContext *ctx,
    56+
    ReorderBufferTXN *txn,
    57+
    int nrelations, Relation relations[],
    58+
    ReorderBufferChange *change);
    5559
    static bool pg_decode_filter(LogicalDecodingContext *ctx,
    5660
    RepOriginId origin_id);
    5761
    static void pg_decode_message(LogicalDecodingContext *ctx,
    @@ -74,6 +78,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    7478
    cb->startup_cb = pg_decode_startup;
    7579
    cb->begin_cb = pg_decode_begin_txn;
    7680
    cb->change_cb = pg_decode_change;
    81+
    cb->truncate_cb = pg_decode_truncate;
    7782
    cb->commit_cb = pg_decode_commit_txn;
    7883
    cb->filter_by_origin_cb = pg_decode_filter;
    7984
    cb->shutdown_cb = pg_decode_shutdown;
    @@ -480,6 +485,59 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    480485
    OutputPluginWrite(ctx, true);
    481486
    }
    482487

    488+
    static void
    489+
    pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    490+
    int nrelations, Relation relations[], ReorderBufferChange *change)
    491+
    {
    492+
    TestDecodingData *data;
    493+
    MemoryContext old;
    494+
    int i;
    495+
    496+
    data = ctx->output_plugin_private;
    497+
    498+
    /* output BEGIN if we haven't yet */
    499+
    if (data->skip_empty_xacts && !data->xact_wrote_changes)
    500+
    {
    501+
    pg_output_begin(ctx, data, txn, false);
    502+
    }
    503+
    data->xact_wrote_changes = true;
    504+
    505+
    /* Avoid leaking memory by using and resetting our own context */
    506+
    old = MemoryContextSwitchTo(data->context);
    507+
    508+
    OutputPluginPrepareWrite(ctx, true);
    509+
    510+
    appendStringInfoString(ctx->out, "table ");
    511+
    512+
    for (i = 0; i < nrelations; i++)
    513+
    {
    514+
    if (i > 0)
    515+
    appendStringInfoString(ctx->out, ", ");
    516+
    517+
    appendStringInfoString(ctx->out,
    518+
    quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
    519+
    NameStr(relations[i]->rd_rel->relname)));
    520+
    }
    521+
    522+
    appendStringInfoString(ctx->out, ": TRUNCATE:");
    523+
    524+
    if (change->data.truncate.restart_seqs
    525+
    || change->data.truncate.cascade)
    526+
    {
    527+
    if (change->data.truncate.restart_seqs)
    528+
    appendStringInfo(ctx->out, " restart_seqs");
    529+
    if (change->data.truncate.cascade)
    530+
    appendStringInfo(ctx->out, " cascade");
    531+
    }
    532+
    else
    533+
    appendStringInfoString(ctx->out, " (no-flags)");
    534+
    535+
    MemoryContextSwitchTo(old);
    536+
    MemoryContextReset(data->context);
    537+
    538+
    OutputPluginWrite(ctx, true);
    539+
    }
    540+
    483541
    static void
    484542
    pg_decode_message(LogicalDecodingContext *ctx,
    485543
    ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,

    doc/src/sgml/logicaldecoding.sgml

    Lines changed: 26 additions & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -383,6 +383,7 @@ typedef struct OutputPluginCallbacks
    383383
    LogicalDecodeStartupCB startup_cb;
    384384
    LogicalDecodeBeginCB begin_cb;
    385385
    LogicalDecodeChangeCB change_cb;
    386+
    LogicalDecodeTruncateCB truncate_cb;
    386387
    LogicalDecodeCommitCB commit_cb;
    387388
    LogicalDecodeMessageCB message_cb;
    388389
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    @@ -394,8 +395,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
    394395
    The <function>begin_cb</function>, <function>change_cb</function>
    395396
    and <function>commit_cb</function> callbacks are required,
    396397
    while <function>startup_cb</function>,
    397-
    <function>filter_by_origin_cb</function>
    398+
    <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
    398399
    and <function>shutdown_cb</function> are optional.
    400+
    If <function>truncate_cb</function> is not set but a
    401+
    <command>TRUNCATE</command> is to be decoded, the action will be ignored.
    399402
    </para>
    400403
    </sect2>
    401404

    @@ -590,6 +593,28 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
    590593
    </note>
    591594
    </sect3>
    592595

    596+
    <sect3 id="logicaldecoding-output-plugin-truncate">
    597+
    <title>Truncate Callback</title>
    598+
    599+
    <para>
    600+
    The <function>truncate_cb</function> callback is called for a
    601+
    <command>TRUNCATE</command> command.
    602+
    <programlisting>
    603+
    typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
    604+
    ReorderBufferTXN *txn,
    605+
    int nrelations,
    606+
    Relation relations[],
    607+
    ReorderBufferChange *change);
    608+
    </programlisting>
    609+
    The parameters are analogous to the <function>change_cb</function>
    610+
    callback. However, because <command>TRUNCATE</command> actions on
    611+
    tables connected by foreign keys need to be executed together, this
    612+
    callback receives an array of relations instead of just a single one.
    613+
    See the description of the <xref linkend="sql-truncate"/> statement for
    614+
    details.
    615+
    </para>
    616+
    </sect3>
    617+
    593618
    <sect3 id="logicaldecoding-output-plugin-filter-origin">
    594619
    <title>Origin Filter Callback</title>
    595620

    src/backend/access/heap/heapam.c

    Lines changed: 7 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -9260,6 +9260,13 @@ heap_redo(XLogReaderState *record)
    92609260
    case XLOG_HEAP F438 _UPDATE:
    92619261
    heap_xlog_update(record, false);
    92629262
    break;
    9263+
    case XLOG_HEAP_TRUNCATE:
    9264+
    /*
    9265+
    * TRUNCATE is a no-op because the actions are already logged as
    9266+
    * SMGR WAL records. TRUNCATE WAL record only exists for logical
    9267+
    * decoding.
    9268+
    */
    9269+
    break;
    92639270
    case XLOG_HEAP_HOT_UPDATE:
    92649271
    heap_xlog_update(record, true);
    92659272
    break;

    src/backend/access/rmgrdesc/heapdesc.c

    Lines changed: 16 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -75,6 +75,19 @@ heap_desc(StringInfo buf, XLogReaderState *record)
    7575
    xlrec->new_offnum,
    7676
    xlrec->new_xmax);
    7777
    }
    78+
    else if (info == XLOG_HEAP_TRUNCATE)
    79+
    {
    80+
    xl_heap_truncate *xlrec = (xl_heap_truncate *) rec;
    81+
    int i;
    82+
    83+
    if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    84+
    appendStringInfo(buf, "cascade ");
    85+
    if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    86+
    appendStringInfo(buf, "restart_seqs ");
    87+
    appendStringInfo(buf, "nrelids %u relids", xlrec->nrelids);
    88+
    for (i = 0; i < xlrec->nrelids; i++)
    89+
    appendStringInfo(buf, " %u", xlrec->relids[i]);
    90+
    }
    7891
    else if (info == XLOG_HEAP_CONFIRM)
    7992
    {
    8093
    xl_heap_confirm *xlrec = (xl_heap_confirm *) rec;
    @@ -186,6 +199,9 @@ heap_identify(uint8 info)
    186199
    case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE:
    187200
    id = "HOT_UPDATE+INIT";
    188201
    break;
    202+
    case XLOG_HEAP_TRUNCATE:
    203+
    id = "TRUNCATE";
    204+
    break;
    189205
    case XLOG_HEAP_CONFIRM:
    190206
    id = "HEAP_CONFIRM";
    191207
    break;

    src/backend/commands/tablecmds.c

    Lines changed: 95 additions & 9 deletions
    Original file line numberDiff line numberDiff line change
    @@ -16,6 +16,7 @@
    1616

    1717
    #include "access/genam.h"
    1818
    #include "access/heapam.h"
    19+
    #include "access/heapam_xlog.h"
    1920
    #include "access/multixact.h"
    2021
    #include "access/reloptions.h"
    2122
    #include "access/relscan.h"
    @@ -1322,11 +1323,7 @@ ExecuteTruncate(TruncateStmt *stmt)
    13221323
    {
    13231324
    List *rels = NIL;
    13241325
    List *relids = NIL;
    1325-
    List *seq_relids = NIL;
    1326-
    EState *estate;
    1327-
    ResultRelInfo *resultRelInfos;
    1328-
    ResultRelInfo *resultRelInfo;
    1329-
    SubTransactionId mySubid;
    1326+
    List *relids_logged = NIL;
    13301327
    ListCell *cell;
    13311328

    13321329
    /*
    @@ -1350,6 +1347,9 @@ ExecuteTruncate(TruncateStmt *stmt)
    13501347
    truncate_check_rel(rel);
    13511348
    rels = lappend(rels, rel);
    13521349
    relids = lappend_oid(relids, myrelid);
    1350+
    /* Log this relation only if needed for logical decoding */
    1351+
    if (RelationIsLogicallyLogged(rel))
    1352+
    relids_logged = lappend_oid(relids_logged, myrelid);
    13531353

    13541354
    if (recurse)
    13551355
    {
    @@ -1370,6 +1370,9 @@ ExecuteTruncate(TruncateStmt *stmt)
    13701370
    truncate_check_rel(rel);
    13711371
    rels = lappend(rels, rel);
    13721372
    relids = lappend_oid(relids, childrelid);
    1373+
    /* Log this relation only if needed for logical decoding */
    1374+
    if (RelationIsLogicallyLogged(rel))
    1375+
    relids_logged = lappend_oid(relids_logged, childrelid);
    13731376
    }
    13741377
    }
    13751378
    else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    @@ -1379,15 +1382,56 @@ ExecuteTruncate(TruncateStmt *stmt)
    13791382
    errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly.")));
    13801383
    }
    13811384

    1385+
    ExecuteTruncateGuts(rels, relids, relids_logged,
    1386+
    stmt->behavior, stmt->restart_seqs);
    1387+
    1388+
    /* And close the rels */
    1389+
    foreach(cell, rels)
    1390+
    {
    1391+
    Relation rel = (Relation) lfirst(cell);
    1392+
    1393+
    heap_close(rel, NoLock);
    1394+
    }
    1395+
    }
    1396+
    1397+
    /*
    1398+
    * ExecuteTruncateGuts
    1399+
    *
    1400+
    * Internal implementation of TRUNCATE. This is called by the actual TRUNCATE
    1401+
    * command (see above) as well as replication subscribers that execute a
    1402+
    * replicated TRUNCATE action.
    1403+
    *
    1404+
    * explicit_rels is the list of Relations to truncate that the command
    1405+
    * specified. relids is the list of Oids corresponding to explicit_rels.
    1406+
    * relids_logged is the list of Oids (a subset of relids) that require
    1407+
    * WAL-logging. This is all a bit redundant, but the existing callers have
    1408+
    * this information handy in this form.
    1409+
    */
    1410+
    void
    1411+
    ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
    1412+
    DropBehavior behavior, bool restart_seqs)
    1413+
    {
    1414+
    List *rels;
    1415+
    List *seq_relids = NIL;
    1416+
    EState *estate;
    1417+
    ResultRelInfo *resultRelInfos;
    1418+
    ResultRelInfo *resultRelInfo;
    1419+
    SubTransactionId mySubid;
    1420+
    ListCell *cell;
    1421+
    Oid *logrelids;
    1422+
    13821423
    /*
    1424+
    * Open, exclusive-lock, and check all the explicitly-specified relations
    1425+
    *
    13831426
    * In CASCADE mode, suck in all referencing relations as well. This
    13841427
    * requires multiple iterations to find indirectly-dependent relations. At
    13851428
    * each phase, we need to exclusive-lock new rels before looking for their
    13861429
    * dependencies, else we might miss something. Also, we check each rel as
    13871430
    * soon as we open it, to avoid a faux pas such as holding lock for a long
    13881431
    * time on a rel we have no permissions for.
    13891432
    */
    1390-
    if (stmt->behavior == DROP_CASCADE)
    1433+
    rels = list_copy(explicit_rels);
    1434+
    if (behavior == DROP_CASCADE)
    13911435
    {
    13921436
    for (;;)
    13931437
    {
    @@ -1409,6 +1453,9 @@ ExecuteTruncate(TruncateStmt *stmt)
    14091453
    truncate_check_rel(rel);
    14101454
    rels = lappend(rels, rel);
    14111455
    relids = lappend_oid(relids, relid);
    1456+
    /* Log this relation only if needed for logical decoding */
    1457+
    if (RelationIsLogicallyLogged(rel))
    1458+
    relids_logged = lappend_oid(relids_logged, relid);
    14121459
    }
    14131460
    }
    14141461
    }
    @@ -1421,7 +1468,7 @@ ExecuteTruncate(TruncateStmt *stmt)
    14211468
    #ifdef USE_ASSERT_CHECKING
    14221469
    heap_truncate_check_FKs(rels, false);
    14231470
    #else
    1424-
    if (stmt->behavior == DROP_RESTRICT)
    1471+
    if (behavior == DROP_RESTRICT)
    14251472
    heap_truncate_check_FKs(rels, false);
    14261473
    #endif
    14271474

    @@ -1431,7 +1478,7 @@ ExecuteTruncate(TruncateStmt *stmt)
    14311478
    * We want to do this early since it's pointless to do all the truncation
    14321479
    * work only to fail on sequence permissions.
    14331480
    */
    1434-
    if (stmt->restart_seqs)
    1481+
    if (restart_seqs)
    14351482
    {
    14361483
    foreach(cell, rels)
    14371484
    {
    @@ -1586,6 +1633,41 @@ ExecuteTruncate(TruncateStmt *stmt)
    15861633
    ResetSequence(seq_relid);
    15871634
    }
    15881635

    1636+
    /*
    1637+
    * Write a WAL record to allow this set of actions to be logically decoded.
    1638+
    *
    1639+
    * Assemble an array of relids so we can write a single WAL record for the
    1640+
    * whole action.
    1641+
    */
    1642+
    if (list_length(relids_logged) > 0)
    1643+
    {
    1644+
    xl_heap_truncate xlrec;
    1645+
    int i = 0;
    1646+
    1647+
    /* should only get here if wal_level >= logical */
    1648+
    Assert(XLogLogicalInfoActive());
    1649+
    1650+
    logrelids = palloc(list_length(relids_logged) * sizeof(Oid));
    1651+
    foreach (cell, relids_logged)
    1652+
    logrelids[i++] = lfirst_oid(cell);
    1653+
    1654+
    xlrec.dbId = MyDatabaseId;
    1655+
    xlrec.nrelids = list_length(relids_logged);
    1656+
    xlrec.flags = 0;
    1657+
    if (behavior == DROP_CASCADE)
    1658+
    xlrec.flags |= XLH_TRUNCATE_CASCADE;
    1659+
    if (restart_seqs)
    1660+
    xlrec.flags |= XLH_TRUNCATE_RESTART_SEQS;
    1661+
    1662+
    XLogBeginInsert();
    1663+
    XLogRegisterData((char *) &xlrec, SizeOfHeapTruncate);
    1664+
    XLogRegisterData((char *) logrelids, list_length(relids_logged) * sizeof(Oid));
    1665+
    1666+
    XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
    1667+
    1668+
    (void) XLogInsert(RM_HEAP_ID, XLOG_HEAP_TRUNCATE);
    1669+
    }
    1670+
    15891671
    /*
    15901672
    * Process all AFTER STATEMENT TRUNCATE triggers.
    15911673
    */
    @@ -1603,7 +1685,11 @@ ExecuteTruncate(TruncateStmt *stmt)
    16031685
    /* We can clean up the EState now */
    16041686
    FreeExecutorState(estate);
    16051687

    1606-
    /* And close the rels (can't do this while EState still holds refs) */
    1688+
    /*
    1689+
    * Close any rels opened by CASCADE (can't do this while EState still
    1690+
    * holds refs)
    1691+
    */
    1692+
    rels = list_difference_ptr(rels, explicit_rels);
    16071693
    foreach(cell, rels)
    16081694
    {
    16091695
    Relation rel = (Relation) lfirst(cell);

    0 commit comments

    Comments
     (0)
    0