from kafka import KafkaProducer
import json
# Initialize the Kafka producer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Send a message to the Kafka topic 'test_topic'
producer.send('test_topic', {'key': 'value'})
# Ensure all messages are sent before closing the producer
producer.flush()
producer.close()
KafkaProducer: Initializes the producer with a list of Kafka brokers. Here, it connects to
localhost:9092
.
value_serializer: Serializes the message to JSON format before sending it to Kafka.
send: Sends the message to the specified topic (test_topic
).
flush: Ensures all messages are sent.
close: Closes the producer after sending messages.
from kafka import KafkaConsumer
import json
# Initialize the Kafka consumer
consumer = KafkaConsumer(
'test_topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# Read and print messages from the Kafka topic
for message in consumer:
print(f"Received message: {message.value}")
# Close the consumer when done
consumer.close()
KafkaConsumer: Initializes the consumer to connect to localhost:9092
and reads messages from
the test_topic
.
auto_offset_reset: Sets the consumer to start reading from the earliest message.
enable_auto_commit: Automatically commits the offset of the messages.
value_deserializer: Deserializes the JSON messages received from Kafka.
for message in consumer: Continuously reads messages from the topic and prints them.