Skip to content

Commit

Permalink
Support gRPC for query service (#1307)
Browse files Browse the repository at this point in the history
* Initial commit, grpc for query-service

Signed-off-by: Annanay <annanay.a@media.net>

* Re-evaluate fields in GRPCHandler

Signed-off-by: Annanay <annanay.a@media.net>

* Use cmux, use stream in GetTraceResponse, regenerate *pb.go and swagger doc

Signed-off-by: Annanay <annanay.a@media.net>

* Edit GetTrace HTTP verb

Signed-off-by: Annanay <annanay.a@media.net>

* Re-add query definitions in a new file

Signed-off-by: Annanay <annanay.a@media.net>

* Address comments, correct API endpoint for query grpc

Signed-off-by: Annanay <annanay.a@media.net>

* Add definitions for other query functions

Signed-off-by: Annanay <annanay.a@media.net>

* Address comments, make API as close to plugin proto

Signed-off-by: Annanay <annanay.a@media.net>

* Fix api definitions

Signed-off-by: Annanay <annanay.a@media.net>

* Add dependency query utility

Signed-off-by: Annanay <annanay.a@media.net>

* Make fmt, add findtraces implementation

Signed-off-by: Annanay <annanay.a@media.net>

* Correct rebase

Signed-off-by: Annanay <annanay.a@media.net>

* Use end_time instead of duration

Signed-off-by: Annanay <annanay.a@media.net>

* dep ensure, fix build

Signed-off-by: Annanay <annanay.a@media.net>

* Addressed comments, fix proto tool versions

Signed-off-by: Annanay <annanay.a@media.net>

* Fix build

Signed-off-by: Annanay <annanay.a@media.net>

* Fix build

Signed-off-by: Annanay <annanay.a@media.net>

* Remove grpc-gateway code generation; use streaming

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Implement streaming

Signed-off-by: Yuri Shkuro <ys@uber.com>

* make fmt lint

Signed-off-by: Yuri Shkuro <ys@uber.com>

* dep update

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Start adding tests

Signed-off-by: Annanay <annanay.a@media.net>

* Add tests for GRPC handler

Signed-off-by: Annanay <annanay.a@media.net>

* Fix grpc handler tests

Signed-off-by: Annanay <annanay.a@media.net>

* WIP fixing tests

Signed-off-by: Annanay <annanay.a@media.net>

* Fix tests

Signed-off-by: Annanay <annanay.a@media.net>

* Fix tests

Signed-off-by: Annanay <annanay.a@media.net>

* Add test

Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
annanay25 authored and yurishkuro committed Apr 22, 2019
1 parent 7009c98 commit 5b8c1f4
Show file tree
Hide file tree
Showing 11 changed files with 3,987 additions and 589 deletions.
27 changes: 14 additions & 13 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ required = [
name = "github.com/grpc-ecosystem/go-grpc-middleware"
version = "1.0.0"

[[constaint]]
[[constraint]]
name = "github.com/soheilhy/cmux"
version = "0.1.4"

[[constraint]]
name = "github.com/rs/cors"
version = "1.3.0"

5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,10 @@ proto:
$(PROTOC) \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/ \
--grpc-gateway_out=$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/ \
--swagger_out=allow_merge=true:$(PWD)/proto-gen/openapi/ \
model/proto/api_v2/*.proto
### grpc-gateway generates 'query.pb.gw.go' that does not respect (gogoproto.customname) = "TraceID"
### --grpc-gateway_out=$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/ \
### --swagger_out=allow_merge=true:$(PWD)/proto-gen/openapi/ \
$(PROTOC) \
$(PROTO_INCLUDES) \
Expand Down
153 changes: 153 additions & 0 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"context"

"github.com/opentracing/opentracing-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const maxSpanCountInChunk = 10

// GRPCHandler implements the GRPC endpoint of the query service.
type GRPCHandler struct {
queryService querysvc.QueryService
logger *zap.Logger
tracer opentracing.Tracer
}

// NewGRPCHandler returns a GRPCHandler
func NewGRPCHandler(queryService querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler {
gH := &GRPCHandler{
queryService: queryService,
logger: logger,
tracer: tracer,
}

return gH
}

// GetTrace is the GRPC handler to fetch traces based on trace-id.
func (g *GRPCHandler) GetTrace(r *api_v2.GetTraceRequest, stream api_v2.QueryService_GetTraceServer) error {
trace, err := g.queryService.GetTrace(stream.Context(), r.TraceID)
if err == spanstore.ErrTraceNotFound {
g.logger.Error("trace not found", zap.Error(err))
return err
}
if err != nil {
g.logger.Error("Could not fetch spans from backend", zap.Error(err))
return err
}
return g.sendSpanChunks(trace.Spans, stream.Send)
}

// ArchiveTrace is the GRPC handler to archive traces.
func (g *GRPCHandler) ArchiveTrace(ctx context.Context, r *api_v2.ArchiveTraceRequest) (*api_v2.ArchiveTraceResponse, error) {
err := g.queryService.ArchiveTrace(ctx, r.TraceID)
if err == spanstore.ErrTraceNotFound {
g.logger.Error("trace not found", zap.Error(err))
return nil, err
}
if err != nil {
g.logger.Error("Could not fetch spans from backend", zap.Error(err))
return nil, err
}

return &api_v2.ArchiveTraceResponse{}, nil
}

// FindTraces is the GRPC handler to fetch traces based on TraceQueryParameters.
func (g *GRPCHandler) FindTraces(r *api_v2.FindTracesRequest, stream api_v2.QueryService_FindTracesServer) error {
query := r.GetQuery()
queryParams := spanstore.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: int(query.SearchDepth),
}
traces, err := g.queryService.FindTraces(stream.Context(), &queryParams)
if err != nil {
g.logger.Error("Error fetching traces", zap.Error(err))
return err
}
for _, trace := range traces {
if err := g.sendSpanChunks(trace.Spans, stream.Send); err != nil {
return err
}
}
return nil
}

func (g *GRPCHandler) sendSpanChunks(spans []*model.Span, sendFn func(*api_v2.SpansResponseChunk) error) error {
chunk := make([]model.Span, 0, len(spans))
for i := 0; i < len(spans); i += maxSpanCountInChunk {
chunk = chunk[:0]
for j := i; j < len(spans) && j < i+maxSpanCountInChunk; j++ {
chunk = append(chunk, *spans[j])
}
if err := sendFn(&api_v2.SpansResponseChunk{Spans: chunk}); err != nil {
g.logger.Error("failed to send response to client", zap.Error(err))
return err
}
}
return nil
}

// GetServices is the GRPC handler to fetch services.
func (g *GRPCHandler) GetServices(ctx context.Context, r *api_v2.GetServicesRequest) (*api_v2.GetServicesResponse, error) {
services, err := g.queryService.GetServices(ctx)
if err != nil {
g.logger.Error("Error fetching services", zap.Error(err))
return nil, err
}

return &api_v2.GetServicesResponse{Services: services}, nil
}

// GetOperations is the GRPC handler to fetch operations.
func (g *GRPCHandler) GetOperations(ctx context.Context, r *api_v2.GetOperationsRequest) (*api_v2.GetOperationsResponse, error) {
service := r.Service
operations, err := g.queryService.GetOperations(ctx, service)
if err != nil {
g.logger.Error("Error fetching operations", zap.Error(err))
return nil, err
}

return &api_v2.GetOperationsResponse{Operations: operations}, nil
}

// GetDependencies is the GRPC handler to fetch dependencies.
func (g *GRPCHandler) GetDependencies(ctx context.Context, r *api_v2.GetDependenciesRequest) (*api_v2.GetDependenciesResponse, error) {
startTime := r.StartTime
endTime := r.EndTime
dependencies, err := g.queryService.GetDependencies(startTime, endTime.Sub(startTime))
if err != nil {
g.logger.Error("Error fetching dependencies", zap.Error(err))
return nil, err
}

return &api_v2.GetDependenciesResponse{Dependencies: dependencies}, nil
}
Loading

0 comments on commit 5b8c1f4

Please sign in to comment.