8000 Integrate FDW with XTM · postgrespro/postgres_cluster@6fa240f · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6fa240f

Browse files
committed
Integrate FDW with XTM
1 parent dee9a9a commit 6fa240f

File tree

4 files changed

+87
-37
lines changed

4 files changed

+87
-37
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ TransactionId
327327
DtmGetNewTransactionId(bool isSubXact)
328328
{
329329
TransactionId xid;
330+
331+
XTM_INFO("%d: GetNewTransactionId\n", getpid());
330332

331333
/*
332334
* Workers synchronize transaction state at the beginning of each parallel
@@ -580,9 +582,9 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
580582

581583
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
582584
{
583-
XTM_TRACE("XTM: DtmSetTransactionStatus %u = %u \n", xid, status);
585+
XTM_INFO("%d: DtmSetTransactionStatus %u = %u\n", getpid(), xid, status);
584586
if (!RecoveryInProgress()) {
585-
if (TransactionIdIsValid(DtmNextXid)) {
587+
if (!DtmIsGlobalTransaction && TransactionIdIsValid(DtmNextXid)) {
586588
/* Already should be IN_PROGRESS */
587589
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
588590
CurrentTransactionSnapshot = NULL;
@@ -638,6 +640,8 @@ static void DtmInitialize()
638640
dtm->minXid = InvalidTransactionId;
639641
dtm->activeSnapshot.xip = (TransactionId*)ShmemAlloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
640642
dtm->activeSnapshot.subxip = (TransactionId*)ShmemAlloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
643+
644+
RegisterXactCallback(DtmXactCallback, NULL);
641645
}
642646
LWLockRelease(AddinShmemInitLock);
643647

@@ -652,7 +656,6 @@ static void DtmInitialize()
652656
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
653657
);
654658

655-
RegisterXactCallback(DtmXactCallback, NULL);
656659

657660
TM = &DtmTM;
658661
}

contrib/postgres_fdw/connection.c

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include "utils/hsearch.h"
2121
#include "utils/memutils.h"
2222

23+
#undef DEBUG3
24+
#define DEBUG3 WARNING
2325

2426
/*
2527
* Connection cache hash table entry
@@ -68,6 +70,8 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
6870
static void check_conn_params(const char **keywords, const char **values);
6971
static void configure_remote_session(PGconn *conn);
7072
static void do_sql_command(PGconn *conn, const char *sql);
73+
static void do_sql_send_command(PGconn *conn, const char *sql);
74+
static void do_sql_wait_command(PGconn *conn, const char *sql);
7175
static void begin_remote_xact(ConnCacheEntry *entry);
7276
static void pgfdw_xact_callback(XactEvent event, void *arg);
7377
static void pgfdw_subxact_callback(SubXactEvent event,
@@ -358,6 +362,27 @@ do_sql_command(PGconn *conn, const char *sql)
358362
PQclear(res);
359363
}
360364

365+
static void
366+
do_sql_send_command(PGconn *conn, const char *sql)
367+
{
368+
if (PQsendQuery(conn, sql) != PGRES_COMMAND_OK) {
369+
PGresult *res = PQgetResult(conn);
370+
pgfdw_report_error(ERROR, res, conn, true, sql);
371+
PQclear(res);
372+
}
373+
}
374+
375+
static void
376+
do_sql_wait_command(PGconn *conn, const char *sql)
377+
{
378+
PGresult *res;
379+
while ((res = PQgetResult(conn)) != NULL) {
380+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
381+
pgfdw_report_error(ERROR, res, conn, true, sql);
382+
PQclear(res);
383+
}
384+
}
385+
361386
/*
362387
* Start remote transaction or subtransaction, if needed.
363388
*
@@ -541,16 +566,35 @@ pgfdw_xact_callback(XactEvent event, void *arg)
541566
/* If it has an open remote transaction, try to close it */
542567
if (entry->xact_depth > 0)
543568
{
544-
elog(DEBUG3, "closing remote transaction on connection %p",
545-
entry->conn);
569+
elog(DEBUG3, "closing remote transaction on connection %p event %d",
570+
entry->conn, event);
546571

547572
switch (event)
548573
{
549574
case XACT_EVENT_PARALLEL_PRE_COMMIT:
550575
case XACT_EVENT_PRE_COMMIT:
551576
/* Commit all remote transactions during pre-commit */
552-
do_sql_command(entry->conn, "COMMIT TRANSACTION");
577+
do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
578+
continue;
579+
case XACT_EVENT_PRE_PREPARE:
553580

581+
/*
582+
* We disallow remote transactions that modified anything,
583+
* since it's not very reasonable to hold them open until
584+
* the prepared transaction is committed. For the moment,
585+
* throw error unconditionally; later we might allow
586+
* read-only cases. Note that the error will cause us to
587+
* come right back here with event == XACT_EVENT_ABORT, so
588+
* we'll clean up the connection state at that point.
589+
*/
590+
ereport(ERROR,
591+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
592+
errmsg("cannot prepare a transaction that modified remote tables")));
593+
break;
594+
case XACT_EVENT_PARALLEL_COMMIT:
595+
case XACT_EVENT_COMMIT:
596+
case XACT_EVENT_PREPARE:
597+
do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
554598
/*
555599
* If there were any errors in subtransactions, and we
556600
* made prepared statements, do a DEALLOCATE ALL to make
@@ -574,27 +618,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
574618
entry->have_prep_stmt = false;
575619
entry->have_error = false;
576620
break;
577-
case XACT_EVENT_PRE_PREPARE:
578-
579-
/*
580-
* We disallow remote transactions that modified anything,
581-
* since it's not very reasonable to hold them open until
582-
* the prepared transaction is committed. For the moment,
583-
* throw error unconditionally; later we might allow
584-
* read-only cases. Note that the error will cause us to
585-
* come right back here with event == XACT_EVENT_ABORT, so
586-
* we'll clean up the connection state at that point.
587-
*/
588-
ereport(ERROR,
589-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
590-
errmsg("cannot prepare a transaction that modified remote tables")));
591-
break;
592-
case XACT_EVENT_PARALLEL_COMMIT:
593-
case XACT_EVENT_COMMIT:
594-
case XACT_EVENT_PREPARE:
595-
/* Pre-commit should have closed the open transaction */
596-
elog(ERROR, "missed cleaning up connection during pre-commit");
597-
break;
598621
case XACT_EVENT_PARALLEL_ABORT:
599622
case XACT_EVENT_ABORT:
600623
/* Assume we might have lost track of prepared statements */
@@ -631,21 +654,23 @@ pgfdw_xact_callback(XactEvent event, void *arg)
631654
if (PQstatus(entry->conn) != CONNECTION_OK ||
632655
PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
633656
{
634-
elog(DEBUG3, "discarding connection %p", entry->conn);
657+
elog(DEBUG3, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
635658
PQfinish(entry->conn);
636659
entry->conn = NULL;
637660
}
638661
}
639662

640-
/*
641-
* Regardless of the event type, we can now mark ourselves as out of the
642-
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
643-
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
644-
*/
645-
xact_got_connection = false;
646-
647-
/* Also reset cursor numbering for next transaction */
648-
cursor_number = 0;
663+
if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT) {
664+
/*
665+
* Regardless of the event type, we can now mark ourselves as out of the
666+
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
667+
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
668+
*/
669+
xact_got_connection = false;
670+
671+
/* Also reset cursor numbering for next transaction */
672+
cursor_number = 0;
673+
}
649674
}
650675

651676
/*

contrib/postgres_fdw/postgres_fdw--1.0.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@ LANGUAGE C STRICT;
1616
CREATE FOREIGN DATA WRAPPER postgres_fdw
1717
HANDLER postgres_fdw_handler
1818
VALIDATOR postgres_fdw_validator;
19+
20+
CREATE FUNCTION postgres_fdw_exec(relid oid, sql cstring)
21+
RETURNS void
22+
AS 'MODULE_PATHNAME'
23+
LANGUAGE C STRICT;

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ typedef struct
230230
* SQL functions
231231
*/
232232
PG_FUNCTION_INFO_V1(postgres_fdw_handler);
233+
PG_FUNCTION_INFO_V1(postgres_fdw_exec);
233234

234235
/*
235236
* FDW callback routines
@@ -2994,3 +2995,19 @@ conversion_error_callback(void *arg)
29942995
NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
29952996
RelationGetRelationName(errpos->rel));
29962997
}
2998+
2999+
Datum
3000+
postgres_fdw_exec(PG_FUNCTION_ARGS)
3001+
{
3002+
Oid relid = PG_GETARG_OID(0);
3003+
char const* sql = PG_GETARG_CSTRING(1);
3004+
Oid userid = GetUserId();
3005+
ForeignTable *table = GetForeignTable(relid);
3006+
ForeignServer *server = GetForeignServer(table->serverid);
3007+
UserMapping *user = GetUserMapping(userid, server->serverid);
3008+
PGconn* conn = GetConnection(server, user, false);
3009+
PGresult* res = PQexec(conn, sql);
3010+
PQclear(res);
3011+
ReleaseConnection(conn);
3012+
PG_RETURN_VOID();
3013+
}

0 commit comments

Comments
 (0)
0