diff --git a/lightning/backend/backend.go b/lightning/backend/backend.go index 6352572cf..439b0d982 100644 --- a/lightning/backend/backend.go +++ b/lightning/backend/backend.go @@ -243,7 +243,7 @@ func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int uuid: engineUUID, }, tableName: tableName, - ts: oracle.ComposeTS(time.Now().Unix() * 1000, 0), + ts: oracle.ComposeTS(time.Now().Unix()*1000, 0), }, nil } diff --git a/lightning/backend/local.go b/lightning/backend/local.go index 94c3583d4..63df1a6f7 100644 --- a/lightning/backend/local.go +++ b/lightning/backend/local.go @@ -485,6 +485,14 @@ func (local *local) WriteToTiKV( } } + // if there is not leader currently, we should directly return an error + if leaderPeerMetas == nil { + log.L().Error("write to tikv no leader", zap.Reflect("region", region), + zap.Uint64("leader_id", leaderID), zap.Reflect("meta", meta)) + return nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d", + region.Region.Id, leaderID) + } + log.L().Debug("write to kv", zap.Reflect("region", region), zap.Uint64("leader", leaderID), zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas), zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size), @@ -877,9 +885,11 @@ func (local *local) WriteAndIngestPairs( } for _, meta := range metas { + var err error for i := 0; i < maxRetryTimes; i++ { log.L().Debug("ingest meta", zap.Reflect("meta", meta)) - resp, err := local.Ingest(ctx, meta, region) + var resp *sst.IngestResponse + resp, err = local.Ingest(ctx, meta, region) if err != nil { log.L().Warn("ingest failed", zap.Error(err), zap.Reflect("meta", meta), zap.Reflect("region", region)) @@ -895,27 +905,33 @@ func (local *local) WriteAndIngestPairs( CurrentRegions: []*metapb.Region{region.Region}} } }) - needRetry, newRegion, errIngest := isIngestRetryable(resp, region, meta) - if errIngest == nil { + var needRetry bool + var newRegion *split.RegionInfo + needRetry, newRegion, err = isIngestRetryable(resp, region, meta) + if err == nil { // ingest next meta break } if !needRetry { - log.L().Warn("ingest failed noretry", zap.Error(errIngest), zap.Reflect("meta", meta), + log.L().Warn("ingest failed noretry", zap.Error(err), zap.Reflect("meta", meta), zap.Reflect("region", region)) // met non-retryable error retry whole Write procedure - return errIngest + return err } // retry with not leader and epoch not match error - if newRegion != nil && i < maxRetryTimes-1 { + if newRegion != nil { region = newRegion } else { log.L().Warn("retry ingest due to", zap.Reflect("meta", meta), zap.Reflect("region", region), - zap.Reflect("new region", newRegion), zap.Error(errIngest)) - return errIngest + zap.Reflect("new region", newRegion), zap.Error(err)) + return err } } + if err != nil { + log.L().Error("all retry ingest failed", zap.Reflect("ingest meta", meta), zap.Error(err)) + return errors.Trace(err) + } } return nil }