Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add /api/sampling endpoint in collector #1990

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/flags"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
Expand Down Expand Up @@ -253,12 +254,23 @@ func startCollector(
r := mux.NewRouter()
apiHandler := collectorApp.NewAPIHandler(jaegerBatchesHandler)
apiHandler.RegisterRoutes(r)
httpPortStr := ":" + strconv.Itoa(cOpts.CollectorHTTPPort)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

cfgHandler := clientcfgHandler.NewHTTPHandler(clientcfgHandler.HTTPHandlerParams{
ConfigManager: &clientcfgHandler.ConfigManager{
SamplingStrategyStore: strategyStore,
// TODO provide baggage manager
},
MetricsFactory: metricsFactory,
BasePath: "/api",
LegacySamplingEndpoint: false,
})
cfgHandler.RegisterRoutes(r)

recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)
go startZipkinHTTPAPI(logger, cOpts.CollectorZipkinHTTPPort, zipkinSpansHandler, recoveryHandler)

logger.Info("Starting jaeger-collector HTTP server", zap.Int("http-port", cOpts.CollectorHTTPPort))
httpPortStr := ":" + strconv.Itoa(cOpts.CollectorHTTPPort)
logger.Info("Starting jaeger-collector HTTP server", zap.String("http-host-port", httpPortStr))
go func() {
if err := http.ListenAndServe(httpPortStr, recoveryHandler(r)); err != nil {
logger.Fatal("Could not launch jaeger-collector HTTP server", zap.Error(err))
Expand Down
17 changes: 15 additions & 2 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/docs"
"github.com/jaegertracing/jaeger/cmd/env"
"github.com/jaegertracing/jaeger/cmd/flags"
clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
Expand Down Expand Up @@ -135,13 +136,25 @@ func main() {
r := mux.NewRouter()
apiHandler := app.NewAPIHandler(jaegerBatchesHandler)
apiHandler.RegisterRoutes(r)
httpPortStr := ":" + strconv.Itoa(builderOpts.CollectorHTTPPort)

cfgHandler := clientcfgHandler.NewHTTPHandler(clientcfgHandler.HTTPHandlerParams{
ConfigManager: &clientcfgHandler.ConfigManager{
SamplingStrategyStore: strategyStore,
// TODO provide baggage manager
},
MetricsFactory: metricsFactory,
BasePath: "/api",
LegacySamplingEndpoint: false,
})
cfgHandler.RegisterRoutes(r)

recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)
httpHandler := recoveryHandler(r)

go startZipkinHTTPAPI(logger, builderOpts.CollectorZipkinHTTPPort, builderOpts.CollectorZipkinAllowedOrigins, builderOpts.CollectorZipkinAllowedHeaders, zipkinSpansHandler, recoveryHandler)

logger.Info("Starting jaeger-collector HTTP server", zap.Int("http-port", builderOpts.CollectorHTTPPort))
httpPortStr := ":" + strconv.Itoa(builderOpts.CollectorHTTPPort)
logger.Info("Starting jaeger-collector HTTP server", zap.String("http-host-port", httpPortStr))
go func() {
if err := http.ListenAndServe(httpPortStr, httpHandler); err != nil {
logger.Fatal("Could not launch service", zap.Error(err))
Expand Down
42 changes: 42 additions & 0 deletions pkg/clientcfg/clientcfghttp/cfgmgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientcfghttp

import (
"errors"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// ConfigManager implements ClientConfigManager.
type ConfigManager struct {
SamplingStrategyStore strategystore.StrategyStore
BaggageManager baggage.BaggageRestrictionManager
}

// GetSamplingStrategy implements ClientConfigManager.GetSamplingStrategy.
func (c *ConfigManager) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
return c.SamplingStrategyStore.GetSamplingStrategy(serviceName)
}

// GetBaggageRestrictions implements ClientConfigManager.GetBaggageRestrictions.
func (c *ConfigManager) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
if c.BaggageManager == nil {
return nil, errors.New("baggage restrictions not implemented")
}
return c.BaggageManager.GetBaggageRestrictions(serviceName)
}
75 changes: 75 additions & 0 deletions pkg/clientcfg/clientcfghttp/cfgmgr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientcfghttp

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

type mockSamplingStore struct {
samplingResponse *sampling.SamplingStrategyResponse
}

func (m *mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
if m.samplingResponse == nil {
return nil, errors.New("no mock response provided")
}
return m.samplingResponse, nil
}

type mockBaggageMgr struct {
baggageResponse []*baggage.BaggageRestriction
}

func (m *mockBaggageMgr) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
if m.baggageResponse == nil {
return nil, errors.New("no mock response provided")
}
return m.baggageResponse, nil
}

func TestConfigManager(t *testing.T) {
bgm := &mockBaggageMgr{}
mgr := &ConfigManager{
SamplingStrategyStore: &mockSamplingStore{
samplingResponse: &sampling.SamplingStrategyResponse{},
},
BaggageManager: bgm,
}
t.Run("GetSamplingStrategy", func(t *testing.T) {
r, err := mgr.GetSamplingStrategy("foo")
require.NoError(t, err)
assert.Equal(t, sampling.SamplingStrategyResponse{}, *r)
})
t.Run("GetBaggageRestrictions", func(t *testing.T) {
expResp := []*baggage.BaggageRestriction{}
bgm.baggageResponse = expResp
r, err := mgr.GetBaggageRestrictions("foo")
require.NoError(t, err)
assert.Equal(t, expResp, r)
})
t.Run("GetBaggageRestrictionsError", func(t *testing.T) {
mgr.BaggageManager = nil
_, err := mgr.GetBaggageRestrictions("foo")
assert.EqualError(t, err, "baggage not implemented")
})
}
33 changes: 18 additions & 15 deletions pkg/clientcfg/clientcfghttp/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,22 @@ var (

// HTTPHandlerParams contains parameters that must be passed to NewHTTPHandler.
type HTTPHandlerParams struct {
ConfigManager configmanager.ClientConfigManager // required
MetricsFactory metrics.Factory // required
ConfigManager configmanager.ClientConfigManager // required
MetricsFactory metrics.Factory // required

// BasePath will be used as a prefix for the endpoints, e.g. "/api"
BasePath string

// LegacySamplingEndpoint enables returning sampling strategy from "/" endpoint
// using Thrift 0.9.2 enum codes.
LegacySamplingEndpoint bool
}

// HTTPHandler implements endpoints for used by Jaeger clients to retrieve client configuration,
// such as sampling and baggage restrictions.
type HTTPHandler struct {
legacySamplingEndpoint bool
manager configmanager.ClientConfigManager
metrics struct {
params HTTPHandlerParams
metrics struct {
// Number of good sampling requests
SamplingRequestSuccess metrics.Counter `metric:"http-server.requests" tags:"type=sampling"`

Expand All @@ -73,27 +78,25 @@ type HTTPHandler struct {

// NewHTTPHandler creates new HTTPHandler.
func NewHTTPHandler(params HTTPHandlerParams) *HTTPHandler {
handler := &HTTPHandler{
manager: params.ConfigManager,
legacySamplingEndpoint: params.LegacySamplingEndpoint,
}
handler := &HTTPHandler{params: params}
metrics.MustInit(&handler.metrics, params.MetricsFactory, nil)
return handler
}

// RegisterRoutes registers configuration handlers with Gorilla Router.
func (h *HTTPHandler) RegisterRoutes(router *mux.Router) {
if h.legacySamplingEndpoint {
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
prefix := h.params.BasePath
if h.params.LegacySamplingEndpoint {
router.HandleFunc(prefix+"/", func(w http.ResponseWriter, r *http.Request) {
h.serveSamplingHTTP(w, r, true /* thriftEnums092 */)
}).Methods(http.MethodGet)
}

router.HandleFunc("/sampling", func(w http.ResponseWriter, r *http.Request) {
router.HandleFunc(prefix+"/sampling", func(w http.ResponseWriter, r *http.Request) {
h.serveSamplingHTTP(w, r, false /* thriftEnums092 */)
}).Methods(http.MethodGet)

router.HandleFunc("/baggageRestrictions", func(w http.ResponseWriter, r *http.Request) {
router.HandleFunc(prefix+"/baggageRestrictions", func(w http.ResponseWriter, r *http.Request) {
h.serveBaggageHTTP(w, r)
}).Methods(http.MethodGet)

Expand Down Expand Up @@ -123,7 +126,7 @@ func (h *HTTPHandler) serveSamplingHTTP(w http.ResponseWriter, r *http.Request,
if err != nil {
return
}
resp, err := h.manager.GetSamplingStrategy(service)
resp, err := h.params.ConfigManager.GetSamplingStrategy(service)
if err != nil {
h.metrics.CollectorProxyFailures.Inc(1)
http.Error(w, fmt.Sprintf("collector error: %+v", err), http.StatusInternalServerError)
Expand Down Expand Up @@ -153,7 +156,7 @@ func (h *HTTPHandler) serveBaggageHTTP(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
resp, err := h.manager.GetBaggageRestrictions(service)
resp, err := h.params.ConfigManager.GetBaggageRestrictions(service)
if err != nil {
h.metrics.CollectorProxyFailures.Inc(1)
http.Error(w, fmt.Sprintf("collector error: %+v", err), http.StatusInternalServerError)
Expand Down
Loading