From 5fc0b514d4d85699e1fee8198a5fc2222e904256 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Sat, 5 Dec 2020 20:40:40 +0200 Subject: [PATCH 1/2] [tools] Add benchmarking support to read_data_files --- src/cmd/tools/read_data_files/main/main.go | 90 ++++++++++++++++++---- 1 file changed, 73 insertions(+), 17 deletions(-) diff --git a/src/cmd/tools/read_data_files/main/main.go b/src/cmd/tools/read_data_files/main/main.go index d48e9ec561..b8b1c75a7b 100644 --- a/src/cmd/tools/read_data_files/main/main.go +++ b/src/cmd/tools/read_data_files/main/main.go @@ -41,18 +41,34 @@ import ( "go.uber.org/zap" ) -const snapshotType = "snapshot" -const flushType = "flush" +const ( + snapshotType = "snapshot" + flushType = "flush" +) + +type benchmarkMode uint8 + +const ( + // benchmarkNone prints the data read to the standard output and does not measure performance. + benchmarkNone benchmarkMode = iota + + // benchmarkSeries benchmarks time series read performance (skipping datapoint decoding). + benchmarkSeries + + // benchmarkDatapoints benchmarks series read, including datapoint decoding. + benchmarkDatapoints +) func main() { var ( optPathPrefix = getopt.StringLong("path-prefix", 'p', "", "Path prefix [e.g. /var/lib/m3db]") - optNamespace = getopt.StringLong("namespace", 'n', "", "Namespace [e.g. metrics]") + optNamespace = getopt.StringLong("namespace", 'n', "default", "Namespace [e.g. metrics]") optShard = getopt.Uint32Long("shard", 's', 0, "Shard [expected format uint32]") optBlockstart = getopt.Int64Long("block-start", 'b', 0, "Block Start Time [in nsec]") volume = getopt.Int64Long("volume", 'v', 0, "Volume number") fileSetTypeArg = getopt.StringLong("fileset-type", 't', flushType, fmt.Sprintf("%s|%s", flushType, snapshotType)) idFilter = getopt.StringLong("id-filter", 'f', "", "ID Contains Filter (optional)") + benchmark = getopt.StringLong("benchmark", 'B', "", "benchmark mode (optional), [series/datapoints]") ) getopt.Parse() @@ -82,12 +98,30 @@ func main() { log.Fatalf("unknown fileset type: %s", *fileSetTypeArg) } + var benchmarkMode benchmarkMode + switch *benchmark { + case "": + case "series": + benchmarkMode = benchmarkSeries + case "datapoints": + benchmarkMode = benchmarkDatapoints + default: + log.Fatalf("unknown benchmark type: %s", *benchmark) + } + bytesPool := tools.NewCheckedBytesPool() bytesPool.Init() encodingOpts := encoding.NewOptions().SetBytesPool(bytesPool) fsOpts := fs.NewOptions().SetFilePathPrefix(*optPathPrefix) + + var ( + seriesCount = 0 + datapointCount = 0 + start = time.Now() + ) + reader, err := fs.NewReader(bytesPool, fsOpts) if err != nil { log.Fatalf("could not create new reader: %v", err) @@ -121,23 +155,45 @@ func main() { continue } - data.IncRef() - iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts) - for iter.Next() { - dp, _, annotation := iter.Current() - // Use fmt package so it goes to stdout instead of stderr - fmt.Printf("{id: %s, dp: %+v", id.String(), dp) - if len(annotation) > 0 { - fmt.Printf(", annotation: %s", base64.StdEncoding.EncodeToString(annotation)) + if benchmarkMode != benchmarkSeries { + data.IncRef() + + iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts) + for iter.Next() { + dp, _, annotation := iter.Current() + if benchmarkMode == benchmarkNone { + // Use fmt package so it goes to stdout instead of stderr + fmt.Printf("{id: %s, dp: %+v", id.String(), dp) + if len(annotation) > 0 { + fmt.Printf(", annotation: %s", base64.StdEncoding.EncodeToString(annotation)) + } + fmt.Println("}") + } + datapointCount++ } - fmt.Println("}") - } - if err := iter.Err(); err != nil { - log.Fatalf("unable to iterate original data: %v", err) + if err := iter.Err(); err != nil { + log.Fatalf("unable to iterate original data: %v", err) + } + iter.Close() + + data.DecRef() } - iter.Close() - data.DecRef() data.Finalize() + seriesCount++ + } + + if benchmarkMode != benchmarkNone { + runTime := time.Now().Sub(start) + fmt.Printf("Running time: %s\n", runTime) + fmt.Printf("\n%d series read\n", seriesCount) + if runTime > 0 { + fmt.Printf("(%.2f series/second)\n", float64(seriesCount)/runTime.Seconds()) + } + + if benchmarkMode == benchmarkDatapoints { + fmt.Printf("\n%d datapoints decoded\n", datapointCount) + fmt.Printf("(%.2f datapoints/second)\n", float64(datapointCount)/runTime.Seconds()) + } } } From e9393cad4a5416f1244d1cacb1fb4e46c08e747f Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Sat, 5 Dec 2020 22:09:49 +0200 Subject: [PATCH 2/2] Lint --- src/cmd/tools/read_data_files/main/main.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/cmd/tools/read_data_files/main/main.go b/src/cmd/tools/read_data_files/main/main.go index b8b1c75a7b..550d4ff563 100644 --- a/src/cmd/tools/read_data_files/main/main.go +++ b/src/cmd/tools/read_data_files/main/main.go @@ -68,7 +68,8 @@ func main() { volume = getopt.Int64Long("volume", 'v', 0, "Volume number") fileSetTypeArg = getopt.StringLong("fileset-type", 't', flushType, fmt.Sprintf("%s|%s", flushType, snapshotType)) idFilter = getopt.StringLong("id-filter", 'f', "", "ID Contains Filter (optional)") - benchmark = getopt.StringLong("benchmark", 'B', "", "benchmark mode (optional), [series/datapoints]") + benchmark = getopt.StringLong( + "benchmark", 'B', "", "benchmark mode (optional), [series/datapoints]") ) getopt.Parse() @@ -98,13 +99,13 @@ func main() { log.Fatalf("unknown fileset type: %s", *fileSetTypeArg) } - var benchmarkMode benchmarkMode + var benchMode benchmarkMode switch *benchmark { case "": case "series": - benchmarkMode = benchmarkSeries + benchMode = benchmarkSeries case "datapoints": - benchmarkMode = benchmarkDatapoints + benchMode = benchmarkDatapoints default: log.Fatalf("unknown benchmark type: %s", *benchmark) } @@ -155,13 +156,13 @@ func main() { continue } - if benchmarkMode != benchmarkSeries { + if benchMode != benchmarkSeries { data.IncRef() iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts) for iter.Next() { dp, _, annotation := iter.Current() - if benchmarkMode == benchmarkNone { + if benchMode == benchmarkNone { // Use fmt package so it goes to stdout instead of stderr fmt.Printf("{id: %s, dp: %+v", id.String(), dp) if len(annotation) > 0 { @@ -183,15 +184,15 @@ func main() { seriesCount++ } - if benchmarkMode != benchmarkNone { - runTime := time.Now().Sub(start) + if benchMode != benchmarkNone { + runTime := time.Since(start) fmt.Printf("Running time: %s\n", runTime) fmt.Printf("\n%d series read\n", seriesCount) if runTime > 0 { fmt.Printf("(%.2f series/second)\n", float64(seriesCount)/runTime.Seconds()) } - if benchmarkMode == benchmarkDatapoints { + if benchMode == benchmarkDatapoints { fmt.Printf("\n%d datapoints decoded\n", datapointCount) fmt.Printf("(%.2f datapoints/second)\n", float64(datapointCount)/runTime.Seconds()) }