Sorted key/value store (badger) backed storage plugin (#760)
* Implementation of sorted key/value store backed storage plugin for Jaeger. Implemented against badger for now.

Implement Services() and Operations(service) using an in-memory cached map that is filled from the K/V store on startup. Also, move entry creation outside the Update() transaction to reduce blocking in badger processing, and implement correct TTL writing and reading.

Fix ASC ordering of return set if only a single index seek was used

Implement range index scanning for duration index, use common initialize and tear down for tests and benchmarks, implement a small write benchmark and do Span fetching in a single transaction.

Duration only test

Create configuration options and modify tests to use them. Rebased from master.

Fixtures should use internal model and not domain model for comparison

Fix badger storage to pass all the integration tests and enable integration tests for badger

Add license headers

Update dependencies

Make lint happy

Addressing review comments, replacing magic numbers with a sizeOf constant, using logger to indicate initialization is completed and io.Closer is used. Added maintenance thread that cleans up value log as well as exposes the size of data directories. Added some comments for multiple internal functions as well as refactored FindTraces to use more functions for easier reading.

Default data directory (when ephemeral is not used) is now starting directory + data/keys and data/values.

Fix domain_trace_compare to check for differences between data and not pointers

Add support for protobuf encoding/decoding and make it the default instead of JSON encoding in the storage

Fix merge error

disk statistics are only built on a Linux platform, fixes darwin compilation issues

Signed-off-by: Michael Burman <miburman@redhat.com>

* Add stuff (and some tests also) to satisfy Codecov

Signed-off-by: Michael Burman <miburman@redhat.com>

* Add LastMaintenanceRun expvar for test purposes and remove error checks from stats_linux.go for codecov satisfaction

Signed-off-by: Michael Burman <miburman@redhat.com>

* Add more testing purposes timers

Signed-off-by: Michael Burman <miburman@redhat.com>

* Add dependency reader, address comments

Signed-off-by: Michael Burman <miburman@redhat.com>

* Replace expvar with metrics.Factory, rebase to new TraceReader API, remove unused code / comments, rename MaintenanceTimer, change visibility of the ticker, make Close() remove the temp files in most cases, update dependencies to Gopkg.toml

Signed-off-by: Michael Burman <miburman@redhat.com>

* Revert changes to the fixtures, outdated

Signed-off-by: Michael Burman <miburman@redhat.com>

* Satisfy gosimple by using Equal instead of Compare

Signed-off-by: Michael Burman <miburman@redhat.com>

* Make factory_test check for io.Closer implementation

Signed-off-by: Michael Burman <miburman@redhat.com>

* Make metrics vars private to fix the linter

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Fix compile error in linux-only test

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Create artificial test to hopefully cheat Codecov

Signed-off-by: Michael Burman <miburman@redhat.com>

* Add sign-off to empty_tests

Signed-off-by: Michael Burman <miburman@redhat.com>

* Rebased and changed to metricstest

Signed-off-by: Michael Burman <miburman@redhat.com>

* dep ensure --update

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Refactor tests into sub-packages

Signed-off-by: Yuri Shkuro <ys@uber.com>

* dep ensure --update

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Change cache interfaces and add new tests to reach higher coverage

Signed-off-by: Michael Burman <yak@iki.fi>

* Add more tests, including validation and encoding parsing tests

Signed-off-by: Michael Burman <yak@iki.fi>

* Fix test refactoring to get factory coverage back to 100%

Signed-off-by: Michael Burman <yak@iki.fi>

* Change dependencyreader to use spanstore

Signed-off-by: Michael Burman <yak@iki.fi>

* Remove redundant consts

Signed-off-by: Michael Burman <yak@iki.fi>

* dep update

Signed-off-by: Yuri Shkuro <ys@uber.com>

* make fmt

Signed-off-by: Yuri Shkuro <ys@uber.com>

* regen proto files

Signed-off-by: Yuri Shkuro <ys@uber.com>

* dep --update

Signed-off-by: Yuri Shkuro <ys@uber.com>

* make proto

Signed-off-by: Yuri Shkuro <ys@uber.com>
burmanm authored and yurishkuro committed Apr 3, 2019
1 parent f47d66f commit 1703bae
Showing 30 changed files with 2,874 additions and 104 deletions.
225 changes: 149 additions & 76 deletions Gopkg.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Gopkg.toml
@@ -115,6 +115,10 @@ required = [
name = "github.com/gogo/protobuf"
revision = "ba06b47c162d49f2af050fb4c75bcbc86a159d5c"

[[constraint]]
name = "github.com/dgraph-io/badger"
version = "=1.5.3"

[prune]
go-tests = true
unused-packages = true
27 changes: 16 additions & 11 deletions model/prototest/model_test.pb.go

Some generated files are not rendered by default.

18 changes: 18 additions & 0 deletions model/sort.go
@@ -18,6 +18,24 @@ import (
	"sort"
)

type byTraceID []*TraceID

func (s byTraceID) Len() int      { return len(s) }
func (s byTraceID) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTraceID) Less(i, j int) bool {
	if s[i].High < s[j].High {
		return true
	} else if s[i].High > s[j].High {
		return false
	}
	return s[i].Low < s[j].Low
}

// SortTraceIDs sorts a list of TraceIDs
func SortTraceIDs(traceIDs []*TraceID) {
	sort.Sort(byTraceID(traceIDs))
}

type traceByTraceID []*Trace

func (s traceByTraceID) Len() int { return len(s) }
21 changes: 21 additions & 0 deletions model/sort_test.go
@@ -118,3 +118,24 @@ func TestSortListOfTraces(t *testing.T) {
	SortTraces(list2)
	assert.EqualValues(t, list1, list2)
}

func TestSortByTraceID(t *testing.T) {
	traceID := &TraceID{
		High: uint64(1),
		Low:  uint64(1),
	}
	traceID2 := &TraceID{
		High: uint64(2),
		Low:  uint64(0),
	}
	traceID3 := &TraceID{
		High: uint64(1),
		Low:  uint64(0),
	}

	traces := []*TraceID{traceID, traceID2, traceID3}
	// Expect ascending order
	tracesExpected := []*TraceID{traceID3, traceID, traceID2}
	SortTraceIDs(traces)
	assert.EqualValues(t, tracesExpected, traces)
}
99 changes: 99 additions & 0 deletions plugin/storage/badger/dependencystore/storage.go
@@ -0,0 +1,99 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dependencystore

import (
	"context"
	"time"

	"github.com/jaegertracing/jaeger/model"
	"github.com/jaegertracing/jaeger/storage/spanstore"
)

// DependencyStore computes dependency links from the spans it reads through a spanstore.Reader
type DependencyStore struct {
	reader spanstore.Reader
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(store spanstore.Reader) *DependencyStore {
	return &DependencyStore{
		reader: store,
	}
}

// GetDependencies returns all interservice dependencies, implements DependencyReader
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
	deps := map[string]*model.DependencyLink{}

	params := &spanstore.TraceQueryParameters{
		StartTimeMin: endTs.Add(-1 * lookback),
		StartTimeMax: endTs,
	}

	// We need to do a full table scan - if this becomes a bottleneck, we can write an index that describes
	// dependencyKeyPrefix + timestamp + parent + child key and do a key-only seek (which is fast - but requires additional writes)

	// Unlike the SpanReader / SpanWriter, GetDependencies is not passed a context, so use context.Background()
	traces, err := s.reader.FindTraces(context.Background(), params)
	if err != nil {
		return nil, err
	}
	for _, tr := range traces {
		processTrace(deps, tr)
	}

	return depMapToSlice(deps), nil
}

// depMapToSlice converts the dependency map to a slice of DependencyLink in the same way as the memory storage plugin
func depMapToSlice(deps map[string]*model.DependencyLink) []model.DependencyLink {
	retMe := make([]model.DependencyLink, 0, len(deps))
	for _, dep := range deps {
		retMe = append(retMe, *dep)
	}
	return retMe
}

// processTrace is copied from the memory storage plugin
func processTrace(deps map[string]*model.DependencyLink, trace *model.Trace) {
	for _, s := range trace.Spans {
		parentSpan := seekToSpan(trace, s.ParentSpanID())
		if parentSpan != nil {
			if parentSpan.Process.ServiceName == s.Process.ServiceName {
				continue
			}
			depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName
			if _, ok := deps[depKey]; !ok {
				deps[depKey] = &model.DependencyLink{
					Parent:    parentSpan.Process.ServiceName,
					Child:     s.Process.ServiceName,
					CallCount: 1,
				}
			} else {
				deps[depKey].CallCount++
			}
		}
	}
}

func seekToSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
	for _, s := range trace.Spans {
		if s.SpanID == spanID {
			return s
		}
	}
	return nil
}
28 changes: 28 additions & 0 deletions plugin/storage/badger/dependencystore/storage_internal_test.go
@@ -0,0 +1,28 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dependencystore

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/jaegertracing/jaeger/model"
)

func TestSeekToSpan(t *testing.T) {
	span := seekToSpan(&model.Trace{}, model.SpanID(uint64(1)))
	assert.Nil(t, span)
}
103 changes: 103 additions & 0 deletions plugin/storage/badger/dependencystore/storage_test.go
@@ -0,0 +1,103 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dependencystore_test

import (
	"fmt"
	"io"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/uber/jaeger-lib/metrics"
	"go.uber.org/zap"

	"github.com/jaegertracing/jaeger/model"
	"github.com/jaegertracing/jaeger/pkg/config"
	"github.com/jaegertracing/jaeger/plugin/storage/badger"
	"github.com/jaegertracing/jaeger/storage/dependencystore"
	"github.com/jaegertracing/jaeger/storage/spanstore"
)

// Opens a badger db and runs a test on it.
func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader)) {
	f := badger.NewFactory()
	opts := badger.NewOptions("badger")
	v, command := config.Viperize(opts.AddFlags)
	command.ParseFlags([]string{
		"--badger.ephemeral=true",
		"--badger.consistency=false",
	})
	f.InitFromViper(v)

	err := f.Initialize(metrics.NullFactory, zap.NewNop())
	assert.NoError(tb, err)

	sw, err := f.CreateSpanWriter()
	assert.NoError(tb, err)

	dr, err := f.CreateDependencyReader()
	assert.NoError(tb, err)

	defer func() {
		if closer, ok := sw.(io.Closer); ok {
			err := closer.Close()
			assert.NoError(tb, err)
		} else {
			tb.FailNow()
		}

	}()
	test(tb, sw, dr)
}

func TestDependencyReader(t *testing.T) {
	runFactoryTest(t, func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader) {
		tid := time.Now()
		links, err := dr.GetDependencies(tid, time.Hour)
		assert.NoError(t, err)
		assert.Empty(t, links)

		traces := 40
		spans := 3
		for i := 0; i < traces; i++ {
			for j := 0; j < spans; j++ {
				s := model.Span{
					TraceID: model.TraceID{
						Low:  uint64(i),
						High: 1,
					},
					SpanID:        model.SpanID(j),
					OperationName: "operation-a",
					Process: &model.Process{
						ServiceName: fmt.Sprintf("service-%d", j),
					},
					StartTime: tid.Add(time.Duration(i)),
					Duration:  time.Duration(i + j),
				}
				if j > 0 {
					s.References = []model.SpanRef{model.NewChildOfRef(s.TraceID, model.SpanID(j-1))}
				}
				err := sw.WriteSpan(&s)
				assert.NoError(t, err)
			}
		}
		links, err = dr.GetDependencies(time.Now(), time.Hour)
		assert.NoError(t, err)
		assert.NotEmpty(t, links)
		assert.Equal(t, spans-1, len(links))                // First span does not create a dependency
		assert.Equal(t, uint64(traces), links[0].CallCount) // Each trace calls the same services
	})
}