8000 Rework WAL-reading supporting structs · postgrespro/postgres@709d003 · GitHub
[go: up one dir, main page]

Skip to content

Commit 709d003

Browse files
committed
Rework WAL-reading supporting structs
The state-tracking of WAL reading in various places was pretty messy, mostly because the ancient physical-replication WAL reading code wasn't using the XLogReader abstraction. This led to some untidy code. Make it prettier by creating two additional supporting structs, WALSegmentContext and WALOpenSegment which keep track of WAL-reading state. This makes code cleaner, as well as supports more future cleanup. Author: Antonin Houska Reviewed-by: Álvaro Herrera and (older versions) Robert Haas Discussion: https://postgr.es/m/14984.1554998742@spoje.net
1 parent a9ae99d commit 709d003

File tree

12 files changed

+189
-179
lines changed
  • pg_waldump
  • include
  • 12 files changed

    +189
    -179
    lines changed

    src/backend/access/transam/twophase.c

    Lines changed: 2 additions & 3 deletions
    Original file line numberDiff line numberDiff line change
    @@ -1377,7 +1377,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
    13771377
    *
    13781378
    * Note clearly that this function can access WAL during normal operation,
    13791379
    * similarly to the way WALSender or Logical Decoding would do.
    1380-
    *
    13811380
    */
    13821381
    static void
    13831382
    XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
    @@ -1386,8 +1385,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
    13861385
    XLogReaderState *xlogreader;
    13871386
    char *errormsg;
    13881387

    1389-
    xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
    1390-
    NULL);
    1388+
    xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
    1389+
    &read_local_xlog_page, NULL);
    13911390
    if (!xlogreader)
    13921391
    ereport(ERROR,
    13931392
    (errcode(ERRCODE_OUT_OF_MEMORY),

    src/backend/access/transam/xlog.c

    10000
    Lines changed: 9 additions & 8 deletions
    Original file line numberDiff line numberDiff line change
    @@ -885,8 +885,7 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
    885885
    int source, bool notfoundOk);
    886886
    static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
    887887
    static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
    888-
    int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
    889-
    TimeLineID *readTLI);
    888+
    int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
    890889
    static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
    891890
    bool fetching_ckpt, XLogRecPtr tliRecPtr);
    892891
    static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
    @@ -1195,7 +1194,8 @@ XLogInsertRecord(XLogRecData *rdata,
    11951194
    appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
    11961195

    11971196
    if (!debug_reader)
    1198-
    debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
    1197+
    debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
    1198+
    NULL, NULL);
    11991199

    12001200
    if (!debug_reader)
    12011201
    {
    @@ -4296,7 +4296,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
    42964296
    XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size);
    42974297
    offset = XLogSegmentOffset(xlogreader->latestPagePtr,
    42984298
    wal_segment_size);
    4299-
    XLogFileName(fname, xlogreader->readPageTLI, segno,
    4299+
    XLogFileName(fname, xlogreader->seg.ws_tli, segno,
    43004300
    wal_segment_size);
    43014301
    ereport(emode_for_corrupt_record(emode,
    43024302
    RecPtr ? RecPtr : EndRecPtr),
    @@ -6353,7 +6353,8 @@ StartupXLOG(void)
    63536353

    63546354
    /* Set up XLOG reader facility */
    63556355
    MemSet(&private, 0, sizeof(XLogPageReadPrivate));
    6356-
    xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private);
    6356+
    xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
    6357+
    &XLogPageRead, &private);
    63576358
    if (!xlogreader)
    63586359
    ereport(ERROR,
    63596360
    (errcode(ERRCODE_OUT_OF_MEMORY),
    @@ -7355,7 +7356,7 @@ StartupXLOG(void)
    73557356
    * and we were reading the old WAL from a segment belonging to a higher
    73567357
    * timeline.
    73577358
    */
    7358-
    EndOfLogTLI = xlogreader->readPageTLI;
    7359+
    EndOfLogTLI = xlogreader->seg.ws_tli;
    73597360

    73607361
    /*
    73617362
    * Complain if we did not roll forward far enough to render the backup
    @@ -11523,7 +11524,7 @@ CancelBackup(void)
    1152311524
    */
    1152411525
    static int
    1152511526
    XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
    11526-
    XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
    11527+
    XLogRecPtr targetRecPtr, char *readBuf)
    1152711528
    {
    1152811529
    XLogPageReadPrivate *private =
    1152911530
    (XLogPageReadPrivate *) xlogreader->private_data;
    @@ -11640,7 +11641,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
    1164011641
    Assert(targetPageOff == readOff);
    1164111642
    Assert(reqLen <= readLen);
    1164211643

    11643-
    *readTLI = curFileTLI;
    11644+
    xlogreader->seg.ws_tli = curFileTLI;
    1164411645

    1164511646
    /*
    1164611647
    * Check the page header immediately, so that we can retry immediately if

    src/backend/access/transam/xlogreader.c

    Lines changed: 46 additions & 27 deletions
    Original file line numberDiff line numberDiff line change
    @@ -68,8 +68,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
    6868
    * Returns NULL if the xlogreader couldn't be allocated.
    6969
    */
    7070
    XLogReaderState *
    71-
    XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
    72-
    void *private_data)
    71+
    XLogReaderAllocate(int wal_segment_size, const char *waldir,
    72+
    XLogPageReadCB pagereadfunc, void *private_data)
    7373
    {
    7474
    XLogReaderState *state;
    7575

    @@ -96,7 +96,10 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
    9696
    return NULL;
    9797
    }
    9898

    99-
    state->wal_segment_size = wal_segment_size;
    99+
    /* Initialize segment info. */
    100< F41A code class="diff-text syntax-highlighted-line addition">+
    WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
    101+
    waldir);
    102+
    100103
    state->read_page = pagereadfunc;
    101104
    /* system_identifier initialized to zeroes above */
    102105
    state->private_data = private_data;
    @@ -198,6 +201,23 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
    198201
    return true;
    199202
    }
    200203

    204+
    /*
    205+
    * Initialize the passed segment structs.
    206+
    */
    207+
    void
    208+
    WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
    209+
    int segsize, const char *waldir)
    210+
    {
    211+
    seg->ws_file = -1;
    212+
    seg->ws_segno = 0;
    213+
    seg->ws_off = 0;
    214+
    seg->ws_tli = 0;
    215+
    216+
    segcxt->ws_segsize = segsize;
    217+
    if (waldir)
    218+
    snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
    219+
    }
    220+
    201221
    /*
    202222
    * Attempt to read an XLOG record.
    203223
    *
    @@ -490,8 +510,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
    490510
    (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
    491511
    {
    492512
    /* Pretend it extends to end of segment */
    493-
    state->EndRecPtr += state->wal_segment_size - 1;
    494-
    state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
    513+
    state->EndRecPtr += state->segcxt.ws_segsize - 1;
    514+
    state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
    495515
    }
    496516

    497517
    if (DecodeXLogRecord(state, record, errormsg))
    @@ -533,12 +553,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
    533553

    534554
    Assert((pageptr % XLOG_BLCKSZ) == 0);
    535555

    536-
    XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
    537-
    targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
    556+
    XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
    557+
    targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
    538558

    539559
    /* check whether we have all the requested data already */
    540-
    if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
    541-
    reqLen <= state->readLen)
    560+
    if (targetSegNo == state->seg.ws_segno &&
    561+
    targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
    542562
    return state->readLen;
    543563

    544564
    /*
    @@ -553,13 +573,13 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
    553573
    * record is. This is so that we can check the additional identification
    554574
    * info that is present in the first page's "long" header.
    555575
    */
    556-
    if (targetSegNo != state->readSegNo && targetPageOff != 0)
    576+
    if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
    557577
    {
    558578
    XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
    559579

    560580
    readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
    561581
    state->currRecPtr,
    562-
    state->readBuf, &state->readPageTLI);
    582+
    state->readBuf);
    563583
    if (readLen < 0)
    564584
    goto err;
    565585

    @@ -577,7 +597,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
    577597
    */
    578598
    readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
    579599
    state->currRecPtr,
    580-
    state->readBuf, &state->readPageTLI);
    600+
    state->readBuf);
    581601
    if (readLen < 0)
    582602
    goto err;
    583603

    596616
    {
    597617
    readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
    598618
    state->currRecPtr,
    599-
    state->readBuf, &state->readPageTLI);
    619+
    state->readBuf);
    600620
    if (readLen < 0)
    601621
    goto err;
    602622
    }
    @@ -608,8 +628,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
    608628
    goto err;
    609629

    610630
    /* update read state information */
    611-
    state->readSegNo = targetSegNo;
    612-
    state->readOff = targetPageOff;
    631+
    state->seg.ws_segno = targetSegNo;
    632+
    state->seg.ws_off = targetPageOff;
    613633
    state->readLen = readLen;
    614634

    615635
    return readLen;
    @@ -625,8 +645,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
    625645
    static void
    626646
    XLogReaderInvalReadState(XLogReaderState *state)
    627647
    {
    628-
    state->readSegNo = 0;
    629-
    state->readOff = 0;
    648+
    state->seg.ws_segno = 0;
    649+
    state->seg.ws_off = 0;
    630650
    state->readLen = 0;
    631651
    }
    632652

    @@ -745,16 +765,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    745765

    746766
    Assert((recptr % XLOG_BLCKSZ) == 0);
    747767

    748-
    XLByteToSeg(recptr, segno, state->wal_segment_size);
    749-
    offset = XLogSegmentOffset(recptr, state->wal_segment_size);
    768+
    XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
    769+
    offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
    750770

    751-
    XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);
    771+
    XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
    752772

    753773
    if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
    754774
    {
    755775
    char fname[MAXFNAMELEN];
    756776

    757-
    XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
    777+
    XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    758778

    759779
    report_invalid_record(state,
    760780
    "invalid magic number %04X in log segment %s, offset %u",
    @@ -768,7 +788,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    768788
    {
    769789
    char fname[MAXFNAMELEN];
    770790

    771-
    XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
    791+
    XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    772792

    773793
    report_invalid_record(state,
    774794
    "invalid info bits %04X in log segment %s, offset %u",
    @@ -791,7 +811,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    791811
    (unsigned long long) state->system_identifier);
    792812
    return false;
    793813
    }
    794-
    else if (longhdr->xlp_seg_size != state->wal_segment_size)
    814+
    else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
    795815
    {
    796816
    report_invalid_record(state,
    797817
    "WAL file is from different database system: incorrect segment size in page header");
    @@ -808,7 +828,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    808828
    {
    809829
    char fname[MAXFNAMELEN];
    810830

    811-
    XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
    831+
    XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    812832

    813833
    /* hmm, first page of file doesn't have a long header? */
    814834
    report_invalid_record(state,
    @@ -828,7 +848,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    828848
    {
    829849
    char fname[MAXFNAMELEN];
    830850

    831-
    XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
    851+
    XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    832852

    833853
    report_invalid_record(state,
    834854
    "unexpected pageaddr %X/%X in log segment %s, offset %u",
    @@ -853,7 +873,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    853873
    {
    854874
    char fname[MAXFNAMELEN];
    855875

    856-
    XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
    876+
    XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    857877

    858878
    report_invalid_record(state,
    859879
    "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
    @@ -997,7 +1017,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
    9971017

    9981018
    #endif /* FRONTEND */
    9991019

    1000-
    10011020
    /* ----------------------------------------
    10021021
    * Functions for decoding the data and block references in a record.
    10031022
    * ----------------------------------------

    src/backend/access/transam/xlogutils.c

    Lines changed: 14 additions & 16 deletions
    Original file line numberDiff line numberDiff line change
    @@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
    802802
    void
    803803
    XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
    804804
    {
    805-
    const XLogRecPtr lastReadPage = state->readSegNo *
    806-
    state->wal_segment_size + state->readOff;
    805+
    const XLogRecPtr lastReadPage = state->seg.ws_segno *
    806+
    state->segcxt.ws_segsize + state->seg.ws_off;
    807807

    808808
    Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
    809809
    Assert(wantLength <= XLOG_BLCKSZ);
    @@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
    847847
    if (state->currTLIValidUntil != InvalidXLogRecPtr &&
    848848
    state->currTLI != ThisTimeLineID &&
    849849
    state->currTLI != 0 &&
    850-
    ((wantPage + wantLength) / state->wal_segment_size) <
    851-
    (state->currTLIValidUntil / state->wal_segment_size))
    850+
    ((wantPage + wantLength) / state->segcxt.ws_segsize) <
    851+
    (state->currTLIValidUntil / state->segcxt.ws_segsize))
    852852
    return;
    853853

    854854
    /*
    @@ -869,12 +869,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
    869869
    * by a promotion or replay from a cascaded replica.
    870870
    */
    871871
    List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
    872+
    XLogRecPtr endOfSegment;
    872873

    873-
    XLogRecPtr endOfSegment = (((wantPage / state->wal_segment_size) + 1)
    874-
    * state->wal_segment_size) - 1;
    875-
    876-
    Assert(wantPage / state->wal_segment_size ==
    877-
    endOfSegment / state->wal_segment_size);
    874+
    endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
    875+
    state->segcxt.ws_segsize - 1;
    876+
    Assert(wantPage / state->segcxt.ws_segsize ==
    877+
    endOfSegment / state->segcxt.ws_segsize);
    878878

    879879
    /*
    880880
    * Find the timeline of the last LSN on the segment containing
    @@ -909,8 +909,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
    909909
    */
    910910
    int
    911911
    read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
    912-
    int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
    913-
    TimeLineID *pageTLI)
    912+
    int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
    914913
    {
    915914
    XLogRecPtr read_upto,
    916915
    loc;
    @@ -933,8 +932,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
    933932
    read_upto = GetFlushRecPtr();
    934933
    else
    935934
    read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
    936-
    937-
    *pageTLI = ThisTimeLineID;
    935+
    state->seg.ws_tli = ThisTimeLineID;
    938936

    939937
    /*
    940938
    * Check which timeline to get the record from.
    @@ -984,14 +982,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
    984982
    read_upto = state->currTLIValidUntil;
    985983

    986984
    /*
    987-
    * Setting pageTLI to our wanted record's TLI is slightly wrong;
    985+
    * Setting ws_tli to our wanted record's TLI is slightly wrong;
    988986
    * the page might begin on an older timeline if it contains a
    989987
    * timeline switch, since its xlog segment will have been copied
    990988
    * from the prior timeline. This is pretty harmless though, as
    991989
    * nothing cares so long as the timeline doesn't go backwards. We
    992990
    * should read the page header instead; FIXME someday.
    993991
    */
    994-
    *pageTLI = state->currTLI;
    992+
    state->seg.ws_tli = state->currTLI;
    995993

    996994
    /* No need to wait on a historical timeline */
    997995
    break;
    @@ -1022,7 +1020,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
    10221020
    * as 'count', read the whole page anyway. It's guaranteed to be
    10231021
    * zero-padded up to the page boundary if it's incomplete.
    10241022
    */
    1025-
    XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
    1023+
    XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
    10261024
    XLOG_BLCKSZ);
    10271025

    10281026
    /* number of valid bytes in the buffer */

    src/backend/replication/logical/logical.c

    Lines changed: 1 addition & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -173,7 +173,7 @@ StartupDecodingContext(List *output_plugin_options,
    173173

    174174
    ctx->slot = slot;
    175175

    176-
    ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
    176+
    ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
    177177
    if (!ctx->reader)
    178178
    ereport(ERROR,
    179179
    (errcode(ERRCODE_OUT_OF_MEMORY),

    src/backend/replication/logical/logicalfuncs.c

    Lines changed: 2 additions & 2 deletions
    Original file line numberDiff line numberDiff line change
    @@ -116,10 +116,10 @@ check_permissions(void)
    116116

    117117
    int
    118118
    logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
    119-
    int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
    119+
    int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
    120120
    {
    121121
    return read_local_xlog_page(state, targetPagePtr, reqLen,
    122-
    targetRecPtr, cur_page, pageTLI);
    122+
    targetRecPtr, cur_page);
    123123
    }
    124124

    125125
    /*

    0 commit comments

    Comments
     (0)
    0