19
19
use Symfony \Component \Console \Input \InputInterface ;
20
20
use Symfony \Component \Console \Input \InputOption ;
21
21
use Symfony \Component \Console \Output \OutputInterface ;
22
+ use Symfony \Component \Console \Question \ChoiceQuestion ;
22
23
use Symfony \Component \Console \Style \SymfonyStyle ;
23
24
use Symfony \Component \Messenger \RoutableMessageBus ;
24
- use Symfony \Component \Messenger \Transport \Receiver \StopWhenMemoryUsageIsExceededReceiver ;
25
- use Symfony \Component \Messenger \Transport \Receiver \StopWhenMessageCountIsExceededReceiver ;
26
- use Symfony \Component \Messenger \Transport \Receiver \StopWhenTimeLimitIsReachedReceiver ;
27
25
use Symfony \Component \Messenger \Worker ;
26
+ use Symfony \Component \Messenger \Worker \StopWhenMemoryUsageIsExceededWorker ;
27
+ use Symfony \Component \Messenger \Worker \StopWhenMessageCountIsExceededWorker ;
28
+ use Symfony \Component \Messenger \Worker \StopWhenTimeLimitIsReachedWorker ;
28
29
use Symfony \Contracts \EventDispatcher \EventDispatcherInterface ;
29
30
30
31
/**
@@ -70,10 +71,11 @@ protected function configure(): void
70
71
71
72
$ this
72
73
->setDefinition ([
73
- new InputArgument ('receiver ' , $ defaultReceiverName ? InputArgument::OPTIONAL : InputArgument:: REQUIRED , 'Name of the receiver ' , $ defaultReceiverName ),
74
+ new InputArgument ('receivers ' , InputArgument::IS_ARRAY , 'Names of the receivers/transports to consume in order of priority ' , $ defaultReceiverName ? [ $ defaultReceiverName ] : [] ),
74
75
new InputOption ('limit ' , 'l ' , InputOption::VALUE_REQUIRED , 'Limit the number of received messages ' ),
75
76
new InputOption ('memory-limit ' , 'm ' , InputOption::VALUE_REQUIRED , 'The memory limit the worker can consume ' ),
76
77
new InputOption ('time-limit ' , 't ' , InputOption::VALUE_REQUIRED , 'The time limit in seconds the worker can run ' ),
78
+ new InputOption ('sleep ' , null , InputOption::VALUE_REQUIRED , 'Seconds to sleep before asking for new messages after no messages were found ' , 1 ),
77
79
new InputOption ('bus ' , 'b ' , InputOption::VALUE_REQUIRED , 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically. ' ),
78
80
])
79
81
->setDescription ('Consumes messages ' )
@@ -82,6 +84,10 @@ protected function configure(): void
82
84
83
85
<info>php %command.full_name% <receiver-name></info>
84
86
87
+ To receive from multiple transports, pass each name:
88
+
89
+ <info>php %command.full_name% receiver1 receiver2</info>
90
+
85
91
Use the --limit option to limit the number of messages received:
86
92
87
93
<info>php %command.full_name% <receiver-name> --limit=10</info>
@@ -111,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
111
117
{
112
118
$ io = new SymfonyStyle ($ input , $ output instanceof ConsoleOutputInterface ? $ output ->getErrorOutput () : $ output );
113
119
114
- if ($ this ->receiverNames && !$ this ->receiverLocator ->has ($ receiverName = $ input ->getArgument ('receiver ' ))) {
115
- if (null === $ receiverName ) {
116
- $ io ->block ('Missing receiver argument. ' , null , 'error ' , ' ' , true );
117
- $ input ->setArgument ('receiver ' , $ io ->choice ('Select one of the available receivers ' , $ this ->receiverNames ));
118
- } elseif ($ alternatives = $ this ->findAlternatives ($ receiverName , $ this ->receiverNames )) {
119
- $ io ->block (sprintf ('Receiver "%s" is not defined. ' , $ receiverName ), null , 'error ' , ' ' , true );
120
- if ($ io ->confirm (sprintf ('Do you want to receive from "%s" instead? ' , $ alternatives [0 ]), false )) {
121
- $ input ->setArgument ('receiver ' , $ alternatives [0 ]);
122
- }
120
+ if ($ this ->receiverNames && 0 === \count ($ input ->getArgument ('receivers ' ))) {
121
+ $ io ->block ('Which transports/receivers do you want to consume? ' , null , 'fg=white;bg=blue ' , ' ' , true );
122
+
123
+ $ io ->writeln ('Choose which receivers you want to consume messages from in order of priority. ' );
124
+ if (\count ($ this ->receiverNames ) > 1 ) {
125
+ $ io ->writeln (sprintf ('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment> ' , implode (', ' , $ this ->receiverNames )));
123
126
}
127
+
128
+ $ question = new ChoiceQuestion ('Select receivers to consume: ' , $ this ->receiverNames , 0 );
129
+ $ question ->setMultiselect (true );
130
+
131
+ $ input ->setArgument ('receivers ' , $ io ->askQuestion ($ question ));
132
+ }
133
+
134
+ if (0 === \count ($ input ->getArgument ('receivers ' ))) {
135
+ throw new RuntimeException ('Please pass at least one receiver. ' );
124
136
}
125
137
}
126
138
@@ -135,41 +147,51 @@ protected function execute(InputInterface $input, OutputInterface $output): void
135
147
$ output ->writeln (sprintf ('<comment>%s</comment> ' , $ message ));
136
148
}
137
149
138
- if (!$ this ->receiverLocator ->has ($ receiverName = $ input ->getArgument ('receiver ' ))) {
139
- throw new RuntimeException (sprintf ('Receiver "%s" does not exist. ' , $ receiverName ));
140
- }
150
+ $ receivers = [];
151
+ $ retryStrategies = [];
152
+ foreach ($ receiverNames = $ input ->getArgument ('receivers ' ) as $ receiverName ) {
153
+ if (!$ this ->receiverLocator ->has ($ receiverName )) {
154
+ $ message = sprintf ('The receiver "%s" does not exist. ' , $ receiverName );
155
+ if ($ this ->receiverNames ) {
156
+ $ message .= sprintf (' Valid receivers are: %s. ' , implode (', ' , $ this ->receiverNames ));
157
+ }
141
158
142
- if (null !== $ this ->retryStrategyLocator && !$ this ->retryStrategyLocator ->has ($ receiverName )) {
143
- throw new RuntimeException (sprintf ('Receiver "%s" does not have a configured retry strategy. ' , $ receiverName ));
144
- }
159
+ throw new RuntimeException ($ message );
160
+ }
145
161
146
- $ receiver = $ this ->receiverLocator ->get ($ receiverName );
147
- $ retryStrategy = null !== $ this ->retryStrategyLocator ? $ this ->retryStrategyLocator ->get ($ receiverName ) : null ;
162
+ if (null !== $ this ->retryStrategyLocator && !$ this ->retryStrategyLocator ->has ($ receiverName )) {
163
+ throw new RuntimeException (sprintf ('Receiver "%s" does not have a configured retry strategy. ' , $ receiverName ));
164
+ }
165
+
166
+ $ receivers [$ receiverName ] = $ this ->receiverLocator ->get ($ receiverName );
167
+ $ retryStrategies [$ receiverName ] = null !== $ this ->retryStrategyLocator ? $ this ->retryStrategyLocator ->get ($ receiverName ) : null ;
168
+ }
148
169
149
170
if (null !== $ input ->getOption ('bus ' )) {
150
171
$ bus = $ this ->busLocator ->get ($ input ->getOption ('bus ' ));
151
172
} else {
152
173
$ bus = new RoutableMessageBus ($ this ->busLocator );
153
174
}
154
175
176
+ $ worker = new Worker ($ receivers , $ bus , $ retryStrategies , $ this ->eventDispatcher , $ this ->logger );
155
177
$ stopsWhen = [];
156
178
if ($ limit = $ input ->getOption ('limit ' )) {
157
179
$ stopsWhen [] = "processed {$ limit } messages " ;
158
- $ receiver = new StopWhenMessageCountIsExceededReceiver ( $ receiver , $ limit , $ this ->logger );
180
+ $ worker = new StopWhenMessageCountIsExceededWorker ( $ worker , $ limit , $ this ->logger );
159
181
}
160
182
161
183
if ($ memoryLimit = $ input ->getOption ('memory-limit ' )) {
162
184
$ stopsWhen [] = "exceeded {$ memoryLimit } of memory " ;
163
- $ receiver = new StopWhenMemoryUsageIsExceededReceiver ( $ receiver , $ this ->convertToBytes ($ memoryLimit ), $ this ->logger );
185
+ $ worker = new StopWhenMemoryUsageIsExceededWorker ( $ worker , $ this ->convertToBytes ($ memoryLimit ), $ this ->logger );
164
186
}
165
187
166
188
if ($ timeLimit = $ input ->getOption ('time-limit ' )) {
167
189
$ stopsWhen [] = "been running for {$ timeLimit }s " ;
168
- $ receiver = new StopWhenTimeLimitIsReachedReceiver ( $ receiver , $ timeLimit , $ this ->logger );
190
+ $ worker = new StopWhenTimeLimitIsReachedWorker ( $ worker , $ timeLimit , $ this ->logger );
169
191
}
170
192
171
193
$ io = new SymfonyStyle ($ input , $ output instanceof ConsoleOutputInterface ? $ output ->getErrorOutput () : $ output );
172
-<
F438
/span> $ io ->success (sprintf ('Consuming messages from transport "%s". ' , $ receiverName ));
194
+ $ io ->success (sprintf ('Consuming messages from transport%s "%s". ' , \count ( $ receivers ) > 0 ? ' s ' : '' , implode ( ' , ' , $ receiverNames ) ));
173
195
174
196
if ($ stopsWhen ) {
175
197
$ last = array_pop ($ stopsWhen );
@@ -183,8 +205,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void
183
205
$ io ->comment ('Re-run the command with a -vv option to see logs about consumed messages. ' );
184
206
}
185
207
186
- $ worker = new Worker ($ receiver , $ bus , $ receiverName , $ retryStrategy , $ this ->eventDispatcher , $ this ->logger );
187
- $ worker ->run ();
208
+ $ worker ->run ([
209
+ 'sleep ' => $ input ->getOption ('sleep ' ) * 1000000 ,
210
+ ]);
188
211
}
189
212
190
213
private function convertToBytes (string $ memoryLimit ): int
0 commit comments