Skip to content

Commit

Permalink
update TopicNameStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyangyu committed May 4, 2022
1 parent be2c11c commit 9baf552
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 25 deletions.
10 changes: 6 additions & 4 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,17 @@ func (a *AvroEventBatchEncoder) avroEncode(ctx context.Context, e *model.RowChan
}
}

var fqdn string = e.Table.Schema + "." + e.Table.Table

schemaGen := func() (string, error) {
schema, err := rowToAvroSchema(e.Table.Table, cols, colInfos, enableTiDBExtension, a.decimalHandlingMode)
schema, err := rowToAvroSchema(fqdn, cols, colInfos, enableTiDBExtension, a.decimalHandlingMode)
if err != nil {
return "", errors.Annotate(err, "AvroEventBatchEncoder: generating schema failed")
}
return schema, nil
}

avroCodec, registryID, err := schemaManager.GetCachedOrRegister(ctx, *e.Table, e.TableInfoVersion, schemaGen)
avroCodec, registryID, err := schemaManager.GetCachedOrRegister(ctx, fqdn, e.TableInfoVersion, schemaGen)
if err != nil {
return nil, errors.Annotate(err, "AvroEventBatchEncoder: get-or-register failed")
}
Expand Down Expand Up @@ -243,10 +245,10 @@ type avroLogicalTypeSchema struct {
Scale interface{} `json:"scale,omitempty"`
}

func rowToAvroSchema(name string, columnInfo []*model.Column, colInfos []rowcodec.ColInfo, enableTiDBExtension bool, decimalHandlingMode string) (string, error) {
func rowToAvroSchema(fqdn string, columnInfo []*model.Column, colInfos []rowcodec.ColInfo, enableTiDBExtension bool, decimalHandlingMode string) (string, error) {
top := avroSchemaTop{
Tp: "record",
Name: name,
Name: fqdn,
Fields: nil,
}

Expand Down
37 changes: 16 additions & 21 deletions cdc/sink/codec/avro/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/linkedin/goavro/v2"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/httputil"
"github.com/pingcap/tiflow/pkg/security"
Expand Down Expand Up @@ -71,7 +70,6 @@ type lookupResponse struct {
Schema string `json:"schema"`
}

// NewAvroSchemaManager creates a new AvroSchemaManager
func NewAvroSchemaManager(
ctx context.Context, credential *security.Credential, registryURL string, subjectSuffix string,
) (*AvroSchemaManager, error) {
Expand Down Expand Up @@ -113,21 +111,17 @@ func NewAvroSchemaManager(

var regexRemoveSpaces = regexp.MustCompile(`\s`)

// Register the latest schema for a table to the Registry, by passing in a Codec
// Returns the Schema's ID and err
func (m *AvroSchemaManager) Register(ctx context.Context, tableName model.TableName, codec *goavro.Codec) (int, error) {
func (m *AvroSchemaManager) Register(ctx context.Context, fqdn string, codec *goavro.Codec) (int, error) {
// The Schema Registry expects the JSON to be without newline characters
reqBody := registerRequest{
Schema: regexRemoveSpaces.ReplaceAllString(codec.Schema(), ""),
// Commented out for compatibility with Confluent 5.4.x
// SchemaType: "AVRO",
}
payload, err := json.Marshal(&reqBody)
if err != nil {
return 0, errors.Annotate(
cerror.WrapError(cerror.ErrAvroSchemaAPIError, err), "Could not marshal request to the Registry")
}
uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(tableName)) + "/versions"
uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(fqdn)) + "/versions"
log.Debug("Registering schema", zap.String("uri", uri), zap.ByteString("payload", payload))

req, err := http.NewRequestWithContext(ctx, "POST", uri, bytes.NewReader(payload))
Expand Down Expand Up @@ -180,8 +174,8 @@ func (m *AvroSchemaManager) Register(ctx context.Context, tableName model.TableN
// Calling this method with a tiSchemaID other than that used last time will invariably trigger a RESTful request to the Registry.
// Returns (codec, registry schema ID, error)
// NOT USED for now, reserved for future use.
func (m *AvroSchemaManager) Lookup(ctx context.Context, tableName model.TableName, tiSchemaID uint64) (*goavro.Codec, int, error) {
key := m.tableNameToSchemaSubject(tableName)
func (m *AvroSchemaManager) Lookup(ctx context.Context, fqdn string, tiSchemaID uint64) (*goavro.Codec, int, error) {
key := m.tableNameToSchemaSubject(fqdn)
m.cacheRWLock.RLock()
if entry, exists := m.cache[key]; exists && entry.tiSchemaID == tiSchemaID {
log.Info("Avro schema lookup cache hit",
Expand All @@ -197,7 +191,7 @@ func (m *AvroSchemaManager) Lookup(ctx context.Context, tableName model.TableNam
zap.String("key", key),
zap.Uint64("tiSchemaID", tiSchemaID))

uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(tableName)) + "/versions/latest"
uri := m.registryURL + "/subjects/" + url.QueryEscape(key) + "/versions/latest"
log.Debug("Querying for latest schema", zap.String("uri", uri))

req, err := http.NewRequestWithContext(ctx, "GET", uri, nil)
Expand Down Expand Up @@ -253,7 +247,7 @@ func (m *AvroSchemaManager) Lookup(ctx context.Context, tableName model.TableNam
cacheEntry.tiSchemaID = tiSchemaID

m.cacheRWLock.Lock()
m.cache[m.tableNameToSchemaSubject(tableName)] = cacheEntry
m.cache[key] = cacheEntry
m.cacheRWLock.Unlock()

log.Info("Avro schema lookup successful with cache miss",
Expand All @@ -270,8 +264,8 @@ type SchemaGenerator func() (string, error)

// GetCachedOrRegister checks if the suitable Avro schema has been cached.
// If not, a new schema is generated, registered and cached.
func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName model.TableName, tiSchemaID uint64, schemaGen SchemaGenerator) (*goavro.Codec, int, error) {
key := m.tableNameToSchemaSubject(tableName)
func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, fqdn string, tiSchemaID uint64, schemaGen SchemaGenerator) (*goavro.Codec, int, error) {
key := m.tableNameToSchemaSubject(fqdn)
m.cacheRWLock.RLock()
if entry, exists := m.cache[key]; exists && entry.tiSchemaID == tiSchemaID {
log.Debug("Avro schema GetCachedOrRegister cache hit",
Expand All @@ -298,7 +292,7 @@ func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName m
cerror.WrapError(cerror.ErrAvroSchemaAPIError, err), "GetCachedOrRegister: Could not make goavro codec")
}

id, err := m.Register(ctx, tableName, codec)
id, err := m.Register(ctx, fqdn, codec)
if err != nil {
return nil, 0, errors.Annotate(
cerror.WrapError(cerror.ErrAvroSchemaAPIError, err), "GetCachedOrRegister: Could not register schema")
Expand All @@ -310,7 +304,7 @@ func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName m
cacheEntry.tiSchemaID = tiSchemaID

m.cacheRWLock.Lock()
m.cache[m.tableNameToSchemaSubject(tableName)] = cacheEntry
m.cache[key] = cacheEntry
m.cacheRWLock.Unlock()

log.Info("Avro schema GetCachedOrRegister successful with cache miss",
Expand All @@ -324,8 +318,8 @@ func (m *AvroSchemaManager) GetCachedOrRegister(ctx context.Context, tableName m
// ClearRegistry clears the Registry subject for the given table. Should be idempotent.
// Exported for testing.
// NOT USED for now, reserved for future use.
func (m *AvroSchemaManager) ClearRegistry(ctx context.Context, tableName model.TableName) error {
uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(tableName))
func (m *AvroSchemaManager) ClearRegistry(ctx context.Context, fqdn string) error {
uri := m.registryURL + "/subjects/" + url.QueryEscape(m.tableNameToSchemaSubject(fqdn))
req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil)
if err != nil {
log.Error("Could not construct request for clearRegistry", zap.String("uri", uri))
Expand Down Expand Up @@ -401,7 +395,8 @@ func httpRetry(ctx context.Context, credential *security.Credential, r *http.Req
return resp, nil
}

func (m *AvroSchemaManager) tableNameToSchemaSubject(tableName model.TableName) string {
// We should guarantee unique names for subjects
return tableName.Schema + "_" + tableName.Table + m.subjectSuffix
func (m *AvroSchemaManager) tableNameToSchemaSubject(fqdn string) string {
// Obey the RecordNameStrategy, but append the subject suffix to generate a globally unique subject.
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy
return fqdn + m.subjectSuffix
}

0 comments on commit 9baf552

Please sign in to comment.