Skip to content

Commit

Permalink
change to kafka-go
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jan 10, 2024
1 parent 682b4e9 commit f354524
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 213 deletions.
2 changes: 1 addition & 1 deletion docs/develop-with-kafka-api/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Currenty, the clients below are tested by HStream.
| -------- | ----------------------------------------------------------- |
| Java | [Apache Kafka Java Client](https://github.com/apache/kafka) |
| Python | [kafka-python](https://github.com/dpkp/kafka-python) |
| Go | [sarama](https://github.com/IBM/sarama) |
| Go | [kafka-go](https://github.com/segmentio/kafka-go) |
| C/C++ | [librdkafka](https://github.com/confluentinc/librdkafka) |

::: tip
Expand Down
8 changes: 1 addition & 7 deletions docs/develop-with-kafka-api/go.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
# Develop with Go Kafka client

This page shows how to use [Sarama Go Client](https://github.com/IBM/sarama) to interact with HStream.
This page shows how to use [Kafka-go Client](https://github.com/segmentio/kafka-go) to interact with HStream.


::: warning

Currently only support kafka version between v0.11.0 to v1.0.2

:::

## Create a Topic


Expand Down
3 changes: 0 additions & 3 deletions kafka-examples/go/examples/common.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package examples

import "github.com/IBM/sarama"

var (
KafkaVersion = sarama.V1_0_2_0
totalMesssages = 10
)
106 changes: 23 additions & 83 deletions kafka-examples/go/examples/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,95 +3,35 @@ package examples
import (
"context"
"log"
"sync"
"sync/atomic"
"time"

"github.com/IBM/sarama"
"github.com/segmentio/kafka-go"
)

func Consume() {
brokers := []string{"localhost:9092"}
config := sarama.NewConfig()
config.Version = KafkaVersion
config.Consumer.Offsets.Initial = sarama.OffsetOldest

consumer := Consumer{
ready: make(chan struct{}),
doneChan: make(chan struct{}),
totalMessages: int32(totalMesssages),
}

client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
if err != nil {
log.Fatal(err)
}

ctx, cancel := context.WithCancel(context.Background())

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := client.Consume(ctx, []string{"test-topic"}, &consumer); err != nil {
log.Fatalf("Error from consumer: %v\n", err)
}

if ctx.Err() != nil {
return
}
consumer.ready = make(chan struct{})
host := "localhost:9092"

reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{host},
Topic: "test-topic",
GroupID: "test-group",
})

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

readCnt := 0
for readCnt < totalMesssages {
m, err := reader.ReadMessage(ctx)
if err != nil {
log.Fatal(err)
}
}()

<-consumer.ready
<-consumer.doneChan
cancel()

wg.Wait()
if err = client.Close(); err != nil {
log.Fatal(err)
readCnt++
log.Printf("Message received: value = %s, timestamp = %v, topic = %s", string(m.Value), m.Time, m.Topic)
}
}

type Consumer struct {
ready chan struct{}
doneChan chan struct{}
totalMessages int32
received atomic.Int32
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(c.ready)
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
log.Printf("Read %d messages", readCnt)

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for c.received.Load() <= c.totalMessages {
select {
case message, ok := <-claim.Messages():
if !ok {
return nil
}
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
c.received.Add(1)
if c.received.Load() == c.totalMessages {
close(c.doneChan)
log.Printf("received %d messages\n", c.received.Load())
return nil
}
case <-session.Context().Done():
return nil
}
if err := reader.Close(); err != nil {
log.Fatal("Failed to close reader:", err)
}
return nil
}
39 changes: 25 additions & 14 deletions kafka-examples/go/examples/create_topics.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,40 @@
package examples

import (
"context"
"errors"
"log"
"time"

"github.com/IBM/sarama"
"github.com/segmentio/kafka-go"
)

func CreateTopics() {
brokers := []string{"localhost:9092"}
config := sarama.NewConfig()
config.Version = KafkaVersion
validateOnly := false

admin, err := sarama.NewClusterAdmin(brokers, config)
if err != nil {
log.Fatal(err)
host := "localhost:9092"
client := &kafka.Client{
Addr: kafka.TCP(host),
Timeout: 10 * time.Second,
}
defer admin.Close()

topicDetail := &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
request := &kafka.CreateTopicsRequest{
Topics: []kafka.TopicConfig{
{
Topic: "test-topic",
NumPartitions: 1,
ReplicationFactor: 1,
},
},
ValidateOnly: false,
}

if err = admin.CreateTopic("test-topic", topicDetail, validateOnly); err != nil {
resp, err := client.CreateTopics(context.Background(), request)
if err != nil {
log.Fatal(err)
}

for _, err = range resp.Errors {
if err != nil && !errors.Is(err, kafka.TopicAlreadyExists) {
log.Fatal(err)
}
}
}
76 changes: 33 additions & 43 deletions kafka-examples/go/examples/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,62 +4,52 @@ import (
"context"
"fmt"
"log"
"time"
"sync"

"github.com/IBM/sarama"
"github.com/segmentio/kafka-go"
)

func Produce() {
brokers := []string{"localhost:9092"}
config := sarama.NewConfig()
config.Version = KafkaVersion
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true

producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
log.Fatal(err)
}

totalReceived := 0
doneChan := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

go func() {
for {
select {
case <-ctx.Done():
close(doneChan)
return
case err := <-producer.Errors():
host := "localhost:9092"

wg := &sync.WaitGroup{}
wg.Add(totalMesssages)
writer := &kafka.Writer{
Addr: kafka.TCP(host),
Topic: "test-topic",
Balancer: &kafka.RoundRobin{},
RequiredAcks: kafka.RequireAll,
Async: true,
Completion: func(messages []kafka.Message, err error) {
if err != nil {
wg.Done()
log.Printf("produce err: %s\n", err.Error())
case msg := <-producer.Successes():
log.Printf("write date to partition %d, offset %d\n", msg.Partition, msg.Offset)
}
totalReceived += 1
if totalReceived >= totalMesssages {
close(doneChan)
return
}

for _, msg := range messages {
wg.Done()
log.Printf("write date to partition %d, offset %d\n", msg.Partition, msg.Offset)
}
},
}

defer func() {
if err := writer.Close(); err != nil {
log.Fatal("Failed to close writer:", err)
}
}()

for i := 0; i < totalMesssages; i++ {
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Key: sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
Value: sarama.StringEncoder(fmt.Sprintf("value-%d", i)),
msg := kafka.Message{
Key: []byte(fmt.Sprintf("key-%d", i)),
Value: []byte(fmt.Sprintf("value-%d", i)),
}
select {
case <-ctx.Done():
case producer.Input() <- msg:
if err := writer.WriteMessages(context.Background(), msg); err != nil {
log.Fatal("Failed to write messages:", err)
}
}

<-doneChan
if err = producer.Close(); err != nil {
log.Fatal(err)
}
wg.Wait()
log.Println("Write messages done.")
}
19 changes: 2 additions & 17 deletions kafka-examples/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,10 @@ module github.com/hstreamdb/hstream-kafka-go-examples

go 1.21

require github.com/IBM/sarama v1.42.1
require github.com/segmentio/kafka-go v0.4.47

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
)
Loading

0 comments on commit f354524

Please sign in to comment.