8000 Hack for 2PC for shardman loader. · postgrespro/postgres_cluster@56d099f · GitHub
[go: up one dir, main page]

Skip to content

Commit 56d099f

Browse files
committed
Hack for 2PC for shardman loader.
1 parent 305e4bd commit 56d099f

File tree

1 file changed

+111
-16
lines changed

1 file changed

+111
-16
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 111 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
#include "access/global_snapshot.h"
1818
#include "access/htup_details.h"
19-
#include "catalog/pg_user_mapping.h"
20-
#include "access/xact.h"
2119
#include "access/transam.h"
20+
#include "access/twophase.h"
21+
#include "access/xact.h"
2222
#include "access/xlog.h" /* GetSystemIdentifier() */
23+
#include "catalog/pg_user_mapping.h"
2324
#include "libpq-int.h"
2425
#include "mb/pg_wchar.h"
2526
#include "miscadmin.h"
@@ -81,7 +82,7 @@ static HTAB *ConnectionHash = NULL;
8182
*/
8283
typedef struct FdwTransactionState
8384
{
84-
char *gid;
85+
char gid[GIDSIZE];
8586
int nparticipants;
8687
GlobalCSN global_csn;
8788
bool two_phase_commit;
@@ -839,7 +840,84 @@ pgfdw_xact_callback(XactEvent event, void *arg)
839840
if (!xact_got_connection)
840841
return;
841842

842-
/* Handle possible two-phase commit */
843+
/*
844+
* Hack for shardman loader: it allows to do 2PC on user-issued
845+
* prepare. In this case we won't be able to commit xacts because we we
846+
* don't record participants info anywhere; this must be done by loader or
847+
* human behind it.
848+
*/
849+
if (event == XACT_EVENT_PRE_PREPARE &&
850+
UseGlobalSnapshots &&
851+
strncmp("pgfdw:", GetPrepareGid(), strlen("pgfdw:")) == 0 &&
852+
strstr(GetPrepareGid(), "shmnloader") != 0)
853+
{
854+
/*
855+
* Remember gid. We will PREPARE on other nodes and finish global
856+
* snaps on XACT_EVENT_POST_PREPARE.
857+
*/
858+
strncpy(fdwTransState->gid, GetPrepareGid(), GIDSIZE);
859+
/*
860+
* xact_depth and fdwTransState will be cleaned up on
861+
* XACT_EVENT_POST_PREPARE.
862+
*/
863+
elog(WARNING, "pre prepare gid %s", fdwTransState->gid);
864+
return;
865+
}
866+
if (event == XACT_EVENT_PREPARE && fdwTransState->gid[0] != '\0')
867+
return; /* prevent cleanup */
868+
if (event == XACT_EVENT_POST_PREPARE)
869+
{
870+
GlobalCSN max_csn = InProgressGlobalCSN;
871+
GlobalCSN my_csn = InProgressGlobalCSN;
872+
bool res;
873+
char *sql;
874+
elog(WARNING, "fdw post prepare");
875+
876+
if (fdwTransState->gid[0] == '\0')
877+
{
878+
/*
879+
* Nothing to do here; since this cb is not present in vanilla,
880+
* exit to avoid harming state machine
881+
*/
882+
return;
883+
}
884+
sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid);
885+
res = BroadcastCmd(sql);
886+
if (!res)
887+
goto error;
888+
889+
/* Broadcast pg_global_snapshot_prepare() */
890+
my_csn = GlobalSnapshotPrepareTwophase(fdwTransState->gid);
891+
892+
sql = psprintf("SELECT pg_global_snapshot_prepare('%s')",
893+
fdwTransState->gid);
894+
res = BroadcastStmt(sql, PGRES_TUPLES_OK, MaxCsnCB, &max_csn);
895+
if (!res)
896+
goto error;
897+
898+
/* select maximal global csn */
899+
if (my_csn > max_csn)
900+
max_csn = my_csn;
901+
902+
/* Broadcast pg_global_snapshot_assign() */
903+
GlobalSnapshotAssignCsnTwoPhase(fdwTransState->gid, max_csn);
904+
sql = psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")",
905+
fdwTransState->gid, max_csn);
906+
res = BroadcastFunc(sql);
907+
908+
error:
909+
elog(WARNING, "post prepare gid %s, res %d", fdwTransState->gid, res);
910+
if (!res)
911+
{
912+
sql = psprintf("ABORT PREPARED '%s'", fdwTransState->gid);
913+
BroadcastCmd(sql);
914+
elog(ERROR, "failed to PREPARE transaction on remote node, ABORT PREPARED this xact");
915+
}
916+
}
917+
918+
/*
919+
* Handle possible two-phase commit.
920+
*/
843921
if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT)
844922
{
845923
bool include_local_tx = false;
@@ -862,29 +940,31 @@ pgfdw_xact_callback(XactEvent event, void *arg)
862940
bool res;
863941
char *sql;
864942

865-
fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d",
866-
(long long) GetCurrentTimestamp(),
867-
(long long) GetSystemIdentifier(),
868-
MyProcPid,
869-
GetCurrentTransactionIdIfAny(),
870-
++two_phase_xact_count,
871-
fdwTransState->nparticipants);
943+
snprintf(fdwTransState->gid,
944+
GIDSIZE,
945+
"pgfdw:%lld:%llu:%d:%u:%d:%d",
946+
(long long) GetCurrentTimestamp(),
947+
(long long) GetSystemIdentifier(),
948+
MyProcPid,
949+
GetCurrentTransactionIdIfAny(),
950+
++two_phase_xact_count,
951+
fdwTransState->nparticipants);
872952

873953
/* Broadcast PREPARE */
874954
sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid);
875955
res = BroadcastCmd(sql);
876956
if (!res)
877-
goto error;
957+
goto error_user2pc;
878958

879959
/* Broadcast pg_global_snapshot_prepare() */
880960
if (include_local_tx)
881961
my_csn = GlobalSnapshotPrepareCurrent();
882962

883963
sql = psprintf("SELECT pg_global_snapshot_prepare('%s')",
884-
fdwTransState->gid);
964+
fdwTransState->gid);
885965
res = BroadcastStmt(sql, PGRES_TUPLES_OK, MaxCsnCB, &max_csn);
886966
if (!res)
887-
goto error;
967+
goto error_user2pc;
888968

889969
/* select maximal global csn */
890970
if (include_local_tx && my_csn > max_csn)
@@ -894,10 +974,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
894974
if (include_local_tx)
895975
GlobalSnapshotAssignCsnCurrent(max_csn);
896976
sql = psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")",
897-
fdwTransState->gid, max_csn);
977+
fdwTransState->gid, max_csn);
898978
res = BroadcastFunc(sql);
899979

900-
error:
980+
error_user2pc:
901981
if (!res)
902982
{
903983
sql = psprintf("ABORT PREPARED '%s'", fdwTransState->gid);
@@ -959,6 +1039,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
9591039
break;
9601040
case XACT_EVENT_PRE_PREPARE:
9611041

1042+
if (fdwTransState->gid[0] != '\0')
1043+
/* See comments above */
1044+
break;
1045+
9621046
/*
9631047
* We disallow remote transactions that modified anything,
9641048
* since it's not very reasonable to hold them open until
@@ -980,6 +1064,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
9801064
elog(ERROR, "missed cleaning up connection during pre-commit");
9811065
break;
9821066
case XACT_EVENT_PREPARE:
1067+
if (fdwTransState->gid[0] != '\0')
1068+
break;
1069+
9831070
/* Pre-commit should have closed the open transaction */
9841071
elog(ERROR, "missed cleaning up connection during pre-commit");
9851072
break;
@@ -1046,6 +1133,14 @@ pgfdw_xact_callback(XactEvent event, void *arg)
10461133
/* Disarm changing_xact_state if it all worked. */
10471134
entry->changing_xact_state = abort_cleanup_failure;
10481135
break;
1136+
case XACT_EVENT_POST_PREPARE:
1137+
/*
1138+
* New event can break our state machine, so let's list
1139+
* them here explicitely and force compiler warning in
1140+
* case of unhandled event.
1141+
*/
1142+
break;
1143+
10491144
}
10501145
}
10511146

0 commit comments

Comments
 (0)
0