Skip to content

Commit

Permalink
Fix badger storage to pass all the integration tests and enable integ…
Browse files Browse the repository at this point in the history
…ration tests for badger
  • Loading branch information
Michael Burman committed May 3, 2018
1 parent 097907e commit 71aeaab
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 23 deletions.
16 changes: 16 additions & 0 deletions model/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@ import (
"sort"
)

type byTraceID []*TraceID

func (s byTraceID) Len() int { return len(s) }
func (s byTraceID) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTraceID) Less(i, j int) bool {
if s[i].High < s[j].High {
return true
}
return s[i].Low < s[j].Low
}

// SortTraceIDs sorts a list of TraceIDs
func SortTraceIDs(traceIDs []*TraceID) {
sort.Sort(byTraceID(traceIDs))
}

type traceByTraceID []*Trace

func (s traceByTraceID) Len() int { return len(s) }
Expand Down
76 changes: 76 additions & 0 deletions plugin/storage/badger/dependencystore/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package dependencystore

import (
"time"

"github.com/jaegertracing/jaeger/model"
)

const (
dependencyKeyPrefix byte = 0xC0 // Dependency PKs have first two bits set to 1
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
type DependencyStore struct {
// session cassandra.Session
// dependencyDataFrequency time.Duration
// dependenciesTableMetrics *casMetrics.Table
// logger *zap.Logger
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore() *DependencyStore {
// session cassandra.Session,
// dependencyDataFrequency time.Duration,
// metricsFactory metrics.Factory,
// logger *zap.Logger,
// ) *DependencyStore {
// return &DependencyStore{
// session: session,
// dependencyDataFrequency: dependencyDataFrequency,
// dependenciesTableMetrics: casMetrics.NewTable(metricsFactory, "Dependencies"),
// logger: logger,
// }
return nil
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
// deps := make([]Dependency, len(dependencies))
// for i, d := range dependencies {
// deps[i] = Dependency{
// Parent: d.Parent,
// Child: d.Child,
// CallCount: int64(d.CallCount),
// }
// }
// query := s.session.Query(depsInsertStmt, ts, ts, deps)
// return s.dependenciesTableMetrics.Exec(query, s.logger)
return nil
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
// query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs)
// iter := query.Consistency(cassandra.One).Iter()

// var mDependency []model.DependencyLink
// var dependencies []Dependency
// var ts time.Time
// for iter.Scan(&ts, &dependencies) {
// for _, dependency := range dependencies {
// mDependency = append(mDependency, model.DependencyLink{
// Parent: dependency.Parent,
// Child: dependency.Child,
// CallCount: uint64(dependency.CallCount),
// })
// }
// }

// if err := iter.Close(); err != nil {
// s.logger.Error("Failure to read Dependencies", zap.Time("endTs", endTs), zap.Duration("lookback", lookback), zap.Error(err))
// return nil, errors.Wrap(err, "Error reading dependencies from storage")
// }
// return mDependency, nil
return nil, nil
}
37 changes: 28 additions & 9 deletions plugin/storage/badger/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ func (r *TraceReader) getTraces(traceIDs []model.TraceID) ([]*model.Trace, error

err := r.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 1 // TraceIDs are not sorted, pointless to prefetch large amount of values
opts.PrefetchSize = 10 // TraceIDs are not sorted, pointless to prefetch large amount of values
it := txn.NewIterator(opts)
defer it.Close()

val := []byte{}
for _, prefix := range prefixes {
spans := make([]*model.Span, 0, 4) // reduce reallocation requirements by defining some initial length

d := 0
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
// Add value to the span store (decode from JSON / defined encoding first)
// These are in the correct order because of the sorted nature
Expand All @@ -93,13 +94,13 @@ func (r *TraceReader) getTraces(traceIDs []model.TraceID) ([]*model.Trace, error
return err
}
spans = append(spans, &sp)
d++
}
trace := &model.Trace{
Spans: spans,
}
traces = append(traces, trace)
}

return nil
})

Expand All @@ -121,7 +122,7 @@ func (r *TraceReader) GetTrace(traceID model.TraceID) (*model.Trace, error) {

func createPrimaryKeySeekPrefix(traceID model.TraceID) []byte {
buf := new(bytes.Buffer)
buf.WriteByte(primaryKeyPrefix)
buf.WriteByte(spanKeyPrefix)
binary.Write(buf, binary.BigEndian, traceID.High)
binary.Write(buf, binary.BigEndian, traceID.Low)
return buf.Bytes()
Expand Down Expand Up @@ -212,20 +213,29 @@ func (r *TraceReader) FindTraces(query *spanstore.TraceQueryParameters) ([]*mode
// This is not unique index result - same TraceID can be matched from multiple spans
indexResults, _ := r.scanRangeIndex(startKey, endKey, query.StartTimeMin, query.StartTimeMax)
hashFilter := make(map[model.TraceID]struct{}, len(indexResults))
filteredResults := make([][]byte, 0, len(indexResults)) // Max possible length
filteredResults := make([]*model.TraceID, 0, len(indexResults)) // Max possible length
appendableResults := make([][]byte, 0, len(indexResults)) // Max possible length
var value struct{}
for _, k := range indexResults {
key := k[len(k)-16:]
id := model.TraceID{
id := &model.TraceID{
High: binary.BigEndian.Uint64(key[:8]),
Low: binary.BigEndian.Uint64(key[8:]),
}
if _, exists := hashFilter[id]; !exists {
filteredResults = append(filteredResults, key)
hashFilter[id] = value
if _, exists := hashFilter[*id]; !exists {
filteredResults = append(filteredResults, id)
hashFilter[*id] = value
}
}
ids = append(ids, filteredResults)

model.SortTraceIDs(filteredResults)

// This is an ugly hack at this point - but has no impact on performance really
for _, tr := range filteredResults {
appendableResults = append(appendableResults, traceIDToComparableBytes(tr))
}

ids = append(ids, appendableResults)
}

// Get intersection of all the hits.. optimization problem without distribution knowledge: would it make more sense to scan the primary keys and decode them instead?
Expand Down Expand Up @@ -411,6 +421,15 @@ func scanRangeFunction(it *badger.Iterator, indexEndValue []byte) bool {
return false
}

func traceIDToComparableBytes(traceID *model.TraceID) []byte {
buf := new(bytes.Buffer)

binary.Write(buf, binary.BigEndian, traceID.High)
binary.Write(buf, binary.BigEndian, traceID.Low)

return buf.Bytes()
}

// Close Implements io.Closer
func (r *TraceReader) Close() error {
// Allows this to be signaled that we've been closed
Expand Down
39 changes: 25 additions & 14 deletions plugin/storage/badger/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import (
*/

const (
primaryKeyPrefix byte = 0x80 // All primary key values should have first bit set to 1
indexKeyPrefix byte = 0x00 // All secondary indexes should have first bit set to 0
serviceNameIndexKey byte = 0x01
operationNameIndexKey byte = 0x02
tagIndexKey byte = 0x03
durationIndexKey byte = 0x04
startTimeIndexKey byte = 0x05 // Reserved
spanKeyPrefix byte = 0x80 // All span keys should have first bit set to 1
dependencyKeyPrefix byte = 0xC0 // Dependency keys have first two bits set to 1 (documented here only)
secondaryBytePrefix byte = 0x10 // Reserved, prefix uses more than one byte
indexKeyRange byte = 0x0F // Secondary indexes use last 4 bits
primaryKeyPrefix byte = 0x00 // Primary keys have last 4 bits set to 0
serviceNameIndexKey byte = 0x81
operationNameIndexKey byte = 0x82
tagIndexKey byte = 0x83
durationIndexKey byte = 0x84
startTimeIndexKey byte = 0x85 // Reserved
jsonEncoding byte = 0x01 // Last 4 bits of the meta byte are for encoding type
)

Expand All @@ -48,7 +51,7 @@ func NewSpanWriter(db *badger.DB, c *CacheStore, ttl time.Duration, closer func(
func (w *SpanWriter) WriteSpan(span *model.Span) error {

// Avoid doing as much as possible inside the transaction boundary, create entries here
entriesToStore := make([]*badger.Entry, 0, len(span.Tags)+4)
entriesToStore := make([]*badger.Entry, 0, len(span.Tags)+4+len(span.Process.Tags)+len(span.Logs)*4)

trace, err := w.createTraceEntry(span)
if err != nil {
Expand All @@ -65,10 +68,18 @@ func (w *SpanWriter) WriteSpan(span *model.Span) error {
entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(durationIndexKey, durationValue, span.StartTime, span.TraceID), nil))

for _, kv := range span.Tags {
// Ignore other types than String for now
if kv.VType == model.StringType {
// KEY: it<serviceName><tagsKey><traceId> VALUE: <tagsValue>
entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(tagIndexKey, []byte(span.Process.ServiceName+kv.Key+kv.VStr), span.StartTime, span.TraceID), nil))
// Convert everything to string since queries are done that way also
// KEY: it<serviceName><tagsKey><traceId> VALUE: <tagsValue>
entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(tagIndexKey, []byte(span.Process.ServiceName+kv.Key+kv.AsString()), span.StartTime, span.TraceID), nil))
}

for _, kv := range span.Process.Tags {
entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(tagIndexKey, []byte(span.Process.ServiceName+kv.Key+kv.AsString()), span.StartTime, span.TraceID), nil))
}

for _, log := range span.Logs {
for _, kv := range log.Fields {
entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(tagIndexKey, []byte(span.Process.ServiceName+kv.Key+kv.AsString()), span.StartTime, span.TraceID), nil))
}
}

Expand Down Expand Up @@ -98,7 +109,7 @@ func createIndexKey(indexPrefixKey byte, value []byte, startTime time.Time, trac
// KEY: indexKey<indexValue><startTime><traceId> (traceId is last 16 bytes of the key)
buf := new(bytes.Buffer)

buf.WriteByte(indexPrefixKey)
buf.WriteByte((indexPrefixKey & indexKeyRange) | spanKeyPrefix) // Enforce to prevent future accidental key overlapping
buf.Write(value)
binary.Write(buf, binary.BigEndian, model.TimeAsEpochMicroseconds(startTime))
binary.Write(buf, binary.BigEndian, traceID.High)
Expand Down Expand Up @@ -133,7 +144,7 @@ func createTraceKV(span *model.Span) ([]byte, []byte, error) {
// KEY: ti<trace-id><startTime><span-id> VALUE: All the details (json for now) METADATA: Encoding
buf := new(bytes.Buffer)

buf.WriteByte(primaryKeyPrefix)
buf.WriteByte(spanKeyPrefix)
binary.Write(buf, binary.BigEndian, span.TraceID.High)
binary.Write(buf, binary.BigEndian, span.TraceID.Low)
binary.Write(buf, binary.BigEndian, model.TimeAsEpochMicroseconds(span.StartTime))
Expand Down
79 changes: 79 additions & 0 deletions plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package integration

import (
"fmt"
"io"
"os"
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
assert "github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
)

type BadgerIntegrationStorage struct {
StorageIntegration
}

func (s *BadgerIntegrationStorage) initialize() error {
f := badger.NewFactory()
err := f.Initialize(metrics.NullFactory, zap.NewNop())
if err != nil {
return err
}

sw, err := f.CreateSpanWriter()
if err != nil {
return err
}
sr, err := f.CreateSpanReader()
if err != nil {
return err
}

s.SpanReader = sr
s.SpanWriter = sw

s.Refresh = s.refresh
s.CleanUp = s.cleanUp

logger, _ := testutils.NewLogger()
s.logger = logger

return nil
}

func (s *BadgerIntegrationStorage) clear() error {
if closer, ok := s.SpanWriter.(io.Closer); ok {
err := closer.Close()
if err != nil {
return err
}
return nil
}
return fmt.Errorf("BadgerIntegrationStorage did not implement io.Closer, unable to close and cleanup the storage correctly")
}

func (s *BadgerIntegrationStorage) cleanUp() error {
err := s.clear()
if err != nil {
return err
}
return s.initialize()
}

func (s *BadgerIntegrationStorage) refresh() error {
return nil
}

func TestBadgerStorage(t *testing.T) {
if os.Getenv("STORAGE") != "badger" {
t.Skip("Integration test against Badger skipped; set STORAGE env var to badger to run this")
}
s := &BadgerIntegrationStorage{}
assert.NoError(t, s.initialize())
s.IntegrationTestAll(t)
defer s.clear()
}

0 comments on commit 71aeaab

Please sign in to comment.