Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sorted key/value store (badger) backed storage plugin #760

Merged
merged 32 commits into from
Apr 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d095edb
Implementation of sorted key/value store backed storage plugin for Ja…
burmanm Apr 16, 2018
82d163a
Add stuff (and some tests also) to satisfy Codecov
Jul 11, 2018
181e044
Add LastMaintenanceRun expvar for test purposes and remove error chec…
Jul 13, 2018
3aff0cf
Add more testing purposes timers
Jul 13, 2018
7b9eecd
Add dependency reader, address comments
Nov 16, 2018
2e11181
Replace expvar with metrics.Factory, rebase to new TraceReader API, r…
Jan 2, 2019
2b43039
Revert changes to the fixtures, outdated
Jan 2, 2019
d8471de
Satisfy gosimple by using Equal instead of Compare
Jan 3, 2019
d463076
Make factory_test check for io.Closer implementation
Jan 3, 2019
a8ecc16
Make metrics vars private to fix the liner
Jan 4, 2019
c7a86e6
Fix compile error in linux-only test
Jan 4, 2019
663c5a6
Create artificial test to hopefully cheat Codecov
Jan 10, 2019
fe8e652
Add sign-off to empty_tests
Jan 11, 2019
2955789
Rebased and changed to metricstest
Jan 28, 2019
b1dcada
Merge branch 'master' into local_storage
yurishkuro Feb 2, 2019
c1a76c3
Merge branch 'master' into local_storage
Feb 10, 2019
47fdc93
dep ensure --update
Feb 10, 2019
3d2fdde
Refactor tests int sub-packages
Feb 10, 2019
c35d10e
Merge branch 'master' into local_storage
Feb 11, 2019
1af4a5c
dep ensure --update
Feb 11, 2019
24ec9c2
Change cache interfaces and add new tests to reach higher coverage
burmanm Feb 12, 2019
8d3408d
Add more tests, including validation and encoding parsing tests
burmanm Feb 12, 2019
7b81610
Fix test refactoring to get factory coverage back to 100%
burmanm Feb 12, 2019
8606253
Change dependencyreader to use spanstore
burmanm Feb 13, 2019
e575e4c
Remove redundant consts
burmanm Feb 13, 2019
bfb1b7d
Merge branch 'master' into local_storage
Apr 2, 2019
10705ba
dep update
Apr 2, 2019
8bf30ad
make fmt
Apr 2, 2019
10b4b57
regen proto files
Apr 2, 2019
e053151
Merge branch 'master' into local_storage
Apr 3, 2019
25d27cb
dep --update
Apr 3, 2019
89e8522
make proto
Apr 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 149 additions & 76 deletions Gopkg.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ required = [
name = "github.com/gogo/protobuf"
revision = "ba06b47c162d49f2af050fb4c75bcbc86a159d5c"

[[constraint]]
name = "github.com/dgraph-io/badger"
version = "=1.5.3"

[prune]
go-tests = true
unused-packages = true
Expand Down
27 changes: 16 additions & 11 deletions model/prototest/model_test.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions model/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@ import (
"sort"
)

type byTraceID []*TraceID

func (s byTraceID) Len() int { return len(s) }
func (s byTraceID) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTraceID) Less(i, j int) bool {
if s[i].High < s[j].High {
return true
} else if s[i].High > s[j].High {
return false
}
return s[i].Low < s[j].Low
}

// SortTraceIDs sorts a list of TraceIDs
func SortTraceIDs(traceIDs []*TraceID) {
sort.Sort(byTraceID(traceIDs))
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

type traceByTraceID []*Trace

func (s traceByTraceID) Len() int { return len(s) }
Expand Down
21 changes: 21 additions & 0 deletions model/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,24 @@ func TestSortListOfTraces(t *testing.T) {
SortTraces(list2)
assert.EqualValues(t, list1, list2)
}

func TestSortByTraceID(t *testing.T) {
traceID := &TraceID{
High: uint64(1),
Low: uint64(1),
}
traceID2 := &TraceID{
High: uint64(2),
Low: uint64(0),
}
traceID3 := &TraceID{
High: uint64(1),
Low: uint64(0),
}

traces := []*TraceID{traceID, traceID2, traceID3}
// Expect ascending order
tracesExpected := []*TraceID{traceID3, traceID, traceID2}
SortTraceIDs(traces)
assert.EqualValues(t, tracesExpected, traces)
}
99 changes: 99 additions & 0 deletions plugin/storage/badger/dependencystore/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dependencystore

import (
"context"
"time"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
type DependencyStore struct {
reader spanstore.Reader
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(store spanstore.Reader) *DependencyStore {
return &DependencyStore{
reader: store,
}
}

// GetDependencies returns all interservice dependencies, implements DependencyReader
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
deps := map[string]*model.DependencyLink{}

params := &spanstore.TraceQueryParameters{
StartTimeMin: endTs.Add(-1 * lookback),
StartTimeMax: endTs,
}

// We need to do a full table scan - if this becomes a bottleneck, we can write write an index that describes
// dependencyKeyPrefix + timestamp + parent + child key and do a key-only seek (which is fast - but requires additional writes)

// GetDependencies is not shipped with a context like the SpanReader / SpanWriter
traces, err := s.reader.FindTraces(context.Background(), params)
if err != nil {
return nil, err
}
for _, tr := range traces {
processTrace(deps, tr)
}

return depMapToSlice(deps), err
}

// depMapToSlice modifies the spans to DependencyLink in the same way as the memory storage plugin
func depMapToSlice(deps map[string]*model.DependencyLink) []model.DependencyLink {
retMe := make([]model.DependencyLink, 0, len(deps))
for _, dep := range deps {
retMe = append(retMe, *dep)
}
return retMe
}

// processTrace is copy from the memory storage plugin
func processTrace(deps map[string]*model.DependencyLink, trace *model.Trace) {
for _, s := range trace.Spans {
parentSpan := seekToSpan(trace, s.ParentSpanID())
if parentSpan != nil {
if parentSpan.Process.ServiceName == s.Process.ServiceName {
continue
}
depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName
if _, ok := deps[depKey]; !ok {
deps[depKey] = &model.DependencyLink{
Parent: parentSpan.Process.ServiceName,
Child: s.Process.ServiceName,
CallCount: 1,
}
} else {
deps[depKey].CallCount++
}
}
}
}

func seekToSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
for _, s := range trace.Spans {
if s.SpanID == spanID {
return s
}
}
return nil
}
28 changes: 28 additions & 0 deletions plugin/storage/badger/dependencystore/storage_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dependencystore

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/model"
)

func TestSeekToSpan(t *testing.T) {
span := seekToSpan(&model.Trace{}, model.SpanID(uint64(1)))
assert.Nil(t, span)
}
103 changes: 103 additions & 0 deletions plugin/storage/badger/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dependencystore_test

import (
"fmt"
"io"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// Opens a badger db and runs a a test on it.
func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader)) {
f := badger.NewFactory()
opts := badger.NewOptions("badger")
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--badger.ephemeral=true",
"--badger.consistency=false",
})
f.InitFromViper(v)

err := f.Initialize(metrics.NullFactory, zap.NewNop())
assert.NoError(tb, err)

sw, err := f.CreateSpanWriter()
assert.NoError(tb, err)

dr, err := f.CreateDependencyReader()
assert.NoError(tb, err)

defer func() {
if closer, ok := sw.(io.Closer); ok {
err := closer.Close()
assert.NoError(tb, err)
} else {
tb.FailNow()
}

}()
test(tb, sw, dr)
}

func TestDependencyReader(t *testing.T) {
runFactoryTest(t, func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader) {
tid := time.Now()
links, err := dr.GetDependencies(tid, time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)

traces := 40
spans := 3
for i := 0; i < traces; i++ {
for j := 0; j < spans; j++ {
s := model.Span{
TraceID: model.TraceID{
Low: uint64(i),
High: 1,
},
SpanID: model.SpanID(j),
OperationName: fmt.Sprintf("operation-a"),
Process: &model.Process{
ServiceName: fmt.Sprintf("service-%d", j),
},
StartTime: tid.Add(time.Duration(i)),
Duration: time.Duration(i + j),
}
if j > 0 {
s.References = []model.SpanRef{model.NewChildOfRef(s.TraceID, model.SpanID(j-1))}
}
err := sw.WriteSpan(&s)
assert.NoError(t, err)
}
}
links, err = dr.GetDependencies(time.Now(), time.Hour)
assert.NoError(t, err)
assert.NotEmpty(t, links)
assert.Equal(t, spans-1, len(links)) // First span does not create a dependency
assert.Equal(t, uint64(traces), links[0].CallCount) // Each trace calls the same services
})
}
Loading