8000 Immediately WAL-log subtransaction and top-level XID association. · postgrespro/postgres@0bead9a · GitHub
[go: up one dir, main page]

Skip to content

Commit 0bead9a

Browse files
author
Amit Kapila
committed
Immediately WAL-log subtransaction and top-level XID association.
The logical decoding infrastructure needs to know which top-level transaction the subxact belongs to, in order to decode all the changes. Until now that might be delayed until commit, due to the caching (PGPROC_MAX_CACHED_SUBXIDS), preventing features requiring incremental decoding. So we also write the assignment info into WAL immediately, as part of the next WAL record (to minimize overhead) only when wal_level=logical. We cannot remove the existing XLOG_XACT_ASSIGNMENT WAL record as that is required for avoiding overflow in the hot standby snapshot. Bump XLOG_PAGE_MAGIC, since this introduces XLR_BLOCK_ID_TOPLEVEL_XID. Author: Tomas Vondra, Dilip Kumar, Amit Kapila Reviewed-by: Amit Kapila Tested-by: Neha Sharma and Mahendra Singh Thalor Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent d05b172 commit 0bead9a

File tree

9 files changed

+108
-24
lines changed
  • src
    • backend
      • access/transam
  • replication/logical
  • include/access
  • 9 files changed

    +108
    -24
    lines changed

    src/backend/access/transam/xact.c

    Lines changed: 50 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -191,6 +191,7 @@ typedef struct TransactionStateData
    191191
    bool didLogXid; /* has xid been included in WAL record? */
    192192
    int parallelModeLevel; /* Enter/ExitParallelMode counter */
    193193
    bool chain; /* start a new block after this one */
    194+
    bool assigned; /* assigned to top-level XID */
    194195
    struct TransactionStateData *parent; /* back link to parent */
    195196
    } TransactionStateData;
    196197

    @@ -223,6 +224,7 @@ typedef struct SerializedTransactionState
    223224
    static TransactionStateData TopTransactionStateData = {
    224225
    .state = TRANS_DEFAULT,
    225226
    .blockState = TBLOCK_DEFAULT,
    227+
    .assigned = false,
    226228
    };
    227229

    228230
    /*
    @@ -5120,6 +5122,7 @@ PushTransaction(void)
    51205122
    GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
    51215123
    s->prevXactReadOnly = XactReadOnly;
    51225124
    s->parallelModeLevel = 0;
    5125+
    s->assigned = false;
    51235126

    51245127
    CurrentTransactionState = s;
    51255128

    @@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record)
    60226025
    else
    60236026
    elog(PANIC, "xact_redo: unknown op code %u", info);
    60246027
    }
    6028+
    6029+
    /*
    6030+
    * IsSubTransactionAssignmentPending
    6031+
    *
    6032+
    * This is used to decide whether we need to WAL log the top-level XID for
    6033+
    * operation in a subtransaction. We require that for logical decoding, see
    6034+
    * LogicalDecodingProcessRecord.
    6035+
    *
    6036+
    * This returns true if wal_level >= logical and we are inside a valid
    6037+
    * subtransaction, for which the assignment was not yet written to any WAL
    6038+
    * record.
    * XLogRecordAssemble consults this to decide whether to append the
    * top-level XID to the record being assembled. The answer is tracked per
    * transaction state via the 'assigned' flag, which PushTransaction
    * initializes to false and MarkSubTransactionAssigned sets to true.
    6039+
    */
    6040+
    bool
    6041+
    IsSubTransactionAssignmentPending(void)
    6042+
    {
    6043+
    /* wal_level has to be logical */
    6044+
    if (!XLogLogicalInfoActive())
    6045+
    return false;
    6046+
    6047+
    /* we need to be in a transaction state */
    6048+
    if (!IsTransactionState())
    6049+
    return false;
    6050+
    6051+
    /* it has to be a subtransaction */
    6052+
    if (!IsSubTransaction())
    6053+
    return false;
    6054+
    6055+
    /* the subtransaction has to have a XID assigned */
    6056+
    if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
    6057+
    return false;
    6058+
    6059+
    /* pending only while the current state is not yet marked 'assigned' */
    6060+
    return !CurrentTransactionState->assigned;
    6061+
    }
    6062+
    6063+
    /*
    6064+
    * MarkSubTransactionAssigned
    6065+
    *
    6066+
    * Mark the subtransaction assignment as completed.
    *
    * Sets the 'assigned' flag of CurrentTransactionState, after which
    * IsSubTransactionAssignmentPending() returns false for this transaction
    * state. Must only be called while an assignment is still pending (this
    * is checked by the Assert below). The visible caller is
    * XLogResetInsertion, which invokes it when XLOG_INCLUDE_XID is set in the
    * current insertion flags.
    6067+
    */
    6068+
    void
    6069+
    MarkSubTransactionAssigned(void)
    6070+
    {
    6071+
    Assert(IsSubTransactionAssignmentPending());
    6072+
    6073+
    CurrentTransactionState->assigned = true;
    6074+
    }

    src/backend/access/transam/xloginsert.c

    Lines changed: 21 additions & 2 deletions
    Original file line numberDiff line numberDiff line change
    @@ -89,11 +89,13 @@ static XLogRecData hdr_rdt;
    8989
    static char *hdr_scratch = NULL;
    9090

    9191
    #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
    92+
    #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
    9293

    9394
    #define HEADER_SCRATCH_SIZE \
    9495
    (SizeOfXLogRecord + \
    9596
    MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
    96-
    SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
    97+
    SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
    98+
    SizeOfXLogTransactionId)
    9799

    98100
    /*
    99101
    * An array of XLogRecData structs, to hold registered data.
    @@ -195,6 +197,10 @@ XLogResetInsertion(void)
    195197
    {
    196198
    int i;
    197199

    200+
    /* reset the subxact assignment flag (if needed) */
    201+
    if (curinsert_flags & XLOG_INCLUDE_XID)
    202+
    MarkSubTransactionAssigned();
    203+
    198204
    for (i = 0; i < max_registered_block_id; i++)
    199205
    registered_buffers[i].in_use = false;
    200206

    @@ -398,7 +404,7 @@ void
    398404
    XLogSetRecordFlags(uint8 flags)
    399405
    {
    400406
    Assert(begininsert_called);
    401-
    curinsert_flags = flags;
    407+
    curinsert_flags |= flags;
    402408
    }
    403409

    404410
    /*
    @@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
    748754
    scratch += sizeof(replorigin_session_origin);
    749755
    }
    750756

    757+
    /* followed by toplevel XID, if not already included in previous record */
    758+
    if (IsSubTransactionAssignmentPending())
    759+
    {
    760+
    TransactionId xid = GetTopTransactionIdIfAny();
    761+
    762+
    /* update the flag (later used by XLogResetInsertion) */
    763+
    XLogSetRecordFlags(XLOG_INCLUDE_XID);
    764+
    765+
    *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
    766+
    memcpy(scratch, &xid, sizeof(TransactionId));
    767+
    scratch += sizeof(TransactionId);
    768+
    }
    769+
    751770
    /* followed by main data, if any */
    752771
    if (mainrdata_len > 0)
    753772
    {

    src/backend/access/transam/xlogreader.c

    Lines changed: 5 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
    11971197

    11981198
    state->decoded_record = record;
    11991199
    state->record_origin = InvalidRepOriginId;
    1200+
    state->toplevel_xid = InvalidTransactionId;
    12001201

    12011202
    ptr = (char *) record;
    12021203
    ptr += SizeOfXLogRecord;
    @@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
    12351236
    {
    12361237
    COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
    12371238
    }
    1239+
    else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
    1240+
    {
    1241+
    COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
    1242+
    }
    12381243
    else if (block_id <= XLR_MAX_BLOCK_ID)
    12391244
    {
    12401245
    /* XLogRecordBlockHeader */

    src/backend/replication/logical/decode.c

    Lines changed: 23 additions & 21 deletions
    Original file line numberDiff line numberDiff line change
    @@ -94,11 +94,27 @@ void
    9494
    LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
    9595
    {
    9696
    XLogRecordBuffer buf;
    97+
    TransactionId txid;
    9798

    9899
    buf.origptr = ctx->reader->ReadRecPtr;
    99100
    buf.endptr = ctx->reader->EndRecPtr;
    100101
    buf.record = record;
    101102

    103+
    txid = XLogRecGetTopXid(record);
    104+
    105+
    /*
    106+
    * If the top-level xid is valid, we need to assign the subxact to the
    107+
    * top-level xact. We need to do this for all records, hence we do it
    108+
    * before the switch.
    109+
    */
    110+
    if (TransactionIdIsValid(txid))
    111+
    {
    112+
    ReorderBufferAssignChild(ctx->reorder,
    113+
    txid,
    114+
    record->decoded_record->xl_xid,
    115+
    buf.origptr);
    116+
    }
    117+
    102118
    /* cast so we get a warning when new rmgrs are added */
    103119
    switch ((RmgrId) XLogRecGetRmid(record))
    104120
    {
    @@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    216232
    /*
    217233
    * If the snapshot isn't yet fully built, we cannot decode anything, so
    218234
    * bail out.
    219-
    *
    220-
    * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
    221-
    * when the snapshot is being built: it is possible to get later records
    222-
    * that require subxids to be properly assigned.
    223235
    */
    224-
    if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
    225-
    info != XLOG_XACT_ASSIGNMENT)
    236+
    if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
    226237
    return;
    227238

    228239
    switch (info)
    @@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    264275
    break;
    265276
    }
    266277
    case XLOG_XACT_ASSIGNMENT:
    267-
    {
    268-
    xl_xact_assignment *xlrec;
    269-
    int i;
    270-
    TransactionId *sub_xid;
    271278

    272-
    xlrec = (xl_xact_assignment *) XLogRecGetData(r);
    273-
    274-
    sub_xid = &xlrec->xsub[0];
    275-
    276-
    for (i = 0; i < xlrec->nsubxacts; i++)
    277-
    {
    278-
    ReorderBufferAssignChild(reorder, xlrec->xtop,
    279-
    *(sub_xid++), buf->origptr);
    280-
    }
    281-
    break;
    282-
    }
    279+
    /*
    280+
    * We assign subxact to the toplevel xact while processing each
    281+
    * record if required. So, we don't need to do anything here.
    282+
    * See LogicalDecodingProcessRecord.
    283+
    */
    284+
    break;
    283285
    case XLOG_XACT_PREPARE:
    284286

    285287
    /*

    src/include/access/xact.h

    Lines changed: 3 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
    428428
    extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
    429429
    extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
    430430

    431+
    extern bool IsSubTransactionAssignmentPending(void);
    432+
    extern void MarkSubTransactionAssigned(void);
    433+
    431434
    extern int xactGetCommittedChildren(TransactionId **ptr);
    432435

    433436
    extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,

    src/include/access/xlog.h

    Lines changed: 1 addition & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -237,6 +237,7 @@ extern bool XLOG_DEBUG;
    237237
    */
    238238
    #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */
    239239
    #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */
    240+
    #define XLOG_INCLUDE_XID 0x04 /* include XID of top-level xact */
    240241

    241242

    242243
    /* Checkpoint statistics */

    src/include/access/xlog_internal.h

    Lines changed: 1 addition & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -31,7 +31,7 @@
    3131
    /*
    3232
    * Each page of XLOG file has a header like this:
    3333
    */
    34-
    #define XLOG_PAGE_MAGIC 0xD106 /* can be used as WAL version indicator */
    34+
    #define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */
    3535

    3636
    typedef struct XLogPageHeaderData
    3737
    {

    src/include/access/xlogreader.h

    Lines changed: 3 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -191,6 +191,8 @@ struct XLogReaderState
    191191

    192192
    RepOriginId record_origin;
    193193

    194+
    TransactionId toplevel_xid; /* XID of top-level transaction */
    195+
    194196
    /* information about blocks referenced by the record. */
    195197
    DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
    196198

    @@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
    304306
    #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
    305307
    #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
    306308
    #define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
    309+
    #define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
    307310
    #define XLogRecGetData(decoder) ((decoder)->main_data)
    308311
    #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
    309312
    #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)

    src/include/access/xlogrecord.h

    Lines changed: 1 addition & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
    223223
    #define XLR_BLOCK_ID_DATA_SHORT 255
    224224
    #define XLR_BLOCK_ID_DATA_LONG 254
    225225
    #define XLR_BLOCK_ID_ORIGIN 253
    226+
    #define XLR_BLOCK_ID_TOPLEVEL_XID 252
    226227

    227228
    #endif /* XLOGRECORD_H */

    0 commit comments

    Comments
     (0)
    0