Using domain events with rabbitmq
In this post, we'll explain how to use a DomainEvents
class to orchestrate event publishing and consumption using RabbitMQ. This setup is ideal for a microservices architecture within a monorepo.
Setting Up Dependencies
First, install the necessary dependencies for our new monorepo package message-broker:
pnpm install amqplib pnpm install @types/amqplib --save-dev
RabbitMQ Connection Class
Create a RabbitMQ
class to manage connections and channels:
import amqp, { Channel, Connection, ConsumeMessage } from 'amqplib'; import { env } from "env"; class RabbitMQ { private connection: Connection | null = null; private channel: Channel | null = null; async connect(): Promise<void> { this.connection = await amqp.connect(env.MQ_URL); this.channel = await this.connection.createChannel(); } async createQueue(queue: string): Promise<void> { if (!this.channel) throw new Error("Channel is not initialized"); await this.channel.assertQueue(queue, { durable: true }); } async sendToQueue(queue: string, message: string): Promise<void> { if (!this.channel) throw new Error("Channel is not initialized"); this.channel.sendToQueue(queue, Buffer.from(message)); } async consumeQueue(queue: string, callback: (msg: ConsumeMessage | null) => void): Promise<void> { if (!this.channel) throw new Error("Channel is not initialized"); await this.channel.consume(queue, callback, { noAck: true }); } async close(): Promise<void> { if (this.channel) await this.channel.close(); if (this.connection) await this.connection.close(); } } export const rabbitMQ = new RabbitMQ();
DomainEvents Class
Implement the DomainEvents
class for event orchestration:
import { rabbitMQ } from './rabbitmq' type EventHandler = (data: unknown) => void class DomainEvents { private static handlers: Record<string, EventHandler[]> = {} static async publish<T>(event: string, data: T): Promise<void> { const message = JSON.stringify({ event, data }) await rabbitMQ.sendToQueue(event, message) } static subscribe(event: string, handler: EventHandler): void { if (!this.handlers[event]) { this.handlers[event] = [] } this.handlers[event].push(handler) } static async listen(event: string): Promise<void> { await rabbitMQ.createQueue(event) await rabbitMQ.consumeQueue(event, (msg) => { if (msg) { const messageContent = msg.content.toString() const { data } = JSON.parse(messageContent) this.handlers[event].forEach((handler) => handler(data)) } }) } } export { DomainEvents }
Using DomainEvents in Microservices
Publishing user.created on user-service
To publish an event, use the publish
method, like here on the register-user-controller.ts
, on user-service
await DomainEvents.publish<UserPublic>( 'user.created', UserPresenter.toHTTP(user), )
Subscribing to an Event
To subscribe to an event, use the subscribe
method, like here on the subscribe.ts
, on hotel-service
import { DomainEvents, rabbitMQ } from 'message-broker'; import { createUser } from './events/user-created'; import { updateUser } from './events/user-updated'; export const subscribe = async () => { await rabbitMQ.connect() await DomainEvents.listen('user.created'); await DomainEvents.listen('user.updated'); DomainEvents.subscribe('user.created', async (data) => { await createUser(data) }); DomainEvents.subscribe('user.updated', async (data) => { await updateUser(data) }); }
Starting the Application
Make sure to call subscribe()
at the server start:
app.listen({ port: env.HOTEL_SERVICE_PORT }).then(async () => { subscribe().catch(console.error) console.log('') console.log('🤘 MS Hotel Service running!') })
Here we can see all together in action:
By following these steps, you can effectively manage event-driven communication between microservices using RabbitMQ in a Node.js monorepo. This approach ensures a scalable and maintainable architecture for your application.
Contribute to the Project
If you found this post helpful or have suggestions for improvement, feel free to check out the project repository on GitHub. You are welcome to fork the repository and submit a pull request. If you have any questions or want to discuss a topic, please open an issue. We appreciate your contributions!