Develop with Python Kafka client

This guide shows you how to use a Python Kafka client to interact with HStream. Currently, we support kafka-python and confluent-kafka.

Installation

sh
# If you want to use kafka-python
pip install kafka-python

# Or if you want to use confluent-kafka
pip install confluent-kafka
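
A quick way to confirm the install is to import the client and print its version (a minimal sketch; keep only the lines for the client you installed):

python
# If you installed kafka-python:
import kafka
print(kafka.__version__)

# Or, if you installed confluent-kafka:
import confluent_kafka
print(confluent_kafka.version())     # Python client version
print(confluent_kafka.libversion())  # bundled librdkafka version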

TIP

Prefer to use a virtual environment? Check out Python's built-in venv.
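
For example (a minimal sketch; the environment name .venv is an arbitrary choice):

sh
# Create and activate a virtual environment, then install a client into it
python3 -m venv .venv
source .venv/bin/activate
pip install kafka-python  # or: pip install confluent-kafka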

Create a Topic

python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"


def create_topic():
    admin = KafkaAdminClient(bootstrap_servers=addr)
    topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
    admin.create_topics([topic])

python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)

topic_name = "my_topic"
group_id = "confluent_kafka_group"

conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}


def create_topic():
    admin = AdminClient(conf)
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    admin.create_topics([new_topic])
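
With confluent-kafka, AdminClient.create_topics is asynchronous and returns one future per topic, so the call above may return before the topic exists. If you want to block until creation is confirmed, wait on those futures. A minimal sketch, assuming the conf and topic_name defined above:

python
def create_topic_and_wait():
    admin = AdminClient(conf)
    # create_topics() returns a dict of {topic name: future}
    futures = admin.create_topics(
        [NewTopic(topic_name, num_partitions=1, replication_factor=1)]
    )
    for name, future in futures.items():
        future.result()  # blocks; raises KafkaException if creation failed
        print(f"Topic {name} created")

With kafka-python, KafkaAdminClient.create_topics waits for the broker's response before returning, so no extra step is needed.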

Produce Records

python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"


def produce():
    producer = KafkaProducer(
        bootstrap_servers=addr,
        acks="all",
        linger_ms=100,
    )
    futures = [
        producer.send(topic_name, b"hello, hstream " + str(i).encode())
        for i in range(5)
    ]
    for future in futures:
        response = future.get(timeout=10)
        print("Producer response:", response)

python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)

topic_name = "my_topic"
group_id = "confluent_kafka_group"

conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}


def produce():
    def acked(err, msg):
        if err is not None:
            print(f"Failed to deliver message: {msg}: {err}")
        else:
            print(
                f"Message produced: offset={msg.offset()}, "
                f'key="{msg.key().decode()}", '
                f'value="{msg.value().decode()}"'
            )

    producer = Producer(conf)
    for i in range(5):
        producer.produce(
            topic_name,
            key=b"key " + str(i).encode(),
            value=b"hello, hstream " + str(i).encode(),
            on_delivery=acked,
        )
    producer.flush()
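
The examples above send raw bytes. If your records are structured data, you can let the client serialize them; here is a minimal kafka-python sketch using a JSON value_serializer (the serializer choice is an illustration, not part of the guide above):

python
import json
import os
from kafka import KafkaProducer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"


def produce_json():
    producer = KafkaProducer(
        bootstrap_servers=addr,
        acks="all",
        # Serialize dict values to JSON-encoded bytes before sending
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    for i in range(5):
        producer.send(topic_name, {"index": i, "message": "hello, hstream"})
    producer.flush()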

Consume Records

python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"


def consume():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=addr,
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        fetch_max_wait_ms=1000,
    )
    i = 0
    for msg in consumer:
        print("Consumer response", msg)
        i += 1
        if i >= 5:
            consumer.close()
            break

python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)

topic_name = "my_topic"
group_id = "confluent_kafka_group"

conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}


def consume():
    consumer = Consumer(
        {
            **conf,
            "group.id": group_id,
            "auto.offset.reset": "smallest",
            "enable.auto.commit": "false",
            "isolation.level": "read_uncommitted",
        }
    )
    consumer.subscribe([topic_name])
    i = 0
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print(f"ERROR: {msg.error()}")
            else:
                # Extract the (optional) key and value, and print.
                key = msg.key().decode() if msg.key() is not None else None
                print(
                    f"Consumed topic {msg.topic()}: "
                    f'key="{key}", '
                    f'value="{msg.value().decode()}"'
                )
                i += 1
            if i >= 5:
                break
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
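
Both consumer examples disable auto-commit but never commit offsets, so a restarted consumer in the same group will re-read the same records. To record progress, commit after processing. A minimal confluent-kafka sketch, assuming the conf, topic_name, and group_id defined above:

python
def consume_and_commit():
    consumer = Consumer(
        {
            **conf,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": "false",
        }
    )
    consumer.subscribe([topic_name])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None or msg.error():
                continue
            print(f"Processed offset {msg.offset()}")
            # Synchronously commit this message's offset after processing
            consumer.commit(message=msg, asynchronous=False)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

With kafka-python, the equivalent is calling consumer.commit() after processing, which requires constructing the KafkaConsumer with a group_id.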