17
17
#include "access/htup_details.h"
18
18
#include "catalog/pg_user_mapping.h"
19
19
#include "access/global_snapshot.h"
20
+ #include "access/hash.h"
20
21
#include "access/xact.h"
21
22
#include "access/xtm.h"
22
23
#include "access/transam.h"
33
34
#include "utils/memutils.h"
34
35
#include "utils/syscache.h"
35
36
36
- /*
37
- * Connection cache hash table entry
38
- *
39
- * The lookup key in this hash table is the user mapping OID. We use just one
40
- * connection per user mapping ID, which ensures that all the scans use the
41
- * same snapshot during a query. Using the user mapping OID rather than
42
- * the foreign server OID + user OID avoids creating multiple connections when
43
- * the public user mapping applies to all user OIDs.
44
- *
45
- * The "conn" pointer can be NULL if we don't currently have a live connection.
46
- * When we do have a connection, xact_depth tracks the current depth of
47
- * transactions and subtransactions open on the remote side. We need to issue
48
- * commands at the same nesting depth on the remote as we're executing at
49
- * ourselves, so that rolling back a subtransaction will kill the right
50
- * queries and not the wrong ones.
51
- */
52
- typedef Oid ConnCacheKey ;
53
-
54
- struct ConnCacheEntry
55
- {
56
- ConnCacheKey key ; /* hash key (must be first) */
57
- PGconn * conn ; /* connection to foreign server, or NULL */
58
- WaitEventSet * wait_set ; /* for data from server ready notifications */
59
- /* Remaining fields are invalid when conn is NULL: */
60
- int xact_depth ; /* 0 = no xact open, 1 = main xact open, 2 =
61
- * one level of subxact open, etc */
62
- bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
63
- bool have_error ; /* have any subxacts aborted in this xact? */
64
- bool changing_xact_state ; /* xact state change in process */
65
- bool invalidated ; /* true if reconnect is pending */
66
- uint32 server_hashvalue ; /* hash value of foreign server OID */
67
- uint32 mapping_hashvalue ; /* hash value of user mapping OID */
68
- bool copy_from_started ; /* COPY FROM in progress on this conn */
69
- };
70
-
71
37
/*
72
38
* Connection cache (initialized on first use)
73
39
*/
@@ -117,6 +83,35 @@ static bool pgfdw_exec_cleanup_query(ConnCacheEntry *entry, const char *query,
117
83
bool ignore_errors );
118
84
static bool pgfdw_get_cleanup_result (ConnCacheEntry * entry , TimestampTz endtime ,
119
85
PGresult * * result );
86
+ static void cleanup_dm_prepared (ConnCacheEntry * entry );
87
+
88
+ /* Adapted from string_hash */
89
+ static uint32
90
+ char_ptr_hash_fn (const void * key , Size keysize )
91
+ {
92
+ char * const * keyptr = key ;
93
+ return DatumGetUInt32 (hash_any ((const unsigned char * ) (* keyptr ), strlen (* keyptr )));
94
+ }
95
+
96
+ static int
97
+ char_ptr_match_fn (const void * key1 , const void * key2 , Size keysize )
98
+ {
99
+ char * const * keyptr1 = key1 ;
100
+ char * const * keyptr2 = key2 ;
101
+ return strcmp (* keyptr1 , * keyptr2 );
102
+ }
103
+
104
+ /* Allocate always from top-level, where hashtable lives */
105
+ static void *
106
+ char_ptr_keycopy_fn (void * dest , const void * src , Size keysize )
107
+ {
108
+ char * * destptr = dest ;
109
+ char * const * srcptr = src ;
110
+
111
+ * destptr = MemoryContextStrdup (CacheMemoryContext , * srcptr );
112
+ return NULL ; /* not used */
113
+ }
114
+
120
115
/*
121
116
* Get a ConnCacheEntry which can be used to execute queries on the remote PostgreSQL
122
117
* server with the user's authorization. A new connection is established
@@ -208,6 +203,7 @@ GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
208
203
if (entry -> conn == NULL )
209
204
{
210
205
ForeignServer * server = GetForeignServer (user -> serverid );
206
+ HASHCTL ctl ;
211
207
212
208
/* Reset all transient state fields, to be sure all are clean */
213
209
entry -> xact_depth = 0 ;
@@ -226,6 +222,19 @@ GetConnectionCopyFrom(UserMapping *user, bool will_prep_stmt,
226
222
/* Now try to make the connection */
227
223
connect_pg_server (entry , server , user );
228
224
225
+ /* Create hash table of prepared statemetns for DirectModify */
226
+ ctl .keysize = sizeof (char * );
227
+ ctl .entrysize = sizeof (DirectModifyPrepStmtHashEnt );
228
+ ctl .hash = char_ptr_hash_fn ;
229
+ ctl .match = char_ptr_match_fn ;
230
+ ctl .keycopy = char_ptr_keycopy_fn ;
231
+ ctl .hcxt = CacheMemoryContext ;
232
+
233
+ entry -> dm_prepared = hash_create ("DirectModify prepared stmts" ,
234
+ 16 , & ctl ,
235
+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE |
236
+ HASH_KEYCOPY | HASH_CONTEXT );
237
+
229
238
elog (DEBUG3 , "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)" ,
230
239
entry -> conn , server -> servername , user -> umid , user -> userid );
231
240
}
@@ -364,6 +373,8 @@ disconnect_pg_server(ConnCacheEntry *entry)
364
373
{
365
374
if (entry -> conn != NULL )
366
375
{
376
+ cleanup_dm_prepared (entry );
377
+ hash_destroy (entry -> dm_prepared );
367
378
Assert (entry -> wait_set );
368
379
FreeWaitEventSet (entry -> wait_set );
369
380
entry -> wait_set = NULL ;
@@ -1021,6 +1032,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
1021
1032
entry -> have_error = false;
1022
1033
}
1023
1034
1035
+ /* We have deallocated all prepared statements */
1036
+ cleanup_dm_prepared (entry );
1037
+
1024
1038
/* Disarm changing_xact_state if it all worked. */
1025
1039
entry -> changing_xact_state = abort_cleanup_failure ;
1026
1040
break ;
@@ -1118,11 +1132,27 @@ deallocate_prepared_stmts(ConnCacheEntry *entry)
1118
1132
{
1119
1133
res = PQexec (entry -> conn , "DEALLOCATE ALL" );
1120
1134
PQclear (res );
1135
+ cleanup_dm_prepared (entry );
1121
1136
}
1122
1137
entry -> have_prep_stmt = false;
1123
1138
entry -> have_error = false;
1124
1139
}
1125
1140
1141
+ static void cleanup_dm_prepared (ConnCacheEntry * entry )
1142
+ {
1143
+ HASH_SEQ_STATUS scan ;
1144
+ DirectModifyPrepStmtHashEnt * prep_stmt_entry ;
1145
+
1146
+ hash_seq_init (& scan , entry -> dm_prepared );
1147
+ while ((prep_stmt_entry = (DirectModifyPrepStmtHashEnt * ) hash_seq_search (& scan )))
1148
+ {
1149
+ /* save the key to free it */
1150
+ char * sql = prep_stmt_entry -> sql ;
1151
+ hash_search (entry -> dm_prepared , & prep_stmt_entry -> sql , HASH_REMOVE , NULL );
1152
+ pfree (sql );
1153
+ }
1154
+ }
1155
+
1126
1156
/*
1127
1157
* pgfdw_subxact_callback --- cleanup at subtransaction end.
1128
1158
*/
0 commit comments