Handle asynchronous tasks in Symfony with Messenger

Photo by davide ragusa on Unsplash

A little bit of context

In our Document Management system (DMS), one of the components we have is a Document Processing Engine (DPE) which allows us to extract key information from files using OCR technology. As shown in the graph below, we send files to DPE via API and get a JSON back that we insert into the database and then display the results. The communication in this case is synchronous, the user has to wait for the API to respond, and depending on the number of files, the size of files, and the number of information that has to be extracted, the processing time can take up some time, which is not the most optimal user experience. The user should be able to navigate through the DMS application while waiting for his files to be processed. We should also be able to handle multiple user requests without impacting the performance of our DMS.

Synchronous communication
Asynchronous communication

Messenger Architecture

Messenger provides a message bus with the ability to send messages and then handle them immediately (synchronous) in your application or send them through transports (e.g. queues) to be handled later (asynchronous).

Messenger Architecture Async
  1. The publisher dispatches a message to the message bus.
  2. The message bus then adds the message into the transport, a queuing system that can be stored in Doctrine, RabbitMQ, or any message broker. Which allows keeping a record of a list of messages to be executed.
  3. In parallel, the worker will listen to the transport for any new messages in the queue.
  4. If the worker finds a message it will diffuse it into the bus to be sent to the handler.
  5. Finally, the message reaches the handler that will execute it.

Installation and configuration of Messenger

You can add messenger to your Symfony (I’m on Symfony 5.0) application with composer:

$ composer require symfony/messenger
  • A message class that holds data
  • A handler(s) class will be called when that message is dispatched, the handler class will read the message class and perform some task.
// src/Message/DPE.php
namespace App\Message;

class DPE
{
private $content;

public function __construct(string $content)
{
$this->content = $content;
}

public function getContent(): string
{
return $this->content;
}
}
// src/MessageHandler/DPEHandler.php
namespace App\MessageHandler;

use App\Message\DPE;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class DPEHandler implements MessageHandlerInterface
{
public function __invoke(DPE $message)
{
// ... do some work
}
}
// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\DPE;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
public function index(MessageBusInterface $bus)
{
$message = 'some message !'
// will cause the DPEHandler to be called
$bus->dispatch(new DPE($message));
// ...
}
}
MESSENGER_TRANSPORT_DSN=doctrine://default
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: "%env(MESSENGER_TRANSPORT_DSN)%"
routing:
'App\Message\DPE': async
$ php bin/console messenger:consume async

# use -vv to see details about what's happening
$ php bin/console messenger:consume async -vv

References:

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store