Skip to content

Commit

Permalink
support filter for key & value
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu committed Aug 17, 2023
1 parent c340278 commit c4d46ac
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 8 deletions.
4 changes: 3 additions & 1 deletion cdc/cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,9 +1110,11 @@ func (s *eventFeedSession) receiveFromStream(
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(captureAddr, changefeedID)

eventFilter := util.EventFilterFromCtx(ctx)

// always create a new region worker, because `receiveFromStreamV2` is guaranteed
// to be called exactly once by the outer code logic
worker := newRegionWorker(s, addr)
worker := newRegionWorker(s, addr, eventFilter)

defer worker.evictAllRegions()

Expand Down
14 changes: 13 additions & 1 deletion cdc/cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type regionWorkerMetrics struct {
metricPullEventCommitCounter prometheus.Counter
metricPullEventCommittedCounter prometheus.Counter
metricPullEventRollbackCounter prometheus.Counter
metricPullEventFilterOutCounter prometheus.Counter
metricSendEventResolvedCounter prometheus.Counter
metricSendEventCommitCounter prometheus.Counter
metricSendEventCommittedCounter prometheus.Counter
Expand Down Expand Up @@ -156,9 +157,11 @@ type regionWorker struct {

enableOldValue bool
storeAddr string

eventFilter *util.Filter
}

func newRegionWorker(s *eventFeedSession, addr string) *regionWorker {
func newRegionWorker(s *eventFeedSession, addr string, eventFilter *util.Filter) *regionWorker {
cfg := config.GetGlobalServerConfig().KVClient
worker := &regionWorker{
session: s,
Expand All @@ -171,6 +174,7 @@ func newRegionWorker(s *eventFeedSession, addr string) *regionWorker {
enableOldValue: s.enableOldValue,
storeAddr: addr,
concurrent: cfg.WorkerConcurrent,
eventFilter: eventFilter,
}
return worker
}
Expand All @@ -187,6 +191,7 @@ func (w *regionWorker) initMetrics(ctx context.Context) {
metrics.metricPullEventCommitCounter = pullEventCounter.WithLabelValues(cdcpb.Event_COMMIT.String(), captureAddr, changefeedID)
metrics.metricPullEventPrewriteCounter = pullEventCounter.WithLabelValues(cdcpb.Event_PREWRITE.String(), captureAddr, changefeedID)
metrics.metricPullEventRollbackCounter = pullEventCounter.WithLabelValues(cdcpb.Event_ROLLBACK.String(), captureAddr, changefeedID)
metrics.metricPullEventFilterOutCounter = pullEventCounter.WithLabelValues("filter-out", captureAddr, changefeedID)
metrics.metricSendEventResolvedCounter = sendEventCounter.WithLabelValues("native-resolved", captureAddr, changefeedID)
metrics.metricSendEventCommitCounter = sendEventCounter.WithLabelValues("commit", captureAddr, changefeedID)
metrics.metricSendEventCommittedCounter = sendEventCounter.WithLabelValues("committed", captureAddr, changefeedID)
Expand Down Expand Up @@ -655,6 +660,13 @@ func (w *regionWorker) handleEventEntry(
}
case cdcpb.Event_COMMITTED:
w.metrics.metricPullEventCommittedCounter.Inc()

if !w.eventFilter.EventMatch(entry) {
w.metrics.metricPullEventFilterOutCounter.Inc()
log.Info("handleEventEntry: event is filter out and drop", zap.String("OpType", entry.OpType.String()), zap.String("key", string(entry.Key)))
continue
}

revent, err := assembleRowEvent(regionID, entry, w.enableOldValue)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/processor/pipeline/keyspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func NewKeySpanPipeline(ctx cdcContext.Context,

sinkNode := newSinkNode(keyspanID, sink, replicaInfo.StartTs, targetTs, flowController)

p.AppendNode(ctx, "puller", newPullerNode(keyspanID, replicaInfo))
p.AppendNode(ctx, "puller", newPullerNode(keyspanID, replicaInfo, replConfig.Filter))
p.AppendNode(ctx, "sorter", sorterNode)
p.AppendNode(ctx, "sink", sinkNode)

Expand Down
6 changes: 5 additions & 1 deletion cdc/cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ type pullerNode struct {
keyspanName string
keyspan regionspan.Span
replicaInfo *model.KeySpanReplicaInfo
eventFilter *util.Filter
cancel context.CancelFunc
wg *errgroup.Group
}

func newPullerNode(
keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo,
keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo, filterConfig *util.FilterConfig,
) pipeline.Node {
keyspan := regionspan.Span{Start: replicaInfo.Start, End: replicaInfo.End}
filter := util.CreateFilter(filterConfig)
return &pullerNode{
keyspanID: keyspanID,
keyspanName: keyspan.Name(),
keyspan: keyspan,
replicaInfo: replicaInfo,
eventFilter: &filter,
}
}

Expand All @@ -63,6 +66,7 @@ func (n *pullerNode) InitWithWaitGroup(ctx pipeline.NodeContext, wg *errgroup.Gr
ctxC = util.PutKeySpanInfoInCtx(ctxC, n.keyspanID, n.keyspanName)
ctxC = util.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID)
ctxC = util.PutEventFilterInCtx(ctxC, n.eventFilter)

plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage,
n.replicaInfo.StartTs, n.keyspans(), true)
Expand Down
12 changes: 12 additions & 0 deletions cdc/pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ const (
"scheduler": {
"type": "keyspan-number",
"polling-time": -1
},
"filter": {
"key-format": "raw",
"key-prefix": "prefix",
"key-pattern": "key\\x00pattern",
"value-pattern": "value\\ffpattern"
}
}`

Expand All @@ -151,6 +157,12 @@ const (
"scheduler": {
"type": "keyspan-number",
"polling-time": -1
},
"filter": {
"key-format": "raw",
"key-prefix": "prefix",
"key-pattern": "key\\x00pattern",
"value-pattern": "value\\ffpattern"
}
}`
)
17 changes: 13 additions & 4 deletions cdc/pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/tikv/migration/cdc/pkg/config/outdated"
"github.com/tikv/migration/cdc/pkg/util"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -33,16 +34,18 @@ var defaultReplicaConfig = &ReplicaConfig{
Tp: "keyspan-number",
PollingTime: -1,
},
Filter: &util.FilterConfig{},
}

// ReplicaConfig represents some additional replication config for a changefeed
type ReplicaConfig replicaConfig

type replicaConfig struct {
EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"`
CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"`
Sink *SinkConfig `toml:"sink" json:"sink"`
Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"`
CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"`
Sink *SinkConfig `toml:"sink" json:"sink"`
Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
Filter *util.FilterConfig `toml:"filter" json:"filter"`
}

// Marshal returns the json marshal format of a ReplicaConfig
Expand Down Expand Up @@ -113,6 +116,12 @@ func (c *ReplicaConfig) Validate() error {
return err
}
}
if c.Filter != nil {
err := c.Filter.Validate()
if err != nil {
return err
}
}
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions cdc/pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/migration/cdc/pkg/util"
)

func mustIdentJSON(t *testing.T, j string) string {
Expand All @@ -38,6 +39,12 @@ func TestReplicaConfigMarshal(t *testing.T) {
Columns: []string{"a", "b"},
},
}
conf.Filter = &util.FilterConfig{
KeyFormat: "raw",
KeyPrefix: `prefix`,
KeyPattern: `key\x00pattern`,
ValuePattern: `value\ffpattern`,
}
b, err := conf.Marshal()
require.Nil(t, err)
require.Equal(t, testCfgTestReplicaConfigMarshal1, mustIdentJSON(t, b))
Expand Down
13 changes: 13 additions & 0 deletions cdc/pkg/util/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
ctxKeyIsOwner = ctxKey("isOwner")
ctxKeyTimezone = ctxKey("timezone")
ctxKeyKVStorage = ctxKey("kvStorage")
ctxEventFilter = ctxKey("eventFilter")
)

// CaptureAddrFromCtx returns a capture ID stored in the specified context.
Expand Down Expand Up @@ -121,6 +122,18 @@ func PutChangefeedIDInCtx(ctx context.Context, changefeedID string) context.Cont
return context.WithValue(ctx, ctxKeyChangefeedID, changefeedID)
}

// EventFilterFromCtx returns the event filter stored in the specified context
// under ctxEventFilter. It returns nil when the context carries no value for
// that key, or when the stored value is not a *Filter.
func EventFilterFromCtx(ctx context.Context) *Filter {
	if stored, isFilter := ctx.Value(ctxEventFilter).(*Filter); isFilter {
		return stored
	}
	return nil
}

// PutEventFilterInCtx returns a new child context that carries the given
// event filter under ctxEventFilter, where EventFilterFromCtx looks it up.
func PutEventFilterInCtx(ctx context.Context, filter *Filter) context.Context {
	withFilter := context.WithValue(ctx, ctxEventFilter, filter)
	return withFilter
}

// ZapFieldCapture returns a zap field containing capture address
// TODO: log redact for capture address
func ZapFieldCapture(ctx context.Context) zap.Field {
Expand Down
111 changes: 111 additions & 0 deletions cdc/pkg/util/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"regexp"

"github.com/pingcap/kvproto/pkg/cdcpb"
"golang.org/x/net/html/charset"
)

// FilterConfig is the user-facing (TOML / JSON) configuration of the event
// filter. Every field is optional; an empty string disables that check.
// Validate checks the fields and CreateFilter compiles them into a Filter.
type FilterConfig struct {
// KeyFormat is the format name handed to ParseKey when decoding KeyPrefix.
KeyFormat string `toml:"key-format" json:"key-format"`
// KeyPrefix, once decoded by ParseKey, must be a prefix of the event key.
KeyPrefix string `toml:"key-prefix" json:"key-prefix"`
// KeyPattern is a regular expression that the event key must match.
KeyPattern string `toml:"key-pattern" json:"key-pattern"`
// ValuePattern is a regular expression that the value of PUT events must match.
ValuePattern string `toml:"value-pattern" json:"value-pattern"`
}

// Validate checks that the configuration can be compiled into a Filter.
//
// Empty fields are skipped. A non-empty KeyPrefix must be accepted by
// ParseKey under KeyFormat, and non-empty KeyPattern / ValuePattern must
// compile as regular expressions. The first failing option is reported as an
// error whose message names that option.
func (c *FilterConfig) Validate() error {
	if prefix := c.KeyPrefix; prefix != "" {
		_, parseErr := ParseKey(c.KeyFormat, prefix)
		if parseErr != nil {
			msg := fmt.Sprintf("invalid key-prefix: %s", parseErr.Error())
			return errors.New(msg)
		}
	}

	if keyExpr := c.KeyPattern; keyExpr != "" {
		_, compileErr := regexp.Compile(keyExpr)
		if compileErr != nil {
			msg := fmt.Sprintf("invalid key-pattern: %s", compileErr.Error())
			return errors.New(msg)
		}
	}

	if valueExpr := c.ValuePattern; valueExpr != "" {
		_, compileErr := regexp.Compile(valueExpr)
		if compileErr != nil {
			msg := fmt.Sprintf("invalid value-pattern: %s", compileErr.Error())
			return errors.New(msg)
		}
	}

	return nil
}

// Filter is the compiled form of a FilterConfig, as built by CreateFilter.
// A zero-valued field disables the corresponding check in EventMatch.
type Filter struct {
// keyPrefix, when non-empty, must be a prefix of the event key.
keyPrefix []byte
// keyPattern, when non-nil, must match the event key.
keyPattern *regexp.Regexp
// valuePattern, when non-nil, must match the value of PUT events.
valuePattern *regexp.Regexp
}

// CreateFilter compiles conf into a Filter.
//
// A nil conf yields the zero Filter, which matches every event; this keeps
// callers that carry no filter configuration from crashing on a nil
// dereference.
//
// conf is expected to have passed FilterConfig.Validate beforehand:
//   - an invalid KeyPattern or ValuePattern makes regexp.MustCompile panic;
//   - a KeyPrefix that ParseKey rejects is ignored, i.e. no prefix check is
//     installed (this function has no error result to report it through).
func CreateFilter(conf *FilterConfig) Filter {
	if conf == nil {
		return Filter{}
	}

	var (
		keyPrefix    []byte
		keyPattern   *regexp.Regexp
		valuePattern *regexp.Regexp
	)

	if conf.KeyPrefix != "" {
		// Only trust the parsed bytes when ParseKey succeeded; its result is
		// not meaningful alongside a non-nil error.
		if parsed, err := ParseKey(conf.KeyFormat, conf.KeyPrefix); err == nil {
			keyPrefix = parsed
		}
	}
	if conf.KeyPattern != "" {
		keyPattern = regexp.MustCompile(conf.KeyPattern)
	}
	if conf.ValuePattern != "" {
		valuePattern = regexp.MustCompile(conf.ValuePattern)
	}

	return Filter{
		keyPrefix:    keyPrefix,
		keyPattern:   keyPattern,
		valuePattern: valuePattern,
	}
}

// EventMatch reports whether entry passes the filter, i.e. whether the event
// should be kept.
//
// A nil *Filter matches everything. EventFilterFromCtx returns nil when no
// filter was put into the context, so the method must be safe to call on a
// nil receiver.
//
// Only PUT and DELETE rows are filtered; every other op type matches. For
// those rows all configured checks must pass:
//   - the key starts with keyPrefix (when a prefix is set);
//   - the key matches keyPattern (when set);
//   - for PUT only, the value matches valuePattern (when set).
//
// Keys and values are run through ConvertToUTF8 with the "latin1" label
// before regexp matching, presumably so that non-UTF-8 bytes become runes a
// pattern can address.
func (f *Filter) EventMatch(entry *cdcpb.Event_Row) bool {
	if f == nil {
		return true
	}

	// Filter on put & delete only.
	opType := entry.GetOpType()
	if opType != cdcpb.Event_Row_DELETE && opType != cdcpb.Event_Row_PUT {
		return true
	}

	if len(f.keyPrefix) > 0 && !bytes.HasPrefix(entry.Key, f.keyPrefix) {
		return false
	}
	if f.keyPattern != nil && !f.keyPattern.MatchString(ConvertToUTF8(entry.Key, "latin1")) {
		return false
	}
	// DELETE rows are not checked against the value pattern.
	if opType == cdcpb.Event_Row_PUT &&
		f.valuePattern != nil &&
		!f.valuePattern.MatchString(ConvertToUTF8(entry.GetValue(), "latin1")) {
		return false
	}

	return true
}

// ConvertToUTF8 decodes strBytes from the character encoding named by the
// label origEncoding and returns the result as a UTF-8 string.
//
// If the label is not recognised by charset.NewReaderLabel, or decoding
// fails, the input bytes are returned unchanged as a string instead of
// dereferencing a nil reader.
//
// NOTE(review): labels are resolved by golang.org/x/net/html/charset; the
// "latin1" label used by EventMatch is believed to map to windows-1252 rather
// than strict ISO-8859-1 there. Confirm before relying on the 0x80-0x9f range.
func ConvertToUTF8(strBytes []byte, origEncoding string) string {
	reader, err := charset.NewReaderLabel(origEncoding, bytes.NewReader(strBytes))
	if err != nil {
		return string(strBytes)
	}
	decoded, err := ioutil.ReadAll(reader)
	if err != nil {
		return string(strBytes)
	}
	return string(decoded)
}
42 changes: 42 additions & 0 deletions cdc/pkg/util/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestFilterConfig checks that FilterConfig.Validate accepts an empty and a
// fully populated configuration, and rejects an uncompilable key pattern and
// an unparsable key prefix.
func TestFilterConfig(t *testing.T) {
	t.Parallel()

	accepted := []FilterConfig{
		{},
		{
			KeyFormat:    "escaped",
			KeyPrefix:    `prefix\x11`,
			KeyPattern:   `key\x00pattern`,
			ValuePattern: `value\ffpattern`,
		},
	}
	for i := range accepted {
		require.Nil(t, accepted[i].Validate())
	}

	rejected := []FilterConfig{
		// The pattern bytes are not valid UTF-8.
		{KeyPattern: "\xfd\xe2"},
		// "zz" is not hexadecimal; presumably why ParseKey rejects it.
		{KeyFormat: "hex", KeyPrefix: "zz"},
	}
	for i := range rejected {
		require.Error(t, rejected[i].Validate())
	}
}

0 comments on commit c4d46ac

Please sign in to comment.