Skip to content

Commit

Permalink
config(ticdc): add integrity configs to control row level checksum fu…
Browse files Browse the repository at this point in the history
…nctionality (#8778)

close #8776
  • Loading branch information
3AceShowHand authored Apr 11, 2023
1 parent eb89d8c commit ab752f4
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 2 deletions.
22 changes: 22 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type ReplicaConfig struct {
Sink *SinkConfig `json:"sink"`
Consistent *ConsistentConfig `json:"consistent"`
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
Integrity *IntegrityConfig `json:"integrity"`
}

// ToInternalReplicaConfig converts *v2.ReplicaConfig into *config.ReplicaConfig
Expand Down Expand Up @@ -294,6 +295,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
WriteKeyThreshold: c.Scheduler.WriteKeyThreshold,
}
}
if c.Integrity != nil {
res.Integrity = &config.IntegrityConfig{
IntegrityCheckLevel: c.Integrity.IntegrityCheckLevel,
CorruptionHandleLevel: c.Integrity.CorruptionHandleLevel,
}
}
return res
}

Expand Down Expand Up @@ -414,6 +421,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
WriteKeyThreshold: cloned.Scheduler.WriteKeyThreshold,
}
}

if cloned.Integrity != nil {
res.Integrity = &IntegrityConfig{
IntegrityCheckLevel: cloned.Integrity.IntegrityCheckLevel,
CorruptionHandleLevel: cloned.Integrity.CorruptionHandleLevel,
}
}

return res
}

Expand Down Expand Up @@ -575,6 +590,13 @@ type ChangefeedSchedulerConfig struct {
WriteKeyThreshold int `toml:"write_key_threshold" json:"write_key_threshold"`
}

// IntegrityConfig is the API v2 representation of the integrity check config.
// It carries the same two fields as config.IntegrityConfig, but is serialized
// with snake_case JSON keys.
type IntegrityConfig struct {
// IntegrityCheckLevel selects the integrity check level; the value is
// copied as-is to and from config.IntegrityConfig.IntegrityCheckLevel.
IntegrityCheckLevel string `json:"integrity_check_level"`
// CorruptionHandleLevel selects how corrupted events are handled; the value
// is copied as-is to and from config.IntegrityConfig.CorruptionHandleLevel.
CorruptionHandleLevel string `json:"corruption_handle_level"`
}

// EtcdData contains key/value pair of etcd data
type EtcdData struct {
Key string `json:"key,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,18 @@ var defaultAPIConfig = &ReplicaConfig{
WriteKeyThreshold: config.GetDefaultReplicaConfig().
Scheduler.WriteKeyThreshold,
},
Integrity: &IntegrityConfig{
IntegrityCheckLevel: config.GetDefaultReplicaConfig().Integrity.IntegrityCheckLevel,
CorruptionHandleLevel: config.GetDefaultReplicaConfig().Integrity.CorruptionHandleLevel,
},
}

func TestDefaultReplicaConfig(t *testing.T) {
t.Parallel()
require.Equal(t, defaultAPIConfig, GetDefaultReplicaConfig())
cfg := GetDefaultReplicaConfig()
require.NotNil(t, cfg.Scheduler)
require.NotNil(t, cfg.Integrity)
cfg2 := cfg.toInternalReplicaConfigWithOriginConfig(&config.ReplicaConfig{})
require.Equal(t, config.GetDefaultReplicaConfig(), cfg2)
cfg3 := ToAPIReplicaConfig(config.GetDefaultReplicaConfig())
Expand Down
4 changes: 4 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error {
info.Config.Scheduler = defaultConfig.Scheduler
}

if info.Config.Integrity == nil {
info.Config.Integrity = defaultConfig.Integrity
}

return nil
}

Expand Down
14 changes: 14 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,17 @@ var doc = `{
}
}
},
"v2.IntegrityConfig": {
"type": "object",
"properties": {
"corruption_handle_level": {
"type": "string"
},
"integrity_check_level": {
"type": "string"
}
}
},
"v2.LogLevelReq": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2080,6 +2091,9 @@ var doc = `{
"ignore_ineligible_table": {
"type": "boolean"
},
"integrity": {
"$ref": "#/definitions/v2.IntegrityConfig"
},
"memory_quota": {
"type": "integer"
},
Expand Down
14 changes: 14 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,17 @@
}
}
},
"v2.IntegrityConfig": {
"type": "object",
"properties": {
"corruption_handle_level": {
"type": "string"
},
"integrity_check_level": {
"type": "string"
}
}
},
"v2.LogLevelReq": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2061,6 +2072,9 @@
"ignore_ineligible_table": {
"type": "boolean"
},
"integrity": {
"$ref": "#/definitions/v2.IntegrityConfig"
},
"memory_quota": {
"type": "integer"
},
Expand Down
9 changes: 9 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,13 @@ definitions:
type: string
type: array
type: object
v2.IntegrityConfig:
properties:
corruption_handle_level:
type: string
integrity_check_level:
type: string
type: object
v2.LogLevelReq:
properties:
log_level:
Expand Down Expand Up @@ -527,6 +534,8 @@ definitions:
type: boolean
ignore_ineligible_table:
type: boolean
integrity:
$ref: '#/definitions/v2.IntegrityConfig'
memory_quota:
type: integer
mounter:
Expand Down
17 changes: 15 additions & 2 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ const (
"scheduler": {
"enable-table-across-nodes": false,
"region-threshold": 100000
}
},
"integrity": {
"integrity-check-level": "none",
"corruption-handle-level": "warn"
}
}`

testCfgTestServerConfigMarshal = `{
Expand Down Expand Up @@ -204,7 +208,12 @@ const (
"enable-table-across-nodes": true,
"region-per-span": 0,
"region-threshold": 100001,
"write-key-threshold": 100001
"write-key-threshold": 100001,
"region-per-span": 0
},
"integrity": {
"integrity-check-level": "none",
"corruption-handle-level": "warn"
}
}`

Expand Down Expand Up @@ -263,6 +272,10 @@ const (
"enable-table-across-nodes": true,
"region-threshold": 100001,
"write-key-threshold": 100001
},
"integrity": {
"integrity-check-level": "none",
"corruption-handle-level": "warn"
}
}`
)
71 changes: 71 additions & 0 deletions pkg/config/integrity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// IntegrityConfig represents integrity check config for a changefeed.
type IntegrityConfig struct {
// IntegrityCheckLevel is the integrity check level. Validate accepts only
// IntegrityCheckLevelNone and IntegrityCheckLevelCorrectness.
IntegrityCheckLevel string `toml:"integrity-check-level" json:"integrity-check-level"`
// CorruptionHandleLevel is the action taken on a corrupted event. Validate
// accepts only CorruptionHandleLevelWarn and CorruptionHandleLevelError.
CorruptionHandleLevel string `toml:"corruption-handle-level" json:"corruption-handle-level"`
}

// Supported values for IntegrityConfig.IntegrityCheckLevel.
const (
// IntegrityCheckLevelNone means no integrity check is performed.
// It is the default value.
IntegrityCheckLevelNone string = "none"
// IntegrityCheckLevelCorrectness means the correctness of each row's data is checked.
IntegrityCheckLevelCorrectness string = "correctness"
)

// Supported values for IntegrityConfig.CorruptionHandleLevel.
const (
// CorruptionHandleLevelWarn is the default value: log the corrupted event,
// mark it as corrupted, and still send it to the downstream.
CorruptionHandleLevelWarn string = "warn"
// CorruptionHandleLevelError means log the corrupted event, and then stop the changefeed.
CorruptionHandleLevelError string = "error"
)

// Validate checks that both IntegrityCheckLevel and CorruptionHandleLevel hold
// one of their supported values. It returns ErrInvalidReplicaConfig when either
// of them does not, and emits an info log when the integrity check is enabled.
func (c *IntegrityConfig) Validate() error {
	switch c.IntegrityCheckLevel {
	case IntegrityCheckLevelNone, IntegrityCheckLevelCorrectness:
		// supported check level
	default:
		return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs()
	}

	switch c.CorruptionHandleLevel {
	case CorruptionHandleLevelWarn, CorruptionHandleLevelError:
		// supported handle level
	default:
		return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs()
	}

	if !c.Enabled() {
		return nil
	}

	log.Info("integrity check is enabled",
		zap.String("integrityCheckLevel", c.IntegrityCheckLevel),
		zap.String("corruptionHandleLevel", c.CorruptionHandleLevel))
	return nil
}

// Enabled reports whether the integrity check is turned on, i.e. whether
// IntegrityCheckLevel equals IntegrityCheckLevelCorrectness.
func (c *IntegrityConfig) Enabled() bool {
	checkLevel := c.IntegrityCheckLevel
	return checkLevel == IntegrityCheckLevelCorrectness
}

// ErrorHandle reports whether CorruptionHandleLevel equals
// CorruptionHandleLevelError.
func (c *IntegrityConfig) ErrorHandle() bool {
	handleLevel := c.CorruptionHandleLevel
	return handleLevel == CorruptionHandleLevelError
}
11 changes: 11 additions & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ var defaultReplicaConfig = &ReplicaConfig{
RegionThreshold: 100_000,
WriteKeyThreshold: 0,
},
Integrity: &IntegrityConfig{
IntegrityCheckLevel: IntegrityCheckLevelNone,
CorruptionHandleLevel: CorruptionHandleLevelWarn,
},
}

// GetDefaultReplicaConfig returns the default replica config.
Expand Down Expand Up @@ -114,6 +118,7 @@ type replicaConfig struct {
Consistent *ConsistentConfig `toml:"consistent" json:"consistent"`
// Scheduler is the configuration for scheduler.
Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"`
Integrity *IntegrityConfig `toml:"integrity" json:"integrity"`
}

// Marshal returns the json marshal format of a ReplicationConfig
Expand Down Expand Up @@ -215,6 +220,12 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error {
c.Scheduler.EnableTableAcrossNodes = false
}

if c.Integrity != nil {
if err := c.Integrity.Validate(); err != nil {
return err
}
}

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
Sink: &config.SinkConfig{Protocol: "open-protocol"},
Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
},
},
Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713, ResolvedTs: 421980720003809281},
Expand Down Expand Up @@ -175,6 +176,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
Sink: &config.SinkConfig{Protocol: "open-protocol"},
Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
},
},
Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713, ResolvedTs: 421980720003809281},
Expand Down Expand Up @@ -229,6 +231,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
Sink: &config.SinkConfig{Protocol: "open-protocol"},
Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
},
},
Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713, ResolvedTs: 421980720003809281},
Expand Down Expand Up @@ -318,6 +321,7 @@ func TestPatchInfo(t *testing.T) {
Sink: defaultConfig.Sink,
Consistent: defaultConfig.Consistent,
Scheduler: defaultConfig.Scheduler,
Integrity: defaultConfig.Integrity,
},
})
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
Expand All @@ -335,6 +339,7 @@ func TestPatchInfo(t *testing.T) {
Sink: defaultConfig.Sink,
Consistent: defaultConfig.Consistent,
Scheduler: defaultConfig.Scheduler,
Integrity: defaultConfig.Integrity,
},
})
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
Expand Down
8 changes: 8 additions & 0 deletions tests/integration_tests/api_v2/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ var customReplicaConfig = &ReplicaConfig{
EnableTableAcrossNodes: false,
RegionThreshold: 13,
},
Integrity: &IntegrityConfig{
IntegrityCheckLevel: "none",
CorruptionHandleLevel: "warn",
},
}

// defaultReplicaConfig is used to check whether the default values have changed
Expand Down Expand Up @@ -139,6 +143,10 @@ var defaultReplicaConfig = &ReplicaConfig{
EnableTableAcrossNodes: false,
RegionThreshold: 100_000,
},
Integrity: &IntegrityConfig{
IntegrityCheckLevel: "none",
CorruptionHandleLevel: "warn",
},
}

func testStatus(ctx context.Context, client *CDCRESTClient) error {
Expand Down
8 changes: 8 additions & 0 deletions tests/integration_tests/api_v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ type ReplicaConfig struct {
Sink *SinkConfig `json:"sink"`
Consistent *ConsistentConfig `json:"consistent"`
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
Integrity *IntegrityConfig `json:"integrity"`
}

// FilterConfig represents filter config for a changefeed
Expand Down Expand Up @@ -291,6 +292,13 @@ type ChangefeedSchedulerConfig struct {
WriteKeyThreshold int `toml:"write_key_threshold" json:"write_key_threshold"`
}

// IntegrityConfig is the config for integrity check as seen by the API v2
// integration tests.
// This is a duplicate of config.IntegrityConfig, using snake_case JSON keys.
type IntegrityConfig struct {
// IntegrityCheckLevel is the integrity check level, e.g. "none".
IntegrityCheckLevel string `json:"integrity_check_level"`
// CorruptionHandleLevel is the corruption handle level, e.g. "warn".
CorruptionHandleLevel string `json:"corruption_handle_level"`
}

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
UpstreamID uint64 `json:"upstream_id,omitempty"`
Expand Down

0 comments on commit ab752f4

Please sign in to comment.