Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): support default value controls (#…
Browse files Browse the repository at this point in the history
…8686)

* feat(bigquery/storage/managedwriter): support default value controls

In terms of public surface, this PR adds new options to control how
missing values are interpreted when writing.

For ManagedStream instantiation, the options are:
* WithDefaultMissingValueInterpretation (blanket setting for all columns)
* WithMissingValueInterpretations (per-column settings)

To support updates, these are added as AppendOptions:
* UpdateDefaultMissingValueInterpretation
* UpdateMissingValueInterpretations

Implementation-wise, this PR rips out the previous schema-specific
versioner and expands the concept to a versioned AppendRowsRequest
template.  This more general mechanism allows us to version all
settings that manifest as request fields in the AppendRowsRequest.
  • Loading branch information
shollyman authored Oct 24, 2023
1 parent 3e5ba24 commit dfa8e22
Show file tree
Hide file tree
Showing 14 changed files with 792 additions and 207 deletions.
33 changes: 11 additions & 22 deletions bigquery/storage/managedwriter/appendresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ type pendingWrite struct {
// likely outcome when processing requests and it allows us to be efficient on send.
// We retain the additional information to build the complete request in the related fields.
req *storagepb.AppendRowsRequest
descVersion *descriptorVersion // schema at time of creation
reqTmpl *versionedTemplate // request template at time of creation
traceID string
writeStreamID string

Expand All @@ -188,21 +188,21 @@ type pendingWrite struct {
// to the pending results for later consumption. The provided context is
// embedded in the pending write, as the write may be retried and we want
// to respect the original context for expiry/cancellation etc.
func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, curDescVersion *descriptorVersion, writeStreamID, traceID string) *pendingWrite {
func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, reqTmpl *versionedTemplate, writeStreamID, traceID string) *pendingWrite {
pw := &pendingWrite{
writer: src,
result: newAppendResult(),
reqCtx: ctx,

req: req,
descVersion: curDescVersion,
req: req, // minimal req, typically just row data
reqTmpl: reqTmpl, // remainder of templated request
writeStreamID: writeStreamID,
traceID: traceID,
}
// Compute the approx size for flow control purposes.
pw.reqSize = proto.Size(pw.req) + len(writeStreamID) + len(traceID)
if pw.descVersion != nil {
pw.reqSize += proto.Size(pw.descVersion.descriptorProto)
if pw.reqTmpl != nil {
pw.reqSize += proto.Size(pw.reqTmpl.tmpl)
}
return pw
}
Expand All @@ -221,33 +221,22 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error)
close(pw.result.ready)
// Cleanup references remaining on the write explicitly.
pw.req = nil
pw.descVersion = nil
pw.reqTmpl = nil
pw.writer = nil
pw.reqCtx = nil
}

func (pw *pendingWrite) constructFullRequest(addTrace bool) *storagepb.AppendRowsRequest {
req := &storagepb.AppendRowsRequest{}
if pw.reqTmpl != nil {
req = proto.Clone(pw.reqTmpl.tmpl).(*storagepb.AppendRowsRequest)
}
if pw.req != nil {
req = proto.Clone(pw.req).(*storagepb.AppendRowsRequest)
proto.Merge(req, pw.req)
}
if addTrace {
req.TraceId = buildTraceID(&streamSettings{TraceID: pw.traceID})
}
req.WriteStream = pw.writeStreamID
if pw.descVersion != nil {
ps := &storagepb.ProtoSchema{
ProtoDescriptor: pw.descVersion.descriptorProto,
}
if pr := req.GetProtoRows(); pr != nil {
pr.WriterSchema = ps
} else {
req.Rows = &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
WriterSchema: ps,
},
}
}
}
return req
}
15 changes: 8 additions & 7 deletions bigquery/storage/managedwriter/appendresult_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func TestPendingWrite(t *testing.T) {
func TestPendingWrite_ConstructFullRequest(t *testing.T) {

testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")}
testDV := newDescriptorVersion(testDP)
testTmpl := newVersionedTemplate().revise(reviseProtoSchema(testDP))

testEmptyTraceID := buildTraceID(&streamSettings{})

for _, tc := range []struct {
Expand All @@ -144,7 +145,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
{
desc: "nil request",
pw: &pendingWrite{
descVersion: testDV,
reqTmpl: testTmpl,
},
want: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
Expand All @@ -159,8 +160,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
{
desc: "empty req w/trace",
pw: &pendingWrite{
req: &storagepb.AppendRowsRequest{},
descVersion: testDV,
req: &storagepb.AppendRowsRequest{},
reqTmpl: testTmpl,
},
addTrace: true,
want: &storagepb.AppendRowsRequest{
Expand All @@ -177,8 +178,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
{
desc: "basic req",
pw: &pendingWrite{
req: &storagepb.AppendRowsRequest{},
descVersion: testDV,
req: &storagepb.AppendRowsRequest{},
reqTmpl: testTmpl,
},
want: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
Expand All @@ -194,7 +195,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
desc: "everything w/trace",
pw: &pendingWrite{
req: &storagepb.AppendRowsRequest{},
descVersion: testDV,
reqTmpl: testTmpl,
traceID: "foo",
writeStreamID: "streamid",
},
Expand Down
1 change: 1 addition & 0 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
id: newUUID(writerIDPrefix),
c: c,
streamSettings: defaultStreamSettings(),
curTemplate: newVersionedTemplate(),
}
// apply writer options.
for _, opt := range opts {
Expand Down
18 changes: 16 additions & 2 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,22 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
// Additionally, we check multiplex status as schema changes for explicit streams
// require reconnect, whereas multiplex does not.
forceReconnect := false
if pw.writer != nil && pw.descVersion != nil && pw.descVersion.isNewer(pw.writer.curDescVersion) {
pw.writer.curDescVersion = pw.descVersion
promoted := false
if pw.writer != nil && pw.reqTmpl != nil {
if !pw.reqTmpl.Compatible(pw.writer.curTemplate) {
if pw.writer.curTemplate == nil {
// promote because there's no current template
pw.writer.curTemplate = pw.reqTmpl
promoted = true
} else {
if pw.writer.curTemplate.versionTime.Before(pw.reqTmpl.versionTime) {
pw.writer.curTemplate = pw.reqTmpl
promoted = true
}
}
}
}
if promoted {
if co.optimizer == nil {
forceReconnect = true
} else {
Expand Down
95 changes: 94 additions & 1 deletion bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
})

t.Run("DefaultValueHandling", func(t *testing.T) {
testDefaultValueHandling(ctx, t, mwClient, bqClient, dataset)
})
})
}

Expand Down Expand Up @@ -1262,6 +1264,97 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
)
}

func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.DefaultValueSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

m := &testdata.DefaultValuesPartialSchema{
// We only populate the id, as remaining fields are used to test default values.
Id: proto.String("someval"),
}
var data []byte
var err error
if data, err = proto.Marshal(m); err != nil {
t.Fatalf("failed to marshal test row data")
}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// setup a new stream.
opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
opts = append(opts, WithSchemaDescriptor(descriptorProto))
ms, err := mwClient.NewManagedStream(ctx, opts...)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

var result *AppendResult

// Send one row, verify default values were set as expected.

result, err = ms.AppendRows(ctx, [][]byte{data})
if err != nil {
t.Errorf("append failed: %v", err)
}
// Wait for the result to indicate ready, then validate.
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error on append: %v", err)
}

validateTableConstraints(ctx, t, bqClient, testTable, "after first row",
withExactRowCount(1),
withNonNullCount("id", 1),
withNullCount("strcol_withdef", 1),
withNullCount("intcol_withdef", 1),
withNullCount("otherstr_withdef", 0)) // not part of partial schema

// Change default MVI to use nulls.
// We expect the fields in the partial schema to leverage nulls rather than default values.
// The fields outside the partial schema continue to obey default values.
result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE))
if err != nil {
t.Errorf("append failed: %v", err)
}
// Wait for the result to indicate ready, then validate.
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error on append: %v", err)
}

validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi is DEFAULT_VALUE)",
withExactRowCount(2),
withNullCount("strcol_withdef", 1), // doesn't increment, as it gets default value
withNullCount("intcol_withdef", 1)) // doesn't increment, as it gets default value

// Change per-column MVI to use default value
result, err = ms.AppendRows(ctx, [][]byte{data},
UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
"strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE,
}))
if err != nil {
t.Errorf("append failed: %v", err)
}
// Wait for the result to indicate ready, then validate.
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error on append: %v", err)
}

validateTableConstraints(ctx, t, bqClient, testTable, "after third row (explicit column mvi)",
withExactRowCount(3),
withNullCount("strcol_withdef", 2), // increments as it's null for this column
withNullCount("intcol_withdef", 1), // doesn't increment, still default value
withNonNullCount("otherstr_withdef", 3), // not part of descriptor, always gets default value
withNullCount("otherstr", 3), // not part of descriptor, always gets null
withNullCount("strcol", 3), // no default value defined, always gets null
withNullCount("intcol", 3), // no default value defined, always gets null
)
}

func TestIntegration_DetectProjectID(t *testing.T) {
ctx := context.Background()
testCreds := testutil.Credentials(ctx)
Expand Down
17 changes: 12 additions & 5 deletions bigquery/storage/managedwriter/managed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ type ManagedStream struct {

streamSettings *streamSettings
// retains the current descriptor for the stream.
curDescVersion *descriptorVersion
c *Client
retry *statelessRetryer
curTemplate *versionedTemplate
c *Client
retry *statelessRetryer

// writer state
mu sync.Mutex
Expand Down Expand Up @@ -298,13 +298,20 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
return nil, err
}
// Ensure we build the request and pending write with a consistent schema version.
curSchemaVersion := ms.curDescVersion
curTemplate := ms.curTemplate
req := ms.buildRequest(data)
pw := newPendingWrite(ctx, ms, req, curSchemaVersion, ms.streamSettings.streamID, ms.streamSettings.TraceID)
pw := newPendingWrite(ctx, ms, req, curTemplate, ms.streamSettings.streamID, ms.streamSettings.TraceID)
// apply AppendOption opts
for _, opt := range opts {
opt(pw)
}
// Post-request fixup after options are applied.
if pw.reqTmpl != nil {
if pw.reqTmpl.tmpl != nil {
// MVIs must be set on each request, but _default_ MVIs persist across the stream lifetime. Sigh.
pw.req.MissingValueInterpretations = pw.reqTmpl.tmpl.GetMissingValueInterpretations()
}
}

// Call the underlying append. The stream has it's own retained context and will surface expiry on
// it's own, but we also need to respect any deadline for the provided context.
Expand Down
18 changes: 9 additions & 9 deletions bigquery/storage/managedwriter/managed_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestManagedStream_RequestOptimization(t *testing.T) {
}
ms.streamSettings.streamID = "FOO"
ms.streamSettings.TraceID = "TRACE"
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))

fakeData := [][]byte{
[]byte("foo"),
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestManagedStream_FlowControllerFailure(t *testing.T) {
router.conn.fc = newFlowController(1, 0)
router.conn.fc.acquire(ctx, 0)

ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))

fakeData := [][]byte{
[]byte("foo"),
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestManagedStream_AppendWithDeadline(t *testing.T) {
t.Errorf("addWriter: %v", err)
}
conn := router.conn
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))

fakeData := [][]byte{
[]byte("foo"),
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) {
ctx: ctx,
streamSettings: defaultStreamSettings(),
}
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter: %v", err)
}
Expand All @@ -316,7 +316,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) {
cancel()

// First, append with an invalid context.
pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curDescVersion, "", "")
pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curTemplate, "", "")
err := ms.appendWithRetry(pw)
if err != context.Canceled {
t.Errorf("expected cancelled context error, got: %v", err)
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) {
ctx: ctx,
streamSettings: defaultStreamSettings(),
}
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter: %v", err)
}
Expand Down Expand Up @@ -509,7 +509,7 @@ func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) {
retry: newStatelessRetryer(),
}
ms.retry.maxAttempts = 4
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter: %v", err)
}
Expand Down Expand Up @@ -575,7 +575,7 @@ func TestManagedWriter_CancellationDuringRetry(t *testing.T) {
streamSettings: defaultStreamSettings(),
retry: newStatelessRetryer(),
}
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter: %v", err)
}
Expand Down Expand Up @@ -624,7 +624,7 @@ func TestManagedStream_Closure(t *testing.T) {
streamSettings: defaultStreamSettings(),
}
ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter A: %v", err)
}
Expand Down
Loading

0 comments on commit dfa8e22

Please sign in to comment.