19
19
use Symfony \Component \Console \Input \InputInterface ;
20
20
use Symfony \Component \Console \Input \InputOption ;
21
21
use Symfony \Component \Console \Output \OutputInterface ;
22
+ use Symfony \Component \Console \Question \ChoiceQuestion ;
22
23
use Symfony \Component \Console \Style \SymfonyStyle ;
23
24
use Symfony \Component \Messenger \RoutableMessageBus ;
24
25
use Symfony \Component \Messenger \Worker ;
@@ -70,7 +71,7 @@ protected function configure(): void
70
71
71
72
$ this
72
73
->setDefinition ([
73
- new InputArgument ('receiver ' , $ defaultReceiverName ? InputArgument::OPTIONAL : InputArgument:: REQUIRED , 'Name of the receiver ' , $ defaultReceiverName ),
74
+ new InputArgument ('receivers ' , InputArgument::IS_ARRAY , 'Names of the receivers/transports to consume in order of priority ' , $ defaultReceiverName ? [ $ defaultReceiverName ] : [] ),
74
75
new InputOption ('limit ' , 'l ' , InputOption::VALUE_REQUIRED , 'Limit the number of received messages ' ),
75
76
new InputOption ('memory-limit ' , 'm ' , InputOption::VALUE_REQUIRED , 'The memory limit the worker can consume ' ),
76
77
new InputOption ('time-limit ' , 't ' , InputOption::VALUE_REQUIRED , 'The time limit in seconds the worker can run ' ),
@@ -83,6 +84,10 @@ protected function configure(): void
83
84
84
85
<info>php %command.full_name% <receiver-name></info>
85
86
87
+ To receive from multiple transports, pass each name:
88
+
89
+ <info>php %command.full_name% receiver1 receiver2</info>
90
+
86
91
Use the --limit option to limit the number of messages received:
87
92
88
93
<info>php %command.full_name% <receiver-name> --limit=10</info>
@@ -112,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
112
117
{
113
118
$ io = new SymfonyStyle ($ input , $ output instanceof ConsoleOutputInterface ? $ output ->getErrorOutput () : $ output );
114
119
115
- if ($ this ->receiverNames && !$ this ->receiverLocator ->has ($ receiverName = $ input ->getArgument ('receiver ' ))) {
116
- if (null === $ receiverName ) {
117
- $ io ->block ('Missing receiver argument. ' , null , 'error ' , ' ' , true );
118
- $ input ->setArgument ('receiver ' , $ io ->choice ('Select one of the available receivers ' , $ this ->receiverNames ));
119
- } elseif ($ alternatives = $ this ->findAlternatives ($ receiverName , $ this ->receiverNames )) {
120
- $ io ->block (sprintf ('Receiver "%s" is not defined. ' , $ receiverName ), null , 'error ' , ' ' , true );
121
- if ($ io ->confirm (sprintf ('Do you want to receive from "%s" instead? ' , $ alternatives [0 ]), false )) {
122
- $ input ->setArgument ('receiver ' , $ alternatives [0 ]);
123
- }
120
+ if ($ this ->receiverNames && 0 === \count ($ input ->getArgument ('receivers ' ))) {
121
+ $ io ->block ('Which transports/receivers do you want to consume? ' , null , 'fg=white;bg=blue ' , ' ' , true );
122
+
123
+ $ io ->writeln ('Choose which receivers you want to consume messages from in order of priority. ' );
124
+ if (\count ($ this ->receiverNames ) > 1 ) {
125
+ $ io ->writeln (sprintf ('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment> ' , implode (', ' , $ this ->receiverNames )));
124
126
}
127
+
128
+ $ question = new ChoiceQuestion ('Select receivers to consume: ' , $ this ->receiverNames , 0 );
129
+ $ question ->setMultiselect (true );
130
+
131
+ $ input ->setArgument ('receivers ' , $ io ->askQuestion ($ question ));
132
+ }
133
+
134
+ if (0 === \count ($ input ->getArgument ('receivers ' ))) {
135
+ throw new RuntimeException ('Please pass at least one receiver. ' );
125
136
}
126
137
}
127
138
@@ -136,24 +147,33 @@ protected function execute(InputInterface $input, OutputInterface $output): void
136
147
$ output ->writeln (sprintf ('<comment>%s</comment> ' , $ message ));
137
148
}
138
149
139
- if (!$ this ->receiverLocator ->has ($ receiverName = $ input ->getArgument ('receiver ' ))) {
140
- throw new RuntimeException (sprintf ('Receiver "%s" does not exist. ' , $ receiverName ));
141
- }
150
+ $ receivers = [];
151
+ $ retryStrategies = [];
152
+ foreach ($ receiverNames = $ input ->getArgument ('receivers ' ) as $ receiverName ) {
153
+ if (!$ this ->receiverLocator ->has ($ receiverName )) {
154
+ $ message = sprintf ('The receiver "%s" does not exist. ' , $ receiverName );
155
+ if ($ this ->receiverNames ) {
156
+ $ message .= sprintf (' Valid receivers are: %s. ' , implode (', ' , $ this ->receiverNames ));
157
+ }
142
158
143
- if (null !== $ this ->retryStrategyLocator && !$ this ->retryStrategyLocator ->has ($ receiverName )) {
144
- throw new RuntimeException (sprintf ('Receiver "%s" does not have a configured retry strategy. ' , $ receiverName ));
145
- }
159
+ throw new RuntimeException ($ message );
160
+ }
146
161
147
- $ receiver = $ this ->receiverLocator ->get ($ receiverName );
148
- $ retryStrategy = null !== $ this ->retryStrategyLocator ? $ this ->retryStrategyLocator ->get ($ receiverName ) : null ;
162
+ if (null !== $ this ->retryStrategyLocator && !$ this ->retryStrategyLocator ->has ($ receiverName )) {
163
+ throw new RuntimeException (sprintf ('Receiver "%s" does not have a configured retry strategy. ' , $ receiverName ));
164
+ }
165
+
166
+ $ receivers [$ receiverName ] = $ this ->receiverLocator ->get ($ receiverName );
167
+ $ retryStrategies [$ receiverName ] = null !== $ this ->retryStrategyLocator ? $ this ->retryStrategyLocator ->get ($ receiverName ) : null ;
168
+ }
149
169
150
170
if (null !== $ input ->getOption ('bus ' )) {
151
171
$ bus = $ this ->busLocator ->get ($ input ->getOption ('bus ' ));
152
172
} else {
153
173
$ bus = new RoutableMessageBus ($ this ->busLocator );
154
174
}
155
175
156
- $ worker = new Worker ($ receiver , $ bus , $ receiverName , $ retryStrategy , $ this ->eventDispatcher , $ this ->logger );
176
+ $ worker = new Worker ($ receivers , $ bus , $ retryStrategies , $ this ->eventDispatcher , $ this ->logger );
157
177
$ stopsWhen = [];
158
178
if ($ limit = $ input ->getOption ('limit ' )) {
159
179
$ stopsWhen [] = "processed {$ limit } messages " ;
@@ -171,7 +191,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
171
191
}
172
192
173
193
$ io = new SymfonyStyle ($ input , $ output instanceof ConsoleOutputInterface ? $ output ->getErrorOutput () : $ output );
174
- $ io ->success (sprintf ('Consuming messages from transport "%s". ' , $ receiverName ));
194
+ $ io ->success (sprintf ('Consuming messages from transport%s "%s". ' , \count ( $ receivers ) > 0 ? ' s ' : '' , implode ( ' , ' , $ receiverNames ) ));
175
195
176
196
if ($ stopsWhen ) {
177
197
$ last = array_pop ($ stopsWhen );
@@ -186,7 +206,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
186
206
}
187
207
188
208
$ worker ->run ([
189
- 'sleep ' => $ input ->getOption ('sleep ' ) * 1000000
209
+ 'sleep ' => $ input ->getOption ('sleep ' ) * 1000000 ,
190
210
]);
191
211
}
192
212
0 commit comments