Cómo implementar comunicación síncrona entre microservicios con AMQP y RabbitMQ
Aprende a implementar RPC en sistemas distribuidos con RabbitMQ y AMQP. Ejemplo sencillo de cliente-servidor y comunicación de mensajes.
En un sistema distribuido (microservicios), a veces necesitamos llamar a un método o procedimiento de un servidor remoto (otro microservicio). Para hacerlo de forma eficiente, se utiliza el patrón de comunicación RPC (Remote Procedure Call), en el que un cliente envía una solicitud a un servidor y espera una respuesta.
En este tutorial, vamos a mostrar cómo implementar RPC utilizando el Protocolo Avanzado de Cola de Mensajes (AMQP) con RabbitMQ como broker de mensajes. Para ello, os dejo el código en el siguiente repositorio:
Requisitos previos
Antes de comenzar, necesitarás tener instalado en tu máquina:
- Node.js
- RabbitMQ
- Yarn / NPM
Configuración del entorno
- Clona el repositorio.
git clone https://github.com/jcanadilla/amqp-rpc.git
2. Instalar las dependencias:
# Usando NPM
npm install
# Usando Yarn
yarn install
3. Arranca o comprueba que ya está arrancado el RabbitMQ.
¿Cómo funciona?
En el repositorio encontraremos un server.ts y un client.ts. Comenzaremos por el server.ts.
Server
En esta archivo, se importa la biblioteca de amqplib
y la función v4
de uuid
para generar un identificador único para cada respuesta que se envía. Luego, se define la URL de RabbitMQ a la que se conectará el consumidor.
import amqp from "amqplib";
import { v4 as uuidv4 } from "uuid";
const RABBITMQ = "amqp://guest:guest@localhost:5672";
Aquí, se crea una conexión a la URL de RabbitMQ y se crea un canal para interactuar con el servidor.
const connection = await amqp.connect(RABBITMQ);
const channel = await connection.createChannel();
Esta línea crea una cola llamada example
si no existe. La cola se utiliza para almacenar los mensajes entrantes que el consumidor procesará.
const q = 'example';
// ...
await channel.assertQueue(q);
Esta línea establece el consumidor de RabbitMQ en la cola example
. La función de devolución de llamada que se pasa como segundo parámetro se ejecuta cada vez que se recibe un mensaje en la cola.
await channel.consume(q, function (msg) {...});
Esta línea envía una respuesta al remitente del mensaje. La respuesta se envía a la cola especificada en la propiedad replyTo
del mensaje original. La respuesta se codifica como un buffer utilizando Buffer.from()
y se especifica el ID de correlación del mensaje original en la opción correlationId
. Esto se hace así para que RabbitMQ sepa a qué destinatario enviarle la información.
channel.sendToQueue(
msg.properties.replyTo,
Buffer.from(JSON.stringify(response)),
{
correlationId: msg.properties.correlationId,
}
);
Finalmente, esta línea confirma la recepción del mensaje por el consumidor. Si se omite esta línea, RabbitMQ asumirá que el mensaje no se ha procesado correctamente y lo volverá a enviar.
Client
La primera parte del archivo incluye una serie de importaciones necesarias y la definición de algunas constantes, como la dirección del servidor RabbitMQ y la cola que se utilizará para enviar y recibir mensajes.
import amqp, { Channel, Connection } from "amqplib";
import { v4 as uuidv4 } from "uuid";
const RABBITMQ = "amqp://guest:guest@localhost:5672";
// pseudo-queue for direct reply-to
const REPLY_QUEUE = "amq.rabbitmq.reply-to";
const q = "example";
La siguiente sección define una función createClient
que conecta con el servidor RabbitMQ y crea un nuevo canal. La función devuelve una promesa que se resuelve con el canal creado.
const createClient = (rabbitmqconn: string): Promise<Channel> =>
// Connect to RabbitMQ
amqp
.connect(rabbitmqconn)
// Create a new channel
.then((conn: Connection) => conn.createChannel())
// Create a channel
.then((channel: Channel) => {
channel.setMaxListeners(0);
channel.consume(
REPLY_QUEUE,
(msg) => {
channel.emit(
msg?.properties.correlationId,
msg?.content.toString("utf8")
);
},
{ noAck: true }
);
// Return the channel
return channel;
});
La función sendRPCMessage
se utiliza para enviar un mensaje a una cola y esperar una respuesta en la cola de respuesta. La función recibe como parámetros el canal, el mensaje que se desea enviar y la cola a la que se debe enviar el mensaje.
const sendRPCMessage = (channel: Channel, message: string, rpcQueue: string) =>
new Promise((resolve) => {
// Create a unique correlationId for this call
const correlationId = uuidv4();
// Listen for a response in the reply queue
channel.once(correlationId, resolve);
// Send the message, including the correlationId and the reply queue
channel.sendToQueue(rpcQueue, Buffer.from(message), {
correlationId,
replyTo: REPLY_QUEUE,
});
});
La función init
es la función principal que se ejecutará cuando se ejecute el archivo client.ts
. En la función init
, se crea un nuevo canal, se define el mensaje que se enviará y se envía el mensaje utilizando la función sendRPCMessage
. Luego, se espera una respuesta y se imprime en la consola.
const init = async () => {
// 1. Connect to RabbitMQ
const channel = await createClient(RABBITMQ);
// 2. Create a message to send
const message = { uuid: uuidv4() };
// 3. Log the message to the console
console.log(`[ ${new Date()} ] Message sent: ${JSON.stringify(message)}`);
// 4. Send the message and wait for a response
const respone = await sendRPCMessage(channel, JSON.stringify(message), q);
// 5. Log the response to the console
console.log(`[ ${new Date()} ] Message received: ${respone}`);
// 6. Exit the process
process.exit();
};
¡Pruébalo!
Para iniciar el server:
yarn start:server
$ ts-node server.ts
[ Thu Mar 02 2023 00:53:25 GMT+0100 (Central European Standard Time) ] Server started
[ Thu Mar 02 2023 00:53:25 GMT+0100 (Central European Standard Time) ] the queue example exists. To exit press CTRL+C
Podemos comprobar cómo nos indica el log
que el server se ha iniciado y que la cola existe.
Si ahora iniciamos el cliente esto es lo que ocurrirá:
yarn start:client
$ ts-node client.ts
[ Thu Mar 02 2023 00:55:19 GMT+0100 (Central European Standard Time) ] Message sent: {"uuid":"a7fc2a90-0b45-4a05-8587-faf1e04777bc"}
[ Thu Mar 02 2023 00:55:19 GMT+0100 (Central European Standard Time) ] Message received: {"uuid":"eae417bf-62fc-4d49-a983-27b4f50a7f79"}
✨ Done in 0.82s.
Y en el server tendremos:
$ ts-node server.ts
[ Thu Mar 02 2023 00:53:25 GMT+0100 (Central European Standard Time) ] Server started
[ Thu Mar 02 2023 00:53:25 GMT+0100 (Central European Standard Time) ] the queue example exists. To exit press CTRL+C
[ Thu Mar 02 2023 00:55:19 GMT+0100 (Central European Standard Time) ] Message received: {"uuid":"a7fc2a90-0b45-4a05-8587-faf1e04777bc"}
[ Thu Mar 02 2023 00:55:19 GMT+0100 (Central European Standard Time) ] Message sent: {"uuid":"eae417bf-62fc-4d49-a983-27b4f50a7f79"}
Básicamente como podemos comprobar en los logs, el cliente manda un mensaje al server acabado en faf1e04777bc
, el server lo recibe y contesta con otro acabado en 27b4f50a7f79
.
Te animo a que incluyas cierto retraso entre la recepción y la respuesta para ver cómo espera el cliente por ella.
Conclusiones
Este es un ejemplo muy sencillo y obviamente hay que desarrollar más funcionalidades para poder sacar partido a los RPCs utilizando RabbitMQ, pero como punto de partida es muy sencillo de entender y de ampliar. Espero que te sirva de ayuda y que lo compartas.
Dime en los comentarios si quieres que integre esta solución con express.js para enviar los mensajes a un handler
de una ruta.
Comments ()