Develop with Python Kafka client 
This guide shows you how to use a Python Kafka client to interact with HStream. Currently, we support kafka-python and confluent-kafka.
Installation 
sh
# If you want to use kafka-python
pip install kafka-python
# Or if you want to use confluent-kafka
pip install confluent-kafka

TIP: Prefer to use a virtual environment? Check out Python's built-in venv.
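Once the client is installed, a quick way to confirm it can reach your HStream endpoint is to connect and list the existing topics. Below is a minimal sketch using kafka-python; the host and port defaults are assumptions that mirror the snippets later in this guide.

python
import os
from kafka import KafkaConsumer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)

# Connecting and listing topics is a cheap smoke test for the endpoint.
consumer = KafkaConsumer(bootstrap_servers=addr)
print("Connected, topics:", consumer.topics())
consumer.close()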
Create a Topic 
Using kafka-python:

python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"
def create_topic():
    admin = KafkaAdminClient(bootstrap_servers=addr)
    topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
    admin.create_topics([topic])
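Re-running the snippet above fails once the topic exists, because kafka-python's admin client raises TopicAlreadyExistsError in that case. If you want the step to be idempotent, here is a hedged sketch that reuses addr and topic_name from above:

python
from kafka.errors import TopicAlreadyExistsError

def create_topic_if_missing():
    admin = KafkaAdminClient(bootstrap_servers=addr)
    topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
    try:
        admin.create_topics([topic])
        print(f"Topic {topic_name} created")
    except TopicAlreadyExistsError:
        # Safe to ignore when re-running the guide against the same cluster.
        print(f"Topic {topic_name} already exists")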
Using confluent-kafka:

python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
topic_name = "my_topic"
group_id = "confluent_kafka_group"
conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}
def create_topic():
    admin = AdminClient(conf)
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    admin.create_topics([new_topic])
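confluent-kafka's AdminClient.create_topics is asynchronous: it returns a dict that maps each topic name to a future, and any failure (for example, the topic already existing) only surfaces when you wait on that future. Here is a sketch of checking the result, reusing conf and topic_name from the snippet above:

python
def create_topic_checked():
    admin = AdminClient(conf)
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    # create_topics() returns {topic_name: future}; result() raises on failure.
    futures = admin.create_topics([new_topic])
    for topic, future in futures.items():
        try:
            future.result()
            print(f"Topic {topic} created")
        except Exception as e:
            print(f"Failed to create topic {topic}: {e}")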
Produce Records 

Using kafka-python:

python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"
def produce():
    producer = KafkaProducer(
        bootstrap_servers=addr,
        acks="all",
        linger_ms=100,
    )
    futures = [
        producer.send(topic_name, b"hello, hstream " + str(i).encode())
        for i in range(5)
    ]
    for future in futures:
        response = future.get(timeout=10)
        print("Producer response:", response)python
Using confluent-kafka:

python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
topic_name = "my_topic"
group_id = "confluent_kafka_group"
conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}
def produce():
    def acked(err, msg):
        if err is not None:
            print(f"Failed to deliver message: {msg}: {err}")
        else:
            print(
                f"Message produced: offset={msg.offset()}, "
                f'key="{msg.key().decode()}", '
                f'value="{msg.value().decode()}"'
            )
    producer = Producer(conf)
    for i in range(5):
        producer.produce(
            topic_name,
            key=b"key " + str(i).encode(),
            value=b"hello, hstream " + str(i).encode(),
            on_delivery=acked,
        )
    producer.flush()
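The final flush() above blocks until every outstanding delivery report has been served, which is fine for a short script. In a long-running producer you would typically also call poll(0) inside the send loop so callbacks fire as deliveries complete. A sketch of that pattern, reusing conf and topic_name from above:

python
def produce_long_running():
    def acked(err, msg):
        # Delivery report callback, same shape as in the snippet above.
        if err is not None:
            print(f"Failed to deliver message: {err}")
        else:
            print(f"Message produced: offset={msg.offset()}")

    producer = Producer(conf)
    for i in range(5):
        # Serve any queued delivery callbacks without blocking.
        producer.poll(0)
        producer.produce(
            topic_name,
            value=b"hello, hstream " + str(i).encode(),
            on_delivery=acked,
        )
    # Block until all remaining messages are delivered before exiting.
    producer.flush()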
Consume Records 

Using kafka-python:

python
import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"
def consume():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=addr,
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        fetch_max_wait_ms=1000,
    )
    i = 0
    for msg in consumer:
        print("Consumer response", msg)
        i += 1
        # Stop after five records, then close the consumer outside the loop.
        if i >= 5:
            break
    consumer.close()
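Note that the consumer above sets enable_auto_commit=False but never commits, so restarting it re-reads the same records from the beginning. If you want to record progress, here is a hedged sketch of committing manually with kafka-python; the group_id value is a hypothetical name, and addr and topic_name are reused from above:

python
def consume_and_commit():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=addr,
        group_id="kafka_python_group",  # hypothetical consumer group name
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )
    i = 0
    for msg in consumer:
        print("Consumer response", msg)
        # Synchronously commit the offsets of the records consumed so far.
        consumer.commit()
        i += 1
        if i >= 5:
            break
    consumer.close()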
Using confluent-kafka:

python
import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
topic_name = "my_topic"
group_id = "confluent_kafka_group"
conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}
def consume():
    consumer = Consumer(
        {
            **conf,
            "group.id": group_id,
            "auto.offset.reset": "smallest",
            "enable.auto.commit": "false",
            "isolation.level": "read_uncommitted",
        }
    )
    consumer.subscribe([topic_name])
    i = 0
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print(f"ERROR: {msg.error()}")
            else:
                # Extract the (optional) key and value, and print.
                print(
                    f"Consumed topic {msg.topic()}: "
                    f'key="{msg.key().decode()}", '
                    f'value="{msg.value().decode()}"'
                )
                i += 1
            if i >= 5:
                break
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
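Putting it together, you can run the three steps in order from a single script. A minimal sketch, assuming the definitions of create_topic, produce, and consume from whichever client you chose above:

python
if __name__ == "__main__":
    create_topic()
    produce()
    consume()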