Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expr: clean from direct use zipper/limiter from carbonapi app global config #811

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/carbonapi/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"time"

"github.com/go-graphite/carbonapi/cache"
"github.com/go-graphite/carbonapi/cmd/carbonapi/interfaces"
"github.com/go-graphite/carbonapi/expr"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/tlsconfig"
zipperCfg "github.com/go-graphite/carbonapi/zipper/config"
zipper "github.com/go-graphite/carbonapi/zipper/interfaces"
zipperTypes "github.com/go-graphite/carbonapi/zipper/types"

"github.com/lomik/zapwriter"
Expand Down Expand Up @@ -116,7 +117,7 @@ type ConfigType struct {
DefaultTimeZone *time.Location `mapstructure:"-" json:"-"`

// ZipperInstance is API entry to carbonzipper
ZipperInstance interfaces.CarbonZipper `mapstructure:"-" json:"-"`
ZipperInstance zipper.CarbonZipper `mapstructure:"-" json:"-"`

// Limiter limits concurrent zipper requests
Limiter limiter.SimpleLimiter `mapstructure:"-" json:"-"`
Expand All @@ -132,6 +133,11 @@ func (c ConfigType) String() string {
}
}

func (c *ConfigType) SetZipper(zipper zipper.CarbonZipper) {
c.ZipperInstance = zipper
expr.InitWithZipper(c.Limiter, c.ZipperInstance)
}

var Config = ConfigType{
ExtrapolateExperiment: false,
Buckets: 10,
Expand Down
2 changes: 1 addition & 1 deletion cmd/carbonapi/http/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func init() {
config.Config.Upstreams.Backends = []string{"dummy"}
config.SetUpConfigUpstreams(logger)
config.SetUpConfig(logger, "(test)")
config.Config.ZipperInstance = newMockCarbonZipper()
config.Config.SetZipper(newMockCarbonZipper())
emptyStringList := make([]string, 0)
InitHandlers(emptyStringList, emptyStringList)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/carbonapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {
dns.UseDNSCache(config.Config.CachingDNSRefreshTime)
}

config.Config.ZipperInstance = newZipper(carbonapiHttp.ZipperStats, &config.Config.Upstreams, config.Config.IgnoreClientTimeout, zapwriter.Logger("zipper"))
config.Config.SetZipper(newZipper(carbonapiHttp.ZipperStats, &config.Config.Upstreams, config.Config.IgnoreClientTimeout, zapwriter.Logger("zipper")))

wg := sync.WaitGroup{}
serve := func(listen config.Listener, handler http.Handler) {
Expand Down
42 changes: 33 additions & 9 deletions expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,39 @@ package expr

import (
"context"
"errors"

"github.com/ansel1/merry"
pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

"github.com/go-graphite/carbonapi/cmd/carbonapi/config"
_ "github.com/go-graphite/carbonapi/expr/functions"
"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/interfaces"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/parser"
utilctx "github.com/go-graphite/carbonapi/util/ctx"
zipper "github.com/go-graphite/carbonapi/zipper/interfaces"
)

type evaluator struct{}
// built-in evaluator with query limiter and CarbonZipper
type evaluator struct {
limiter limiter.SimpleLimiter
zipper zipper.CarbonZipper
}

var ErrZipperNotInit = errors.New("zipper not initailized")

func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (map[parser.MetricRequest][]*types.MetricData, error) {
if err := config.Config.Limiter.Enter(ctx); err != nil {
if eval.zipper == nil {
return values, ErrZipperNotInit
}

if err := eval.limiter.Enter(ctx); err != nil {
return nil, err
}
defer config.Config.Limiter.Leave()
defer eval.limiter.Leave()

multiFetchRequest := pb.MultiFetchRequest{}
metricRequestCache := make(map[string]parser.MetricRequest)
Expand Down Expand Up @@ -74,7 +87,7 @@ func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, unti
}

if len(multiFetchRequest.Metrics) > 0 {
metrics, _, err := config.Config.ZipperInstance.Render(ctx, multiFetchRequest)
metrics, _, err := eval.zipper.Render(ctx, multiFetchRequest)
// If we had only partial result, we want to do our best to actually do our job
if err != nil && merry.HTTPCode(err) >= 400 && !haveFallbackSeries {
return nil, err
Expand All @@ -97,7 +110,7 @@ func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, unti
targetValues[m] = values[m]
}

if config.Config.ZipperInstance.ScaleToCommonStep() {
if eval.zipper.ScaleToCommonStep() {
targetValues = helper.ScaleValuesToCommonStep(targetValues)
}

Expand Down Expand Up @@ -131,10 +144,21 @@ func (eval evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int
return EvalExpr(ctx, exp, from, until, values)
}

var _evaluator = evaluator{}
// _evaluator must be init with InitWithZipper or InitWithEval before usage
var _evaluator interfaces.Evaluator

// InitWithZipper call on configure phase (not thread-safe) with built-in evaluator with query limiter and CarbonZipper (for refetch/etc)
func InitWithZipper(limiter limiter.SimpleLimiter, zipper zipper.CarbonZipper) {
eval := &evaluator{
limiter: limiter,
zipper: zipper,
}
InitWithEval(eval)
}

func init() {
helper.SetEvaluator(_evaluator)
// InitWithEval call on configure phase (not thread-safe) with custom evaluator
func InitWithEval(eval interfaces.Evaluator) {
_evaluator = eval
metadata.SetEvaluator(_evaluator)
}

Expand Down
1 change: 1 addition & 0 deletions expr/expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
func init() {
rewrite.New(make(map[string]string))
functions.New(make(map[string]string))
InitWithZipper(nil, nil)
}

func TestGetBuckets(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/absolute/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func New(configFile string) []interfaces.FunctionMetadata {
}

func (f *absolute) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
return helper.ForEachSeriesDo(ctx, e, from, until, values, func(a *types.MetricData, r *types.MetricData) *types.MetricData {
return helper.ForEachSeriesDo(ctx, f.GetEvaluator(), e, from, until, values, func(a *types.MetricData, r *types.MetricData) *types.MetricData {
for i, v := range a.Values {
if math.IsNaN(a.Values[i]) {
r.Values[i] = math.NaN()
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/absolute/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -15,7 +14,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
4 changes: 2 additions & 2 deletions expr/functions/aggregate/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (f *aggregate) Do(ctx context.Context, e parser.Expr, from, until int64, va
if e.Target() == "aggregate" {
return nil, err
} else {
args, err = helper.GetSeriesArgsAndRemoveNonExisting(ctx, e, from, until, values)
args, err = helper.GetSeriesArgsAndRemoveNonExisting(ctx, f.GetEvaluator(), e, from, until, values)
if err != nil {
return nil, err
}
Expand All @@ -61,7 +61,7 @@ func (f *aggregate) Do(ctx context.Context, e parser.Expr, from, until int64, va
xFilesFactor = -1 // xFilesFactor is not used by the ...Series functions
}
} else {
args, err = helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err = helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aggregate/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

fconfig "github.com/go-graphite/carbonapi/expr/functions/config"
"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -19,7 +18,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aggregateLine/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func New(configFile string) []interfaces.FunctionMetadata {

// aggregateLine(*seriesLists)
func (f *aggregateLine) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aggregateLine/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -17,7 +16,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
4 changes: 2 additions & 2 deletions expr/functions/aggregateSeriesLists/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func (f *aggregateSeriesLists) Do(ctx context.Context, e parser.Expr, from, unti
return nil, parser.ErrMissingArgument
}

seriesList1, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
seriesList1, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
seriesList2, err := helper.GetSeriesArg(ctx, e.Arg(1), from, until, values)
seriesList2, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(1), from, until, values)
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions expr/functions/aggregateSeriesLists/function_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package aggregateSeriesLists

import (
"github.com/go-graphite/carbonapi/expr/helper"
"math"
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
th "github.com/go-graphite/carbonapi/tests"
"math"
"testing"
"time"
)

func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aggregateWithWildcards/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (f *aggregateWithWildcards) Do(ctx context.Context, e parser.Expr, from, un
return nil, parser.ErrMissingArgument
}

args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aggregateWithWildcards/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -17,7 +16,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/alias/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (f *alias) Do(ctx context.Context, e parser.Expr, from, until int64, values
return nil, parser.ErrMissingArgument
}

args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/alias/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -16,7 +15,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aliasByBase64/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func New(configFile string) []interfaces.FunctionMetadata {
}

func (f *aliasByBase64) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aliasByBase64/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -16,7 +15,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aliasByMetric/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func New(configFile string) []interfaces.FunctionMetadata {
}

func (f *aliasByMetric) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
return helper.ForEachSeriesDo1(ctx, e, from, until, values, func(a *types.MetricData) *types.MetricData {
return helper.ForEachSeriesDo1(ctx, f.GetEvaluator(), e, from, until, values, func(a *types.MetricData) *types.MetricData {
metric := types.ExtractNameTag(a.Name)
part := strings.Split(metric, ".")
name := part[len(part)-1]
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aliasByMetric/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand All @@ -16,7 +15,6 @@ func init() {
md := New("")
evaluator := th.EvaluatorFromFunc(md[0].F)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
for _, m := range md {
metadata.RegisterFunction(m.Name, m.F)
}
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/aliasByNode/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (f *aliasByNode) Do(ctx context.Context, e parser.Expr, from, until int64,
return nil, parser.ErrMissingArgument
}

args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
args, err := helper.GetSeriesArg(ctx, f.GetEvaluator(), e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions expr/functions/aliasByNode/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
Expand Down Expand Up @@ -42,7 +41,6 @@ func init() {

evaluator := th.EvaluatorFromFuncWithMetadata(metadata.FunctionMD.Functions)
metadata.SetEvaluator(evaluator)
helper.SetEvaluator(evaluator)
}

func TestAliasByNode(t *testing.T) {
Expand Down
Loading
Loading