Skip to content

Commit

Permalink
add custom spans for tracing in clickhouse datasource (#1060)
Browse files Browse the repository at this point in the history
  • Loading branch information
Umang01-hash authored Oct 1, 2024
1 parent a380f91 commit afe1cee
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 34 deletions.
49 changes: 42 additions & 7 deletions pkg/gofr/datasource/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package clickhouse
import (
"context"
"errors"
"fmt"
"strings"
"time"

"go.opentelemetry.io/otel/attribute"

"go.opentelemetry.io/otel/trace"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -120,9 +123,13 @@ func pushDBMetrics(conn Conn, metrics Metrics) {
// Exec should be used for DDL and simple statements.
// It should not be used for larger inserts or query iterations.
func (c *client) Exec(ctx context.Context, query string, args ...any) error {
defer c.sendOperationStats(time.Now(), "Exec", query, args...)
tracedCtx, span := c.addTrace(ctx, "exec", query)

err := c.conn.Exec(tracedCtx, query, args...)

return c.conn.Exec(ctx, query, args...)
defer c.sendOperationStats(time.Now(), "Exec", query, "exec", span, args...)

return err
}

// Select method allows a set of response rows to be marshaled into a slice of structs with a single invocation..
Expand All @@ -139,20 +146,29 @@ func (c *client) Exec(ctx context.Context, query string, args ...any) error {
//
// err = ctx.Clickhouse.Select(ctx, &user, "SELECT * FROM users") .
func (c *client) Select(ctx context.Context, dest any, query string, args ...any) error {
defer c.sendOperationStats(time.Now(), "Select", query, args...)
tracedCtx, span := c.addTrace(ctx, "select", query)

err := c.conn.Select(tracedCtx, dest, query, args...)

return c.conn.Select(ctx, dest, query, args...)
defer c.sendOperationStats(time.Now(), "Select", query, "select", span, args...)

return err
}

// AsyncInsert allows the user to specify whether the client should wait for the server to complete the insert or
// respond once the data has been received.
func (c *client) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
defer c.sendOperationStats(time.Now(), "AsyncInsert", query, args...)
tracedCtx, span := c.addTrace(ctx, "async-insert", query)

err := c.conn.AsyncInsert(tracedCtx, query, wait, args...)

return c.conn.AsyncInsert(ctx, query, wait, args...)
defer c.sendOperationStats(time.Now(), "AsyncInsert", query, "async-insert", span, args...)

return err
}

func (c *client) sendOperationStats(start time.Time, methodType, query string, args ...interface{}) {
func (c *client) sendOperationStats(start time.Time, methodType, query string, method string,
span trace.Span, args ...interface{}) {
duration := time.Since(start).Milliseconds()

c.logger.Debug(&Log{
Expand All @@ -162,6 +178,11 @@ func (c *client) sendOperationStats(start time.Time, methodType, query string, a
Args: args,
})

if span != nil {
defer span.End()
span.SetAttributes(attribute.Int64(fmt.Sprintf("clickhouse.%v.duration", method), duration))
}

c.metrics.RecordHistogram(context.Background(), "app_clickhouse_stats", float64(duration), "hosts", c.config.Hosts,
"database", c.config.Database, "type", getOperationType(query))
}
Expand Down Expand Up @@ -198,3 +219,17 @@ func (c *client) HealthCheck(ctx context.Context) (any, error) {

return &h, nil
}

func (c *client) addTrace(ctx context.Context, method, query string) (context.Context, trace.Span) {
if c.tracer != nil {
contextWithTrace, span := c.tracer.Start(ctx, fmt.Sprintf("clickhouse-%v", method))

span.SetAttributes(
attribute.String("clickhouse.query", query),
)

return contextWithTrace, span
}

return ctx, nil
}
61 changes: 34 additions & 27 deletions pkg/gofr/datasource/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"go.uber.org/mock/gomock"
)

func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client) {
func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, *MockLogger, client) {
t.Helper()

ctrl := gomock.NewController(t)
Expand All @@ -30,36 +30,37 @@ func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client)
Database: "test",
}, logger: mockLogger, metrics: mockMetric}

return mockConn, mockMetric, c
return mockConn, mockMetric, mockLogger, c
}

func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) {
logs := stderrOutputForFunc(func() {
_, mockMetric, _ := getClickHouseTestConnection(t)
mockLogger := NewMockLogger(gomock.NewController(t))
_, mockMetric, _, _ := getClickHouseTestConnection(t)
mockLogger := NewMockLogger(gomock.NewController(t))

cl := New(Config{
Hosts: "localhost:8000",
Username: "user",
Password: "pass",
Database: "test",
})
cl := New(Config{
Hosts: "localhost:8000",
Username: "user",
Password: "pass",
Database: "test",
})

cl.UseLogger(mockLogger)
cl.UseMetrics(mockMetric)
cl.UseLogger(mockLogger)
cl.UseMetrics(mockMetric)

mockMetric.EXPECT().NewHistogram("app_clickhouse_stats", "Response time of Clickhouse queries in milliseconds.", gomock.Any())
mockMetric.EXPECT().NewGauge("app_clickhouse_open_connections", "Number of open Clickhouse connections.")
mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.")
mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes()
mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes()
mockMetric.EXPECT().NewHistogram("app_clickhouse_stats", "Response time of Clickhouse queries in milliseconds.", gomock.Any())
mockMetric.EXPECT().NewGauge("app_clickhouse_open_connections", "Number of open Clickhouse connections.")
mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.")
mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes()
mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Logf("connecting to clickhouse db at %v to database %v", "localhost:8000", "test")
mockLogger.EXPECT().Errorf("ping failed with error %v", gomock.Any())

cl.Connect()
cl.Connect()

time.Sleep(100 * time.Millisecond)
})
time.Sleep(100 * time.Millisecond)

assert.Contains(t, logs, "ping failed with error dial tcp [::1]:8000: connect: connection refused")
assert.True(t, mockLogger.ctrl.Satisfied())
assert.True(t, mockMetric.ctrl.Satisfied())
}

func stderrOutputForFunc(f func()) string {
Expand All @@ -78,7 +79,7 @@ func stderrOutputForFunc(f func()) string {
}

func Test_ClickHouse_HealthUP(t *testing.T) {
mockConn, _, c := getClickHouseTestConnection(t)
mockConn, _, _, c := getClickHouseTestConnection(t)

mockConn.EXPECT().Ping(gomock.Any()).Return(nil)

Expand All @@ -88,7 +89,7 @@ func Test_ClickHouse_HealthUP(t *testing.T) {
}

func Test_ClickHouse_HealthDOWN(t *testing.T) {
mockConn, _, c := getClickHouseTestConnection(t)
mockConn, _, _, c := getClickHouseTestConnection(t)

mockConn.EXPECT().Ping(gomock.Any()).Return(sql.ErrConnDone)

Expand All @@ -100,13 +101,15 @@ func Test_ClickHouse_HealthDOWN(t *testing.T) {
}

func Test_ClickHouse_Exec(t *testing.T) {
mockConn, mockMetric, c := getClickHouseTestConnection(t)
mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t)

ctx := context.Background()

mockConn.EXPECT().Exec(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
"8f165e2d-feef-416c-95f6-913ce3172e15", "gofr", "10").Return(nil)

mockLogger.EXPECT().Debug(gomock.Any())

mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts,
"database", c.config.Database, "type", "INSERT")

Expand All @@ -116,7 +119,7 @@ func Test_ClickHouse_Exec(t *testing.T) {
}

func Test_ClickHouse_Select(t *testing.T) {
mockConn, mockMetric, c := getClickHouseTestConnection(t)
mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t)

type User struct {
ID string `ch:"id"`
Expand All @@ -130,6 +133,8 @@ func Test_ClickHouse_Select(t *testing.T) {

mockConn.EXPECT().Select(ctx, &user, "SELECT * FROM users").Return(nil)

mockLogger.EXPECT().Debug(gomock.Any())

mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts,
"database", c.config.Database, "type", "SELECT")

Expand All @@ -139,7 +144,7 @@ func Test_ClickHouse_Select(t *testing.T) {
}

func Test_ClickHouse_AsyncInsert(t *testing.T) {
mockConn, mockMetric, c := getClickHouseTestConnection(t)
mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t)

ctx := context.Background()

Expand All @@ -149,6 +154,8 @@ func Test_ClickHouse_AsyncInsert(t *testing.T) {
mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts,
"database", c.config.Database, "type", "INSERT")

mockLogger.EXPECT().Debug(gomock.Any())

err := c.AsyncInsert(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)", true,
"8f165e2d-feef-416c-95f6-913ce3172e15", "user", "10")

Expand Down
5 changes: 5 additions & 0 deletions pkg/gofr/external_db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gofr

import (
"go.opentelemetry.io/otel"
"gofr.dev/pkg/gofr/container"
"gofr.dev/pkg/gofr/datasource/file"
)
Expand Down Expand Up @@ -52,6 +53,10 @@ func (a *App) AddClickhouse(db container.ClickhouseProvider) {
db.UseLogger(a.Logger())
db.UseMetrics(a.Metrics())

tracer := otel.GetTracerProvider().Tracer("gofr-clickhouse")

db.UseTracer(tracer)

db.Connect()

a.container.Clickhouse = db
Expand Down
2 changes: 2 additions & 0 deletions pkg/gofr/external_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel"
"go.uber.org/mock/gomock"

"gofr.dev/pkg/gofr/container"
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestApp_AddClickhouse(t *testing.T) {

mock.EXPECT().UseLogger(app.Logger())
mock.EXPECT().UseMetrics(app.Metrics())
mock.EXPECT().UseTracer(otel.GetTracerProvider().Tracer("gofr-clickhouse"))
mock.EXPECT().Connect()

app.AddClickhouse(mock)
Expand Down

0 comments on commit afe1cee

Please sign in to comment.