From 25b474a54759f271376fced5a8d2a56dc2d66bfd Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Tue, 19 Nov 2024 02:56:18 -0500
Subject: [PATCH] kafkatool: add 'dump print' command (#9942)

Signed-off-by: Marco Pracucci
---
 CHANGELOG.md            |   1 +
 tools/kafkatool/dump.go | 116 ++++++++++++++++++++++++++++++----------
 2 files changed, 88 insertions(+), 29 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 16a3e57ccd4..156b8fa5ca2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -126,6 +126,7 @@
 * [ENHANCEMENT] `kafkatool`: add SASL plain authentication support. The following new CLI flags have been added: #9584
   * `--kafka-sasl-username`
   * `--kafka-sasl-password`
+* [ENHANCEMENT] `kafkatool`: add `dump print` command to print the content of write requests from a dump. #9942
 * [ENHANCEMENT] Updated `KubePersistentVolumeFillingUp` runbook, including a sample command to debug the distroless image. #9802
 
 ## 2.14.1
diff --git a/tools/kafkatool/dump.go b/tools/kafkatool/dump.go
index 9a215978b3a..382a58eb38d 100644
--- a/tools/kafkatool/dump.go
+++ b/tools/kafkatool/dump.go
@@ -15,6 +15,8 @@ import (
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"go.uber.org/atomic"
+
+	"github.com/grafana/mimir/pkg/mimirpb"
 )
 
 type DumpCommand struct {
@@ -43,6 +45,7 @@ func (c *DumpCommand) Register(app *kingpin.Application, getKafkaClient func() *
 	cmd.Command("import", "Import records from a file into a Kafka topic").Action(c.doImport)
 	cmd.Command("export", "Export records from a Kafka topic into a file").Action(c.doExport)
+	cmd.Command("print", "Print the write requests inside records dumped using this tool").Action(c.doPrint)
 }
 
 type key int
@@ -52,16 +55,26 @@ const (
 )
 
 func (c *DumpCommand) doExport(*kingpin.ParseContext) error {
+	var (
+		recordCount    = &atomic.Int64{}
+		consumedOffset = &atomic.Int64{}
+	)
+
 	client := c.getKafkaClient()
 	client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
 		c.topic: {int32(c.partition): kgo.NewOffset().At(c.exportOffsetStart)}},
 	)
+
+	// Print the number of consumed records both when the export is in progress and at the end.
 	go func() {
 		for {
 			time.Sleep(time.Second)
 			c.printer.PrintLine(fmt.Sprintf("consumed records: %d, offset %d", recordCount.Load(), consumedOffset.Load()))
 		}
 	}()
+	defer func() {
+		c.printer.PrintLine(fmt.Sprintf("consumed records: %d, offset %d", recordCount.Load(), consumedOffset.Load()))
+	}()
 
 	encoder := json.NewEncoder(c.inOutFile)
@@ -96,14 +109,13 @@ func (c *DumpCommand) doExport(*kingpin.ParseContext) error {
 	return nil
 }
 
-var (
-	recordCount          = &atomic.Int64{}
-	consumedOffset       = &atomic.Int64{}
-	recordsTooLarge      = &atomic.Int64{}
-	corruptedJSONRecords = &atomic.Int64{}
-)
-
 func (c *DumpCommand) doImport(*kingpin.ParseContext) error {
+	var (
+		recordCount          = &atomic.Int64{}
+		recordsTooLarge      = &atomic.Int64{}
+		corruptedJSONRecords = &atomic.Int64{}
+	)
+
 	client := c.getKafkaClient()
 
 	go func() {
@@ -116,17 +128,79 @@ func (c *DumpCommand) doImport(*kingpin.ParseContext) error {
 		}
 	}()
 
+	produceErr := atomic.NewError(nil)
+
+	parseErr := c.parseDumpFile(
+		func(_ int, record *kgo.Record) {
+			client.Produce(context.Background(), record, func(record *kgo.Record, err error) {
+				recordCount.Inc()
+				if errors.Is(err, kerr.MessageTooLarge) {
+					recordsTooLarge.Inc()
+					return
+				}
+				if err != nil {
+					produceErr.Store(fmt.Errorf("failed to produce record with offset %d: %v", record.Context.Value(originalOffsetKey), err))
+				}
+			})
+		},
+		func(recordIdx int, err error) {
+			corruptedJSONRecords.Inc()
+			c.printer.PrintLine(fmt.Sprintf("corrupted JSON record %d: %v", recordIdx, err))
+		})
+
+	c.printer.PrintLine("waiting for produce to finish")
+	err := client.Flush(context.Background())
+	if err != nil {
+		return fmt.Errorf("failed to flush records: %w", err)
+	}
+
+	if parseErr != nil {
+		return fmt.Errorf("failed to parse dump file: %w", parseErr)
+	}
+	if err = produceErr.Load(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *DumpCommand) doPrint(*kingpin.ParseContext) error {
+	return c.parseDumpFile(
+		func(recordIdx int, record *kgo.Record) {
+			req := mimirpb.WriteRequest{}
+			err := req.Unmarshal(record.Value)
+			if err != nil {
+				c.printer.PrintLine(fmt.Sprintf("failed to unmarshal write request from record %d: %v", recordIdx, err))
+				return
+			}
+
+			// Print the time series in the write request.
+			c.printer.PrintLine(fmt.Sprintf("Record #%d (offset: %d)", recordIdx, record.Offset))
+			for _, series := range req.Timeseries {
+				for _, sample := range series.Samples {
+					c.printer.PrintLine(fmt.Sprintf("%s %d %f",
+						mimirpb.FromLabelAdaptersToLabels(series.Labels).String(),
+						sample.TimestampMs,
+						sample.Value,
+					))
+				}
+			}
+			c.printer.PrintLine("")
+		},
+		func(recordIdx int, err error) {
+			c.printer.PrintLine(fmt.Sprintf("corrupted JSON record %d: %v", recordIdx, err))
+		})
+}
+
+func (c *DumpCommand) parseDumpFile(onRecordParsed func(recordIdx int, record *kgo.Record), onRecordCorrupted func(recordIdx int, err error)) error {
 	separator := bufio.NewScanner(c.inOutFile)
 	separator.Buffer(make([]byte, 10_000_000), 10_000_000) // 10MB buffer because we can have large records
 
-	produceErr := atomic.NewError(nil)
-
-	for recordsIdx := 0; separator.Scan(); recordsIdx++ {
+	for recordIdx := 0; separator.Scan(); recordIdx++ {
 		item := separator.Bytes()
 		record := &kgo.Record{}
 		err := json.Unmarshal(item, record)
 		if err != nil {
-			corruptedJSONRecords.Inc()
-			c.printer.PrintLine(fmt.Sprintf("corrupted JSON record %d: %v", recordsIdx, err))
+			onRecordCorrupted(recordIdx, err)
 			continue
 		}
 		if record.Offset < int64(c.skipFirst) {
@@ -136,28 +210,12 @@ func (c *DumpCommand) doImport(*kingpin.ParseContext) error {
 		record.Partition = int32(c.partition)
 		record.Context = context.WithValue(context.Background(), originalOffsetKey, record.Offset)
 
-		client.Produce(context.Background(), record, func(record *kgo.Record, err error) {
-			recordCount.Inc()
-			if errors.Is(err, kerr.MessageTooLarge) {
-				recordsTooLarge.Inc()
-				return
-			}
-			if err != nil {
-				produceErr.Store(fmt.Errorf("failed to produce record with offset %d: %v", record.Context.Value(originalOffsetKey), err))
-			}
-		})
+		onRecordParsed(recordIdx, record)
 	}
 
-	c.printer.PrintLine("waiting for produce to finish")
-	err := client.Flush(context.Background())
-	if err != nil {
-		return fmt.Errorf("failed to flush records: %w", err)
-	}
-	if err = produceErr.Load(); err != nil {
-		return err
-	}
 	if separator.Err() != nil {
 		return fmt.Errorf("separator scan failed: %w", separator.Err())
 	}
+
 	return nil
 }
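
---

For context, the dump file the new `print` command reads is the one written by `dump export`: one JSON-encoded kgo.Record per line, whose Value field holds a protobuf-marshaled mimirpb.WriteRequest. The standalone sketch below decodes such a dump outside the tool, producing the same "<labels> <timestamp> <value>" lines as `dump print`. It is an illustration only, not part of the patch; the file name "dump.json" is a hypothetical placeholder.

// Minimal sketch (assumption: the dump format described above).
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"os"

	"github.com/grafana/mimir/pkg/mimirpb"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// "dump.json" is a hypothetical path; use the file written by `dump export`.
	f, err := os.Open("dump.json")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 10_000_000), 10_000_000) // same 10MB record limit as the tool

	for idx := 0; scanner.Scan(); idx++ {
		// Each line is a JSON-encoded Kafka record.
		record := &kgo.Record{}
		if err := json.Unmarshal(scanner.Bytes(), record); err != nil {
			fmt.Printf("corrupted JSON record %d: %v\n", idx, err)
			continue
		}

		// The record value is a protobuf-marshaled write request.
		req := mimirpb.WriteRequest{}
		if err := req.Unmarshal(record.Value); err != nil {
			fmt.Printf("record %d does not contain a write request: %v\n", idx, err)
			continue
		}

		// One line per sample, matching the output shape of `dump print`.
		for _, series := range req.Timeseries {
			for _, sample := range series.Samples {
				fmt.Printf("%s %d %f\n",
					mimirpb.FromLabelAdaptersToLabels(series.Labels).String(),
					sample.TimestampMs,
					sample.Value,
				)
			}
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}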