From 9baf55219477985b4d46db3db66b9e9a1087bd29 Mon Sep 17 00:00:00 2001 From: zhangyangyu Date: Wed, 4 May 2022 23:33:58 +0800 Subject: [PATCH] update TopicNameStrategy --- cdc/sink/codec/avro.go | 10 ++++--- cdc/sink/codec/avro/schema_registry.go | 37 +++++++++++--------------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index f5e89e650a8..93b119f5617 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -155,15 +155,17 @@ func (a *AvroEventBatchEncoder) avroEncode(ctx context.Context, e *model.RowChan } } + var fqdn string = e.Table.Schema + "." + e.Table.Table + schemaGen := func() (string, error) { - schema, err := rowToAvroSchema(e.Table.Table, cols, colInfos, enableTiDBExtension, a.decimalHandlingMode) + schema, err := rowToAvroSchema(fqdn, cols, colInfos, enableTiDBExtension, a.decimalHandlingMode) if err != nil { return "", errors.Annotate(err, "AvroEventBatchEncoder: generating schema failed") } return schema, nil } - avroCodec, registryID, err := schemaManager.GetCachedOrRegister(ctx, *e.Table, e.TableInfoVersion, schemaGen) + avroCodec, registryID, err := schemaManager.GetCachedOrRegister(ctx, fqdn, e.TableInfoVersion, schemaGen) if err != nil { return nil, errors.Annotate(err, "AvroEventBatchEncoder: get-or-register failed") } @@ -243,10 +245,10 @@ type avroLogicalTypeSchema struct { Scale interface{} `json:"scale,omitempty"` } -func rowToAvroSchema(name string, columnInfo []*model.Column, colInfos []rowcodec.ColInfo, enableTiDBExtension bool, decimalHandlingMode string) (string, error) { +func rowToAvroSchema(fqdn string, columnInfo []*model.Column, colInfos []rowcodec.ColInfo, enableTiDBExtension bool, decimalHandlingMode string) (string, error) { top := avroSchemaTop{ Tp: "record", - Name: name, + Name: fqdn, Fields: nil, } diff --git a/cdc/sink/codec/avro/schema_registry.go b/cdc/sink/codec/avro/schema_registry.go index 1b9fea60f1d..642944326ec 100644 --- a/cdc/sink/codec/avro/schema_registry.go +++ b/cdc/sink/codec/avro/schema_registry.go @@ -29,7 +29,6 @@ import ( "github.com/linkedin/goavro/v2" "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/httputil" "github.com/pingcap/tiflow/pkg/security" @@ -71,7 +70,6 @@ type lookupResponse struct { Schema string `json:"schema"` } -// NewAvroSchemaManager creates a new AvroSchemaManager func NewAvroSchemaManager( ctx context.Context, credential *security.Credential, registryURL string, subjectSuffix string, ) (*AvroSchemaManager, error) { @@ -113,21 +111,17 @@ func NewAvroSchemaManager( var regexRemoveSpaces = regexp.MustCompile(`\s`) -// Register the latest schema for a table to the Registry, by passing in a Codec -// Returns the Schema's ID and err -func (m *AvroSchemaManager) Register(ctx context.Context, tableName model.TableName, codec *goavro.Codec) (int, error) { +func (m *AvroSchemaManager) Register(ctx context.Context, fqdn string, codec *goavro.Codec) (int, error) { // The Schema Registry expects the JSON to be without newline characters reqBody := registerRequest{ Schema: regexRemoveSpaces.ReplaceAllString(codec.Schema(), ""), - // Commented out for compatibility with Confluent 5.4.x - // SchemaType: "AVRO", } payload, err := json.Marshal(&reqBody) if err != nil { return 0, errors.Annotate( cerror.WrapError(cerror.ErrAvroSchemaAPIError, err), "Could not marshal request to the Registry") } - uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(tableName)) + "/versions" + uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(fqdn)) + "/versions" log.Debug("Registering schema", zap.String("uri", uri), zap.ByteString("payload", payload)) req, err := http.NewRequestWithContext(ctx, "POST", uri, bytes.NewReader(payload)) @@ -180,8 +174,8 @@ func (m *AvroSchemaManager) Register(ctx context.Context, tableName model.TableN // Calling this method with a tiSchemaID other than that used last time will invariably trigger a RESTful request to the Registry. // Returns (codec, registry schema ID, error) // NOT USED for now, reserved for future use. -func (m *AvroSchemaManager) Lookup(ctx context.Context, tableName model.TableName, tiSchemaID uint64) (*goavro.Codec, int, error) { - key := m.tableNameToSchemaSubject(tableName) +func (m *AvroSchemaManager) Lookup(ctx context.Context, fqdn string, tiSchemaID uint64) (*goavro.Codec, int, error) { + key := m.tableNameToSchemaSubject(fqdn) m.cacheRWLock.RLock() if entry, exists := m.cache[key]; exists && entry.tiSchemaID == tiSchemaID { log.Info("Avro schema lookup cache hit", @@ -197,7 +191,7 @@ func (m *AvroSchemaManager) Lookup(ctx context.Context, tableName model.TableNam zap.String("key", key), zap.Uint64("tiSchemaID", tiSchemaID)) - uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(tableName)) + "/versions/latest" + uri := m.registryURL + "/subjects/" + url.QueryEscape(key) + "/versions/latest" log.Debug("Querying for latest schema", zap.String("uri", uri)) req, err := http.NewRequestWithContext(ctx, "GET", uri, nil) @@ -253,7 +247,7 @@ func (m *AvroSchemaManager) Lookup(ctx context.Context, tableName model.TableNam cacheEntry.tiSchemaID = tiSchemaID m.cacheRWLock.Lock() - m.cache[m.tableNameToSchemaSubject(tableName)] = cacheEntry + m.cache[key] = cacheEntry m.cacheRWLock.Unlock() log.Info("Avro schema lookup successful with cache miss", @@ -270,8 +264,8 @@ type SchemaGenerator func() (string, error) // GetCachedOrRegister checks if the suitable Avro schema has been cached. // If not, a new schema is generated, registered and cached. -func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName model.TableName, tiSchemaID uint64, schemaGen SchemaGenerator) (*goavro.Codec, int, error) { - key := m.tableNameToSchemaSubject(tableName) +func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, fqdn string, tiSchemaID uint64, schemaGen SchemaGenerator) (*goavro.Codec, int, error) { + key := m.tableNameToSchemaSubject(fqdn) m.cacheRWLock.RLock() if entry, exists := m.cache[key]; exists && entry.tiSchemaID == tiSchemaID { log.Debug("Avro schema GetCachedOrRegister cache hit", @@ -298,7 +292,7 @@ func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName m cerror.WrapError(cerror.ErrAvroSchemaAPIError, err), "GetCachedOrRegister: Could not make goavro codec") } - id, err := m.Register(ctx, tableName, codec) + id, err := m.Register(ctx, fqdn, codec) if err != nil { return nil, 0, errors.Annotate( cerror.WrapError(cerror.ErrAvroSchemaAPIError, err), "GetCachedOrRegister: Could not register schema") @@ -310,7 +304,7 @@ func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName m cacheEntry.tiSchemaID = tiSchemaID m.cacheRWLock.Lock() - m.cache[m.tableNameToSchemaSubject(tableName)] = cacheEntry + m.cache[key] = cacheEntry m.cacheRWLock.Unlock() log.Info("Avro schema GetCachedOrRegister successful with cache miss", @@ -324,8 +318,8 @@ func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName m // ClearRegistry clears the Registry subject for the given table. Should be idempotent. // Exported for testing. // NOT USED for now, reserved for future use. -func (m *AvroSchemaManager) ClearRegistry(ctx context.Context, tableName model.TableName) error { - uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(tableName)) +func (m *AvroSchemaManager) ClearRegistry(ctx context.Context, fqdn string) error { + uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(fqdn)) req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil) if err != nil { log.Error("Could not construct request for clearRegistry", zap.String("uri", uri)) @@ -401,7 +395,8 @@ func httpRetry(ctx context.Context, credential *security.Credential, r *http.Req return resp, nil } -func (m *AvroSchemaManager) tableNameToSchemaSubject(tableName model.TableName) string { - // We should guarantee unique names for subjects - return tableName.Schema + "_" + tableName.Table + m.subjectSuffix +func (m *AvroSchemaManager) tableNameToSchemaSubject(fqdn string) string { + // obey the RecordNameStrategy but generate a global unique subject + // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy + return fqdn + m.subjectSuffix }