Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor drainer syncer #532

Merged
merged 22 commits into from
Apr 24, 2019
Merged

refactor drainer syncer #532

merged 22 commits into from
Apr 24, 2019

Conversation

july2993
Copy link
Contributor

@july2993 july2993 commented Apr 10, 2019

What problem does this PR solve?

https://internal.pingcap.net/jira/browse/TOOL-963
discard the origin Translator and executor interface, add a new Syncer interface in drainer/sync/

  • avoid to split transaction in translator and assemble back in executor.
  • avoid some magic handle about safe ts of checkpoint by kafka and flash doing batch execute at downstream
  • unify using pkg/loader to sync to mysql in drainer like other tool
  • much simplify syncer.go
  • add unit test about relate change code, make syncer.go /sync /translator covered more than 75%, making total coverage from 45.625% to 53.721%

bad side:

  • change a lot.., may introduce some problems unknown.

What is changed and how it works?

discard the origin Translator and executor interface, add a new Syncer interface

Check List

Tests

  • Unit test
  • Integration test

Code changes

  • Has exported function/method change
  • Has exported variable/fields change
  • Has interface methods change

Side effects

Related changes

  • Need to cherry-pick to the release branch

@july2993
Copy link
Contributor Author

/run-unit-test

@sre-bot
Copy link

sre-bot commented Apr 10, 2019

Hi contributor, thanks for your PR.

This patch needs to be approved by someone of admins. They should reply with "/ok-to-test" to accept this PR for running test automatically.

@july2993
Copy link
Contributor Author

/run-unit-test

@july2993
Copy link
Contributor Author

/run-all-tests

Makefile Show resolved Hide resolved
drainer/sync/flash_test.go Outdated Show resolved Hide resolved
drainer/sync/syncer.go Outdated Show resolved Hide resolved
drainer/syncer.go Outdated Show resolved Hide resolved
drainer/syncer.go Outdated Show resolved Hide resolved
drainer/translator/mysql.go Outdated Show resolved Hide resolved
drainer/translator/mysql.go Outdated Show resolved Hide resolved
drainer/translator/mysql.go Outdated Show resolved Hide resolved
drainer/translator/mysql_test.go Outdated Show resolved Hide resolved
drainer/translator/mysql_test.go Outdated Show resolved Hide resolved
@july2993
Copy link
Contributor Author

/run-unit-test

@july2993
Copy link
Contributor Author

/run-unit-test

drainer/server.go Outdated Show resolved Hide resolved
drainer/sync/bench_kafka_test.go Outdated Show resolved Hide resolved
drainer/sync/flash.go Outdated Show resolved Hide resolved
drainer/sync/flash_test.go Show resolved Hide resolved
drainer/sync/kafka.go Show resolved Hide resolved
drainer/translator/flash.go Outdated Show resolved Hide resolved
drainer/translator/flash.go Outdated Show resolved Hide resolved
drainer/translator/kafka.go Outdated Show resolved Hide resolved
drainer/translator/kafka.go Outdated Show resolved Hide resolved
drainer/translator/pb.go Outdated Show resolved Hide resolved
@suzaku
Copy link
Contributor

suzaku commented Apr 19, 2019

It took me more than 75 minutes to review this PR. 😿

@july2993
Copy link
Contributor Author

It took me more than 75 minutes to review this PR. 😿

much quicker than as expected by me

@july2993
Copy link
Contributor Author

/run-all-tests

@july2993
Copy link
Contributor Author

PTAL @suzaku @GregoryIan

drainer/syncer.go Outdated Show resolved Hide resolved
@july2993
Copy link
Contributor Author

/run-all-tests

drainer/sync/flash.go Show resolved Hide resolved
drainer/sync/flash.go Outdated Show resolved Hide resolved
func (p *KafkaSyncer) Close() error {
close(p.shutdown)

err := <-p.Error()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no error occurs, will this line block forever?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may return if SetErr is called, and may return nil

Copy link
Contributor

@suzaku suzaku Apr 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (t *testExecutorSuite) TestBaseError(c *C) {
	be := newBaseError()
	select {
	case err := <-be.Error():
		c.Assert(err, IsNil)
	case <-time.After(time.Second):
		c.Fatal("Timeout")
	}
}

If SetErr is not called, it may wait forever.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, as what i said.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can not be sure there must be something wrong when close is called, maybe we should use select to avoid blocking?

@IANTHEREAL
Copy link
Collaborator

LGTM

@july2993
Copy link
Contributor Author

/run-all-tests

@july2993
Copy link
Contributor Author

@WangXiangUSTC @kennytm PTAL, we need at least three LGTMs for this pr

@WangXiangUSTC @kennytm

Copy link
Contributor

@WangXiangUSTC WangXiangUSTC left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rest LGTM

drainer/sync/bench_kafka_test.go Show resolved Hide resolved
drainer/syncer.go Outdated Show resolved Hide resolved
if !ok {
return errors.Errorf("not found table id: %d", mutation.GetTableId())
return false, errors.Errorf("not found table id: %d", mutation.GetTableId())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about use errors.NotFoundf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pingcap/errors#11
we should not use errors.NotFoundf, instead we can remove it all, keep this now.

drainer/translator/kafka.go Outdated Show resolved Hide resolved
@july2993
Copy link
Contributor Author

/run-all-tests

@WangXiangUSTC
Copy link
Contributor

LGTM

@july2993 july2993 merged commit 0780072 into master Apr 24, 2019
@july2993 july2993 deleted the hjh/drainer branch April 24, 2019 05:50
july2993 added a commit to july2993/tidb-binlog that referenced this pull request May 22, 2019
Drop the Check() and and String() of Checkpoint.
Drop MetaCheckpoint of FlashCheckPoint, after pingcap#532
there's no need to keep this.
@july2993 july2993 mentioned this pull request May 22, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants