Develop with a Python Kafka client
This guide shows you how to use a Python Kafka client to interact with HStream. Currently, we support kafka-python and confluent-kafka.
Installation
sh
# `python -m pip` installs into the interpreter (or activated venv) you will
# run the examples with, rather than whichever `pip` happens to be on PATH.
# If you want to use kafka-python
python -m pip install kafka-python
# Or if you want to use confluent-kafka
python -m pip install confluent-kafka
TIP
Prefer to use a virtual environment? Check out Python's built-in venv.
Create a Topic
python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
# NOTE: Replace with your own host and port (or set GUIDE_HOST / GUIDE_PORT).
host = os.environ.get("GUIDE_HOST", "127.0.0.1")
# From the environment the port is a string; the fallback default is an int.
port = os.environ.get("GUIDE_PORT", 9092)
addr = f"{host}:{port}"  # bootstrap server, "127.0.0.1:9092" by default
topic_name = "my_topic"
def create_topic():
    """Create ``topic_name`` with one partition and a replication factor of 1.

    Uses kafka-python's ``KafkaAdminClient`` connected to ``addr``. The admin
    client is always closed afterwards, even if ``create_topics`` raises, so
    its network connections are not leaked.
    """
    admin = KafkaAdminClient(bootstrap_servers=addr)
    try:
        topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
        admin.create_topics([topic])
    finally:
        admin.close()
python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic
# NOTE: Replace with your own host and port (or set GUIDE_HOST / GUIDE_PORT).
host = os.environ.get("GUIDE_HOST", "127.0.0.1")
# From the environment the port is a string; the fallback default is an int.
port = os.environ.get("GUIDE_PORT", 9092)
topic_name = "my_topic"
group_id = "confluent_kafka_group"
# Base client settings reused by the confluent-kafka clients in this guide.
conf = {
    "bootstrap.servers": f"{host}:{port}",
    "client.id": "confluent_kafka_client",
}
def create_topic():
    """Create ``topic_name`` with one partition and a replication factor of 1.

    ``AdminClient.create_topics`` is asynchronous: it returns a dict of
    futures keyed by topic name. Waiting on every future makes sure the
    request has completed before this function returns. It also turns a
    failed creation into a raised exception instead of silently discarding it.
    """
    admin = AdminClient(conf)
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    futures = admin.create_topics([new_topic])
    for future in futures.values():
        # Returns None on success; raises if the topic could not be created.
        future.result()
Produce Records
python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
# NOTE: Replace with your own host and port (or set GUIDE_HOST / GUIDE_PORT).
host = os.environ.get("GUIDE_HOST", "127.0.0.1")
# From the environment the port is a string; the fallback default is an int.
port = os.environ.get("GUIDE_PORT", 9092)
addr = f"{host}:{port}"  # bootstrap server, "127.0.0.1:9092" by default
topic_name = "my_topic"
def produce():
    """Send five records to ``topic_name`` and print each broker acknowledgement.

    ``send`` is asynchronous and returns a future per record. ``future.get``
    then blocks for up to 10 seconds per record, waiting for the acknowledgement.
    The producer is closed afterwards, including when an error is raised, so
    its resources are released.
    """
    producer = KafkaProducer(
        bootstrap_servers=addr,
        acks="all",  # wait for acknowledgement from all in-sync replicas
        linger_ms=100,  # allow up to 100 ms for records to be batched
    )
    try:
        futures = [
            producer.send(topic_name, f"hello, hstream {i}".encode())
            for i in range(5)
        ]
        for future in futures:
            response = future.get(timeout=10)
            print("Producer response:", response)
    finally:
        producer.close()
python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic
# NOTE: Replace with your own host and port (or set GUIDE_HOST / GUIDE_PORT).
host = os.environ.get("GUIDE_HOST", "127.0.0.1")
# From the environment the port is a string; the fallback default is an int.
port = os.environ.get("GUIDE_PORT", 9092)
topic_name = "my_topic"
group_id = "confluent_kafka_group"
# Base client settings reused by the confluent-kafka clients in this guide.
conf = {
    "bootstrap.servers": f"{host}:{port}",
    "client.id": "confluent_kafka_client",
}
def produce():
    """Produce five keyed records to ``topic_name`` using confluent-kafka.

    A delivery-report callback prints the outcome of each record.
    ``flush`` waits until every queued record has been delivered or has
    failed, and the callbacks are served while it waits.
    """

    def report(err, msg):
        # Delivery report: ``err`` is None when the record was written.
        if err is None:
            print(
                f"Message produced: offset={msg.offset()}, "
                f'key="{msg.key().decode()}", '
                f'value="{msg.value().decode()}"'
            )
            return
        print(f"Failed to deliver message: {msg}: {err}")

    producer = Producer(conf)
    for n in range(5):
        suffix = str(n).encode()
        producer.produce(
            topic_name,
            key=b"key " + suffix,
            value=b"hello, hstream " + suffix,
            on_delivery=report,
        )
    # Block until all outstanding records are delivered (or fail).
    producer.flush()
Consume Records
python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
# NOTE: Replace with your own host and port (or set GUIDE_HOST / GUIDE_PORT).
host = os.environ.get("GUIDE_HOST", "127.0.0.1")
# From the environment the port is a string; the fallback default is an int.
port = os.environ.get("GUIDE_PORT", 9092)
addr = f"{host}:{port}"  # bootstrap server, "127.0.0.1:9092" by default
topic_name = "my_topic"
def consume():
    """Read and print five records from ``topic_name`` with kafka-python.

    Starts from the earliest available offset when there is no committed
    offset, and never commits offsets. Blocks until five records have arrived.
    The loop exits explicitly after the fifth record. The consumer is then
    closed in ``finally``, so it is also released when an exception interrupts
    consumption. The original code closed it from inside the iteration.
    """
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=addr,
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        fetch_max_wait_ms=1000,
    )
    try:
        for count, msg in enumerate(consumer, start=1):
            print("Consumer response", msg)
            if count >= 5:
                break
    finally:
        consumer.close()
python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic
# NOTE: Replace with your own host and port (or set GUIDE_HOST / GUIDE_PORT).
host = os.environ.get("GUIDE_HOST", "127.0.0.1")
# From the environment the port is a string; the fallback default is an int.
port = os.environ.get("GUIDE_PORT", 9092)
topic_name = "my_topic"
group_id = "confluent_kafka_group"
# Base client settings reused by the confluent-kafka clients in this guide.
conf = {
    "bootstrap.servers": f"{host}:{port}",
    "client.id": "confluent_kafka_client",
}
def consume():
    """Consume and print five records from ``topic_name`` with confluent-kafka.

    Joins consumer group ``group_id`` and starts from the earliest offset when
    the group has no committed offset. Offsets are not auto-committed.

    Record keys and values are optional. For example, the kafka-python
    producer in this guide sends records without a key. ``msg.key()`` /
    ``msg.value()`` then return None, so they are only decoded when present
    and are printed as ``None`` otherwise.

    Ctrl+C stops consumption early. The consumer is always closed on exit.
    """
    consumer = Consumer(
        {
            **conf,
            "group.id": group_id,
            # "earliest" is the modern spelling of the legacy alias "smallest".
            "auto.offset.reset": "earliest",
            "enable.auto.commit": "false",
            "isolation.level": "read_uncommitted",
        }
    )
    consumer.subscribe([topic_name])

    def _decode(data):
        # Absent keys/values come back as None; only bytes can be decoded.
        return None if data is None else data.decode()

    consumed = 0
    try:
        while consumed < 5:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print(f"ERROR: {msg.error()}")
            else:
                print(
                    f"Consumed topic {msg.topic()}: "
                    f'key="{_decode(msg.key())}", '
                    f'value="{_decode(msg.value())}"'
                )
                consumed += 1
    except KeyboardInterrupt:
        # Deliberate: let the user stop the example with Ctrl+C.
        pass
    finally:
        consumer.close()