With the growing need to build scalable, performant, and maintainable enterprise services, microservices and event-driven architectures are often brought up as solutions. But what should you keep in mind when building services with such architectures?
Introduction
This article explains the motivations for using Event-Driven Architecture, the core concepts, and its main challenges. It also explains how to implement Event-Driven Architecture with NestJS as the backend and RabbitMQ as the communication tool to handle events and messaging.
Microservices and their shortcomings
With the growing need for scalable, modular, and maintainable systems, microservices architecture is becoming the de facto architecture for building complex enterprise systems. With a microservices architecture, the system is broken down into smaller fault-tolerant modules that can be scaled, deployed, and maintained separately. Individual services can even use different technology stacks that are better suited to the domain or task the service is responsible for. For instance, a system where most services are written in Java could use Python for the service responsible for the machine-learning model.
One of the big advantages of a microservices architecture is loose coupling: a failure in one service should not cause failures in other services. That advantage is lost if blocking communication between services turns one of them into a single point of failure. This problem is very common when two services communicate over HTTP/TCP in the request/response style.
We may also face latency problems: our service might be waiting for another service to complete a long process before it can produce a response, causing long waiting times that can be felt by the end user.
Event-Driven Architecture
A non-blocking method of communication is needed to solve these problems, and that is where Event-Driven Architecture comes in. With Event-Driven Architecture, instead of services communicating through request/response, Event Producers post events to an Event Queue in the form of messages that are consumed by Event Consumers. Different technologies use different protocols to deliver their messages, and some support streaming real-time data, but the basic architecture is as follows:
[Image: Event-driven architecture diagram]
[...] and that could cause events to be all out of order if the logic to prevent it is not implemented.
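One common way to implement that ordering logic is to attach a version (sequence number) to every event and let the consumer skip anything older than what it has already applied. The following is a minimal in-memory sketch under that assumption; `OrderEvent`, `version`, and `applyIfNewer` are hypothetical names used for illustration, not part of NestJS or RabbitMQ.

```typescript
// Hypothetical event shape: each event carries a per-order, increasing version.
interface OrderEvent {
  orderId: string;
  version: number;
  status: string;
}

// Latest applied event per order, kept in memory for illustration only.
const latestByOrder = new Map<string, OrderEvent>();

// Apply the event only if it is newer than what was already applied.
// Returns true when the event was applied, false when it was stale or a duplicate.
function applyIfNewer(event: OrderEvent): boolean {
  const current = latestByOrder.get(event.orderId);
  if (current !== undefined && current.version >= event.version) {
    return false; // stale or duplicate event: skip it
  }
  latestByOrder.set(event.orderId, event);
  return true;
}

// Events arriving out of order: version 2 is delivered before version 1.
const appliedSecond = applyIfNewer({ orderId: "o-1", version: 2, status: "PROCESSED" });
const appliedFirst = applyIfNewer({ orderId: "o-1", version: 1, status: "PENDING" });
console.log(appliedSecond, appliedFirst, latestByOrder.get("o-1")?.status);
// prints: true false PROCESSED
```

The late-arriving version 1 event is ignored, so the stored state never moves backwards. A real service would keep the version next to the record in its database rather than in a Map.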
Error Handling
Event-Driven Architecture introduces more moving parts, more failure scenarios, and different communication methods, so error handling becomes a lot more complex. It is also a key part of fine-tuning your services.
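As a rough illustration of one error-handling building block, the sketch below retries a failing handler a limited number of times and sets the message aside if it keeps failing, so that one bad event does not block the rest. It is an in-memory sketch, not RabbitMQ code; `processWithRetry`, `maxAttempts`, and the `failed` list are hypothetical names chosen for this example.

```typescript
type Handler<T> = (message: T) => void;

interface RetryResult<T> {
  processed: T[];
  failed: { message: T; error: string }[];
}

// Try each message up to maxAttempts times; park it in `failed` if every attempt throws.
function processWithRetry<T>(messages: T[], handler: Handler<T>, maxAttempts = 3): RetryResult<T> {
  const result: RetryResult<T> = { processed: [], failed: [] };
  for (const message of messages) {
    let lastError = "";
    let done = false;
    for (let attempt = 1; attempt <= maxAttempts && !done; attempt++) {
      try {
        handler(message);
        done = true;
      } catch (error) {
        lastError = error instanceof Error ? error.message : String(error);
      }
    }
    if (done) {
      result.processed.push(message);
    } else {
      result.failed.push({ message, error: lastError });
    }
  }
  return result;
}

// Example: "b" fails once and then succeeds, "c" always fails.
const attempts = new Map<string, number>();
const flakyHandler: Handler<string> = (id) => {
  const count = (attempts.get(id) ?? 0) + 1;
  attempts.set(id, count);
  if (id === "b" && count < 2) throw new Error("temporary failure");
  if (id === "c") throw new Error("permanent failure");
};

const retryResult = processWithRetry(["a", "b", "c"], flakyHandler);
console.log(retryResult.processed, retryResult.failed.length);
```

Here "a" and "b" end up processed and "c" is parked with its last error message. What to do with parked messages (alerting, manual replay, a separate queue) is a design decision this sketch leaves open.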
Event-Driven Architecture patterns
Simple event processing
[Image: Simple event processing]
- When process retries are needed and/or failures must not cause data loss.
When you SHOULD NOT use Event-Driven Architecture
- The response from the service is integral for the process to continue (e.g. fraud detection).
- When the system is not complex enough for the benefits of an Event-Driven Architecture to outweigh the operational cost and complex implementation.
- The team is not used to Event-Driven Architecture and not enough time is available for the team to transition to the new architecture.
Message Brokers and related technologies
- RabbitMQ: A queue-based and pub/sub message broker that uses the AMQP protocol. Popular due to its simplicity and reliability.
- Apache Kafka: Distributed event store and stream-processing platform. One of the more popular options due to its performance and implementation possibilities.
- AWS SQS/SNS, GCP Pub/Sub: Cloud-based message queue services that are ideal for those who want less operational overhead while working in the cloud.
- AWS Kinesis: A managed data streaming platform for AWS. Similar to Kafka, but with less operational overhead.
Managed versions of RabbitMQ and Kafka can also be found in some cloud providers.
Architecture example
A simple example of an ordering app using Event-Driven architecture:
[Image: Ordering app architecture example]
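import { Body, Controller, Post } from "@nestjs/common";
// OrdersService and CreateOrderRequest are defined elsewhere in the Orders service.

// The "orders" route prefix is an assumption; the original decorator is not shown.
@Controller("orders")
export class OrdersController {
  constructor(private readonly ordersService: OrdersService) {}

  // HTTP entry point: hands the request body to the service layer.
  @Post()
  async createOrder(@Body() createOrderRequest: CreateOrderRequest) {
    return this.ordersService.createOrder(createOrderRequest);
  }
}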
orders.service.ts:
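import { Inject, Injectable } from "@nestjs/common";
import { ClientProxy } from "@nestjs/microservices";
import { InjectModel } from "@nestjs/mongoose";
import { Model } from "mongoose";
import { lastValueFrom } from "rxjs";
// Order, OrderDocument, OrderInterface, CreateOrderRequest and BILLING_SERVICE
// are defined elsewhere in the Orders service.

@Injectable()
export class OrdersService {
  constructor(
    @InjectModel(Order.name) private orderModel: Model<OrderDocument>,
    @Inject(BILLING_SERVICE) private billingClient: ClientProxy
  ) {}

  async createOrder(
    createOrderRequest: CreateOrderRequest
  ): Promise<OrderInterface> {
    // Persist the order first, marked as PENDING until billing reports back.
    const order = new this.orderModel({
      ...createOrderRequest,
      status: "PENDING",
    });
    await order.save();

    const orderPayload: OrderInterface = {
      orderId: order._id.toString(),
      productName: order.productName,
      price: order.price,
      buyer: order.buyer,
      status: order.status,
    };

    // Publish order_created to the billing queue. emit() returns an Observable,
    // so lastValueFrom() is used to wait until the message has been handed off.
    await lastValueFrom(
      this.billingClient.emit("order_created", {
        orderPayload,
      })
    );

    // Errors from save() or emit() propagate to the caller unchanged.
    return orderPayload;
  }
}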
Our entry point is the controller, which sends the Order data to the service layer. The service saves the Order as PENDING and emits an order_created event with a payload to the BILLING queue. The queue is registered under the BILLING_SERVICE constant (export const BILLING_SERVICE = 'BILLING';).
Event Consumer
billing.controller.ts
import { Controller, Inject } from "@nestjs/common";
import { ClientProxy, Ctx, EventPattern, Payload, RmqContext } from "@nestjs/microservices";
import { lastValueFrom } from "rxjs";
// BillingService, RmqService, Order and ORDERS_SERVICE are defined elsewhere in the project.

@Controller()
export class BillingController {
  constructor(
    private readonly billingService: BillingService,
    private readonly rmqService: RmqService,
    @Inject(ORDERS_SERVICE) private ordersClient: ClientProxy
  ) {}

  @EventPattern("order_created")
  async handleOrderCreated(
    @Payload("orderPayload") order: Order,
    @Ctx() context: RmqContext
  ) {
    // Check for fraud before billing
    if (await this.billingService.isFraud(order)) {
      order.status = "CANCELED_FRAUD";
    } else {
      // Awaited in case bill() is asynchronous, so failures surface before the ack
      await this.billingService.bill(order);
      order.status = "PROCESSED";
    }
    // Emit an event to update the order status
    await lastValueFrom(
      this.ordersClient.emit("order_processed", {
        order,
      })
    );
    // If no errors happened, acknowledge that the order_created process is over and remove it from the queue
    this.rmqService.ack(context);
  }
}
The Billing service watches for the order_created event pattern. It checks for fraud and then tries to bill the order. If no error occurs, it emits an order_processed event with the new order status and then acks the order_created event message.
The Order service then updates the database with the new status and acks the order_processed event message, which ends the whole process.
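To make the round trip concrete, here is a dependency-free simulation of the same flow, using plain arrays in place of the RabbitMQ queues and a Map in place of the database. It is only a sketch: the function names, the in-memory queues, and the fraud rule (price above 10000) are assumptions made for illustration and do not come from the NestJS code.

```typescript
type OrderStatus = "PENDING" | "PROCESSED" | "CANCELED_FRAUD";

interface Order {
  orderId: string;
  productName: string;
  price: number;
  buyer: string;
  status: OrderStatus;
}

interface AppEvent {
  pattern: "order_created" | "order_processed";
  order: Order;
}

// In-memory stand-ins for the two queues and the orders database.
const billingQueue: AppEvent[] = [];
const ordersQueue: AppEvent[] = [];
const ordersDb = new Map<string, Order>();

// Orders service (producer): save the order as PENDING and emit order_created.
function createOrder(orderId: string, productName: string, price: number, buyer: string): Order {
  const order: Order = { orderId, productName, price, buyer, status: "PENDING" };
  ordersDb.set(orderId, order);
  billingQueue.push({ pattern: "order_created", order: { ...order } });
  return order; // returned immediately, before billing has run
}

// Hypothetical fraud rule, for illustration only.
function isFraud(order: Order): boolean {
  return order.price > 10000;
}

// Billing service (consumer): handle order_created, then emit order_processed.
function runBilling(): void {
  let event = billingQueue.shift();
  while (event !== undefined) {
    const status: OrderStatus = isFraud(event.order) ? "CANCELED_FRAUD" : "PROCESSED";
    ordersQueue.push({ pattern: "order_processed", order: { ...event.order, status } });
    event = billingQueue.shift();
  }
}

// Orders service (consumer): handle order_processed and update the stored status.
function runOrdersConsumer(): void {
  let event = ordersQueue.shift();
  while (event !== undefined) {
    const stored = ordersDb.get(event.order.orderId);
    if (stored !== undefined) stored.status = event.order.status;
    event = ordersQueue.shift();
  }
}

const created = createOrder("o-1", "Keyboard", 50, "alice");
const statusAtCreation = created.status; // captured before any consumer has run
createOrder("o-2", "Diamond", 50000, "bob");
runBilling();
runOrdersConsumer();
console.log(statusAtCreation, ordersDb.get("o-1")?.status, ordersDb.get("o-2")?.status);
// prints: PENDING PROCESSED CANCELED_FRAUD
```

The point of the simulation is that createOrder returns a PENDING order right away, and the final status only appears once both consumers have drained their queues.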
Event Broker
rmq.module.ts
(Configuration needed for services using RabbitMQ as Producers)
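import { DynamicModule, Module } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { ClientsModule, Transport } from "@nestjs/microservices";
import { RmqService } from "./rmq.service";

// Shape inferred from how register() uses its argument.
interface RmqModuleOptions {
  serviceName: string;
  serviceEnvVar: string;
}

@Module({
  providers: [RmqService],
  exports: [RmqService],
})
export class RmqModule {
  static register({ serviceName, serviceEnvVar }: RmqModuleOptions): DynamicModule {
    return {
      module: RmqModule,
      imports: [
        /*
          Creates a dynamic module based on the serviceName
          This module creates/connects to a queue as an event producer
        */
        ClientsModule.registerAsync([
          {
            name: serviceName,
            useFactory: (configService: ConfigService) => ({
              transport: Transport.RMQ,
              options: {
                urls: [configService.get<string>('RABBIT_MQ_URI')],
                queue: configService.get<string>(serviceEnvVar),
              },
            }),
            inject: [ConfigService],
          },
        ]),
      ],
      exports: [ClientsModule],
    };
  }
}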
The register method is used by services that need to act as RabbitMQ Producers. The service provides a serviceName and a serviceEnvVar, which is the name of the environment variable holding the queue name. The RabbitMQ URI (e.g. of a running container) is read from the RABBIT_MQ_URI environment variable.
For instance, with an environment variable RABBIT_MQ_BILLING_QUEUE=BILLING, you would call the register method like RmqModule.register({ serviceName: 'BILLING', serviceEnvVar: 'RABBIT_MQ_BILLING_QUEUE' }).
rmq.service.ts:
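import { Injectable } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { RmqOptions, Transport } from "@nestjs/microservices";

@Injectable()
export class RmqService {
  constructor(private readonly configService: ConfigService) {}

  // Return the config options to subscribe to a queue as an event consumer.
  // `queue` is the name of the environment variable that holds the queue name.
  // With noAck = false, messages must be acknowledged manually.
  getOptions(queue: string, noAck = false): RmqOptions {
    return {
      transport: Transport.RMQ,
      options: {
        urls: [this.configService.get<string>("RABBIT_MQ_URI")],
        queue: this.configService.get<string>(queue),
        noAck,
        persistent: true,
      },
    };
  }
}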
The getOptions method returns the configuration needed for a service to consume from a RabbitMQ queue. (The queue argument is the name of the environment variable that holds the queue name.)
Event Message Ack
rmq.service.ts:
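import { Injectable } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { RmqContext } from "@nestjs/microservices";

@Injectable()
export class RmqService {
  constructor(private readonly configService: ConfigService) {}

  // Acknowledge the message so RabbitMQ can remove it from the queue.
  ack(context: RmqContext): void {
    const channel = context.getChannelRef();
    const originalMessage = context.getMessage();
    channel.ack(originalMessage);
  }
}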
The ack method acknowledges event messages so that they are removed from the queue once they are no longer needed.
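Why the explicit ack matters can be illustrated without RabbitMQ. In the in-memory sketch below, a delivered message stays tracked until it is acked and goes back to the queue if the consumer disappears first, which mirrors how a broker redelivers unacknowledged messages. `ManualAckQueue` and its methods are hypothetical names for this example, not part of NestJS or any RabbitMQ client.

```typescript
interface QueuedMessage {
  id: number;
  body: string;
}

// Minimal in-memory queue with manual acknowledgements (illustration only).
class ManualAckQueue {
  private ready: QueuedMessage[] = [];
  private unacked = new Map<number, QueuedMessage>();
  private nextId = 1;

  publish(body: string): void {
    this.ready.push({ id: this.nextId++, body });
  }

  // Deliver the next message; it stays tracked as unacked until ack() is called.
  receive(): QueuedMessage | undefined {
    const message = this.ready.shift();
    if (message !== undefined) this.unacked.set(message.id, message);
    return message;
  }

  // The consumer confirms it is done; only now is the message really gone.
  ack(id: number): void {
    this.unacked.delete(id);
  }

  // Simulates the consumer's connection closing: unacked messages return to the queue.
  requeueUnacked(): void {
    this.ready.unshift(...Array.from(this.unacked.values()));
    this.unacked.clear();
  }

  // Messages that are still waiting or delivered but not yet acked.
  get pending(): number {
    return this.ready.length + this.unacked.size;
  }
}

const queue = new ManualAckQueue();
queue.publish("order_created:o-1");

const firstDelivery = queue.receive(); // the consumer fails before acking
queue.requeueUnacked();
const secondDelivery = queue.receive(); // the same message is delivered again
if (secondDelivery !== undefined) queue.ack(secondDelivery.id);
console.log(firstDelivery?.body === secondDelivery?.body, queue.pending);
// prints: true 0
```

Because the message is only dropped after ack, a handler that throws before reaching the ack call does not lose the event. The flip side is that consumers can see the same message more than once and should tolerate duplicates.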
References
- The complete code for the above architecture can be found in the following GitHub repository
- NestJS + RabbitMQ integration
- IBM Event-Driven Architecture article
Article Photo by Krusche & Company