This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

etcd: fix json unmarshall error and sources not found after upgrade to 2.0.2 (#1635)
GMHDBJD authored May 6, 2021
1 parent c88ea87 commit 423c81d
Showing 14 changed files with 493 additions and 16 deletions.
54 changes: 49 additions & 5 deletions .github/workflows/upgrade-via-tiup.yml
@@ -1,6 +1,10 @@
name: Upgrade via TiUP

on:
pull_request:
branches:
- master
- release-2.0
schedule:
- cron: '3 22 * * *' # run at 06:03 UTC+8
workflow_dispatch:
@@ -35,7 +39,7 @@ jobs:
GOPATH=${GITHUB_WORKSPACE}/go docker-compose up -d
- name: Run test cases
- working-directory: ${{ env.working-directoryr }}
+ working-directory: ${{ env.working-directory }}
run: |
cd ${{ env.working-directory }}/tests/tiup/docker
docker-compose exec -T control bash -c "cd /go/src/github.com/pingcap/dm && ./tests/tiup/upgrade-from-v1.sh"
@@ -63,7 +67,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- previous_v2: ["v2.0.0", "v2.0.1"]
+ previous_v2: ["v2.0.0", "v2.0.1", "v2.0.2"]

steps:

@@ -76,19 +80,59 @@ jobs:
uses: actions/checkout@v2
with:
path: go/src/github.com/pingcap/dm

- name: Build
if: ${{ github.ref != 'refs/heads/main' }}
working-directory: ${{ env.working-directory }}
run: make build nolint=true

- name: Package files
if: ${{ github.ref != 'refs/heads/main' }}
run: |
mkdir ${{ github.workspace }}/package
cd ${{ github.workspace }}/package
echo "package dm-master"
mkdir dm-master
cp ${{ env.working-directory }}/bin/dm-master dm-master
cp -r ${{ env.working-directory }}/dm/dm-ansible/conf dm-master
cp -r ${{ env.working-directory }}/dm/dm-ansible/scripts dm-master
tar -czvf dm-master-nightly-linux-amd64.tar.gz dm-master
echo "package dm-worker"
mkdir dm-worker
cp ${{ env.working-directory }}/bin/dm-worker dm-worker
cp -r ${{ env.working-directory }}/dm/dm-ansible/conf dm-worker/conf
cp -r ${{ env.working-directory }}/dm/dm-ansible/scripts dm-worker/scripts
tar -czvf dm-worker-nightly-linux-amd64.tar.gz dm-worker
echo "package dmctl"
mkdir dmctl
cp ${{ env.working-directory }}/bin/dmctl dmctl
cp -r ${{ env.working-directory }}/dm/dm-ansible/conf dmctl/conf
cp -r ${{ env.working-directory }}/dm/dm-ansible/scripts dmctl/scripts
tar -czvf dmctl-nightly-linux-amd64.tar.gz dmctl
- name: Setup containers
working-directory: ${{ env.working-directory }}
run: |
cd ${{ env.working-directory }}/tests/tiup/docker
GOPATH=${GITHUB_WORKSPACE}/go docker-compose up -d
- name: Copy package files
if: ${{ github.ref != 'refs/heads/main' }}
run: |
cd ${{ github.workspace }}/package
docker cp dm-master-nightly-linux-amd64.tar.gz control:/tmp
docker cp dm-worker-nightly-linux-amd64.tar.gz control:/tmp
docker cp dmctl-nightly-linux-amd64.tar.gz control:/tmp
# TODO: support more CUR_VER
- name: Run test cases
- working-directory: ${{ env.working-directoryr }}
+ working-directory: ${{ env.working-directory }}
run: |
cd ${{ env.working-directory }}/tests/tiup/docker
- docker-compose exec -T control bash -c "cd /go/src/github.com/pingcap/dm && ./tests/tiup/upgrade-from-v2.sh ${{ matrix.previous_v2 }} nightly"
+ docker-compose exec -e ref=${{ github.ref }} -T control bash -c "cd /go/src/github.com/pingcap/dm && ./tests/tiup/upgrade-from-v2.sh ${{ matrix.previous_v2 }} nightly"
# send a Slack notification if failed.
# NOTE: With the exception of `GITHUB_TOKEN`, secrets are not passed to the runner when a workflow is triggered from a forked repository.
@@ -131,7 +175,7 @@ jobs:
GOPATH=${GITHUB_WORKSPACE}/go docker-compose up -d
- name: Run test cases before upgrade
- working-directory: ${{ env.working-directoryr }}
+ working-directory: ${{ env.working-directory }}
run: |
cd ${{ env.working-directory }}/tests/tiup/docker
docker-compose exec -T control bash -c "cd /go/src/github.com/pingcap/dm && ./tests/tiup/upgrade-tidb.sh before_upgrade nightly"
10 changes: 10 additions & 0 deletions dm/master/bootstrap.go
@@ -70,6 +70,16 @@ func (s *Server) bootstrap(ctx context.Context) error {
return nil
}

func (s *Server) bootstrapBeforeSchedulerStart(ctx context.Context) error {
log.L().Info("start before scheduler start")
// no need for v1.0.x
if s.cfg.V1SourcesPath != "" {
return nil
}

return upgrade.TryUpgradeBeforeSchedulerStart(ctx, s.etcdClient)
}

// importFromV10x tries to import/upgrade the cluster from v1.0.x.
func (s *Server) importFromV10x(ctx context.Context) error {
// 1. check whether an upgrade is needed based on the cluster version.
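`upgrade.TryUpgradeBeforeSchedulerStart` lives in DM's `pkg/upgrade` package and is not part of this diff. A minimal sketch of the general shape, under the assumption that a cluster-version key in etcd gates an ordered list of migration hooks; the key name, the `hook` type, the extra parameters, and the naive version comparison are all illustrative, not DM's actual API:

```go
// Hypothetical sketch only; DM's real pkg/upgrade API differs.
package upgrade

import (
	"context"

	"go.etcd.io/etcd/clientv3"
)

// clusterVersionKey is an assumed etcd key recording the version the
// cluster data was last upgraded to.
const clusterVersionKey = "/dm-master/cluster-version"

// hook is one migration that must finish before the scheduler loads state.
type hook struct {
	sinceVersion string // first release that writes the format this hook produces
	run          func(ctx context.Context, cli *clientv3.Client) error
}

// TryUpgradeBeforeSchedulerStart runs every hook newer than the stored
// cluster version, then records the current version. Only the elected
// leader calls it (see election.go below), so there is a single writer.
func TryUpgradeBeforeSchedulerStart(ctx context.Context, cli *clientv3.Client, hooks []hook, currentVersion string) error {
	resp, err := cli.Get(ctx, clusterVersionKey)
	if err != nil {
		return err
	}
	var stored string
	if len(resp.Kvs) > 0 {
		stored = string(resp.Kvs[0].Value)
	}
	for _, h := range hooks {
		if stored >= h.sinceVersion { // naive string compare; real code would parse semver
			continue // this migration already ran in a previous upgrade
		}
		if err := h.run(ctx, cli); err != nil {
			return err // caller retires leadership and resigns; see election.go
		}
	}
	_, err = cli.Put(ctx, clusterVersionKey, currentVersion)
	return err
}
```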
11 changes: 10 additions & 1 deletion dm/master/election.go
@@ -57,6 +57,15 @@ func (s *Server) electionNotify(ctx context.Context) {
log.L().Info("current member become the leader", zap.String("current member", s.cfg.Name))
s.leader.Store(oneselfStartingLeader)

// try to upgrade the cluster before scheduler start
err := s.bootstrapBeforeSchedulerStart(ctx)
if err != nil {
log.L().Error("fail to bootstrap the cluster before scheduler start", zap.Error(err))
s.retireLeader()
s.election.Resign()
continue
}

// NOTE: for logic errors, we should return with `true`, so that the cluster can serve requests and the user can fix these errors.
// Otherwise, no member of DM-master can become the leader and the user can't fix them (the cluster may need to be fixed offline with some other tools like etcdctl).
ok := s.startLeaderComponent(ctx)
@@ -76,7 +85,7 @@ func (s *Server) electionNotify(ctx context.Context) {
// so if the old leader failed when upgrading, the new leader can try again.
// NOTE: if the cluster has been upgraded, calling this method again should have no side effects.
// NOTE: now, bootstrap relies on scheduler to handle DM-worker instances, sources, tasks, etc.
- err := s.bootstrap(ctx)
+ err = s.bootstrap(ctx)
if err != nil {
log.L().Error("fail to bootstrap the cluster", zap.Error(err))
s.retireLeader()
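The change above retires and resigns when the pre-scheduler bootstrap fails, so another DM-master (or the same member on a later campaign) retries the upgrade instead of serving with half-upgraded etcd data. A self-contained toy of that resign-and-retry shape, with every name hypothetical and the etcd-backed election replaced by a plain loop:

```go
package main

import (
	"errors"
	"fmt"
)

// member is a stand-in for one DM-master; bootstrap is its pre-scheduler work.
type member struct {
	name      string
	bootstrap func() error
}

// campaign hands leadership to each member in turn until one bootstraps
// successfully. In DM this is an etcd-backed election; a failed bootstrap
// maps to retireLeader() plus election.Resign() in electionNotify above.
func campaign(members []member) (string, error) {
	for _, m := range members {
		if err := m.bootstrap(); err != nil {
			fmt.Printf("%s: bootstrap failed, resigning: %v\n", m.name, err)
			continue // resign, so the next campaigner retries the upgrade
		}
		return m.name, nil
	}
	return "", errors.New("no member could bootstrap")
}

func main() {
	leader, err := campaign([]member{
		{name: "master-1", bootstrap: func() error { return errors.New("etcd value in old format") }},
		{name: "master-2", bootstrap: func() error { return nil }},
	})
	fmt.Println("leader:", leader, "err:", err)
}
```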
2 changes: 1 addition & 1 deletion dm/master/workerrpc/rawgrpc.go
@@ -96,7 +96,7 @@ func (c *GRPCClient) Close() error {
return nil
}

- // Closed returns whether this grpc conn is closed. only used for test now
+ // Closed returns whether this grpc conn is closed. only used for test now.
func (c *GRPCClient) Closed() bool {
return c.closed.Load()
}
48 changes: 48 additions & 0 deletions pkg/shardddl/optimism/info.go
@@ -25,6 +25,7 @@ import (

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
)

// TODO: much of the code in optimistic mode is very similar to pessimistic mode; we can try to combine them.
@@ -148,6 +149,18 @@ func (i Info) toJSON() (string, error) {
// infoFromJSON constructs Info from its JSON representation.
func infoFromJSON(s string) (i Info, err error) {
err = json.Unmarshal([]byte(s), &i)
if err != nil {
// For compatibility.
// In v2.0.2, we changed the struct of table-info-after but forgot to upgrade the existing etcd values.
// To keep the ModRevision of each info, we convert the value after getting it instead of rewriting all the values in etcd during the upgrade.
// Every Info will be upgraded once a new info is put or its lock is resolved.
oldInfo, newErr := oldInfoFromJSON(s)
if newErr != nil {
log.L().Error("unmarshal old info", log.ShortError(newErr))
return
}
return oldInfo.toInfo(), nil
}
return
}

@@ -294,3 +307,38 @@ func ClearTestInfoOperationSchema(cli *clientv3.Client) error {
_, err := cli.Txn(context.Background()).Then(clearSource, clearInfo, clearOp, clearISOp, clearColumns).Commit()
return err
}

// OldInfo represents info in etcd before v2.0.2.
type OldInfo struct {
Task string `json:"task"`
Source string `json:"source"`
UpSchema string `json:"up-schema"`
UpTable string `json:"up-table"`
DownSchema string `json:"down-schema"`
DownTable string `json:"down-table"`
DDLs []string `json:"ddls"`

TableInfoBefore *model.TableInfo `json:"table-info-before"` // the tracked table schema before applying the DDLs
TableInfoAfter *model.TableInfo `json:"table-info-after"` // the tracked table schema after applying the DDLs
}

// oldInfoFromJSON constructs OldInfo from its JSON representation.
func oldInfoFromJSON(s string) (oldInfo OldInfo, err error) {
err = json.Unmarshal([]byte(s), &oldInfo)
return
}

// toInfo converts OldInfo to Info.
func (oldInfo *OldInfo) toInfo() Info {
return Info{
Task: oldInfo.Task,
Source: oldInfo.Source,
UpSchema: oldInfo.UpSchema,
UpTable: oldInfo.UpTable,
DownSchema: oldInfo.DownSchema,
DownTable: oldInfo.DownTable,
DDLs: oldInfo.DDLs,
TableInfoBefore: oldInfo.TableInfoBefore,
TableInfosAfter: []*model.TableInfo{oldInfo.TableInfoAfter},
}
}
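A note on the mechanics: in v2.0.2 `table-info-after` changed from a single table info to a slice while, by all appearances, keeping the same JSON tag, so `json.Unmarshal` fails when it meets an old etcd value holding an object where the new struct expects an array; `infoFromJSON` then retries with `OldInfo` and converts. A minimal, self-contained sketch of that fallback pattern, with `Old`/`New` as stand-ins rather than DM's real `OldInfo`/`Info`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Old mirrors the pre-v2.0.2 layout: one table info after the DDLs.
type Old struct {
	After json.RawMessage `json:"table-info-after"`
}

// New mirrors the v2.0.2 layout: a slice under the same JSON tag,
// which is exactly why old etcd values no longer unmarshal directly.
type New struct {
	After []json.RawMessage `json:"table-info-after"`
}

// fromJSON decodes the new layout first and falls back to the old one,
// wrapping the single value into a one-element slice, the same shape
// conversion that OldInfo.toInfo performs above.
func fromJSON(s string) (New, error) {
	var n New
	if err := json.Unmarshal([]byte(s), &n); err == nil {
		return n, nil
	}
	var o Old
	if err := json.Unmarshal([]byte(s), &o); err != nil {
		return New{}, err
	}
	return New{After: []json.RawMessage{o.After}}, nil
}

func main() {
	oldVal := `{"table-info-after": {"id": 1}}`   // written by <= v2.0.1
	newVal := `{"table-info-after": [{"id": 1}]}` // written by >= v2.0.2
	for _, v := range []string{oldVal, newVal} {
		n, err := fromJSON(v)
		fmt.Printf("%d item(s), err=%v\n", len(n.After), err) // both: 1 item(s), err=<nil>
	}
}
```

Converting lazily at read time preserves each key's ModRevision, exactly as the comment in `infoFromJSON` explains; rewriting every key during the upgrade would bump revisions and disturb watchers.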
121 changes: 121 additions & 0 deletions pkg/shardddl/optimism/info_test.go
@@ -15,6 +15,7 @@ package optimism

import (
"context"
"encoding/json"
"sync"
"testing"
"time"
@@ -28,6 +29,9 @@ import (
"github.com/pingcap/tidb/util/mock"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
)

var etcdTestCli *clientv3.Client
@@ -85,6 +89,91 @@ func (t *testForEtcd) TestInfoJSON(c *C) {
c.Assert(i2, DeepEquals, i1)
}

func (t *testForEtcd) TestEtcdInfoUpgrade(c *C) {
defer clearTestInfoOperation(c)

var (
source1 = "mysql-replica-1"
source2 = "mysql-replica-2"
task1 = "task-1"
task2 = "task-2"
upSchema = "foo_1"
upTable = "bar_1"
downSchema = "foo"
downTable = "bar"
p = parser.New()
se = mock.NewContext()
tblID int64 = 222
tblI1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
tblI2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`)
tblI3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`)
tblI4 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT, c3 INT)`)
i11 = NewInfo(task1, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c1 INT"}, tblI1, []*model.TableInfo{tblI2})
i12 = NewInfo(task1, source2, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c2 INT"}, tblI2, []*model.TableInfo{tblI3})
i21 = NewInfo(task2, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c3 INT"}, tblI3, []*model.TableInfo{tblI4})
oi11 = newOldInfo(task1, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c1 INT"}, tblI1, tblI2)
oi12 = newOldInfo(task1, source2, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c2 INT"}, tblI2, tblI3)
oi21 = newOldInfo(task2, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c3 INT"}, tblI3, tblI4)
)

// put the oldInfo
rev1, err := putOldInfo(etcdTestCli, oi11)
c.Assert(err, IsNil)
rev2, err := putOldInfo(etcdTestCli, oi11)
c.Assert(err, IsNil)
c.Assert(rev2, Greater, rev1)

// put another key and get again with 2 infos.
rev3, err := putOldInfo(etcdTestCli, oi12)
c.Assert(err, IsNil)
c.Assert(rev3, Greater, rev2)

// get all infos.
ifm, rev4, err := GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev4, Equals, rev3)
c.Assert(ifm, HasLen, 1)
c.Assert(ifm, HasKey, task1)
c.Assert(ifm[task1], HasLen, 2)
c.Assert(ifm[task1][source1], HasLen, 1)
c.Assert(ifm[task1][source1][upSchema], HasLen, 1)
c.Assert(ifm[task1][source2], HasLen, 1)
c.Assert(ifm[task1][source2][upSchema], HasLen, 1)

i11WithVer := i11
i11WithVer.Version = 2
i11WithVer.Revision = rev2
i12WithVer := i12
i12WithVer.Version = 1
i12WithVer.Revision = rev4
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
c.Assert(ifm[task1][source2][upSchema][upTable], DeepEquals, i12WithVer)

// start the watcher.
wch := make(chan Info, 10)
ech := make(chan error, 10)
var wg sync.WaitGroup
wg.Add(1)
watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
go func() {
defer wg.Done()
WatchInfo(watchCtx, etcdTestCli, rev4+1, wch, ech) // revision+1
}()

// put another oldInfo for a different task.
// its version starts from 1.
// this simulates a v2.0.1 worker putting infos while a v2.0.2 master watches.
rev5, err := putOldInfo(etcdTestCli, oi21)
c.Assert(err, IsNil)
infoWithVer := <-wch
i21WithVer := i21
i21WithVer.Version = 1
i21WithVer.Revision = rev5
c.Assert(infoWithVer, DeepEquals, i21WithVer)
c.Assert(len(ech), Equals, 0)
}

func (t *testForEtcd) TestInfoEtcd(c *C) {
defer clearTestInfoOperation(c)

@@ -237,3 +326,35 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(info, DeepEquals, i12c)
c.Assert(len(ech), Equals, 0)
}

func newOldInfo(task, source, upSchema, upTable, downSchema, downTable string,
ddls []string, tableInfoBefore *model.TableInfo, tableInfoAfter *model.TableInfo) OldInfo {
return OldInfo{
Task: task,
Source: source,
UpSchema: upSchema,
UpTable: upTable,
DownSchema: downSchema,
DownTable: downTable,
DDLs: ddls,
TableInfoBefore: tableInfoBefore,
TableInfoAfter: tableInfoAfter,
}
}

func putOldInfo(cli *clientv3.Client, oldInfo OldInfo) (int64, error) {
data, err := json.Marshal(oldInfo)
if err != nil {
return 0, err
}
key := common.ShardDDLOptimismInfoKeyAdapter.Encode(oldInfo.Task, oldInfo.Source, oldInfo.UpSchema, oldInfo.UpTable)

ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

resp, err := cli.Put(ctx, key, string(data))
if err != nil {
return 0, err
}
return resp.Header.Revision, nil
}
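Note: these tests run against an embedded etcd started via `go.etcd.io/etcd/integration` (setup not shown in this diff), so no external cluster is needed. Assuming stock gocheck flags, the new case can be run on its own with `go test ./pkg/shardddl/optimism -check.f TestEtcdInfoUpgrade`.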