@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
7979 Oid relid ;
8080} LogicalRepWorkerId ;
8181
82- static List * on_commit_stop_workers = NIL ;
82+ typedef struct StopWorkersData
83+ {
84+ int nestDepth ; /* Sub-transaction nest level */
85+ List * workers ; /* List of LogicalRepWorkerId */
86+ struct StopWorkersData * parent ; /* This need not be an immediate
87+ * subtransaction parent */
88+ } StopWorkersData ;
89+
90+ /*
91+ * Stack of StopWorkersData elements. Each stack element contains the workers
92+ * to be stopped for that subtransaction.
93+ */
94+ static StopWorkersData * on_commit_stop_workers = NULL ;
8395
8496static void ApplyLauncherWakeup (void );
8597static void logicalrep_launcher_onexit (int code , Datum arg );
@@ -559,17 +571,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
559571void
560572logicalrep_worker_stop_at_commit (Oid subid , Oid relid )
561573{
574+ int nestDepth = GetCurrentTransactionNestLevel ();
562575 LogicalRepWorkerId * wid ;
563576 MemoryContext oldctx ;
564577
565578 /* Make sure we store the info in context that survives until commit. */
566579 oldctx = MemoryContextSwitchTo (TopTransactionContext );
567580
581+ /* Check that previous transactions were properly cleaned up. */
582+ Assert (on_commit_stop_workers == NULL ||
583+ nestDepth >= on_commit_stop_workers -> nestDepth );
584+
585+ /*
586+ * Push a new stack element if we don't already have one for the current
587+ * nestDepth.
588+ */
589+ if (on_commit_stop_workers == NULL ||
590+ nestDepth > on_commit_stop_workers -> nestDepth )
591+ {
592+ StopWorkersData * newdata = palloc (sizeof (StopWorkersData ));
593+
594+ newdata -> nestDepth = nestDepth ;
595+ newdata -> workers = NIL ;
596+ newdata -> parent = on_commit_stop_workers ;
597+ on_commit_stop_workers = newdata ;
598+ }
599+
600+ /*
601+ * Finally add a new worker into the worker list of the current
602+ * subtransaction.
603+ */
568604 wid = palloc (sizeof (LogicalRepWorkerId ));
569605 wid -> subid = subid ;
570606 wid -> relid = relid ;
571-
572- on_commit_stop_workers = lappend (on_commit_stop_workers , wid );
607+ on_commit_stop_workers -> workers =
608+ lappend (on_commit_stop_workers -> workers , wid );
573609
574610 MemoryContextSwitchTo (oldctx );
575611}
@@ -823,7 +859,7 @@ ApplyLauncherShmemInit(void)
823859bool
824860XactManipulatesLogicalReplicationWorkers (void )
825861{
826- return (on_commit_stop_workers != NIL );
862+ return (on_commit_stop_workers != NULL );
827863}
828864
829865/*
@@ -832,15 +868,25 @@ XactManipulatesLogicalReplicationWorkers(void)
832868void
833869AtEOXact_ApplyLauncher (bool isCommit )
834870{
871+
872+ Assert (on_commit_stop_workers == NULL ||
873+ (on_commit_stop_workers -> nestDepth == 1 &&
874+ on_commit_stop_workers -> parent == NULL ));
875+
835876 if (isCommit )
836877 {
837878 ListCell * lc ;
838879
839- foreach ( lc , on_commit_stop_workers )
880+ if ( on_commit_stop_workers != NULL )
840881 {
841- LogicalRepWorkerId * wid = lfirst (lc );
882+ List * workers = on_commit_stop_workers -> workers ;
883+
884+ foreach (lc , workers )
885+ {
886+ LogicalRepWorkerId * wid = lfirst (lc );
842887
843- logicalrep_worker_stop (wid -> subid , wid -> relid );
888+ logicalrep_worker_stop (wid -> subid , wid -> relid );
889+ }
844890 }
845891
846892 if (on_commit_launcher_wakeup )
@@ -851,10 +897,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
851897 * No need to pfree on_commit_stop_workers. It was allocated in
852898 * transaction memory context, which is going to be cleaned soon.
853899 */
854- on_commit_stop_workers = NIL ;
900+ on_commit_stop_workers = NULL ;
855901 on_commit_launcher_wakeup = false;
856902}
857903
904+ /*
905+ * On commit, merge the current on_commit_stop_workers list into the
906+ * immediate parent, if present.
6D38
907+ * On rollback, discard the current on_commit_stop_workers list.
908+ * Pop out the stack.
909+ */
910+ void
911+ AtEOSubXact_ApplyLauncher (bool isCommit , int nestDepth )
912+ {
913+ StopWorkersData * parent ;
914+
915+ /* Exit immediately if there's no work to do at this level. */
916+ if (on_commit_stop_workers == NULL ||
917+ on_commit_stop_workers -> nestDepth < nestDepth )
918+ return ;
919+
920+ Assert (on_commit_stop_workers -> nestDepth == nestDepth );
921+
922+ parent = on_commit_stop_workers -> parent ;
923+
924+ if (isCommit )
925+ {
926+ /*
927+ * If the upper stack element is not an immediate parent
928+ * subtransaction, just decrement the notional nesting depth without
929+ * doing any real work. Else, we need to merge the current workers
930+ * list into the parent.
931+ */
932+ if (!parent || parent -> nestDepth < nestDepth - 1 )
933+ {
934+ on_commit_stop_workers -> nestDepth -- ;
935+ return ;
936+ }
937+
938+ parent -> workers =
939+ list_concat (parent -> workers , on_commit_stop_workers -> workers );
940+ }
941+ else
942+ {
943+ /*
944+ * Abandon everything that was done at this nesting level. Explicitly
945+ * free memory to avoid a transaction-lifespan leak.
946+ */
947+ list_free_deep (on_commit_stop_workers -> workers );
948+ }
949+
950+ /*
951+ * We have taken care of the current subtransaction workers list for both
952+ * abort or commit. So we are ready to pop the stack.
953+ */
954+ pfree (on_commit_stop_workers );
955+ on_commit_stop_workers = parent ;
956+ }
957+
858958/*
859959 * Request wakeup of the launcher on commit of the transaction.
860960 *
0 commit comments