diff --git a/pkg/benchmark/compression.go b/pkg/benchmark/compression.go
index 749d4e58..9c3bed92 100644
--- a/pkg/benchmark/compression.go
+++ b/pkg/benchmark/compression.go
@@ -22,6 +22,8 @@ import (
 	"github.com/pierrec/lz4"
 )
 
+const CompressionTypeZstd = "zstd"
+
 type CompressionAlgorithm interface {
 	fmt.Stringer
 	Compress(data []byte) ([]byte, error)
diff --git a/pkg/benchmark/dataset/real_logs_dataset.go b/pkg/benchmark/dataset/real_logs_dataset.go
index 4665402b..ee43cd70 100644
--- a/pkg/benchmark/dataset/real_logs_dataset.go
+++ b/pkg/benchmark/dataset/real_logs_dataset.go
@@ -15,6 +15,9 @@
 package dataset
 
 import (
+	"bufio"
+	"errors"
+	"io"
 	"log"
 	"os"
 	"path/filepath"
@@ -22,6 +25,8 @@ import (
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/plog/plogotlp"
 
+	"github.com/klauspost/compress/zstd"
+
 	"github.com/f5/otel-arrow-adapter/pkg/benchmark"
 	"github.com/f5/otel-arrow-adapter/pkg/benchmark/stats"
 )
@@ -38,8 +43,71 @@ type logUnit struct {
 	scope plog.ScopeLogs
 }
 
-// NewRealLogsDataset creates a new RealLogsDataset from a binary file.
-func NewRealLogsDataset(path string) *RealLogsDataset {
+type logReader struct {
+	stringReader *bufio.Reader
+	unmarshaler  *plog.JSONUnmarshaler
+	bytesRead    int
+}
+
+func (lr *logReader) readAllLogs() (plog.Logs, error) {
+	logs := plog.NewLogs()
+
+	for {
+		line, err := lr.stringReader.ReadString('\n')
+		if len(line) > 0 {
+			ll, uerr := lr.unmarshaler.UnmarshalLogs([]byte(line))
+			if uerr != nil {
+				return logs, uerr
+			}
+			for i := 0; i < ll.ResourceLogs().Len(); i++ {
+				rl := logs.ResourceLogs().AppendEmpty()
+				ll.ResourceLogs().At(i).CopyTo(rl)
+			}
+			lr.bytesRead += len(line)
+		}
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				// End of file; a final line without a trailing
+				// newline has already been consumed above.
+				return logs, nil
+			}
+			return logs, err
+		}
+	}
+}
+
+func logsFromJSON(path string, compression string) (plog.Logs, int) {
+	file, err := os.Open(filepath.Clean(path))
+	if err != nil {
+		log.Fatal("open file:", err)
+	}
+	defer file.Close()
+
+	lr := &logReader{
+		unmarshaler: &plog.JSONUnmarshaler{},
+		bytesRead:   0,
+	}
+
+	if compression == benchmark.CompressionTypeZstd {
+		cr, err := zstd.NewReader(file)
+		if err != nil {
+			log.Fatal("Failed to create compressed reader: ", err)
+		}
+		defer cr.Close()
+		lr.stringReader = bufio.NewReader(cr)
+	} else { // no compression
+		lr.stringReader = bufio.NewReader(file)
+	}
+
+	logs, err := lr.readAllLogs()
+	if err != nil {
+		if lr.bytesRead == 0 {
+			log.Fatal("Read zero bytes from file: ", err)
+		}
+		log.Print("Found error when reading file: ", err)
+		log.Print("Bytes read: ", lr.bytesRead)
+	}
+
+	return logs, lr.bytesRead
+}
+
+func logsFromProto(path string) (plog.Logs, int) {
 	data, err := os.ReadFile(filepath.Clean(path))
 	if err != nil {
 		log.Fatal("read file:", err)
@@ -49,13 +117,29 @@ func NewRealLogsDataset(path string) *RealLogsDataset {
 	if err := otlp.UnmarshalProto(data); err != nil {
 		log.Fatal("unmarshal:", err)
 	}
+	logs := otlp.Logs()
+
+	return logs, len(data)
+}
+
+// NewRealLogsDataset creates a new RealLogsDataset from a file containing
+// either OTLP protobuf or line-delimited OTLP JSON (optionally
+// zstd-compressed).
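+// The format parameter selects the decoder ("json", or anything else for
+// protobuf); when compression is benchmark.CompressionTypeZstd, the JSON
+// file is decompressed while it is read (see logsFromJSON above). Each
+// JSON line is expected to hold one export request, as produced by
+// `logs_gen -format json`.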
+func NewRealLogsDataset(path string, compression string, format string) *RealLogsDataset {
+	var logs plog.Logs
+	var size int
+
+	if format == "json" {
+		logs, size = logsFromJSON(path, compression)
+	} else {
+		logs, size = logsFromProto(path)
+	}
 
 	ds := &RealLogsDataset{
 		logs:        []logUnit{},
-		sizeInBytes: len(data),
+		sizeInBytes: size,
 		logsStats:   stats.NewLogsStats(),
 	}
-	logs := otlp.Logs()
 	ds.logsStats.Analyze(logs)
 
 	for ri := 0; ri < logs.ResourceLogs().Len(); ri++ {
diff --git a/pkg/benchmark/dataset/real_metrics_dataset.go b/pkg/benchmark/dataset/real_metrics_dataset.go
index 590b5902..17c452f7 100644
--- a/pkg/benchmark/dataset/real_metrics_dataset.go
+++ b/pkg/benchmark/dataset/real_metrics_dataset.go
@@ -15,14 +15,19 @@
 package dataset
 
 import (
+	"bufio"
+	"errors"
+	"io"
 	"log"
 	"os"
 	"path/filepath"
 
+	"github.com/klauspost/compress/zstd"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
 
+	"github.com/f5/otel-arrow-adapter/pkg/benchmark"
 	"github.com/f5/otel-arrow-adapter/pkg/benchmark/stats"
 )
 
 // RealMetricsDataset represents a dataset of real metrics read from a Metrics serialized to a binary file.
@@ -38,8 +43,73 @@ type metrics struct {
 	scope pmetric.ScopeMetrics
 }
 
-// NewRealMetricsDataset creates a new RealMetricsDataset from a binary file.
-func NewRealMetricsDataset(path string) *RealMetricsDataset {
+type metricReader struct {
+	stringReader *bufio.Reader
+	unmarshaler  *pmetric.JSONUnmarshaler
+	bytesRead    int
+}
+
+func (mr *metricReader) readAllMetrics() (pmetric.Metrics, error) {
+	metrics := pmetric.NewMetrics()
+
+	for {
+		line, err := mr.stringReader.ReadString('\n')
+		if len(line) > 0 {
+			ml, uerr := mr.unmarshaler.UnmarshalMetrics([]byte(line))
+			if uerr != nil {
+				return metrics, uerr
+			}
+			for i := 0; i < ml.ResourceMetrics().Len(); i++ {
+				rm := metrics.ResourceMetrics().AppendEmpty()
+				ml.ResourceMetrics().At(i).CopyTo(rm)
+			}
+			mr.bytesRead += len(line)
+		}
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				// End of file; a final line without a trailing
+				// newline has already been consumed above.
+				return metrics, nil
+			}
+			return metrics, err
+		}
+	}
+}
+
+func metricsFromJSON(path string, compression string) (pmetric.Metrics, int) {
+	file, err := os.Open(filepath.Clean(path))
+	if err != nil {
+		log.Fatal("open file:", err)
+	}
+	defer file.Close()
+
+	mr := &metricReader{
+		unmarshaler: &pmetric.JSONUnmarshaler{},
+		bytesRead:   0,
+	}
+
+	if compression == benchmark.CompressionTypeZstd {
+		cr, err := zstd.NewReader(file)
+		if err != nil {
+			log.Fatal("Failed to create compressed reader: ", err)
+		}
+		defer cr.Close()
+		mr.stringReader = bufio.NewReader(cr)
+	} else { // no compression
+		mr.stringReader = bufio.NewReader(file)
+	}
+
+	mdata, err := mr.readAllMetrics()
+	if err != nil {
+		if mr.bytesRead == 0 {
+			log.Fatal("Read zero bytes from file: ", err)
+		}
+		log.Print("Found error when reading file: ", err)
+		log.Print("Bytes read: ", mr.bytesRead)
+	}
+
+	return mdata, mr.bytesRead
+}
+
+func metricsFromProto(path string) (pmetric.Metrics, int) {
 	data, err := os.ReadFile(filepath.Clean(path))
 	if err != nil {
 		log.Fatal("read file:", err)
@@ -50,9 +120,24 @@ func NewRealMetricsDataset(path string) *RealMetricsDataset {
 	}
 	mdata := otlp.Metrics()
 
+	return mdata, len(data)
+}
+
+// NewRealMetricsDataset creates a new RealMetricsDataset from a file
+// containing either OTLP protobuf or line-delimited OTLP JSON (optionally
+// zstd-compressed).
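+// As for logs, "json" selects the line-delimited JSON decoder (see
+// metricsFromJSON above); any other format value falls back to reading
+// the whole file as a single protobuf-encoded request.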
+func NewRealMetricsDataset(path string, compression string, format string) *RealMetricsDataset {
+	var mdata pmetric.Metrics
+	var size int
+	if format == "json" {
+		mdata, size = metricsFromJSON(path, compression)
+	} else {
+		mdata, size = metricsFromProto(path)
+	}
+
 	ds := &RealMetricsDataset{
 		metrics:      []metrics{},
-		sizeInBytes:  len(data),
+		sizeInBytes:  size,
 		metricsStats: stats.NewMetricsStats(),
 	}
 	ds.metricsStats.Analyze(mdata)
diff --git a/pkg/benchmark/dataset/real_trace_dataset.go b/pkg/benchmark/dataset/real_trace_dataset.go
index 472c63dd..902c40ab 100644
--- a/pkg/benchmark/dataset/real_trace_dataset.go
+++ b/pkg/benchmark/dataset/real_trace_dataset.go
@@ -15,19 +15,24 @@
 package dataset
 
 import (
+	"bufio"
 	"encoding/hex"
+	"errors"
 	"fmt"
+	"io"
 	"log"
 	"os"
 	"path/filepath"
 	"sort"
 	"strings"
 
+	"github.com/klauspost/compress/zstd"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
 	"golang.org/x/exp/rand"
 
+	"github.com/f5/otel-arrow-adapter/pkg/benchmark"
 	carrow "github.com/f5/otel-arrow-adapter/pkg/otel/common/otlp"
 )
@@ -47,22 +52,101 @@ type spanSorter struct {
 var _ sort.Interface = spanSorter{}
 
-func NewRealTraceDataset(path string, sortOrder []string) *RealTraceDataset {
+type traceReader struct {
+	stringReader *bufio.Reader
+	unmarshaler  *ptrace.JSONUnmarshaler
+	bytesRead    int
+}
+
+func (tr *traceReader) readAllTraces() (ptrace.Traces, error) {
+	traces := ptrace.NewTraces()
+
+	for {
+		line, err := tr.stringReader.ReadString('\n')
+		if len(line) > 0 {
+			tl, uerr := tr.unmarshaler.UnmarshalTraces([]byte(line))
+			if uerr != nil {
+				return traces, uerr
+			}
+			for i := 0; i < tl.ResourceSpans().Len(); i++ {
+				rs := traces.ResourceSpans().AppendEmpty()
+				tl.ResourceSpans().At(i).CopyTo(rs)
+			}
+			tr.bytesRead += len(line)
+		}
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				// End of file; a final line without a trailing
+				// newline has already been consumed above.
+				return traces, nil
+			}
+			return traces, err
+		}
+	}
+}
+
+func tracesFromJSON(path string, compression string) (ptrace.Traces, int) {
+	file, err := os.Open(filepath.Clean(path))
+	if err != nil {
+		log.Fatal("open file:", err)
+	}
+	defer file.Close()
+
+	tr := &traceReader{
+		unmarshaler: &ptrace.JSONUnmarshaler{},
+		bytesRead:   0,
+	}
+
+	if compression == benchmark.CompressionTypeZstd {
+		cr, err := zstd.NewReader(file)
+		if err != nil {
+			log.Fatal("Failed to create compressed reader: ", err)
+		}
+		defer cr.Close()
+		tr.stringReader = bufio.NewReader(cr)
+	} else { // no compression
+		tr.stringReader = bufio.NewReader(file)
+	}
+
+	traces, err := tr.readAllTraces()
+	if err != nil {
+		if tr.bytesRead == 0 {
+			log.Fatal("Read zero bytes from file: ", err)
+		}
+		log.Print("Found error when reading file: ", err)
+		log.Print("Bytes read: ", tr.bytesRead)
+	}
+
+	return traces, tr.bytesRead
+}
+
+func tracesFromProto(path string) (ptrace.Traces, int) {
 	data, err := os.ReadFile(filepath.Clean(path))
 	if err != nil {
 		log.Fatal("read file:", err)
 	}
+
 	otlp := ptraceotlp.NewExportRequest()
 	if err := otlp.UnmarshalProto(data); err != nil {
 		log.Fatalf("in %q unmarshal: %v", path, err)
 	}
+	traces := otlp.Traces()
+	return traces, len(data)
+}
+
+// NewRealTraceDataset creates a new RealTraceDataset from a file containing
+// either OTLP protobuf or line-delimited OTLP JSON (optionally
+// zstd-compressed).
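+// compression and format behave as in the logs and metrics datasets;
+// sortOrder lists span fields (e.g. "trace_id") used to sort the loaded
+// spans.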
+func NewRealTraceDataset(path string, compression string, format string, sortOrder []string) *RealTraceDataset {
+	var traces ptrace.Traces
+	var size int
+	if format == "json" {
+		traces, size = tracesFromJSON(path, compression)
+	} else {
+		traces, size = tracesFromProto(path)
+	}
+
 	ds := &RealTraceDataset{
 		s2r:         map[ptrace.Span]pcommon.Resource{},
 		s2s:         map[ptrace.Span]pcommon.InstrumentationScope{},
-		sizeInBytes: len(data),
+		sizeInBytes: size,
 	}
-	traces := otlp.Traces()
 
 	for i := 0; i < traces.ResourceSpans().Len(); i++ {
 		rs := traces.ResourceSpans().At(i)
diff --git a/pkg/benchmark/profileable/otlp/otlp_profiler_test.go b/pkg/benchmark/profileable/otlp/otlp_profiler_test.go
index ccc1dc34..de58ba7e 100644
--- a/pkg/benchmark/profileable/otlp/otlp_profiler_test.go
+++ b/pkg/benchmark/profileable/otlp/otlp_profiler_test.go
@@ -62,7 +62,7 @@ func TestOtlpLightstepTracesProfiler(t *testing.T) {
 	t.Skip("Lightstep specific test")
 	t.Parallel()
 
-	benchdata := dataset.NewRealTraceDataset("/Users/josh.macdonald/src/lightstep/forward_spans.bin.otlp.bin", []string{
+	benchdata := dataset.NewRealTraceDataset("../../../../tools/trace_benchmark/data/otlp_traces.pb", benchmark.CompressionTypeZstd, "proto", []string{
 		"trace_id",
 	})
diff --git a/pkg/otel/traces/validation_test.go b/pkg/otel/traces/validation_test.go
index 468f50d8..2dbe5712 100644
--- a/pkg/otel/traces/validation_test.go
+++ b/pkg/otel/traces/validation_test.go
@@ -196,7 +196,7 @@ func TestConversionFromRealData(t *testing.T) {
 	t.Skip("Testing based on production data that is not stored in the")
 
 	// Load a real OTLP traces request.
-	ds := dataset.NewRealTraceDataset("../../../data/nth_first_otlp_traces.pb", []string{"trace_id"})
+	ds := dataset.NewRealTraceDataset("../../../data/nth_first_otlp_traces.pb", "", "proto", []string{"trace_id"})
 
 	batchSizes := []int{1, 10, 100, 1000, 5000, 10000}
 	for _, batchSize := range batchSizes {
diff --git a/tools/logs_benchmark/main.go b/tools/logs_benchmark/main.go
index 633507a6..2e07da31 100644
--- a/tools/logs_benchmark/main.go
+++ b/tools/logs_benchmark/main.go
@@ -51,6 +51,8 @@ func main() {
 	// The -stats flag displays a series of statistics about the schema and the
 	// dataset. This flag is disabled by default.
 	statsFlag := flag.Bool("stats", false, "stats mode")
+	// The -format flag supports the "proto" and "json" formats.
+	formatFlag := flag.String("format", "proto", "file format")
 
 	// Parse the flag
 	flag.Parse()
@@ -65,7 +67,7 @@ func main() {
 	inputFiles := flag.Args()
 	if len(inputFiles) == 0 {
 		println("\nNo input file specified, using default file ./data/otlp_logs.pb")
-		println("CSV and OTLP protobuf files are supported as input files (ext .csv or .pb)")
+		println("CSV, OTLP protobuf, and OTLP JSON files are supported as input files (ext .csv, .pb, or .json)")
 		inputFiles = append(inputFiles, "./data/otlp_logs.pb")
 	}
@@ -88,17 +90,22 @@ func main() {
 		profiler := benchmark.NewProfiler([]int{128, 1024, 2048, 4096}, "output/logs_benchmark.log", 2)
 		//profiler := benchmark.NewProfiler([]int{10}, "output/logs_benchmark.log", 2)
 
+		// Infer the format from a recognizable file extension; this
+		// overrides the -format flag.
+		if strings.HasSuffix(inputFile, ".json") {
+			*formatFlag = "json"
+		} else if strings.HasSuffix(inputFile, ".pb") {
+			*formatFlag = "proto"
+		}
+
-		// Build dataset from CSV file or from OTLP protobuf file
+		// Build dataset from a CSV file or from an OTLP protobuf / JSON file
 		if strings.HasSuffix(inputFile, ".csv") {
 			ds = CsvToLogsDataset(inputFile)
-		} else if strings.HasSuffix(inputFile, ".pb") {
-			// ToDo Remove
-			rds := dataset.NewRealLogsDataset(inputFiles[i])
+		} else {
+			rds := dataset.NewRealLogsDataset(inputFiles[i], benchmark.CompressionTypeZstd, *formatFlag)
 			//rds.Resize(10)
 			ds = rds
-		} else {
-			log.Fatal("Unsupported input file format (only .csv and .pb are supported)")
 		}
+
 		profiler.Printf("Dataset '%s' (%s) loaded\n", inputFiles[i], humanize.Bytes(uint64(ds.SizeInBytes())))
 
 		otlpLogs := otlp.NewLogsProfileable(ds, compressionAlgo)
diff --git a/tools/logs_gen/main.go b/tools/logs_gen/main.go
index c339af8a..814bda7a 100644
--- a/tools/logs_gen/main.go
+++ b/tools/logs_gen/main.go
@@ -20,12 +20,14 @@ package main
 import (
 	"crypto/rand"
 	"flag"
+	"io"
 	"log"
 	"math"
 	"math/big"
 	"os"
 	"path"
 
+	"github.com/klauspost/compress/zstd"
 	"go.opentelemetry.io/collector/pdata/plog/plogotlp"
 
 	"github.com/f5/otel-arrow-adapter/pkg/datagen"
@@ -33,12 +35,55 @@ import (
 
 var help = flag.Bool("help", false, "Show help")
 var outputFile = "./data/otlp_logs.pb"
-var batchSize = 100000
+var batchSize = 20
+var format = "proto"
+
+// writeJSON writes batchSize zstd-compressed, JSON-encoded export requests
+// to the file, one request per line.
+func writeJSON(file *os.File, batchSize int, generator *datagen.LogsGenerator) {
+	fw, err := zstd.NewWriter(file)
+	if err != nil {
+		log.Fatal("error creating compressed writer: ", err)
+	}
+
+	for i := 0; i < batchSize; i++ {
+		request := plogotlp.NewExportRequestFromLogs(generator.Generate(1, 100))
+
+		// Marshal the request to bytes.
+		msg, err := request.MarshalJSON()
+		if err != nil {
+			log.Fatal("marshaling error: ", err)
+		}
+		if _, err := fw.Write(msg); err != nil {
+			log.Fatal("writing error: ", err)
+		}
+		if _, err := io.WriteString(fw, "\n"); err != nil {
+			log.Fatal("writing newline error: ", err)
+		}
+	}
+
+	// Close flushes buffered data and finalizes the zstd stream.
+	if err := fw.Close(); err != nil {
+		log.Fatal("error closing compressed writer: ", err)
+	}
+}
+
+// writeProto writes a single protobuf-encoded export request covering the
+// whole batch to the file.
+func writeProto(file *os.File, batchSize int, generator *datagen.LogsGenerator) {
+	request := plogotlp.NewExportRequestFromLogs(generator.Generate(batchSize, 100))
+	// Marshal the request to bytes.
+	msg, err := request.MarshalProto()
+	if err != nil {
+		log.Fatal("marshaling error: ", err)
+	}
+	// Write the protobuf to the already-opened file.
+	if _, err := file.Write(msg); err != nil {
+		log.Fatal("write error: ", err)
+	}
+}
 
 func main() {
 	// Define the flags.
 	flag.StringVar(&outputFile, "output", outputFile, "Output file")
 	flag.IntVar(&batchSize, "batchsize", batchSize, "Batch size")
+	flag.StringVar(&format, "format", format, "file format")
 
 	// Parse the flag
 	flag.Parse()
@@ -57,24 +102,22 @@ func main() {
 	entropy := datagen.NewTestEntropy(v.Int64())
 
 	generator := datagen.NewLogsGenerator(entropy, entropy.NewStandardResourceAttributes(), entropy.NewStandardInstrumentationScopes())
-	request := plogotlp.NewExportRequestFromLogs(generator.Generate(batchSize, 100))
-
-	// Marshal the request to bytes.
-	msg, err := request.MarshalProto()
-	if err != nil {
-		log.Fatal("marshaling error: ", err)
-	}
 
-	// Write protobuf to file
 	if _, err := os.Stat(outputFile); os.IsNotExist(err) {
 		err = os.MkdirAll(path.Dir(outputFile), 0700)
 		if err != nil {
 			log.Fatal("error creating directory: ", err)
 		}
 	}
-
-	err = os.WriteFile(outputFile, msg, 0600)
+	f, err := os.OpenFile(outputFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
 	if err != nil {
-		log.Fatal("write error: ", err)
+		log.Fatal("failed to open file: ", err)
 	}
+	defer f.Close()
+
+	if format == "json" {
+		writeJSON(f, batchSize, generator)
+	} else { // proto
+		writeProto(f, batchSize, generator)
+	}
 }
diff --git a/tools/metrics_benchmark/main.go b/tools/metrics_benchmark/main.go
index 29ca9b62..9b3f3388 100644
--- a/tools/metrics_benchmark/main.go
+++ b/tools/metrics_benchmark/main.go
@@ -18,6 +18,7 @@ import (
 	"flag"
 	"fmt"
 	"os"
+	"path/filepath"
 
 	"github.com/dustin/go-humanize"
 
@@ -38,6 +39,8 @@ func main() {
 	// The -stats flag displays a series of statistics about the schema and the
 	// dataset. This flag is disabled by default.
 	stats := flag.Bool("stats", false, "stats mode")
+	// The -format flag supports the "json" and "proto" file formats.
+	format := flag.String("format", "proto", "format of file to read")
 
 	// Parse the flag
 	flag.Parse()
@@ -51,7 +54,7 @@ func main() {
 	// Define default input file
 	inputFiles := flag.Args()
 	if len(inputFiles) == 0 {
-		inputFiles = append(inputFiles, "./data/otlp_metrics.pb")
+		inputFiles = append(inputFiles, filepath.Join("data", "otlp_metrics.pb"))
 	}
 
 	conf := &benchmark.Config{
@@ -70,7 +73,7 @@ func main() {
 	profiler := benchmark.NewProfiler([]int{128, 1024, 2048, 4096}, "output/metrics_benchmark.log", warmUpIter)
 	compressionAlgo := benchmark.Zstd()
 	maxIter := uint64(3)
-	ds := dataset.NewRealMetricsDataset(inputFiles[i])
+	ds := dataset.NewRealMetricsDataset(inputFiles[i], benchmark.CompressionTypeZstd, *format)
 	profiler.Printf("Dataset '%s' (%s) loaded\n", inputFiles[i], humanize.Bytes(uint64(ds.SizeInBytes())))
 	otlpMetrics := otlp.NewMetricsProfileable(ds, compressionAlgo)
 	//otlpDictMetrics := otlpdict.NewMetricsProfileable(ds, compressionAlgo)
diff --git a/tools/metrics_gen/main.go b/tools/metrics_gen/main.go
index 848baedb..c79f3eb5 100644
--- a/tools/metrics_gen/main.go
+++ b/tools/metrics_gen/main.go
@@ -17,25 +17,73 @@ package main
 import (
 	"crypto/rand"
 	"flag"
+	"io"
 	"log"
 	"math"
 	"math/big"
 	"os"
 	"path"
 
-	"github.com/f5/otel-arrow-adapter/pkg/datagen"
-
+	"github.com/klauspost/compress/zstd"
 	"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
+
+	"github.com/f5/otel-arrow-adapter/pkg/datagen"
 )
 
 var help = flag.Bool("help", false, "Show help")
 var outputFile = "./data/otlp_metrics.pb"
-var batchSize = 10000
+var batchSize = 20
+var format = "proto"
+
+// writeJSON writes zstd-compressed JSON to the output file when the
+// -format flag is "json", one JSON-encoded export request per line.
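+// This is the line-oriented layout the dataset readers expect when a
+// benchmark is run with -format json (see metricsFromJSON in
+// pkg/benchmark/dataset).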
+func writeJSON(file *os.File, batchSize int, generator *datagen.MetricsGenerator) {
+	fw, err := zstd.NewWriter(file)
+	if err != nil {
+		log.Fatal("error creating compressed writer: ", err)
+	}
+
+	for i := 0; i < batchSize; i++ {
+		request := pmetricotlp.NewExportRequestFromMetrics(generator.GenerateAllKindOfMetrics(1, 100))
+
+		// Marshal the request to bytes.
+		msg, err := request.MarshalJSON()
+		if err != nil {
+			log.Fatal("marshaling error: ", err)
+		}
+		if _, err := fw.Write(msg); err != nil {
+			log.Fatal("writing error: ", err)
+		}
+		if _, err := io.WriteString(fw, "\n"); err != nil {
+			log.Fatal("writing newline error: ", err)
+		}
+	}
+
+	// Close flushes buffered data and finalizes the zstd stream.
+	if err := fw.Close(); err != nil {
+		log.Fatal("error closing compressed writer: ", err)
+	}
+}
+
+// writeProto writes a single protobuf-encoded export request covering the
+// whole batch to the output file.
+func writeProto(file *os.File, batchSize int, generator *datagen.MetricsGenerator) {
+	request := pmetricotlp.NewExportRequestFromMetrics(generator.GenerateAllKindOfMetrics(batchSize, 100))
+	// Marshal the request to bytes.
+	msg, err := request.MarshalProto()
+	if err != nil {
+		log.Fatal("marshaling error: ", err)
+	}
+
+	// Write the protobuf to the already-opened file.
+	if _, err := file.Write(msg); err != nil {
+		log.Fatal("write error: ", err)
+	}
+}
 
 func main() {
 	// Define the flags.
 	flag.StringVar(&outputFile, "output", outputFile, "Output file")
 	flag.IntVar(&batchSize, "batchsize", batchSize, "Batch size")
+	flag.StringVar(&format, "format", format, "file format")
 
 	// Parse the flag
 	flag.Parse()
@@ -54,23 +102,22 @@ func main() {
 	entropy := datagen.NewTestEntropy(v.Int64())
 
 	generator := datagen.NewMetricsGenerator(entropy, entropy.NewStandardResourceAttributes(), entropy.NewStandardInstrumentationScopes())
-	request := pmetricotlp.NewExportRequestFromMetrics(generator.GenerateAllKindOfMetrics(batchSize, 100))
-
-	// Marshal the request to bytes.
-	msg, err := request.MarshalProto()
-	if err != nil {
-		log.Fatal("marshaling error: ", err)
-	}
 
-	// Write protobuf to file
 	if _, err := os.Stat(outputFile); os.IsNotExist(err) {
 		err = os.MkdirAll(path.Dir(outputFile), 0700)
 		if err != nil {
 			log.Fatal("error creating directory: ", err)
 		}
 	}
-	err = os.WriteFile(outputFile, msg, 0600)
+	f, err := os.OpenFile(outputFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
 	if err != nil {
-		log.Fatal("write error: ", err)
+		log.Fatal("failed to open file: ", err)
 	}
+	defer f.Close()
+
+	if format == "json" {
+		writeJSON(f, batchSize, generator)
+	} else { // proto
+		writeProto(f, batchSize, generator)
+	}
 }
diff --git a/tools/trace_benchmark/main.go b/tools/trace_benchmark/main.go
index 14e45edb..211c2630 100644
--- a/tools/trace_benchmark/main.go
+++ b/tools/trace_benchmark/main.go
@@ -37,6 +37,8 @@ func main() {
 	// The -stats flag displays a series of statistics about the schema and the
 	// dataset. This flag is disabled by default.
 	stats := flag.Bool("stats", false, "stats mode")
+	// The -format flag supports the "proto" and "json" formats.
+	format := flag.String("format", "proto", "file format")
 
 	// Parse the flag
 	flag.Parse()
@@ -69,7 +71,7 @@ func main() {
 		//profiler := benchmark.NewProfiler([]int{1000}, "output/trace_benchmark.log", 2)
 		compressionAlgo := benchmark.Zstd()
 		maxIter := uint64(1)
-		ds := dataset.NewRealTraceDataset(inputFiles[i], []string{"trace_id"})
+		ds := dataset.NewRealTraceDataset(inputFiles[i], benchmark.CompressionTypeZstd, *format, []string{"trace_id"})
 		//ds.Resize(5000)
 		profiler.Printf("Dataset '%s' (%s) loaded\n", inputFiles[i], humanize.Bytes(uint64(ds.SizeInBytes())))
 		otlpTraces := otlp.NewTraceProfileable(ds, compressionAlgo)
diff --git a/tools/trace_gen/main.go b/tools/trace_gen/main.go
index 7d7ae99b..3851a82b 100644
--- a/tools/trace_gen/main.go
+++ b/tools/trace_gen/main.go
@@ -17,26 +17,72 @@ package main
 import (
 	"crypto/rand"
 	"flag"
+	"io"
 	"log"
 	"math"
 	"math/big"
 	"os"
 	"path"
 
-	"github.com/f5/otel-arrow-adapter/pkg/datagen"
-
+	"github.com/klauspost/compress/zstd"
 	"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
+
+	"github.com/f5/otel-arrow-adapter/pkg/datagen"
 )
 
 var help = flag.Bool("help", false, "Show help")
 var outputFile = "./data/otlp_traces.pb"
-var batchSize = 50000
+var batchSize = 20
+var format = "proto"
+
+// writeJSON writes batchSize zstd-compressed, JSON-encoded export requests
+// to the file, one request per line.
+func writeJSON(file *os.File, batchSize int, generator *datagen.TraceGenerator) {
+	fw, err := zstd.NewWriter(file)
+	if err != nil {
+		log.Fatal("error creating compressed writer: ", err)
+	}
+
+	for i := 0; i < batchSize; i++ {
+		request := ptraceotlp.NewExportRequestFromTraces(generator.Generate(1, 100))
+
+		// Marshal the request to bytes.
+		msg, err := request.MarshalJSON()
+		if err != nil {
+			log.Fatal("marshaling error: ", err)
+		}
+		if _, err := fw.Write(msg); err != nil {
+			log.Fatal("writing error: ", err)
+		}
+		if _, err := io.WriteString(fw, "\n"); err != nil {
+			log.Fatal("writing newline error: ", err)
+		}
+	}
+
+	// Close flushes buffered data and finalizes the zstd stream.
+	if err := fw.Close(); err != nil {
+		log.Fatal("error closing compressed writer: ", err)
+	}
+}
+
+// writeProto writes a single protobuf-encoded export request covering the
+// whole batch to the file.
+func writeProto(file *os.File, batchSize int, generator *datagen.TraceGenerator) {
+	request := ptraceotlp.NewExportRequestFromTraces(generator.Generate(batchSize, 100))
+
+	// Marshal the request to bytes.
+	msg, err := request.MarshalProto()
+	if err != nil {
+		log.Fatal("marshaling error: ", err)
+	}
+
+	// Write the protobuf to the already-opened file.
+	if _, err := file.Write(msg); err != nil {
+		log.Fatal("write error: ", err)
+	}
+}
 
 // This tool generates a trace dataset in the OpenTelemetry Protocol format from a fake traces generator.
 func main() {
 	// Define the flags.
 	flag.StringVar(&outputFile, "output", outputFile, "Output file")
 	flag.IntVar(&batchSize, "batchsize", batchSize, "Batch size")
+	flag.StringVar(&format, "format", format, "file format")
 
 	// Parse the flag
 	flag.Parse()
@@ -54,23 +100,22 @@ func main() {
 	}
 	entropy := datagen.NewTestEntropy(v.Int64())
 
 	generator := datagen.NewTracesGenerator(entropy, entropy.NewStandardResourceAttributes(), entropy.NewStandardInstrumentationScopes())
-	request := ptraceotlp.NewExportRequestFromTraces(generator.Generate(batchSize, 100))
-
-	// Marshal the request to bytes.
-	msg, err := request.MarshalProto()
-	if err != nil {
-		log.Fatal("marshaling error: ", err)
-	}
 
-	// Write protobuf to file
 	if _, err := os.Stat(outputFile); os.IsNotExist(err) {
 		err = os.MkdirAll(path.Dir(outputFile), 0700)
 		if err != nil {
 			log.Fatal("error creating directory: ", err)
 		}
 	}
-	err = os.WriteFile(outputFile, msg, 0600)
+	f, err := os.OpenFile(outputFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
 	if err != nil {
-		log.Fatal("write error: ", err)
+		log.Fatal("failed to open file: ", err)
 	}
+	defer f.Close()
+
+	if format == "json" {
+		writeJSON(f, batchSize, generator)
+	} else { // proto
+		writeProto(f, batchSize, generator)
+	}
 }
diff --git a/tools/trace_head/main.go b/tools/trace_head/main.go
index 6c0c2b70..62884006 100644
--- a/tools/trace_head/main.go
+++ b/tools/trace_head/main.go
@@ -22,21 +22,24 @@ import (
 
 	"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
 
+	"github.com/f5/otel-arrow-adapter/pkg/benchmark"
 	"github.com/f5/otel-arrow-adapter/pkg/benchmark/dataset"
 )
 
 var help = flag.Bool("help", false, "Show help")
-var inputFile = "./data/otlp_traces.pb"
+var inputFile = "data/otlp_traces.pb"
 var outputFile = "./data/nth_first_otlp_traces.pb"
-var spanCount = 10000
+var spanCount = 10
+var format = "proto"
 
-// This tool extracts the first n spans from a protobuf file of traces (i.e. kind of `head` command for spans).
+// This tool extracts the first n spans from a protobuf or zstd-compressed
+// JSON file of traces (i.e. a kind of `head` command for spans).
 func main() {
 	// Define the flags.
-	flag.StringVar(&inputFile, "input", outputFile, "Input file")
+	flag.StringVar(&inputFile, "input", inputFile, "Input file")
 	flag.StringVar(&outputFile, "output", outputFile, "Output file")
 	flag.IntVar(&spanCount, "span_count", spanCount, "Number of spans")
+	flag.StringVar(&format, "format", format, "file format")
 
 	// Parse the flag
 	flag.Parse()
@@ -48,14 +51,14 @@ func main() {
 	}
 
 	// Extract the first n spans
-	ds := dataset.NewRealTraceDataset(inputFile, []string{"trace_id"})
+	ds := dataset.NewRealTraceDataset(inputFile, benchmark.CompressionTypeZstd, format, []string{"trace_id"})
+	if ds.SizeInBytes() == 0 {
+		log.Fatal("failed to read any bytes from input")
+	}
 	traces := ds.Traces(0, spanCount)
 
 	request := ptraceotlp.NewExportRequestFromTraces(traces[0])
-	pb, err := request.MarshalProto()
-	if err != nil {
-		log.Fatal("marshaling error: ", err)
-	}
 
 	// Write protobuf to file
 	if _, err := os.Stat(outputFile); os.IsNotExist(err) {
@@ -65,7 +68,21 @@ func main() {
 		}
 	}
 
-	err = os.WriteFile(outputFile, pb, 0600)
+	var buf []byte
+	var err error
+	if format == "json" {
+		// Note: the JSON output is written uncompressed, unlike the
+		// zstd-compressed JSON that the dataset readers consume.
+		buf, err = request.MarshalJSON()
+	} else {
+		buf, err = request.MarshalProto()
+	}
+	if err != nil {
+		log.Fatal("marshaling error: ", err)
+	}
+
+	err = os.WriteFile(outputFile, buf, 0600)
 	if err != nil {
 		log.Fatal("write error: ", err)
 	}
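
For reference, a minimal sketch (not part of the patch) of how the new constructor signatures are meant to be called; the file paths are illustrative, and the argument values mirror benchmark.CompressionTypeZstd and the -format flag values introduced above.

package main

import (
	"fmt"

	"github.com/f5/otel-arrow-adapter/pkg/benchmark"
	"github.com/f5/otel-arrow-adapter/pkg/benchmark/dataset"
)

func main() {
	// zstd-compressed, line-delimited OTLP JSON, e.g. produced by
	// `trace_gen -format json -output data/otlp_traces.json`.
	jsonDS := dataset.NewRealTraceDataset("data/otlp_traces.json",
		benchmark.CompressionTypeZstd, "json", []string{"trace_id"})

	// Uncompressed OTLP protobuf, the default format; an empty
	// compression string means the file is read as-is.
	protoDS := dataset.NewRealTraceDataset("data/otlp_traces.pb",
		"", "proto", []string{"trace_id"})

	fmt.Println(jsonDS.SizeInBytes(), protoDS.SizeInBytes())
}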