From ec0cbb20ba42b7ef03688a06dc0a380e9b27e394 Mon Sep 17 00:00:00 2001 From: ron-gal <125445217+ron-gal@users.noreply.github.com> Date: Fri, 23 Aug 2024 18:58:28 -0400 Subject: [PATCH] feat(bigtable): Add UpdateFamily to allow updating a family type (#10759) * feat(bigtable): Add UpdateFamily to allow updating a family type * fix API diff * fix bug * fix field mask * add integration test * fix vet * fix vet * fix vet --- bigtable/admin.go | 71 +++++++++++++++++++++++++----------- bigtable/admin_test.go | 2 +- bigtable/bttest/inmem.go | 9 ++++- bigtable/integration_test.go | 14 ++++++- 4 files changed, 71 insertions(+), 25 deletions(-) diff --git a/bigtable/admin.go b/bigtable/admin.go index 8dd9f929056f..69f57f0efa7b 100644 --- a/bigtable/admin.go +++ b/bigtable/admin.go @@ -667,41 +667,82 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, return ti, nil } -type gcPolicySettings struct { +type updateFamilyOption struct { ignoreWarnings bool } -// GCPolicyOption is the interface to change GC policy settings +// GCPolicyOption is deprecated, kept for backwards compatibility, use UpdateFamilyOption in new code type GCPolicyOption interface { - apply(s *gcPolicySettings) + apply(s *updateFamilyOption) } +// UpdateFamilyOption is the interface to update family settings +type UpdateFamilyOption GCPolicyOption + type ignoreWarnings bool -func (w ignoreWarnings) apply(s *gcPolicySettings) { +func (w ignoreWarnings) apply(s *updateFamilyOption) { s.ignoreWarnings = bool(w) } -// IgnoreWarnings returns a gcPolicyOption that ignores safety checks when modifying the column families +// IgnoreWarnings returns a updateFamilyOption that ignores safety checks when modifying the column families func IgnoreWarnings() GCPolicyOption { return ignoreWarnings(true) } -func (ac *AdminClient) setGCPolicy(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error { +// SetGCPolicy specifies which cells in a column family should be garbage collected. +// GC executes opportunistically in the background; table reads may return data +// matching the GC policy. +func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error { + return ac.UpdateFamily(ctx, table, family, Family{GCPolicy: policy}) +} + +// SetGCPolicyWithOptions is similar to SetGCPolicy but allows passing options +func (ac *AdminClient) SetGCPolicyWithOptions(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error { + familyOpts := []UpdateFamilyOption{} + for _, opt := range opts { + if opt != nil { + familyOpts = append(familyOpts, opt.(UpdateFamilyOption)) + } + } + return ac.UpdateFamily(ctx, table, family, Family{GCPolicy: policy}, familyOpts...) +} + +// UpdateFamily updates column families' garbage colleciton policies and value type. +func (ac *AdminClient) UpdateFamily(ctx context.Context, table, familyName string, family Family, opts ...UpdateFamilyOption) error { ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() - s := gcPolicySettings{} + s := updateFamilyOption{} for _, opt := range opts { if opt != nil { opt.apply(&s) } } + + cf := &btapb.ColumnFamily{} + mask := &field_mask.FieldMask{} + if family.GCPolicy != nil { + cf.GcRule = family.GCPolicy.proto() + mask.Paths = append(mask.Paths, "gc_rule") + + } + if family.ValueType != nil { + cf.ValueType = family.ValueType.proto() + mask.Paths = append(mask.Paths, "value_type") + } + + // No update + if len(mask.Paths) == 0 { + return nil + } + req := &btapb.ModifyColumnFamiliesRequest{ Name: prefix + "/tables/" + table, Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ - Id: family, - Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}}, + Id: familyName, + Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: cf}, + UpdateMask: mask, }}, IgnoreWarnings: s.ignoreWarnings, } @@ -709,18 +750,6 @@ func (ac *AdminClient) setGCPolicy(ctx context.Context, table, family string, po return err } -// SetGCPolicy specifies which cells in a column family should be garbage collected. -// GC executes opportunistically in the background; table reads may return data -// matching the GC policy. -func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error { - return ac.SetGCPolicyWithOptions(ctx, table, family, policy) -} - -// SetGCPolicyWithOptions is similar to SetGCPolicy but allows passing options -func (ac *AdminClient) SetGCPolicyWithOptions(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error { - return ac.setGCPolicy(ctx, table, family, policy, opts...) -} - // DropRowRange permanently deletes a row range from the specified table. func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error { ctx = mergeOutgoingMetadata(ctx, ac.md) diff --git a/bigtable/admin_test.go b/bigtable/admin_test.go index 8cbaaeae1f3d..445aa447fbed 100644 --- a/bigtable/admin_test.go +++ b/bigtable/admin_test.go @@ -368,7 +368,7 @@ func TestTableAdmin_UpdateTableDisableChangeStream(t *testing.T) { func TestTableAdmin_SetGcPolicy(t *testing.T) { for _, test := range []struct { desc string - opts GCPolicyOption + opts UpdateFamilyOption want bool }{ { diff --git a/bigtable/bttest/inmem.go b/bigtable/bttest/inmem.go index c53bb2b4dc00..b2da28599ebf 100644 --- a/bigtable/bttest/inmem.go +++ b/bigtable/bttest/inmem.go @@ -333,10 +333,15 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu return true }) } else if modify := mod.GetUpdate(); modify != nil { - if _, ok := tbl.families[mod.Id]; !ok { + newcf := newColumnFamily(req.Name+"/columnFamilies/"+mod.Id, 0, modify) + cf, ok := tbl.families[mod.Id] + if !ok { return nil, fmt.Errorf("no such family %q", mod.Id) } - newcf := newColumnFamily(req.Name+"/columnFamilies/"+mod.Id, 0, modify) + if cf.valueType != newcf.valueType { + return nil, status.Errorf(codes.InvalidArgument, "Immutable fields 'value_type' cannot be updated") + } + // assume that we ALWAYS want to replace by the new setting // we may need partial update through tbl.families[mod.Id] = newcf diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 240e5bc54222..2e5ead76904a 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -283,7 +283,7 @@ func TestIntegration_ReadRowList(t *testing.T) { func TestIntegration_Aggregates(t *testing.T) { ctx := context.Background() - _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) + _, _, ac, table, tableName, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -346,6 +346,18 @@ func TestIntegration_Aggregates(t *testing.T) { if !testutil.Equal(row, wantRow) { t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) } + + err = ac.UpdateFamily(ctx, tableName, family, Family{ValueType: AggregateType{ + Input: Int64Type{}, + Aggregator: MinAggregator{}, + }}) + if err == nil { + t.Fatalf("Expected UpdateFamily to fail, but it didn't") + } + wantError := "Immutable fields 'value_type' cannot be updated" + if !strings.Contains(err.Error(), wantError) { + t.Errorf("Wrong error. Expected to containt %q but was %v", wantError, err) + } } func TestIntegration_ReadRowListReverse(t *testing.T) {