Skip to content

Commit

Permalink
Update tools and benchmark datasets to read compressed json (#174)
Browse files Browse the repository at this point in the history
* not working

* add data

* not working

* cleanup metrics

* why was this so painful

* add data files

* add logs

* remove data files and update data generators

* update file name

* metrics support both proto and compressed json

* logs support both proto and json

* traces support both proto and json

* trace head supports both formats

* comments + review feedback

* fix test

---------

Co-authored-by: Laurent Quérel <laurent.querel@gmail.com>
  • Loading branch information
moh-osman3 and lquerel authored Jun 12, 2023
1 parent 534d56b commit 223ff57
Show file tree
Hide file tree
Showing 13 changed files with 489 additions and 70 deletions.
2 changes: 2 additions & 0 deletions pkg/benchmark/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/pierrec/lz4"
)

// CompressionTypeZstd identifies the zstd compression codec; dataset readers
// compare against it to decide whether an input file must be wrapped in a
// zstd decompressor before being read.
const CompressionTypeZstd = "zstd"

type CompressionAlgorithm interface {
fmt.Stringer
Compress(data []byte) ([]byte, error)
Expand Down
92 changes: 88 additions & 4 deletions pkg/benchmark/dataset/real_logs_dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package dataset

import (
"bufio"
"errors"
"io"
"log"
"os"
"path/filepath"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"

"github.com/klauspost/compress/zstd"
"github.com/f5/otel-arrow-adapter/pkg/benchmark"
"github.com/f5/otel-arrow-adapter/pkg/benchmark/stats"
)

Expand All @@ -38,8 +43,71 @@ type logUnit struct {
scope plog.ScopeLogs
}

// NewRealLogsDataset creates a new RealLogsDataset from a binary file.
func NewRealLogsDataset(path string) *RealLogsDataset {
// logReader incrementally reads newline-delimited OTLP JSON log payloads
// from an underlying (optionally zstd-decompressed) stream.
type logReader struct {
stringReader *bufio.Reader // line-oriented source of JSON payloads
unmarshaler *plog.JSONUnmarshaler // decodes one line into plog.Logs
bytesRead int // total bytes of successfully processed lines
}

// readAllLogs consumes the reader line by line, unmarshaling each line as an
// OTLP JSON logs payload and merging every ResourceLogs entry into a single
// plog.Logs value. It returns the accumulated logs and the first error
// encountered; io.EOF is treated as a normal end of input and reported as nil.
//
// Fix: the previous version silently discarded a final line that was not
// terminated by '\n' — bufio.Reader.ReadString returns such a partial line
// together with io.EOF, and the old code only processed lines when err == nil.
func (lr *logReader) readAllLogs() (plog.Logs, error) {
	logs := plog.NewLogs()

	for {
		line, err := lr.stringReader.ReadString('\n')
		// Process any returned data, even when it arrives together with an
		// error (e.g. an unterminated last line delivered alongside io.EOF).
		if len(line) > 0 {
			ll, uerr := lr.unmarshaler.UnmarshalLogs([]byte(line))
			if uerr != nil {
				return logs, uerr
			}
			for i := 0; i < ll.ResourceLogs().Len(); i++ {
				rl := logs.ResourceLogs().AppendEmpty()
				ll.ResourceLogs().At(i).CopyTo(rl)
			}
			lr.bytesRead += len(line)
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				return logs, nil
			}
			return logs, err
		}
	}
}

// logsFromJSON reads newline-delimited OTLP JSON logs from path (optionally
// zstd-compressed) and returns the merged logs plus the number of payload
// bytes read. Unrecoverable setup errors terminate the process via log.Fatal,
// matching the other dataset loaders in this package.
//
// Fix: the previous version leaked the opened file handle and, for compressed
// input, the zstd decoder — neither was ever closed.
func logsFromJSON(path string, compression string) (plog.Logs, int) {
	file, err := os.Open(filepath.Clean(path))
	if err != nil {
		log.Fatal("open file:", err)
	}
	// Close error is intentionally ignored: the file is only read.
	defer func() { _ = file.Close() }()

	lr := &logReader{
		unmarshaler: &plog.JSONUnmarshaler{},
		bytesRead:   0,
	}

	if compression == benchmark.CompressionTypeZstd {
		cr, err := zstd.NewReader(file)
		if err != nil {
			log.Fatal("Failed to create compressed reader: ", err)
		}
		// Release the decoder's goroutines/buffers when done.
		defer cr.Close()
		lr.stringReader = bufio.NewReader(cr)
	} else { // no compression
		lr.stringReader = bufio.NewReader(file)
	}

	logs, err := lr.readAllLogs()
	if err != nil {
		if lr.bytesRead == 0 {
			log.Fatal("Read zero bytes from file: ", err)
		}
		// Partial data was read; report the problem but keep what we have.
		log.Print("Found error when reading file: ", err)
		log.Print("Bytes read: ", lr.bytesRead)
	}

	return logs, lr.bytesRead
}

func logsFromProto(path string) (plog.Logs, int) {
data, err := os.ReadFile(filepath.Clean(path))
if err != nil {
log.Fatal("read file:", err)
Expand All @@ -49,13 +117,29 @@ func NewRealLogsDataset(path string) *RealLogsDataset {
if err := otlp.UnmarshalProto(data); err != nil {
log.Fatal("unmarshal:", err)
}
logs := otlp.Logs()

return logs, len(data)
}


// NewRealLogsDataset creates a new RealLogsDataset from a binary file
// which is either formatted as otlp protobuf or compressed otlp json.
func NewRealLogsDataset(path string, compression string, format string) *RealLogsDataset {
var logs plog.Logs
var size int

if format == "json" {
logs, size = logsFromJSON(path, compression)
} else {
logs, size = logsFromProto(path)
}

ds := &RealLogsDataset{
logs: []logUnit{},
sizeInBytes: len(data),
sizeInBytes: size,
logsStats: stats.NewLogsStats(),
}
logs := otlp.Logs()
ds.logsStats.Analyze(logs)

for ri := 0; ri < logs.ResourceLogs().Len(); ri++ {
Expand Down
91 changes: 88 additions & 3 deletions pkg/benchmark/dataset/real_metrics_dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
package dataset

import (
"bufio"
"errors"
"io"
"log"
"os"
"path/filepath"

"github.com/klauspost/compress/zstd"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/f5/otel-arrow-adapter/pkg/benchmark/stats"
"github.com/f5/otel-arrow-adapter/pkg/benchmark"
)

// RealMetricsDataset represents a dataset of real metrics read from a Metrics serialized to a binary file.
Expand All @@ -38,8 +43,73 @@ type metrics struct {
scope pmetric.ScopeMetrics
}

// NewRealMetricsDataset creates a new RealMetricsDataset from a binary file.
func NewRealMetricsDataset(path string) *RealMetricsDataset {
// metricReader incrementally reads newline-delimited OTLP JSON metric
// payloads from an underlying (optionally zstd-decompressed) stream.
type metricReader struct {
stringReader *bufio.Reader // line-oriented source of JSON payloads
unmarshaler *pmetric.JSONUnmarshaler // decodes one line into pmetric.Metrics
bytesRead int // total bytes of successfully processed lines
}

// readAllMetrics consumes the reader line by line, unmarshaling each line as
// an OTLP JSON metrics payload and merging every ResourceMetrics entry into a
// single pmetric.Metrics value. It returns the accumulated metrics and the
// first error encountered; io.EOF is treated as a normal end of input and
// reported as nil.
//
// Fix: the previous version silently discarded a final line that was not
// terminated by '\n' — bufio.Reader.ReadString returns such a partial line
// together with io.EOF, and the old code only processed lines when err == nil.
func (mr *metricReader) readAllMetrics() (pmetric.Metrics, error) {
	metrics := pmetric.NewMetrics()

	for {
		line, err := mr.stringReader.ReadString('\n')
		// Process any returned data, even when it arrives together with an
		// error (e.g. an unterminated last line delivered alongside io.EOF).
		if len(line) > 0 {
			ml, uerr := mr.unmarshaler.UnmarshalMetrics([]byte(line))
			if uerr != nil {
				return metrics, uerr
			}
			for i := 0; i < ml.ResourceMetrics().Len(); i++ {
				rm := metrics.ResourceMetrics().AppendEmpty()
				ml.ResourceMetrics().At(i).CopyTo(rm)
			}
			mr.bytesRead += len(line)
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				return metrics, nil
			}
			return metrics, err
		}
	}
}

// metricsFromJSON reads newline-delimited OTLP JSON metrics from path
// (optionally zstd-compressed) and returns the merged metrics plus the number
// of payload bytes read. Unrecoverable setup errors terminate the process via
// log.Fatal, matching the other dataset loaders in this package.
//
// Fix: the previous version leaked the opened file handle and, for compressed
// input, the zstd decoder — neither was ever closed.
func metricsFromJSON(path string, compression string) (pmetric.Metrics, int) {
	file, err := os.Open(filepath.Clean(path))
	if err != nil {
		log.Fatal("open file:", err)
	}
	// Close error is intentionally ignored: the file is only read.
	defer func() { _ = file.Close() }()

	mr := &metricReader{
		unmarshaler: &pmetric.JSONUnmarshaler{},
		bytesRead:   0,
	}

	if compression == benchmark.CompressionTypeZstd {
		cr, err := zstd.NewReader(file)
		if err != nil {
			log.Fatal("Failed to create compressed reader: ", err)
		}
		// Release the decoder's goroutines/buffers when done.
		defer cr.Close()
		mr.stringReader = bufio.NewReader(cr)
	} else { // no compression
		mr.stringReader = bufio.NewReader(file)
	}

	mdata, err := mr.readAllMetrics()
	if err != nil {
		if mr.bytesRead == 0 {
			log.Fatal("Read zero bytes from file: ", err)
		}
		// Partial data was read; report the problem but keep what we have.
		log.Print("Found error when reading file: ", err)
		log.Print("Bytes read: ", mr.bytesRead)
	}

	return mdata, mr.bytesRead
}

func metricsFromProto(path string) (pmetric.Metrics, int) {
data, err := os.ReadFile(filepath.Clean(path))
if err != nil {
log.Fatal("read file:", err)
Expand All @@ -50,9 +120,24 @@ func NewRealMetricsDataset(path string) *RealMetricsDataset {
}
mdata := otlp.Metrics()

return mdata, len(data)
}


// NewRealMetricsDataset creates a new RealMetricsDataset from a binary file
// which is either formatted as otlp protobuf or compressed otlp json.
func NewRealMetricsDataset(path string, compression string, format string) *RealMetricsDataset {
var mdata pmetric.Metrics
var bytes int
if format == "json" {
mdata, bytes = metricsFromJSON(path, compression)
} else {
mdata, bytes = metricsFromProto(path)
}

ds := &RealMetricsDataset{
metrics: []metrics{},
sizeInBytes: len(data),
sizeInBytes: bytes,
metricsStats: stats.NewMetricsStats(),
}
ds.metricsStats.Analyze(mdata)
Expand Down
90 changes: 87 additions & 3 deletions pkg/benchmark/dataset/real_trace_dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@
package dataset

import (
"bufio"
"encoding/hex"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"strings"

"github.com/klauspost/compress/zstd"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"golang.org/x/exp/rand"

"github.com/f5/otel-arrow-adapter/pkg/benchmark"
carrow "github.com/f5/otel-arrow-adapter/pkg/otel/common/otlp"
)

Expand All @@ -47,22 +52,101 @@ type spanSorter struct {

var _ sort.Interface = spanSorter{}

func NewRealTraceDataset(path string, sortOrder []string) *RealTraceDataset {
// traceReader incrementally reads newline-delimited OTLP JSON trace
// payloads from an underlying (optionally zstd-decompressed) stream.
type traceReader struct {
stringReader *bufio.Reader // line-oriented source of JSON payloads
unmarshaler *ptrace.JSONUnmarshaler // decodes one line into ptrace.Traces
bytesRead int // total bytes of successfully processed lines
}

// readAllTraces consumes the reader line by line, unmarshaling each line as
// an OTLP JSON traces payload and merging every ResourceSpans entry into a
// single ptrace.Traces value. It returns the accumulated traces and the first
// error encountered; io.EOF is treated as a normal end of input and reported
// as nil.
//
// Fix: the previous version silently discarded a final line that was not
// terminated by '\n' — bufio.Reader.ReadString returns such a partial line
// together with io.EOF, and the old code only processed lines when err == nil.
func (tr *traceReader) readAllTraces() (ptrace.Traces, error) {
	traces := ptrace.NewTraces()

	for {
		line, err := tr.stringReader.ReadString('\n')
		// Process any returned data, even when it arrives together with an
		// error (e.g. an unterminated last line delivered alongside io.EOF).
		if len(line) > 0 {
			tl, uerr := tr.unmarshaler.UnmarshalTraces([]byte(line))
			if uerr != nil {
				return traces, uerr
			}
			for i := 0; i < tl.ResourceSpans().Len(); i++ {
				rs := traces.ResourceSpans().AppendEmpty()
				tl.ResourceSpans().At(i).CopyTo(rs)
			}
			tr.bytesRead += len(line)
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				return traces, nil
			}
			return traces, err
		}
	}
}

// tracesFromJSON reads newline-delimited OTLP JSON traces from path
// (optionally zstd-compressed) and returns the merged traces plus the number
// of payload bytes read. Unrecoverable setup errors terminate the process via
// log.Fatal, matching the other dataset loaders in this package.
//
// Fix: the previous version leaked the opened file handle and, for compressed
// input, the zstd decoder — neither was ever closed.
func tracesFromJSON(path string, compression string) (ptrace.Traces, int) {
	file, err := os.Open(filepath.Clean(path))
	if err != nil {
		log.Fatal("open file:", err)
	}
	// Close error is intentionally ignored: the file is only read.
	defer func() { _ = file.Close() }()

	tr := &traceReader{
		unmarshaler: &ptrace.JSONUnmarshaler{},
		bytesRead:   0,
	}

	if compression == benchmark.CompressionTypeZstd {
		cr, err := zstd.NewReader(file)
		if err != nil {
			log.Fatal("Failed to create compressed reader: ", err)
		}
		// Release the decoder's goroutines/buffers when done.
		defer cr.Close()
		tr.stringReader = bufio.NewReader(cr)
	} else { // no compression
		tr.stringReader = bufio.NewReader(file)
	}

	traces, err := tr.readAllTraces()
	if err != nil {
		if tr.bytesRead == 0 {
			log.Fatal("Read zero bytes from file: ", err)
		}
		// Partial data was read; report the problem but keep what we have.
		log.Print("Found error when reading file: ", err)
		log.Print("Bytes read: ", tr.bytesRead)
	}

	return traces, tr.bytesRead
}

// tracesFromProto reads a file containing a single OTLP trace export request
// serialized as protobuf and returns the decoded traces together with the
// file size in bytes. Unrecoverable errors terminate the process, matching
// the other dataset loaders in this package.
//
// The second parameter is accepted only so the signature mirrors
// tracesFromJSON's (path, compression) shape; protobuf inputs are read
// uncompressed, so the value is ignored — hence the blank identifier.
func tracesFromProto(path string, _ string) (ptrace.Traces, int) {
	data, err := os.ReadFile(filepath.Clean(path))
	if err != nil {
		log.Fatal("read file:", err)
	}

	otlp := ptraceotlp.NewExportRequest()
	if err := otlp.UnmarshalProto(data); err != nil {
		log.Fatalf("in %q unmarshal: %v", path, err)
	}

	return otlp.Traces(), len(data)
}

// NewRealTraceDataset creates a new RealTraceDataset from a binary file
// which is either formatted as otlp protobuf or compressed otlp json.
func NewRealTraceDataset(path string, compression string, format string, sortOrder []string) *RealTraceDataset {
var traces ptrace.Traces
var size int
if format == "json" {
traces, size = tracesFromJSON(path, compression)
} else {
traces, size = tracesFromProto(path, compression)
}

ds := &RealTraceDataset{
s2r: map[ptrace.Span]pcommon.Resource{},
s2s: map[ptrace.Span]pcommon.InstrumentationScope{},
sizeInBytes: len(data),
sizeInBytes: size,
}
traces := otlp.Traces()

for i := 0; i < traces.ResourceSpans().Len(); i++ {
rs := traces.ResourceSpans().At(i)
Expand Down
2 changes: 1 addition & 1 deletion pkg/benchmark/profileable/otlp/otlp_profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestOtlpLightstepTracesProfiler(t *testing.T) {
t.Skip("Lightstep specific test")
t.Parallel()

benchdata := dataset.NewRealTraceDataset("/Users/josh.macdonald/src/lightstep/forward_spans.bin.otlp.bin", []string{
benchdata := dataset.NewRealTraceDataset("../../../../tools/trace_benchmark/data/otlp_traces.pb", benchmark.CompressionTypeZstd, "proto", []string{
"trace_id",
})

Expand Down
Loading

0 comments on commit 223ff57

Please sign in to comment.