Handle asynchronous tasks in Symfony with Messenger

Aicha Fatrah
5 min read · Oct 26, 2022

Queued message handling with the Messenger component

Photo by davide ragusa on Unsplash

A little bit of context

In our Document Management System (DMS), one of the components is a Document Processing Engine (DPE), which extracts key information from files using OCR. As shown in the diagram below, we send files to the DPE through its API and get back JSON that we insert into the database before displaying the results. The communication in this case is synchronous: the user has to wait for the API to respond. Depending on the number of files, their size, and the amount of information to extract, processing can take a long time, which makes for a poor user experience. Users should be able to keep navigating the DMS application while their files are being processed. We should also be able to handle multiple user requests without degrading the performance of the DMS.

[Figure: Synchronous communication]

This is why we decided to use the Messenger component in our Symfony application. The purpose is to send and receive messages between the DMS and the DPE asynchronously through queues. The queues let us keep track of mass document processing and manage failures and retries.

[Figure: Asynchronous communication]

Messenger Architecture

Messenger provides a message bus with the ability to send messages and then handle them immediately (synchronous) in your application or send them through transports (e.g. queues) to be handled later (asynchronous).

[Figure: Messenger architecture (async)]

In a synchronous architecture, the Messenger component handles messages immediately, without leaving the application. The publisher, or producer (a controller, a service, or a command), dispatches a message to the message bus. Once the bus receives the message, it passes it to the handler, which executes the corresponding task, such as sending an email, processing a document, or booking a ticket.

But our main interest in using Messenger is to handle document processing requests asynchronously, in the background. The diagram above shows the Messenger architecture in that case:

  1. The publisher dispatches a message to the message bus.
  2. The message bus then adds the message to the transport, a queuing system that can be backed by Doctrine, RabbitMQ, or another message broker. This keeps a record of the messages waiting to be handled.
  3. In parallel, the worker will listen to the transport for any new messages in the queue.
  4. If the worker finds a message, it passes it back to the bus to be sent to the handler.
  5. Finally, the message reaches the handler that will execute it.
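
The five steps above can be sketched in plain PHP, without Symfony. This is only a simplified illustration of the flow, not how Messenger is implemented: ProcessDocument and ArrayQueueTransport are hypothetical names, the queue lives in memory instead of Doctrine or RabbitMQ, and the bus is collapsed into a direct send() call.

```php
<?php
declare(strict_types=1);

// Hypothetical message: a plain object that only carries data.
final class ProcessDocument
{
    private string $path;

    public function __construct(string $path)
    {
        $this->path = $path;
    }

    public function getPath(): string
    {
        return $this->path;
    }
}

// Hypothetical stand-in for a transport: a FIFO queue of serialized messages.
final class ArrayQueueTransport
{
    /** @var string[] */
    private array $queue = [];

    public function send(object $message): void
    {
        $this->queue[] = serialize($message);
    }

    public function get(): ?object
    {
        // array_shift() returns null when the queue is empty.
        $payload = array_shift($this->queue);

        return $payload === null ? null : unserialize($payload);
    }
}

// Steps 1-2: the publisher dispatches and the message is stored in the transport.
$transport = new ArrayQueueTransport();
$transport->send(new ProcessDocument('invoice-1.pdf'));
$transport->send(new ProcessDocument('invoice-2.pdf'));

// Step 5: the handler performs the actual work (here it only records the path).
$processed = [];
$handler = function (ProcessDocument $message) use (&$processed): void {
    $processed[] = $message->getPath();
};

// Steps 3-4: the worker pulls messages off the queue and hands each one to the handler.
while (($message = $transport->get()) !== null) {
    $handler($message);
}
```

In the real component the worker runs in a separate process, which is why the publisher can return to the user right after step 2.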

Installation and configuration of Messenger

You can add Messenger to your Symfony application (I’m on Symfony 5.0) with Composer:

$ composer require symfony/messenger

Messenger centers around two kinds of classes that you need to create:

  • A message class that holds data.
  • A handler class that is called when that message is dispatched. The handler reads the message and performs some task.

The only requirement for a message class is that it can be serialized. In this example, the message class is called DPE and it holds some $content.

// src/Message/DPE.php
namespace App\Message;

class DPE
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}
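
To see what "can be serialized" means in practice, here is a rough illustration using PHP's native serialize() and unserialize(). It assumes a queue-backed transport stores the message as a string and rebuilds it in the worker process; the class is repeated without its namespace so the snippet runs on its own, and the file name is made up.

```php
<?php
declare(strict_types=1);

// Same shape as the DPE message, repeated here so the snippet is self-contained.
final class DPE
{
    private string $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

// Publisher side: the object becomes a plain string that can sit in a queue.
$stored = serialize(new DPE('invoice-42.pdf'));

// Worker side: the string is turned back into an equivalent object.
$restored = unserialize($stored, ['allowed_classes' => [DPE::class]]);
```

A practical consequence is that a message should carry simple data such as strings, numbers, or identifiers. Closures and resources like open file handles cannot be serialized, so pass a file path or a document ID rather than the file itself.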

Now that we have a message class called DPE, we can create the handler. A handler is a class that implements MessageHandlerInterface and has an __invoke() method type-hinted with the message class.

// src/MessageHandler/DPEHandler.php
namespace App\MessageHandler;

use App\Message\DPE;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class DPEHandler implements MessageHandlerInterface
{
    public function __invoke(DPE $message)
    {
        // ... do some work
    }
}

Thanks to autoconfiguration and the DPE type hint, Symfony knows that this handler should be called when a DPE message is dispatched. Most of the time, this is all you need to do.
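
To make the role of the type hint more concrete, the sketch below uses reflection to read the class hinted on __invoke() and build a "message class => handler" map. It is a simplified illustration of the idea in plain PHP, not Symfony's actual implementation; handledMessageClass() is a hypothetical helper, and the classes are redefined without namespaces so the snippet stands alone.

```php
<?php
declare(strict_types=1);

final class DPE
{
}

final class DPEHandler
{
    public function __invoke(DPE $message): string
    {
        return 'handled DPE';
    }
}

/**
 * Hypothetical helper: returns the class name type-hinted on the
 * first argument of the handler's __invoke() method.
 */
function handledMessageClass(object $handler): string
{
    $method = new ReflectionMethod($handler, '__invoke');
    $type = $method->getParameters()[0]->getType();

    if (!$type instanceof ReflectionNamedType || $type->isBuiltin()) {
        throw new LogicException('The first argument of __invoke() must be type-hinted with a class.');
    }

    return $type->getName();
}

// Build the map once, for every known handler.
$handlers = [];
foreach ([new DPEHandler()] as $handler) {
    $handlers[handledMessageClass($handler)] = $handler;
}

// At dispatch time, the message's class selects the handler to call.
$message = new DPE();
$result = $handlers[get_class($message)]($message);
```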

Now, to dispatch the message, we need to inject the message_bus service into a controller, a service, or a command.

// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\DPE;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus)
    {
        $message = 'some message !';

        // will cause the DPEHandler to be called
        $bus->dispatch(new DPE($message));

        // ...
    }
}

By default, messages are handled as soon as they are dispatched. To handle them asynchronously, we need to configure a transport. A transport is capable of sending messages to a queueing system and then receiving them via a worker. Messenger supports multiple transports; in this case, we use Doctrine by setting the DSN in the .env file (on Symfony 5.1 and later, the Doctrine transport ships in a separate package, symfony/doctrine-messenger):

MESSENGER_TRANSPORT_DSN=doctrine://default

Next, in config/packages/messenger.yaml, we need to define a transport called async and then route our DPE message to it.

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"
        routing:
            'App\Message\DPE': async
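
Since managing failures and retries was one of our reasons for choosing queues, here is a sketch of how the same file might be extended with a retry strategy and a failure transport. The numbers are illustrative assumptions rather than the values we use, and the failed transport name is an arbitrary choice.

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        # messages that still fail after all retries are moved to this transport
        failure_transport: failed

        transports:
            async:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                retry_strategy:
                    max_retries: 3
                    # milliseconds to wait before the first retry
                    delay: 1000
                    # each retry waits twice as long as the previous one: 1s, 2s, 4s
                    multiplier: 2
            failed: "doctrine://default?queue_name=failed"

        routing:
            'App\Message\DPE': async
```

Messages that end up in the failure transport can then be inspected with php bin/console messenger:failed:show and replayed with php bin/console messenger:failed:retry.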

The Doctrine transport will create a messenger_messages table in your database to store the messages. Last but not least, we can start consuming messages by running the worker with the command:

$ php bin/console messenger:consume async

# use -vv to see details about what's happening
$ php bin/console messenger:consume async -vv

The messenger:consume command runs until it is stopped, so in production you should use a process manager like Supervisor or systemd to keep the worker(s) running.

Enjoy 🖖.
