Skip to content

Commit

Permalink
feat(bigquery): add trace instrumentation support for individual rpcs (
Browse files Browse the repository at this point in the history
…#6493)

* feat(bigquery): add trace instrumentation support for individual rpcs
  • Loading branch information
shollyman authored Sep 2, 2022
1 parent e42772c commit eedc632
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 2 deletions.
5 changes: 5 additions & 0 deletions bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"cloud.google.com/go/bigquery/internal"
cloudinternal "cloud.google.com/go/internal"
"cloud.google.com/go/internal/detect"
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
gax "github.com/googleapis/gax-go/v2"
bq "google.golang.org/api/bigquery/v2"
Expand Down Expand Up @@ -119,7 +120,9 @@ func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*
var res *bq.Job
var err error
invoke := func() error {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.insert")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}
// A job with a client-generated ID can be retried; the presence of the
Expand Down Expand Up @@ -149,7 +152,9 @@ func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*
var res *bq.QueryResponse
var err error
invoke := func() error {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.query")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}

Expand Down
19 changes: 18 additions & 1 deletion bigquery/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,12 @@ func (d *Dataset) deleteInternal(ctx context.Context, deleteContents bool) (err

call := d.c.bqs.Datasets.Delete(d.ProjectID, d.DatasetID).Context(ctx).DeleteContents(deleteContents)
setClientHeader(call.Header())
return call.Do()
return runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.datasets.delete")
err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
}

// Metadata fetches the metadata for the dataset.
Expand All @@ -228,7 +233,9 @@ func (d *Dataset) Metadata(ctx context.Context) (md *DatasetMetadata, err error)
setClientHeader(call.Header())
var ds *bq.Dataset
if err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.datasets.get")
ds, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -284,7 +291,9 @@ func (d *Dataset) Update(ctx context.Context, dm DatasetMetadataToUpdate, etag s
}
var ds2 *bq.Dataset
if err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.datasets.patch")
ds2, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -391,7 +400,9 @@ var listTables = func(it *TableIterator, pageSize int, pageToken string) (*bq.Ta
}
var res *bq.TableList
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.tables.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down Expand Up @@ -476,7 +487,9 @@ var listModels = func(it *ModelIterator, pageSize int, pageToken string) (*bq.Li
}
var res *bq.ListModelsResponse
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.models.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down Expand Up @@ -563,7 +576,9 @@ var listRoutines = func(it *RoutineIterator, pageSize int, pageToken string) (*b
}
var res *bq.ListRoutinesResponse
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.routines.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down Expand Up @@ -667,7 +682,9 @@ var listDatasets = func(it *DatasetIterator, pageSize int, pageToken string) (*b
}
var res *bq.DatasetList
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.datasets.list")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
return res, err
Expand Down
2 changes: 2 additions & 0 deletions bigquery/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error {
setClientHeader(call.Header())
var res *bq.TableDataInsertAllResponse
err = runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tabledata.insertAll")
res, err = call.Do()
trace.EndSpan(ctx, err)
return err
})
if err != nil {
Expand Down
17 changes: 16 additions & 1 deletion bigquery/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ func (j *Job) Cancel(ctx context.Context) error {
Context(ctx)
setClientHeader(call.Header())
return runWithRetry(ctx, func() error {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.cancel")
_, err := call.Do()
trace.EndSpan(sCtx, err)
return err
})
}
Expand All @@ -257,7 +259,9 @@ func (j *Job) Delete(ctx context.Context) (err error) {
setClientHeader(call.Header())

return runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.delete")
err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
}
Expand Down Expand Up @@ -343,7 +347,9 @@ func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint6
}
var res *bq.GetQueryResultsResponse
err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.getQueryResults")
res, err = call.Do()
trace.EndSpan(sCtx, err)
if err != nil {
return !retryableError(err, jobRetryReasons), err
}
Expand Down Expand Up @@ -837,7 +843,14 @@ func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
if it.ParentJobID != "" {
req.ParentJobId(it.ParentJobID)
}
res, err := req.Do()
var res *bq.JobList
err := runWithRetry(it.ctx, func() (err error) {
sCtx := trace.StartSpan(it.ctx, "bigquery.jobs.list")
res, err = req.Do()
trace.EndSpan(sCtx, err)
return err
})

if err != nil {
return "", err
}
Expand Down Expand Up @@ -870,7 +883,9 @@ func (c *Client) getJobInternal(ctx context.Context, jobID, location, projectID
}
setClientHeader(call.Header())
err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.get")
job, err = call.Do()
trace.EndSpan(sCtx, err)
return err
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions bigquery/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (m *Model) Metadata(ctx context.Context) (mm *ModelMetadata, err error) {
setClientHeader(req.Header())
var model *bq.Model
err = runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.models.get")
model, err = req.Do()
trace.EndSpan(ctx, err)
return err
})
if err != nil {
Expand All @@ -111,7 +113,9 @@ func (m *Model) Update(ctx context.Context, mm ModelMetadataToUpdate, etag strin
}
var res *bq.Model
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.models.patch")
res, err = call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions bigquery/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func (r *Routine) Metadata(ctx context.Context) (rm *RoutineMetadata, err error)
setClientHeader(req.Header())
var routine *bq.Routine
err = runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.routines.get")
routine, err = req.Do()
trace.EndSpan(ctx, err)
return err
})
if err != nil {
Expand Down Expand Up @@ -129,7 +131,9 @@ func (r *Routine) Update(ctx context.Context, upd *RoutineMetadataToUpdate, etag
}
var res *bq.Routine
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.routines.update")
res, err = call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions bigquery/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,9 @@ func (t *Table) Create(ctx context.Context, tm *TableMetadata) (err error) {
req := t.c.bqs.Tables.Insert(t.ProjectID, t.DatasetID, table).Context(ctx)
setClientHeader(req.Header())
return runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.insert")
_, err = req.Do()
trace.EndSpan(ctx, err)
return err
})
}
Expand Down Expand Up @@ -716,7 +718,9 @@ func (t *Table) Metadata(ctx context.Context, opts ...TableMetadataOption) (md *
setClientHeader(tgc.call.Header())
var res *bq.Table
if err := runWithRetry(ctx, func() (err error) {
sCtx := trace.StartSpan(ctx, "bigquery.tables.get")
res, err = tgc.call.Do()
trace.EndSpan(sCtx, err)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -783,7 +787,9 @@ func (t *Table) Delete(ctx context.Context) (err error) {
setClientHeader(call.Header())

return runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.delete")
err = call.Do()
trace.EndSpan(ctx, err)
return err
})
}
Expand Down Expand Up @@ -841,7 +847,9 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin
}
var res *bq.Table
if err := runWithRetry(ctx, func() (err error) {
ctx = trace.StartSpan(ctx, "bigquery.tables.patch")
res, err = tpc.call.Do()
trace.EndSpan(ctx, err)
return err
}); err != nil {
return nil, err
Expand Down
99 changes: 99 additions & 0 deletions bigquery/trace_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
"context"
"strings"
"testing"
"time"

"go.opencensus.io/trace"
)

// testExporter is a testing exporter for validating captured spans.
type testExporter struct {
spans []*trace.SpanData
}

func (te *testExporter) ExportSpan(s *trace.SpanData) {
te.spans = append(te.spans, s)
}

// hasSpans checks that the exporter has all the span names
// specified in the slice. It returns the unmatched names.
func (te *testExporter) hasSpans(names []string) []string {
matches := make(map[string]struct{})
for _, n := range names {
matches[n] = struct{}{}
}
for _, s := range te.spans {
delete(matches, s.Name)
}
var unmatched []string
for k := range matches {
unmatched = append(unmatched, k)
}
return unmatched
}

func TestIntegration_Tracing(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}

ctx := context.Background()

for _, tc := range []struct {
description string
callF func(ctx context.Context)
wantSpans []string
}{
{
description: "fast path query",
callF: func(ctx context.Context) {
client.Query("SELECT SESSION_USER()").Read(ctx)
},
wantSpans: []string{"bigquery.jobs.query", "cloud.google.com/go/bigquery.Query.Run"},
},
{
description: "slow path query",
callF: func(ctx context.Context) {
q := client.Query("SELECT SESSION_USER()")
q.JobTimeout = time.Hour
q.Read(ctx)
},
wantSpans: []string{"bigquery.jobs.insert", "bigquery.jobs.getQueryResults", "cloud.google.com/go/bigquery.Job.Read", "cloud.google.com/go/bigquery.Query.Run"},
},
{
description: "table metadata",
callF: func(ctx context.Context) {
client.DatasetInProject("bigquery-public-data", "samples").Table("shakespeare").Metadata(ctx)
},
wantSpans: []string{"bigquery.tables.get", "cloud.google.com/go/bigquery.Table.Metadata"},
},
} {
exporter := &testExporter{}
trace.RegisterExporter(exporter)
traceCtx, span := trace.StartSpan(ctx, "testspan", trace.WithSampler(trace.AlwaysSample()))
tc.callF(traceCtx)
span.End()
trace.UnregisterExporter(exporter)

if unmatched := exporter.hasSpans(tc.wantSpans); len(unmatched) > 0 {
t.Errorf("case (%s): unmatched spans: %s", tc.description, strings.Join(unmatched, ","))
}
}
}

0 comments on commit eedc632

Please sign in to comment.