8000 Allow queries submitted by postgres_fdw to be canceled. · markusborg/postgres@cdf5a00 · GitHub
[go: up one dir, main page]

Skip to content

Commit cdf5a00

Browse files
committed
Allow queries submitted by postgres_fdw to be canceled.
Back-patch of commits f039eaa and 1b812af, which arranged (in 9.6+) to make remote queries interruptible. It was known at the time that the same problem existed in the back-branches, but I did not back-patch for lack of a user complaint. Michael Paquier and Etsuro Fujita, adjusted for older branches by me. Per gripe from Suraj Kharage. This doesn't directly addresss Suraj's gripe, but since the patch that will do so builds up on top of this work, it seems best to back-patch this part first. Discussion: http://postgr.es/m/CAF1DzPU8Kx+fMXEbFoP289xtm3bz3t+ZfxhmKavr98Bh-C0TqQ@mail.gmail.com
1 parent d617c76 commit cdf5a00

File tree

3 files changed

+194
-42
lines changed

3 files changed

+194
-42
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "access/xact.h"
1818
#include "mb/pg_wchar.h"
1919
#include "miscadmin.h"
20+
#include "storage/latch.h"
2021
#include "utils/hsearch.h"
2122
#include "utils/memutils.h"
2223

@@ -448,6 +449,78 @@ GetPrepStmtNumber(PGconn *conn)
448449
return ++prep_stmt_number;
449450
}
450451

452+
/*
453+
* Submit a query and wait for the result.
454+
*
455+
* This function is interruptible by signals.
456+
*
457+
* Caller is responsible for the error handling on the result.
458+
*/
459+
PGresult *
460+
pgfdw_exec_query(PGconn *conn, const char *query)
461+
{
462+
/*
463+
* Submit a query. Since we don't use non-blocking mode, this also can
464+
* block. But its risk is relatively small, so we ignore that for now.
465+
*/
466+
if (!PQsendQuery(conn, query))
467+
pgfdw_report_error(ERROR, NULL, conn, false, query);
468+
469+
/* Wait for the result. */
470+
return pgfdw_get_result(conn, query);
471+
}
472+
473+
/*
474+
* Wait for the result from a prior asynchronous execution function call.
475+
*
476+
* This function offers quick responsiveness by checking for any interruptions.
477+
*
478+
* This function emulates the PQexec()'s behavior of returning the last result
479+
* when there are many.
480+
*
481+
* Caller is responsible for the error handling on the result.
482+
*/
483+
PGresult *
484+
pgfdw_get_result(PGconn *conn, const char *query)
485+
{
486+
PGresult *last_res = NULL;
487+
488+
for (;;)
489+
{
490+
PGresult *res;
491+
492+
while (PQisBusy(conn))
493+
{
494+
int wc;
495+
496+
/* Sleep until there's something to do */
497+
wc = WaitLatchOrSocket(MyLatch,
498+
WL_LATCH_SET | WL_SOCKET_READABLE,
499+
PQsocket(conn),
500+
-1L);
501+
ResetLatch(MyLatch);
502+
503+
CHECK_FOR_INTERRUPTS();
504+
505+
/* Data available in socket */
506+
if (wc & WL_SOCKET_READABLE)
507+
{
508+
if (!PQconsumeInput(conn))
509+
pgfdw_report_error(ERROR, NULL, conn, false, query);
510+
}
511+
}
512+
513+
res = PQgetResult(conn);
514+
if (res == NULL)
515+
break; /* query is complete */
516+
517+
PQclear(last_res);
518+
last_res = res;
519+
}
520+
521+
return last_res;
522+
}
523+
451524
/*
452525
* Report an error we got from the remote server.
453526
*
@@ -599,6 +672,30 @@ pgfdw_xact_callback(XactEvent event, void *arg)
599672
case XACT_EVENT_ABORT:
600673
/* Assume we might have lost track of prepared statements */
601674
entry->have_error = true;
675+
676+
/*
677+
* If a command has been submitted to the remote server by
678+
* using an asynchronous execution function, the command
679+
* might not have yet completed. Check to see if a command
680+
* is still being processed by the remote server, and if so,
681+
* request cancellation of the command.
682+
*/
683+
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
684+
{
685+
PGcancel *cancel;
686+
char errbuf[256];
687+
688+
if ((cancel = PQgetCancel(entry->conn)))
689+
{
690+
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
691+
ereport(WARNING,
692+
(errcode(ERRCODE_CONNECTION_FAILURE),
693+
errmsg("could not send cancel request: %s",
694+
errbuf)));
695+
PQfreeCancel(cancel);
696+
}
697+
}
698+
602699
/* If we're aborting, abort all remote transactions too */
603700
res = PQexec(entry->conn, "ABORT TRANSACTION");
604701
/* Note: can't throw ERROR, it would be infinite loop */
@@ -700,6 +797,30 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
700797
{
701798
/* Assume we might have lost track of prepared statements */
702799
entry->have_error = true;
800+
801+
/*
802+
* If a command has been submitted to the remote server by using an
803+
* asynchronous execution function, the command might not have yet
804+
* completed. Check to see if a command is still being processed by
805+
* the remote server, and if so, request cancellation of the
806+
* command.
807+
*/
808+
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
809+
{
810+
PGcancel *cancel;
811+
char errbuf[256];
812+
813+
if ((cancel = PQgetCancel(entry->conn)))
814+
{
815+
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
816+
ereport(WARNING,
817+
(errcode(ERRCODE_CONNECTION_FAILURE),
818+
errmsg("could not send cancel request: %s",
819+
errbuf)));
820+
PQfreeCancel(cancel);
821+
}
822+
}
823+
703824
/* Rollback all remote subtransactions during abort */
704825
snprintf(sql, sizeof(sql),
705826
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 71 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,7 +1088,7 @@ postgresReScanForeignScan(ForeignScanState *node)
10881088
* We don't use a PG_TRY block here, so be careful not to throw error
10891089
* without releasing the PGresult.
10901090
*/
1091-
res = PQexec(fsstate->conn, sql);
1091+
res = pgfdw_exec_query(fsstate->conn, sql);
10921092
if (PQresultStatus(res) != PGRES_COMMAND_OK)
10931093
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
10941094
PQclear(res);
@@ -1425,18 +1425,24 @@ postgresExecForeignInsert(EState *estate,
14251425
p_values = convert_prep_stmt_params(fmstate, NULL, slot);
14261426

14271427
/*
1428-
* Execute the prepared statement, and check for success.
1428+
* Execute the prepared statement.
1429+
*/
1430+
if (!PQsendQueryPrepared(fmstate->conn,
1431+
fmstate->p_name,
1432+
fmstate->p_nums,
1433+
p_values,
1434+
NULL,
1435+
NULL,
1436+
0))
1437+
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1438+
1439+
/*
1440+
* Get the result, and check for success.
14291441
*
14301442
* We don't use a PG_TRY block here, so be careful not to throw error
14311443
* without releasing the PGresult.
14321444
*/
1433-
res = PQexecPrepared(fmstate->conn,
1434-
fmstate->p_name,
1435-
fmstate->p_nums,
1436-
p_values,
1437-
NULL,
1438-
NULL,
1439-
0);
1445+
res = pgfdw_get_result(fmstate->conn, fmstate->query);
14401446
if (PQresultStatus(res) !=
14411447
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
14421448
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1495,18 +1501,24 @@ postgresExecForeignUpdate(EState *estate,
14951501
slot);
14961502

14971503
/*
1498-
* Execute the prepared statement, and check for success.
1504+
* Execute the prepared statement.
1505+
*/
1506+
if (!PQsendQueryPrepared(fmstate->conn,
1507+
fmstate->p_name,
1508+
fmstate->p_nums,
1509+
p_values,
1510+
NULL,
1511+
NULL,
1512+
0))
1513+
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1514+
1515+
/*
1516+
* Get the result, and check for success.
14991517
*
15001518
* We don't use a PG_TRY block here, so be careful not to throw error
15011519
* without releasing the PGresult.
15021520
*/
1503-
res = PQexecPrepared(fmstate->conn,
1504-
fmstate->p_name,
1505-
fmstate->p_nums,
1506-
p_values,
1507-
NULL,
1508-
NULL,
1509-
0);
1521+
res = pgfdw_get_result(fmstate->co 1241 nn, fmstate->query);
15101522
if (PQresultStatus(res) !=
15111523
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
15121524
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1565,18 +1577,24 @@ postgresExecForeignDelete(EState *estate,
15651577
NULL);
15661578

15671579
/*
1568-
* Execute the prepared statement, and check for success.
1580+
* Execute the prepared statement.
1581+
*/
1582+
if (!PQsendQueryPrepared(fmstate->conn,
1583+
fmstate->p_name,
1584+
fmstate->p_nums,
1585+
p_values,
1586+
NULL,
1587+
NULL,
1588+
0))
1589+
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1590+
1591+
/*
1592+
* Get the result, and check for success.
15691593
*
15701594
* We don't use a PG_TRY block here, so be careful not to throw error
15711595
* without releasing the PGresult.
15721596
*/
1573-
res = PQexecPrepared(fmstate->conn,
1574-
fmstate->p_name,
1575-
fmstate->p_nums,
1576-
p_values,
1577-
NULL,
1578-
NULL,
1579-
0);
1597+
res = pgfdw_get_result(fmstate->conn, fmstate->query);
15801598
if (PQresultStatus(res) !=
15811599
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
15821600
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1626,7 +1644,7 @@ postgresEndForeignModify(EState *estate,
16261644
* We don't use a PG_TRY block here, so be careful not to throw error
16271645
* without releasing the PGresult.
16281646
*/
1629-
res = PQexec(fmstate->conn, sql);
1647+
res = pgfdw_exec_query(fmstate->conn, sql);
16301648
if (PQresultStatus(res) != PGRES_COMMAND_OK)
16311649
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
16321650
PQclear(res);
@@ -1884,7 +1902,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
18841902
/*
18851903
* Execute EXPLAIN remotely.
18861904
*/
1887-
res = PQexec(conn, sql);
1905+
res = pgfdw_exec_query(conn, sql);
18881906
if (PQresultStatus(res) != PGRES_TUPLES_OK)
18891907
pgfdw_report_error(ERROR, res, conn, false, sql);
18901908

@@ -2013,12 +2031,18 @@ create_cursor(ForeignScanState *node)
20132031
* parameter (see deparse.c), the "inference" is trivial and will produce
20142032
* the desired result. This allows us to avoid assuming that the remote
20152033
* server has the same OIDs we do for the parameters' types.
2034+
*/
2035+
if (!PQsendQueryParams(conn, buf.data, numParams,
2036+
NULL, values, NULL, NULL, 0))
2037+
pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2038+
2039+
/*
2040+
* Get the result, and check for success.
20162041
*
20172042
* We don't use a PG_TRY block here, so be careful not to throw error
20182043
* without releasing the PGresult.
20192044
*/
2020-
res = PQexecParams(conn, buf.data, numParams, NULL, values,
2021-
NULL, NULL, 0);
2045+
res = pgfdw_get_result(conn, buf.data);
20222046
if (PQresultStatus(res) != PGRES_COMMAND_OK)
20232047
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
20242048
PQclear(res);
@@ -2068,7 +2092,7 @@ fetch_more_data(ForeignScanState *node)
20682092
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
20692093
fetch_size, fsstate->cursor_number);
20702094

2071-
res = PQexec(conn, sql);
2095+
res = pgfdw_exec_query(conn, sql);
20722096
/* On error, report the original query, not the FETCH. */
20732097
if (PQresultStatus(res) != PGRES_TUPLES_OK)
20742098
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2175,7 +2199,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
21752199
* We don't use a PG_TRY block here, so be careful not to throw error
21762200
* without releasing the PGresult.
21772201
*/
2178-
res = PQexec(conn, sql);
2202+
res = pgfdw_exec_query(conn, sql);
21792203
if (PQresultStatus(res) != PGRES_COMMAND_OK)
21802204
pgfdw_report_error(ERROR, res, conn, true, sql);
21812205
PQclear(res);
@@ -2203,16 +2227,21 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
22032227
* with the remote server using different type OIDs than we do. All of
22042228
* the prepared statements we use in this module are simple enough that
22052229
* the remote server will make the right choices.
2230+
*/
2231+
if (!PQsendPrepare(fmstate->conn,
2232+
p_name,
2233+
fmstate->query,
2234+
0,
2235+
NULL))
2236+
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
2237+
2238+
/*
2239+
* Get the result, and check for success.
22062240
*
22072241
* We don't use a PG_TRY block here, so be careful not to throw error
22082242
* without releasing the PGresult.
22092243
*/
2210-
res = PQprepare(fmstate->conn,
2211-
p_name,
2212-
fmstate->query,
2213-
0,
2214-
NULL);
2215-
2244+
res = pgfdw_get_result(fmstate->conn, fmstate->query);
22162245
if (PQresultStatus(res) != PGRES_COMMAND_OK)
22172246
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
22182247
PQclear(res);
@@ -2361,7 +2390,7 @@ postgresAnalyzeForeignTable(Relation relation,
23612390
/* In what follows, do not risk leaking any PGresults. */
23622391
PG_TRY();
23632392
{
2364-
res = PQexec(conn, sql.data);
2393+
res = pgfdw_exec_query(conn, sql.data);
23652394
if (PQresultStatus(res) != PGRES_TUPLES_OK)
23662395
pgfdw_report_error(ERROR, res, conn, false, sql.data);
23672396

@@ -2455,7 +2484,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
24552484
/* In what follows, do not risk leaking any PGresults. */
24562485
PG_TRY();
24572486
{
2458-
res = PQexec(conn, sql.data);
2487+
res = pgfdw_exec_query(conn, sql.data);
24592488
if (PQresultStatus(res) != PGRES_COMMAND_OK)
24602489
pgfdw_report_error(ERROR, res, conn, false, sql.data);
24612490
PQclear(res);
@@ -2485,7 +2514,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
24852514
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
24862515
fetch_size, cursor_number);
24872516

2488-
res = PQexec(conn, fetch_sql);
2517+
res = pgfdw_exec_query(conn, fetch_sql);
24892518
/* On error, report the original query, not the FETCH. */
24902519
if (PQresultStatus(res) != PGRES_TUPLES_OK)
24912520
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2659,7 +2688,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
26592688
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
26602689
deparseStringLiteral(&buf, stmt->remote_schema);
26612690

2662-
res = PQexec(conn, buf.data);
2691+
res = pgfdw_exec_query(conn, buf.data);
26632692
if (PQresultStatus(res) != PGRES_TUPLES_OK)
26642693
pgfdw_report_error(ERROR, res, conn, false, buf.data);
26652694

@@ -2758,7 +2787,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
27582787
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
27592788

27602789
/* Fetch the data */
2761-
res = PQexec(conn, buf.data);
2790+
res = pgfdw_exec_query(conn, buf.data);
27622791
if (PQresultStatus(res) != PGRES_TUPLES_OK)
27632792
pgfdw_report_error(ERROR, res, conn, false, buf.data);
27642793

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
3030
extern void ReleaseConnection(PGconn *conn);
3131
extern unsigned int GetCursorNumber(PGconn *conn);
3232
extern unsigned int GetPrepStmtNumber(PGconn *conn);
33+
extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
34+
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
3335
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
3436
bool clear, const char *sql);
3537

0 commit comments

Comments
 (0)
0