Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve bigtableio.WriteBatch performance by approx. 20% due to non blocking batch #24364

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 48 additions & 35 deletions sdks/go/pkg/beam/io/bigtableio/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (

func init() {
register.DoFn3x1[context.Context, int, func(*Mutation) bool, error](&writeFn{})
register.DoFn3x1[context.Context, int, func(*Mutation) bool, error](&writeBatchFn{})
register.Iter1[*Mutation]()
register.DoFn2x1[context.Context, Mutation, error](&writeBatchFn{})
}

// Mutation represents a necessary serializable wrapper analogue
Expand Down Expand Up @@ -65,7 +65,7 @@ func (m *Mutation) Set(family, column string, ts bigtable.Timestamp, value []byt
m.Ops = append(m.Ops, Operation{Family: family, Column: column, Ts: ts, Value: value})
}

// WithGroupKey sets a custom group key to be utilised by beam.GroupByKey.
// WithGroupKey sets a custom group key to be utilised by beam.GroupByKey within bigtableio.Write().
func (m *Mutation) WithGroupKey(key string) *Mutation {
m.GroupKey = key
return m
Expand All @@ -87,11 +87,12 @@ func Write(s beam.Scope, project, instanceID, table string, col beam.PCollection
}

// WriteBatch writes the elements of the given PCollection<bigtableio.Mutation>
// to bigtable using bigtable.ApplyBulk().
// to bigtable using bigtable.ApplyBulk(), to be used for batch processing.
// For the underlying bigtable.ApplyBulk function to work properly
// the maximum number of operations per bigtableio.Mutation of the input
// PCollection must not be greater than 100,000. For more information
// see https://cloud.google.com/bigtable/docs/writes#batch for more.
// Note: WriteBatch only runs on a single worker machine.
func WriteBatch(s beam.Scope, project, instanceID, table string, col beam.PCollection) {
t := col.Type().Type()
err := mustBeBigtableioMutation(t)
Expand All @@ -101,9 +102,7 @@ func WriteBatch(s beam.Scope, project, instanceID, table string, col beam.PColle

s = s.Scope("bigtable.WriteBatch")

pre := beam.ParDo(s, addGroupKeyFn, col)
post := beam.GroupByKey(s, pre)
beam.ParDo0(s, &writeBatchFn{Project: project, InstanceID: instanceID, TableName: table, Type: beam.EncodedType{T: t}}, post)
beam.ParDo0(s, &writeBatchFn{Project: project, InstanceID: instanceID, TableName: table, Type: beam.EncodedType{T: t}}, col)
}

func addGroupKeyFn(mutation Mutation) (int, Mutation) {
Expand Down Expand Up @@ -190,6 +189,8 @@ type writeBatchFn struct {
TableName string `json:"tableName"`
// Table is a bigtable.Table instance with an eventual open connection
table *bigtable.Table `json:"-"`
// MutationBatcher is responsible for collecting and writing mutations as batches to bigtable
MutationBatcher MutationBatcher `json:"mutationBatcher"`
// Type is the encoded schema type.
Type beam.EncodedType `json:"type"`
}
Expand All @@ -205,55 +206,67 @@ func (f *writeBatchFn) Setup(ctx context.Context) error {
return nil
}

func (f *writeBatchFn) Teardown() error {
func (f *writeBatchFn) Teardown(ctx context.Context) error {
if err := f.MutationBatcher.flush(ctx, *f.table); err != nil {
return fmt.Errorf("could not flush mutationBatcher on teardown: %v", err)
}

if err := f.client.Close(); err != nil {
return fmt.Errorf("could not close data operations client: %v", err)
}

return nil
}

func (f *writeBatchFn) ProcessElement(ctx context.Context, key int, values func(*Mutation) bool) error {
func (f *writeBatchFn) ProcessElement(ctx context.Context, mutation Mutation) error {
return f.MutationBatcher.mutate(ctx, mutation, *f.table)
}

var rowKeysInBatch []string
var mutationsInBatch []*bigtable.Mutation
type MutationBatcher struct {
OpsInBatch int // opsInBatch is used to make sure that one batch does not include more than 100000 operations/mutations
RowKeysInBatch []string
MutationsInBatch []Mutation
}

// opsAddedToBatch is used to make sure that one batch does not include more than 100000 operations/mutations
opsAddedToBatch := 0
func (b *MutationBatcher) mutate(ctx context.Context, mutation Mutation, table bigtable.Table) error {

var mutation Mutation
for values(&mutation) {
err := validateMutation(mutation)
if err != nil {
return fmt.Errorf("invalid bigtableio.Mutation: %s", err)
}

err := validateMutation(mutation)
opsInMutation := len(mutation.Ops)

if (b.OpsInBatch + opsInMutation) > 100000 {
err = b.flush(ctx, table)
if err != nil {
return fmt.Errorf("invalid bigtableio.Mutation: %s", err)
return fmt.Errorf("failed to flush in mutationBatcher.mutate(): %s", err)
}
}

opsInMutation := len(mutation.Ops)

if (opsAddedToBatch + opsInMutation) > 100000 {
err := tryApplyBulk(f.table.ApplyBulk(ctx, rowKeysInBatch, mutationsInBatch))
if err != nil {
return err
}
b.OpsInBatch += len(mutation.Ops)
b.RowKeysInBatch = append(b.RowKeysInBatch, mutation.RowKey)
b.MutationsInBatch = append(b.MutationsInBatch, mutation)

rowKeysInBatch = nil
mutationsInBatch = nil
opsAddedToBatch = 0
}
return nil
}

rowKeysInBatch = append(rowKeysInBatch, mutation.RowKey)
mutationsInBatch = append(mutationsInBatch, getBigtableMutation(mutation))
opsAddedToBatch += len(mutation.Ops)
func (b *MutationBatcher) flush(ctx context.Context, table bigtable.Table) error {

var bigtableMutations []*bigtable.Mutation
for _, mutation := range b.MutationsInBatch {
bigtableMutations = append(bigtableMutations, getBigtableMutation(mutation))
}

if len(rowKeysInBatch) != 0 && len(mutationsInBatch) != 0 {
err := tryApplyBulk(f.table.ApplyBulk(ctx, rowKeysInBatch, mutationsInBatch))
if err != nil {
return err
}
err := tryApplyBulk(table.ApplyBulk(ctx, b.RowKeysInBatch, bigtableMutations))
if err != nil {
return err
}

b.OpsInBatch = 0
b.RowKeysInBatch = nil
b.MutationsInBatch = nil

return nil
}

Expand Down
42 changes: 39 additions & 3 deletions sdks/go/pkg/beam/io/bigtableio/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ func TestValidateMutationFailsWhenGreaterThanHundredKOps(t *testing.T) {

// Examples:

func ExampleWriteBatch() {
func ExampleWrite() {
pipeline := beam.NewPipeline()
s := pipeline.Root()

//sample PBCollection<bigtableio.Mutation>
// sample PBCollection<bigtableio.Mutation>
bigtableioMutationCol := beam.CreateList(s, func() []Mutation {
columnFamilyName := "stats_summary"
timestamp := bigtable.Now()
Expand All @@ -172,7 +172,7 @@ func ExampleWriteBatch() {
rowKeyA := deviceA + "#a0b81f74#20190501"

// bigtableio.NewMutation(rowKeyA).WithGroupKey(deviceA)
mutA := NewMutation(rowKeyA).WithGroupKey(deviceA) // this groups bundles by device identifiers
mutA := NewMutation(rowKeyA).WithGroupKey(deviceA)
mutA.Set(columnFamilyName, "connected_wifi", timestamp, []byte("1"))
mutA.Set(columnFamilyName, "os_build", timestamp, []byte("12155.0.0-rc1"))

Expand All @@ -190,6 +190,42 @@ func ExampleWriteBatch() {
return muts
}())

// bigtableio.Write(...)
Write(s, "project", "instanceId", "tableName", bigtableioMutationCol)
}

func ExampleWriteBatch() {
pipeline := beam.NewPipeline()
s := pipeline.Root()

// sample PBCollection<bigtableio.Mutation>
bigtableioMutationCol := beam.CreateList(s, func() []Mutation {
columnFamilyName := "stats_summary"
timestamp := bigtable.Now()

// var muts []bigtableio.Mutation
var muts []Mutation

rowKeyA := "tablet#a0b81f74#20190501"

// bigtableio.NewMutation(rowKeyA)
mutA := NewMutation(rowKeyA)
mutA.Set(columnFamilyName, "connected_wifi", timestamp, []byte("1"))
mutA.Set(columnFamilyName, "os_build", timestamp, []byte("12155.0.0-rc1"))

muts = append(muts, *mutA)

rowKeyB := "phone#a0b81f74#20190502"

mutB := NewMutation(rowKeyB)
mutB.Set(columnFamilyName, "connected_wifi", timestamp, []byte("1"))
mutB.Set(columnFamilyName, "os_build", timestamp, []byte("12145.0.0-rc6"))

muts = append(muts, *mutB)

return muts
}())

// bigtableio.WriteBatch(...)
WriteBatch(s, "project", "instanceId", "tableName", bigtableioMutationCol)
}