Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/jackc/pgx: add initial support #1537

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions contrib/jackc/pgx/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package pgx

import (
"context"
"math"
"time"

"github.com/jackc/pgx/v5"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
ddtracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// TraceBatchStart marks the start of a batch, implementing pgx.BatchTracer
func (t *tracer) TraceBatchStart(ctx context.Context, _ *pgx.Conn, _ pgx.TraceBatchStartData) context.Context {
opts := []ddtrace.StartSpanOption{
ddtracer.ServiceName(t.serviceName),
ddtracer.SpanType(ext.SpanTypeSQL),
ddtracer.StartTime(time.Now()),
ddtracer.Tag("sql.query_type", "Batch"),
ddtracer.Tag(ext.ResourceName, "pgx.batch"),
}
for key, tag := range t.tags {
opts = append(opts, ddtracer.Tag(key, tag))
}
if !math.IsNaN(t.analyticsRate) {
opts = append(opts, ddtracer.Tag(ext.EventSampleRate, t.analyticsRate))
}
_, ctx = ddtracer.StartSpanFromContext(ctx, "pgx.batch", opts...)

return ctx
}

// TraceBatchQuery traces the query of a batch, implementing pgx.BatchTracer
func (t *tracer) TraceBatchQuery(ctx context.Context, c *pgx.Conn, data pgx.TraceBatchQueryData) {
opts := []ddtrace.StartSpanOption{
ddtracer.ServiceName(t.serviceName),
ddtracer.SpanType(ext.SpanTypeSQL),
ddtracer.StartTime(time.Now()),
ddtracer.Tag(ext.ResourceName, data.SQL),
}
if t.traceArgs {
opts = append(opts, ddtracer.Tag("sql.args", data.Args))
}
for key, tag := range t.tags {
opts = append(opts, ddtracer.Tag(key, tag))
}
if !math.IsNaN(t.analyticsRate) {
opts = append(opts, ddtracer.Tag(ext.EventSampleRate, t.analyticsRate))
}
ddtracer.StartSpanFromContext(ctx, "pgx.batch_query", opts...)
}

// TraceBatchEnd marks the end of a batch, implementing pgx.BatchTracer
func (t *tracer) TraceBatchEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceBatchEndData) {
span, exists := ddtracer.SpanFromContext(ctx)
if !exists {
return
}

if data.Err != nil {
span.SetTag(ext.Error, data.Err)
}
span.Finish()
}
45 changes: 45 additions & 0 deletions contrib/jackc/pgx/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pgx

import (
"context"
"math"
"time"

"github.com/jackc/pgx/v5"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
ddtracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// TraceConnectStart marks the start of a pgx connect operation, implementing pgx.ConnectTracer
func (t *tracer) TraceConnectStart(ctx context.Context, _ pgx.TraceConnectStartData) context.Context {
opts := []ddtrace.StartSpanOption{
ddtracer.ServiceName(t.serviceName),
ddtracer.SpanType(ext.SpanTypeSQL),
ddtracer.StartTime(time.Now()),
ddtracer.Tag("sql.query_type", "Connect"),
}
for key, tag := range t.tags {
opts = append(opts, ddtracer.Tag(key, tag))
}
if !math.IsNaN(t.analyticsRate) {
opts = append(opts, ddtracer.Tag(ext.EventSampleRate, t.analyticsRate))
}
_, ctx = ddtracer.StartSpanFromContext(ctx, "pgx.connect", opts...)

return ctx
}

// TraceConnectEnd marks the end of a pgx connect operation, implementing pgx.ConnectTracer
func (t *tracer) TraceConnectEnd(ctx context.Context, data pgx.TraceConnectEndData) {
span, exists := ddtracer.SpanFromContext(ctx)
if !exists {
return
}

if data.Err != nil {
span.SetTag(ext.Error, data.Err)
}
span.Finish()
}
53 changes: 53 additions & 0 deletions contrib/jackc/pgx/copyfrom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package pgx

import (
"context"
"math"
"time"

"github.com/jackc/pgx/v5"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
ddtracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// TraceCopyFromStart marks the start of a CopyFrom query, implementing pgx.CopyFromTracer
func (t *tracer) TraceCopyFromStart(ctx context.Context, _ *pgx.Conn, data pgx.TraceCopyFromStartData) context.Context {
opts := []ddtrace.StartSpanOption{
ddtracer.ServiceName(t.serviceName),
ddtracer.SpanType(ext.SpanTypeSQL),
ddtracer.StartTime(time.Now()),
ddtracer.Tag("sql.query_type", "Query"),
ddtracer.Tag(ext.ResourceName, "pgx.copyfrom"),
ddtracer.Tag("pgx.table_name", data.TableName),
ddtracer.Tag("pgx.columns", data.ColumnNames),
}
for key, tag := range t.tags {
opts = append(opts, ddtracer.Tag(key, tag))
}
if !math.IsNaN(t.analyticsRate) {
opts = append(opts, ddtracer.Tag(ext.EventSampleRate, t.analyticsRate))
}

_, ctx = ddtracer.StartSpanFromContext(ctx, "pgx.copyfrom", opts...)

return ctx
}

// TraceCopyFromEnd marks the end of a CopyFrom query, implementing pgx.CopyFromTracer
func (t *tracer) TraceCopyFromEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceCopyFromEndData) {
span, exists := ddtracer.SpanFromContext(ctx)
if !exists {
return
}

if t.traceStatus {
span.SetTag("pgx.status", data.CommandTag.String())
}

if data.Err != nil {
span.SetTag(ext.Error, data.Err)
}
span.Finish()
}
89 changes: 89 additions & 0 deletions contrib/jackc/pgx/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package pgx

import (
"math"

"gopkg.in/DataDog/dd-trace-go.v1/internal"
)

// tracer contains configs to tracing and would implement pgx.QueryTracer, pgx.BatchTracer,
// pgx.CopyFromTracer, pgx.PrepareTracer and pgx.ConnectTracer.
type tracer struct {
serviceName string
tags map[string]any
analyticsRate float64
traceArgs bool
traceStatus bool
}

func buildTracer(options ...Option) *tracer {
t := defaults()
for _, opt := range options {
opt(t)
}
return t
}

// Option is a function that modifies the configuration.
type Option func(*tracer)

func defaults() *tracer {
analyticsRate := math.NaN()
if internal.BoolEnv("DD_TRACE_PGX_ENABLED", false) {
analyticsRate = 1.0
}
return &tracer{
serviceName: "postgres.db",
analyticsRate: analyticsRate,
}
}

// WithServiceName sets the service name.
func WithServiceName(name string) Option {
return func(t *tracer) {
t.serviceName = name
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) Option {
return func(t *tracer) {
if on {
t.analyticsRate = 1.0
}
}
}

// WithAnalyticsRate sets the sampling rate for Trace Analytics events
// correlated to started spans.
func WithAnalyticsRate(rate float64) Option {
return func(t *tracer) {
if rate >= 0.0 && rate <= 1.0 {
t.analyticsRate = rate
}
}
}

// WithCustomTag will attach the value to the span tagged by the key
func WithCustomTag(key string, value interface{}) Option {
return func(t *tracer) {
if t.tags == nil {
t.tags = make(map[string]interface{})
}
t.tags[key] = value
}
}

// TraceArgs will report the arguments of the queries, if on is set to true
func TraceArgs(on bool) Option {
return func(t *tracer) {
t.traceArgs = on
}
}

// TraceStatus will report the status of the queries, if on is set to true
func TraceStatus(on bool) Option {
return func(t *tracer) {
t.traceStatus = on
}
}
62 changes: 62 additions & 0 deletions contrib/jackc/pgx/pgx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

// Package pgx provides functions to trace the jackc/pgx package (v5) (https://github.com/jackc/pgx).

package pgx

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

// TracedConn returns a traced *pgx.Conn
func TracedConn(ctx context.Context, connString string, options ...Option) (*pgx.Conn, error) {
cc, err := pgx.ParseConfig(connString)
if err != nil {
return nil, fmt.Errorf("contrib/jackc/pgx: invalid connection string [%v]: %w", connString, err)
}

cc.Tracer = buildTracer(options...)
c, err := pgx.ConnectConfig(ctx, cc)
if err != nil {
return nil, fmt.Errorf("contrib/jackc/pgx: connect: %w", err)
}

return c, nil
}

// TracedConnWithConfig returns a traced *pgx.Conn with the config
func TracedConnWithConfig(ctx context.Context, config *pgx.ConnConfig, options ...Option) (*pgx.Conn, error) {
config.Tracer = buildTracer(options...)

return pgx.ConnectConfig(ctx, config)
}

// TracedPool returns a traced *pgxpool.Pool for concurrent use
func TracedPool(ctx context.Context, connString string, options ...Option) (*pgxpool.Pool, error) {
cc, err := pgxpool.ParseConfig(connString)
if err != nil {
return nil, fmt.Errorf("contrib/jackc/pgx: invalid connection string for pool [%v]: %w", connString, err)
}
cc.ConnConfig.Tracer = buildTracer(options...)

p, err := pgxpool.NewWithConfig(ctx, cc)
if err != nil {
return nil, fmt.Errorf("contrib/jackc/pgx: new pool: %w", err)
}

return p, nil
}

// TracedPoolWithConfig returns a traced *pgxpool.Pool with config, for concurrent use
func TracedPoolWithConfig(ctx context.Context, config *pgxpool.Config, options ...Option) (*pgxpool.Pool, error) {
config.ConnConfig.Tracer = buildTracer(options...)

return pgxpool.NewWithConfig(ctx, config)
}
49 changes: 49 additions & 0 deletions contrib/jackc/pgx/prepare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pgx

import (
"context"
"math"
"time"

"github.com/jackc/pgx/v5"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
ddtracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// TracePrepareStart marks the start of a pgx prepare operation, implementing pgx.PrepareTracer
func (t *tracer) TracePrepareStart(ctx context.Context, _ *pgx.Conn, data pgx.TracePrepareStartData) context.Context {
opts := []ddtrace.StartSpanOption{
ddtracer.ServiceName(t.serviceName),
ddtracer.SpanType(ext.SpanTypeSQL),
ddtracer.StartTime(time.Now()),
ddtracer.Tag("sql.query_type", "Prepare"),
ddtracer.Tag("pgx.prepared_statement_name", data.Name),
ddtracer.Tag(ext.ResourceName, data.SQL),
}
for key, tag := range t.tags {
opts = append(opts, ddtracer.Tag(key, tag))
}
if !math.IsNaN(t.analyticsRate) {
opts = append(opts, ddtracer.Tag(ext.EventSampleRate, t.analyticsRate))
}
_, ctx = ddtracer.StartSpanFromContext(ctx, "pgx.prepare", opts...)

return ctx
}

// TracePrepareEnd marks the end of a pgx prepare operation, implementing pgx.PrepareTracer
func (t *tracer) TracePrepareEnd(ctx context.Context, _ *pgx.Conn, data pgx.TracePrepareEndData) {
span, exists := ddtracer.SpanFromContext(ctx)
if !exists {
return
}

span.SetTag("pgx.already_prepared", data.AlreadyPrepared)

if data.Err != nil {
span.SetTag(ext.Error, data.Err)
}
span.Finish()
}
Loading