Skip to content

Commit

Permalink
fix: construct a TraceContext with the correct traceparent (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown committed Jul 19, 2022
1 parent d703959 commit 3ecb2cc
Show file tree
Hide file tree
Showing 25 changed files with 288 additions and 232 deletions.
24 changes: 12 additions & 12 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,6 @@ func init() {
rootCommand.AddCommand(startCommand)
}

func initTracing(ctx context.Context, jaegerEndpoint string) {
traceCtl, err := tracing.NewTracer(Version, jaegerEndpoint)
if err != nil {
log.Fatalf("could not setup tracing manager: %s", err.Error())
}

go func() {
<-ctx.Done()
traceCtl.Shutdown(ctx)
}()
}

func initServer(ctx context.Context, lis net.Listener) {
go func() {
<-ctx.Done()
Expand All @@ -234,6 +222,18 @@ func initServer(ctx context.Context, lis net.Listener) {
log.Infof("start api server : %s", lis.Addr())
}

func initTracing(ctx context.Context, jaegerEndpoint string) {
traceCtl, err := tracing.NewTracer(Version, jaegerEndpoint)
if err != nil {
log.Fatalf("could not setup tracing manager: %s", err.Error())
}

go func() {
<-ctx.Done()
traceCtl.Shutdown(ctx)
}()
}

//func initHolmes() *holmes.Holmes {
// logUtils.DefaultLogger.SetLogLevel(logUtils.ERROR)
// h, _ := holmes.New(
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b // indirect
go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0 // indirect
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/jaeger v1.7.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
go.uber.org/multierr v1.7.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -917,14 +916,11 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/exporters/jaeger v1.7.0 h1:wXgjiRldljksZkZrldGVe6XrG9u3kYDyQmkZwmm5dI0=
go.opentelemetry.io/otel/exporters/jaeger v1.7.0/go.mod h1:PwQAOqBgqbLQRKlj466DuD2qyMjbtcPpfPfj+AqbSBs=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0 h1:8hPcgCg0rUJiKE6VWahRvjgLUrNl7rW2hffUEPKXVEM=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0/go.mod h1:K4GDXPY6TjUiwbOh+DkKaEdCF8y+lvMoM6SeAPyfCCM=
go.opentelemetry.io/otel/sdk v1.7.0 h1:4OmStpcKVOfvDOgCt7UriAPtKolwIhxpnSNI/yK+1B0=
go.opentelemetry.io/otel/sdk v1.7.0/go.mod h1:uTEOTwaqIVuTGiJN7ii13Ibp75wJmYUDe374q6cZwUU=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
Expand Down
57 changes: 29 additions & 28 deletions pkg/executor/read_write_splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,24 +153,24 @@ func (executor *ReadWriteSplittingExecutor) ExecuteFieldList(ctx context.Context
}

func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context, sql string) (proto.Result, uint16, error) {
newCtx, span := tracing.GetTraceSpan(ctx, tracing.RWExecComQuery)
defer span.End()
var (
db *DataSourceBrief
tx proto.Tx
result proto.Result
err error
)

connectionID := proto.ConnectionID(newCtx)
queryStmt := proto.QueryStmt(newCtx)
spanCtx, span := tracing.GetTraceSpan(ctx, tracing.RWSComQuery)
defer span.End()

connectionID := proto.ConnectionID(spanCtx)
queryStmt := proto.QueryStmt(spanCtx)
switch stmt := queryStmt.(type) {
case *ast.SetStmt:
if shouldStartTransaction(stmt) {
db = executor.masters.Next(proto.WithMaster(newCtx)).(*DataSourceBrief)
db = executor.masters.Next(proto.WithMaster(spanCtx)).(*DataSourceBrief)
// TODO add metrics
tx, result, err = db.DB.Begin(newCtx)
tx, result, err = db.DB.Begin(spanCtx)
if err != nil {
return nil, 0, err
}
Expand All @@ -180,12 +180,12 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
tx = txi.(proto.Tx)
return tx.Query(newCtx, sql)
return tx.Query(spanCtx, sql)
}
// set to all db
for _, db := range executor.all {
go func(db *DataSourceBrief) {
if _, _, err := db.DB.Query(newCtx, sql); err != nil {
if _, _, err := db.DB.Query(spanCtx, sql); err != nil {
log.Error(err)
}
}(db)
Expand All @@ -196,9 +196,9 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context
}, 0, nil
}
case *ast.BeginStmt:
db = executor.masters.Next(proto.WithMaster(newCtx)).(*DataSourceBrief)
db = executor.masters.Next(proto.WithMaster(spanCtx)).(*DataSourceBrief)
// TODO add metrics
tx, result, err = db.DB.Begin(newCtx)
tx, result, err = db.DB.Begin(spanCtx)
if err != nil {
return nil, 0, err
}
Expand All @@ -212,7 +212,7 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context
defer executor.localTransactionMap.Delete(connectionID)
tx = txi.(proto.Tx)
// TODO add metrics
if result, err = tx.Commit(newCtx); err != nil {
if result, err = tx.Commit(spanCtx); err != nil {
return nil, 0, err
}
return result, 0, err
Expand All @@ -224,7 +224,7 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context
defer executor.localTransactionMap.Delete(connectionID)
tx = txi.(proto.Tx)
// TODO add metrics
if result, err = tx.Rollback(newCtx); err != nil {
if result, err = tx.Rollback(spanCtx); err != nil {
return nil, 0, err
}
return result, 0, err
Expand All @@ -233,19 +233,19 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context
if ok {
// in local transaction
tx = txi.(proto.Tx)
return tx.Query(newCtx, sql)
return tx.Query(spanCtx, sql)
}
withMasterCtx := proto.WithMaster(newCtx)
withMasterCtx := proto.WithMaster(spanCtx)
db = executor.masters.Next(withMasterCtx).(*DataSourceBrief)
return db.DB.Query(withMasterCtx, sql)
case *ast.SelectStmt:
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
// in local transaction
tx = txi.(proto.Tx)
return tx.Query(newCtx, sql)
return tx.Query(spanCtx, sql)
}
withSlaveCtx := proto.WithSlave(newCtx)
withSlaveCtx := proto.WithSlave(spanCtx)
if has, dsName := hasUseDBHint(stmt.TableHints); has {
protoDB := resource.GetDBManager().GetDB(dsName)
if protoDB == nil {
Expand All @@ -263,42 +263,43 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context
if ok {
// in local transaction
tx = txi.(proto.Tx)
return tx.Query(newCtx, sql)
return tx.Query(spanCtx, sql)
}
withSlaveCtx := proto.WithSlave(newCtx)
withSlaveCtx := proto.WithSlave(spanCtx)
db = executor.reads.Next(withSlaveCtx).(*DataSourceBrief)
return db.DB.Query(withSlaveCtx, sql)
}
}

func (executor *ReadWriteSplittingExecutor) ExecutorComStmtExecute(ctx context.Context, stmt *proto.Stmt) (proto.Result, uint16, error) {
newCtx, span := tracing.GetTraceSpan(ctx, tracing.RWExecComStmt)
spanCtx, span := tracing.GetTraceSpan(ctx, tracing.RWSComStmtExecute)
defer span.End()
connectionID := proto.ConnectionID(newCtx)

connectionID := proto.ConnectionID(spanCtx)
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
// in local transaction
tx := txi.(proto.Tx)
return tx.ExecuteStmt(newCtx, stmt)
return tx.ExecuteStmt(spanCtx, stmt)
}
switch st := stmt.StmtNode.(type) {
case *ast.InsertStmt, *ast.DeleteStmt, *ast.UpdateStmt:
db := executor.masters.Next(proto.WithMaster(newCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithMaster(newCtx), stmt)
db := executor.masters.Next(proto.WithMaster(spanCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithMaster(spanCtx), stmt)
case *ast.SelectStmt:
var db *DataSourceBrief
if has, dsName := hasUseDBHint(st.TableHints); has {
protoDB := resource.GetDBManager().GetDB(dsName)
if protoDB == nil {
log.Debugf("data source %d not found", dsName)
db = executor.reads.Next(proto.WithSlave(newCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(newCtx), stmt)
db = executor.reads.Next(proto.WithSlave(spanCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(spanCtx), stmt)
} else {
return protoDB.ExecuteStmt(proto.WithSlave(newCtx), stmt)
return protoDB.ExecuteStmt(proto.WithSlave(spanCtx), stmt)
}
}
db = executor.reads.Next(proto.WithSlave(newCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(newCtx), stmt)
db = executor.reads.Next(proto.WithSlave(spanCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(spanCtx), stmt)
default:
return nil, 0, errors.Errorf("unsupported %t statement", stmt.StmtNode)
}
Expand Down
26 changes: 14 additions & 12 deletions pkg/executor/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (executor *ShardingExecutor) GetPostFilters() []proto.DBPostFilter {
}

func (executor *ShardingExecutor) ExecuteMode() config.ExecuteMode {
return config.RWS
return config.SHD
}

func (executor *ShardingExecutor) ProcessDistributedTransaction() bool {
Expand Down Expand Up @@ -202,18 +202,19 @@ func (executor *ShardingExecutor) ExecutorComQuery(ctx context.Context, sql stri
plan proto.Plan
err error
)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ShardExecComQuery)

spanCtx, span := tracing.GetTraceSpan(ctx, tracing.SHDComQuery)
defer span.End()

log.Debugf("query: %s", sql)
queryStmt := proto.QueryStmt(newCtx)
queryStmt := proto.QueryStmt(spanCtx)
if queryStmt == nil {
return nil, 0, errors.New("query stmt should not be nil")
}
if _, ok := queryStmt.(*ast.SetStmt); ok {
for _, db := range executor.all {
go func(db *DataSourceBrief) {
if _, _, err := db.DB.Query(newCtx, sql); err != nil {
if _, _, err := db.DB.Query(spanCtx, sql); err != nil {
log.Error(err)
}
}(db)
Expand All @@ -227,16 +228,16 @@ func (executor *ShardingExecutor) ExecutorComQuery(ctx context.Context, sql stri
if selectStmt, ok := queryStmt.(*ast.SelectStmt); ok {
if selectStmt.Fields != nil && len(selectStmt.Fields.Fields) > 0 {
if _, ok := selectStmt.Fields.Fields[0].Expr.(*ast.VariableExpr); ok {
return executor.all[0].DB.Query(newCtx, sql)
return executor.all[0].DB.Query(spanCtx, sql)
}
}
}
plan, err = executor.optimizer.Optimize(newCtx, queryStmt)
plan, err = executor.optimizer.Optimize(spanCtx, queryStmt)
if err != nil {
return nil, 0, err
}
proto.WithVariable(newCtx, constant.TransactionTimeout, executor.config.TransactionTimeout)
return plan.Execute(newCtx)
proto.WithVariable(spanCtx, constant.TransactionTimeout, executor.config.TransactionTimeout)
return plan.Execute(spanCtx)
}

func (executor *ShardingExecutor) ExecutorComStmtExecute(ctx context.Context, stmt *proto.Stmt) (proto.Result, uint16, error) {
Expand All @@ -245,19 +246,20 @@ func (executor *ShardingExecutor) ExecutorComStmtExecute(ctx context.Context, st
plan proto.Plan
err error
)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ShardExecComStmt)

spanCtx, span := tracing.GetTraceSpan(ctx, tracing.SHDComStmtExecute)
defer span.End()

for i := 0; i < len(stmt.BindVars); i++ {
parameterID := fmt.Sprintf("v%d", i+1)
args = append(args, stmt.BindVars[parameterID])
}
plan, err = executor.optimizer.Optimize(newCtx, stmt.StmtNode, args...)
plan, err = executor.optimizer.Optimize(spanCtx, stmt.StmtNode, args...)
if err != nil {
return nil, 0, err
}
proto.WithVariable(newCtx, constant.TransactionTimeout, executor.config.TransactionTimeout)
return plan.Execute(newCtx)
proto.WithVariable(spanCtx, constant.TransactionTimeout, executor.config.TransactionTimeout)
return plan.Execute(spanCtx)
}

func (executor *ShardingExecutor) ConnectionClose(ctx context.Context) {
Expand Down
32 changes: 18 additions & 14 deletions pkg/executor/single_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri
result proto.Result
err error
)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.SDBExecComQuery)
spanCtx, span := tracing.GetTraceSpan(ctx, tracing.SDBComQuery)
defer span.End()
connectionID := proto.ConnectionID(newCtx)
queryStmt := proto.QueryStmt(newCtx)

connectionID := proto.ConnectionID(spanCtx)
queryStmt := proto.QueryStmt(spanCtx)
if queryStmt == nil {
return nil, 0, errors.New("query stmt should not be nil")
}
Expand All @@ -141,7 +142,7 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri
case *ast.SetStmt:
if shouldStartTransaction(stmt) {
// TODO add metrics
tx, result, err = db.Begin(newCtx)
tx, result, err = db.Begin(spanCtx)
if err != nil {
return nil, 0, err
}
Expand All @@ -151,13 +152,13 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
tx = txi.(proto.Tx)
return tx.Query(newCtx, sql)
return tx.Query(spanCtx, sql)
}
return db.Query(newCtx, sql)
return db.Query(spanCtx, sql)
}
case *ast.BeginStmt:
// TODO add metrics
tx, result, err = db.Begin(newCtx)
tx, result, err = db.Begin(spanCtx)
if err != nil {
return nil, 0, err
}
Expand All @@ -171,7 +172,7 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri
defer executor.localTransactionMap.Delete(connectionID)
tx = txi.(proto.Tx)
// TODO add metrics
if result, err = tx.Commit(newCtx); err != nil {
if result, err = tx.Commit(spanCtx); err != nil {
return nil, 0, err
}
return result, 0, err
Expand All @@ -183,30 +184,33 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri
defer executor.localTransactionMap.Delete(connectionID)
tx = txi.(proto.Tx)
// TODO add metrics
if result, err = tx.Rollback(newCtx); err != nil {
if result, err = tx.Rollback(spanCtx); err != nil {
return nil, 0, err
}
return result, 0, err
default:
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
tx = txi.(proto.Tx)
return tx.Query(newCtx, sql)
return tx.Query(spanCtx, sql)
}
return db.Query(newCtx, sql)
return db.Query(spanCtx, sql)
}
}

func (executor *SingleDBExecutor) ExecutorComStmtExecute(ctx context.Context, stmt *proto.Stmt) (proto.Result, uint16, error) {
connectionID := proto.ConnectionID(ctx)
spanCtx, span := tracing.GetTraceSpan(ctx, tracing.SDBComStmtExecute)
defer span.End()

connectionID := proto.ConnectionID(spanCtx)
log.Debugf("connectionID: %d, prepare: %s", connectionID, stmt.SqlText)
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
tx := txi.(proto.Tx)
return tx.ExecuteStmt(ctx, stmt)
return tx.ExecuteStmt(spanCtx, stmt)
}
db := resource.GetDBManager().GetDB(executor.dataSource)
return db.ExecuteStmt(ctx, stmt)
return db.ExecuteStmt(spanCtx, stmt)
}

func (executor *SingleDBExecutor) ConnectionClose(ctx context.Context) {
Expand Down
Loading

0 comments on commit 3ecb2cc

Please sign in to comment.