-
-
Notifications
You must be signed in to change notification settings - Fork 87
/
ConsumeMessagesCommand.php
325 lines (255 loc) · 13.8 KB
/
ConsumeMessagesCommand.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\SignalableCommandInterface;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Exception\InvalidOptionException;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
use Symfony\Component\Messenger\Worker;
/**
 * Console command that runs a Messenger worker.
 *
 * The command resolves the requested receivers, registers the stop-condition
 * listeners selected through the options, then hands control to a Worker until
 * it stops. Signals the command subscribes to are forwarded to the running worker.
 *
 * @author Samuel Roze <samuel.roze@gmail.com>
 */
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
{
    /**
     * Alarm interval applied when --keepalive is passed without a value
     * (also the default value of the option itself).
     */
    private const DEFAULT_KEEPALIVE_INTERVAL = 5;

    /**
     * Worker created by execute(); reset to null as soon as its run() call returns or throws.
     */
    private ?Worker $worker = null;

    /**
     * @param RoutableMessageBus         $routableBus           Bus used for dispatching, or from which the --bus bus is resolved
     * @param ContainerInterface         $receiverLocator       Locator queried by receiver name
     * @param EventDispatcherInterface   $eventDispatcher       Dispatcher on which the stop/reset listeners are subscribed
     * @param LoggerInterface|null       $logger                Optional logger handed to the listeners and the worker
     * @param array                      $receiverNames         Known receiver names (default argument, interactive choice, --all, completion)
     * @param ResetServicesListener|null $resetServicesListener Subscribed unless --no-reset is passed
     * @param array                      $busIds                Values suggested when completing --bus
     * @param ContainerInterface|null    $rateLimiterLocator    Optional locator queried by receiver name for rate limiters
     * @param array|null                 $signals               Overrides the default list of subscribed signals when not null
     */
    public function __construct(
        private RoutableMessageBus $routableBus,
        private ContainerInterface $receiverLocator,
        private EventDispatcherInterface $eventDispatcher,
        private ?LoggerInterface $logger = null,
        private array $receiverNames = [],
        private ?ResetServicesListener $resetServicesListener = null,
        private array $busIds = [],
        private ?ContainerInterface $rateLimiterLocator = null,
        private ?array $signals = null,
    ) {
        parent::__construct();
    }

    /**
     * Declares the "receivers" argument, every option and the help text.
     */
    protected function configure(): void
    {
        // When exactly one receiver is known it becomes the default value of the argument.
        $defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;

        $this
            ->setDefinition([
                new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
                new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
                new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'The number of failed messages the worker can consume'),
                new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
                new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'),
                new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
                new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
                new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
                new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
                new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
                new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
            ])
            ->setHelp(<<<'EOF'
                The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
                <info>php %command.full_name% <receiver-name></info>
                To receive from multiple transports, pass each name:
                <info>php %command.full_name% receiver1 receiver2</info>
                Use the --limit option to limit the number of messages received:
                <info>php %command.full_name% <receiver-name> --limit=10</info>
                Use the --failure-limit option to stop the worker when the given number of failed messages is reached:
                <info>php %command.full_name% <receiver-name> --failure-limit=2</info>
                Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
                <info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
                Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached.
                If a message is being handled, the worker will stop after the processing is finished:
                <info>php %command.full_name% <receiver-name> --time-limit=3600</info>
                Use the --bus option to specify the message bus to dispatch received messages
                to instead of trying to determine it automatically. This is required if the
                messages didn't originate from Messenger:
                <info>php %command.full_name% <receiver-name> --bus=event_bus</info>
                Use the --queues option to limit a receiver to only certain queues (only supported by some receivers):
                <info>php %command.full_name% <receiver-name> --queues=fasttrack</info>
                Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):
                <info>php %command.full_name% <receiver-name> --no-reset</info>
                Use the --all option to consume from all receivers:
                <info>php %command.full_name% --all</info>
                EOF
            )
        ;
    }

    /**
     * Configures the application's alarm interval, but only when --keepalive
     * was explicitly present on the command line.
     */
    protected function initialize(InputInterface $input, OutputInterface $output): void
    {
        if (!$input->hasParameterOption('--keepalive')) {
            return;
        }

        // "--keepalive" without a value yields null, hence the fallback.
        $interval = $input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL;

        $this->getApplication()->setAlarmInterval((int) $interval);
    }

    /**
     * Asks which receivers to consume when none were given on the command line.
     *
     * Nothing is asked when --all is set.
     *
     * @throws RuntimeException When no receiver is selected in the end
     */
    protected function interact(InputInterface $input, OutputInterface $output): void
    {
        $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

        if ($input->getOption('all')) {
            return;
        }

        $canPrompt = $this->receiverNames && !$input->getArgument('receivers');
        if ($canPrompt) {
            $io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
            $io->writeln('Choose which receivers you want to consume messages from in order of priority.');

            if (\count($this->receiverNames) > 1) {
                $io->writeln(\sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
            }

            $question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
            $question->setMultiselect(true);

            $input->setArgument('receivers', $io->askQuestion($question));
        }

        if (!$input->getArgument('receivers')) {
            throw new RuntimeException('Please pass at least one receiver.');
        }
    }

    /**
     * Builds the worker from the selected receivers and options, then runs it.
     *
     * @return int Always 0 once the worker has stopped
     *
     * @throws RuntimeException       When a requested receiver is unknown to the locator
     * @throws InvalidOptionException When --limit, --failure-limit or --time-limit is not a positive number
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $receivers = [];
        $rateLimiters = [];
        $receiverNames = $input->getOption('all') ? $this->receiverNames : $input->getArgument('receivers');

        foreach ($receiverNames as $index => $receiverName) {
            if (!$this->receiverLocator->has($receiverName)) {
                $message = \sprintf('The receiver "%s" does not exist.', $receiverName);
                if ($this->receiverNames) {
                    $message .= \sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
                }

                throw new RuntimeException($message);
            }

            $receiver = $this->receiverLocator->get($receiverName);
            if ($receiver instanceof SyncTransport) {
                // Sync transports are skipped. Drop the entry through its own key
                // instead of searching for it again: a loose search could match a
                // different, numerically-equal name (e.g. "1" vs "01").
                unset($receiverNames[$index]);

                continue;
            }

            $receivers[$receiverName] = $receiver;
            if ($this->rateLimiterLocator?->has($receiverName)) {
                $rateLimiters[$receiverName] = $this->rateLimiterLocator->get($receiverName);
            }
        }

        if (null !== $this->resetServicesListener && !$input->getOption('no-reset')) {
            $this->eventDispatcher->addSubscriber($this->resetServicesListener);
        }

        // Human-readable stop conditions, collected for the summary printed below.
        $stopsWhen = [];

        if (null !== $limit = $input->getOption('limit')) {
            if (!is_numeric($limit) || 0 >= $limit) {
                throw new InvalidOptionException(\sprintf('Option "limit" must be a positive integer, "%s" passed.', $limit));
            }

            $stopsWhen[] = "processed {$limit} messages";
            $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
        }

        // An empty or "0" failure limit is still treated as "no limit"; any other
        // value is validated the same way as --limit and --time-limit.
        if ($failureLimit = $input->getOption('failure-limit')) {
            if (!is_numeric($failureLimit) || 0 >= $failureLimit) {
                throw new InvalidOptionException(\sprintf('Option "failure-limit" must be a positive integer, "%s" passed.', $failureLimit));
            }

            $stopsWhen[] = "reached {$failureLimit} failed messages";
            $this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger));
        }

        if ($memoryLimit = $input->getOption('memory-limit')) {
            $stopsWhen[] = "exceeded {$memoryLimit} of memory";
            $this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
        }

        if (null !== $timeLimit = $input->getOption('time-limit')) {
            if (!is_numeric($timeLimit) || 0 >= $timeLimit) {
                throw new InvalidOptionException(\sprintf('Option "time-limit" must be a positive integer, "%s" passed.', $timeLimit));
            }

            $stopsWhen[] = "been running for {$timeLimit}s";
            $this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
        }

        $stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';

        $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
        $io->success(\sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 1 ? 's' : '', implode(', ', $receiverNames)));

        if ($stopsWhen) {
            // Render as "a, b or c".
            $last = array_pop($stopsWhen);
            $stopsWhen = ($stopsWhen ? implode(', ', $stopsWhen).' or ' : '').$last;
            $io->comment("The worker will automatically exit once it has {$stopsWhen}.");
        }

        $io->comment('Quit the worker with CONTROL-C.');

        if (OutputInterface::VERBOSITY_VERBOSE > $output->getVerbosity()) {
            $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
        }

        $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

        $this->worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters);

        // The --sleep value is multiplied by one million before being passed on.
        $options = [
            'sleep' => $input->getOption('sleep') * 1000000,
        ];
        if ($queues = $input->getOption('queues')) {
            $options['queues'] = $queues;
        }

        try {
            $this->worker->run($options);
        } finally {
            // Signals arriving after this point find no worker and are ignored by handleSignal().
            $this->worker = null;
        }

        return 0;
    }

    /**
     * Suggests completion values for the "receivers" argument and the --bus option.
     */
    public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
    {
        if ($input->mustSuggestArgumentValuesFor('receivers')) {
            // Receivers already typed are not suggested again, except the value being completed.
            $alreadyGiven = array_diff($input->getArgument('receivers'), [$input->getCompletionValue()]);
            $suggestions->suggestValues(array_diff($this->receiverNames, $alreadyGiven));

            return;
        }

        if ($input->mustSuggestOptionValuesFor('bus')) {
            $suggestions->suggestValues($this->busIds);
        }
    }

    /**
     * Returns the injected signal list when there is one; otherwise SIGTERM,
     * SIGINT, SIGQUIT and SIGALRM if the pcntl extension is loaded, or nothing.
     */
    public function getSubscribedSignals(): array
    {
        if (null !== $this->signals) {
            return $this->signals;
        }

        return \extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : [];
    }

    /**
     * Forwards a signal to the running worker: SIGALRM triggers a keepalive,
     * any other signal asks the worker to stop.
     *
     * @return int|false Always false in this implementation
     */
    public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
    {
        if (!$this->worker) {
            return false;
        }

        $transportNames = $this->worker->getMetadata()->getTransportNames();

        if (\SIGALRM === $signal) {
            $this->logger?->info('Sending keepalive request.', ['transport_names' => $transportNames]);
            $this->worker->keepalive($this->getApplication()->getAlarmInterval());

            return false;
        }

        $this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $transportNames]);
        $this->worker->stop();

        return false;
    }

    /**
     * Converts a shorthand memory value such as "128M" into a number of bytes.
     *
     * The numeric part may carry a leading "+", and is read as hexadecimal with a
     * "0x" prefix, octal with a leading "0", decimal otherwise. A trailing unit of
     * k, m, g or t (case-insensitive, optionally followed by "b") multiplies the
     * value by the matching power of 1024.
     */
    private function convertToBytes(string $memoryLimit): int
    {
        $normalized = strtolower($memoryLimit);
        $number = ltrim($normalized, '+');

        $bytes = match (true) {
            str_starts_with($number, '0x') => \intval($number, 16),
            str_starts_with($number, '0') => \intval($number, 8),
            default => (int) $number,
        };

        // The unit is the last character once any trailing "b" has been removed.
        $power = match (substr(rtrim($normalized, 'b'), -1)) {
            't' => 4,
            'g' => 3,
            'm' => 2,
            'k' => 1,
            default => 0,
        };

        return $bytes * (1024 ** $power);
    }
}