Skip to content

Commit

Permalink
fix(storage): correct direct connectivity check (googleapis#11152)
Browse files Browse the repository at this point in the history
* fix(storage): correct direct connectivity check

* refactor direct connectivity tests; experimental doc comment

* fix gce check condition

* update dual-region check; and remove duplicate

* use asia-east1 as outside region

* address feedback

* add todo for us-west1 dual-region
  • Loading branch information
frankyn authored Nov 20, 2024
1 parent 771aa46 commit a75c8b0
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 25 deletions.
95 changes: 71 additions & 24 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"google.golang.org/api/iterator"
itesting "google.golang.org/api/iterator/testing"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/api/transport"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -326,8 +327,18 @@ var readCases = []readCase{
},
}

func TestIntegration_DetectDirectConnectivity(t *testing.T) {
ctx := skipHTTP("direct connectivity isn't available for json")
// Test in a GCE environment expected to be located in one of:
// - us-west1-a, us-west1-b, us-west-c
//
// The test skips when ran outside of a GCE instance and us-west-*.
//
// Test cases for direct connectivity (DC) check:
// 1. DC detected with co-located GCS bucket in us-west1
// 2. DC not detected with multi region bucket in EU
// 3. DC not detected with dual region bucket in EUR4
// 4. DC not detected with regional bucket in EUROPE-WEST1
func TestIntegration_DetectDirectConnectivityInGCE(t *testing.T) {
ctx := skipHTTP("grpc only test")
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) {
h := testHelper{t}
// Using Resoource Detector to detect if test is being ran inside GCE
Expand All @@ -340,32 +351,68 @@ func TestIntegration_DetectDirectConnectivity(t *testing.T) {
t.Fatalf("resource.New: %v", err)
}
attrs := detectedAttrs.Set()
if v, exists := attrs.Value("cloud.platform"); exists && v.AsString() == "gcp_compute_engine" {
v, exists = attrs.Value("cloud.region")
if !exists {
t.Fatalf("CheckDirectConnectivitySupported: region not detected")
}
region := v.AsString()
newBucketName := prefix + uidSpace.New()
newBucket := client.Bucket(newBucketName)
h.mustCreate(newBucket, testutil.ProjID(), &BucketAttrs{Location: region, LocationType: "region"})
defer h.mustDeleteBucket(newBucket)
err := CheckDirectConnectivitySupported(ctx, newBucketName)
if err != nil {
t.Fatalf("CheckDirectConnectivitySupported: %v", err)
}
} else {
err = CheckDirectConnectivitySupported(ctx, bucket)
if err == nil {
t.Fatal("CheckDirectConnectivitySupported: expected error but none returned")
}
if err != nil && !strings.Contains(err.Error(), "direct connectivity not detected") {
t.Fatalf("CheckDirectConnectivitySupported: failed on a different error %v", err)
}
if v, exists := attrs.Value("cloud.platform"); !exists || v.AsString() != "gcp_compute_engine" {
t.Skip("only testable in a GCE instance")
}
if v, exists := attrs.Value("cloud.region"); !exists || !strings.Contains(strings.ToLower(v.AsString()), "us-west1") {
t.Skip("inside a GCE instance but region is not us-west1")
}
for _, test := range []struct {
name string
attrs *BucketAttrs
expectDirectConnectivity bool
}{
{
name: "co-located-bucket",
attrs: &BucketAttrs{Location: "us-west1"},
expectDirectConnectivity: true,
},
{
name: "not-colocated-multi-region-bucket",
attrs: &BucketAttrs{Location: "EU"},
},
{
name: "not-colocated-dual-region-bucket",
attrs: &BucketAttrs{Location: "EUR4"},
},
{
name: "not-colocated-region-bucket",
attrs: &BucketAttrs{Location: "EUROPE-WEST1"},
},
// TODO: Add a test for DC dual-region with us-west1 when supported
} {
t.Run(test.name, func(t *testing.T) {
newBucketName := prefix + uidSpace.New()
newBucket := client.Bucket(newBucketName)
h.mustCreate(newBucket, testutil.ProjID(), test.attrs)
defer h.mustDeleteBucket(newBucket)
err := CheckDirectConnectivitySupported(ctx, newBucketName)
if err != nil && test.expectDirectConnectivity {
t.Fatalf("CheckDirectConnectivitySupported: expected direct connectivity but failed: %v", err)
}
if err == nil && !test.expectDirectConnectivity {
t.Fatalf("CheckDirectConnectivitySupported: detected direct connectivity when not expected")
}
})
}
})
}

// Test handles the case when Direct Connectivity is disabled which causes
// "grpc.lb.locality" to return ""
func TestIntegration_DoNotDetectDirectConnectivityWhenDisabled(t *testing.T) {
ctx := skipHTTP("grpc only test")
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) {
err := CheckDirectConnectivitySupported(ctx, bucket)
if err == nil {
t.Fatal("CheckDirectConnectivitySupported: expected error but none returned")
}
if err != nil && !strings.Contains(err.Error(), "direct connectivity not detected") {
t.Fatalf("CheckDirectConnectivitySupported: failed on a different error %v", err)
}
}, internaloption.EnableDirectPath(false))
}

func TestIntegration_BucketCreateDelete(t *testing.T) {
ctx := skipJSONReads(context.Background(), "no reads in test")
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, _ string, prefix string, client *Client) {
Expand Down
4 changes: 3 additions & 1 deletion storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ func NewGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e
// Direct connectivity is expected to be available when running from inside
// GCP and connecting to a bucket in the same region.
//
// Experimental helper that's subject to change.
//
// You can pass in [option.ClientOption] you plan on passing to [NewGRPCClient]
func CheckDirectConnectivitySupported(ctx context.Context, bucket string, opts ...option.ClientOption) error {
view := metric.NewView(
Expand Down Expand Up @@ -282,7 +284,7 @@ func CheckDirectConnectivitySupported(ctx context.Context, bucket string, opts .
hist := m.Data.(metricdata.Histogram[float64])
for _, d := range hist.DataPoints {
v, present := d.Attributes.Value("grpc.lb.locality")
if present && v.AsString() != "" {
if present && v.AsString() != "" && v.AsString() != "{}" {
return nil
}
}
Expand Down

0 comments on commit a75c8b0

Please sign in to comment.