Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
<?php
namespace App\Command;
use Psr\Log\NullLogger;
use Swarrot\Broker\Message;
use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider;
use Swarrot\Consumer;
use Swarrot\Processor\ProcessorInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
class SwarrotConsumeSimpleCommand extends Command
{
public const QUEUE_NAME = 'messages';
protected static $defaultName = 'swarrot:consume:simple';
protected static $defaultDescription = 'Consume swarrot simple message';
protected function configure(): void
{
$this
->setDescription(self::$defaultDescription)
->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Maximum message to manage', 1000)
;
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
// Create connection
$connection = new \AMQPConnection();
$connection->connect();
$channel = new \AMQPChannel($connection);
// Get the queue to consume
$queue = new \AMQPQueue($channel);
$queue->setName(self::QUEUE_NAME);
$messageProvider = new PeclPackageMessageProvider($queue);
$processor = new class implements ProcessorInterface {
public function process(Message $message, array $options): bool
{
//echo sprintf("Consume message #%d\n", $message->getId());
return true;
}
};
$stack = (new \Swarrot\Processor\Stack\Builder())
->push('Swarrot\Processor\MaxMessages\MaxMessagesProcessor', new NullLogger())
->push('Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor')
->push('Swarrot\Processor\Ack\AckProcessor', $messageProvider)
;
$processor = $stack->resolve($processor);
$consumer = new Consumer($messageProvider, $processor);
$consumer->consume(['max_messages' => (int) $input->getOption('limit')]);
return Command::SUCCESS;
}
}