vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php line 97

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\Middleware;
  11. use Psr\Log\LoggerAwareTrait;
  12. use Psr\Log\NullLogger;
  13. use Symfony\Component\Messenger\Envelope;
  14. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  15. use Symfony\Component\Messenger\Exception\LogicException;
  16. use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
  17. use Symfony\Component\Messenger\Handler\Acknowledger;
  18. use Symfony\Component\Messenger\Handler\HandlerDescriptor;
  19. use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
  20. use Symfony\Component\Messenger\Stamp\AckStamp;
  21. use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
  22. use Symfony\Component\Messenger\Stamp\HandledStamp;
  23. use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
  /**
   * Middleware that invokes every handler located for the message carried by an envelope.
   *
   * Each handler that completes adds a HandledStamp to the envelope. Errors thrown by
   * handlers are collected and rethrown together as a single HandlerFailedException
   * once all handlers have been tried.
   *
   * @author Samuel Roze <samuel.roze@gmail.com>
   */
  class HandleMessageMiddleware implements MiddlewareInterface
  {
      use LoggerAwareTrait;

      /** @var HandlersLocatorInterface */
      private $handlersLocator;

      /** @var bool */
      private $allowNoHandlers;

      /**
       * @param HandlersLocatorInterface $handlersLocator Source of the handler descriptors for an envelope
       * @param bool                     $allowNoHandlers When true, a message without any handler is logged instead of raising
       */
      public function __construct(HandlersLocatorInterface $handlersLocator, bool $allowNoHandlers = false)
      {
          $this->handlersLocator = $handlersLocator;
          $this->allowNoHandlers = $allowNoHandlers;
          // Default to a no-op logger so the logging calls below never need a null check.
          $this->logger = new NullLogger();
      }

      /**
       * {@inheritdoc}
       *
       * @throws NoHandlerForMessageException When no handler is found and $allowNoHandlers is false
       * @throws HandlerFailedException       When at least one handler (or batch flush) threw
       */
      public function handle(Envelope $envelope, StackInterface $stack): Envelope
      {
          $handler = null;
          $message = $envelope->getMessage();

          $context = [
              'class' => \get_class($message),
          ];

          $exceptions = [];
          $alreadyHandled = false;
          foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
              // Skip handlers that already left a HandledStamp on this envelope.
              if ($this->messageHasAlreadyBeenHandled($envelope, $handlerDescriptor)) {
                  $alreadyHandled = true;
                  continue;
              }

              try {
                  $handler = $handlerDescriptor->getHandler();
                  $batchHandler = $handlerDescriptor->getBatchHandler();

                  /** @var AckStamp $ackStamp */
                  if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
                      // Batch path: the handler receives an Acknowledger whose callback
                      // forwards either the wrapped error or the stamped envelope to the AckStamp.
                      $ack = new Acknowledger(get_debug_type($batchHandler), static function (?\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
                          if (null !== $e) {
                              $e = new HandlerFailedException($envelope, [$e]);
                          } else {
                              $envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
                          }

                          $ackStamp->ack($envelope, $e);
                      });

                      $result = $handler($message, $ack);

                      // The return value is the current batch size: it must be an integer and not negative.
                      if (!\is_int($result) || 0 > $result) {
                          throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
                      }

                      if (!$ack->isAcknowledged()) {
                          // Not acknowledged yet: mark the envelope so the descriptor can be flushed later.
                          $envelope = $envelope->with(new NoAutoAckStamp($handlerDescriptor));
                      } elseif ($ack->getError()) {
                          throw $ack->getError();
                      } else {
                          $result = $ack->getResult();
                      }
                  } else {
                      $result = $handler($message);
                  }

                  $handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $result);
                  $envelope = $envelope->with($handledStamp);
                  $this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
              } catch (\Throwable $e) {
                  // Keep going: remaining handlers still run, failures are reported together below.
                  $exceptions[] = $e;
              }
          }

          /** @var FlushBatchHandlersStamp $flushStamp */
          if ($flushStamp = $envelope->last(FlushBatchHandlersStamp::class)) {
              /** @var NoAutoAckStamp $stamp */
              foreach ($envelope->all(NoAutoAckStamp::class) as $stamp) {
                  try {
                      // Note: this reassigns $handler, which the "no handler" check below reads.
                      $handler = $stamp->getHandlerDescriptor()->getBatchHandler();
                      $handler->flush($flushStamp->force());
                  } catch (\Throwable $e) {
                      $exceptions[] = $e;
                  }
              }
          }

          if (null === $handler && !$alreadyHandled) {
              if (!$this->allowNoHandlers) {
                  throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
              }

              $this->logger->info('No handler for message {class}', $context);
          }

          if (\count($exceptions)) {
              throw new HandlerFailedException($envelope, $exceptions);
          }

          return $stack->next()->handle($envelope, $stack);
      }

      /**
       * Tells whether the envelope already carries a HandledStamp whose handler name
       * matches the name of the given descriptor.
       */
      private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
      {
          /** @var HandledStamp $stamp */
          foreach ($envelope->all(HandledStamp::class) as $stamp) {
              if ($stamp->getHandlerName() === $handlerDescriptor->getName()) {
                  return true;
              }
          }

          return false;
      }
  }