Skip to content

Commit

Permalink
pkg/p2p: add some helper structs for integrating p2p (pingcap#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored Dec 23, 2021
1 parent 8d040b4 commit ed9f956
Show file tree
Hide file tree
Showing 5 changed files with 489 additions and 0 deletions.
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.35.3 h1:r0puXncSaAfRt7Btml2swUo74Kao+vKhO3VLjwDjK54=
github.com/aws/aws-sdk-go v1.35.3/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
Expand All @@ -111,6 +112,7 @@ github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mo
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/bradleyjkemp/cupaloy/v2 v2.5.0/go.mod h1:TD5UU0rdYTbu/TtuwFuWrtiRARuN7mtRipvs/bsShSE=
github.com/bradleyjkemp/grpc-tools v0.2.5/go.mod h1:9OM0QfQGzMUC98I2kvHMK4Lw0memhg8j2BosoL4ME0M=
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g=
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets=
github.com/carlmjohnson/flagext v0.21.0/go.mod h1:Eenv0epIUAr4NuedNmkzI8WmBmjIxZC239XcKxYS2ac=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
Expand Down Expand Up @@ -203,6 +205,7 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand All @@ -211,6 +214,7 @@ github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7Bv
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 h1:8i9x3Q4hW1kLE4ScsOtUlwVHT76LKhkmOw9zbDxnyUc=
github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17/go.mod h1:3Ys1pJhyVaB6iWigv4o2r6Ug1GZmfDWqvqmO6bjojg0=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -489,6 +493,7 @@ github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
Expand All @@ -510,6 +515,7 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
github.com/juju/loggo v0.0.0-20180524022052-584905176618/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U=
github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY=
github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down Expand Up @@ -621,6 +627,7 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.5.0 h1:2EkzeTSqBB4V4bJwWrt5gIIrZmpJBcoIRGS2kWLgzmk=
github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
Expand Down Expand Up @@ -665,6 +672,7 @@ github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d h1:U+PMnTlV2tu7RuMK5etusZG3Cf+rpow5hqQByeCzJ2g=
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d/go.mod h1:lXfE4PvvTW5xOjO6Mba8zDPyw8M93B6AQ7frTGnMlA8=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
Expand Down Expand Up @@ -849,6 +857,7 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand All @@ -866,6 +875,7 @@ github.com/swaggo/swag v1.5.1/go.mod h1:1Bl9F/ZBpVWh22nY0zmYyASPO1lI/zIwRDrpZU+t
github.com/swaggo/swag v1.6.3/go.mod h1:wcc83tB4Mb2aNiL/HP4MFeQdpHUrca+Rp/DRNgWAUio=
github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476/go.mod h1:xDhTyuFIujYiN3DKWC/H/83xcfHp+UE/IzWWampG7Zc=
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs=
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM=
github.com/thoas/go-funk v0.7.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/thoas/go-funk v0.8.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
Expand Down Expand Up @@ -922,12 +932,17 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vmihailenco/msgpack/v4 v4.3.11/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4=
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1/go.mod h1:xlngVLeyQ/Qi05oQxhQ+oTuqa03RjMwMfk/7/TCs+QI=
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/tagparser v0.1.1 h1:quXMXlA39OCbd2wAdTsGDlK9RkOk6Wuw+x37wVyIuWY=
github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f h1:9DDCDwOyEy/gId+IEMrFHLuQ5R/WV0KNxWLler8X2OY=
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f/go.mod h1:8sdOQnirw1PrcnTJYkmW1iOHtUmblMmGdUOHyWYycLI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
Expand Down
149 changes: 149 additions & 0 deletions pkg/p2p/message_handler_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package p2p

import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
defaultHandlerOperationTimeout = 3 * time.Second
)

// MessageHandlerManager is for managing message topic handlers.
// NOTE: for each topic, only one handler is allowed.
type MessageHandlerManager interface {
RegisterHandler(ctx context.Context, topic Topic, tpi TypeInformation, fn HandlerFunc) (bool, error)

UnregisterHandler(ctx context.Context, topic Topic) (bool, error)

CheckError(ctx context.Context) error

// Clean unregisters all existing handlers.
Clean(ctx context.Context) error

// SetTimeout sets the timeout for handler operations.
// A timeout is needed because the underlying handler operations are
// asynchronous in the MessageServer.
SetTimeout(timeout time.Duration)
}

func newMessageHandlerManager(registrar handlerRegistrar) MessageHandlerManager {
return &messageHandlerManagerImpl{
messageServer: registrar,
timeout: atomic.NewDuration(defaultHandlerOperationTimeout),
topics: make(map[Topic]<-chan error),
}
}

// handlerRegistrar is an interface for the handler management-related
// functionalities of MessageServer.
// This interface is for easier unit-testing.
type handlerRegistrar interface {
SyncAddHandler(context.Context, Topic, TypeInformation, HandlerFunc) (<-chan error, error)
SyncRemoveHandler(context.Context, Topic) error
}

type messageHandlerManagerImpl struct {
messageServer handlerRegistrar
// timeout is atomic to avoid unnecessary blocking
timeout *atomic.Duration

// mu protects topics
mu sync.Mutex
topics map[Topic]<-chan error
}

func (m *messageHandlerManagerImpl) RegisterHandler(
ctx context.Context,
topic Topic,
tpi TypeInformation,
fn HandlerFunc,
) (bool, error) {
m.mu.Lock()
defer m.mu.Unlock()

if _, ok := m.topics[topic]; ok {
// A handler for this topic already exists.
return false, nil
}

ctx, cancel := m.makeContext(ctx)
defer cancel()

errCh, err := m.messageServer.SyncAddHandler(ctx, topic, tpi, fn)
if err != nil {
return false, errors.Trace(err)
}
m.topics[topic] = errCh

return true, nil
}

func (m *messageHandlerManagerImpl) UnregisterHandler(ctx context.Context, topic Topic) (bool, error) {
m.mu.Lock()
defer m.mu.Unlock()

if _, ok := m.topics[topic]; !ok {
// The handler for this topic does not exist
return false, nil
}

ctx, cancel := m.makeContext(ctx)
defer cancel()

if err := m.messageServer.SyncRemoveHandler(ctx, topic); err != nil {
return false, errors.Trace(err)
}
delete(m.topics, topic)

return true, nil
}

func (m *messageHandlerManagerImpl) CheckError(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()

for topic, errCh := range m.topics {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case err := <-errCh:
if err == nil {
continue
}
log.L().Warn("handler error received",
zap.String("topic", topic))
return errors.Trace(err)
default:
}
}
return nil
}

func (m *messageHandlerManagerImpl) Clean(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()

for topic := range m.topics {
if err := m.messageServer.SyncRemoveHandler(ctx, topic); err != nil {
return errors.Trace(err)
}
}

return nil
}

func (m *messageHandlerManagerImpl) SetTimeout(timeout time.Duration) {
m.timeout.Store(timeout)
}

func (m *messageHandlerManagerImpl) makeContext(parent context.Context) (context.Context, context.CancelFunc) {
timeout := m.timeout.Load()
return context.WithTimeout(parent, timeout)
}
125 changes: 125 additions & 0 deletions pkg/p2p/message_handler_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package p2p

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
p2pImpl "github.com/pingcap/tiflow/pkg/p2p"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

// handlerRegistrar must be implemented by MessageServer
var _ handlerRegistrar = (*p2pImpl.MessageServer)(nil)

type mockHandlerRegistrar struct {
mock.Mock
}

func (r *mockHandlerRegistrar) SyncAddHandler(
ctx context.Context,
topic Topic,
information TypeInformation,
handlerFunc HandlerFunc,
) (<-chan error, error) {
args := r.Called(ctx, topic, information, handlerFunc)
return args.Get(0).(<-chan error), args.Error(1)
}

func (r *mockHandlerRegistrar) SyncRemoveHandler(ctx context.Context, topic Topic) error {
args := r.Called(ctx, topic)
return args.Error(0)
}

type msgContent struct{}

func TestMessageHandlerManagerBasics(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

registrar := &mockHandlerRegistrar{}
manager := newMessageHandlerManager(registrar)

errCh1 := make(chan error, 1)
registrar.On("SyncAddHandler", mock.Anything, "test-topic-1", &msgContent{}, mock.Anything).
Return((<-chan error)(errCh1), nil)
ok, err := manager.RegisterHandler(ctx, "test-topic-1", &msgContent{}, func(NodeID, MessageValue) error {
// This function does not matter here
return nil
})
require.NoError(t, err)
require.True(t, ok)
registrar.AssertExpectations(t)

// Test duplicate handler
ok, err = manager.RegisterHandler(ctx, "test-topic-1", &msgContent{}, func(NodeID, MessageValue) error {
// This function does not matter here
return nil
})
require.NoError(t, err)
require.False(t, ok)

errCh2 := make(chan error, 1)
registrar.ExpectedCalls = nil
registrar.On("SyncAddHandler", mock.Anything, "test-topic-2", &msgContent{}, mock.Anything).
Return((<-chan error)(errCh2), nil)
ok, err = manager.RegisterHandler(ctx, "test-topic-2", &msgContent{}, func(NodeID, MessageValue) error {
// This function does not matter here
return nil
})
require.NoError(t, err)
require.True(t, ok)
registrar.AssertExpectations(t)

err = manager.CheckError(ctx)
require.NoError(t, err)

errCh1 <- errors.New("fake error")
err = manager.CheckError(ctx)
require.Error(t, err)

registrar.ExpectedCalls = nil
registrar.On("SyncRemoveHandler", mock.Anything, "test-topic-1").Return(nil)
ok, err = manager.UnregisterHandler(ctx, "test-topic-1")
require.NoError(t, err)
require.True(t, ok)
registrar.AssertExpectations(t)

// duplicate unregister
ok, err = manager.UnregisterHandler(ctx, "test-topic-1")
require.NoError(t, err)
require.False(t, ok)

registrar.ExpectedCalls = nil
registrar.On("SyncRemoveHandler", mock.Anything, "test-topic-2").Return(nil)
err = manager.Clean(ctx)
require.NoError(t, err)
}

func TestMessageHandlerManagerTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

registrar := &mockHandlerRegistrar{}
manager := newMessageHandlerManager(registrar)
manager.SetTimeout(time.Duration(0))

registrar.On("SyncAddHandler", mock.Anything, "test-topic-1", &msgContent{}, mock.Anything).
Return((<-chan error)(nil), errors.New("fake error")).
Run(func(args mock.Arguments) {
ctx := args.Get(0).(context.Context)
select {
case <-ctx.Done():
default:
require.Fail(t, "context should have been canceled")
}
})

_, err := manager.RegisterHandler(ctx, "test-topic-1", &msgContent{}, func(NodeID, MessageValue) error {
// This function does not matter here
return nil
})
require.Error(t, err)
}
Loading

0 comments on commit ed9f956

Please sign in to comment.