Skip to content

Commit

Permalink
Merge branch 'master' into cli-support-cdc-cluster-id
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored May 18, 2022
2 parents 53256a2 + af849ad commit c7332b8
Show file tree
Hide file tree
Showing 39 changed files with 109 additions and 82 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/craft"
"github.com/pingcap/tiflow/cdc/sink/mq/codec/craft"
"github.com/pingcap/tiflow/proto/benchmark"
"github.com/stretchr/testify/require"
)
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion cdc/sink/codec/craft.go → cdc/sink/mq/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/craft"
"github.com/pingcap/tiflow/cdc/sink/mq/codec/craft"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
)
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
32 changes: 32 additions & 0 deletions cdc/sink/mq/codec/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import "github.com/pingcap/tiflow/cdc/model"

// EventBatchDecoder is an abstraction for events decoder
// this interface is only for testing now
type EventBatchDecoder interface {
// HasNext returns
// 1. the type of the next event
// 2. a bool if the next event is exist
// 3. error
HasNext() (model.MqMessageType, bool, error)
// NextResolvedEvent returns the next resolved event if exists
NextResolvedEvent() (uint64, error)
// NextRowChangedEvent returns the next row changed event if exists
NextRowChangedEvent() (*model.RowChangedEvent, error)
// NextDDLEvent returns the next DDL event if exists
NextDDLEvent() (*model.DDLEvent, error)
}
63 changes: 63 additions & 0 deletions cdc/sink/mq/codec/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import (
"context"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

// EventBatchEncoder is an abstraction for events encoder
type EventBatchEncoder interface {
// EncodeCheckpointEvent appends a checkpoint event into the batch.
// This event will be broadcast to all partitions to signal a global checkpoint.
EncodeCheckpointEvent(ts uint64) (*MQMessage, error)
// AppendRowChangedEvent appends the calling context, a row changed event and the dispatch
// topic into the batch
AppendRowChangedEvent(context.Context, string, *model.RowChangedEvent) error
// EncodeDDLEvent appends a DDL event into the batch
EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error)
// Build builds the batch and returns the bytes of key and value.
Build() []*MQMessage
// Size returns the size of the batch(bytes)
Size() int
}

// EncoderBuilder builds encoder with context.
type EncoderBuilder interface {
Build() EventBatchEncoder
}

// NewEventBatchEncoderBuilder returns an EncoderBuilder
func NewEventBatchEncoderBuilder(ctx context.Context, c *Config) (EncoderBuilder, error) {
switch c.protocol {
case config.ProtocolDefault, config.ProtocolOpen:
return newJSONEventBatchEncoderBuilder(c), nil
case config.ProtocolCanal:
return newCanalEventBatchEncoderBuilder(), nil
case config.ProtocolAvro:
return newAvroEventBatchEncoderBuilder(ctx, c)
case config.ProtocolMaxwell:
return newMaxwellEventBatchEncoderBuilder(), nil
case config.ProtocolCanalJSON:
return newCanalFlatEventBatchEncoderBuilder(c), nil
case config.ProtocolCraft:
return newCraftEventBatchEncoderBuilder(c), nil
default:
return nil, cerror.ErrMQSinkUnknownProtocol.GenWithStackByArgs(c.protocol)
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
70 changes: 1 addition & 69 deletions cdc/sink/codec/interface.go → cdc/sink/mq/codec/message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,32 +14,14 @@
package codec

import (
"context"
"encoding/binary"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/tikv/client-go/v2/oracle"
)

// EventBatchEncoder is an abstraction for events encoder
type EventBatchEncoder interface {
// EncodeCheckpointEvent appends a checkpoint event into the batch.
// This event will be broadcast to all partitions to signal a global checkpoint.
EncodeCheckpointEvent(ts uint64) (*MQMessage, error)
// AppendRowChangedEvent appends the calling context, a row changed event and the dispatch
// topic into the batch
AppendRowChangedEvent(context.Context, string, *model.RowChangedEvent) error
// EncodeDDLEvent appends a DDL event into the batch
EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error)
// Build builds the batch and returns the bytes of key and value.
Build() []*MQMessage
// Size returns the size of the batch(bytes)
Size() int
}

// MQMessage represents an MQ message to the mqSink
type MQMessage struct {
Key []byte
Expand Down Expand Up @@ -133,53 +115,3 @@ func NewMQMessage(

return ret
}

// EventBatchDecoder is an abstraction for events decoder
// this interface is only for testing now
type EventBatchDecoder interface {
// HasNext returns
// 1. the type of the next event
// 2. a bool if the next event is exist
// 3. error
HasNext() (model.MqMessageType, bool, error)
// NextResolvedEvent returns the next resolved event if exists
NextResolvedEvent() (uint64, error)
// NextRowChangedEvent returns the next row changed event if exists
NextRowChangedEvent() (*model.RowChangedEvent, error)
// NextDDLEvent returns the next DDL event if exists
NextDDLEvent() (*model.DDLEvent, error)
}

// EncoderResult indicates an action request by the encoder to the mqSink
type EncoderResult uint8

// Enum types of EncoderResult
const (
EncoderNeedAsyncWrite EncoderResult = iota
EncoderNeedSyncWrite
)

// EncoderBuilder builds encoder with context.
type EncoderBuilder interface {
Build() EventBatchEncoder
}

// NewEventBatchEncoderBuilder returns an EncoderBuilder
func NewEventBatchEncoderBuilder(ctx context.Context, c *Config) (EncoderBuilder, error) {
switch c.protocol {
case config.ProtocolDefault, config.ProtocolOpen:
return newJSONEventBatchEncoderBuilder(c), nil
case config.ProtocolCanal:
return newCanalEventBatchEncoderBuilder(), nil
case config.ProtocolAvro:
return newAvroEventBatchEncoderBuilder(ctx, c)
case config.ProtocolMaxwell:
return newMaxwellEventBatchEncoderBuilder(), nil
case config.ProtocolCanalJSON:
return newCanalFlatEventBatchEncoderBuilder(c), nil
case config.ProtocolCraft:
return newCraftEventBatchEncoderBuilder(c), nil
default:
return nil, cerror.ErrMQSinkUnknownProtocol.GenWithStackByArgs(c.protocol)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 PingCAP, Inc.
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/mq/manager"
kafkamanager "github.com/pingcap/tiflow/cdc/sink/mq/manager/kafka"
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq_flush_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/producer"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq_flush_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
"github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
)
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
kafkap "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/producer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/Shopify/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/kafka"
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/kafka"
"github.com/pingcap/tiflow/pkg/util"
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
"github.com/pingcap/tiflow/pkg/kafka"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/producer/mq_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package producer
import (
"context"

"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
)

// Producer is an interface of mq producer
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/producer/pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/codec"
"github.com/pingcap/tiflow/cdc/sink/mq/dispatcher"
cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/pingcap/tiflow/pkg/config"
Expand Down

0 comments on commit c7332b8

Please sign in to comment.