@@ -27,7 +27,16 @@ typedef struct
27
27
char * plan ;
28
28
} stack_frame ;
29
29
30
- static void send_msg_by_parts (shm_mq_handle * mqh , Size nbytes , const void * data );
30
+ /*
31
+ * An self-explanarory enum describing the send_msg_by_parts results
32
+ */
33
+ typedef enum
34
+ {
35
+ MSG_BY_PARTS_SUCCEEDED ,
36
+ MSG_BY_PARTS_FAILED
37
+ } msg_by_parts_result ;
38
+
39
+ static msg_by_parts_result send_msg_by_parts (shm_mq_handle * mqh , Size nbytes , const void * data );
31
40
32
41
/*
33
42
* Get List of stack_frames as a stack of function calls starting from outermost call.
@@ -151,22 +160,57 @@ serialize_stack(char *dest, List *qs_stack)
151
160
}
152
161
}
153
162
154
- static void
163
+ static msg_by_parts_result
164
+ shm_mq_send_nonblocking (shm_mq_handle * mqh , Size nbytes , const void * data , Size attempts )
165
+ {
166
+ int i ;
167
+ shm_mq_result res ;
168
+
169
+ for (i = 0 ; i < attempts ; i ++ )
170
+ {
171
+ res = shm_mq_send (mqh , nbytes , data , true);
172
+
173
+ if (res == SHM_MQ_SUCCESS )
174
+ break ;
175
+ else if (res == SHM_MQ_DETACHED )
176
+ return MSG_BY_PARTS_FAILED ;
177
+
178
+ /* SHM_MQ_WOULD_BLOCK - sleeping for some delay */
179
+ pg_usleep (WRITING_DELAY );
180
+ }
181
+
182
+ if (i == attempts )
183
+ return MSG_BY_PARTS_FAILED ;
184
+
185
+ return MSG_BY_PARTS_SUCCEEDED ;
186
+ }
187
+
188
+ /*
189
+ * send_msg_by_parts sends data through the queue as a bunch of messages
190
+ * of smaller size
191
+ */
192
+ static msg_by_parts_result
155
193
send_msg_by_parts (shm_mq_handle * mqh , Size nbytes , const void * data )
156
194
{
157
195
int bytes_left ;
158
196
int bytes_send ;
159
197
int offset ;
160
198
161
199
/* Send the expected message length */
162
- shm_mq_send (mqh , sizeof (Size ), & nbytes , false);
200
+ if (shm_mq_send_nonblocking (mqh , sizeof (Size ), & nbytes , NUM_OF_ATTEMPTS ) == MSG_BY_PARTS_FAILED )
201
+ return MSG_BY_PARTS_FAILED ;
163
202
203
+ /* Send the message itself */
164
204
for (offset = 0 ; offset < nbytes ; offset += bytes_send )
165
205
{
166
206
bytes_left = nbytes - offset ;
167
207
bytes_send = (bytes_left < MSG_MAX_SIZE ) ? bytes_left : MSG_MAX_SIZE ;
168
- shm_mq_send (mqh , bytes_send , & (((unsigned char * )data )[offset ]), false);
208
+ if (shm_mq_send_nonblocking (mqh , bytes_send , & (((unsigned char * )data )[offset ]), NUM_OF_ATTEMPTS )
209
+ == MSG_BY_PARTS_FAILED )
210
+ return MSG_BY_PARTS_FAILED ;
169
211
}
212
+
213
+ return MSG_BY_PARTS_SUCCEEDED ;
170
214
}
171
215
172
216
/*
@@ -227,15 +271,17 @@ SendQueryState(void)
227
271
{
228
272
shm_mq_msg msg = { reqid , BASE_SIZEOF_SHM_MQ_MSG , MyProc , STAT_DISABLED };
229
273
230
- send_msg_by_parts (mqh , msg .length , & msg );
274
+ if (send_msg_by_parts (mqh , msg .length , & msg ) != MSG_BY_PARTS_SUCCEEDED )
275
+ goto connection_cleanup ;
231
276
}
232
277
233
278
/* check if backend doesn't execute any query */
234
279
else if (list_length (QueryDescStack ) == 0 )
235
280
{
236
281
shm_mq_msg msg = { reqid , BASE_SIZEOF_SHM_MQ_MSG , MyProc , QUERY_NOT_RUNNING };
237
282
238
- send_msg_by_parts (mqh , msg .length , & msg );
283
+ if (send_msg_by_parts (mqh , msg .length , & msg ) != MSG_BY_PARTS_SUCCEEDED )
284
+ goto connection_cleanup ;
239
285
}
240
286
241
287
/* happy path */
@@ -258,9 +304,25 @@ SendQueryState(void)
258
304
259
305
msg -> stack_depth = list_length (qs_stack );
260
306
serialize_stack (msg -> stack , qs_stack );
261
- send_msg_by_parts (mqh , msglen , msg );
307
+
308
+ if (send_msg_by_parts (mqh , msglen , msg ) != MSG_BY_PARTS_SUCCEEDED )
309
+ {
310
+ elog (WARNING , "pg_query_state: peer seems to have detached" );
311
+ goto connection_cleanup ;
312
+ }
262
313
}
263
314
elog (DEBUG1 , "Worker %d sends response for pg_query_state to %d" , shm_mq_get_sender (mq )-> pid , shm_mq_get_receiver (mq )-> pid );
264
315
DetachPeer ();
265
316
UnlockShmem (& tag );
317
+
318
+ return ;
319
+
320
+ connection_cleanup :
321
+ #if PG_VERSION_NUM < 100000
322
+ shm_mq_detach (mq );
323
+ #else
324
+ shm_mq_detach (mqh );
325
+ #endif
326
+ DetachPeer ();
327
+ UnlockShmem (& tag );
266
328
}
0 commit comments