diff --git a/lightning/backend/local.go b/lightning/backend/local.go
index 735147307..1819a9491 100644
--- a/lightning/backend/local.go
+++ b/lightning/backend/local.go
@@ -519,9 +519,9 @@ func (local *local) WriteToTiKV(
 	}
 
 	if !iter.Valid() {
-		log.L().Info("keys within region is empty, skip ingest", zap.Binary("start", start),
-			zap.Binary("regionStart", region.Region.StartKey), zap.Binary("end", end),
-			zap.Binary("regionEnd", region.Region.EndKey))
+		log.L().Info("keys within region is empty, skip ingest", log.ZapRedactBinary("start", start),
+			log.ZapRedactBinary("regionStart", region.Region.StartKey), log.ZapRedactBinary("end", end),
+			log.ZapRedactBinary("regionEnd", region.Region.EndKey))
 		return nil, nil, nil
 	}
 
@@ -643,8 +643,8 @@ func (local *local) WriteToTiKV(
 
 	// if there is not leader currently, we should directly return an error
 	if leaderPeerMetas == nil {
-		log.L().Warn("write to tikv no leader", zap.Reflect("region", region),
-			zap.Uint64("leader_id", leaderID), zap.Reflect("meta", meta),
+		log.L().Warn("write to tikv no leader", log.ZapRedactReflect("region", region),
+			zap.Uint64("leader_id", leaderID), log.ZapRedactReflect("meta", meta),
 			zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size))
 		return nil, nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
 			region.Region.Id, leaderID)
@@ -661,9 +661,9 @@ func (local *local) WriteToTiKV(
 		firstKey := append([]byte{}, iter.Key()...)
 		remainRange = &Range{start: firstKey, end: regionRange.end}
 		log.L().Info("write to tikv partial finish", zap.Int("count", totalCount),
-			zap.Int64("size", size), zap.Binary("startKey", regionRange.start), zap.Binary("endKey", regionRange.end),
-			zap.Binary("remainStart", remainRange.start), zap.Binary("remainEnd", remainRange.end),
-			zap.Reflect("region", region))
+			zap.Int64("size", size), log.ZapRedactBinary("startKey", regionRange.start), log.ZapRedactBinary("endKey", regionRange.end),
+			log.ZapRedactBinary("remainStart", remainRange.start), log.ZapRedactBinary("remainEnd", remainRange.end),
+			log.ZapRedactReflect("region", region))
 	}
 
 	return leaderPeerMetas, remainRange, nil
@@ -762,7 +762,7 @@ func (local *local) readAndSplitIntoRange(engineFile *LocalFile) ([]Range, error
 	log.L().Info("split engine key ranges", zap.Stringer("engine", engineFile.Uuid),
 		zap.Int64("totalSize", engineFile.TotalSize), zap.Int64("totalCount", engineFile.Length),
-		zap.Binary("firstKey", firstKey), zap.Binary("lastKey", lastKey),
+		log.ZapRedactBinary("firstKey", firstKey), log.ZapRedactBinary("lastKey", lastKey),
 		zap.Int("ranges", len(ranges)))
 
 	return ranges, nil
 
@@ -884,9 +884,9 @@ func (local *local) writeAndIngestByRange(
 	}
 	if !hasKey {
 		log.L().Info("There is no pairs in iterator",
-			zap.Binary("start", start),
-			zap.Binary("end", end),
-			zap.Binary("next end", nextKey(end)))
+			log.ZapRedactBinary("start", start),
+			log.ZapRedactBinary("end", end),
+			log.ZapRedactBinary("next end", nextKey(end)))
 		return nil
 	}
 	pairStart := append([]byte{}, iter.Key()...)
@@ -915,7 +915,7 @@ WriteAndIngest:
 		regions, err = paginateScanRegion(ctx, local.splitCli, startKey, endKey, 128)
 		if err != nil || len(regions) == 0 {
 			log.L().Warn("scan region failed", log.ShortError(err), zap.Int("region_len", len(regions)),
-				zap.Binary("startKey", startKey), zap.Binary("endKey", endKey), zap.Int("retry", retry))
+				log.ZapRedactBinary("startKey", startKey), log.ZapRedactBinary("endKey", endKey), zap.Int("retry", retry))
 			retry++
 			continue WriteAndIngest
 		}
@@ -938,8 +938,8 @@ WriteAndIngest:
 			} else {
 				retry++
 			}
-			log.L().Info("retry write and ingest kv pairs", zap.Binary("startKey", pairStart),
-				zap.Binary("endKey", end), log.ShortError(err), zap.Int("retry", retry))
+			log.L().Info("retry write and ingest kv pairs", log.ZapRedactBinary("startKey", pairStart),
+				log.ZapRedactBinary("endKey", end), log.ShortError(err), zap.Int("retry", retry))
 			continue WriteAndIngest
 		}
 		if rg != nil {
@@ -988,8 +988,8 @@ loopWrite:
 			if errors.Cause(err) == context.Canceled {
 				return nil, err
 			}
-			log.L().Warn("ingest failed", log.ShortError(err), zap.Reflect("meta", meta),
-				zap.Reflect("region", region))
+			log.L().Warn("ingest failed", log.ShortError(err), log.ZapRedactReflect("meta", meta),
+				log.ZapRedactReflect("region", region))
 			errCnt++
 			continue
 		}
@@ -1012,8 +1012,8 @@ loopWrite:
 			}
 			switch retryTy {
 			case retryNone:
-				log.L().Warn("ingest failed noretry", log.ShortError(err), zap.Reflect("meta", meta),
-					zap.Reflect("region", region))
+				log.L().Warn("ingest failed noretry", log.ShortError(err), log.ZapRedactReflect("meta", meta),
+					log.ZapRedactReflect("region", region))
 				// met non-retryable error retry whole Write procedure
 				return remainRange, err
 			case retryWrite:
@@ -1028,7 +1028,8 @@ loopWrite:
 
 	if err != nil {
 		log.L().Warn("write and ingest region, will retry import full range", log.ShortError(err),
-			zap.Stringer("region", region.Region), zap.Binary("start", start), zap.Binary("end", end))
+			log.ZapRedactStringer("region", region.Region), log.ZapRedactBinary("start", start),
+			log.ZapRedactBinary("end", end))
 	}
 	return remainRange, errors.Trace(err)
 }
@@ -1242,7 +1243,7 @@ func (local *local) isIngestRetryable(
 			if newRegion != nil {
 				return newRegion, nil
 			}
-			log.L().Warn("get region by key return nil, will retry", zap.Reflect("region", region),
+			log.L().Warn("get region by key return nil, will retry", log.ZapRedactReflect("region", region),
 				zap.Int("retry", i))
 			select {
 			case <-ctx.Done():
diff --git a/lightning/backend/localhelper.go b/lightning/backend/localhelper.go
index 3f3cc5276..07cefadbc 100644
--- a/lightning/backend/localhelper.go
+++ b/lightning/backend/localhelper.go
@@ -58,14 +58,15 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 	minKey := codec.EncodeBytes([]byte{}, ranges[0].start)
 	maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].end)
 
+	var err error
 	scatterRegions := make([]*split.RegionInfo, 0)
 	var retryKeys [][]byte
 	waitTime := splitRegionBaseBackOffTime
 
 	for i := 0; i < SplitRetryTimes; i++ {
 		log.L().Info("split and scatter region",
-			zap.Binary("minKey", minKey),
-			zap.Binary("maxKey", maxKey),
+			log.ZapRedactBinary("minKey", minKey),
+			log.ZapRedactBinary("maxKey", maxKey),
 			zap.Int("retry", i),
 		)
 		if i > 0 {
@@ -82,18 +83,18 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 		var regions []*split.RegionInfo
 		regions, err = paginateScanRegion(ctx, local.splitCli, minKey, maxKey, 128)
 		if err != nil {
-			log.L().Warn("paginate scan region failed", zap.Binary("minKey", minKey), zap.Binary("maxKey", maxKey),
+			log.L().Warn("paginate scan region failed", log.ZapRedactBinary("minKey", minKey), log.ZapRedactBinary("maxKey", maxKey),
 				log.ShortError(err), zap.Int("retry", i))
 			continue
 		}
 
 		if len(regions) == 0 {
-			log.L().Warn("paginate scan region returns empty result", zap.Binary("minKey", minKey), zap.Binary("maxKey", maxKey),
+			log.L().Warn("paginate scan region returns empty result", log.ZapRedactBinary("minKey", minKey), log.ZapRedactBinary("maxKey", maxKey),
 				zap.Int("retry", i))
 			return errors.New("paginate scan region returns empty result")
 		}
 
-		log.L().Info("paginate scan region finished", zap.Binary("minKey", minKey), zap.Binary("maxKey", maxKey),
+		log.L().Info("paginate scan region finished", log.ZapRedactBinary("minKey", minKey), log.ZapRedactBinary("maxKey", maxKey),
 			zap.Int("regions", len(regions)))
 
 		regionMap := make(map[uint64]*split.RegionInfo)
@@ -107,9 +108,9 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 			lastKeyEnc := codec.EncodeBytes([]byte{}, retryKeys[len(retryKeys)-1])
 			if bytes.Compare(firstKeyEnc, regions[0].Region.StartKey) < 0 || !beforeEnd(lastKeyEnc, regions[len(regions)-1].Region.EndKey) {
 				log.L().Warn("no valid key for split region",
-					zap.Binary("firstKey", firstKeyEnc), zap.Binary("lastKey", lastKeyEnc),
-					zap.Binary("firstRegionStart", regions[0].Region.StartKey),
-					zap.Binary("lastRegionEnd", regions[len(regions)-1].Region.EndKey))
+					log.ZapRedactBinary("firstKey", firstKeyEnc), log.ZapRedactBinary("lastKey", lastKeyEnc),
+					log.ZapRedactBinary("firstRegionStart", regions[0].Region.StartKey),
+					log.ZapRedactBinary("lastRegionEnd", regions[len(regions)-1].Region.EndKey))
 				return errors.New("check split keys failed")
 			}
 			splitKeyMap = getSplitKeys(retryKeys, regions)
@@ -131,18 +132,18 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 			splitRegionEnd := codec.EncodeBytes([]byte{}, keys[end-1])
 			if bytes.Compare(splitRegionStart, splitRegion.Region.StartKey) < 0 || !beforeEnd(splitRegionEnd, splitRegion.Region.EndKey) {
 				log.L().Fatal("no valid key in region",
-					zap.Binary("startKey", splitRegionStart), zap.Binary("endKey", splitRegionEnd),
-					zap.Binary("regionStart", splitRegion.Region.StartKey), zap.Binary("regionEnd", splitRegion.Region.EndKey),
-					zap.Reflect("region", splitRegion))
+					log.ZapRedactBinary("startKey", splitRegionStart), log.ZapRedactBinary("endKey", splitRegionEnd),
+					log.ZapRedactBinary("regionStart", splitRegion.Region.StartKey), log.ZapRedactBinary("regionEnd", splitRegion.Region.EndKey),
+					log.ZapRedactReflect("region", splitRegion))
 			}
 			splitRegion, newRegions, err = local.BatchSplitRegions(ctx, splitRegion, keys[start:end])
 			if err != nil {
 				if strings.Contains(err.Error(), "no valid key") {
 					for _, key := range keys {
 						log.L().Warn("no valid key",
-							zap.Binary("startKey", region.Region.StartKey),
-							zap.Binary("endKey", region.Region.EndKey),
-							zap.Binary("key", codec.EncodeBytes([]byte{}, key)))
+							log.ZapRedactBinary("startKey", region.Region.StartKey),
+							log.ZapRedactBinary("endKey", region.Region.EndKey),
+							log.ZapRedactBinary("key", codec.EncodeBytes([]byte{}, key)))
 					}
 					return errors.Trace(err)
 				}
@@ -204,8 +205,10 @@ func paginateScanRegion(
 	ctx context.Context, client split.SplitClient, startKey, endKey []byte, limit int,
 ) ([]*split.RegionInfo, error) {
 	if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 {
-		return nil, errors.Errorf("startKey > endKey, startKey %s, endkey %s",
-			hex.EncodeToString(startKey), hex.EncodeToString(endKey))
+		log.L().Error("startKey > endKey when paginating scan region",
+			log.ZapRedactString("startKey", hex.EncodeToString(startKey)),
+			log.ZapRedactString("endKey", hex.EncodeToString(endKey)))
+		return nil, errors.Errorf("startKey > endKey when paginating scan region")
 	}
 
 	var regions []*split.RegionInfo
@@ -299,7 +302,7 @@ func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.
 		ok, err := local.isScatterRegionFinished(ctx, regionID)
 		if err != nil {
 			log.L().Warn("scatter region failed: do not have the region",
-				zap.Stringer("region", regionInfo.Region))
+				log.ZapRedactStringer("region", regionInfo.Region))
 			return
 		}
 		if ok {
diff --git a/lightning/backend/sql2kv.go b/lightning/backend/sql2kv.go
index 19ef074b3..3878e30a0 100644
--- a/lightning/backend/sql2kv.go
+++ b/lightning/backend/sql2kv.go
@@ -214,7 +214,7 @@ func (row rowArrayMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error
 		}
 		encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error {
 			enc.AddString("kind", kindStr[kind])
-			enc.AddString("val", str)
+			enc.AddString("val", log.RedactString(str))
 			return nil
 		}))
 	}
@@ -236,10 +236,12 @@ func logKVConvertFailed(logger log.Logger, row []types.Datum, j int, colInfo *mo
 		log.ShortError(err),
 	)
 
+	log.L().Error("failed to convert kv value", log.ZapRedactReflect("origVal", original.GetValue()),
+		zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O),
+		zap.Int("columnID", j+1))
 	return errors.Annotatef(
 		err,
-		"failed to cast `%v` as %s for column `%s` (#%d)",
-		original.GetValue(), &colInfo.FieldType, colInfo.Name.O, j+1,
+		"failed to cast value as %s for column `%s` (#%d)", &colInfo.FieldType, colInfo.Name.O, j+1,
 	)
 }
 
diff --git a/lightning/backend/sql2kv_test.go b/lightning/backend/sql2kv_test.go
index 75955ff0f..d3a7441cc 100644
--- a/lightning/backend/sql2kv_test.go
+++ b/lightning/backend/sql2kv_test.go
@@ -88,7 +88,7 @@ func (s *kvSuite) TestEncode(c *C) {
 	})
 	c.Assert(err, IsNil)
 	pairs, err := strictMode.Encode(logger, rows, 1, []int{0, 1})
-	c.Assert(err, ErrorMatches, "failed to cast `10000000` as tinyint\\(4\\) for column `c1` \\(#1\\):.*overflows tinyint")
+	c.Assert(err, ErrorMatches, "failed to cast value as tinyint\\(4\\) for column `c1` \\(#1\\):.*overflows tinyint")
 	c.Assert(pairs, IsNil)
 
 	rowsWithPk := []types.Datum{
@@ -96,7 +96,7 @@ func (s *kvSuite) TestEncode(c *C) {
 		types.NewStringDatum("invalid-pk"),
 	}
 	pairs, err = strictMode.Encode(logger, rowsWithPk, 2, []int{0, 1})
-	c.Assert(err, ErrorMatches, "failed to cast `invalid-pk` as bigint\\(20\\) for column `_tidb_rowid`.*Truncated.*")
+	c.Assert(err, ErrorMatches, "failed to cast value as bigint\\(20\\) for column `_tidb_rowid`.*Truncated.*")
 
 	rowsWithPk2 := []types.Datum{
 		types.NewIntDatum(1),
diff --git a/lightning/backend/tidb.go b/lightning/backend/tidb.go
index bf81e0f86..9db9dabdf 100644
--- a/lightning/backend/tidb.go
+++ b/lightning/backend/tidb.go
@@ -378,8 +378,8 @@ func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, colu
 	// Retry will be done externally, so we're not going to retry here.
 	_, err := be.db.ExecContext(ctx, insertStmt.String())
 	if err != nil {
-		log.L().Error("execute statement failed", zap.String("stmt", insertStmt.String()),
-			zap.Array("rows", rows), zap.Error(err))
+		log.L().Error("execute statement failed", log.ZapRedactString("stmt", insertStmt.String()),
+			log.ZapRedactArray("rows", rows), zap.Error(err))
 	}
 	failpoint.Inject("FailIfImportedSomeRows", func() {
 		panic("forcing failure due to FailIfImportedSomeRows, before saving checkpoint")
diff --git a/lightning/config/global.go b/lightning/config/global.go
index eb1075fda..6ec954cd1 100644
--- a/lightning/config/global.go
+++ b/lightning/config/global.go
@@ -144,6 +144,7 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
 	logLevel := flagext.ChoiceVar(fs, "L", "", `log level: info, debug, warn, error, fatal (default info)`,
 		"", "info", "debug", "warn", "warning", "error", "fatal")
 	logFilePath := fs.String("log-file", "", "log file path")
+	redactLog := fs.Bool("redact-log", false, "whether to redact sensitive info in log")
 	tidbHost := fs.String("tidb-host", "", "TiDB server host")
 	tidbPort := fs.Int("tidb-port", 0, "TiDB server port (default 4000)")
 	tidbUser := fs.String("tidb-user", "", "TiDB user name to connect")
@@ -198,6 +199,9 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
 	if *logFilePath != "" {
 		cfg.App.Config.File = *logFilePath
 	}
+	if *redactLog {
+		cfg.App.Config.RedactLog = *redactLog
+	}
 	// "-" is a special config for log to stdout
 	if cfg.App.Config.File == "-" {
 		cfg.App.Config.File = ""
diff --git a/lightning/log/log.go b/lightning/log/log.go
index e3b8f3328..c597af477 100644
--- a/lightning/log/log.go
+++ b/lightning/log/log.go
@@ -44,6 +44,8 @@ type Config struct {
 	FileMaxDays int `toml:"max-days" json:"max-days"`
 	// Maximum number of old log files to retain.
 	FileMaxBackups int `toml:"max-backups" json:"max-backups"`
+	// Redact sensitive logs during the whole process
+	RedactLog bool `toml:"redact-log" json:"redact-log"`
 }
 
 func (cfg *Config) Adjust() {
@@ -98,6 +100,8 @@ func InitLogger(cfg *Config, tidbLoglevel string) error {
 	appLogger = Logger{logger.WithOptions(zap.AddStacktrace(zap.DPanicLevel))}
 	appLevel = props.Level
 
+	InitRedact(cfg.RedactLog)
+
 	return nil
 }
 
diff --git a/lightning/log/redact.go b/lightning/log/redact.go
new file mode 100644
index 000000000..e75f5cadc
--- /dev/null
+++ b/lightning/log/redact.go
@@ -0,0 +1,89 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package log
+
+import (
+	"fmt"
+
+	"github.com/pingcap/errors"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+// InitRedact inits errors.RedactLogEnabled with the given value
+func InitRedact(redactLog bool) {
+	errors.RedactLogEnabled.Store(redactLog)
+}
+
+// NeedRedact returns whether to redact log
+func NeedRedact() bool {
+	return errors.RedactLogEnabled.Load()
+}
+
+// ZapRedactBinary receives zap.Binary and returns omitted information if redact log is enabled
+func ZapRedactBinary(key string, val []byte) zapcore.Field {
+	if NeedRedact() {
+		return zap.String(key, "?")
+	}
+	return zap.Binary(key, val)
+}
+
+// ZapRedactArray receives zap.Array and returns omitted information if redact log is enabled
+func ZapRedactArray(key string, val zapcore.ArrayMarshaler) zapcore.Field {
+	if NeedRedact() {
+		return zap.String(key, "?")
+	}
+	return zap.Array(key, val)
+}
+
+// ZapRedactReflect receives zap.Reflect and returns omitted information if redact log is enabled
+func ZapRedactReflect(key string, val interface{}) zapcore.Field {
+	if NeedRedact() {
+		return zap.String(key, "?")
+	}
+	return zap.Reflect(key, val)
+}
+
+// ZapRedactStringer receives a stringer argument and returns omitted information in zap.Field if redact log is enabled
+func ZapRedactStringer(key string, arg fmt.Stringer) zap.Field {
+	return zap.Stringer(key, RedactStringer(arg))
+}
+
+// ZapRedactString receives a string argument and returns omitted information in zap.Field if redact log is enabled
+func ZapRedactString(key string, arg string) zap.Field {
+	return zap.String(key, RedactString(arg))
+}
+
+// RedactString receives a string argument and returns omitted information if redact log is enabled
+func RedactString(arg string) string {
+	if NeedRedact() {
+		return "?"
+	}
+	return arg
+}
+
+// RedactStringer receives a stringer argument and returns omitted information if redact log is enabled
+func RedactStringer(arg fmt.Stringer) fmt.Stringer {
+	if NeedRedact() {
+		return stringer{}
+	}
+	return arg
+}
+
+type stringer struct{}
+
+// String implements fmt.Stringer
+func (s stringer) String() string {
+	return "?"
+}
diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index e31fdb2e5..eea44bbba 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -41,6 +41,8 @@ file = "tidb-lightning.log"
 max-size = 128 # MB
 max-days = 28
 max-backups = 14
+# If set to true, lightning will redact sensitive information in log.
+redact-log = false
 
 [security]
 # specifies certificates and keys for TLS connections within the cluster.
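
Usage note (a minimal sketch, not part of the patch): the example below shows how the helpers introduced in lightning/log/redact.go are expected to behave once this diff is applied. It assumes the repository's module path github.com/pingcap/tidb-lightning and calls InitRedact directly for brevity; in the real flow the setting comes from the `redact-log` config item or the --redact-log CLI flag and is wired up inside InitLogger.

```go
package main

import (
	"go.uber.org/zap"

	"github.com/pingcap/tidb-lightning/lightning/log"
)

func main() {
	// In production this comes from Config.RedactLog via InitLogger;
	// calling InitRedact directly keeps the sketch self-contained.
	log.InitRedact(true)

	startKey := []byte{0x74, 0x80, 0x00, 0x01}

	// With redaction enabled, the key is emitted as the placeholder "?"
	// instead of its raw bytes.
	log.L().Info("scan range",
		log.ZapRedactBinary("startKey", startKey),
		zap.Int("retry", 0))

	// Plain strings (e.g. SQL statements) take the same path before
	// they reach a log line or an error message.
	redacted := log.RedactString("INSERT INTO t VALUES (1)")
	_ = redacted // "?" while redaction is on, the original text otherwise
}
```

The design choice worth noting: redacted fields are replaced by the literal "?" rather than dropped, so every log line keeps a stable shape (field names, retry counters, region IDs) and only the user-data payloads disappear.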