diff --git a/go.sum b/go.sum index 9d8cc706299..0b3b1ae0de8 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,7 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/aws/aws-sdk-go v1.35.3 h1:r0puXncSaAfRt7Btml2swUo74Kao+vKhO3VLjwDjK54= github.com/aws/aws-sdk-go v1.35.3/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA= @@ -111,6 +112,7 @@ github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mo github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/bradleyjkemp/cupaloy/v2 v2.5.0/go.mod h1:TD5UU0rdYTbu/TtuwFuWrtiRARuN7mtRipvs/bsShSE= github.com/bradleyjkemp/grpc-tools v0.2.5/go.mod h1:9OM0QfQGzMUC98I2kvHMK4Lw0memhg8j2BosoL4ME0M= +github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g= github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets= github.com/carlmjohnson/flagext v0.21.0/go.mod h1:Eenv0epIUAr4NuedNmkzI8WmBmjIxZC239XcKxYS2ac= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= @@ -203,6 +205,7 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= @@ -211,6 +214,7 @@ github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7Bv github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 h1:8i9x3Q4hW1kLE4ScsOtUlwVHT76LKhkmOw9zbDxnyUc= github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17/go.mod h1:3Ys1pJhyVaB6iWigv4o2r6Ug1GZmfDWqvqmO6bjojg0= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -489,6 +493,7 @@ github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/ github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= @@ -510,6 +515,7 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= github.com/juju/loggo v0.0.0-20180524022052-584905176618/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= +github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY= github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -621,6 +627,7 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/montanaflynn/stats v0.5.0 h1:2EkzeTSqBB4V4bJwWrt5gIIrZmpJBcoIRGS2kWLgzmk= github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= @@ -665,6 +672,7 @@ github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc= github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE= +github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d h1:U+PMnTlV2tu7RuMK5etusZG3Cf+rpow5hqQByeCzJ2g= github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d/go.mod h1:lXfE4PvvTW5xOjO6Mba8zDPyw8M93B6AQ7frTGnMlA8= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -849,6 +857,7 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -866,6 +875,7 @@ github.com/swaggo/swag v1.5.1/go.mod h1:1Bl9F/ZBpVWh22nY0zmYyASPO1lI/zIwRDrpZU+t github.com/swaggo/swag v1.6.3/go.mod h1:wcc83tB4Mb2aNiL/HP4MFeQdpHUrca+Rp/DRNgWAUio= github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476/go.mod h1:xDhTyuFIujYiN3DKWC/H/83xcfHp+UE/IzWWampG7Zc= github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= +github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs= github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= github.com/thoas/go-funk v0.7.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/thoas/go-funk v0.8.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= @@ -922,12 +932,17 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+ github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/vmihailenco/msgpack/v4 v4.3.11/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4= github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1/go.mod h1:xlngVLeyQ/Qi05oQxhQ+oTuqa03RjMwMfk/7/TCs+QI= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser v0.1.1 h1:quXMXlA39OCbd2wAdTsGDlK9RkOk6Wuw+x37wVyIuWY= github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f h1:9DDCDwOyEy/gId+IEMrFHLuQ5R/WV0KNxWLler8X2OY= github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f/go.mod h1:8sdOQnirw1PrcnTJYkmW1iOHtUmblMmGdUOHyWYycLI= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= diff --git a/pkg/p2p/message_handler_manager.go b/pkg/p2p/message_handler_manager.go new file mode 100644 index 00000000000..4fa9df6b6eb --- /dev/null +++ b/pkg/p2p/message_handler_manager.go @@ -0,0 +1,149 @@ +package p2p + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/dm/pkg/log" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +const ( + defaultHandlerOperationTimeout = 3 * time.Second +) + +// MessageHandlerManager is for managing message topic handlers. +// NOTE: for each topic, only one handler is allowed. +type MessageHandlerManager interface { + RegisterHandler(ctx context.Context, topic Topic, tpi TypeInformation, fn HandlerFunc) (bool, error) + + UnregisterHandler(ctx context.Context, topic Topic) (bool, error) + + CheckError(ctx context.Context) error + + // Clean unregisters all existing handlers. + Clean(ctx context.Context) error + + // SetTimeout sets the timeout for handler operations. + // A timeout is needed because the underlying handler operations are + // asynchronous in the MessageServer. + SetTimeout(timeout time.Duration) +} + +func newMessageHandlerManager(registrar handlerRegistrar) MessageHandlerManager { + return &messageHandlerManagerImpl{ + messageServer: registrar, + timeout: atomic.NewDuration(defaultHandlerOperationTimeout), + topics: make(map[Topic]<-chan error), + } +} + +// handlerRegistrar is an interface for the handler management-related +// functionalities of MessageServer. +// This interface is for easier unit-testing. +type handlerRegistrar interface { + SyncAddHandler(context.Context, Topic, TypeInformation, HandlerFunc) (<-chan error, error) + SyncRemoveHandler(context.Context, Topic) error +} + +type messageHandlerManagerImpl struct { + messageServer handlerRegistrar + // timeout is atomic to avoid unnecessary blocking + timeout *atomic.Duration + + // mu protects topics + mu sync.Mutex + topics map[Topic]<-chan error +} + +func (m *messageHandlerManagerImpl) RegisterHandler( + ctx context.Context, + topic Topic, + tpi TypeInformation, + fn HandlerFunc, +) (bool, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.topics[topic]; ok { + // A handler for this topic already exists. + return false, nil + } + + ctx, cancel := m.makeContext(ctx) + defer cancel() + + errCh, err := m.messageServer.SyncAddHandler(ctx, topic, tpi, fn) + if err != nil { + return false, errors.Trace(err) + } + m.topics[topic] = errCh + + return true, nil +} + +func (m *messageHandlerManagerImpl) UnregisterHandler(ctx context.Context, topic Topic) (bool, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.topics[topic]; !ok { + // The handler for this topic does not exist + return false, nil + } + + ctx, cancel := m.makeContext(ctx) + defer cancel() + + if err := m.messageServer.SyncRemoveHandler(ctx, topic); err != nil { + return false, errors.Trace(err) + } + delete(m.topics, topic) + + return true, nil +} + +func (m *messageHandlerManagerImpl) CheckError(ctx context.Context) error { + m.mu.Lock() + defer m.mu.Unlock() + + for topic, errCh := range m.topics { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err := <-errCh: + if err == nil { + continue + } + log.L().Warn("handler error received", + zap.String("topic", topic)) + return errors.Trace(err) + default: + } + } + return nil +} + +func (m *messageHandlerManagerImpl) Clean(ctx context.Context) error { + m.mu.Lock() + defer m.mu.Unlock() + + for topic := range m.topics { + if err := m.messageServer.SyncRemoveHandler(ctx, topic); err != nil { + return errors.Trace(err) + } + } + + return nil +} + +func (m *messageHandlerManagerImpl) SetTimeout(timeout time.Duration) { + m.timeout.Store(timeout) +} + +func (m *messageHandlerManagerImpl) makeContext(parent context.Context) (context.Context, context.CancelFunc) { + timeout := m.timeout.Load() + return context.WithTimeout(parent, timeout) +} diff --git a/pkg/p2p/message_handler_manager_test.go b/pkg/p2p/message_handler_manager_test.go new file mode 100644 index 00000000000..bc7c96b96f3 --- /dev/null +++ b/pkg/p2p/message_handler_manager_test.go @@ -0,0 +1,125 @@ +package p2p + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/errors" + p2pImpl "github.com/pingcap/tiflow/pkg/p2p" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// handlerRegistrar must be implemented by MessageServer +var _ handlerRegistrar = (*p2pImpl.MessageServer)(nil) + +type mockHandlerRegistrar struct { + mock.Mock +} + +func (r *mockHandlerRegistrar) SyncAddHandler( + ctx context.Context, + topic Topic, + information TypeInformation, + handlerFunc HandlerFunc, +) (<-chan error, error) { + args := r.Called(ctx, topic, information, handlerFunc) + return args.Get(0).(<-chan error), args.Error(1) +} + +func (r *mockHandlerRegistrar) SyncRemoveHandler(ctx context.Context, topic Topic) error { + args := r.Called(ctx, topic) + return args.Error(0) +} + +type msgContent struct{} + +func TestMessageHandlerManagerBasics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + registrar := &mockHandlerRegistrar{} + manager := newMessageHandlerManager(registrar) + + errCh1 := make(chan error, 1) + registrar.On("SyncAddHandler", mock.Anything, "test-topic-1", &msgContent{}, mock.Anything). + Return((<-chan error)(errCh1), nil) + ok, err := manager.RegisterHandler(ctx, "test-topic-1", &msgContent{}, func(NodeID, MessageValue) error { + // This function does not matter here + return nil + }) + require.NoError(t, err) + require.True(t, ok) + registrar.AssertExpectations(t) + + // Test duplicate handler + ok, err = manager.RegisterHandler(ctx, "test-topic-1", &msgContent{}, func(NodeID, MessageValue) error { + // This function does not matter here + return nil + }) + require.NoError(t, err) + require.False(t, ok) + + errCh2 := make(chan error, 1) + registrar.ExpectedCalls = nil + registrar.On("SyncAddHandler", mock.Anything, "test-topic-2", &msgContent{}, mock.Anything). + Return((<-chan error)(errCh2), nil) + ok, err = manager.RegisterHandler(ctx, "test-topic-2", &msgContent{}, func(NodeID, MessageValue) error { + // This function does not matter here + return nil + }) + require.NoError(t, err) + require.True(t, ok) + registrar.AssertExpectations(t) + + err = manager.CheckError(ctx) + require.NoError(t, err) + + errCh1 <- errors.New("fake error") + err = manager.CheckError(ctx) + require.Error(t, err) + + registrar.ExpectedCalls = nil + registrar.On("SyncRemoveHandler", mock.Anything, "test-topic-1").Return(nil) + ok, err = manager.UnregisterHandler(ctx, "test-topic-1") + require.NoError(t, err) + require.True(t, ok) + registrar.AssertExpectations(t) + + // duplicate unregister + ok, err = manager.UnregisterHandler(ctx, "test-topic-1") + require.NoError(t, err) + require.False(t, ok) + + registrar.ExpectedCalls = nil + registrar.On("SyncRemoveHandler", mock.Anything, "test-topic-2").Return(nil) + err = manager.Clean(ctx) + require.NoError(t, err) +} + +func TestMessageHandlerManagerTimeout(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + registrar := &mockHandlerRegistrar{} + manager := newMessageHandlerManager(registrar) + manager.SetTimeout(time.Duration(0)) + + registrar.On("SyncAddHandler", mock.Anything, "test-topic-1", &msgContent{}, mock.Anything). + Return((<-chan error)(nil), errors.New("fake error")). + Run(func(args mock.Arguments) { + ctx := args.Get(0).(context.Context) + select { + case <-ctx.Done(): + default: + require.Fail(t, "context should have been canceled") + } + }) + + _, err := manager.RegisterHandler(ctx, "test-topic-1", &msgContent{}, func(NodeID, MessageValue) error { + // This function does not matter here + return nil + }) + require.Error(t, err) +} diff --git a/pkg/p2p/server.go b/pkg/p2p/server.go new file mode 100644 index 00000000000..283a675bc96 --- /dev/null +++ b/pkg/p2p/server.go @@ -0,0 +1,117 @@ +package p2p + +import ( + "context" + "net" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/dm/pkg/log" + p2pImpl "github.com/pingcap/tiflow/pkg/p2p" + "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/proto/p2p" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" +) + +// Re-export some types +type ( + Topic = p2pImpl.Topic + NodeID = p2pImpl.NodeID + Config = p2pImpl.MessageServerConfig +) + +type ( + TypeInformation = interface{} + MessageValue = interface{} + HandlerFunc = func(sender NodeID, value MessageValue) error +) + +type ( + MessageServerOpt = func(*Config) +) + +// read only +var defaultServerConfig = Config{ + MaxPendingMessageCountPerTopic: 256, + MaxPendingTaskCount: 102400, + SendChannelSize: 16, + AckInterval: time.Millisecond * 200, + WorkerPoolSize: 4, + MaxPeerCount: 1024, + WaitUnregisterHandleTimeoutThreshold: time.Millisecond * 100, +} + +// MessageRPCService is a background service wrapping a MessageServer instance. +type MessageRPCService struct { + messageServer *p2pImpl.MessageServer + grpcServer *grpc.Server +} + +// NewMessageRPCService creates a new MessageRPCService. +// Note: TLS is not supported for now. +func NewMessageRPCService( + selfID NodeID, + _credential *security.Credential, + opts ...MessageServerOpt, +) (*MessageRPCService, error) { + // Deep copy + config := defaultServerConfig + // Apply opts + for _, opt := range opts { + opt(&config) + } + + messageServer := p2pImpl.NewMessageServer(selfID, &config) + // TODO zixiong: support accepting TLS connections. + grpcSvr := grpc.NewServer() + p2p.RegisterCDCPeerToPeerServer(grpcSvr, messageServer) + return &MessageRPCService{ + messageServer: messageServer, + grpcServer: grpcSvr, + }, nil +} + +// Serve listens on `l` and creates the background goroutine for the message server. +func (s *MessageRPCService) Serve(ctx context.Context, l net.Listener) error { + defer func() { + err := l.Close() + if err != nil { + log.L().Warn("failed to close Listener", zap.Error(err)) + } + }() + + wg, ctx := errgroup.WithContext(ctx) + + wg.Go(func() (err error) { + defer log.L().ErrorFilterContextCanceled("message server exited", zap.Error(err)) + return errors.Trace(s.messageServer.Run(ctx)) + }) + + wg.Go(func() (err error) { + defer func() { + // TODO (zixiong) filter out expected harmless errors. + log.L().Debug("grpc server exited", zap.Error(err)) + }() + return errors.Trace(s.grpcServer.Serve(l)) + }) + + // We need a separate goroutine for canceling the gRPC server + // because the `Serve` method provides by the library does not + // support canceling by contexts, which is a more idiomatic way. + wg.Go(func() error { + <-ctx.Done() + log.L().Debug("context canceled, stopping the gRPC server") + + s.grpcServer.Stop() + return nil + }) + + return wg.Wait() +} + +// MakeHandlerManager returns a MessageHandlerManager +func (s *MessageRPCService) MakeHandlerManager() MessageHandlerManager { + return newMessageHandlerManager(s.messageServer) +} diff --git a/pkg/p2p/server_integration_test.go b/pkg/p2p/server_integration_test.go new file mode 100644 index 00000000000..e356403cb7d --- /dev/null +++ b/pkg/p2p/server_integration_test.go @@ -0,0 +1,83 @@ +package p2p + +import ( + "context" + "fmt" + "math" + "net" + "sync" + "testing" + "time" + + "github.com/phayes/freeport" + p2pImpl "github.com/pingcap/tiflow/pkg/p2p" + "github.com/pingcap/tiflow/pkg/security" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func makeListenerForServerTests(t *testing.T) (l net.Listener, addr string) { + port := freeport.GetPort() + addr = fmt.Sprintf("127.0.0.1:%d", port) + l, err := net.Listen("tcp", addr) + require.NoError(t, err) + return +} + +// read only +var clientConfigForUnitTesting = &p2pImpl.MessageClientConfig{ + SendChannelSize: 0, // unbuffered channel to make tests more reliable + BatchSendInterval: time.Second, + MaxBatchBytes: math.MaxInt64, + MaxBatchCount: math.MaxInt64, + RetryRateLimitPerSecond: 999.0, + ClientVersion: "v5.4.0", // a fake version + AdvertisedAddr: "fake-addr:8300", +} + +func TestMessageRPCServiceBasics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + l, addr := makeListenerForServerTests(t) + messageSrvc, err := NewMessageRPCService("test-node-1", &security.Credential{} /* no TLS */) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := messageSrvc.Serve(ctx, l) + require.Error(t, err) + require.Regexp(t, ".*canceled.*", err.Error()) + }() + + var called atomic.Bool + handlerManager := messageSrvc.MakeHandlerManager() + ok, err := handlerManager.RegisterHandler(ctx, "test-topic-1", &msgContent{}, func(sender NodeID, value MessageValue) error { + require.Equal(t, "test-client-1", sender) + require.IsType(t, &msgContent{}, value) + require.False(t, called.Swap(true)) + return nil + }) + require.NoError(t, err) + require.True(t, ok) + + client := p2pImpl.NewMessageClient("test-client-1", clientConfigForUnitTesting) + wg.Add(1) + go func() { + defer wg.Done() + err := client.Run(ctx, "tcp", addr, "test-node-1", &security.Credential{} /* no TLS */) + require.Error(t, err) + require.Regexp(t, ".*canceled.*", err.Error()) + }() + + _, err = client.SendMessage(ctx, "test-topic-1", &msgContent{}) + require.NoError(t, err) + require.Eventually(t, func() bool { + return called.Load() + }, 5*time.Second, 10*time.Millisecond) + + cancel() + wg.Wait() +}