Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/config, sink(ticdc): support output raw change event for mq and cloud storage sink (#11226) #11290

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,18 +354,20 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
SASLOAuthGrantType: c.Sink.KafkaConfig.SASLOAuthGrantType,
SASLOAuthAudience: c.Sink.KafkaConfig.SASLOAuthAudience,
LargeMessageHandle: largeMessageHandle,
OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent,
}
}

if c.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}
}
Expand Down Expand Up @@ -502,18 +504,20 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
SASLOAuthGrantType: cloned.Sink.KafkaConfig.SASLOAuthGrantType,
SASLOAuthAudience: cloned.Sink.KafkaConfig.SASLOAuthAudience,
LargeMessageHandle: largeMessageHandle,
OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent,
}
}

if cloned.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}
}
Expand Down Expand Up @@ -679,18 +683,20 @@ type KafkaConfig struct {
SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"`
SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"`

LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down
44 changes: 20 additions & 24 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (r *RedoLog) GetCommitTs() Ts {
}

// TrySplitAndSortUpdateEvent does nothing for redo logs.
func (r *RedoLog) TrySplitAndSortUpdateEvent(sinkScheme string) error {
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string, _ bool) error {
return nil
}

Expand Down Expand Up @@ -377,7 +377,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent does nothing.
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(sinkScheme string) error {
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string, _ bool) error {
return nil
}

Expand Down Expand Up @@ -794,11 +794,19 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent splits update events if the unique key is updated.
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error {
if !t.shouldSplitUpdateEvent(sinkScheme) {
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error {
if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent {
// For the MySQL sink, all update events are split into insert and delete events at the puller side
// according to whether the changefeed is in safemode. We don't split update events here (in the sink)
// since there may be OOM issues. For more information, see https://github.com/tikv/tikv/issues/17062.
//
// For the Kafka and Storage sinks, the outputRawChangeEvent parameter is introduced to control
// the split behavior. TiCDC only outputs the original change event if outputRawChangeEvent is true.
return nil
}

// Try to split update events for the Kafka and Storage sink if outputRawChangeEvent is false.
// Note it is only for backward compatibility, and we should remove this logic in the future.
newRows, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
Expand All @@ -807,21 +815,6 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error {
return nil
}

// Whether split a single update event into delete and insert events?
//
// For the MySQL Sink, we don't split any update event.
// This may cause error like "duplicate entry" when sink to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be splitted to insert and delete at puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
return !sink.IsMySQLCompatibleScheme(sinkScheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
// returns true if some updated events is split
func trySplitAndSortUpdateEvent(
Expand All @@ -831,8 +824,7 @@ func trySplitAndSortUpdateEvent(
split := false
for _, e := range events {
if e == nil {
log.Warn("skip emit nil event",
zap.Any("event", e))
log.Warn("skip emit nil event", zap.Any("event", e))
continue
}

Expand All @@ -842,8 +834,7 @@ func trySplitAndSortUpdateEvent(
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event",
zap.Any("event", e))
log.Warn("skip emit empty row event", zap.Any("event", e))
continue
}

Expand All @@ -869,7 +860,7 @@ func trySplitAndSortUpdateEvent(

// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If it is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
// If it is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
Expand Down Expand Up @@ -912,6 +903,11 @@ func SplitUpdateEvent(
// NOTICE: clean up pre cols for insert event.
insertEvent.PreColumns = nil

log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs),
zap.Uint64("commitTs", updateEvent.CommitTs),
zap.Any("preCols", updateEvent.PreColumns),
zap.Any("cols", updateEvent.Columns))

return &deleteEvent, &insertEvent, nil
}

Expand Down
26 changes: 23 additions & 3 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {

events := []*RowChangedEvent{
{
TableInfo: &TableInfo{},
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
Expand Down Expand Up @@ -573,6 +574,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {

events = []*RowChangedEvent{
{
TableInfo: &TableInfo{},
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
Expand Down Expand Up @@ -613,6 +615,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {

events = []*RowChangedEvent{
{
TableInfo: &TableInfo{},
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
Expand All @@ -624,6 +627,12 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {
}

var ukUpdatedEvent = &RowChangedEvent{
TableInfo: &TableInfo{
TableName: TableName{
Schema: "test",
Table: "t1",
},
},
PreColumns: []*Column{
{
Name: "col1",
Expand Down Expand Up @@ -656,21 +665,32 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
Rows: []*RowChangedEvent{ukUpdatedEvent},
}

err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
outputRawChangeEvent := true
notOutputRawChangeEvent := false
err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 2)

txn = &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}
4 changes: 4 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (m *mockSink) WriteEvents(events ...*eventsink.CallbackableEvent[*model.Row
return nil
}

// SchemeOption returns the black hole scheme as the sink scheme and false
// as the second (output-raw-change-event) value.
func (m *mockSink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, false
}

func (m *mockSink) GetEvents() []*eventsink.CallbackableEvent[*model.RowChangedEvent] {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChange
return
}

// Scheme returns the sink scheme.
func (s *Sink) Scheme() string {
return sink.BlackHoleScheme
// SchemeOption returns the sink scheme and whether the sink should output raw change events.
func (s *Sink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, true
}

// Close does nothing.
Expand Down
26 changes: 14 additions & 12 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type eventFragment struct {
// dmlSink is the cloud storage sink.
// It will send the events to cloud storage systems.
type dmlSink struct {
changefeedID model.ChangeFeedID
scheme string
changefeedID model.ChangeFeedID
scheme string
outputRawChangeEvent bool
// last sequence number
lastSeqNum uint64
// encodingWorkers defines a group of workers for encoding events.
Expand Down Expand Up @@ -133,13 +134,14 @@ func NewCloudStorageSink(

wgCtx, wgCancel := context.WithCancel(ctx)
s := &dmlSink{
changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx),
scheme: strings.ToLower(sinkURI.Scheme),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx),
scheme: strings.ToLower(sinkURI.Scheme),
outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
}
s.alive.msgCh = chann.NewDrainableChann[eventFragment]()

Expand Down Expand Up @@ -244,9 +246,9 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single
return nil
}

// Scheme returns the sink scheme.
func (s *dmlSink) Scheme() string {
return s.scheme
// SchemeOption returns the sink scheme and whether the sink should output raw change events.
func (s *dmlSink) SchemeOption() (string, bool) {
return s.scheme, s.outputRawChangeEvent
}

// Close closes the cloud storage sink.
Expand Down
5 changes: 2 additions & 3 deletions cdc/sinkv2/eventsink/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
// TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated.
// Note that sinkScheme is used to control the split behavior.
TrySplitAndSortUpdateEvent(sinkScheme string) error
// TrySplitAndSortUpdateEvent splits the update into a delete and an insert if the unique key is updated.
// Note that scheme and outputRawChangeEvent are used to control the split behavior.
TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error
}

// CallbackFunc is the callback function for callbackable event.
Expand Down
4 changes: 2 additions & 2 deletions cdc/sinkv2/eventsink/event_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type EventSink[E TableEvent] interface {
// WriteEvents writes events to the sink.
// This is an asynchronously and thread-safe method.
WriteEvents(events ...*CallbackableEvent[E]) error
// Scheme returns the sink scheme.
Scheme() string
// SchemeOption returns the sink scheme and whether the sink should output raw change event.
SchemeOption() (scheme string, outputRawChangeEvent bool)
// Close closes the sink. Can be called with `WriteEvents` concurrently.
Close()
// The EventSink meets internal errors and has been dead already.
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewKafkaDMLSink(
}

s, err := newSink(ctx, sinkURI, p, topicManager, eventRouter, encoderConfig,
replicaConfig.Sink.EncoderConcurrency, errCh)
replicaConfig.Sink.EncoderConcurrency, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading
Loading