Skip to content
Snippets Groups Projects
SwarrotConsumeSimpleCommand.php 2.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • <?php
    
    namespace App\Command;
    
    use Psr\Log\NullLogger;
    use Swarrot\Broker\Message;
    use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider;
    use Swarrot\Consumer;
    use Swarrot\Processor\ProcessorInterface;
    use Symfony\Component\Console\Command\Command;
    use Symfony\Component\Console\Input\InputArgument;
    use Symfony\Component\Console\Input\InputInterface;
    use Symfony\Component\Console\Input\InputOption;
    use Symfony\Component\Console\Output\OutputInterface;
    use Symfony\Component\Console\Style\SymfonyStyle;
    
    class SwarrotConsumeSimpleCommand extends Command
    {
        public const QUEUE_NAME = 'messages';
        protected static $defaultName = 'swarrot:consume:simple';
        protected static $defaultDescription = 'Consume swarrot simple message';
    
        protected function configure(): void
        {
            $this
                ->setDescription(self::$defaultDescription)
                ->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Maximum message to manage', 1000)
            ;
        }
    
        protected function execute(InputInterface $input, OutputInterface $output): int
        {
    // Create connection
            $connection = new \AMQPConnection();
            $connection->connect();
            $channel = new \AMQPChannel($connection);
    // Get the queue to consume
            $queue = new \AMQPQueue($channel);
            $queue->setName(self::QUEUE_NAME);
    
            $messageProvider = new PeclPackageMessageProvider($queue);
    
            $processor = new class implements ProcessorInterface {
                public function process(Message $message, array $options): bool
                {
                    //echo sprintf("Consume message #%d\n", $message->getId());
    
                    return true;
                }
            };
    
            $stack = (new \Swarrot\Processor\Stack\Builder())
                ->push('Swarrot\Processor\MaxMessages\MaxMessagesProcessor', new NullLogger())
                ->push('Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor')
                ->push('Swarrot\Processor\Ack\AckProcessor', $messageProvider)
            ;
    
            $processor = $stack->resolve($processor);
    
    
            $consumer = new Consumer($messageProvider, $processor);
            $consumer->consume(['max_messages' => (int) $input->getOption('limit')]);
    
            return Command::SUCCESS;
        }
    }