Develop with Java Kafka client
This page shows how to use the Apache Kafka Java client to interact with HStream.
TIP
Replace the variables in the following code according to your setup.
Create a Topic
java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
/** Creates one topic on the configured endpoint through the Kafka admin client. */
class CreateTopic {
  public static void main(String[] args) throws Exception {
    // Connection and topic settings; adjust to your setup.
    String endpoint = "localhost:9092";
    String topicName = "my_topic";
    int partitions = 1;
    short replicationFactor = 1;

    var config = new Properties();
    config.put("bootstrap.servers", endpoint);

    try (var adminClient = AdminClient.create(config)) {
      var topic = new NewTopic(topicName, partitions, replicationFactor);
      var creation = adminClient.createTopics(Collections.singleton(topic));
      // Wait for the creation request to finish before closing the client.
      creation.all().get();
    }
  }
}
Produce a Record
java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/** Sends a single string record to a topic and waits for the send to complete. */
class Produce {
  public static void main(String[] args) throws Exception {
    // Connection and topic settings; adjust to your setup.
    String endpoint = "localhost:9092";
    String topicName = "my_topic";

    var props = new Properties();
    props.put("bootstrap.servers", endpoint);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (var producer = new KafkaProducer<String, String>(props)) {
      // send() is asynchronous and reports failures only through the returned Future.
      var pending = producer.send(new ProducerRecord<>(topicName, "Hello HStream!"));
      producer.flush();
      // Check the result so a failed send raises an exception instead of being dropped.
      pending.get();
    }
  }
}
Consume Records
java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/** Subscribes to a topic as part of a consumer group, polls once, and prints each record. */
class Consume {
  public static void main(String[] args) throws Exception {
    // Connection, topic, and consumer-group settings; adjust to your setup.
    String endpoint = "localhost:9092";
    String topicName = "my_topic";
    String groupName = "my_group";

    var config = new Properties();
    config.put("bootstrap.servers", endpoint);
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("auto.offset.reset", "earliest");
    config.put("group.id", groupName);

    try (var kafkaConsumer = new KafkaConsumer<String, String>(config)) {
      kafkaConsumer.subscribe(Collections.singleton(topicName));
      // A single poll with a 10-second timeout; print whatever it returns.
      kafkaConsumer.poll(Duration.ofSeconds(10)).forEach(System.out::println);
    }
  }
}