8000 Allow queries submitted by postgres_fdw to be canceled. · dinesh372/postgres@3aa16b1 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3aa16b1

Browse files
committed
Allow queries submitted by postgres_fdw to be canceled.
Back-patch of commits f039eaa and 1b812af, which arranged (in 9.6+) to make remote queries interruptible. It was known at the time that the same problem existed in the back-branches, but I did not back-patch for lack of a user complaint. Michael Paquier and Etsuro Fujita, adjusted for older branches by me. Per gripe from Suraj Kharage. This doesn't directly addresss Suraj's gripe, but since the patch that will do so builds up on top of this work, it seems best to back-patch this part first. Discussion: http://postgr.es/m/CAF1DzPU8Kx+fMXEbFoP289xtm3bz3t+ZfxhmKavr98Bh-C0TqQ@mail.gmail.com
1 parent babf8f1 commit 3aa16b1

File tree

3 files changed

+193
-40
lines changed

3 files changed

+193
-40
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#include "access/xact.h"
1818
#include "mb/pg_wchar.h"
1919
#include "miscadmin.h"
20+
#include "storage/latch.h"
21+
#include "storage/proc.h"
2022
#include "utils/hsearch.h"
2123
#include "utils/memutils.h"
2224

@@ -449,6 +451,78 @@ GetPrepStmtNumber(PGconn *conn)
449451
return ++prep_stmt_number;
450452
}
451453

454+
/*
455+
* Submit a query and wait for the result.
456+
*
457+
* This function is interruptible by signals.
458+
*
459+
* Caller is responsible for the error handling on the result.
460+
*/
461+
PGresult *
462+
pgfdw_exec_query(PGconn *conn, const char *query)
463+
{
464+
/*
465+
* Submit a query. Since we don't use non-blocking mode, this also can
466+
* block. But its risk is relatively small, so we ignore that for now.
467+
*/
468+
if (!PQsendQuery(conn, query))
469+
pgfdw_report_error(ERROR, NULL, conn, false, query);
470+
471+
/* Wait for the result. */
472+
return pgfdw_get_result(conn, query);
473+
}
474+
475+
/*
476+
* Wait for the result from a prior asynchronous execution function call.
477+
*
478+
* This function offers quick responsiveness by checking for any interruptions.
479+
*
480+
* This function emulates the PQexec()'s behavior of returning the last result
481+
* when there are many.
482+
*
483+
* Caller is responsible for the error handling on the result.
484+
*/
485+
PGresult *
486+
pgfdw_get_result(PGconn *conn, const char *query)
487+
{
488+
PGresult *last_res = NULL;
489+
490+
for (;;)
491+
{
492+
PGresult *res;
493+
494+
while (PQisBusy(conn))
495+
{
496+
int wc;
497+
498+
/* Sleep until there's something to do */
499+
wc = WaitLatchOrSocket(&MyProc->procLatch,
500+
WL_LATCH_SET | WL_SOCKET_READABLE,
501+
PQsocket(conn),
502+
-1L);
503+
ResetLatch(&MyProc->procLatch);
504+
505+
CHECK_FOR_INTERRUPTS();
506+
507+
/* Data available in socket */
508+
if (wc & WL_SOCKET_READABLE)
509+
{
510+
if (!PQconsumeInput(conn))
511+
pgfdw_report_error(ERROR, NULL, conn, false, query);
512+
}
513+
}
514+
515+
res = PQgetResult(conn);
516+
if (res == NULL)
517+
break; /* query is complete */
518+
519+
PQclear(last_res);
520+
last_res = res;
521+
}
522+
523+
return last_res;
524+
}
525+
452526
/*
453527
* Report an error we got from the remote server.
454528
*
@@ -597,6 +671,30 @@ pgfdw_xact_callback(XactEvent event, void *arg)
597671
case XACT_EVENT_ABORT:
598672
/* Assume we might have lost track of prepared statements */
599673
entry->have_error = true;
674+
675+
/*
676+
* If a command has been submitted to the remote server by
677+
* using an asynchronous execution function, the command
678+
* might not have yet completed. Check to see if a command
679+
* is still being processed by the remote server, and if so,
680+
* request cancellation of the command.
681+
*/
682+
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
683+
{
684+
PGcancel *cancel;
685+
char errbuf[256];
686+
687+
if ((cancel = PQgetCancel(entry->conn)))
688+
{
689+
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
690+
ereport(WARNING,
691+
(errcode(ERRCODE_CONNECTION_FAILURE),
692+
errmsg("could not send cancel request: %s",
693+
errbuf)));
694+
PQfreeCancel(cancel);
695+
}
696+
}
697+
600698
/* If we're aborting, abort all remote transactions too */
601699
res = PQexec(entry->conn, "ABORT TRANSACTION");
602700
/* Note: can't throw ERROR, it would be infinite loop */
@@ -698,6 +796,30 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
698796
{
699797
/* Assume we might have lost track of prepared statements */
700798
entry->have_error = true;
799+
800+
/*
801+
* If a command has been submitted to the remote server by using an
802+
* asynchronous execution function, the command might not have yet
803+
* completed. Check to see if a command is still being processed by
804+
* the remote server, and if so, request cancellation of the
805+
* command.
806+
*/
807+
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
808+
{
809+
PGcancel *cancel;
810+
char errbuf[256];
811+
812+
if ((cancel = PQgetCancel(entry->conn)))
813+
{
814+
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
815+
ereport(WARNING,
816+
(errcode(ERRCODE_CONNECTION_FAILURE),
817+
errmsg("could not send cancel request: %s",
818+
errbuf)));
819+
PQfreeCancel(cancel);
820+
}
821+
}
822+
701823
/* Rollback all remote subtransactions during abort */
702824
snprintf(sql, sizeof(sql),
703825
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 69 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,7 +1066,7 @@ postgresReScanForeignScan(ForeignScanState *node)
10661066
* We don't use a PG_TRY block here, so be careful not to throw error
10671067
* without releasing the PGresult.
10681068
*/
1069-
res = PQexec(fsstate->conn, sql);
1069+
res = pgfdw_exec_query(fsstate->conn, sql);
10701070
if (PQresultStatus(res) != PGRES_COMMAND_OK)
10711071
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
10721072
PQclear(res);
@@ -1388,18 +1388,24 @@ postgresExecForeignInsert(EState *estate,
13881388
p_values = convert_prep_stmt_params(fmstate, NULL, slot);
13891389

13901390
/*
1391-
* Execute the prepared statement, and check for success.
1391+
* Execute the prepared statement.
1392+
*/
1393+
if (!PQsendQueryPrepared(fmstate->conn,
1394+
fmstate->p_name,
1395+
fmstate->p_nums,
1396+
p_values,
1397+
NULL,
1398+
NULL,
1399+
0))
1400+
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1401+
1402+
/*
1403+
* Get the result, and check for success.
13921404
*
13931405
* We don't use a PG_TRY block here, so be careful not to throw error
13941406
* without releasing the PGresult.
13951407
*/
1396-
res = PQexecPrepared(fmstate->conn,
1397-
fmstate->p_name,
1398-
fmstate->p_nums,
1399-
p_values,
1400-
NULL,
1401-
NULL,
1402-
0);
1408+
res = pgfdw_get_result(fmstate->conn, fmstate->query);
14031409
if (PQresultStatus(res) !=
14041410
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
14051411
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1458,18 +1464,24 @@ postgresExecForeignUpdate(EState *estate,
14581464
slot);
14591465

14601466
/*
1461-
* Execute the prepared statement, and check for success.
1467+
* Execute the prepared statement.
1468+
*/
1469+
if (!PQsendQueryPrepared(fmstate->conn,
1470+
fmstate->p_name,
1471+
fmstate->p_nums,
1472+
p_values,
1473+
NULL,
1474+
NULL,
1475+
0))
1476+
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1477+
1478+
/*
1479+
* Get the result, and check for success.
14621480
*
14631481
* We don't use a PG_TRY block here, so be careful not to throw error
14641482
* without releasing the PGresult.
14651483
*/
1466-
res = PQexecPrepared(fmstate->conn,
1467-
fmstate->p_name,
1468-
fmstate->p_nums,
1469-
p_values,
1470-
NULL,
1471-
NULL,
1472-
0);
1484+
res = pgfdw_get_result(fmstate->conn, fmstate->query);
14731485
if (PQresultStatus(res) !=
14741486
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
14751487
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1528,18 +1540,24 @@ postgresExecForeignDelete(EState *estate,
15281540
NULL);
15291541

15301542
/*
1531-
* Execute the prepared statement, and check for success.
1543+
* Execute the prepared statement.
1544+
*/
1545+
if (!PQsendQueryPrepared(fmstate->conn,
1546+
fmstate->p_name,
1547+
fmstate->p_nums,
1548+
p_values,
1549+
NULL,
1550+
NULL,
1551+
0))
1552+
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1553+
1554+
/*
1555+
* Get the result, and check for success.
15321556
*
15331557
* We don't use a PG_TRY block here, so be careful not to throw error
15341558
* without releasing the PGresult.
15351559
*/
1536-
res = PQexecPrepared(fmstate->conn,
1537-
fmstate->p_name,
1538-
fmstate->p_nums,
1539-
p_values,
1540-
NULL,
1541-
NULL,
1542-
0);
1560+
res = pgfdw_get_result(fmstate->conn, fmstate->query);
15431561
if (PQresultStatus(res) !=
15441562
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
15451563
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1589,7 +1607,7 @@ postgresEndForeignModify(EState *estate,
15891607
* We don't use a PG_TRY block here, so be careful not to throw error
15901608
* without releasing the PGresult.
15911609
*/
1592-
res = PQexec(fmstate->conn, sql);
1610+
res = pgfdw_exec_query(fmstate->conn, sql);
15931611
if (PQresultStatus(res) != PGRES_COMMAND_OK)
15941612
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
15951613
PQclear(res);
@@ -1847,7 +1865,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
18471865
/*
18481866
* Execute EXPLAIN remotely.
18491867
*/
1850-
res = PQexec(conn, sql);
1868+
res = pgfdw_exec_query(conn, sql);
18511869
if (PQresultStatus(res) != PGRES_TUPLES_OK)
18521870
pgfdw_report_error(ERROR, res, conn, false, sql);
18531871

@@ -1976,12 +1994,18 @@ create_cursor(ForeignScanState *node)
19761994
* parameter (see deparse.c), the "inference" is trivial and will produce
19771995
* the desired result. This allows us to avoid assuming that the remote
19781996
* server has the same OIDs we do for the parameters' types.
1997+
*/
1998+
if (!PQsendQueryParams(conn, buf.data, numParams,
1999+
NULL, values, NULL, NULL, 0))
2000+
pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2001+
2002+
/*
2003+
* Get the result, and check for success.
19792004
*
19802005
* We don't use a PG_TRY block here, so be careful not to throw error
19812006
* without releasing the PGresult.
19822007
*/
1983-
res = PQexecParams(conn, buf.data, numParams, NULL, values,
1984-
NULL, NULL, 0);
2008+
res = pgfdw_get_result(conn, buf.data);
19852009
if (PQresultStatus(res) != PGRES_COMMAND_OK)
19862010
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
19872011
PQclear(res);
@@ -2031,7 +2055,7 @@ fetch_more_data(ForeignScanState *node)
20312055
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
20322056
fetch_size, fsstate->cursor_number);
20332057

2034-
res = PQexec(conn, sql);
2058+
res = pgfdw_exec_query(conn, sql);
20352059
/* On error, report the original query, not the FETCH. */
20362060
if (PQresultStatus(res) != PGRES_TUPLES_OK)
20372061
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2138,7 +2162,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
21382162
* We don't use a PG_TRY block here, so be careful not to throw error
21392163
* without releasing the PGresult.
21402164
*/
2141-
res = PQexec(conn, sql);
2165+
res = pgfdw_exec_query(conn, sql);
21422166
if (PQresultStatus(res) != PGRES_COMMAND_OK)
21432167
pgfdw_report_error(ERROR, res, conn, true, sql);
21442168
PQclear(res);
@@ -2166,16 +2190,21 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
21662190
* with the remote server using different type OIDs than we do. All of
21672191
* the prepared statements we use in this module are simple enough that
21682192
* the remote server will make the right choices.
2193+
*/
2194+
if (!PQsendPrepare(fmstate->conn,
2195+
p_name,
2196+
fmstate->query,
2197+
0,
2198+
NULL))
2199+
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
2200+
2201+
/*
2202+
* Get the result, and check for success.
21692203
*
21702204
* We don't use a PG_TRY block here, so be careful not to throw error
21712205
* without releasing the PGresult.
21722206
*/
2173-
res = PQprepare(fmstate->conn,
2174-
p_name,
2175-
fmstate->query,
2176-
0,
2177-
NULL);
2178-
2207+
res = pgfdw_get_result(fmstate->conn, fmstate->query);
21792208
if (PQresultStatus(res) != PGRES_COMMAND_OK)
21802209
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
21812210
PQclear(res);
@@ -2325,7 +2354,7 @@ postgresAnalyzeForeignTable(Relation relation,
23252354
/* In what follows, do not risk leaking any PGresults. */
23262355
PG_TRY();
23272356
{
2328-
res = PQexec(conn, sql.data);
2357+
res = pgfdw_exec_query(conn, sql.data);
23292358
if (PQresultStatus(res) != PGRES_TUPLES_OK)
23302359
pgfdw_report_error(ERROR, res, conn, false, sql.data);
23312360

@@ -2419,7 +2448,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
24192448
/* In what follows, do not risk leaking any PGresults. */
24202449
PG_TRY();
24212450
{
2422-
res = PQexec(conn, sql.data);
2451+
res = pgfdw_exec_query(conn, sql.data);
24232452
if (PQresultStatus(res) != PGRES_COMMAND_OK)
24242453
pgfdw_report_error(ERROR, res, conn, false, sql.data);
24252454
PQclear(res);
@@ -2449,7 +2478,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
24492478
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
24502479
fetch_size, cursor_number);
24512480

2452-
res = PQexec(conn, fetch_sql);
2481+
res = pgfdw_exec_query(conn, fetch_sql);
24532482
/* On error, report the original query, not the FETCH. */
24542483
if (PQresultStatus(res) != PGRES_TUPLES_OK)
24552484
pgfdw_report_error(ERROR, res, conn, false, sql.data);

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
3030
extern void ReleaseConnection(PGconn *conn);
3131
extern unsigned int GetCursorNumber(PGconn *conn);
3232
extern unsigned int GetPrepStmtNumber(PGconn *conn);
33+
extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
34+
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
3335
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
3436
bool clear, const char *sql);
3537

0 commit comments

Comments
 (0)
0