<?php namespace App\Command; use Psr\Log\NullLogger; use Swarrot\Broker\Message; use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider; use Swarrot\Consumer; use Swarrot\Processor\ProcessorInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; class SwarrotConsumeSimpleCommand extends Command { public const QUEUE_NAME = 'messages'; protected static $defaultName = 'swarrot:consume:simple'; protected static $defaultDescription = 'Consume swarrot simple message'; protected function configure(): void { $this ->setDescription(self::$defaultDescription) ->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Maximum message to manage', 1000) ; } protected function execute(InputInterface $input, OutputInterface $output): int { // Create connection $connection = new \AMQPConnection(); $connection->connect(); $channel = new \AMQPChannel($connection); // Get the queue to consume $queue = new \AMQPQueue($channel); $queue->setName(self::QUEUE_NAME); $messageProvider = new PeclPackageMessageProvider($queue); $processor = new class implements ProcessorInterface { public function process(Message $message, array $options): bool { //echo sprintf("Consume message #%d\n", $message->getId()); return true; } }; $stack = (new \Swarrot\Processor\Stack\Builder()) ->push('Swarrot\Processor\MaxMessages\MaxMessagesProcessor', new NullLogger()) ->push('Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor') ->push('Swarrot\Processor\Ack\AckProcessor', $messageProvider) ; $processor = $stack->resolve($processor); $consumer = new Consumer($messageProvider, $processor); $consumer->consume(['max_messages' => (int) $input->getOption('limit')]); return Command::SUCCESS; } }