Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tools] Add benchmarking support to read_data_files #2986

Merged
merged 3 commits into from
Dec 7, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 74 additions & 17 deletions src/cmd/tools/read_data_files/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,35 @@ import (
"go.uber.org/zap"
)

const snapshotType = "snapshot"
const flushType = "flush"
// Accepted values for the fileset-type command-line flag; flushType is the
// flag's default.
const (
snapshotType = "snapshot"
flushType = "flush"
)

// benchmarkMode selects whether the tool prints the data it reads or measures
// read performance instead. The zero value is benchmarkNone, which is the mode
// used when no benchmark flag value is supplied.
type benchmarkMode uint8

const (
// benchmarkNone prints the data read to the standard output and does not measure performance.
benchmarkNone benchmarkMode = iota

// benchmarkSeries benchmarks time series read performance (skipping datapoint decoding).
benchmarkSeries

// benchmarkDatapoints benchmarks series read, including datapoint decoding.
benchmarkDatapoints
)

func main() {
var (
optPathPrefix = getopt.StringLong("path-prefix", 'p', "", "Path prefix [e.g. /var/lib/m3db]")
optNamespace = getopt.StringLong("namespace", 'n', "", "Namespace [e.g. metrics]")
optNamespace = getopt.StringLong("namespace", 'n', "default", "Namespace [e.g. metrics]")
optShard = getopt.Uint32Long("shard", 's', 0, "Shard [expected format uint32]")
optBlockstart = getopt.Int64Long("block-start", 'b', 0, "Block Start Time [in nsec]")
volume = getopt.Int64Long("volume", 'v', 0, "Volume number")
fileSetTypeArg = getopt.StringLong("fileset-type", 't', flushType, fmt.Sprintf("%s|%s", flushType, snapshotType))
idFilter = getopt.StringLong("id-filter", 'f', "", "ID Contains Filter (optional)")
benchmark = getopt.StringLong(
"benchmark", 'B', "", "benchmark mode (optional), [series/datapoints]")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can you set benchmarkNone?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

benchmarkNone is the default mode (when no benchmark argument is provided).

)
getopt.Parse()

Expand Down Expand Up @@ -82,12 +99,30 @@ func main() {
log.Fatalf("unknown fileset type: %s", *fileSetTypeArg)
}

var benchMode benchmarkMode
switch *benchmark {
case "":
case "series":
benchMode = benchmarkSeries
case "datapoints":
benchMode = benchmarkDatapoints
default:
log.Fatalf("unknown benchmark type: %s", *benchmark)
}

bytesPool := tools.NewCheckedBytesPool()
bytesPool.Init()

encodingOpts := encoding.NewOptions().SetBytesPool(bytesPool)

fsOpts := fs.NewOptions().SetFilePathPrefix(*optPathPrefix)

var (
seriesCount = 0
datapointCount = 0
start = time.Now()
)

reader, err := fs.NewReader(bytesPool, fsOpts)
if err != nil {
log.Fatalf("could not create new reader: %v", err)
Expand Down Expand Up @@ -121,23 +156,45 @@ func main() {
continue
}

data.IncRef()
iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts)
for iter.Next() {
dp, _, annotation := iter.Current()
// Use fmt package so it goes to stdout instead of stderr
fmt.Printf("{id: %s, dp: %+v", id.String(), dp)
if len(annotation) > 0 {
fmt.Printf(", annotation: %s", base64.StdEncoding.EncodeToString(annotation))
if benchMode != benchmarkSeries {
data.IncRef()

iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts)
for iter.Next() {
dp, _, annotation := iter.Current()
if benchMode == benchmarkNone {
// Use fmt package so it goes to stdout instead of stderr
fmt.Printf("{id: %s, dp: %+v", id.String(), dp)
if len(annotation) > 0 {
fmt.Printf(", annotation: %s", base64.StdEncoding.EncodeToString(annotation))
}
fmt.Println("}")
}
datapointCount++
}
fmt.Println("}")
}
if err := iter.Err(); err != nil {
log.Fatalf("unable to iterate original data: %v", err)
if err := iter.Err(); err != nil {
log.Fatalf("unable to iterate original data: %v", err)
}
iter.Close()

data.DecRef()
}
iter.Close()

data.DecRef()
data.Finalize()
seriesCount++
}

if benchMode != benchmarkNone {
runTime := time.Since(start)
fmt.Printf("Running time: %s\n", runTime)
fmt.Printf("\n%d series read\n", seriesCount)
if runTime > 0 {
fmt.Printf("(%.2f series/second)\n", float64(seriesCount)/runTime.Seconds())
}

if benchMode == benchmarkDatapoints {
fmt.Printf("\n%d datapoints decoded\n", datapointCount)
fmt.Printf("(%.2f datapoints/second)\n", float64(datapointCount)/runTime.Seconds())
}
}
}