Skip to content
Open in Gitpod

Develop with Java Kafka client

This page shows how to use Apache Kafka Java Client to interact with HStream.

TIP

Replace the variables in the following code according to your setup.

Create a Topic

java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

class CreateTopic {
  public static void main(String[] args) throws Exception {
    String endpoint = "localhost:9092";
    String topicName = "my_topic";
    int partitions = 1;
    short replicationFactor = 1;

    var props = new Properties();
    props.put("bootstrap.servers", endpoint);

    try (var admin = AdminClient.create(props)) {
      admin
          .createTopics(
              Collections.singleton(new NewTopic(topicName, partitions, replicationFactor)))
          .all()
          .get();
    }
  }
}

Produce a Record

java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class Produce {
  public static void main(String[] args) throws Exception {
    String endpoint = "localhost:9092";
    String topicName = "my_topic";

    var props = new Properties();
    props.put("bootstrap.servers", endpoint);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (var producer = new KafkaProducer<String, String>(props)) {
      producer.send(new ProducerRecord<>(topicName, "Hello HStream!"));
      producer.flush();
    }
  }
}

Consume Records

java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class Consume {
  public static void main(String[] args) throws Exception {
    String endpoint = "localhost:9092";
    String topicName = "my_topic";
    String groupName = "my_group";

    var props = new Properties();
    props.put("bootstrap.servers", endpoint);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");
    props.put("group.id", groupName);

    try (var consumer = new KafkaConsumer<String, String>(props)) {
      consumer.subscribe(Collections.singleton(topicName));
      var records = consumer.poll(Duration.ofSeconds(10));
      for (var record : records) {
        System.out.println(record);
      }
    }
  }
}