From 2ec00dc8201ea9a22293cf3facc83a1419f6a3df Mon Sep 17 00:00:00 2001 From: acruikshank Date: Wed, 9 Dec 2020 16:01:36 -0500 Subject: [PATCH 1/3] modify transformable event logic to simply add data --- .gitignore | 1 + messagequeue/messagequeue.go | 2 +- notifications/data_subscriber.go | 44 +++++++++++++++++ notifications/mappable.go | 47 ------------------- notifications/notifications_test.go | 4 +- notifications/types.go | 29 ++++++------ .../peerresponsemanager/peerresponsesender.go | 16 +++---- responsemanager/queryexecutor.go | 10 ++-- responsemanager/responsemanager.go | 12 ++--- responsemanager/responsemanager_test.go | 16 +++---- testutil/testnotifications.go | 20 ++++---- 11 files changed, 99 insertions(+), 102 deletions(-) create mode 100644 .gitignore create mode 100644 notifications/data_subscriber.go delete mode 100644 notifications/mappable.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..485dee64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index 9b422b22..555b91b0 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -141,7 +141,7 @@ func (mq *MessageQueue) mutateNextMessage(mutator func(gsmsg.GraphSyncMessage), } mutator(mq.nextMessage) for _, notifee := range notifees { - notifications.SubscribeOn(mq.eventPublisher, mq.nextMessageTopic, notifee) + notifications.SubscribeWithData(mq.eventPublisher, mq.nextMessageTopic, notifee) } return !mq.nextMessage.Empty() } diff --git a/notifications/data_subscriber.go b/notifications/data_subscriber.go new file mode 100644 index 00000000..983e2a23 --- /dev/null +++ b/notifications/data_subscriber.go @@ -0,0 +1,44 @@ +package notifications + +import "sync" + +type topicDataSubscriber struct { + idMapLk sync.RWMutex + data map[Topic]TopicData + Subscriber +} + +// NewTopicDataSubscriber produces a subscriber that will transform +// events and topics before passing them on to the given subscriber +func NewTopicDataSubscriber(sub Subscriber) TopicDataSubscriber { + return &topicDataSubscriber{ + Subscriber: sub, + data: make(map[Topic]TopicData), + } +} + +func (m *topicDataSubscriber) AddTopicData(id Topic, data TopicData) { + m.idMapLk.Lock() + m.data[id] = data + m.idMapLk.Unlock() +} + +func (m *topicDataSubscriber) transform(id Topic) Topic { + m.idMapLk.RLock() + defer m.idMapLk.RUnlock() + destID, ok := m.data[id] + if !ok { + return id + } + return destID +} + +func (m *topicDataSubscriber) OnNext(topic Topic, ev Event) { + newTopic := m.transform(topic) + m.Subscriber.OnNext(newTopic, ev) +} + +func (m *topicDataSubscriber) OnClose(topic Topic) { + newTopic := m.transform(topic) + m.Subscriber.OnClose(newTopic) +} diff --git a/notifications/mappable.go b/notifications/mappable.go deleted file mode 100644 index 0861d774..00000000 --- a/notifications/mappable.go +++ /dev/null @@ -1,47 +0,0 @@ -package notifications - -import "sync" - -type mappableSubscriber struct { - idMapLk sync.RWMutex - idMap map[Topic]Topic - eventTransform EventTransform - Subscriber -} - -// NewMappableSubscriber produces a subscriber that will transform -// events and topics before passing them on to the given subscriber -func NewMappableSubscriber(sub Subscriber, eventTransform EventTransform) MappableSubscriber { - return &mappableSubscriber{ - Subscriber: sub, - eventTransform: eventTransform, - idMap: make(map[Topic]Topic), - } -} - -func (m *mappableSubscriber) Map(id Topic, destinationID Topic) { - m.idMapLk.Lock() - m.idMap[id] = destinationID - m.idMapLk.Unlock() -} - -func (m *mappableSubscriber) transform(id Topic) Topic { - m.idMapLk.RLock() - defer m.idMapLk.RUnlock() - destID, ok := m.idMap[id] - if !ok { - return id - } - return destID -} - -func (m *mappableSubscriber) OnNext(topic Topic, ev Event) { - newTopic := m.transform(topic) - newEv := m.eventTransform(ev) - m.Subscriber.OnNext(newTopic, newEv) -} - -func (m *mappableSubscriber) OnClose(topic Topic) { - newTopic := m.transform(topic) - m.Subscriber.OnClose(newTopic) -} diff --git a/notifications/notifications_test.go b/notifications/notifications_test.go index f901a3de..83e4aaa0 100644 --- a/notifications/notifications_test.go +++ b/notifications/notifications_test.go @@ -12,10 +12,10 @@ import ( func TestSubscribeOn(t *testing.T) { ctx := context.Background() testCases := map[string]func(ctx context.Context, t *testing.T, ps notifications.Publisher){ - "SubscribeOn": func(ctx context.Context, t *testing.T, ps notifications.Publisher) { + "SubscribeWithData": func(ctx context.Context, t *testing.T, ps notifications.Publisher) { destTopic := "t2" notifee, verifier := testutil.NewTestNotifee(destTopic, 1) - notifications.SubscribeOn(ps, "t1", notifee) + notifications.SubscribeWithData(ps, "t1", notifee) ps.Publish("t1", "hi") ps.Shutdown() verifier.ExpectEvents(ctx, t, []notifications.Event{"hi"}) diff --git a/notifications/types.go b/notifications/types.go index e2c29ef2..ae45d3a1 100644 --- a/notifications/types.go +++ b/notifications/types.go @@ -6,17 +6,20 @@ type Topic interface{} // Event is a publishable event type Event interface{} +// TopicData is data added to every message broadcast on a topic +type TopicData interface{} + // Subscriber is a subscriber that can receive events type Subscriber interface { OnNext(Topic, Event) OnClose(Topic) } -// MappableSubscriber is a subscriber that remaps events received to other topics +// TopicDataSubscriber is a subscriber that remaps events received to other topics // and events -type MappableSubscriber interface { +type TopicDataSubscriber interface { Subscriber - Map(sourceID Topic, destinationID Topic) + AddTopicData(topic Topic, data TopicData) } // Subscribable is a stream that can be subscribed to @@ -37,20 +40,16 @@ type Publisher interface { // EventTransform if a fucntion transforms one kind of event to another type EventTransform func(Event) Event -// Notifee is a mappable suscriber where you want events to appear -// on this specified topic (used to call SubscribeOn to setup a remapping) +// Notifee is a topic data subscriber plus a set of data you want to add to any topics subscribed to +// (used to call SubscribeWithData to inject data when events for a given topic emit) type Notifee struct { - Topic Topic - Subscriber MappableSubscriber + Data TopicData + Subscriber TopicDataSubscriber } -// SubscribeOn subscribes to the given subscribe on the given topic, but -// maps to a differnt topic specified in a notifee which has a mappable -// subscriber -func SubscribeOn(p Subscribable, topic Topic, notifee Notifee) { - notifee.Subscriber.Map(topic, notifee.Topic) +// SubscribeWithData subscribes to the given subscriber on the given topic, and adds the notifies +// custom data into the list of data injected into callbacks when events occur on that topic +func SubscribeWithData(p Subscribable, topic Topic, notifee Notifee) { + notifee.Subscriber.AddTopicData(topic, notifee.Data) p.Subscribe(topic, notifee.Subscriber) } - -// IdentityTransform sets up an event transform that makes no changes -func IdentityTransform(ev Event) Event { return ev } diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go index c2a5d3b0..83da1cfe 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender.go +++ b/responsemanager/peerresponsemanager/peerresponsesender.go @@ -75,8 +75,8 @@ type peerResponseSender struct { responseBuilders []*responsebuilder.ResponseBuilder nextBuilderTopic responsebuilder.Topic queuedMessages chan responsebuilder.Topic - subscriber notifications.MappableSubscriber - allocatorSubscriber notifications.MappableSubscriber + subscriber notifications.TopicDataSubscriber + allocatorSubscriber notifications.TopicDataSubscriber publisher notifications.Publisher } @@ -133,8 +133,8 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa publisher: notifications.NewPublisher(), allocator: allocator, } - prs.subscriber = notifications.NewMappableSubscriber(&subscriber{prs}, notifications.IdentityTransform) - prs.allocatorSubscriber = notifications.NewMappableSubscriber(&allocatorSubscriber{prs}, notifications.IdentityTransform) + prs.subscriber = notifications.NewTopicDataSubscriber(&subscriber{prs}) + prs.allocatorSubscriber = notifications.NewTopicDataSubscriber(&allocatorSubscriber{prs}) return prs } @@ -418,7 +418,7 @@ func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn fun responseBuilder := prs.responseBuilders[len(prs.responseBuilders)-1] buildResponseFn(responseBuilder) for _, notifee := range notifees { - notifications.SubscribeOn(prs.publisher, responseBuilder.Topic(), notifee) + notifications.SubscribeWithData(prs.publisher, responseBuilder.Topic(), notifee) } return !responseBuilder.Empty() } @@ -466,8 +466,8 @@ func (prs *peerResponseSender) sendResponseMessages() { if builder.Empty() { continue } - notifications.SubscribeOn(prs.publisher, builder.Topic(), notifications.Notifee{ - Topic: builder.BlockSize(), + notifications.SubscribeWithData(prs.publisher, builder.Topic(), notifications.Notifee{ + Data: builder.BlockSize(), Subscriber: prs.allocatorSubscriber, }) responses, blks, err := builder.Build() @@ -476,7 +476,7 @@ func (prs *peerResponseSender) sendResponseMessages() { } prs.peerHandler.SendResponse(prs.p, responses, blks, notifications.Notifee{ - Topic: builder.Topic(), + Data: builder.Topic(), Subscriber: prs.subscriber, }) diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index cbedb7ea..ab2a68ca 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -112,12 +112,12 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) func (qe *queryExecutor) prepareQuery(ctx context.Context, p peer.ID, - request gsmsg.GraphSyncRequest, signals signals, sub notifications.MappableSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { + request gsmsg.GraphSyncRequest, signals signals, sub notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { result := qe.requestHooks.ProcessRequestHooks(p, request) peerResponseSender := qe.peerManager.SenderForPeer(p) var transactionError error var isPaused bool - failNotifee := notifications.Notifee{Topic: graphsync.RequestFailedUnknown, Subscriber: sub} + failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub} err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error { for _, extension := range result.Extensions { transaction.SendExtensionData(extension) @@ -199,7 +199,7 @@ func (qe *queryExecutor) executeQuery( loader ipld.Loader, traverser ipldutil.Traverser, signals signals, - sub notifications.MappableSubscriber) (graphsync.ResponseStatusCode, error) { + sub notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) { updateChan := make(chan []gsmsg.GraphSyncRequest) peerResponseSender := qe.peerManager.SenderForPeer(p) err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error { @@ -210,7 +210,7 @@ func (qe *queryExecutor) executeQuery( return nil } blockData := transaction.SendResponse(link, data) - transaction.AddNotifee(notifications.Notifee{Topic: blockData, Subscriber: sub}) + transaction.AddNotifee(notifications.Notifee{Data: blockData, Subscriber: sub}) if blockData.BlockSize() > 0 { result := qe.blockHooks.ProcessBlockHooks(p, request, blockData) for _, extension := range result.Extensions { @@ -253,7 +253,7 @@ func (qe *queryExecutor) executeQuery( } else { code = peerResponseSender.FinishRequest() } - peerResponseSender.AddNotifee(notifications.Notifee{Topic: code, Subscriber: sub}) + peerResponseSender.AddNotifee(notifications.Notifee{Data: code, Subscriber: sub}) return nil }) return code, err diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 452aa7d0..478836ad 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -34,7 +34,7 @@ type inProgressResponseStatus struct { signals signals updates []gsmsg.GraphSyncRequest isPaused bool - subscriber notifications.MappableSubscriber + subscriber notifications.TopicDataSubscriber } type responseKey struct { @@ -50,7 +50,7 @@ type signals struct { type responseTaskData struct { empty bool - subscriber notifications.MappableSubscriber + subscriber notifications.TopicDataSubscriber ctx context.Context request gsmsg.GraphSyncRequest loader ipld.Loader @@ -332,7 +332,7 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync } if result.Err != nil { transaction.FinishWithError(graphsync.RequestFailedUnknown) - transaction.AddNotifee(notifications.Notifee{Topic: graphsync.RequestFailedUnknown, Subscriber: response.subscriber}) + transaction.AddNotifee(notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: response.subscriber}) } return nil }) @@ -395,7 +395,7 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID rm.cancelledListeners.NotifyCancelledListeners(p, response.request) peerResponseSender.FinishWithCancel(requestID) } else if err != errNetworkError { - peerResponseSender.FinishWithError(requestID, graphsync.RequestCancelled, notifications.Notifee{Topic: graphsync.RequestCancelled, Subscriber: response.subscriber}) + peerResponseSender.FinishWithError(requestID, graphsync.RequestCancelled, notifications.Notifee{Data: graphsync.RequestCancelled, Subscriber: response.subscriber}) } delete(rm.inProgressResponses, key) response.cancelFn() @@ -420,7 +420,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { continue } ctx, cancelFn := context.WithCancel(rm.ctx) - sub := notifications.NewMappableSubscriber(&subscriber{ + sub := notifications.NewTopicDataSubscriber(&subscriber{ p: key.p, request: request, ctx: rm.ctx, @@ -428,7 +428,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { blockSentListeners: rm.blockSentListeners, completedListeners: rm.completedListeners, networkErrorListeners: rm.networkErrorListeners, - }, notifications.IdentityTransform) + }) rm.inProgressResponses[key] = &inProgressResponseStatus{ ctx: ctx, diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 492b4965..8fa025cf 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -1103,29 +1103,29 @@ func (td *testData) assertIgnoredCids(set *cid.Set) { } func (td *testData) notifyStatusMessagesSent() { - td.notifeePublisher.PublishMatchingEvents(func(topic notifications.Topic) bool { - _, isSn := topic.(graphsync.ResponseStatusCode) + td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool { + _, isSn := data.(graphsync.ResponseStatusCode) return isSn }, []notifications.Event{peerresponsemanager.Event{Name: peerresponsemanager.Sent}}) } func (td *testData) notifyBlockSendsSent() { - td.notifeePublisher.PublishMatchingEvents(func(topic notifications.Topic) bool { - _, isBsn := topic.(graphsync.BlockData) + td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool { + _, isBsn := data.(graphsync.BlockData) return isBsn }, []notifications.Event{peerresponsemanager.Event{Name: peerresponsemanager.Sent}}) } func (td *testData) notifyStatusMessagesNetworkError(err error) { - td.notifeePublisher.PublishMatchingEvents(func(topic notifications.Topic) bool { - _, isSn := topic.(graphsync.ResponseStatusCode) + td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool { + _, isSn := data.(graphsync.ResponseStatusCode) return isSn }, []notifications.Event{peerresponsemanager.Event{Name: peerresponsemanager.Error, Err: err}}) } func (td *testData) notifyBlockSendsNetworkError(err error) { - td.notifeePublisher.PublishMatchingEvents(func(topic notifications.Topic) bool { - _, isBsn := topic.(graphsync.BlockData) + td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool { + _, isBsn := data.(graphsync.BlockData) return isBsn }, []notifications.Event{peerresponsemanager.Event{Name: peerresponsemanager.Error, Err: err}}) } diff --git a/testutil/testnotifications.go b/testutil/testnotifications.go index 6e3a6fa0..239723a0 100644 --- a/testutil/testnotifications.go +++ b/testutil/testnotifications.go @@ -85,13 +85,13 @@ func (nv *NotifeeVerifier) ExpectClose(ctx context.Context, t *testing.T) { nv.subscriber.ExpectCloses(ctx, t, []notifications.Topic{nv.expectedTopic}) } -func NewTestNotifee(topic notifications.Topic, bufferSize int) (notifications.Notifee, *NotifeeVerifier) { +func NewTestNotifee(data notifications.TopicData, bufferSize int) (notifications.Notifee, *NotifeeVerifier) { subscriber := NewTestSubscriber(bufferSize) return notifications.Notifee{ - Topic: topic, - Subscriber: notifications.NewMappableSubscriber(subscriber, notifications.IdentityTransform), + Data: data, + Subscriber: notifications.NewTopicDataSubscriber(subscriber), }, &NotifeeVerifier{ - expectedTopic: topic, + expectedTopic: data, subscriber: subscriber, } } @@ -107,15 +107,15 @@ func (mp *MockPublisher) AddNotifees(notifees []notifications.Notifee) { mp.notifeesLk.Unlock() } -func (mp *MockPublisher) PublishMatchingEvents(shouldPublish func(notifications.Topic) bool, events []notifications.Event) { +func (mp *MockPublisher) PublishMatchingEvents(shouldPublish func(notifications.TopicData) bool, events []notifications.Event) { mp.notifeesLk.Lock() var newNotifees []notifications.Notifee for _, notifee := range mp.notifees { - if shouldPublish(notifee.Topic) { + if shouldPublish(notifee.Data) { for _, ev := range events { - notifee.Subscriber.OnNext(notifee.Topic, ev) + notifee.Subscriber.OnNext(notifee.Data, ev) } - notifee.Subscriber.OnClose(notifee.Topic) + notifee.Subscriber.OnClose(notifee.Data) } else { newNotifees = append(newNotifees, notifee) } @@ -125,11 +125,11 @@ func (mp *MockPublisher) PublishMatchingEvents(shouldPublish func(notifications. } func (mp *MockPublisher) PublishEvents(events []notifications.Event) { - mp.PublishMatchingEvents(func(notifications.Topic) bool { return true }, events) + mp.PublishMatchingEvents(func(notifications.TopicData) bool { return true }, events) } func (mp *MockPublisher) PublishEventsOnTopics(topics []notifications.Topic, events []notifications.Event) { - shouldPublish := func(topic notifications.Topic) bool { + shouldPublish := func(topic notifications.TopicData) bool { for _, testTopic := range topics { if topic == testTopic { return true From 5c504cae1b76ed3403d73f2316c8f08ac17e3636 Mon Sep 17 00:00:00 2001 From: acruikshank Date: Wed, 9 Dec 2020 17:48:30 -0500 Subject: [PATCH 2/3] permit multiple data topics so sent hook gets calle for every block --- impl/graphsync_test.go | 81 +++++++++++++++++++ notifications/data_subscriber.go | 41 ++++++---- notifications/types.go | 9 +-- .../peerresponsemanager/peerresponsesender.go | 4 +- responsemanager/queryexecutor.go | 4 +- responsemanager/responsemanager.go | 4 +- testutil/testnotifications.go | 10 +-- 7 files changed, 117 insertions(+), 36 deletions(-) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 13ff74eb..85db7cee 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -928,6 +928,87 @@ func TestUnixFSFetch(t *testing.T) { require.Equal(t, origBytes, finalBytes, "should have gotten same bytes written as read but didn't") } +func TestGraphsyncBlockListeners(t *testing.T) { + // create network + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + td := newGsTestData(ctx, t) + + // initialize graphsync on first node to make requests + requestor := td.GraphSyncHost1() + + // setup receiving peer to just record message coming in + blockChainLength := 100 + blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + + // initialize graphsync on second node to response to requests + responder := td.GraphSyncHost2() + + // register hooks to count blocks in various stages + blocksSent := 0 + blocksOutgoing := 0 + blocksIncoming := 0 + responder.RegisterBlockSentListener(func(p peer.ID, request graphsync.RequestData, block graphsync.BlockData) { + blocksSent++ + }) + requestor.RegisterIncomingBlockHook(func(p peer.ID, r graphsync.ResponseData, b graphsync.BlockData, h graphsync.IncomingBlockHookActions) { + blocksIncoming++ + }) + responder.RegisterOutgoingBlockHook(func(p peer.ID, r graphsync.RequestData, b graphsync.BlockData, h graphsync.OutgoingBlockHookActions) { + blocksOutgoing++ + }) + + var receivedResponseData []byte + var receivedRequestData []byte + + requestor.RegisterIncomingResponseHook( + func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { + data, has := responseData.Extension(td.extensionName) + if has { + receivedResponseData = data + } + }) + + responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + var has bool + receivedRequestData, has = requestData.Extension(td.extensionName) + if !has { + hookActions.TerminateWithError(errors.New("Missing extension")) + } else { + hookActions.SendExtensionData(td.extensionResponse) + } + }) + + finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1) + responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { + select { + case finalResponseStatusChan <- status: + default: + } + }) + progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension) + + blockChain.VerifyWholeChain(ctx, progressChan) + testutil.VerifyEmptyErrors(ctx, t, errChan) + require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks") + + // verify extension round trip + require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data") + require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data") + + // verify listener + var finalResponseStatus graphsync.ResponseStatusCode + testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status") + require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus) + + // assert we get notified for all the blocks + require.Equal(t, blockChainLength, blocksOutgoing) + require.Equal(t, blockChainLength, blocksIncoming) + require.Equal(t, blockChainLength, blocksSent) +} + + type gsTestData struct { mn mocknet.Mocknet ctx context.Context diff --git a/notifications/data_subscriber.go b/notifications/data_subscriber.go index 983e2a23..1bda61b0 100644 --- a/notifications/data_subscriber.go +++ b/notifications/data_subscriber.go @@ -2,43 +2,50 @@ package notifications import "sync" -type topicDataSubscriber struct { +type TopicDataSubscriber struct { idMapLk sync.RWMutex - data map[Topic]TopicData + data map[Topic][]TopicData Subscriber } // NewTopicDataSubscriber produces a subscriber that will transform // events and topics before passing them on to the given subscriber -func NewTopicDataSubscriber(sub Subscriber) TopicDataSubscriber { - return &topicDataSubscriber{ +func NewTopicDataSubscriber(sub Subscriber) *TopicDataSubscriber { + return &TopicDataSubscriber{ Subscriber: sub, - data: make(map[Topic]TopicData), + data: make(map[Topic][]TopicData), } } -func (m *topicDataSubscriber) AddTopicData(id Topic, data TopicData) { +func (m *TopicDataSubscriber) AddTopicData(id Topic, data TopicData) { m.idMapLk.Lock() - m.data[id] = data + m.data[id] = append(m.data[id], data) m.idMapLk.Unlock() } -func (m *topicDataSubscriber) transform(id Topic) Topic { +func (m *TopicDataSubscriber) getData(id Topic) []TopicData { m.idMapLk.RLock() defer m.idMapLk.RUnlock() - destID, ok := m.data[id] + + data, ok := m.data[id] if !ok { - return id + return []TopicData{} + } + newData := make([]TopicData, len(data)) + for i, d := range data { + newData[i] = d } - return destID + return newData } -func (m *topicDataSubscriber) OnNext(topic Topic, ev Event) { - newTopic := m.transform(topic) - m.Subscriber.OnNext(newTopic, ev) +func (m *TopicDataSubscriber) OnNext(topic Topic, ev Event) { + for _, data := range m.getData(topic) { + m.Subscriber.OnNext(data, ev) + } } -func (m *topicDataSubscriber) OnClose(topic Topic) { - newTopic := m.transform(topic) - m.Subscriber.OnClose(newTopic) +func (m *TopicDataSubscriber) OnClose(topic Topic) { + for _, data := range m.getData(topic) { + m.Subscriber.OnClose(data) + } } diff --git a/notifications/types.go b/notifications/types.go index ae45d3a1..86da1cdd 100644 --- a/notifications/types.go +++ b/notifications/types.go @@ -15,13 +15,6 @@ type Subscriber interface { OnClose(Topic) } -// TopicDataSubscriber is a subscriber that remaps events received to other topics -// and events -type TopicDataSubscriber interface { - Subscriber - AddTopicData(topic Topic, data TopicData) -} - // Subscribable is a stream that can be subscribed to type Subscribable interface { Subscribe(topic Topic, sub Subscriber) bool @@ -44,7 +37,7 @@ type EventTransform func(Event) Event // (used to call SubscribeWithData to inject data when events for a given topic emit) type Notifee struct { Data TopicData - Subscriber TopicDataSubscriber + Subscriber *TopicDataSubscriber } // SubscribeWithData subscribes to the given subscriber on the given topic, and adds the notifies diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go index 83da1cfe..797608dd 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender.go +++ b/responsemanager/peerresponsemanager/peerresponsesender.go @@ -75,8 +75,8 @@ type peerResponseSender struct { responseBuilders []*responsebuilder.ResponseBuilder nextBuilderTopic responsebuilder.Topic queuedMessages chan responsebuilder.Topic - subscriber notifications.TopicDataSubscriber - allocatorSubscriber notifications.TopicDataSubscriber + subscriber *notifications.TopicDataSubscriber + allocatorSubscriber *notifications.TopicDataSubscriber publisher notifications.Publisher } diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index ab2a68ca..f9d7f0ce 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -112,7 +112,7 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) func (qe *queryExecutor) prepareQuery(ctx context.Context, p peer.ID, - request gsmsg.GraphSyncRequest, signals signals, sub notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { + request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { result := qe.requestHooks.ProcessRequestHooks(p, request) peerResponseSender := qe.peerManager.SenderForPeer(p) var transactionError error @@ -199,7 +199,7 @@ func (qe *queryExecutor) executeQuery( loader ipld.Loader, traverser ipldutil.Traverser, signals signals, - sub notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) { + sub *notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) { updateChan := make(chan []gsmsg.GraphSyncRequest) peerResponseSender := qe.peerManager.SenderForPeer(p) err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error { diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 478836ad..c6bdbc2e 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -34,7 +34,7 @@ type inProgressResponseStatus struct { signals signals updates []gsmsg.GraphSyncRequest isPaused bool - subscriber notifications.TopicDataSubscriber + subscriber *notifications.TopicDataSubscriber } type responseKey struct { @@ -50,7 +50,7 @@ type signals struct { type responseTaskData struct { empty bool - subscriber notifications.TopicDataSubscriber + subscriber *notifications.TopicDataSubscriber ctx context.Context request gsmsg.GraphSyncRequest loader ipld.Loader diff --git a/testutil/testnotifications.go b/testutil/testnotifications.go index 239723a0..e71ef47a 100644 --- a/testutil/testnotifications.go +++ b/testutil/testnotifications.go @@ -113,9 +113,9 @@ func (mp *MockPublisher) PublishMatchingEvents(shouldPublish func(notifications. for _, notifee := range mp.notifees { if shouldPublish(notifee.Data) { for _, ev := range events { - notifee.Subscriber.OnNext(notifee.Data, ev) + notifee.Subscriber.Subscriber.OnNext(notifee.Data, ev) } - notifee.Subscriber.OnClose(notifee.Data) + notifee.Subscriber.Subscriber.OnClose(notifee.Data) } else { newNotifees = append(newNotifees, notifee) } @@ -128,10 +128,10 @@ func (mp *MockPublisher) PublishEvents(events []notifications.Event) { mp.PublishMatchingEvents(func(notifications.TopicData) bool { return true }, events) } -func (mp *MockPublisher) PublishEventsOnTopics(topics []notifications.Topic, events []notifications.Event) { +func (mp *MockPublisher) PublishEventsOnTopicData(data []notifications.TopicData, events []notifications.Event) { shouldPublish := func(topic notifications.TopicData) bool { - for _, testTopic := range topics { - if topic == testTopic { + for _, testTopicData := range data { + if topic == testTopicData { return true } } From f909f177796627407dce16da48de07499f137a00 Mon Sep 17 00:00:00 2001 From: Alex Cruikshank <169613+acruikshank@users.noreply.github.com> Date: Thu, 10 Dec 2020 09:09:37 -0500 Subject: [PATCH 3/3] rename test to match method rename. Co-authored-by: dirkmc --- notifications/notifications_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/notifications/notifications_test.go b/notifications/notifications_test.go index 83e4aaa0..9c2838d7 100644 --- a/notifications/notifications_test.go +++ b/notifications/notifications_test.go @@ -9,7 +9,7 @@ import ( "github.com/ipfs/go-graphsync/testutil" ) -func TestSubscribeOn(t *testing.T) { +func TestSubscribeWithData(t *testing.T) { ctx := context.Background() testCases := map[string]func(ctx context.Context, t *testing.T, ps notifications.Publisher){ "SubscribeWithData": func(ctx context.Context, t *testing.T, ps notifications.Publisher) {