diff --git a/sdks/go/pkg/beam/io/bigtableio/bigtable.go b/sdks/go/pkg/beam/io/bigtableio/bigtable.go
index df7a6d98cd41..a309576e6df6 100644
--- a/sdks/go/pkg/beam/io/bigtableio/bigtable.go
+++ b/sdks/go/pkg/beam/io/bigtableio/bigtable.go
@@ -30,8 +30,8 @@ import (
 
 func init() {
 	register.DoFn3x1[context.Context, int, func(*Mutation) bool, error](&writeFn{})
-	register.DoFn3x1[context.Context, int, func(*Mutation) bool, error](&writeBatchFn{})
 	register.Iter1[*Mutation]()
+	register.DoFn2x1[context.Context, Mutation, error](&writeBatchFn{})
 }
 
 // Mutation represents a necessary serializable wrapper analogue
@@ -65,7 +65,7 @@ func (m *Mutation) Set(family, column string, ts bigtable.Timestamp, value []byt
 	m.Ops = append(m.Ops, Operation{Family: family, Column: column, Ts: ts, Value: value})
 }
 
-// WithGroupKey sets a custom group key to be utilised by beam.GroupByKey.
+// WithGroupKey sets a custom group key to be utilised by beam.GroupByKey within bigtableio.Write().
 func (m *Mutation) WithGroupKey(key string) *Mutation {
 	m.GroupKey = key
 	return m
@@ -87,11 +87,12 @@ func Write(s beam.Scope, project, instanceID, table string, col beam.PCollection
 }
 
 // WriteBatch writes the elements of the given PCollection
-// to bigtable using bigtable.ApplyBulk().
+// to bigtable using bigtable.ApplyBulk(), to be used for batch processing.
 // For the underlying bigtable.ApplyBulk function to work properly
 // the maximum number of operations per bigtableio.Mutation of the input
 // PCollection must not be greater than 100,000. For more information
 // see https://cloud.google.com/bigtable/docs/writes#batch for more.
+// Note: WriteBatch only runs on a single worker machine.
 func WriteBatch(s beam.Scope, project, instanceID, table string, col beam.PCollection) {
 	t := col.Type().Type()
 	err := mustBeBigtableioMutation(t)
@@ -101,9 +102,7 @@ func WriteBatch(s beam.Scope, project, instanceID, table string, col beam.PColle
 
 	s = s.Scope("bigtable.WriteBatch")
 
-	pre := beam.ParDo(s, addGroupKeyFn, col)
-	post := beam.GroupByKey(s, pre)
-	beam.ParDo0(s, &writeBatchFn{Project: project, InstanceID: instanceID, TableName: table, Type: beam.EncodedType{T: t}}, post)
+	beam.ParDo0(s, &writeBatchFn{Project: project, InstanceID: instanceID, TableName: table, Type: beam.EncodedType{T: t}}, col)
 }
 
 func addGroupKeyFn(mutation Mutation) (int, Mutation) {
@@ -190,6 +189,8 @@ type writeBatchFn struct {
 	TableName string `json:"tableName"`
 	// Table is a bigtable.Table instance with an eventual open connection
 	table *bigtable.Table `json:"-"`
+	// MutationBatcher is responsible for collecting and writing mutations as batches to bigtable
+	MutationBatcher MutationBatcher `json:"mutationBatcher"`
 	// Type is the encoded schema type.
 	Type beam.EncodedType `json:"type"`
 }
 
@@ -205,55 +206,67 @@ func (f *writeBatchFn) Setup(ctx context.Context) error {
 	return nil
 }
 
-func (f *writeBatchFn) Teardown() error {
+func (f *writeBatchFn) Teardown(ctx context.Context) error {
+	if err := f.MutationBatcher.flush(ctx, *f.table); err != nil {
+		return fmt.Errorf("could not flush MutationBatcher on teardown: %v", err)
+	}
+
 	if err := f.client.Close(); err != nil {
 		return fmt.Errorf("could not close data operations client: %v", err)
 	}
+
 	return nil
 }
 
-func (f *writeBatchFn) ProcessElement(ctx context.Context, key int, values func(*Mutation) bool) error {
+func (f *writeBatchFn) ProcessElement(ctx context.Context, mutation Mutation) error {
+	return f.MutationBatcher.mutate(ctx, mutation, *f.table)
+}
 
-	var rowKeysInBatch []string
-	var mutationsInBatch []*bigtable.Mutation
+type MutationBatcher struct {
+	OpsInBatch       int // OpsInBatch is used to make sure that one batch does not include more than 100,000 operations
+	RowKeysInBatch   []string
+	MutationsInBatch []Mutation
+}
 
-	// opsAddedToBatch is used to make sure that one batch does not include more than 100000 operations/mutations
-	opsAddedToBatch := 0
+func (b *MutationBatcher) mutate(ctx context.Context, mutation Mutation, table bigtable.Table) error {
 
-	var mutation Mutation
-	for values(&mutation) {
+	err := validateMutation(mutation)
+	if err != nil {
+		return fmt.Errorf("invalid bigtableio.Mutation: %s", err)
+	}
 
-		err := validateMutation(mutation)
+	opsInMutation := len(mutation.Ops)
+
+	if (b.OpsInBatch + opsInMutation) > 100000 {
+		err = b.flush(ctx, table)
 		if err != nil {
-			return fmt.Errorf("invalid bigtableio.Mutation: %s", err)
+			return fmt.Errorf("failed to flush in MutationBatcher.mutate(): %s", err)
 		}
+	}
 
-		opsInMutation := len(mutation.Ops)
-
-		if (opsAddedToBatch + opsInMutation) > 100000 {
-			err := tryApplyBulk(f.table.ApplyBulk(ctx, rowKeysInBatch, mutationsInBatch))
-			if err != nil {
-				return err
-			}
+	b.OpsInBatch += opsInMutation
+	b.RowKeysInBatch = append(b.RowKeysInBatch, mutation.RowKey)
+	b.MutationsInBatch = append(b.MutationsInBatch, mutation)
 
-			rowKeysInBatch = nil
-			mutationsInBatch = nil
-			opsAddedToBatch = 0
-		}
+	return nil
+}
 
-		rowKeysInBatch = append(rowKeysInBatch, mutation.RowKey)
-		mutationsInBatch = append(mutationsInBatch, getBigtableMutation(mutation))
-		opsAddedToBatch += len(mutation.Ops)
+func (b *MutationBatcher) flush(ctx context.Context, table bigtable.Table) error {
+	var bigtableMutations []*bigtable.Mutation
+	for _, mutation := range b.MutationsInBatch {
+		bigtableMutations = append(bigtableMutations, getBigtableMutation(mutation))
 	}
 
-	if len(rowKeysInBatch) != 0 && len(mutationsInBatch) != 0 {
-		err := tryApplyBulk(f.table.ApplyBulk(ctx, rowKeysInBatch, mutationsInBatch))
-		if err != nil {
-			return err
-		}
+	err := tryApplyBulk(table.ApplyBulk(ctx, b.RowKeysInBatch, bigtableMutations))
+	if err != nil {
+		return err
 	}
 
+	b.OpsInBatch = 0
+	b.RowKeysInBatch = nil
+	b.MutationsInBatch = nil
+
 	return nil
 }
diff --git a/sdks/go/pkg/beam/io/bigtableio/bigtable_test.go b/sdks/go/pkg/beam/io/bigtableio/bigtable_test.go
index 1294a28bf7b6..73cb37368da8 100644
--- a/sdks/go/pkg/beam/io/bigtableio/bigtable_test.go
+++ b/sdks/go/pkg/beam/io/bigtableio/bigtable_test.go
@@ -156,11 +156,11 @@ func TestValidateMutationFailsWhenGreaterThanHundredKOps(t *testing.T) {
 
 // Examples:
 
-func ExampleWriteBatch() {
+func ExampleWrite() {
 	pipeline := beam.NewPipeline()
 	s := pipeline.Root()
 
-	//sample PBCollection
+	// sample PCollection
 	bigtableioMutationCol := beam.CreateList(s, func() []Mutation {
 		columnFamilyName := "stats_summary"
 		timestamp := bigtable.Now()
@@ -172,7 +172,7 @@ func ExampleWriteBatch() {
 		rowKeyA := deviceA + "#a0b81f74#20190501"
 
 		// bigtableio.NewMutation(rowKeyA).WithGroupKey(deviceA)
-		mutA := NewMutation(rowKeyA).WithGroupKey(deviceA) // this groups bundles by device identifiers
+		mutA := NewMutation(rowKeyA).WithGroupKey(deviceA)
 		mutA.Set(columnFamilyName, "connected_wifi", timestamp, []byte("1"))
 		mutA.Set(columnFamilyName, "os_build", timestamp, []byte("12155.0.0-rc1"))
 
@@ -190,6 +190,42 @@ func ExampleWriteBatch() {
 		return muts
 	}())
 
+	// bigtableio.Write(...)
+	Write(s, "project", "instanceId", "tableName", bigtableioMutationCol)
+}
+
+func ExampleWriteBatch() {
+	pipeline := beam.NewPipeline()
+	s := pipeline.Root()
+
+	// sample PCollection
+	bigtableioMutationCol := beam.CreateList(s, func() []Mutation {
+		columnFamilyName := "stats_summary"
+		timestamp := bigtable.Now()
+
+		// var muts []bigtableio.Mutation
+		var muts []Mutation
+
+		rowKeyA := "tablet#a0b81f74#20190501"
+
+		// bigtableio.NewMutation(rowKeyA)
+		mutA := NewMutation(rowKeyA)
+		mutA.Set(columnFamilyName, "connected_wifi", timestamp, []byte("1"))
+		mutA.Set(columnFamilyName, "os_build", timestamp, []byte("12155.0.0-rc1"))
+
+		muts = append(muts, *mutA)
+
+		rowKeyB := "phone#a0b81f74#20190502"
+
+		mutB := NewMutation(rowKeyB)
+		mutB.Set(columnFamilyName, "connected_wifi", timestamp, []byte("1"))
+		mutB.Set(columnFamilyName, "os_build", timestamp, []byte("12145.0.0-rc6"))
+
+		muts = append(muts, *mutB)
+
+		return muts
+	}())
+
 	// bigtableio.WriteBatch(...)
 	WriteBatch(s, "project", "instanceId", "tableName", bigtableioMutationCol)
 }
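
For intuition on the flush threshold introduced above: MutationBatcher.mutate flushes the pending batch before appending whenever the incoming mutation would push the accumulated operation count past the 100,000-operation ApplyBulk cap, and writeBatchFn.Teardown flushes whatever remains. The following standalone sketch reproduces only that threshold arithmetic; the constant mirrors the diff, the 1,000 ops-per-mutation figure is an assumed example value, and no Bigtable client is involved:

package main

import "fmt"

// batchOpsLimit mirrors the 100,000-operation cap that
// MutationBatcher.mutate enforces before calling ApplyBulk.
const batchOpsLimit = 100000

func main() {
	opsPerMutation := 1000 // assume each bigtableio.Mutation carries 1,000 Set operations
	opsInBatch := 0
	flushes := 0
	for i := 0; i < 250; i++ { // 250 incoming mutations
		if opsInBatch+opsPerMutation > batchOpsLimit {
			flushes++ // ApplyBulk would run here with the batched rows
			opsInBatch = 0
		}
		opsInBatch += opsPerMutation
	}
	// 100 mutations of 1,000 ops fill a batch exactly, so 250 mutations
	// produce 2 mid-stream flushes; the last 50 are left for Teardown.
	fmt.Printf("flushes=%d, left for Teardown=%d mutations\n", flushes, opsInBatch/opsPerMutation)
}

With these assumed numbers a batch fills after every 100 mutations, so 250 inputs yield two ApplyBulk calls mid-stream and 50 mutations carried until the Teardown flush (the program prints "flushes=2, left for Teardown=50 mutations").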