Skip to content

Commit

Permalink
feat(bigtable): Async refresh dry run in parallel with sync refresh (g…
Browse files Browse the repository at this point in the history
…oogleapis#11066)

* More logs

* async refresh option

* Removing logs

* Removing logs

* sending callback

* creating instrument

* Correct the metrics

* Correct the spelling

* Update google-api-go-client version
  • Loading branch information
bhshkh authored Dec 11, 2024
1 parent 2f94a82 commit 169e309
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 18 deletions.
40 changes: 28 additions & 12 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
btopt "cloud.google.com/go/bigtable/internal/option"
"cloud.google.com/go/internal/trace"
gax "github.com/googleapis/gax-go/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
Expand Down Expand Up @@ -95,6 +96,18 @@ func NewClient(ctx context.Context, project, instance string, opts ...option.Cli

// NewClientWithConfig creates a new client with the given config.
func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
metricsProvider := config.MetricsProvider
if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
// Do not emit metrics when emulator is being used
metricsProvider = NoopMetricsProvider{}
}

// Create a OpenTelemetry metrics configuration
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, project, instance, config.AppProfile, metricsProvider, opts...)
if err != nil {
return nil, err
}

o, err := btopt.DefaultClientOptions(prodAddr, mtlsProdAddr, Scope, clientUserAgent)
if err != nil {
return nil, err
Expand All @@ -112,21 +125,24 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
// Allow non-default service account in DirectPath.
o = append(o, internaloption.AllowNonDefaultServiceAccount(true))
o = append(o, opts...)
connPool, err := gtransport.DialPool(ctx, o...)
if err != nil {
return nil, fmt.Errorf("dialing: %w", err)
}

metricsProvider := config.MetricsProvider
if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
// Do not emit metrics when emulator is being used
metricsProvider = NoopMetricsProvider{}
}
asyncRefreshMetricAttrs := metricsTracerFactory.clientAttributes
asyncRefreshMetricAttrs = append(asyncRefreshMetricAttrs,
attribute.String(metricLabelKeyTag, "async_refresh_dry_run"),

// Create a OpenTelemetry metrics configuration
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, project, instance, config.AppProfile, metricsProvider, opts...)
// Table, cluster and zone are unknown at this point
// Use default values
attribute.String(monitoredResLabelKeyTable, defaultTable),
attribute.String(monitoredResLabelKeyCluster, defaultCluster),
attribute.String(monitoredResLabelKeyZone, defaultZone),
)
o = append(o, internaloption.EnableAsyncRefreshDryRun(func() {
metricsTracerFactory.debugTags.Add(context.Background(), 1,
metric.WithAttributes(asyncRefreshMetricAttrs...))
}))
connPool, err := gtransport.DialPool(ctx, o...)
if err != nil {
return nil, err
return nil, fmt.Errorf("dialing: %w", err)
}

return &Client{
Expand Down
8 changes: 4 additions & 4 deletions bigtable/conformance_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ trap cleanup EXIT

# Run the conformance tests
cd $conformanceTestsHome
# Tests in https://github.com/googleapis/cloud-bigtable-clients-test/tree/main/tests can only be run on go1.22.5
go install golang.org/dl/go1.22.5@latest
go1.22.5 download
go1.22.5 test -v -proxy_addr=:$testProxyPort | tee -a $sponge_log
# Tests in https://github.com/googleapis/cloud-bigtable-clients-test/tree/main/tests can only be run on go1.22.7
go install golang.org/dl/go1.22.7@latest
go1.22.7 download
go1.22.7 test -v -proxy_addr=:$testProxyPort | tee -a $sponge_log
RETURN_CODE=$?

echo "exiting with ${RETURN_CODE}"
Expand Down
17 changes: 16 additions & 1 deletion bigtable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
metricLabelKeyAppProfile = "app_profile"
metricLabelKeyMethod = "method"
metricLabelKeyStatus = "status"
metricLabelKeyTag = "tag"
metricLabelKeyStreamingOperation = "streaming"
metricLabelKeyClientName = "client_name"
metricLabelKeyClientUID = "client_uid"
Expand All @@ -59,6 +60,7 @@ const (
metricNameAttemptLatencies = "attempt_latencies"
metricNameServerLatencies = "server_latencies"
metricNameRetryCount = "retry_count"
metricNameDebugTags = "debug_tags"

// Metric units
metricUnitMS = "ms"
Expand All @@ -79,7 +81,7 @@ var (
800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0,
400000.0, 800000.0, 1600000.0, 3200000.0}

// All the built-in metrics have same attributes except 'status' and 'streaming'
// All the built-in metrics have same attributes except 'tag', 'status' and 'streaming'
// These attributes need to be added to only few of the metrics
metricsDetails = map[string]metricInfo{
metricNameOperationLatencies: {
Expand Down Expand Up @@ -148,6 +150,7 @@ type builtinMetricsTracerFactory struct {
serverLatencies metric.Float64Histogram
attemptLatencies metric.Float64Histogram
retryCount metric.Int64Counter
debugTags metric.Int64Counter
}

func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string, metricsProvider MetricsProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) {
Expand Down Expand Up @@ -253,6 +256,16 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
metric.WithDescription("The number of additional RPCs sent after the initial attempt."),
metric.WithUnit(metricUnitCount),
)
if err != nil {
return err
}

// Create debug_tags
tf.debugTags, err = meter.Int64Counter(
metricNameDebugTags,
metric.WithDescription("A counter of internal client events used for debugging."),
metric.WithUnit(metricUnitCount),
)
return err
}

Expand All @@ -271,6 +284,7 @@ type builtinMetricsTracer struct {
instrumentServerLatencies metric.Float64Histogram
instrumentAttemptLatencies metric.Float64Histogram
instrumentRetryCount metric.Int64Counter
instrumentDebugTags metric.Int64Counter

tableName string
method string
Expand Down Expand Up @@ -363,6 +377,7 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
instrumentServerLatencies: tf.serverLatencies,
instrumentAttemptLatencies: tf.attemptLatencies,
instrumentRetryCount: tf.retryCount,
instrumentDebugTags: tf.debugTags,

tableName: tableName,
isStreaming: isStreaming,
Expand Down
3 changes: 2 additions & 1 deletion bigtable/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
)

const (
defaultCluster = "unspecified"
defaultCluster = "<unspecified>"
defaultZone = "global"
defaultTable = "<unspecified>"
)

// get GFE latency in ms from response metadata
Expand Down

0 comments on commit 169e309

Please sign in to comment.