Skip to content

Commit

Permalink
refactor & fix #4934
Browse files Browse the repository at this point in the history
  • Loading branch information
ben1009 committed Mar 17, 2022
1 parent 1bda67b commit 4c8b192
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ifeq (${CDC_ENABLE_VENDOR}, 1)
GOVENDORFLAG := -mod=vendor
endif

GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
GOBUILD :=GOOS=linux CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
GOBUILDNOVENDOR := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath
GOTEST := CGO_ENABLED=1 $(GO) test -p $(P) --race
GOTESTNORACE := CGO_ENABLED=1 $(GO) test -p $(P)
Expand Down
7 changes: 7 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package owner

import (
"context"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -500,9 +501,15 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
c.barriers.Update(ddlJobBarrier, newDDLResolvedTs)

case syncPointBarrier:
// clean up barriers if syncpoint disabled when update changefeed
if !ctx.ChangefeedVars().Info.SyncPointEnabled {
c.barriers.Update(syncPointBarrier, math.MaxUint64)
return 0, nil
}
if !blocked {
return barrierTs, nil
}

nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.SyncPointInterval))
if err := c.sink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL, so
func generateDSNStr(ctx context.Context, id string, uri *url.URL, dsnType string) (string, error) {
scheme := strings.ToLower(uri.Scheme)
if scheme != "mysql" && scheme != "tidb" && scheme != "mysql+ssl" && scheme != "tidb+ssl" {
return "", errors.New("can create mysql sink with unsupported scheme")
return "", errors.New("can not create mysql sink with unsupported scheme")
}

params := defaultParams.Clone()
Expand Down
4 changes: 4 additions & 0 deletions cdc/verification/module_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ func (m *ModuleVerification) GC(ctx context.Context, endTs string) error {
default:
}

if endTs == "" {
return nil
}

err := m.deleteTrackData(endTs)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions cdc/verification/module_verification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func TestNewModuleVerification(t *testing.T) {
defer m4.Close()
require.Nil(t, err)
require.NotSame(t, m4, m3)

err = m4.GC(context.Background(), "")
require.Nil(t, err)
}

func TestModuleVerification_SentTrackData(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions cdc/verification/verification_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package verification

import (
Expand Down

0 comments on commit 4c8b192

Please sign in to comment.