8000 Merge branch 'PGPRO-4561-ds' · postgrespro/pg_query_state@8068a40 · GitHub
[go: up one dir, main page]

Skip to content 8000

Commit 8068a40

Browse files
author
Daniel Shelepanov
committed
Merge branch 'PGPRO-4561-ds'
2 parents c40bbd0 + a4f4cd7 commit 8068a40

File tree

3 files changed

+72
-8
lines changed

3 files changed

+72
-8
lines changed

pg_query_state.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1141,7 +1141,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
11411141
MAX_RCV_TIMEOUT);
11421142
if (mq_receive_result != SHM_MQ_SUCCESS)
11431143
{
1144-
/* counterpart is died, not consider it */
1144+
/* counterpart is dead, not considering it */
11451145
goto mq_error;
11461146
}
11471147
if (msg->reqid != reqid)

pg_query_state.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#define QUEUE_SIZE (16 * 1024)
2121
#define MSG_MAX_SIZE 1024
22+
#define WRITING_DELAY (100 * 1000) /* 100ms */
23+
#define NUM_OF_ATTEMPTS 6
2224

2325
#define TIMINIG_OFF_WARNING 1
2426
#define BUFFERS_OFF_WARNING 2

signal_handler.c

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,16 @@ typedef struct
2727
char *plan;
2828
} stack_frame;
2929

30-
static void send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data);
30+
/*
31+
* An self-explanarory enum describing the send_msg_by_parts results
32+
*/
33+
typedef enum
34+
{
35+
MSG_BY_PARTS_SUCCEEDED,
36+
MSG_BY_PARTS_FAILED
37+
} msg_by_parts_result;
38+
39+
static msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data);
3140

3241
/*
3342
* Get List of stack_frames as a stack of function calls starting from outermost call.
@@ -151,22 +160,57 @@ serialize_stack(char *dest, List *qs_stack)
151160
}
152161
}
153162

154-
static void
163+
static msg_by_parts_result
164+
shm_mq_send_nonblocking(shm_mq_handle *mqh, Size nbytes, const void *data, Size attempts)
165+
{
166+
int i;
167+
shm_mq_result res;
168+
169+
for(i = 0; i < attempts; i++)
170+
{
171+
res = shm_mq_send(mqh, nbytes, data, true);
172+
173+
if(res == SHM_MQ_SUCCESS)
174+
break;
175+
else if (res == SHM_MQ_DETACHED)
176+
return MSG_BY_PARTS_FAILED;
177+
178+
/* SHM_MQ_WOULD_BLOCK - sleeping for some delay */
179+
pg_usleep(WRITING_DELAY);
180+
}
181+
182+
if(i == attempts)
183+
return MSG_BY_PARTS_FAILED;
184+
185+
return MSG_BY_PARTS_SUCCEEDED;
186+
}
187+
188+
/*
189+
* send_msg_by_parts sends data through the queue as a bunch of messages
190+
* of smaller size
191+
*/
192+
static msg_by_parts_result
155193
send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data)
156194
{
157195
int bytes_left;
158196
int bytes_send;
159197
int offset;
160198

161199
/* Send the expected message length */
162-
shm_mq_send(mqh, sizeof(Size), &nbytes, false);
200+
if(shm_mq_send_nonblocking(mqh, sizeof(Size), &nbytes, NUM_OF_ATTEMPTS) == MSG_BY_PARTS_FAILED)
201+
return MSG_BY_PARTS_FAILED;
163202

203+
/* Send the message itself */
164204
for (offset = 0; offset < nbytes; offset += bytes_send)
165205
{
166206
bytes_left = nbytes - offset;
167207
bytes_send = (bytes_left < MSG_MAX_SIZE) ? bytes_left : MSG_MAX_SIZE;
168-
shm_mq_send(mqh, bytes_send, &(((unsigned char*)data)[offset]), false);
208+
if(shm_mq_send_nonblocking(mqh, bytes_send, &(((unsigned char*)data)[offset]), NUM_OF_ATTEMPTS)
209+
== MSG_BY_PARTS_FAILED)
210+
return MSG_BY_PARTS_FAILED;
169211
}
212+
213+
return MSG_BY_PARTS_SUCCEEDED;
170214
}
171215

172216
/*
@@ -227,15 +271,17 @@ SendQueryState(void)
227271
{
228272
shm_mq_msg msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, STAT_DISABLED };
229273

230-
send_msg_by_parts(mqh, msg.length, &msg);
274+
if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED)
275+
goto connection_cleanup;
231276
}
232277

233278
/* check if backend doesn't execute any query */
234279
else if (list_length(QueryDescStack) == 0)
235280
{
236281
shm_mq_msg msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, QUERY_NOT_RUNNING };
237282

238-
send_msg_by_parts(mqh, msg.length, &msg);
283+
if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED)
284+
goto connection_cleanup;
239285
}
240286

241287
/* happy path */
@@ -258,9 +304,25 @@ SendQueryState(void)
258304

259305
msg->stack_depth = list_length(qs_stack);
260306
serialize_stack(msg->stack, qs_stack);
261-
send_msg_by_parts(mqh, msglen, msg);
307+
308+
if(send_msg_by_parts(mqh, msglen, msg) != MSG_BY_PARTS_SUCCEEDED)
309+
{
310+
elog(WARNING, "pg_query_state: peer seems to have detached");
311+
goto connection_cleanup;
312+
}
262313
}
263314
elog(DEBUG1, "Worker %d sends response for pg_query_state to %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid);
264315
DetachPeer();
265316
UnlockShmem(&tag);
317+
318+
return;
319+
320+
connection_cleanup:
321+
#if PG_VERSION_NUM < 100000
322+
shm_mq_detach(mq);
323+
#else
324+
shm_mq_detach(mqh);
325+
#endif
326+
DetachPeer();
327+
UnlockShmem(&tag);
266328
}

0 commit comments

Comments
 (0)
0