Skip to content

Commit

Permalink
Merge branch 'main' into feature/fs-vector-search
Browse files Browse the repository at this point in the history
  • Loading branch information
bhshkh authored Jul 22, 2024
2 parents b6f5f25 + 6195782 commit cfa13be
Show file tree
Hide file tree
Showing 20 changed files with 262 additions and 92 deletions.
4 changes: 2 additions & 2 deletions .release-please-manifest-individual.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"ai": "0.8.0",
"ai": "0.8.1",
"aiplatform": "1.68.0",
"auth": "0.7.1",
"auth": "0.7.2",
"auth/oauth2adapt": "0.2.3",
"bigquery": "1.61.0",
"bigtable": "1.25.0",
Expand Down
8 changes: 8 additions & 0 deletions ai/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## [0.8.1](https://github.com/googleapis/google-cloud-go/compare/ai/v0.8.0...ai/v0.8.1) (2024-07-19)


### Bug Fixes

* **ai:** Bump google.golang.org/api@v0.187.0 ([8fa9e39](https://github.com/googleapis/google-cloud-go/commit/8fa9e398e512fd8533fd49060371e61b5725a85b))
* **ai:** Bump google.golang.org/grpc@v1.64.1 ([8ecc4e9](https://github.com/googleapis/google-cloud-go/commit/8ecc4e9622e5bbe9b90384d5848ab816027226c5))

## [0.8.0](https://github.com/googleapis/google-cloud-go/compare/ai/v0.7.0...ai/v0.8.0) (2024-07-01)


Expand Down
2 changes: 1 addition & 1 deletion ai/internal/version.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions auth/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [0.7.2](https://github.com/googleapis/google-cloud-go/compare/auth/v0.7.1...auth/v0.7.2) (2024-07-22)


### Bug Fixes

* **auth:** Use default client for universe metadata lookup ([#10551](https://github.com/googleapis/google-cloud-go/issues/10551)) ([d9046fd](https://github.com/googleapis/google-cloud-go/commit/d9046fdd1435d1ce48f374806c1def4cb5ac6cd3)), refs [#10544](https://github.com/googleapis/google-cloud-go/issues/10544)

## [0.7.1](https://github.com/googleapis/google-cloud-go/compare/auth/v0.7.0...auth/v0.7.1) (2024-07-10)


Expand Down
5 changes: 3 additions & 2 deletions auth/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,9 @@ func (c *ComputeUniverseDomainProvider) GetProperty(ctx context.Context) (string

// httpGetMetadataUniverseDomain is a package var for unit test substitution.
var httpGetMetadataUniverseDomain = func(ctx context.Context) (string, error) {
client := metadata.NewClient(&http.Client{Timeout: time.Second})
return client.GetWithContext(ctx, "universe/universe_domain")
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
return metadata.GetWithContext(ctx, "universe/universe_domain")
}

func getMetadataUniverseDomain(ctx context.Context) (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ const DetectProjectID = "*detect-project-id*"
// variables. By setting the environment variable QUERY_PREVIEW_ENABLED to the string
// "TRUE", the client will enable preview features, though behavior may still be
// controlled via the bigquery service as well. Currently, the feature(s) in scope
// include: stateless queries (query execution without corresponding job metadata).
// include: short mode queries (query execution without corresponding job metadata).
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
o := []option.ClientOption{
option.WithScopes(Scope),
Expand Down
4 changes: 2 additions & 2 deletions bigquery/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint6
call = call.FormatOptionsUseInt64Timestamp(true)
setClientHeader(call.Header())
backoff := gax.Backoff{
Initial: 1 * time.Second,
Multiplier: 2,
Initial: 50 * time.Millisecond,
Multiplier: 1.3,
Max: 60 * time.Second,
}
var res *bq.GetQueryResultsResponse
Expand Down
20 changes: 13 additions & 7 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,18 @@ func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, l
}, nil
}

// setupDynamicDescriptors aids testing when not using a supplied proto
func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {
convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
// setupDynamicDescriptors aids testing when not using a supplied proto.
func setupDynamicDescriptors(ctx context.Context, t *testing.T, c *Client, streamName string) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {

resp, err := c.GetWriteStream(ctx, &storagepb.GetWriteStreamRequest{
Name: streamName,
View: storagepb.WriteStreamView_FULL,
})
if err != nil {
t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
t.Fatalf("couldn't get write stream (%q): %v", streamName, err)
}

descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root")
descriptor, err := adapt.StorageSchemaToProto2Descriptor(resp.GetTableSchema(), "root")
if err != nil {
t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err)
}
Expand Down Expand Up @@ -404,7 +408,8 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

md, descriptorProto := setupDynamicDescriptors(t, testdata.GithubArchiveSchema)
defStreamName := fmt.Sprintf("%s/streams/_default", TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID))
md, descriptorProto := setupDynamicDescriptors(ctx, t, mwClient, defStreamName)

ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
Expand Down Expand Up @@ -465,7 +470,8 @@ func testDefaultStreamJSONData(ctx context.Context, t *testing.T, mwClient *Clie
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

md, descriptorProto := setupDynamicDescriptors(t, testdata.ComplexTypeSchema)
defStreamName := fmt.Sprintf("%s/streams/_default", TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID))
md, descriptorProto := setupDynamicDescriptors(ctx, t, mwClient, defStreamName)

ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
Expand Down
6 changes: 5 additions & 1 deletion bigquery/storage/managedwriter/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,10 +456,14 @@ func TestValidation_Values(t *testing.T) {
t.Errorf("%s append failed: %v", tc.description, err)
continue
}
if _, err = result.GetResult(ctx); err != nil {
resp, err := result.FullResponse(ctx)
if err != nil {
t.Errorf("%s append response error: %v", tc.description, err)
continue
}
if status := resp.GetError(); status != nil {
t.Errorf("%s append response embeds an error: %v", tc.description, status)
}
// Validate table.
validateTableConstraints(ctx, t, bqClient, testTable, tc.description, tc.constraints...)
}
Expand Down
8 changes: 5 additions & 3 deletions internal/detect/detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
)

const (
projectIDSentinel = "*detect-project-id*"
// ProjectIDSentinel is the value that users should pass for the project ID
// to enable detection.
ProjectIDSentinel = "*detect-project-id*"
envProjectID = "GOOGLE_CLOUD_PROJECT"
)

Expand All @@ -41,8 +43,8 @@ var (
// 1. GOOGLE_CLOUD_PROJECT envvar
// 2. ADC creds.ProjectID
// 3. A static value if the environment is emulated.
func ProjectID(ctx context.Context, projectID string, emulatorEnvVar string, opts ...option.ClientOption) (string, error) {
if projectID != projectIDSentinel {
func ProjectID(ctx context.Context, projectID, emulatorEnvVar string, opts ...option.ClientOption) (string, error) {
if projectID != ProjectIDSentinel {
return projectID, nil
}
// 1. Try a well known environment variable
Expand Down
6 changes: 3 additions & 3 deletions internal/detect/detect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ func TestIt(t *testing.T) {
},
{
name: "environment project id",
projectID: projectIDSentinel,
projectID: ProjectIDSentinel,
env: map[string]string{envProjectID: "environment-project-id"},
want: "environment-project-id",
},
{
name: "adc project id",
projectID: projectIDSentinel,
projectID: ProjectIDSentinel,
adcProjectID: "adc-project-id",
want: "adc-project-id",
},
{
name: "emulator project id",
projectID: projectIDSentinel,
projectID: ProjectIDSentinel,
env: map[string]string{"EMULATOR_HOST": "something"},
want: "emulated-project",
},
Expand Down
29 changes: 29 additions & 0 deletions logging/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,34 @@ parent.Timestamp marks the end of the request.)
You should observe the child log entries grouped under the parent on the console. The
parent entry will not inherit the severity of its children; you must update the
parent severity yourself.
# Automatic Trace/Span ID Extraction
You can automatically populate the Trace, SpanID, and TraceSampled fields of an Entry object by providing an [http.Request] object
within the Entry's HTTPRequest field:
logging.Entry{
HTTPRequest: &logging.HTTPRequest{
Request: // Reference to your http.Request here
}
}
When Entry with an [http.Request] is logged, its Trace, SpanID, and TraceSampled fields may be automatically populated as follows:
1. If you are instrumenting your application with [OpenTelemetry], more specifically [otelhttp],
the Entry's Trace, SpanID, and TraceSampled will be populated with information from the [http.Request]'s span context.
2. Trace, SpanID, and TraceSampled fields will be populated from information from the http.Request's [W3C Traceparent]
or [X-Cloud-Trace-Context] headers, if those headers exist.
Note that if Trace, SpanID, or TraceSampled are explicitly provided within an Entry object, then those values take precedence over
automatically extracted values.
[http.Request]: https://pkg.go.dev/net/http#Request
[OpenTelemetry]: https://opentelemetry.io/docs/languages/go/
[otelhttp]: https://pkg.go.dev/go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp
[W3C Traceparent]: https://www.w3.org/TR/trace-context
[X-Cloud-Trace-Context]: https://cloud.google.com/trace/docs/trace-context#legacy-http-header
[OpenTelemetry span context]: https://pkg.go.dev/go.opentelemetry.io/otel/trace#SpanContext
*/
package logging // import "cloud.google.com/go/logging"
2 changes: 1 addition & 1 deletion spanner/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
// a minimum of 10ms and maximum of 32s. There is no delay before the retry if
// the error was Session not found or failed inline begin transaction.
func runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound(ctx context.Context, f func(context.Context) error) error {
retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal)
retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.ResourceExhausted, codes.Internal)
funcWithRetry := func(ctx context.Context) error {
for {
err := f(ctx)
Expand Down
51 changes: 51 additions & 0 deletions spanner/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ func TestRetryInfo(t *testing.T) {
}
}

// TestRetryInfoResourceExhausted verifies that a server-specified retry delay
// attached to a ResourceExhausted status is surfaced by ExtractRetryDelay.
func TestRetryInfoResourceExhausted(t *testing.T) {
	const wantDelay = time.Second
	st := status.New(codes.ResourceExhausted, "")
	st, err := st.WithDetails(&edpb.RetryInfo{
		RetryDelay: durationpb.New(wantDelay),
	})
	if err != nil {
		t.Fatalf("Error setting retry details: %v", err)
	}
	spannerErr := toSpannerErrorWithCommitInfo(st.Err(), true)
	gotDelay, ok := ExtractRetryDelay(spannerErr)
	if !ok || !testEqual(wantDelay, gotDelay) {
		t.Errorf("<ok, retryDelay> = <%t, %v>, want <true, %v>", ok, gotDelay, wantDelay)
	}
}

func TestRetryInfoInWrappedError(t *testing.T) {
s := status.New(codes.Aborted, "")
s, err := s.WithDetails(&edpb.RetryInfo{
Expand All @@ -58,6 +72,22 @@ func TestRetryInfoInWrappedError(t *testing.T) {
}
}

// TestRetryInfoInWrappedErrorResourceExhausted verifies that ExtractRetryDelay
// unwraps an enclosing error and still finds the RetryInfo detail on an
// underlying ResourceExhausted Spanner error.
func TestRetryInfoInWrappedErrorResourceExhausted(t *testing.T) {
	const wantDelay = time.Second
	st := status.New(codes.ResourceExhausted, "")
	st, err := st.WithDetails(&edpb.RetryInfo{
		RetryDelay: durationpb.New(wantDelay),
	})
	if err != nil {
		t.Fatalf("Error setting retry details: %v", err)
	}
	wrapped := &wrappedTestError{
		wrapped: toSpannerErrorWithCommitInfo(st.Err(), true),
		msg:     "Error that is wrapping a Spanner error",
	}
	gotDelay, ok := ExtractRetryDelay(wrapped)
	if !ok || !testEqual(wantDelay, gotDelay) {
		t.Errorf("<ok, retryDelay> = <%t, %v>, want <true, %v>", ok, gotDelay, wantDelay)
	}
}

func TestRetryInfoTransactionOutcomeUnknownError(t *testing.T) {
err := toSpannerErrorWithCommitInfo(context.DeadlineExceeded, true)
if gotDelay, ok := ExtractRetryDelay(err); ok {
Expand Down Expand Up @@ -89,3 +119,24 @@ func TestRetryerRespectsServerDelay(t *testing.T) {
t.Fatalf("Retry delay mismatch:\ngot: %v\nwant: %v", maxSeenDelay, serverDelay)
}
}

// TestRetryerRespectsServerDelayResourceExhausted verifies that a retryer
// configured to retry on codes.ResourceExhausted honors the server-supplied
// RetryInfo delay instead of its own backoff settings.
func TestRetryerRespectsServerDelayResourceExhausted(t *testing.T) {
	t.Parallel()
	serverDelay := 50 * time.Millisecond
	// Use a message that matches the code; the previous text ("transaction
	// was aborted") was copied from the Aborted variant of this test and was
	// misleading for a ResourceExhausted status.
	s := status.New(codes.ResourceExhausted, "resource exhausted")
	s, err := s.WithDetails(&edpb.RetryInfo{
		RetryDelay: durationpb.New(serverDelay),
	})
	if err != nil {
		t.Fatalf("Error setting retry details: %v", err)
	}
	// An empty gax.Backoff would produce near-zero delays on its own, so any
	// observed delay must come from the server's RetryInfo detail.
	retryer := onCodes(gax.Backoff{}, codes.ResourceExhausted)
	err = toSpannerErrorWithCommitInfo(s.Err(), true)
	maxSeenDelay, shouldRetry := retryer.Retry(err)
	if !shouldRetry {
		t.Fatalf("expected shouldRetry to be true")
	}
	if maxSeenDelay != serverDelay {
		t.Fatalf("Retry delay mismatch:\ngot: %v\nwant: %v", maxSeenDelay, serverDelay)
	}
}
9 changes: 7 additions & 2 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,11 @@ func (p *sessionPool) incNumInUseLocked(ctx context.Context) {

func (p *sessionPool) decNumInUseLocked(ctx context.Context) {
p.numInUse--
if int64(p.numInUse) < 0 {
// print whole call stack trace
logf(p.sc.logger, "Number of sessions in use is negative, resetting it to currSessionsCheckedOutLocked. Stack trace: %s", string(debug.Stack()))
p.numInUse = p.currSessionsCheckedOutLocked()
}
p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions)
p.recordStat(ctx, ReleasedSessionsCount, 1)
if p.otConfig != nil {
Expand Down Expand Up @@ -1459,12 +1464,12 @@ func (hc *healthChecker) healthCheck(s *session) {
defer hc.markDone(s)
if !s.pool.isValid() {
// Session pool is closed, perform a garbage collection.
s.destroy(false, true)
s.destroy(false, false)
return
}
if err := s.ping(); isSessionNotFoundError(err) {
// Ping failed, destroy the session.
s.destroy(false, true)
s.destroy(false, false)
}
}

Expand Down
7 changes: 5 additions & 2 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,9 +1101,11 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
wantCRC uint32
checkCRC bool
)
if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil {
if params.offset == 0 && params.length < 0 {
checkCRC = true
}
wantCRC = checksums.GetCrc32C()
checkCRC = true
}

r = &Reader{
Expand All @@ -1115,6 +1117,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
LastModified: obj.GetUpdateTime().AsTime(),
Metageneration: obj.GetMetageneration(),
Generation: obj.GetGeneration(),
CRC32C: wantCRC,
},
reader: &gRPCReader{
stream: res.stream,
Expand Down
28 changes: 16 additions & 12 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1415,18 +1415,20 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
}
} else {
size = res.ContentLength
// Check the CRC iff all of the following hold:
// - We asked for content (length != 0).
// - We got all the content (status != PartialContent).
// - The server sent a CRC header.
// - The Go http stack did not uncompress the file.
// - We were not served compressed data that was uncompressed on download.
// The problem with the last two cases is that the CRC will not match -- GCS
// computes it on the compressed contents, but we compute it on the
// uncompressed contents.
if params.length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
crc, checkCRC = parseCRC32c(res)
}
}

// Check the CRC iff all of the following hold:
// - We asked for content (length != 0).
// - We got all the content (status != PartialContent).
// - The server sent a CRC header.
// - The Go http stack did not uncompress the file.
// - We were not served compressed data that was uncompressed on download.
// The problem with the last two cases is that the CRC will not match -- GCS
// computes it on the compressed contents, but we compute it on the
// uncompressed contents.
crc, checkCRC = parseCRC32c(res)
if params.length == 0 || res.StatusCode == http.StatusPartialContent || res.Uncompressed || uncompressedByServer(res) {
checkCRC = false
}

remain := res.ContentLength
Expand Down Expand Up @@ -1463,6 +1465,8 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
StartOffset: startOffset,
Generation: params.gen,
Metageneration: metaGen,
CRC32C: crc,
Decompressed: res.Uncompressed || uncompressedByServer(res),
}
return &Reader{
Attrs: attrs,
Expand Down
Loading

0 comments on commit cfa13be

Please sign in to comment.