Skip to content

Commit

Permalink
update deps
Browse files Browse the repository at this point in the history
  • Loading branch information
iqbalaydrus committed Jan 19, 2024
1 parent ab973ef commit c40c3f1
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 597 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# build vFlow in the first stage
FROM golang:1.15.3 as builder
FROM golang:1.21.6 as builder
WORKDIR /go/src/

RUN mkdir -p github.com/EdgeCast/vflow
Expand Down
32 changes: 19 additions & 13 deletions consumers/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
package main

import (
"context"
"encoding/json"
"errors"
"flag"
"github.com/segmentio/kafka-go"
"io"
"log"
"sync"
"time"

cluster "github.com/bsm/sarama-cluster"
cluster "github.com/IBM/sarama"
)

type options struct {
Expand Down Expand Up @@ -70,21 +74,19 @@ func main() {

config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true

wg.Add(opts.Workers)

for i := 0; i < opts.Workers; i++ {
go func(ti int) {
var objmap ipfix

brokers := []string{opts.Broker}
topics := []string{opts.Topic}
consumer, err := cluster.NewConsumer(brokers, "mygroup", topics, config)
consumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{opts.Broker},
Topic: opts.Topic,
GroupID: "mygroup",
})

if err != nil {
panic(err)
}
defer consumer.Close()

pCount := 0
Expand All @@ -98,9 +100,15 @@ func main() {
log.Printf("partition GroupId#%d, rate=%d\n", ti, (count-pCount)/10)
}
pCount = count
case msg, more := <-consumer.Messages():
if more {
if err := json.Unmarshal(msg.Value, &objmap); err != nil {
default:
timeout, cancel := context.WithTimeout(context.Background(), time.Second)
msg, err := consumer.ReadMessage(timeout)
cancel()
if errors.Is(err, io.EOF) {
panic(err)
}
if err == nil {
if err = json.Unmarshal(msg.Value, &objmap); err != nil {
log.Println(err)
} else {
for _, data := range objmap.DataSets {
Expand All @@ -111,8 +119,6 @@ func main() {
}
}
}

consumer.MarkOffset(msg, "")
count++
}
}
Expand Down
35 changes: 19 additions & 16 deletions consumers/clickhouse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
package main

import (
"context"
"database/sql"
"encoding/json"
"errors"
"flag"
"io"
"log"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go"
cluster "github.com/bsm/sarama-cluster"
"github.com/segmentio/kafka-go"
)

type options struct {
Expand Down Expand Up @@ -78,10 +81,6 @@ func main() {
ch = make(chan ipfix, 10000)
)

config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true

for i := 0; i < 5; i++ {
go ingestClickHouse(ch)
}
Expand All @@ -90,13 +89,12 @@ func main() {

for i := 0; i < opts.Workers; i++ {
go func(ti int) {
brokers := []string{opts.Broker}
topics := []string{opts.Topic}
consumer, err := cluster.NewConsumer(brokers, "mygroup", topics, config)
consumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{opts.Broker},
Topic: opts.Topic,
GroupID: "mygroup",
})

if err != nil {
panic(err)
}
defer consumer.Close()

pCount := 0
Expand All @@ -110,15 +108,20 @@ func main() {
log.Printf("partition GroupId#%d, rate=%d\n", ti, (count-pCount)/10)
}
pCount = count
case msg, more := <-consumer.Messages():
objmap := ipfix{}
if more {
if err := json.Unmarshal(msg.Value, &objmap); err != nil {
default:
timeout, cancel := context.WithTimeout(context.Background(), time.Second)
msg, err := consumer.ReadMessage(timeout)
cancel()
if errors.Is(err, io.EOF) {
panic(err)
}
if err == nil {
objmap := ipfix{}
if err = json.Unmarshal(msg.Value, &objmap); err != nil {
log.Println(err)
} else {
ch <- objmap
}
consumer.MarkOffset(msg, "")
count++
}
}
Expand Down
48 changes: 38 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,18 +1,46 @@
module github.com/EdgeCast/vflow

go 1.15
go 1.21

require (
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/Shopify/sarama v1.33.0
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/nats-io/nats-server/v2 v2.8.2 // indirect
github.com/nats-io/nats.go v1.15.0
github.com/IBM/sarama v1.42.1
github.com/nats-io/nats.go v1.32.0
github.com/nsqio/go-nsq v1.1.0
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.19.0 // indirect
github.com/prometheus/client_golang v1.12.2
github.com/segmentio/kafka-go v0.4.31
golang.org/x/net v0.0.0-20220513224357-95641704303c
github.com/prometheus/client_golang v1.18.0
github.com/segmentio/kafka-go v0.4.47
golang.org/x/net v0.20.0
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
Loading

0 comments on commit c40c3f1

Please sign in to comment.