Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ctxStore and add ctx fileds to logger #369

Merged
merged 1 commit into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/grpc/direct/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package main
import (
"context"

"github.com/gotomicro/ego/server/egovernor"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/gotomicro/ego"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/examples/helloworld"
"github.com/gotomicro/ego/server"
"github.com/gotomicro/ego/server/egovernor"
"github.com/gotomicro/ego/server/egrpc"
)

Expand All @@ -36,6 +36,8 @@ func (g Greeter) SayHello(ctx context.Context, request *helloworld.HelloRequest)
if request.Name == "error" {
return nil, status.Error(codes.Unavailable, "error")
}
egrpc.CtxStoreSet(ctx, "x-biz-guid", "123")
egrpc.CtxStoreSet(ctx, "x-biz-uid", "100")
// header := metadata.Pairs("x-header-key", "val")
// err := grpc.SendHeader(context, header)
// if err != nil {
Expand Down
50 changes: 36 additions & 14 deletions server/egrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ import (

sentinel "github.com/alibaba/sentinel-golang/api"
sentinelBase "github.com/alibaba/sentinel-golang/core/base"
"github.com/gotomicro/ego/core/eerrors"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/esentinel"
"github.com/gotomicro/ego/core/etrace"
"github.com/gotomicro/ego/core/transport"
"github.com/gotomicro/ego/core/util/xstring"
"github.com/gotomicro/ego/internal/ecode"
"github.com/gotomicro/ego/internal/egrpcinteceptor"
"github.com/gotomicro/ego/internal/tools"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
Expand All @@ -32,6 +22,17 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/gotomicro/ego/core/eerrors"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/esentinel"
"github.com/gotomicro/ego/core/etrace"
"github.com/gotomicro/ego/core/transport"
"github.com/gotomicro/ego/core/util/xstring"
"github.com/gotomicro/ego/internal/ecode"
"github.com/gotomicro/ego/internal/egrpcinteceptor"
"github.com/gotomicro/ego/internal/tools"
)

func traceUnaryServerInterceptor() grpc.UnaryServerInterceptor {
Expand Down Expand Up @@ -206,8 +207,23 @@ func (c *Container) prometheusStreamServerInterceptor(ss grpc.ServerStream, info
emetric.ServerHandleCounter.Inc(emetric.TypeGRPCStream, info.FullMethod, getPeerName(ss.Context()), pbStatus.Message(), strconv.Itoa(ecode.GrpcToHTTPStatusCode(pbStatus.Code())), serviceName)
}

type ctxStore struct {
kvs map[string]any
}

type ctxStoreStruct struct{}

// CtxStoreSet 从ctx中尝试获取ctxStore,并往其中插入kv
func CtxStoreSet(ctx context.Context, k string, v any) {
skv, ok := ctx.Value(ctxStoreStruct{}).(*ctxStore)
if ok {
skv.kvs[k] = v
}
}

func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (res interface{}, err error) {
ctx = context.WithValue(ctx, ctxStoreStruct{}, &ctxStore{kvs: map[string]any{}})
// 默认过滤掉该探活日志
if c.config.EnableSkipHealthLog && info.FullMethod == "/grpc.health.v1.Health/Check" {
return handler(ctx, req)
Expand Down Expand Up @@ -270,7 +286,13 @@ func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor
elog.FieldPeerIP(getPeerIP(ctx)),
)

skv, skvOk := ctx.Value(ctxStoreStruct{}).(*ctxStore)
for _, key := range loggerKeys {
if skvOk {
if v, ok := skv.kvs[key]; ok {
fields = append(fields, elog.Any(strings.ToLower(key), v))
}
}
if value := tools.ContextValue(ctx, key); value != "" {
fields = append(fields, elog.FieldCustomKeyValue(key, value))
}
Expand Down Expand Up @@ -319,7 +341,7 @@ func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor
c.prometheusUnaryServerInterceptor(ctx, info, spbStatus, cost)
}()

//if enableCPUUsage(ctx) {
// if enableCPUUsage(ctx) {
// var stat = xcpu.Stat{}
// xcpu.ReadStat(&stat)
// if stat.Usage > 0 {
Expand All @@ -330,7 +352,7 @@ func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor
// c.logger.Error("set header error", elog.FieldErr(err))
// }
// }
//}
// }
return handler(ctx, req)
}
}
Expand All @@ -349,9 +371,9 @@ func (c *Container) prometheusUnaryServerInterceptor(ctx context.Context, info *
}

// enableCPUUsage 是否开启cpu利用率
//func enableCPUUsage(ctx context.Context) bool {
// func enableCPUUsage(ctx context.Context) bool {
// return tools.GrpcHeaderValue(ctx, "enable-cpu-usage") == "true"
//}
// }

// getPeerName 获取对端应用名称
func getPeerName(ctx context.Context) string {
Expand Down
Loading