Develop with Python Kafka client

This guide shows you how to use a Python Kafka client to interact with HStream. Currently, kafka-python and confluent-kafka are supported.

Installation

sh
# If you want to use kafka-python
pip install kafka-python

# Or if you want to use confluent-kafka
pip install confluent-kafka

TIP

Prefer to use a virtual environment? Check out Python's built-in venv.
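
For example, a minimal setup might look like this (assuming Python 3 is on your PATH; pick whichever client you plan to use):

sh
python3 -m venv .venv
source .venv/bin/activate
pip install kafka-python  # or: pip install confluent-kafka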

Create a Topic

With kafka-python:

python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"


def create_topic():
    admin = KafkaAdminClient(bootstrap_servers=addr)
    topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
    admin.create_topics([topic])
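
If the topic already exists, the broker rejects the creation request. A small sketch that checks first (same `addr` and `topic_name` as above; `create_topic_if_missing` is our name):

python
def create_topic_if_missing():
    # Skip creation when the topic is already present.
    admin = KafkaAdminClient(bootstrap_servers=addr)
    if topic_name not in admin.list_topics():
        topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
        admin.create_topics([topic])
    admin.close()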

With confluent-kafka:

python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)

topic_name = "my_topic"
group_id = "confluent_kafka_group"

conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}


def create_topic():
    admin = AdminClient(conf)
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    admin.create_topics([new_topic])
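
In confluent-kafka, `create_topics` is asynchronous: it returns a dict mapping each topic name to a future. A sketch that waits for the outcome (same `conf` and `topic_name` as above; the function name is ours):

python
def create_topic_and_wait():
    admin = AdminClient(conf)
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    futures = admin.create_topics([new_topic])
    for topic, future in futures.items():
        try:
            future.result()  # blocks until the topic is created; raises on failure
            print(f"Topic {topic} created")
        except Exception as e:
            print(f"Failed to create topic {topic}: {e}")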

Produce Records

With kafka-python:

python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"


def produce():
    producer = KafkaProducer(
        bootstrap_servers=addr,
        acks="all",
        linger_ms=100,
    )
    futures = [
        producer.send(topic_name, b"hello, hstream " + str(i).encode())
        for i in range(5)
    ]
    for future in futures:
        response = future.get(timeout=10)
        print("Producer response:", response)

With confluent-kafka:

python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)

topic_name = "my_topic"
group_id = "confluent_kafka_group"

conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}


def produce():
    def acked(err, msg):
        if err is not None:
            print(f"Failed to deliver message: {msg}: {err}")
        else:
            print(
                f"Message produced: offset={msg.offset()}, "
                f'key="{msg.key().decode()}", '
                f'value="{msg.value().decode()}"'
            )

    producer = Producer(conf)
    for i in range(5):
        producer.produce(
            topic_name,
            key=b"key " + str(i).encode(),
            value=b"hello, hstream " + str(i).encode(),
            on_delivery=acked,
        )
    producer.flush()
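
For larger volumes, librdkafka's local queue can fill up while you produce. A common pattern (a sketch using the same `conf` and `topic_name`; `produce_many` and `n` are ours) is to call `poll(0)` inside the loop so delivery callbacks are served as you go:

python
def produce_many(n=1000):
    def acked(err, msg):
        if err is not None:
            print(f"Failed to deliver message: {err}")

    producer = Producer(conf)
    for i in range(n):
        producer.produce(topic_name, value=f"hello, hstream {i}", on_delivery=acked)
        producer.poll(0)  # serve pending delivery callbacks without blocking
    producer.flush()  # block until all queued messages are delivered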

Consume Records

With kafka-python:

python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"


def consume():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=addr,
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        fetch_max_wait_ms=1000,
    )
    i = 0
    for msg in consumer:
        print("Consumer response", msg)
        i += 1
        if i >= 5:
            consumer.close()
            break
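
Since `enable_auto_commit` is off, the example above never commits offsets. To have the consumer group remember its progress, join a group and commit manually (a sketch; the `group_id` value and function name are ours):

python
def consume_and_commit():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=addr,
        group_id="kafka_python_group",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )
    for i, msg in enumerate(consumer):
        print("Consumer response", msg)
        consumer.commit()  # synchronously commit the offsets consumed so far
        if i >= 4:
            break
    consumer.close()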

With confluent-kafka:

python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)

topic_name = "my_topic"
group_id = "confluent_kafka_group"

conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}


def consume():
    consumer = Consumer(
        {
            **conf,
            "group.id": group_id,
            "auto.offset.reset": "smallest",
            "enable.auto.commit": "false",
            "isolation.level": "read_uncommitted",
        }
    )
    consumer.subscribe([topic_name])
    i = 0
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print(f"ERROR: {msg.error()}")
            else:
                # Extract the (optional) key and value, and print.
                print(
                    f"Consumed topic {msg.topic()}: "
                    f'key="{msg.key().decode()}", '
                    f'value="{msg.value().decode()}"'
                )
                i += 1
            if i >= 5:
                break
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
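
To try either client end to end, call the functions in order from a small entry point (assuming the three functions above are defined in the same module):

python
if __name__ == "__main__":
    create_topic()
    produce()
    consume()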