Develop with Go Kafka client
This page shows how to use the kafka-go client to interact with HStream.
Create a Topic
go
package examples
import (
"context"
"errors"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// CreateTopics asks the broker at localhost:9092 to create "test-topic" with
// one partition and a replication factor of one, using a client configured
// with a 10-second timeout.
//
// A kafka.TopicAlreadyExists result is tolerated, so the call can be repeated
// safely. Any other failure, whether from the request itself or reported for
// an individual topic, terminates the process through log.Fatal.
func CreateTopics() {
	const addr = "localhost:9092"

	admin := &kafka.Client{
		Addr:    kafka.TCP(addr),
		Timeout: 10 * time.Second,
	}

	topic := kafka.TopicConfig{
		Topic:             "test-topic",
		NumPartitions:     1,
		ReplicationFactor: 1,
	}

	resp, err := admin.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
		Topics:       []kafka.TopicConfig{topic},
		ValidateOnly: false,
	})
	if err != nil {
		log.Fatal(err)
	}

	// The response carries one error slot per requested topic; skip the
	// slots that are empty or that only report a pre-existing topic.
	for _, topicErr := range resp.Errors {
		if topicErr == nil || errors.Is(topicErr, kafka.TopicAlreadyExists) {
			continue
		}
		log.Fatal(topicErr)
	}
}
Produce Records
go
package examples
import (
"context"
"fmt"
"log"
"sync"
"github.com/segmentio/kafka-go"
)
// Produce asynchronously writes totalMesssages keyed records ("key-N" /
// "value-N") to "test-topic" on the broker at localhost:9092, then blocks
// until the writer has reported a delivery result for every record.
//
// The writer runs with Async enabled, so delivery results arrive through the
// Completion callback rather than from WriteMessages. A WaitGroup holding one
// count per record is used to wait for those results. Delivery failures are
// logged and counted as completed; a failure to enqueue a record or to close
// the writer terminates the process through log.Fatal.
func Produce() {
	host := "localhost:9092"

	// One count per record; the Completion callback releases them.
	wg := &sync.WaitGroup{}
	wg.Add(totalMesssages)

	writer := &kafka.Writer{
		Addr:         kafka.TCP(host),
		Topic:        "test-topic",
		Balancer:     &kafka.RoundRobin{},
		RequiredAcks: kafka.RequireAll,
		Async:        true,
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				// The callback is handed a whole batch, so a failure has
				// to release one count for every record in it. Releasing
				// a single count would leave wg.Wait blocked forever as
				// soon as a failed batch held more than one record.
				wg.Add(-len(messages))
				log.Printf("produce err: %s\n", err.Error())
				return
			}
			for _, msg := range messages {
				wg.Done()
				log.Printf("write date to partition %d, offset %d\n", msg.Partition, msg.Offset)
			}
		},
	}
	defer func() {
		if err := writer.Close(); err != nil {
			log.Fatal("Failed to close writer:", err)
		}
	}()

	for i := 0; i < totalMesssages; i++ {
		msg := kafka.Message{
			Key:   []byte(fmt.Sprintf("key-%d", i)),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		}
		if err := writer.WriteMessages(context.Background(), msg); err != nil {
			log.Fatal("Failed to write messages:", err)
		}
	}

	// Wait for every record to be acknowledged or reported as failed.
	wg.Wait()
	log.Println("Write messages done.")
}
Consume Records
go
package examples
import (
"context"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// Consume reads totalMesssages records from "test-topic" on the broker at
// localhost:9092 as a member of consumer group "test-group1", with the start
// offset set to kafka.FirstOffset, and logs each record's value, timestamp
// and topic.
//
// A single 30-second deadline covers the whole read loop. Any read error,
// including that deadline expiring, and any error closing the reader
// terminates the process through log.Fatal.
func Consume() {
	const broker = "localhost:9092"

	cfg := kafka.ReaderConfig{
		Brokers:     []string{broker},
		Topic:       "test-topic",
		GroupID:     "test-group1",
		StartOffset: kafka.FirstOffset,
	}
	consumer := kafka.NewReader(cfg)

	// The deadline is shared by every read below rather than reset per read.
	deadline, stop := context.WithTimeout(context.Background(), 30*time.Second)
	defer stop()

	received := 0
	for ; received < totalMesssages; received++ {
		record, err := consumer.ReadMessage(deadline)
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("Message received: value = %s, timestamp = %v, topic = %s", string(record.Value), record.Time, record.Topic)
	}
	log.Printf("Read %d messages", received)

	if err := consumer.Close(); err != nil {
		log.Fatal("Failed to close reader:", err)
	}
}