Apache Kafka
Apache Kafka is a distributed and scalable data processing platform used for real-time data streaming. It serves as a messaging and event streaming platform, facilitating the secure, reliable, and real-time transfer of data between applications and distributed systems.
Kafka operates on a publish-subscribe model, where data producers publish messages to a topic, and consumers subscribe to that topic to receive messages. Kafka can handle large volumes of real-time data and distribute them to multiple consumers in parallel.
Commonly, Kafka finds applications in enterprise solutions, especially those requiring high-speed, high-availability communication between distributed systems. Additionally, Kafka is a flexible platform that can be integrated with various programming languages and tools, boasting an active developer community contributing to its improvement and expansion.
Key features of Kafka include:
- Horizontal scalability
- Fault tolerance
- Data replication
- Storage capacity
- Integration with data processing tools like Apache Spark, Apache Flink, and Apache Storm
Implementing Kafka in NestJS
To implement Kafka in NestJS, follow these steps:
- Step one
- Step two
- Step three
- 1. Install the @nestjs/microservices and kafkajs packages using npm:
npm install @nestjs/microservices kafkajs
- 2. Create a Kafka module in NestJS using the NestJS generator:
nest generate module kafka
- 3. Import the KafkaModule in your main application:
import { KafkaModule } from './kafka/kafka.module';
@Module({
imports: [KafkaModule],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
- 4. Create a Kafka service in the KafkaModule:
import { Injectable } from '@nestjs/common';
import { Kafka } from 'kafkajs';
@Injectable()
export class KafkaService {
private kafka;
constructor() {
this.kafka = new Kafka({
clientId: 'your-client-id',
brokers: ['kafka-broker1', 'kafka-broker2'],
});
}
async sendMessage(topic: string, message: string) {
const producer = this.kafka.producer();
await producer.connect();
await producer.send({
topic,
messages: [{ value: message }],
});
await producer.disconnect();
}
async consumeMessages(topic: string) {
const consumer = this.kafka.consumer({ groupId: 'group-id' });
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
// Handle the received message
console.log({
topic,
partition,
value: message.value.toString(),
});
},
});
}
}
- 5. Add a controller in the KafkaModule to handle HTTP requests:
import { Controller, Get, Post, Body, Param } from '@nestjs/common';
import { KafkaService } from './kafka.service';
@Controller('kafka')
export class KafkaController {
constructor(private readonly kafkaService: KafkaService) {}
@Post('send/:topic')
async sendKafkaMessage(@Param('topic') topic: string, @Body() message: { value: string }) {
await this.kafkaService.sendMessage(topic, message.value);
}
@Get('consume/:topic')
async consumeKafkaMessages(@Param('topic') topic: string) {
await this.kafkaService.consumeMessages(topic);
}
}
- 6. Add the controller to the KafkaModule:
@Module({
controllers: [KafkaController],
providers: [KafkaService],
})
export class KafkaModule {}
- 7. Start your application. These are the basic steps to implement Kafka in NestJS. Of course, there are many other configurations and options to consider, depending on your specific use case.