8000 finally working xlog insert/read · postgrespro/postgres_cluster@d37e2cd · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d37e2cd

Browse files
committed
finally working xlog insert/read
1 parent 8d791a5 commit d37e2cd

File tree

3 files changed

+97
-142
lines changed

3 files changed

+97
-142
lines changed

src/backend/access/transam/twophase.c

Lines changed: 95 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,8 @@ static void RemoveGXact(GlobalTransaction gxact);
174174

175175
static char twophase_buf[10*1024];
176176
static int twophase_pos = 0;
177-
size_t bogus_write(int fd, char *buf, size_t nbytes);
178-
177+
size_t bogus_write(int fd, const void *buf, size_t nbytes);
179178
static char *XlogReadTwoPhaseData(XLogRecPtr lsn);
180-
// LWLock *xlogreclock;
181179

182180
/*
183181
* Initialization of shared memory
@@ -997,6 +995,8 @@ StartPrepare(GlobalTransaction gxact)
997995

998996
save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
999997

998+
// fprintf(stderr, "StartPrepare: %s=(%d,%d,%d,%d)\n", hdr.gid, hdr.nsubxacts, hdr.ncommitrels, hdr.nabortrels, hdr.ninvalmsgs);
999+
10001000
/*
10011001
* Add the additional info about subxacts, deletable files and cache
10021002
* invalidation messages.
@@ -1033,13 +1033,13 @@ StartPrepare(GlobalTransaction gxact)
10331033
void
10341034
EndPrepare(GlobalTransaction gxact)
10351035
{
1036-
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1037-
TransactionId xid = pgxact->xid;
1036+
// PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1037+
// TransactionId xid = pgxact->xid;
10381038
TwoPhaseFileHeader *hdr;
10391039
char path[MAXPGPATH];
10401040
StateFileChunk *record;
10411041
pg_crc32c statefile_crc;
1042-
pg_crc32c bogus_crc;
1042+
// pg_crc32c bogus_crc;
10431043
int fd;
10441044

10451045
/* Add the end sentinel to the list of 2PC records */
@@ -1144,26 +1144,13 @@ EndPrepare(GlobalTransaction gxact)
11441144
MyPgXact->delayChkpt = true;
11451145

11461146
XLogBeginInsert();
1147-
11481147
for (record = records.head; record != NULL; record = record->next)
11491148
XLogRegisterData(record->data, record->len);
1150-
1151-
// LWLockAcquire(xlogreclock, LW_EXCLUSIVE);
1152-
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1153-
gxact->prepare_xlogptr = GetXLogInsertRecPtr();
11541149
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1155-
LWLockRelease(TwoPhaseStateLock);
1156-
// LWLockRelease(xlogreclock);
1157-
1158-
11591150
XLogFlush(gxact->prepare_lsn);
1151+
gxact->prepare_xlogptr = ProcLastRecPtr;
11601152

1161-
1162-
// fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n",
1163-
// gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr));
1164-
// fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n",
1165-
// gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn));
1166-
1153+
// fprintf(stderr, "EndPrepare: %s={xlogptr:%X,lsn:%X, delta: %X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
11671154

11681155
/* If we crash now, we have prepared: WAL replay will fix things */
11691156

@@ -1250,101 +1237,100 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
12501237
static char *
12511238
ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
12521239
{
1253-
// char path[MAXPGPATH];
1254-
// char *buf;
1255-
// TwoPhaseFileHeader *hdr;
1256-
// int fd;
1257-
// struct stat stat;
1258-
// uint32 crc_offset;
1259-
// pg_crc32c calc_crc,
1260-
// file_crc;
1261-
1262-
// TwoPhaseFilePath(path, xid);
1240+
char path[MAXPGPATH];
1241+
char *buf;
1242+
TwoPhaseFileHeader *hdr;
1243+
int fd;
1244+
struct stat stat;
1245+
uint32 crc_offset;
1246+
pg_crc32c calc_crc,
1247+
file_crc;
12631248

1264-
// fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1249+
TwoPhaseFilePath(path, xid);
12651250

1266-
// if (fd < 0)
1267-
// {
1268-
// if (give_warnings)
1269-
// ereport(WARNING,
1270-
// (errcode_for_file_access(),
1271-
// errmsg("could not open two-phase state file \"%s\": %m",
1272-
// path)));
1273-
// return NULL;
1274-
// }
1251+
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1252+
if (fd < 0)
1253+
{
1254+
if (give_warnings)
1255+
ereport(WARNING,
1256+
(errcode_for_file_access(),
1257+
errmsg("could not open two-phase state file \"%s\": %m",
1258+
path)));
1259+
return NULL;
1260+
}
12751261

12761262
/*
12771263
* Check file length. We can determine a lower bound pretty easily. We
12781264
* set an upper bound to avoid palloc() failure on a corrupt file, though
12791265
* we can't guarantee that we won't get an out of memory error anyway,
12801266
* even on a valid file.
12811267
*/
1282-
// if (fstat(fd, &stat))
1283-
// {
1284-
// CloseTransientFile(fd);
1285-
// if (give_warnings)
1286-
// ereport(WARNING,
1287-
// (errcode_for_file_access(),
1288-
// errmsg("could not stat two-phase state file \"%s\": %m",
1289-
// path)));
1290-
// return NULL;
1291-
// }
1268+
if (fstat(fd, &stat))
1269+
{
1270+
CloseTransientFile(fd);
1271+
if (give_warnings)
1272+
ereport(WARNING,
1273+
(errcode_for_file_access(),
1274+
errmsg("could not stat two-phase state file \"%s\": %m",
1275+
path)));
1276+
return NULL;
1277+
}
12921278

1293-
// if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1294-
// MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1295-
// sizeof(pg_crc32c)) ||
1296-
// stat.st_size > MaxAllocSize)
1297-
// {
1298-
// CloseTransientFile(fd);
1299-
// return NULL;
1300-
// }
1279+
if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1280+
MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1281+
sizeof(pg_crc32c)) ||
1282+
stat.st_size > MaxAllocSize)
1283+
{
1284+
CloseTransientFile(fd);
1285+
return NULL;
1286+
}
13011287

1302-
// crc_offset = stat.st_size - sizeof(pg_crc32c);
1303-
// if (crc_offset != MAXALIGN(crc_offset))
1304-
// {
1305-
// CloseTransientFile(fd);
1306-
// return NULL;
1307-
// }
1288+
crc_offset = stat.st_size - sizeof(pg_crc32c);
1289+
if (crc_offset != MAXALIGN(crc_offset))
1290+
{
1291+
CloseTransientFile(fd);
1292+
return NULL;
1293+
}
13081294

1309-
// /*
1310-
// * OK, slurp in the file.
1311-
// */
1312-
// buf = (char *) palloc(stat.st_size);
1295+
/*
1296+
* OK, slurp in the file.
1297+
*/
1298+
buf = (char *) palloc(stat.st_size);
13131299

1314-
// if (read(fd, buf, stat.st_size) != stat.st_size)
1315-
// {
1316-
// CloseTransientFile(fd);
1317-
// if (give_warnings)
1318-
// ereport(WARNING,
1319-
// (errcode_for_file_access(),
1320-
// errmsg("could not read two-phase state file \"%s\": %m",
1321-
// path)));
1322-
// pfree(buf);
1323-
// return NULL;
1324-
// }
1300+
if (read(fd, buf, stat.st_size) != stat.st_size)
1301+
{
1302+
CloseTransientFile(fd);
1303+
if (give_warnings)
1304+
ereport(WARNING,
1305+
(errcode_for_file_access(),
1306+
errmsg("could not read two-phase state file \"%s\": %m",
1307+
path)));
1308+
pfree(buf);
1309+
return NULL;
1310+
}
13251311

1326-
// CloseTransientFile(fd);
1312+
CloseTransientFile(fd);
13271313

1328-
// hdr = (TwoPhaseFileHeader *) buf;
1329-
// if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1330-
// {
1331-
// pfree(buf);
1332-
// return NULL;
1333-
// }
1314+
hdr = (TwoPhaseFileHeader *) buf;
1315+
if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1316+
{
1317+
pfree(buf);
1318+
return NULL;
1319+
}
13341320

1335-
// INIT_CRC32C(calc_crc);
1336-
// COMP_CRC32C(calc_crc, buf, crc_offset);
1337-
// FIN_CRC32C(calc_crc);
1321+
INIT_CRC32C(calc_crc);
1322+
COMP_CRC32C(calc_crc, buf, crc_offset);
1323+
FIN_CRC32C(calc_crc);
13381324

1339-
// file_crc = *((pg_crc32c *) (buf + crc_offset));
1325+
file_crc = *((pg_crc32c *) (buf + crc_offset));
13401326

1341-
// if (!EQ_CRC32C(calc_crc, file_crc))
1342-
// {
1343-
// pfree(buf);
1344-
// return NULL;
1345-
// }
1327+
if (!EQ_CRC32C(calc_crc, file_crc))
1328+
{
1329+
pfree(buf);
1330+
return NULL;
1331+
}
13461332

1347-
return twophase_buf 9920 ;
1333+
return buf;
13481334
}
13491335

13501336
/*
@@ -1410,12 +1396,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14101396
* Read and validate the state file
14111397
*/
14121398
// buf = ReadTwoPhaseFile(xid, true);
1399+
// buf = twophase_buf;
14131400
buf = XlogReadTwoPhaseData(gxact->prepare_xlogptr);
1414-
if (buf == NULL)
1415-
ereport(ERROR,
1416-
(errcode(ERRCODE_DATA_CORRUPTED),
1417-
errmsg("two-phase state file for transaction %u is corrupt",
1418-
xid)));
14191401

14201402
/*
14211403
* Disassemble the header area
@@ -1435,6 +1417,15 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14351417
/* compute latestXid among all children */
14361418
latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
14371419

1420+
1421+
// fprintf(stderr, "FinishPrepared: %s=(%d,%d,%d,%d)\n", gxact->gid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1422+
// fprintf(stderr, "FinishPrepared: %s={xlogptr:%X,lsn:%X,delta:%X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1423+
1424+
Assert(hdr->nsubxacts == 0);
1425+
Assert(hdr->ncommitrels == 0);
1426+
Assert(hdr->nabortrels == 0);
1427+
Assert(hdr->ninvalmsgs == 0);
1428+
14381429
/*
14391430
* The order of operations here is critical: make the XLOG entry for
14401431
* commit or abort, then mark the transaction committed or aborted in
@@ -2246,30 +2237,11 @@ RecordTransactionAbortPrepared(TransactionId xid,
22462237
SyncRepWaitForLSN(recptr);
22472238
}
22482239

2249-
2250-
2251-
2252-
2253-
2254-
2255-
2256-
22572240
/**********************************************************************************/
22582241

22592242

2260-
// static int xlogreadfd = -1;
2261-
// static XLogSegNo xlogreadsegno = -1;
2262-
// static char xlogfpath[MAXPGPATH];
2263-
2264-
// typedef struct XLogPageReadPrivate
2265-
// {
2266-
// const char *datadir;
2267-
// TimeLineID tli;
2268-
// } XLogPageReadPrivate;
2269-
2270-
2271-
size_t
2272-
bogus_write(int fd, char *buf, size_t nbytes)
2243+
size_t
2244+
bogus_write(int fd, const void *buf, size_t nbytes)
22732245
{
22742246
memcpy(twophase_buf + twophase_pos, buf, nbytes);
22752247
twophase_pos += nbytes;
@@ -2284,8 +2256,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
22842256
XLogReaderState *xlogreader;
22852257
char *errormsg;
22862258

2287-
fprintf(stderr, "XlogReadTwoPhaseData called\n");
2288-
22892259
xlogreader = XLogReaderAllocate(&logical_read_local_xlog_page, NULL);
22902260
if (xlogreader == NULL)
22912261
fprintf(stderr, "xlogreader == NULL\n");
@@ -2296,20 +2266,5 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
22962266
fprintf(stderr, "XLogReadRecord error\n");
22972267
}
22982268

2299-
// memcpy(twophase_buf + twophase_pos, buf, nbytes);
2300-
// twophase_pos += nbytes;
2301-
// return nbytes;
2302-
2303-
// XLogReaderFree(xlogreader);
2304-
// if (xlogreadfd != -1)
2305-
// {
2306-
// close(xlogreadfd);
2307-
// xlogreadfd = -1;
2308-
// }
2309-
23102269
return XLogRecGetData(xlogreader);
23112270
}
2312-
2313-
2314-
2315-

src/backend/access/transam/xlog.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,7 @@ static TimeLineID curFileTLI;
321321
* stored here. The parallel leader advances its own copy, when necessary,
322322
* in WaitForParallelWorkersToFinish.
323323
*/
324-
static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
325-
324+
XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
326325
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
327326
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
328327

src/include/access/xlog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ typedef enum
8686
RECOVERY_TARGET_IMMEDIATE
8787
} RecoveryTargetType;
8888

89+
extern XLogRecPtr ProcLastRecPtr;
8990
extern XLogRecPtr XactLastRecEnd;
9091
extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
9192

0 commit comments

Comments
 (0)
0