kafkatool: add 'dump print' command (#9942)
Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored Nov 19, 2024
1 parent 00458cd commit 25b474a
Showing 2 changed files with 88 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -126,6 +126,7 @@
 * [ENHANCEMENT] `kafkatool`: add SASL plain authentication support. The following new CLI flags have been added: #9584
   * `--kafka-sasl-username`
   * `--kafka-sasl-password`
+* [ENHANCEMENT] `kafkatool`: add `dump print` command to print the content of write requests from a dump. #9942
 * [ENHANCEMENT] Updated `KubePersistentVolumeFillingUp` runbook, including a sample command to debug the distroless image. #9802

 ## 2.14.1
116 changes: 87 additions & 29 deletions tools/kafkatool/dump.go
@@ -15,6 +15,8 @@ import (
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"go.uber.org/atomic"
+
+	"github.com/grafana/mimir/pkg/mimirpb"
 )

 type DumpCommand struct {
@@ -43,6 +45,7 @@ func (c *DumpCommand) Register(app *kingpin.Application, getKafkaClient func() *

 	cmd.Command("import", "Import records from a file into a Kafka topic").Action(c.doImport)
 	cmd.Command("export", "Export records from a Kafka topic into a file").Action(c.doExport)
+	cmd.Command("print", "Print the write requests inside records dumped using this tool").Action(c.doPrint)
 }

 type key int
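Note: kingpin resolves the parsed command line to the matching sub-command and invokes its Action callback, which is all the dispatch the new print command needs. A minimal, self-contained sketch of the same wiring under hypothetical names (the import path is an assumption, not taken from this commit):

package main

import (
	"fmt"
	"os"

	"github.com/alecthomas/kingpin/v2" // assumed import path for the kingpin used in the diff
)

func main() {
	app := kingpin.New("exampletool", "Demo of nested sub-command dispatch.")
	dump := app.Command("dump", "Dump-related commands.")
	dump.Command("print", "Print records.").Action(func(*kingpin.ParseContext) error {
		fmt.Println("print invoked")
		return nil
	})

	// Parses os.Args and runs the Action of the selected sub-command.
	kingpin.MustParse(app.Parse(os.Args[1:]))
}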
@@ -52,16 +55,26 @@
 )

 func (c *DumpCommand) doExport(*kingpin.ParseContext) error {
+	var (
+		recordCount    = &atomic.Int64{}
+		consumedOffset = &atomic.Int64{}
+	)
+
 	client := c.getKafkaClient()
 	client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
 		c.topic: {int32(c.partition): kgo.NewOffset().At(c.exportOffsetStart)}},
 	)

+	// Print the number of consumed records both when the export is in progress and at the end.
 	go func() {
 		for {
 			time.Sleep(time.Second)
 			c.printer.PrintLine(fmt.Sprintf("consumed records: %d, offset %d", recordCount.Load(), consumedOffset.Load()))
 		}
 	}()
+	defer func() {
+		c.printer.PrintLine(fmt.Sprintf("consumed records: %d, offset %d", recordCount.Load(), consumedOffset.Load()))
+	}()

 	encoder := json.NewEncoder(c.inOutFile)

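Note: the progress reporting added to doExport above is a reusable pattern: an atomic counter updated on the hot path, a background goroutine that polls it once per second, and a deferred print of the final value. A minimal sketch under hypothetical names:

package main

import (
	"fmt"
	"time"

	"go.uber.org/atomic"
)

func main() {
	processed := atomic.NewInt64(0)

	// Report progress once per second while work is in flight.
	go func() {
		for {
			time.Sleep(time.Second)
			fmt.Printf("processed: %d\n", processed.Load())
		}
	}()
	// Report the final count when main returns; the closure reads the
	// counter at return time, not at defer-statement time.
	defer func() { fmt.Printf("processed: %d\n", processed.Load()) }()

	for i := 0; i < 5_000_000; i++ {
		processed.Inc()
	}
}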
@@ -96,14 +109,13 @@ func (c *DumpCommand) doExport(*kingpin.ParseContext) error {
 	return nil
 }

-var (
-	recordCount          = &atomic.Int64{}
-	consumedOffset       = &atomic.Int64{}
-	recordsTooLarge      = &atomic.Int64{}
-	corruptedJSONRecords = &atomic.Int64{}
-)
-
 func (c *DumpCommand) doImport(*kingpin.ParseContext) error {
+	var (
+		recordCount          = &atomic.Int64{}
+		recordsTooLarge      = &atomic.Int64{}
+		corruptedJSONRecords = &atomic.Int64{}
+	)
+
 	client := c.getKafkaClient()

 	go func() {
@@ -116,17 +128,79 @@ func (c *DumpCommand) doImport(*kingpin.ParseContext) error {
 		}
 	}()

+	produceErr := atomic.NewError(nil)
+
+	parseErr := c.parseDumpFile(
+		func(_ int, record *kgo.Record) {
+			client.Produce(context.Background(), record, func(record *kgo.Record, err error) {
+				recordCount.Inc()
+				if errors.Is(err, kerr.MessageTooLarge) {
+					recordsTooLarge.Inc()
+					return
+				}
+				if err != nil {
+					produceErr.Store(fmt.Errorf("failed to produce record with offset %d: %v", record.Context.Value(originalOffsetKey), err))
+				}
+			})
+		},
+		func(recordIdx int, err error) {
+			corruptedJSONRecords.Inc()
+			c.printer.PrintLine(fmt.Sprintf("corrupted JSON record %d: %v", recordIdx, err))
+		})
+
+	c.printer.PrintLine("waiting for produce to finish")
+	err := client.Flush(context.Background())
+	if err != nil {
+		return fmt.Errorf("failed to flush records: %w", err)
+	}
+
+	if parseErr != nil {
+		return fmt.Errorf("failed to parse dump file: %w", parseErr)
+	}
+	if err = produceErr.Load(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *DumpCommand) doPrint(*kingpin.ParseContext) error {
+	return c.parseDumpFile(
+		func(recordIdx int, record *kgo.Record) {
+			req := mimirpb.WriteRequest{}
+			err := req.Unmarshal(record.Value)
+			if err != nil {
+				c.printer.PrintLine(fmt.Sprintf("failed to unmarshal write request from record %d: %v", recordIdx, err))
+				return
+			}
+
+			// Print the time series in the write request.
+			c.printer.PrintLine(fmt.Sprintf("Record #%d (offset: %d)", recordIdx, record.Offset))
+			for _, series := range req.Timeseries {
+				for _, sample := range series.Samples {
+					c.printer.PrintLine(fmt.Sprintf("%s %d %f",
+						mimirpb.FromLabelAdaptersToLabels(series.Labels).String(),
+						sample.TimestampMs,
+						sample.Value,
+					))
+				}
+			}
+			c.printer.PrintLine("")
+		},
+		func(recordIdx int, err error) {
+			c.printer.PrintLine(fmt.Sprintf("corrupted JSON record %d: %v", recordIdx, err))
+		})
+}
+
+func (c *DumpCommand) parseDumpFile(onRecordParsed func(recordIdx int, record *kgo.Record), onRecordCorrupted func(recordIdx int, err error)) error {
 	separator := bufio.NewScanner(c.inOutFile)
 	separator.Buffer(make([]byte, 10_000_000), 10_000_000) // 10MB buffer because we can have large records

-	produceErr := atomic.NewError(nil)
-	for recordsIdx := 0; separator.Scan(); recordsIdx++ {
+	for recordIdx := 0; separator.Scan(); recordIdx++ {
 		item := separator.Bytes()
 		record := &kgo.Record{}
 		err := json.Unmarshal(item, record)
 		if err != nil {
-			corruptedJSONRecords.Inc()
-			c.printer.PrintLine(fmt.Sprintf("corrupted JSON record %d: %v", recordsIdx, err))
+			onRecordCorrupted(recordIdx, err)
 			continue
 		}
 		if record.Offset < int64(c.skipFirst) {
@@ -136,28 +210,12 @@ func (c *DumpCommand) doImport(*kingpin.ParseContext) error {
 		record.Partition = int32(c.partition)
 		record.Context = context.WithValue(context.Background(), originalOffsetKey, record.Offset)

-		client.Produce(context.Background(), record, func(record *kgo.Record, err error) {
-			recordCount.Inc()
-			if errors.Is(err, kerr.MessageTooLarge) {
-				recordsTooLarge.Inc()
-				return
-			}
-			if err != nil {
-				produceErr.Store(fmt.Errorf("failed to produce record with offset %d: %v", record.Context.Value(originalOffsetKey), err))
-			}
-		})
+		onRecordParsed(recordIdx, record)
 	}

-	c.printer.PrintLine("waiting for produce to finish")
-	err := client.Flush(context.Background())
-	if err != nil {
-		return fmt.Errorf("failed to flush records: %w", err)
-	}
-	if err = produceErr.Load(); err != nil {
-		return err
-	}
+	if separator.Err() != nil {
+		return fmt.Errorf("separator scan failed: %w", separator.Err())
+	}

 	return nil
 }
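Note: the dump file format that ties doExport and parseDumpFile together is newline-delimited JSON, one kgo.Record per json.Encoder.Encode call, which is why a line-oriented bufio.Scanner with an enlarged buffer can read it back. A minimal, self-contained sketch of that round trip (record contents are hypothetical, not taken from a real dump):

package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	var dump bytes.Buffer

	// Export side: json.Encoder emits one JSON document per line.
	enc := json.NewEncoder(&dump)
	_ = enc.Encode(&kgo.Record{Topic: "example-topic", Partition: 0, Offset: 42, Value: []byte("payload")})
	_ = enc.Encode(&kgo.Record{Topic: "example-topic", Partition: 0, Offset: 43, Value: []byte("payload-2")})

	// Import side: scan line by line and unmarshal each record,
	// with an enlarged buffer to accommodate large records.
	scanner := bufio.NewScanner(&dump)
	scanner.Buffer(make([]byte, 10_000_000), 10_000_000)
	for recordIdx := 0; scanner.Scan(); recordIdx++ {
		record := &kgo.Record{}
		if err := json.Unmarshal(scanner.Bytes(), record); err != nil {
			fmt.Printf("corrupted JSON record %d: %v\n", recordIdx, err)
			continue
		}
		fmt.Printf("record %d: offset=%d value=%s\n", recordIdx, record.Offset, record.Value)
	}
}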

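Given the %s %d %f format in doPrint, each sample renders as the series labels, the timestamp in milliseconds, and the sample value. Illustrative output for a single record (hypothetical series and values):

Record #0 (offset: 42)
{__name__="up", job="ingester"} 1732000000000 1.000000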