8000 Use prepared statements for DirectModify. · postgrespro/postgres_cluster@ae0b9d0 · GitHub
[go: up one dir, main page]

Skip to content

Commit ae0b9d0

Browse files
committed
Use prepared statements for DirectModify.
We keep per-connection hashtable sql->prep statement name for checking whether we have prepared it already or not. This is quick-and-dirty solution, because it will lead to hashtable bloat if custom plans are used -- in this case we prepare queries with substituted param values.
1 parent 285ec6d commit ae0b9d0

File tree

3 files changed

+158
-50
lines changed

3 files changed

+158
-50
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 65 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "access/htup_details.h"
1818
#include "catalog/pg_user_mapping.h"
1919
#include "access/global_snapshot.h"
20+
#include "access/hash.h"
2021
#include "access/xact.h"
2122
#include "access/xtm.h"
2223
#include "access/transam.h"
@@ -33,41 +34,6 @@
3334
#include "utils/memutils.h"
3435
#include "utils/syscache.h"
3536

36-
/*
37-
* Connection cache hash table entry
38-
*
39-
* The lookup key in this hash table is the user mapping OID. We use just one
40-
* connection per user mapping ID, which ensures that all the scans use the
41-
* same snapshot during a query. Using the user mapping OID rather than
42-
* the foreign server OID + user OID avoids creating multiple connections when
43-
* the public user mapping applies to all user OIDs.
44-
*
45-
* The "conn" pointer can be NULL if we don't currently have a live connection.
46-
* When we do have a connection, xact_depth tracks the current depth of
47-
* transactions and subtransactions open on the remote side. We need to issue
48-
* commands at the same nesting depth on the remote as we're executing at
49-
* ourselves, so that rolling back a subtransaction will kill the right
50-
* queries and not the wrong ones.
51-
*/
52-
typedef Oid ConnCacheKey;
53-
54-
struct ConnCacheEntry
55-
{
56-
ConnCacheKey key; /* hash key (must be first) */
57-
PGconn *conn; /* connection to foreign server, or NULL */
58-
WaitEventSet *wait_set; /* for data from server ready notifications */
59-
/* Remaining fields are invalid when conn is NULL: */
60-
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
61-
* one level of subxact open, etc */
62-
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
63-
bool have_error; /* have any subxacts aborted in this xact? */
64-
bool changing_xact_state; /* xact state change in process */
65-
bool invalidated; /* true if reconnect is pending */
66-
uint32 server_hashvalue; /* hash value of foreign server OID */
67-
uint32 mapping_hashvalue; /* hash value of user mapping OID */
68-
bool copy_from_started; /* COPY FROM in progress on this conn */
69-
};
70-
7137
/*
7238
* Connection cache (initialized on first use)
7339
*/
@@ -117,6 +83,35 @@ static bool pgfdw_exec_cleanup_query(ConnCacheEntry *entry, const char *query,
11783
bool ignore_errors);
11884
static bool pgfdw_get_cleanup_result(ConnCacheEntry *entry, TimestampTz endtime,
11985
PGresult **result);
86+
static void cleanup_dm_prepared(ConnCacheEntry *entry);
87+
88+
/* Adapted from string_hash */
89+
static uint32
90+
char_ptr_hash_fn(const void *key, Size keysize)
91+
{
92+
char * const *keyptr = key;
93+
return DatumGetUInt32(hash_any((const unsigned char *) (*keyptr), strlen(*keyptr)));
94+
}
95+
96+
static int
97+
char_ptr_match_fn(const void *key1, const void *key2, Size keysize)
98+
{
99+
char * const *keyptr1 = key1;
100+
char * const *keyptr2 = key2;
101+
return strcmp(*keyptr1, *keyptr2);
102+
}
103+
104+
/* Allocate always from top-level, where hashtable lives */
105+
static void *
106+
char_ptr_keycopy_fn(void *dest, const void *src, Size keysize)
107+
{
108+
char **destptr = dest;
109+
char * const *srcptr = src;
110+
111+
*destptr = MemoryContextStrdup(CacheMemoryContext, *srcptr);
112+
return NULL; /* not used */
113+
}
114+
120115
/*
121116
* Get a ConnCacheEntry which can be used to execute queries on the remote PostgreSQL
122117
* server with the user's authorization. A new connection is established
@@ -208,6 +203,7 @@ GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
208203
if (entry->conn == NULL)
209204
{
210205
ForeignServer *server = GetForeignServer(user->serverid);
206+
HASHCTL ctl;
211207

212208
/* Reset all transient state fields, to be sure all are clean */
213209
entry->xact_depth = 0;
@@ -226,6 +222,19 @@ GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
226222
/* Now try to make the connection */
227223
connect_pg_server(entry, server, user);
228224

225+
/* Create hash table of prepared statemetns for DirectModify */
226+
ctl.keysize = sizeof(char *);
227+
ctl.entrysize = sizeof(DirectModifyPrepStmtHashEnt);
228+
ctl.hash = char_ptr_hash_fn;
229+
ctl.match = char_ptr_match_fn;
230+
ctl.keycopy = char_ptr_keycopy_fn;
231+
ctl.hcxt = CacheMemoryContext;
232+
233+
entry->dm_prepared = hash_create("DirectModify prepared stmts",
234+
16, &ctl,
235+
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE |
236+
HASH_KEYCOPY | HASH_CONTEXT);
237+
229238
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
230239
entry->conn, server->servername, user->umid, user->userid);
231240
}
@@ -364,6 +373,8 @@ disconnect_pg_server(ConnCacheEntry *entry)
364373
{
365374
if (entry->conn != NULL)
366375
{
376+
cleanup_dm_prepared(entry);
377+
hash_destroy(entry->dm_prepared);
367378
Assert(entry->wait_set);
368379
FreeWaitEventSet(entry->wait_set);
369380
entry->wait_set = NULL;
@@ -1021,6 +1032,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
10211032
entry->have_error = false;
10221033
}
10231034

1035+
/* We have deallocated all prepared statements */
1036+
cleanup_dm_prepared(entry);
1037+
10241038
/* Disarm changing_xact_state if it all worked. */
10251039
entry->changing_xact_state = abort_cleanup_failure;
10261040
break;
@@ -1118,11 +1132,27 @@ deallocate_prepared_stmts(ConnCacheEntry *entry)
11181132
{
11191133
res = PQexec(entry->conn, "DEALLOCATE ALL");
11201134
PQclear(res);
1135+
cleanup_dm_prepared(entry);
11211136
}
11221137
entry->have_prep_stmt = false;
11231138
entry->have_error = false;
11241139
}
11251140

1141+
static void cleanup_dm_prepared(ConnCacheEntry *entry)
1142+
{
1143+
HASH_SEQ_STATUS scan;
1144+
DirectModifyPrepStmtHashEnt *prep_stmt_entry;
1145+
1146+
hash_seq_init(&scan, entry->dm_prepared);
1147+
while ((prep_stmt_entry = (DirectModifyPrepStmtHashEnt *) hash_seq_search(&scan)))
1148+
{
1149+
/* save the key to free it */
1150+
char *sql = prep_stmt_entry->sql;
1151+
hash_search(entry->dm_prepared, &prep_stmt_entry->sql, HASH_REMOVE, NULL);
1152+
pfree(sql);
1153+
}
1154+
}
1155+
11261156
/*
11271157
* pgfdw_subxact_callback --- cleanup at subtransaction end.
11281158
*/

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2337,7 +2337,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
23372337
* Get connection to the foreign server. Connection manager will
23382338
* establish new connection if necessary.
23392339
*/
2340-
dmstate->conn_entry = GetConnection(user, false);
2340+
dmstate->conn_entry = GetConnection(user, true);
23412341

23422342
/* Initialize state variable */
23432343
dmstate->num_tuples = -1; /* -1 means not set yet */
@@ -3323,6 +3323,8 @@ execute_dml_stmt(ForeignScanState *node)
33233323
ExprContext *econtext = node->ss.ps.ps_ExprContext;
33243324
int numParams = dmstate->numParams;
33253325
const char **values = dmstate->param_values;
3326+
DirectModifyPrepStmtHashEnt *prep_stmt_entry;
3327+
bool found;
33263328

33273329
/*
33283330
* Construct array of query parameter values in text format.
@@ -3334,22 +3336,59 @@ execute_dml_stmt(ForeignScanState *node)
33343336
dmstate->param_exprs,
33353337
values);
33363338

3339+
/*
3340+
* Prepare the statement, if we have never seen it before.
3341+
*/
3342+
prep_stmt_entry = hash_search(entry->dm_prepared, &dmstate->query,
3343+
HASH_FIND, &found);
3344+
if (!found)
3345+
{
3346+
char p_name[NAMEDATALEN];
3347+
long num = hash_get_num_entries(entry->dm_prepared) + 1;
3348+
PGresult *res;
3349+
3350+
snprintf(p_name, NAMEDATALEN, "postgres_fdw:%d:%ld", MyProcPid, num);
3351+
3352+
if (!PQsendPrepare(conn,
3353+
p_name,
3354+
dmstate->query,
3355+
0,
3356+
NULL))
3357+
pgfdw_report_error(ERROR, NULL, conn, false, dmstate->query);
3358+
3359+
/*
3360+
* Get the result, and check for success.
3361+
*
3362+
* We don't use a PG_TRY block here, so be careful not to throw error
3363+
* without releasing the PGresult.
3364+
*/
3365+
res = pgfdw_get_result(entry, dmstate->query);
3366+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
3367+
pgfdw_report_error(ERROR, res, conn, true, dmstate->query);
3368+
PQclear(res);
3369+
3370+
/* Now, when it is successfully prepared, add it to the hashtable */
3371+
prep_stmt_entry = hash_search(entry->dm_prepared, &dmstate->query,
3372+
HASH_ENTER, &found);
3373+
strlcpy(prep_stmt_entry->prep_name, p_name, NAMEDATALEN);
3374+
}
3375+
33373376
/*
33383377
* Notice that we pass NULL for paramTypes, thus forcing the remote server
33393378
* to infer types for all parameters. Since we explicitly cast every
33403379
* parameter (see deparse.c), the "inference" is trivial and will produce
33413380
* the desired result. This allows us to avoid assuming that the remote
33423381
* server has the same OIDs we do for the parameters' types.
33433382
*/
3344-
if (!PQsendQueryParams(conn, dmstate->query, numParams,
3345-
NULL, values, NULL, NULL, 0))
3383+
3384+
if (!PQsendQueryPrepared(conn,
3385+
prep_stmt_entry->prep_name,
3386+
numParams,
3387+
values,
3388+
NULL,
3389+
NULL,
3390+
0))
33463391
pgfdw_report_error(ERROR, NULL, conn, false, dmstate->query);
3347-
// }
3348-
// else
3349-
// {
3350< F438 /code>-
// if (!PQsendQuery(dmstate->conn, dmstate->query))
3351-
// pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3352-
// }
33533392

33543393
/*
33553394
* Get the result, and check for success.

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,11 @@
1717
#include "commands/copy.h"
1818
#include "lib/stringinfo.h"
1919
#include "nodes/relation.h"
20+
#include "storage/latch.h"
2021
#include "utils/relcache.h"
2122

2223
#include "libpq-fe.h"
2324

24-
/*
25-
* Encapsulates connection to foreign server. Contents should be unknown
26-
* outside connection.c
27-
*/
28-
typedef struct ConnCacheEntry ConnCacheEntry;
29-
3025
/*
3126
* FDW-specific planner information kept in RelOptInfo.fdw_private for a
3227
* postgres_fdw foreign table. For a baserel, this struct is created by
@@ -117,6 +112,50 @@ typedef struct PgFdwRelationInfo
117112
int relation_index;
118113
} PgFdwRelationInfo;
119114

115+
/*
116+
* Connection cache hash table entry
117+
*
118+
* The lookup key in this hash table is the user mapping OID. We use just one
119+
* connection per user mapping ID, which ensures that all the scans use the
120+
* same snapshot during a query. Using the user mapping OID rather than
121+
* the foreign server OID + user OID avoids creating multiple connections when
122+
* the public user mapping applies to all user OIDs.
123+
*
124+
* The "conn" pointer can be NULL if we don't currently have a live connection.
125+
* When we do have a connection, xact_depth tracks the current depth of
126+
* transactions and subtransactions open on the remote side. We need to issue
127+
* commands at the same nesting depth on the remote as we're executing at
128+
* ourselves, so that rolling back a subtransaction will kill the right
129+
* queries and not the wrong ones.
130+
*/
131+
typedef Oid ConnCacheKey;
132+
133+
typedef struct ConnCacheEntry ConnCacheEntry;
134+
struct ConnCacheEntry
135+
{
136+
ConnCacheKey key; /* hash key (must be first) */
137+
PGconn *conn; /* connection to foreign server, or NULL */
138+
WaitEventSet *wait_set; /* for data from server ready notifications */
139+
/* Remaining fields are invalid when conn is NULL: */
140+
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
141+
* one level of subxact open, etc */
142+
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
143+
bool have_error; /* have any subxacts aborted in this xact? */
144+
bool changing_xact_state; /* xact state change in process */
145+
bool invalidated; /* true if reconnect is pending */
146+
uint32 server_hashvalue; /* hash value of foreign server OID */
147+
uint32 mapping_hashvalue; /* hash value of user mapping OID */
148+
bool copy_from_started; /* COPY FROM in progress on this conn */
149+
HTAB *dm_prepared; /* prepared statements for DirectModify */
150+
};
151+
152+
/* sql -> prepared statement hashtable */
153+
typedef struct DirectModifyPrepStmtHashEnt
154+
{
155+
char *sql; /* SQL of the statement, the key; arbitrary size */
156+
char prep_name[NAMEDATALEN]; /* name of prepared statement */
157+
} DirectModifyPrepStmtHashEnt;
158+
120159
/* in postgres_fdw.c */
121160
extern int set_transmission_modes(void);
122161
extern void reset_transmission_modes(int nestlevel);

0 commit comments

Comments
 (0)
0