From a75c8b0f72c38d9a85c908715c3e37eb5cffb131 Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Tue, 19 Nov 2024 21:20:50 -0800 Subject: [PATCH] fix(storage): correct direct connectivity check (#11152) * fix(storage): correct direct connectivity check * refactor direct connectivity tests; experimental doc comment * fix gce check condition * update dual-region check; and remove duplicate * use asia-east1 as outside region * address feedback * add todo for us-west1 dual-region --- storage/integration_test.go | 95 +++++++++++++++++++++++++++---------- storage/storage.go | 4 +- 2 files changed, 74 insertions(+), 25 deletions(-) diff --git a/storage/integration_test.go b/storage/integration_test.go index d2a3286bd4b8..a9dc265c0ba5 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -60,6 +60,7 @@ import ( "google.golang.org/api/iterator" itesting "google.golang.org/api/iterator/testing" "google.golang.org/api/option" + "google.golang.org/api/option/internaloption" "google.golang.org/api/transport" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -326,8 +327,18 @@ var readCases = []readCase{ }, } -func TestIntegration_DetectDirectConnectivity(t *testing.T) { - ctx := skipHTTP("direct connectivity isn't available for json") +// Test in a GCE environment expected to be located in one of: +// - us-west1-a, us-west1-b, us-west-c +// +// The test skips when ran outside of a GCE instance and us-west-*. +// +// Test cases for direct connectivity (DC) check: +// 1. DC detected with co-located GCS bucket in us-west1 +// 2. DC not detected with multi region bucket in EU +// 3. DC not detected with dual region bucket in EUR4 +// 4. DC not detected with regional bucket in EUROPE-WEST1 +func TestIntegration_DetectDirectConnectivityInGCE(t *testing.T) { + ctx := skipHTTP("grpc only test") multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) { h := testHelper{t} // Using Resoource Detector to detect if test is being ran inside GCE @@ -340,32 +351,68 @@ func TestIntegration_DetectDirectConnectivity(t *testing.T) { t.Fatalf("resource.New: %v", err) } attrs := detectedAttrs.Set() - if v, exists := attrs.Value("cloud.platform"); exists && v.AsString() == "gcp_compute_engine" { - v, exists = attrs.Value("cloud.region") - if !exists { - t.Fatalf("CheckDirectConnectivitySupported: region not detected") - } - region := v.AsString() - newBucketName := prefix + uidSpace.New() - newBucket := client.Bucket(newBucketName) - h.mustCreate(newBucket, testutil.ProjID(), &BucketAttrs{Location: region, LocationType: "region"}) - defer h.mustDeleteBucket(newBucket) - err := CheckDirectConnectivitySupported(ctx, newBucketName) - if err != nil { - t.Fatalf("CheckDirectConnectivitySupported: %v", err) - } - } else { - err = CheckDirectConnectivitySupported(ctx, bucket) - if err == nil { - t.Fatal("CheckDirectConnectivitySupported: expected error but none returned") - } - if err != nil && !strings.Contains(err.Error(), "direct connectivity not detected") { - t.Fatalf("CheckDirectConnectivitySupported: failed on a different error %v", err) - } + if v, exists := attrs.Value("cloud.platform"); !exists || v.AsString() != "gcp_compute_engine" { + t.Skip("only testable in a GCE instance") + } + if v, exists := attrs.Value("cloud.region"); !exists || !strings.Contains(strings.ToLower(v.AsString()), "us-west1") { + t.Skip("inside a GCE instance but region is not us-west1") + } + for _, test := range []struct { + name string + attrs *BucketAttrs + expectDirectConnectivity bool + }{ + { + name: "co-located-bucket", + attrs: &BucketAttrs{Location: "us-west1"}, + expectDirectConnectivity: true, + }, + { + name: "not-colocated-multi-region-bucket", + attrs: &BucketAttrs{Location: "EU"}, + }, + { + name: "not-colocated-dual-region-bucket", + attrs: &BucketAttrs{Location: "EUR4"}, + }, + { + name: "not-colocated-region-bucket", + attrs: &BucketAttrs{Location: "EUROPE-WEST1"}, + }, + // TODO: Add a test for DC dual-region with us-west1 when supported + } { + t.Run(test.name, func(t *testing.T) { + newBucketName := prefix + uidSpace.New() + newBucket := client.Bucket(newBucketName) + h.mustCreate(newBucket, testutil.ProjID(), test.attrs) + defer h.mustDeleteBucket(newBucket) + err := CheckDirectConnectivitySupported(ctx, newBucketName) + if err != nil && test.expectDirectConnectivity { + t.Fatalf("CheckDirectConnectivitySupported: expected direct connectivity but failed: %v", err) + } + if err == nil && !test.expectDirectConnectivity { + t.Fatalf("CheckDirectConnectivitySupported: detected direct connectivity when not expected") + } + }) } }) } +// Test handles the case when Direct Connectivity is disabled which causes +// "grpc.lb.locality" to return "" +func TestIntegration_DoNotDetectDirectConnectivityWhenDisabled(t *testing.T) { + ctx := skipHTTP("grpc only test") + multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) { + err := CheckDirectConnectivitySupported(ctx, bucket) + if err == nil { + t.Fatal("CheckDirectConnectivitySupported: expected error but none returned") + } + if err != nil && !strings.Contains(err.Error(), "direct connectivity not detected") { + t.Fatalf("CheckDirectConnectivitySupported: failed on a different error %v", err) + } + }, internaloption.EnableDirectPath(false)) +} + func TestIntegration_BucketCreateDelete(t *testing.T) { ctx := skipJSONReads(context.Background(), "no reads in test") multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, _ string, prefix string, client *Client) { diff --git a/storage/storage.go b/storage/storage.go index cc91f8fe9fa4..66fba72e6a49 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -244,6 +244,8 @@ func NewGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e // Direct connectivity is expected to be available when running from inside // GCP and connecting to a bucket in the same region. // +// Experimental helper that's subject to change. +// // You can pass in [option.ClientOption] you plan on passing to [NewGRPCClient] func CheckDirectConnectivitySupported(ctx context.Context, bucket string, opts ...option.ClientOption) error { view := metric.NewView( @@ -282,7 +284,7 @@ func CheckDirectConnectivitySupported(ctx context.Context, bucket string, opts . hist := m.Data.(metricdata.Histogram[float64]) for _, d := range hist.DataPoints { v, present := d.Attributes.Value("grpc.lb.locality") - if present && v.AsString() != "" { + if present && v.AsString() != "" && v.AsString() != "{}" { return nil } }