14
14
*/
15
15
16
16
/* Some general headers for custom bgworker facility */
17
+ #include <unistd.h>
17
18
#include "postgres.h"
18
19
#include "fmgr.h"
19
20
#include "libpq-fe.h"
20
21
#include "pqexpbuffer.h"
21
22
#include "access/xact.h"
23
+ #include "access/transam.h"
22
24
#include "lib/stringinfo.h"
23
25
#include "pgstat.h"
24
26
#include "executor/spi.h"
@@ -50,7 +52,7 @@ static int receiver_idle_time = 1;
50
52
static bool receiver_sync_mode = true;
51
53
52
54
/* Worker name */
53
- static char * worker_name = "multimaster" ;
55
+ char worker_proc [ 16 ] ;
54
56
55
57
/* Lastly written positions */
56
58
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr ;
@@ -93,7 +95,7 @@ sendFeedback(PGconn *conn, int64 now)
93
95
ereport (LOG , (errmsg ("%s: confirming write up to %X/%X, "
94
96
"flush to %X/%X (slot custom_slot), "
95
97
"applied to %X/%X" ,
96
- worker_name ,
98
+ worker_proc ,
97
99
(uint32 ) (output_written_lsn >> 32 ),
98
100
(uint32 ) output_written_lsn ,
99
101
(uint32 ) (output_fsync_lsn >> 32 ),
@@ -119,7 +121,7 @@ sendFeedback(PGconn *conn, int64 now)
119
121
if (PQputCopyData (conn , replybuf , len ) <= 0 || PQflush (conn ))
120
122
{
121
123
ereport (LOG , (errmsg ("%s: could not send feedback packet: %s" ,
122
- worker_name , PQerrorMessage (conn ))));
124
+ worker_proc , PQerrorMessage (conn ))));
123
125
return false;
124
126
}
125
127
@@ -209,13 +211,15 @@ receiver_raw_main(Datum main_arg)
209
211
PQExpBuffer query ;
210
212
PGconn * conn ;
211
213
PGresult * res ;
214
+ TransactionId xid = InvalidTransactionId ;
212
215
bool insideTrans = false;
213
216
bool rollbackTransaction = false;
214
217
215
218
/* Register functions for SIGTERM/SIGHUP management */
216
219
pqsignal (SIGHUP , receiver_raw_sighup );
217
220
pqsignal (SIGTERM , receiver_raw_sigterm );
218
221
222
+ sprintf (worker_proc , "mm_recv_%d" , getpid ());
219
223
220
224
/* We're now ready to receive signals */
221
225
BackgroundWorkerUnblockSignals ();
@@ -229,13 +233,13 @@ receiver_raw_main(Datum main_arg)
229
233
{
230
234
PQfinish (conn );
231
235
ereport (ERROR , (errmsg ("%s: Could not establish connection to remote server" ,
232
- worker_name )));
236
+ worker_proc )));
233
237
proc_exit (1 );
234
238
}
235
239
236
240
query = createPQExpBuffer ();
237
241
238
- appendPQExpBuffer (query , "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"" , args -> receiver_slot , worker_name );
242
+ appendPQExpBuffer (query , "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"" , args -> receiver_slot , worker_proc );
239
243
res = PQexec (conn , query -> data );
240
244
if (PQresultStatus (res ) != PGRES_TUPLES_OK )
241
245
{
@@ -244,7 +248,7 @@ receiver_raw_main(Datum main_arg)
244
248
{
245
249
PQclear (res );
246
250
ereport (ERROR , (errmsg ("%s: Could not create logical slot" ,
247
- worker_name )));
251
+ worker_proc )));
248
252
proc_exit (1 );
249
253
}
250
254
}
@@ -259,7 +263,7 @@ receiver_raw_main(Datum main_arg)
259
263
{
260
264
PQclear (res );
261
265
ereport (LOG , (errmsg ("%s: Could not start logical replication" ,
262
- worker_name )));
266
+ worker_proc )));
263
267
proc_exit (1 );
264
268
}
265
269
PQclear (res );
@@ -284,13 +288,13 @@ receiver_raw_main(Datum main_arg)
284
288
/* Process config file */
285
289
ProcessConfigFile (PGC_SIGHUP );
286
290
got_sighup = false;
287
- ereport (LOG , (errmsg ("%s: processed SIGHUP" , worker_name )));
291
+ ereport (LOG , (errmsg ("%s: processed SIGHUP" , worker_proc )));
288
292
}
289
293
290
294
if (got_sigterm )
291
295
{
292
296
/* Simply exit */
293
- ereport (LOG , (errmsg ("%s: processed SIGTERM" , worker_name )));
297
+ ereport (LOG , (errmsg ("%s: processed SIGTERM" , worker_proc )));
294
298
proc_exit (0 );
295
299
}
296
300
@@ -342,15 +346,15 @@ receiver_raw_main(Datum main_arg)
342
346
walEnd = fe_recvint64 (& copybuf [pos ]);
343
347
ereport (LOG , (errmsg ("%s: keepalive message from server, "
344
348
"walEnd %X/%X, " ,
345
- worker_name ,
349
+ worker_proc ,
346
350
(uint32 ) (walEnd >> 32 ),
347
351
(uint32 ) walEnd )));
348
352
pos += 8 ; /* read walEnd */
349
353
pos += 8 ; /* skip sendTime */
350
354
if (rc < pos + 1 )
351
355
{
352
356
ereport (LOG , (errmsg ("%s: streaming header too small: %d" ,
353
- worker_name , rc )));
357
+ worker_proc , rc )));
354
358
proc_exit (1 );
355
359
}
356
360
replyRequested = copybuf [pos ];
@@ -378,7 +382,7 @@ receiver_raw_main(Datum main_arg)
378
382
else if (copybuf [0 ] != 'w' )
379
383
{
380
384
ereport (LOG , (errmsg ("%s: Incorrect streaming header" ,
381
- worker_name )));
385
+ worker_proc )));
382
386
proc_exit (1 );
383
387
}
384
388
@@ -392,14 +396,14 @@ receiver_raw_main(Datum main_arg)
392
396
if (rc < hdr_len + 1 )
393
397
{
394
398
ereport (LOG , (errmsg ("%s: Streaming header too small" ,
395
- worker_name )));
399
+ worker_proc )));
396
400
proc_exit (1 );
397
401
}
398
402
399
403
/* Log some useful information */
400
404
ereport (LOG , (errmsg ("%s: received from server, walStart %X/%X, "
401
405
"and walEnd %X/%X" ,
402
- worker_name ,
406
+ worker_proc ,
403
407
(uint32 ) (walStart >> 32 ),
404
408
(uint32 ) walStart ,
405
409
(uint32 ) (walEnd >> 32 ),
@@ -411,17 +415,13 @@ receiver_raw_main(Datum main_arg)
411
415
SetCurrentStatementStartTimestamp ();
412
416
413
417
if (strncmp (stmt , "BEGIN " , 6 ) == 0 ) {
414
- TransactionId xid ;
415
418
int rc = sscanf (stmt + 6 , "%u" , & xid );
416
419
Assert (rc == 1 );
417
420
Assert (!insideTrans );
418
421
SetCurrentStatementStartTimestamp ();
419
422
MMJoinTransaction (xid );
420
423
421
424
StartTransactionCommand ();
422
- BeginTransactionBlock ();
423
- CommitTransactionCommand ();
424
-
425
425
SPI_connect ();
426
426
PushActiveSnapshot (GetTransactionSnapshot ());
427
427
insideTrans = true;
@@ -431,19 +431,20 @@ receiver_raw_main(Datum main_arg)
431
431
insideTrans = false;
432
432
SPI_finish ();
433
433
PopActiveSnapshot ();
434
- StartTransactionCommand ();
435
434
if (rollbackTransaction ) {
436
- UserAbortTransactionBlock ();
437
- }
438
- PG_TRY ();
439
- {
440
- CommitTransactionCommand ();
441
- }
442
- PG_CATCH ();
443
- {
444
- elog (WARNING , "%s: Current transaction is aborted at receiver" , worker_name );
435
+ elog (WARNING , "%s: Rollback transaction %u" , worker_proc , xid );
436
+ AbortCurrentTransaction ();
437
+ } else {
438
+ PG_TRY ();
439
+ {
440
+ CommitTransactionCommand ();
441
+ }
442
+ PG_CATCH ();
443
+ {
444
+ elog (WARNING , "%s: Commit of transaction %u is failed" , worker_proc , xid );
445
+ }
446
+ PG_END_TRY ();
445
447
}
446
- PG_END_TRY ();
447
448
} else if (!rollbackTransaction ) {
448
449
Assert (insideTrans );
449
450
/* Execute query */
@@ -452,20 +453,20 @@ receiver_raw_main(Datum main_arg)
452
453
rc = SPI_execute (stmt , false, 0 );
453
454
if (rc == SPI_OK_INSERT )
454
455
ereport (LOG , (errmsg ("%s: INSERT received correctly: %s" ,
455
- worker_name , stmt )));
456
+ worker_proc , stmt )));
456
457
else if (rc == SPI_OK_UPDATE )
457
458
ereport (LOG , (errmsg ("%s: UPDATE received correctly: %s" ,
458
- worker_name , stmt )));
459
+ worker_proc , stmt )));
459
460
else if (rc == SPI_OK_DELETE )
460
461
ereport (LOG , (errmsg ("%s: DELETE received correctly: %s" ,
461
- worker_name , stmt )));
462
+ worker_proc , stmt )));
462
463
else
463
464
ereport (WARNING , (errmsg ("%s: Error when applying change: %s" ,
464
- worker_name , stmt )));
465
+ worker_proc , stmt )));
465
466
}
466
467
PG_CATCH ();
467
468
{
468
- elog (WARNING , "%s: %s failed at receiver " , worker_name , stmt );
469
+ elog (WARNING , "%s: %s failed in transaction %u " , worker_proc , stmt , xid );
469
470
rollbackTransaction = true;
470
471
}
471
472
PG_END_TRY ();
@@ -531,15 +532,15 @@ receiver_raw_main(Datum main_arg)
531
532
else if (r < 0 )
532
533
{
533
534
ereport (LOG , (errmsg ("%s: Incorrect status received... Leaving." ,
534
- worker_name )));
535
+ worker_proc )));
535
536
proc_exit (1 );
536
537
}
537
538
538
539
/* Else there is actually data on the socket */
539
540
if (PQconsumeInput (conn ) == 0 )
540
541
{
541
542
ereport (LOG , (errmsg ("%s: Data remaining on the socket... Leaving." ,
542
- worker_name )));
543
+ worker_proc )));
543
544
proc_exit (1 );
544
545
}
545
546
continue ;
@@ -549,15 +550,15 @@ receiver_raw_main(Datum main_arg)
549
550
if (rc == -1 )
550
551
{
551
552
ereport (LOG , (errmsg ("%s: COPY Stream has abruptly ended..." ,
552
- worker_name )));
553
+ worker_proc )));
553
554
break ;
554
555
}
555
556
556
557
/* Failure when reading copy stream, leave */
557
558
if (rc == -2 )
558
559
{
559
560
ereport (LOG , (errmsg ("%s: Failure while receiving changes..." ,
560
- worker_name )));
561
+ worker_proc )));
561
562
proc_exit (1 );
562
563
}
563
564
}
@@ -608,4 +609,9 @@ int MMStartReceivers(char* conns, int node_id)
608
609
worker .bgw_main_arg = (Datum )ctx ;
609
610
RegisterBackgroundWorker (& worker );
610
611
}
611
- con
612
+ conn_str = p + 1 ;
613
+ }
614
+
615
+ return i ;
616
+ }
617
+
0 commit comments