Apache Kafka Producer and Consumer

Kafka Producer


from kafka import KafkaProducer
import json

# Initialize the Kafka producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send a message to the Kafka topic 'test_topic'
producer.send('test_topic', {'key': 'value'})

# Block until all buffered messages are sent, then release the producer's resources
producer.flush()
producer.close()

Description

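KafkaProducer: Creates the producer. bootstrap_servers lists the brokers used for the initial connection to the cluster; here it is a single broker at localhost:9092.

value_serializer: Converts each message value to bytes before it is sent. Here, the value is serialized to JSON and encoded as UTF-8.

send: Queues the message for the specified topic (test_topic). The call is asynchronous and returns a future; the message is delivered in the background.

flush: Blocks until all buffered messages have been sent.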

close: Closes the producer after sending messages.
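
Because send is asynchronous, a failed delivery can go unnoticed unless the returned future is checked. The following is a minimal sketch, not a definitive implementation, of waiting for the broker's acknowledgement. The helper names serialize_value and send_and_confirm are hypothetical and not part of kafka-python; the producer argument is assumed to be a KafkaProducer configured as above.

```python
import json


def serialize_value(value):
    """Encode a JSON-serializable object as UTF-8 bytes.

    Same logic as the value_serializer lambda, written as a named function.
    """
    return json.dumps(value).encode('utf-8')


def send_and_confirm(producer, topic, value, timeout=10):
    """Send one message and block until it is acknowledged.

    Hypothetical helper: `producer` is assumed to be a kafka.KafkaProducer.
    send() returns a future; get() waits for the result and raises an
    exception if delivery fails or the timeout (in seconds) expires.
    """
    future = producer.send(topic, value)
    metadata = future.get(timeout=timeout)
    # The record metadata reports where the message was written
    return metadata.topic, metadata.partition, metadata.offset
```

With a running broker, send_and_confirm(producer, 'test_topic', {'key': 'value'}) would return the topic, partition, and offset of the stored message. Waiting on every message trades throughput for certainty, so it suits low-volume or critical messages more than bulk sends.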

Kafka Consumer


from kafka import KafkaConsumer
import json

# Initialize the Kafka consumer
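consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['localhost:9092'],
    # Example group name (an assumption); without a group_id, offsets are not committed
    group_id='test_group',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)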

# Read and print messages from the Kafka topic until interrupted (Ctrl+C)
try:
    for message in consumer:
        print(f"Received message: {message.value}")
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer when done
    consumer.close()

Description

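KafkaConsumer: Creates a consumer that connects to localhost:9092 and subscribes to test_topic.

auto_offset_reset: Determines where reading starts when no committed offset exists for the consumer. With 'earliest', it starts from the oldest available message.

enable_auto_commit: Commits the offsets of consumed messages periodically in the background. In kafka-python this takes effect only when a group_id is set.

value_deserializer: Converts the raw bytes of each message back into a Python object. Here, the bytes are decoded as UTF-8 and parsed as JSON.

for message in consumer: Iterates over incoming messages and prints each value. By default the loop blocks and waits for new messages indefinitely.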
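
For scripts and tests it is often useful to stop after a fixed number of messages instead of looping forever. The following is a minimal sketch under that assumption. The helper names deserialize_value and consume_up_to are hypothetical and not part of kafka-python; the consumer argument is assumed to be a KafkaConsumer configured as above.

```python
import json
from itertools import islice


def deserialize_value(raw):
    """Decode UTF-8 bytes and parse them as JSON.

    Same logic as the value_deserializer lambda, written as a named function.
    """
    return json.loads(raw.decode('utf-8'))


def consume_up_to(consumer, max_messages):
    """Return the values of at most max_messages messages.

    Hypothetical helper: `consumer` is assumed to be an iterable of objects
    with a .value attribute, such as a kafka.KafkaConsumer. islice stops
    pulling from the consumer once the limit is reached.
    """
    return [message.value for message in islice(consumer, max_messages)]
```

Note that with a live KafkaConsumer this still blocks until max_messages messages have arrived. kafka-python also accepts a consumer_timeout_ms argument, which ends the iteration once that many milliseconds pass without a new message.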