Cómo implementar comunicación síncrona entre microservicios con AMQP y RabbitMQ

En un sistema distribuido (microservicios), a veces necesitamos llamar a un método o procedimiento de un servidor remoto (otro microservicio). Para hacerlo de forma eficiente, se utiliza el patrón de comunicación RPC (Remote Procedure Call), en el que un cliente envía una solicitud a un servidor y espera una respuesta.

En este tutorial, vamos a mostrar cómo implementar RPC utilizando el Protocolo Avanzado de Cola de Mensajes (AMQP) con RabbitMQ como broker de mensajes. Para ello, os dejo el código en el siguiente repositorio:

GitHub - jcanadilla/amqp-rpc: Simple example for RPC using AMQP and RabbitMQ
Simple example for RPC using AMQP and RabbitMQ. Contribute to jcanadilla/amqp-rpc development by creating an account on GitHub.

Requisitos previos

Antes de comenzar, necesitarás tener instalado en tu máquina:

  • Node.js
  • RabbitMQ
  • Yarn / NPM

Configuración del entorno

  1. Clona el repositorio.
git clone https://github.com/jcanadilla/amqp-rpc.git

2. Instalar las dependencias:

# Usando NPM
npm install

# Usando Yarn
yarn install

3. Arranca o comprueba que ya está arrancado el RabbitMQ.

¿Cómo funciona?

En el repositorio encontraremos un server.ts y un client.ts. Comenzaremos por el server.ts.

Server

En esta archivo, se importa la biblioteca de amqplib y la función v4 de uuid para generar un identificador único para cada respuesta que se envía. Luego, se define la URL de RabbitMQ a la que se conectará el consumidor.

import amqp from "amqplib";
import { v4 as uuidv4 } from "uuid";

const RABBITMQ = "amqp://guest:guest@localhost:5672";

Aquí, se crea una conexión a la URL de RabbitMQ y se crea un canal para interactuar con el servidor.

const connection = await amqp.connect(RABBITMQ);
const channel = await connection.createChannel();

Esta línea crea una cola llamada example si no existe. La cola se utiliza para almacenar los mensajes entrantes que el consumidor procesará.

const q = 'example';

// ...

await channel.assertQueue(q);

Esta línea establece el consumidor de RabbitMQ en la cola example. La función de devolución de llamada que se pasa como segundo parámetro se ejecuta cada vez que se recibe un mensaje en la cola.

await channel.consume(q, function (msg) {...});

Esta línea envía una respuesta al remitente del mensaje. La respuesta se envía a la cola especificada en la propiedad replyTo del mensaje original. La respuesta se codifica como un buffer utilizando Buffer.from() y se especifica el ID de correlación del mensaje original en la opción correlationId. Esto se hace así para que RabbitMQ sepa a qué destinatario enviarle la información.

channel.sendToQueue(
  msg.properties.replyTo,
  Buffer.from(JSON.stringify(response)),
  {
    correlationId: msg.properties.correlationId,
  }
);

Finalmente, esta línea confirma la recepción del mensaje por el consumidor. Si se omite esta línea, RabbitMQ asumirá que el mensaje no se ha procesado correctamente y lo volverá a enviar.

Client

La primera parte del archivo incluye una serie de importaciones necesarias y la definición de algunas constantes, como la dirección del servidor RabbitMQ y la cola que se utilizará para enviar y recibir mensajes.

import amqp, { Channel, Connection } from "amqplib";
import { v4 as uuidv4 } from "uuid";

const RABBITMQ = "amqp://guest:guest@localhost:5672";

// pseudo-queue for direct reply-to
const REPLY_QUEUE = "amq.rabbitmq.reply-to";
const q = "example";

La siguiente sección define una función createClient que conecta con el servidor RabbitMQ y crea un nuevo canal. La función devuelve una promesa que se resuelve con el canal creado.

const createClient = (rabbitmqconn: string): Promise<Channel> =>
  // Connect to RabbitMQ
  amqp
    .connect(rabbitmqconn)
    // Create a new channel
    .then((conn: Connection) => conn.createChannel())
    // Create a channel
    .then((channel: Channel) => {
      channel.setMaxListeners(0);

      channel.consume(
        REPLY_QUEUE,
        (msg) => {
          channel.emit(
            msg?.properties.correlationId,
            msg?.content.toString("utf8")
          );
        },
        { noAck: true }
      );

      // Return the channel
      return channel;
    });

La función sendRPCMessage se utiliza para enviar un mensaje a una cola y esperar una respuesta en la cola de respuesta. La función recibe como parámetros el canal, el mensaje que se desea enviar y la cola a la que se debe enviar el mensaje.

const sendRPCMessage = (channel: Channel, message: string, rpcQueue: string) =>
  new Promise((resolve) => {
    // Create a unique correlationId for this call
    const correlationId = uuidv4();

    // Listen for a response in the reply queue
    channel.once(correlationId, resolve);

    // Send the message, including the correlationId and the reply queue
    channel.sendToQueue(rpcQueue, Buffer.from(message), {
      correlationId,
      replyTo: REPLY_QUEUE,
    });
  });

La función init es la función principal que se ejecutará cuando se ejecute el archivo client.ts. En la función init, se crea un nuevo canal, se define el mensaje que se enviará y se envía el mensaje utilizando la función sendRPCMessage. Luego, se espera una respuesta y se imprime en la consola.

const init = async () => {
  // 1. Connect to RabbitMQ
  const channel = await createClient(RABBITMQ);

  // 2. Create a message to send
  const message = { uuid: uuidv4() };

  // 3. Log the message to the console
  console.log(`[ ${new Date()} ] Message sent: ${JSON.stringify(message)}`);

  // 4. Send the message and wait for a response
  const respone = await sendRPCMessage(channel, JSON.stringify(message), q);

  // 5. Log the response to the console
  console.log(`[ ${new Date()} ] Message received: ${respone}`);

  // 6. Exit the process
  process.exit();
};

¡Pruébalo!

Para iniciar el server:

yarn start:server
$ ts-node server.ts
[ Thu Mar 02 2023 00:53:25 GMT+0100 (Central European Standard Time) ] Server started
[ Thu Mar 02 2023 00:53:25 GMT+0100 (Central European Standard Time) ] the queue example exists. To exit press CTRL+C

Podemos comprobar cómo nos indica el log que el server se ha iniciado y que la cola existe.

Si ahora iniciamos el cliente esto es lo que ocurrirá:

yarn start:client
$ ts-node client.ts
[ Thu Mar 02 2023 00:55:19 GMT+0100 (Central European Standard Time) ] Message sent: {"uuid":"a7fc2a90-0b45-4a05-8587-faf1e04777bc"}
[ Thu Mar 02 2023 00:55:19 GMT+0100 (Central European Standard Time) ] Message received: {"uuid":"eae417bf-62fc-4d49-a983-27b4f50a7f79"}
✨  Done in 0.82s.

Y en el server tendremos:

$ ts-node server.ts
[ Thu Mar 02 2023 00:53:25 GMT+0100 (Central European Standard Time) ] Server started
[ Thu Mar 02 2023 00:53:25 GMT+0100 (Central European Standard Time) ] the queue example exists. To exit press CTRL+C
[ Thu Mar 02 2023 00:55:19 GMT+0100 (Central European Standard Time) ] Message received: {"uuid":"a7fc2a90-0b45-4a05-8587-faf1e04777bc"}
[ Thu Mar 02 2023 00:55:19 GMT+0100 (Central European Standard Time) ] Message sent: {"uuid":"eae417bf-62fc-4d49-a983-27b4f50a7f79"}

Básicamente como podemos comprobar en los logs, el cliente manda un mensaje al server acabado en faf1e04777bc , el server lo recibe y contesta con otro acabado en 27b4f50a7f79.

Te animo a que incluyas cierto retraso entre la recepción y la respuesta para ver cómo espera el cliente por ella.

Conclusiones

Este es un ejemplo muy sencillo y obviamente hay que desarrollar más funcionalidades para poder sacar partido a los RPCs utilizando RabbitMQ, pero como punto de partida es muy sencillo de entender y de ampliar. Espero que te sirva de ayuda y que lo compartas.

Dime en los comentarios si quieres que integre esta solución con express.js para enviar los mensajes a un handler de una ruta.