Skip to content

Commit

Permalink
fix(pubsub): allow updating topic schema fields individually (#7362)
Browse files Browse the repository at this point in the history
Currently, the entirety of `SchemaSettings` must be updated when calling `UpdateTopic`. Instead, allow updating these fields individually (e.g. first/last revision) without needing to specify schema ID or encoding.
  • Loading branch information
hongalex authored Feb 27, 2023
1 parent 6970514 commit f09e059
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 4 deletions.
23 changes: 22 additions & 1 deletion pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,28 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p
}
t.proto.MessageRetentionDuration = req.Topic.MessageRetentionDuration
case "schema_settings":
t.proto.SchemaSettings = req.Topic.SchemaSettings
// Clear this field.
t.proto.SchemaSettings = &pb.SchemaSettings{}
case "schema_settings.schema":
if t.proto.SchemaSettings == nil {
t.proto.SchemaSettings = &pb.SchemaSettings{}
}
t.proto.SchemaSettings.Schema = req.Topic.SchemaSettings.Schema
case "schema_settings.encoding":
if t.proto.SchemaSettings == nil {
t.proto.SchemaSettings = &pb.SchemaSettings{}
}
t.proto.SchemaSettings.Encoding = req.Topic.SchemaSettings.Encoding
case "schema_settings.first_revision_id":
if t.proto.SchemaSettings == nil {
t.proto.SchemaSettings = &pb.SchemaSettings{}
}
t.proto.SchemaSettings.FirstRevisionId = req.Topic.SchemaSettings.FirstRevisionId
case "schema_settings.last_revision_id":
if t.proto.SchemaSettings == nil {
t.proto.SchemaSettings = &pb.SchemaSettings{}
}
t.proto.SchemaSettings.LastRevisionId = req.Topic.SchemaSettings.LastRevisionId
default:
return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
}
Expand Down
24 changes: 23 additions & 1 deletion pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ type TopicConfigToUpdate struct {
RetentionDuration optional.Duration

// Schema defines the schema settings upon topic creation.
//
// Use the zero value &SchemaSettings{} to remove the schema from the topic.
SchemaSettings *SchemaSettings
}

Expand Down Expand Up @@ -407,7 +409,27 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
}
if cfg.SchemaSettings != nil {
pt.SchemaSettings = schemaSettingsToProto(cfg.SchemaSettings)
paths = append(paths, "schema_settings")
clearSchema := true
if pt.SchemaSettings.Schema != "" {
paths = append(paths, "schema_settings.schema")
clearSchema = false
}
if pt.SchemaSettings.Encoding != pb.Encoding_ENCODING_UNSPECIFIED {
paths = append(paths, "schema_settings.encoding")
clearSchema = false
}
if pt.SchemaSettings.FirstRevisionId != "" {
paths = append(paths, "schema_settings.first_revision_id")
clearSchema = false
}
if pt.SchemaSettings.LastRevisionId != "" {
paths = append(paths, "schema_settings.last_revision_id")
clearSchema = false
}
// Clear the schema if none of it's value changes.
if clearSchema {
paths = append(paths, "schema_settings")
}
}
return &pb.UpdateTopicRequest{
Topic: pt,
Expand Down
14 changes: 12 additions & 2 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,18 @@ func TestUpdateTopic_SchemaSettings(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(config2.SchemaSettings, settings) {
t.Errorf("\ngot %+v\nwant %+v", config2, settings)
if !testutil.Equal(config2.SchemaSettings, settings, opt) {
t.Errorf("\ngot %+v\nwant %+v", config2.SchemaSettings, settings)
}

// Clear schema settings.
settings = &SchemaSettings{}
config3, err := topic.Update(ctx, TopicConfigToUpdate{SchemaSettings: settings})
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(config3.SchemaSettings, settings, opt) {
t.Errorf("\ngot %+v\nwant %+v", config3.SchemaSettings, settings)
}
}

Expand Down

0 comments on commit f09e059

Please sign in to comment.