Skip to content
Open in Gitpod

Develop with Go Kafka client

This page shows how to use the kafka-go client to interact with HStream.

Create a Topic

go
package examples

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

// CreateTopics asks the broker at localhost:9092 to create "test-topic" with
// one partition and a replication factor of one.
//
// A failed request, or any per-topic error other than
// kafka.TopicAlreadyExists, is passed to log.Fatal.
func CreateTopics() {
	const brokerAddr = "localhost:9092"

	topic := kafka.TopicConfig{
		Topic:             "test-topic",
		NumPartitions:     1,
		ReplicationFactor: 1,
	}
	req := &kafka.CreateTopicsRequest{
		Topics:       []kafka.TopicConfig{topic},
		ValidateOnly: false,
	}

	admin := &kafka.Client{
		Addr:    kafka.TCP(brokerAddr),
		Timeout: 10 * time.Second,
	}
	resp, err := admin.CreateTopics(context.Background(), req)
	if err != nil {
		log.Fatal(err)
	}

	// An already-existing topic is not treated as a failure.
	for _, topicErr := range resp.Errors {
		if topicErr == nil || errors.Is(topicErr, kafka.TopicAlreadyExists) {
			continue
		}
		log.Fatal(topicErr)
	}
}

Produce Records

go
package examples

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/segmentio/kafka-go"
)

// Produce asynchronously writes totalMesssages keyed records ("key-<i>" /
// "value-<i>") to "test-topic" on localhost:9092 and blocks until the
// writer's Completion callback has accounted for every one of them.
//
// Errors returned by WriteMessages or Close are passed to log.Fatal; delivery
// errors reported through Completion are only logged.
func Produce() {
	host := "localhost:9092"

	// One WaitGroup slot per message; slots are released from Completion.
	wg := &sync.WaitGroup{}
	wg.Add(totalMesssages)
	writer := &kafka.Writer{
		Addr:         kafka.TCP(host),
		Topic:        "test-topic",
		Balancer:     &kafka.RoundRobin{},
		RequiredAcks: kafka.RequireAll,
		Async:        true,
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				// Completion is invoked once per batch, so a failed batch
				// must release one slot for every message it carried.
				// Releasing a single slot would leave wg.Wait below blocked
				// forever whenever the batch held more than one message.
				for range messages {
					wg.Done()
				}
				log.Printf("produce err: %s\n", err.Error())
				return
			}

			for _, msg := range messages {
				wg.Done()
				log.Printf("write date to partition %d, offset %d\n", msg.Partition, msg.Offset)
			}
		},
	}

	defer func() {
		if err := writer.Close(); err != nil {
			log.Fatal("Failed to close writer:", err)
		}
	}()

	for i := 0; i < totalMesssages; i++ {
		msg := kafka.Message{
			Key:   []byte(fmt.Sprintf("key-%d", i)),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		}
		if err := writer.WriteMessages(context.Background(), msg); err != nil {
			log.Fatal("Failed to write messages:", err)
		}
	}

	// Wait until every message has been reported as written or failed.
	wg.Wait()
	log.Println("Write messages done.")
}

Consume Records

go
package examples

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

// Consume reads totalMesssages records from "test-topic" on localhost:9092
// as a member of consumer group "test-group1", logging each one.
//
// All reads share a single 30-second deadline. A read or close error is
// passed to log.Fatal; on a read error the reader is closed first.
func Consume() {
	host := "localhost:9092"

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{host},
		Topic:       "test-topic",
		GroupID:     "test-group1",
		StartOffset: kafka.FirstOffset,
	})

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	readCnt := 0
	for readCnt < totalMesssages {
		m, err := reader.ReadMessage(ctx)
		if err != nil {
			// log.Fatal exits without running deferred calls, so close the
			// reader explicitly before exiting; kafka-go asks that readers
			// be closed when the process exits.
			if closeErr := reader.Close(); closeErr != nil {
				log.Printf("Failed to close reader: %v", closeErr)
			}
			log.Fatal(err)
		}
		readCnt++
		log.Printf("Message received: value = %s, timestamp = %v, topic = %s", string(m.Value), m.Time, m.Topic)
	}
	log.Printf("Read %d messages", readCnt)

	if err := reader.Close(); err != nil {
		log.Fatal("Failed to close reader:", err)
	}
}