Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update tools and benchmark datasets to read compressed json #174

Merged
merged 18 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/benchmark/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/pierrec/lz4"
)

// CompressionTypeZstd identifies the zstd compression algorithm; dataset
// loaders compare against it to decide whether to wrap file input in a
// zstd-decoding reader.
const CompressionTypeZstd = "zstd"

type CompressionAlgorithm interface {
fmt.Stringer
Compress(data []byte) ([]byte, error)
Expand Down
92 changes: 88 additions & 4 deletions pkg/benchmark/dataset/real_logs_dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package dataset

import (
"bufio"
"errors"
"io"
"log"
"os"
"path/filepath"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"

"github.com/klauspost/compress/zstd"
"github.com/f5/otel-arrow-adapter/pkg/benchmark"
"github.com/f5/otel-arrow-adapter/pkg/benchmark/stats"
)

Expand All @@ -38,8 +43,71 @@ type logUnit struct {
scope plog.ScopeLogs
}

// NewRealLogsDataset creates a new RealLogsDataset from a binary file.
func NewRealLogsDataset(path string) *RealLogsDataset {
// logReader incrementally reads newline-delimited OTLP JSON log payloads
// from an underlying (optionally zstd-decompressed) stream.
type logReader struct {
	stringReader *bufio.Reader         // line-oriented reader over the raw or decompressed stream
	unmarshaler  *plog.JSONUnmarshaler // decodes one JSON line into plog.Logs
	bytesRead    int                   // total uncompressed bytes consumed so far
}

// readAllLogs consumes the stream line by line, unmarshaling each line as an
// OTLP JSON log payload and appending its resource logs into a single
// plog.Logs. It returns the accumulated logs together with the first
// non-EOF error encountered; lr.bytesRead tracks the bytes processed.
func (lr *logReader) readAllLogs() (plog.Logs, error) {
	logs := plog.NewLogs()

	for {
		line, err := lr.stringReader.ReadString('\n')
		// Process whatever was read BEFORE inspecting err: ReadString
		// returns the final, newline-less line together with io.EOF,
		// and the previous code silently dropped that last payload.
		if len(line) > 0 {
			ll, uerr := lr.unmarshaler.UnmarshalLogs([]byte(line))
			if uerr != nil {
				return logs, uerr
			}
			for i := 0; i < ll.ResourceLogs().Len(); i++ {
				rl := logs.ResourceLogs().AppendEmpty()
				ll.ResourceLogs().At(i).CopyTo(rl)
			}
			lr.bytesRead += len(line)
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				// Normal termination: the whole stream was consumed.
				return logs, nil
			}
			return logs, err
		}
	}
}

// logsFromJSON reads newline-delimited OTLP JSON logs from path — optionally
// zstd-compressed — and returns the combined logs plus the number of
// uncompressed bytes read. It is fatal on open/decoder failure or when
// nothing at all could be read; partial reads are logged and returned.
func logsFromJSON(path string, compression string) (plog.Logs, int) {
	file, err := os.Open(filepath.Clean(path))
	if err != nil {
		log.Fatal("open file:", err)
	}
	// Previously the file handle was never released (fd leak when this
	// is called repeatedly, e.g. across benchmark datasets).
	defer func() {
		if cerr := file.Close(); cerr != nil {
			log.Print("close file: ", cerr)
		}
	}()

	lr := &logReader{
		unmarshaler: &plog.JSONUnmarshaler{},
		bytesRead:   0,
	}

	if compression == benchmark.CompressionTypeZstd {
		cr, err := zstd.NewReader(file)
		if err != nil {
			log.Fatal("Failed to create compressed reader: ", err)
		}
		// Release the decoder's goroutines/buffers when done.
		defer cr.Close()
		lr.stringReader = bufio.NewReader(cr)
	} else { // no compression
		lr.stringReader = bufio.NewReader(file)
	}

	logs, err := lr.readAllLogs()
	if err != nil {
		if lr.bytesRead == 0 {
			log.Fatal("Read zero bytes from file: ", err)
		}
		// Best-effort: keep whatever was successfully decoded.
		log.Print("Found error when reading file: ", err)
		log.Print("Bytes read: ", lr.bytesRead)
	}

	return logs, lr.bytesRead
}

func logsFromProto(path string) (plog.Logs, int) {
data, err := os.ReadFile(filepath.Clean(path))
if err != nil {
log.Fatal("read file:", err)
Expand All @@ -49,13 +117,29 @@ func NewRealLogsDataset(path string) *RealLogsDataset {
if err := otlp.UnmarshalProto(data); err != nil {
log.Fatal("unmarshal:", err)
}
logs := otlp.Logs()

return logs, len(data)
}


// NewRealLogsDataset creates a new RealLogsDataset from a binary file
// which is either formatted as otlp protobuf or compressed otlp json.
func NewRealLogsDataset(path string, compression string, format string) *RealLogsDataset {
var logs plog.Logs
var size int

if format == "json" {
logs, size = logsFromJSON(path, compression)
} else {
logs, size = logsFromProto(path)
}

ds := &RealLogsDataset{
logs: []logUnit{},
sizeInBytes: len(data),
sizeInBytes: size,
logsStats: stats.NewLogsStats(),
}
logs := otlp.Logs()
ds.logsStats.Analyze(logs)

for ri := 0; ri < logs.ResourceLogs().Len(); ri++ {
Expand Down
91 changes: 88 additions & 3 deletions pkg/benchmark/dataset/real_metrics_dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
package dataset

import (
"bufio"
"errors"
"io"
"log"
"os"
"path/filepath"

"github.com/klauspost/compress/zstd"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/f5/otel-arrow-adapter/pkg/benchmark/stats"
"github.com/f5/otel-arrow-adapter/pkg/benchmark"
)

// RealMetricsDataset represents a dataset of real metrics read from a Metrics serialized to a binary file.
Expand All @@ -38,8 +43,73 @@ type metrics struct {
scope pmetric.ScopeMetrics
}

// NewRealMetricsDataset creates a new RealMetricsDataset from a binary file.
func NewRealMetricsDataset(path string) *RealMetricsDataset {
// metricReader incrementally reads newline-delimited OTLP JSON metric
// payloads from an underlying (optionally zstd-decompressed) stream.
type metricReader struct {
	stringReader *bufio.Reader            // line-oriented reader over the raw or decompressed stream
	unmarshaler  *pmetric.JSONUnmarshaler // decodes one JSON line into pmetric.Metrics
	bytesRead    int                      // total uncompressed bytes consumed so far
}

// readAllMetrics consumes the stream line by line, unmarshaling each line as
// an OTLP JSON metrics payload and appending its resource metrics into a
// single pmetric.Metrics. It returns the accumulated metrics together with
// the first non-EOF error; mr.bytesRead tracks the bytes processed.
func (mr *metricReader) readAllMetrics() (pmetric.Metrics, error) {
	metrics := pmetric.NewMetrics()

	for {
		line, err := mr.stringReader.ReadString('\n')
		// Process whatever was read BEFORE inspecting err: ReadString
		// returns the final, newline-less line together with io.EOF,
		// and the previous code silently dropped that last payload.
		if len(line) > 0 {
			ml, uerr := mr.unmarshaler.UnmarshalMetrics([]byte(line))
			if uerr != nil {
				return metrics, uerr
			}
			for i := 0; i < ml.ResourceMetrics().Len(); i++ {
				rm := metrics.ResourceMetrics().AppendEmpty()
				ml.ResourceMetrics().At(i).CopyTo(rm)
			}
			mr.bytesRead += len(line)
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				// Normal termination: the whole stream was consumed.
				return metrics, nil
			}
			return metrics, err
		}
	}
}

// metricsFromJSON reads newline-delimited OTLP JSON metrics from path —
// optionally zstd-compressed — and returns the combined metrics plus the
// number of uncompressed bytes read. It is fatal on open/decoder failure or
// when nothing at all could be read; partial reads are logged and returned.
func metricsFromJSON(path string, compression string) (pmetric.Metrics, int) {
	file, err := os.Open(filepath.Clean(path))
	if err != nil {
		log.Fatal("open file:", err)
	}
	// Previously the file handle was never released (fd leak when this
	// is called repeatedly, e.g. across benchmark datasets).
	defer func() {
		if cerr := file.Close(); cerr != nil {
			log.Print("close file: ", cerr)
		}
	}()

	mr := &metricReader{
		unmarshaler: &pmetric.JSONUnmarshaler{},
		bytesRead:   0,
	}

	if compression == benchmark.CompressionTypeZstd {
		cr, err := zstd.NewReader(file)
		if err != nil {
			log.Fatal("Failed to create compressed reader: ", err)
		}
		// Release the decoder's goroutines/buffers when done.
		defer cr.Close()
		mr.stringReader = bufio.NewReader(cr)
	} else { // no compression
		mr.stringReader = bufio.NewReader(file)
	}

	mdata, err := mr.readAllMetrics()
	if err != nil {
		if mr.bytesRead == 0 {
			log.Fatal("Read zero bytes from file: ", err)
		}
		// Best-effort: keep whatever was successfully decoded.
		log.Print("Found error when reading file: ", err)
		log.Print("Bytes read: ", mr.bytesRead)
	}

	return mdata, mr.bytesRead
}

func metricsFromProto(path string) (pmetric.Metrics, int) {
data, err := os.ReadFile(filepath.Clean(path))
if err != nil {
log.Fatal("read file:", err)
Expand All @@ -50,9 +120,24 @@ func NewRealMetricsDataset(path string) *RealMetricsDataset {
}
mdata := otlp.Metrics()

return mdata, len(data)
}


// NewRealMetricsDataset creates a new RealMetricsDataset from a binary file
// which is either formatted as otlp protobuf or compressed otlp json.
func NewRealMetricsDataset(path string, compression string, format string) *RealMetricsDataset {
var mdata pmetric.Metrics
var bytes int
if format == "json" {
mdata, bytes = metricsFromJSON(path, compression)
} else {
mdata, bytes = metricsFromProto(path)
}

ds := &RealMetricsDataset{
metrics: []metrics{},
sizeInBytes: len(data),
sizeInBytes: bytes,
metricsStats: stats.NewMetricsStats(),
}
ds.metricsStats.Analyze(mdata)
Expand Down
90 changes: 87 additions & 3 deletions pkg/benchmark/dataset/real_trace_dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@
package dataset

import (
"bufio"
"encoding/hex"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"strings"

"github.com/klauspost/compress/zstd"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"golang.org/x/exp/rand"

"github.com/f5/otel-arrow-adapter/pkg/benchmark"
carrow "github.com/f5/otel-arrow-adapter/pkg/otel/common/otlp"
)

Expand All @@ -47,22 +52,101 @@ type spanSorter struct {

var _ sort.Interface = spanSorter{}

func NewRealTraceDataset(path string, sortOrder []string) *RealTraceDataset {
// traceReader incrementally reads newline-delimited OTLP JSON trace
// payloads from an underlying (optionally zstd-decompressed) stream.
type traceReader struct {
	stringReader *bufio.Reader           // line-oriented reader over the raw or decompressed stream
	unmarshaler  *ptrace.JSONUnmarshaler // decodes one JSON line into ptrace.Traces
	bytesRead    int                     // total uncompressed bytes consumed so far
}

// readAllTraces consumes the stream line by line, unmarshaling each line as
// an OTLP JSON trace payload and appending its resource spans into a single
// ptrace.Traces. It returns the accumulated traces together with the first
// non-EOF error encountered; tr.bytesRead tracks the bytes processed.
func (tr *traceReader) readAllTraces() (ptrace.Traces, error) {
	traces := ptrace.NewTraces()

	for {
		line, err := tr.stringReader.ReadString('\n')
		// Process whatever was read BEFORE inspecting err: ReadString
		// returns the final, newline-less line together with io.EOF,
		// and the previous code silently dropped that last payload.
		if len(line) > 0 {
			tl, uerr := tr.unmarshaler.UnmarshalTraces([]byte(line))
			if uerr != nil {
				return traces, uerr
			}
			for i := 0; i < tl.ResourceSpans().Len(); i++ {
				rs := traces.ResourceSpans().AppendEmpty()
				tl.ResourceSpans().At(i).CopyTo(rs)
			}
			tr.bytesRead += len(line)
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				// Normal termination: the whole stream was consumed.
				return traces, nil
			}
			return traces, err
		}
	}
}

// tracesFromJSON reads newline-delimited OTLP JSON traces from path —
// optionally zstd-compressed — and returns the combined traces plus the
// number of uncompressed bytes read. It is fatal on open/decoder failure or
// when nothing at all could be read; partial reads are logged and returned.
func tracesFromJSON(path string, compression string) (ptrace.Traces, int) {
	file, err := os.Open(filepath.Clean(path))
	if err != nil {
		log.Fatal("open file:", err)
	}
	// Previously the file handle was never released (fd leak when this
	// is called repeatedly, e.g. across benchmark datasets).
	defer func() {
		if cerr := file.Close(); cerr != nil {
			log.Print("close file: ", cerr)
		}
	}()

	tr := &traceReader{
		unmarshaler: &ptrace.JSONUnmarshaler{},
		bytesRead:   0,
	}

	if compression == benchmark.CompressionTypeZstd {
		cr, err := zstd.NewReader(file)
		if err != nil {
			log.Fatal("Failed to create compressed reader: ", err)
		}
		// Release the decoder's goroutines/buffers when done.
		defer cr.Close()
		tr.stringReader = bufio.NewReader(cr)
	} else { // no compression
		tr.stringReader = bufio.NewReader(file)
	}

	traces, err := tr.readAllTraces()
	if err != nil {
		if tr.bytesRead == 0 {
			log.Fatal("Read zero bytes from file: ", err)
		}
		// Best-effort: keep whatever was successfully decoded.
		log.Print("Found error when reading file: ", err)
		log.Print("Bytes read: ", tr.bytesRead)
	}

	return traces, tr.bytesRead
}

// tracesFromProto reads an entire OTLP protobuf trace export request from
// path and returns the decoded traces plus the file size in bytes.
// It is fatal on read or unmarshal failure.
//
// NOTE(review): compression is accepted but never used — the sibling
// logs/metrics proto loaders take only a path; confirm whether this
// parameter should be dropped or honored.
func tracesFromProto(path string, compression string) (ptrace.Traces, int) {
	data, err := os.ReadFile(filepath.Clean(path))
	if err != nil {
		log.Fatal("read file:", err)
	}

	otlp := ptraceotlp.NewExportRequest()
	if err := otlp.UnmarshalProto(data); err != nil {
		log.Fatalf("in %q unmarshal: %v", path, err)
	}

	traces := otlp.Traces()
	return traces, len(data)
}

// NewRealTraceDataset creates a new RealTraceDataset from a binary file
// which is either formatted as otlp protobuf or compressed otlp json.
func NewRealTraceDataset(path string, compression string, format string, sortOrder []string) *RealTraceDataset {
var traces ptrace.Traces
var size int
if format == "json" {
traces, size = tracesFromJSON(path, compression)
} else {
traces, size = tracesFromProto(path, compression)
}

ds := &RealTraceDataset{
s2r: map[ptrace.Span]pcommon.Resource{},
s2s: map[ptrace.Span]pcommon.InstrumentationScope{},
sizeInBytes: len(data),
sizeInBytes: size,
}
traces := otlp.Traces()

for i := 0; i < traces.ResourceSpans().Len(); i++ {
rs := traces.ResourceSpans().At(i)
Expand Down
2 changes: 1 addition & 1 deletion pkg/benchmark/profileable/otlp/otlp_profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestOtlpLightstepTracesProfiler(t *testing.T) {
t.Skip("Lightstep specific test")
t.Parallel()

benchdata := dataset.NewRealTraceDataset("/Users/josh.macdonald/src/lightstep/forward_spans.bin.otlp.bin", []string{
benchdata := dataset.NewRealTraceDataset("../../../../tools/trace_benchmark/data/otlp_traces.json", benchmark.CompressionTypeZstd, []string{
"trace_id",
})

Expand Down
Loading