Skip to content

Commit

Permalink
lightning: fix lightning failed to log encoding error (#45241) (#45364)
Browse files Browse the repository at this point in the history
close #44321
  • Loading branch information
ti-chi-bot authored Jul 24, 2023
1 parent ff1f933 commit cf5b115
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 6 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_test(
embed = [":kv"],
flaky = True,
race = "on",
shard_count = 19,
deps = [
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
Expand Down
32 changes: 26 additions & 6 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ import (
"golang.org/x/exp/slices"
)

const (
maxLogLength = 512 * 1024
)

var ExtraHandleColumnInfo = model.NewExtraHandleColInfo()

type genCol struct {
Expand Down Expand Up @@ -223,6 +227,7 @@ var kindStr = [...]string{
// MarshalLogArray implements the zapcore.ArrayMarshaler interface
func (row RowArrayMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
for _, datum := range row {
var totalLength = 0
kind := datum.Kind()
var str string
var err error
Expand All @@ -239,6 +244,14 @@ func (row RowArrayMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error
return err
}
}
if len(str) > maxLogLength {
str = str[0:1024] + " (truncated)"
}
totalLength += len(str)
if totalLength >= maxLogLength {
encoder.AppendString("The row has been truncated, and the log has exited early.")
return nil
}
if err := encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error {
enc.AddString("kind", kindStr[kind])
enc.AddString("val", redact.String(str))
Expand All @@ -250,7 +263,7 @@ func (row RowArrayMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error
return nil
}

func logKVConvertFailed(logger log.Logger, row []types.Datum, j int, colInfo *model.ColumnInfo, err error) error {
func LogKVConvertFailed(logger log.Logger, row []types.Datum, j int, colInfo *model.ColumnInfo, err error) error {
var original types.Datum
if 0 <= j && j < len(row) {
original = row[j]
Expand All @@ -265,9 +278,16 @@ func logKVConvertFailed(logger log.Logger, row []types.Datum, j int, colInfo *mo
log.ShortError(err),
)

logger.Error("failed to convert kv value", logutil.RedactAny("origVal", original.GetValue()),
zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O),
zap.Int("columnID", j+1))
if len(original.GetString()) >= maxLogLength {
originalPrefix := original.GetString()[0:1024] + " (truncated)"
logger.Error("failed to convert kv value", logutil.RedactAny("origVal", originalPrefix),
zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O),
zap.Int("columnID", j+1))
} else {
logger.Error("failed to convert kv value", logutil.RedactAny("origVal", original.GetValue()),
zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O),
zap.Int("columnID", j+1))
}
return errors.Annotatef(
err,
"failed to cast value as %s for column `%s` (#%d)", &colInfo.FieldType, colInfo.Name.O, j+1,
Expand Down Expand Up @@ -381,7 +401,7 @@ func (kvcodec *tableKVEncoder) Encode(
}
value, err = kvcodec.getActualDatum(rowID, i, theDatum)
if err != nil {
return nil, logKVConvertFailed(logger, row, j, col.ToInfo(), err)
return nil, LogKVConvertFailed(logger, row, j, col.ToInfo(), err)
}

record = append(record, value)
Expand Down Expand Up @@ -412,7 +432,7 @@ func (kvcodec *tableKVEncoder) Encode(
value, err = types.NewIntDatum(rowID), nil
}
if err != nil {
return nil, logKVConvertFailed(logger, row, j, ExtraHandleColumnInfo, err)
return nil, LogKVConvertFailed(logger, row, j, ExtraHandleColumnInfo, err)
}
record = append(record, value)
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.RowIDAllocType)
Expand Down
37 changes: 37 additions & 0 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package kv_test
import (
"errors"
"fmt"
"os"
"path/filepath"
"reflect"
"strings"
"testing"

lkv "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand Down Expand Up @@ -718,3 +721,37 @@ func BenchmarkSQL2KV(b *testing.B) {
require.Equal(b, l, 2)
}
}

func TestLogKVConvertFailed(t *testing.T) {
tempPath := filepath.Join(t.TempDir(), "/temp.txt")
logCfg := &log.Config{File: tempPath, FileMaxSize: 1}
err := log.InitLogger(logCfg, "info")
require.NoError(t, err)

modelName := model.NewCIStr("c1")
modelState := model.StatePublic
modelFieldType := *types.NewFieldType(mysql.TypeTiny)
c1 := &model.ColumnInfo{ID: 1, Name: modelName, State: modelState, Offset: 0, FieldType: modelFieldType}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
_, err = tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
require.NoError(t, err)

var newString strings.Builder
for i := 0; i < 100000; i++ {
newString.WriteString("test_test_test_test_")
}
newDatum := types.NewStringDatum(newString.String())
rows := []types.Datum{}
for i := 0; i <= 10; i++ {
rows = append(rows, newDatum)
}
err = lkv.LogKVConvertFailed(log.L(), rows, 6, c1, err)
require.NoError(t, err)

var content []byte
content, err = os.ReadFile(tempPath)
require.NoError(t, err)
require.LessOrEqual(t, 500, len(string(content)))
require.NotContains(t, content, "exceeds maximum file size")
}

0 comments on commit cf5b115

Please sign in to comment.