Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start moving components of adaptive sampling to OSS #973

Merged
merged 7 commits into from
Nov 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions pkg/io/starter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io

// Starter is the interface that wraps the basic Start() method.
type Starter interface {
Start() error
}
20 changes: 20 additions & 0 deletions pkg/testutils/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package testutils

import (
"encoding/json"
"fmt"
"strings"
"sync"

"go.uber.org/zap"
Expand Down Expand Up @@ -88,3 +90,21 @@ func (b *Buffer) Write(p []byte) (int, error) {
defer b.Unlock()
return b.Buffer.Write(p)
}

// LogMatcher is a helper func that returns true if the subStr appears more than 'occurrences' times in the logs.
var LogMatcher = func(occurrences int, subStr string, logs []string) (bool, string) {
errMsg := fmt.Sprintf("subStr '%s' does not occur %d time(s) in %v", subStr, occurrences, logs)
if len(logs) < occurrences {
return false, errMsg
}
var count int
for _, log := range logs {
if strings.Contains(log, subStr) {
count++
}
}
if count >= occurrences {
return true, ""
}
return false, errMsg
}
25 changes: 25 additions & 0 deletions pkg/testutils/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package testutils

import (
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -66,3 +67,27 @@ func TestRaceCondition(t *testing.T) {
close(start)
finish.Wait()
}

func TestLogMatcher(t *testing.T) {
tests := []struct {
occurences int
subStr string
logs []string
expected bool
errMsg string
}{
{occurences: 1, expected: false, errMsg: "subStr '' does not occur 1 time(s) in []"},
{occurences: 1, subStr: "hi", logs: []string{"hi"}, expected: true},
{occurences: 3, subStr: "hi", logs: []string{"hi", "hi"}, expected: false, errMsg: "subStr 'hi' does not occur 3 time(s) in [hi hi]"},
{occurences: 3, subStr: "hi", logs: []string{"hi", "hi", "hi"}, expected: true},
{occurences: 1, subStr: "hi", logs: []string{"bye", "bye"}, expected: false, errMsg: "subStr 'hi' does not occur 1 time(s) in [bye bye]"},
}
for i, tt := range tests {
test := tt
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
match, errMsg := LogMatcher(test.occurences, test.subStr, test.logs)
assert.Equal(t, test.expected, match)
assert.Equal(t, test.errMsg, errMsg)
})
}
}
28 changes: 28 additions & 0 deletions plugin/sampling/internal/calculationstrategy/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2018 The Jaeger Authors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the thinking for this thing to be under plugins? I can see swappable storage to be under plugins, but this seems like the core impl. I would keep it inside the collector app.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is core only to the adaptive sampling strategystore. Don't feel the collector app is the right place for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gonna go ahead and land, we can figure out where these files should live later

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calculationstrategy

// ProbabilityCalculator calculates the new probability given the current and target QPS
type ProbabilityCalculator interface {
Calculate(targetQPS, curQPS, prevProbability float64) (newProbability float64)
}

// CalculateFunc wraps a function of appropriate signature and makes a ProbabilityCalculator from it.
type CalculateFunc func(targetQPS, curQPS, prevProbability float64) (newProbability float64)

// Calculate implements Calculator interface.
func (c CalculateFunc) Calculate(targetQPS, curQPS, prevProbability float64) float64 {
return c(targetQPS, curQPS, prevProbability)
}
29 changes: 29 additions & 0 deletions plugin/sampling/internal/calculationstrategy/interface_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calculationstrategy

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCalculateFunc(t *testing.T) {
c := CalculateFunc(func(targetQPS, qps, oldProbability float64) float64 {
return targetQPS
})
val := 1.0
assert.Equal(t, val, c.Calculate(val, 0, 0))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calculationstrategy

const (
defaultPercentageIncreaseCap = 0.5
)

// PercentageIncreaseCappedCalculator is a probability calculator that caps the probability
// increase to a certain percentage of the previous probability.
//
// Given prevProb = 0.1, newProb = 0.5, and cap = 0.5:
// (0.5 - 0.1)/0.1 = 400% increase. Given that our cap is 50%, the probability can only
// increase to 0.15.
//
// Given prevProb = 0.4, newProb = 0.5, and cap = 0.5:
// (0.5 - 0.4)/0.4 = 25% increase. Given that this is below our cap of 50%, the probability
// can increase to 0.5.
type PercentageIncreaseCappedCalculator struct {
percentageIncreaseCap float64
}

// NewPercentageIncreaseCappedCalculator returns a new percentage increase capped calculator.
func NewPercentageIncreaseCappedCalculator(percentageIncreaseCap float64) PercentageIncreaseCappedCalculator {
if percentageIncreaseCap == 0 {
percentageIncreaseCap = defaultPercentageIncreaseCap
}
return PercentageIncreaseCappedCalculator{
percentageIncreaseCap: percentageIncreaseCap,
}
}

// Calculate calculates the new probability.
func (c PercentageIncreaseCappedCalculator) Calculate(targetQPS, curQPS, prevProbability float64) float64 {
factor := targetQPS / curQPS
newProbability := prevProbability * factor
// If curQPS is lower than the targetQPS, we need to increase the probability slowly to
// defend against oversampling.
// Else if curQPS is higher than the targetQPS, jump directly to the newProbability to
// defend against oversampling.
if factor > 1.0 {
percentIncrease := (newProbability - prevProbability) / prevProbability
if percentIncrease > c.percentageIncreaseCap {
newProbability = prevProbability + (prevProbability * c.percentageIncreaseCap)
}
}
return newProbability
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calculationstrategy

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestPercentageIncreaseCappedCalculator(t *testing.T) {
calculator := NewPercentageIncreaseCappedCalculator(0)
tests := []struct {
targetQPS float64
curQPS float64
oldProbability float64
expectedProbability float64
testName string
}{
{1.0, 2.0, 0.1, 0.05, "test1"},
{1.0, 0.5, 0.1, 0.15, "test2"},
{1.0, 0.8, 0.1, 0.125, "test3"},
}
for _, tt := range tests {
probability := calculator.Calculate(tt.targetQPS, tt.curQPS, tt.oldProbability)
assert.InDelta(t, probability, tt.expectedProbability, 0.0001, tt.testName)
}
}
117 changes: 117 additions & 0 deletions plugin/sampling/internal/leaderelection/leader_election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"sync"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

dl "github.com/jaegertracing/jaeger/pkg/distributedlock"
)

const (
acquireLockErrMsg = "Failed to acquire lock"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a const

)

// ElectionParticipant partakes in leader election to become leader.
type ElectionParticipant interface {
IsLeader() bool
}

type electionParticipant struct {
ElectionParticipantOptions
lock dl.Lock
isLeader *atomic.Bool
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
resourceName string
closeChan chan struct{}
wg sync.WaitGroup
}

// ElectionParticipantOptions control behavior of the election participant. TODO func applyDefaults(), parameter error checking, etc.
type ElectionParticipantOptions struct {
LeaderLeaseRefreshInterval time.Duration
FollowerLeaseRefreshInterval time.Duration
Logger *zap.Logger
}

// NewElectionParticipant returns a ElectionParticipant which attempts to become leader.
func NewElectionParticipant(lock dl.Lock, resourceName string, options ElectionParticipantOptions) ElectionParticipant {
return &electionParticipant{
ElectionParticipantOptions: options,
lock: lock,
resourceName: resourceName,
isLeader: atomic.NewBool(false),
closeChan: make(chan struct{}),
}
}

// Start runs a background thread which attempts to acquire the leader lock.
func (p *electionParticipant) Start() error {
p.wg.Add(1)
go p.runAcquireLockLoop()
return nil
}

// Close implements io.Closer.
func (p *electionParticipant) Close() error {
close(p.closeChan)
p.wg.Wait()
return nil
}

// IsLeader returns true if this process is the leader.
func (p *electionParticipant) IsLeader() bool {
return p.isLeader.Load()
}

// runAcquireLockLoop attempts to acquire the leader lock. If it succeeds, it will attempt to retain it,
// otherwise it sleeps and attempts to gain the lock again.
func (p *electionParticipant) runAcquireLockLoop() {
defer p.wg.Done()
ticker := time.NewTicker(p.acquireLock())
for {
select {
case <-ticker.C:
ticker.Stop()
ticker = time.NewTicker(p.acquireLock())
case <-p.closeChan:
ticker.Stop()
return
}
}
}

// acquireLock attempts to acquire the lock and returns the interval to sleep before the next retry.
func (p *electionParticipant) acquireLock() time.Duration {
if acquiredLeaderLock, err := p.lock.Acquire(p.resourceName, p.FollowerLeaseRefreshInterval); err == nil {
p.setLeader(acquiredLeaderLock)
} else {
p.Logger.Error(acquireLockErrMsg, zap.Error(err))
}
if p.IsLeader() {
// If this process holds the leader lock, retry with a shorter cadence
// to retain the leader lease.
return p.LeaderLeaseRefreshInterval
}
// If this process failed to acquire the leader lock, retry with a longer cadence
return p.FollowerLeaseRefreshInterval
}

func (p *electionParticipant) setLeader(isLeader bool) {
p.isLeader.Store(isLeader)
}
Loading