From e4f37299a0f72726af69220a777cbb855517a53e Mon Sep 17 00:00:00 2001
From: gfanton <8671905+gfanton@users.noreply.github.com>
Date: Sun, 20 Mar 2022 11:11:26 +0100
Subject: [PATCH] fix: orbitdb pubsub payload handler

- add an optional Topic field on orbitdb pubsub message

Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
---
 baseorbitdb/events.go           |  15 +++
 baseorbitdb/events_handler.go   |  52 +++++++----
 baseorbitdb/orbitdb.go          | 160 ++++++++++++++++++++------------
 iface/interface.go              |   9 +-
 pubsub/directchannel/channel.go |   4 +-
 pubsub/event.go                 |  13 ++-
 pubsub/oneonone/channel.go      |   7 +-
 pubsub/pubsubcoreapi/pubsub.go  |   8 +-
 pubsub/pubsubraw/pubsub.go      |   4 +-
 9 files changed, 173 insertions(+), 99 deletions(-)
 create mode 100644 baseorbitdb/events.go

diff --git a/baseorbitdb/events.go b/baseorbitdb/events.go
new file mode 100644
index 00000000..60a9e3e3
--- /dev/null
+++ b/baseorbitdb/events.go
@@ -0,0 +1,15 @@
+package baseorbitdb
+
+import "github.com/libp2p/go-libp2p-core/peer"
+
+type EventExchangeHeads struct {
+	Peer    peer.ID
+	Message *MessageExchangeHeads
+}
+
+func NewEventExchangeHeads(p peer.ID, msg *MessageExchangeHeads) EventExchangeHeads {
+	return EventExchangeHeads{
+		Peer:    p,
+		Message: msg,
+	}
+}
diff --git a/baseorbitdb/events_handler.go b/baseorbitdb/events_handler.go
index 09ddd508..8c29e023 100644
--- a/baseorbitdb/events_handler.go
+++ b/baseorbitdb/events_handler.go
@@ -7,42 +7,54 @@ import (
 
 	ipfslog "berty.tech/go-ipfs-log"
 	"berty.tech/go-ipfs-log/enc"
+	"berty.tech/go-ipfs-log/entry"
 	"berty.tech/go-orbit-db/iface"
 	"berty.tech/go-orbit-db/stores"
-	"go.uber.org/zap"
 )
 
-func (o *orbitDB) handleEventPubSubPayload(ctx context.Context, e *iface.EventPubSubPayload, sharedKey enc.SharedKey) error {
-	heads := &exchangedHeads{}
-	payload := e.Payload
+func (o *orbitDB) handleEventExchangeHeads(ctx context.Context, e *EventExchangeHeads, sharedKey enc.SharedKey) error {
+	message := e.Message
 
-	if sharedKey != nil {
-		var err error
+	var (
+		address  string
+		rawHeads []byte
+	)
 
-		payload, err = sharedKey.Open(payload)
+	if sharedKey != nil {
+		// open address
+		rawAddress, err := sharedKey.Open(message.Address)
 		if err != nil {
-			return fmt.Errorf("unable to decrypt payload: %w", err)
+			return fmt.Errorf("unable to decrypt address: %w", err)
+		}
+		address = string(rawAddress)
+
+		// open heads
+		if rawHeads, err = sharedKey.Open(message.Heads); err != nil {
+			return fmt.Errorf("unable to decrypt heads: %w", err)
 		}
+	} else {
+		address = string(message.Address)
+		rawHeads = message.Heads
 	}
 
-	err := json.Unmarshal(payload, &heads)
-	if err != nil {
-		o.logger.Error("unable to unmarshal heads", zap.Error(err))
+	store, ok := o.getStore(address)
+	if !ok {
+		return fmt.Errorf("receiving heads from unknown store")
 	}
 
-	o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(heads.Heads), heads.Address))
-	store, ok := o.getStore(heads.Address)
+	heads := []*entry.Entry{}
+	if err := json.Unmarshal(rawHeads, &heads); err != nil {
+		return fmt.Errorf("unable to parse heads: %w", err)
+	}
 
-	if !ok {
-		return fmt.Errorf("heads from unknown store, skipping")
+	untypedHeads := make([]ipfslog.Entry, len(heads))
+	for i, h := range heads {
+		untypedHeads[i] = h
 	}
 
-	if len(heads.Heads) > 0 {
-		untypedHeads := make([]ipfslog.Entry, len(heads.Heads))
-		for i := range heads.Heads {
-			untypedHeads[i] = heads.Heads[i]
-		}
+	o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(heads), address))
+	if len(heads) > 0 {
 		if err := store.Sync(ctx, untypedHeads); err != nil {
 			return fmt.Errorf("unable to sync heads: %w", err)
 		}
diff --git a/baseorbitdb/orbitdb.go b/baseorbitdb/orbitdb.go
index e4264976..458476a6 100644
--- a/baseorbitdb/orbitdb.go
+++ b/baseorbitdb/orbitdb.go
@@ -1,6 +1,7 @@
 package baseorbitdb
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -75,11 +76,6 @@ type DetermineAddressOptions = iface.DetermineAddressOptions
 // DirectChannelFactory An alias of the type defined in the iface package
 type DirectChannelFactory = iface.DirectChannelFactory
 
-type exchangedHeads struct {
-	Address string         `json:"address,omitempty"`
-	Heads   []*entry.Entry `json:"heads,omitempty"`
-}
-
 func boolPtr(val bool) *bool {
 	return &val
 }
@@ -111,15 +107,19 @@ type orbitDB struct {
 	keystore              keystore.Interface
 	closeKeystore         func() error
 	stores                map[string]Store
-	directConnections     iface.DirectChannel
 	eventBus              event.Bus
 	directory             string
 	cache                 cache.Interface
 	logger                *zap.Logger
 	tracer                trace.Tracer
+	directChannel         iface.DirectChannel
+	directChannelHandlers []func(*MessageExchangeHeads)
 
-	// emitter
-	emitterNewPeer event.Emitter
+	// emitters
+	emitters struct {
+		newPeer  event.Emitter
+		newHeads event.Emitter
+	}
 
 	muStoreTypes            sync.RWMutex
 	muStores                sync.RWMutex
@@ -131,6 +131,12 @@ type orbitDB struct {
 	muAccessControllerTypes sync.RWMutex
 }
 
+type MessageExchangeHeads struct {
+	Address []byte `json:"address"`
+	Heads   []byte `json:"heads"`
+	Topic   []byte `json:"topic,omitempty"`
+}
+
 func (o *orbitDB) Logger() *zap.Logger {
 	return o.logger
 }
@@ -192,13 +198,6 @@ func (o *orbitDB) getStore(address string) (iface.Store, bool) {
 	return store, ok
 }
 
-// func (o *orbitDB) deleteStore(address string) {
-// 	o.muStores.Lock()
-// 	defer o.muStores.Unlock()
-
-// 	delete(o.stores, address)
-// }
-
 func (o *orbitDB) closeAllStores() {
 	o.muStores.Lock()
 	defer o.muStores.Unlock()
@@ -222,7 +221,7 @@ func (o *orbitDB) closeCache() {
 }
 
 func (o *orbitDB) closeDirectConnections() {
-	if err := o.directConnections.Close(); err != nil {
+	if err := o.directChannel.Close(); err != nil {
 		o.logger.Error("unable to close connection", zap.Error(err))
 	}
 }
@@ -338,11 +337,6 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
 	}
 
 	eventBus := eventbus.NewBus()
-	emitterNewPeer, err := eventBus.Emitter(new(stores.EventNewPeer))
-	if err != nil {
-		return nil, errors.Wrap(err, "unable to create global emitter")
-	}
-
 	if options.DirectChannelFactory == nil {
 		options.DirectChannelFactory = oneonone.NewChannelFactory(is)
 	}
@@ -384,9 +378,8 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
 		cache:                 options.Cache,
 		directory:             *options.Directory,
 		eventBus:              eventBus,
-		emitterNewPeer:        emitterNewPeer,
 		stores:                map[string]Store{},
-		directConnections:     directConnections,
+		directChannel:         directConnections,
 		closeKeystore:         options.CloseKeystore,
 		storeTypes:            map[string]iface.StoreConstructor{},
 		accessControllerTypes: map[string]iface.AccessControllerConstructor{},
@@ -394,6 +387,21 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
 		tracer:                options.Tracer,
 	}
 
+	odb.emitters.newPeer, err = eventBus.Emitter(new(stores.EventNewPeer))
+	if err != nil {
+		return nil, errors.Wrap(err, "unable to create global emitter")
+	}
+
+	// set new heads as stateful, so newly subscriber can replay last event in case they missed it
+	odb.emitters.newHeads, err = eventBus.Emitter(new(EventExchangeHeads), eventbus.Stateful)
+	if err != nil {
+		return nil, errors.Wrap(err, "unable to create global emitter")
+	}
+
+	if err := odb.monitorDirectChannel(ctx, eventBus); err != nil {
+		return nil, errors.Wrap(err, "unable to monitor direct channel")
+	}
+
 	return odb, nil
 }
@@ -609,8 +617,8 @@ func (o *orbitDB) Open(ctx context.Context, dbAddress string, options *CreateDBO
 	return store, nil
 }
 
-func (o *orbitDB) monitorChannels(ctx context.Context, store Store) error {
-	sub, err := o.eventBus.Subscribe(new(iface.EventPubSubPayload), eventbus.BufSize(128))
+func (o *orbitDB) monitorChannels(ctx context.Context, topic iface.PubSubTopic, store Store) error {
+	sub, err := o.eventBus.Subscribe(new(EventExchangeHeads), eventbus.BufSize(128))
 	if err != nil {
 		return fmt.Errorf("unable to init event bus: %w", err)
 	}
@@ -627,8 +635,14 @@ func (o *orbitDB) monitorChannels(ctx context.Context, store Store) error {
 			case e = <-sub.Out():
 			}
 
-			evt := e.(iface.EventPubSubPayload)
-			if err := o.handleEventPubSubPayload(ctx, &evt, sharedKey); err != nil {
+			evt := e.(EventExchangeHeads)
+
+			if !bytes.Equal(evt.Message.Topic, []byte(topic.Topic())) ||
+				len(evt.Message.Heads) == 0 {
+				continue // skip unwanted topic
+			}
+
+			if err := o.handleEventExchangeHeads(ctx, &evt, sharedKey); err != nil {
 				o.logger.Error("unable to handle pubsub payload", zap.Error(err))
 			}
 		}
@@ -813,7 +827,7 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd
 		return nil, errors.Wrap(err, "unable to store listener")
 	}
 
-	if err := o.monitorChannels(ctx, store); err != nil {
+	if err := o.monitorChannels(ctx, topic, store); err != nil {
 		return nil, errors.Wrap(err, "unable to monitor channel")
 	}
 
@@ -825,13 +839,6 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd
 	return store, nil
 }
 
-// not used
-// func (o *orbitDB) onClose(addr cid.Cid) error {
-// 	o.deleteStore(addr.String())
-
-// 	return nil
-// }
-
 func (o *orbitDB) storeListener(ctx context.Context, store Store, topic iface.PubSubTopic) error {
 	sub, err := store.EventBus().Subscribe(new(stores.EventWrite))
 	if err != nil {
@@ -883,7 +890,7 @@ func (o *orbitDB) pubSubChanListener(ctx context.Context, store Store, topic ifa
 	for e := range chPeers {
 		switch evt := e.(type) {
 		case *iface.EventPubSubJoin:
-			go o.onNewPeerJoined(ctx, evt.Peer, store)
+			go o.onNewPeerJoined(ctx, topic.Topic(), evt.Peer, store)
 			o.logger.Debug(fmt.Sprintf("peer %s joined from %s self is %s", evt.Peer.String(), addr, o.PeerID()))
 
 		case *iface.EventPubSubLeave:
@@ -937,7 +944,7 @@ func (o *orbitDB) pubSubChanListener(ctx context.Context, store Store, topic ifa
 	return nil
 }
 
-func (o *orbitDB) onNewPeerJoined(ctx context.Context, p peer.ID, store Store) {
+func (o *orbitDB) onNewPeerJoined(ctx context.Context, topic string, p peer.ID, store Store) {
 	self, err := o.IPFS().Key().Self(ctx)
 	if err == nil {
 		o.logger.Debug(fmt.Sprintf("%s: New peer '%s' connected to %s", self.ID(), p, store.Address().String()))
@@ -945,58 +952,58 @@ func (o *orbitDB) onNewPeerJoined(ctx context.Context, p peer.ID, store Store) {
 		o.logger.Debug(fmt.Sprintf("New peer '%s' connected to %s", p, store.Address().String()))
 	}
 
-	if err := o.exchangeHeads(ctx, p, store); err != nil {
+	if err := o.exchangeHeads(ctx, topic, p, store); err != nil {
 		if !errors.Is(err, context.Canceled) {
 			o.logger.Error("unable to exchange heads", zap.Error(err))
 		}
 		return
 	}
 
-	if err := o.emitterNewPeer.Emit(stores.NewEventNewPeer(p)); err != nil {
+	if err := o.emitters.newPeer.Emit(stores.NewEventNewPeer(p)); err != nil {
 		o.logger.Error("unable emit NewPeer event", zap.Error(err))
 	}
 }
 
-func (o *orbitDB) exchangeHeads(ctx context.Context, p peer.ID, store Store) error {
+func (o *orbitDB) exchangeHeads(ctx context.Context, topic string, p peer.ID, store Store) error {
 	sharedKey := store.SharedKey()
 	o.logger.Debug(fmt.Sprintf("connecting to %s", p))
 
-	if err := o.directConnections.Connect(ctx, p); err != nil {
+	if err := o.directChannel.Connect(ctx, p); err != nil {
 		return errors.Wrap(err, "unable to connect to peer")
 	}
 
 	o.logger.Debug(fmt.Sprintf("connected to %s", p))
 
-	var heads []*entry.Entry
 	headsBytes, err := store.Cache().Get(ctx, datastore.NewKey("_localHeads"))
 	if err != nil && err != datastore.ErrNotFound {
 		return errors.Wrap(err, "unable to get local heads from cache")
 	}
 
-	if headsBytes != nil {
-		err = json.Unmarshal(headsBytes, &heads)
-		if err != nil {
-			o.logger.Warn("unable to unmarshal cached local heads", zap.Error(err))
+	payload := &MessageExchangeHeads{}
+	if sharedKey == nil {
+		payload.Heads = headsBytes
+		payload.Address = []byte(store.Address().String())
+	} else {
+		if len(headsBytes) > 0 {
+			if payload.Heads, err = sharedKey.Seal(headsBytes); err != nil {
+				return errors.Wrap(err, "unable seal heads")
+			}
+		} else {
+			payload.Heads = []byte{}
 		}
-	}
 
-	exchangedHeads := &exchangedHeads{
-		Address: store.Address().String(),
-		Heads:   heads,
+		rawAddress := []byte(store.Address().String())
+		if payload.Address, err = sharedKey.Seal(rawAddress); err != nil {
+			return errors.Wrap(err, "unable to seal address")
+		}
+		payload.Topic = []byte(topic)
 	}
 
-	exchangedHeadsBytes, err := json.Marshal(exchangedHeads)
+	rawPayload, err := json.Marshal(payload)
 	if err != nil {
 		return errors.Wrap(err, "unable to serialize heads to exchange")
 	}
 
-	if sharedKey != nil {
-		exchangedHeadsBytes, err = sharedKey.Seal(exchangedHeadsBytes)
-		if err != nil {
-			return errors.Wrap(err, "unable to encrypt payload")
-		}
-	}
-
-	if err = o.directConnections.Send(ctx, p, exchangedHeadsBytes); err != nil {
+	if err = o.directChannel.Send(ctx, p, rawPayload); err != nil {
 		return errors.Wrap(err, "unable to send heads on direct channel")
 	}
@@ -1006,6 +1013,41 @@ func (o *orbitDB) EventBus() event.Bus {
 	return o.eventBus
 }
 
+func (o *orbitDB) monitorDirectChannel(ctx context.Context, bus event.Bus) error {
+	sub, err := bus.Subscribe(new(iface.EventPubSubPayload))
+	if err != nil {
+		return fmt.Errorf("unable to init pubsub subscriber: %w", err)
+	}
+
+	go func() {
+		for {
+			var e interface{}
+			select {
+			case <-ctx.Done():
+				return
+			case e = <-sub.Out():
+			}
+
+			msg := MessageExchangeHeads{}
+			evt := e.(iface.EventPubSubPayload)
+			if err := json.Unmarshal(evt.Payload, &msg); err != nil {
+				o.logger.Warn("unable to monitor direct channel", zap.Error(err))
+				continue
+			}
+
+			if len(msg.Topic) == 0 {
+				msg.Topic = msg.Address
+			}
+
+			if err := o.emitters.newHeads.Emit(NewEventExchangeHeads(evt.Peer, &msg)); err != nil {
+				o.logger.Warn("unable to emit new heads", zap.Error(err))
+			}
+		}
+	}()
+
+	return nil
+}
+
 func makeDirectChannel(ctx context.Context, bus event.Bus, df iface.DirectChannelFactory, opts *iface.DirectChannelOptions) (iface.DirectChannel, error) {
 	emitter, err := pubsub.NewPayloadEmitter(bus)
 	if err != nil {
diff --git a/iface/interface.go b/iface/interface.go
index 8905f19e..1de99ab8 100644
--- a/iface/interface.go
+++ b/iface/interface.go
@@ -348,7 +348,7 @@ type DirectChannel interface {
 	Connect(context.Context, peer.ID) error
 
 	// Send Sends a message to the other peer
-	Send(context.Context, peer.ID, []byte) error
+	Send(ctx context.Context, peer peer.ID, data []byte) error
 
 	// Close Closes the connection
 	Close() error
@@ -409,14 +409,17 @@ type EventPubSubMessage struct {
 // EventPubSubPayload An event received on new messages
 type EventPubSubPayload struct {
 	Payload []byte
+	Peer    peer.ID
 }
 
 // EventPubSubJoin Is an event triggered when a peer joins the channel
 type EventPubSubJoin struct {
-	Peer peer.ID
+	Topic string
+	Peer  peer.ID
 }
 
 // EventPubSubLeave Is an event triggered when a peer leave the channel
 type EventPubSubLeave struct {
-	Peer peer.ID
+	Topic string
+	Peer  peer.ID
 }
diff --git a/pubsub/directchannel/channel.go b/pubsub/directchannel/channel.go
index cbb0faeb..00cae344 100644
--- a/pubsub/directchannel/channel.go
+++ b/pubsub/directchannel/channel.go
@@ -75,7 +75,7 @@ func (d *directChannel) handleNewPeer(s network.Stream) {
 		return
 	}
 
-	if err := d.emitter.Emit(pubsub.NewEventPayload(data)); err != nil {
+	if err := d.emitter.Emit(pubsub.NewEventPayload(data, s.Conn().RemotePeer())); err != nil {
 		d.logger.Error("unable to emit on emitter", zap.Error(err))
 	}
 }
@@ -112,7 +112,7 @@ func (c *holderChannels) NewChannel(ctx context.Context, emitter iface.DirectCha
 		emitter: emitter,
 	}
 
-	c.host.SetStreamHandler(PROTOCOL, dc.handleNewPeer)
+	c.host.SetStreamHandlerMatch(PROTOCOL, func(proto string) bool { return true }, dc.handleNewPeer)
 
 	return dc, nil
 }
diff --git a/pubsub/event.go b/pubsub/event.go
index c664edc7..78de51c3 100644
--- a/pubsub/event.go
+++ b/pubsub/event.go
@@ -36,22 +36,25 @@ func NewEventMessage(content []byte) *iface.EventPubSubMessage {
 }
 
 // NewEventPayload Creates a new Message event
-func NewEventPayload(payload []byte) *iface.EventPubSubPayload {
+func NewEventPayload(payload []byte, peerid peer.ID) *iface.EventPubSubPayload {
 	return &iface.EventPubSubPayload{
 		Payload: payload,
+		Peer:    peerid,
 	}
 }
 
 // NewEventPeerJoin creates a new EventPubSubJoin event
-func NewEventPeerJoin(p peer.ID) Event {
+func NewEventPeerJoin(p peer.ID, topic string) Event {
 	return &iface.EventPubSubJoin{
-		Peer: p,
+		Peer:  p,
+		Topic: topic,
 	}
 }
 
 // NewEventPeerLeave creates a new EventPubSubLeave event
-func NewEventPeerLeave(p peer.ID) Event {
+func NewEventPeerLeave(p peer.ID, topic string) Event {
 	return &iface.EventPubSubLeave{
-		Peer: p,
+		Peer:  p,
+		Topic: topic,
 	}
 }
diff --git a/pubsub/oneonone/channel.go b/pubsub/oneonone/channel.go
index f9ba7852..745a9bac 100644
--- a/pubsub/oneonone/channel.go
+++ b/pubsub/oneonone/channel.go
@@ -77,7 +77,7 @@ func (c *channels) Connect(ctx context.Context, target peer.ID) error {
 	return c.waitForPeers(ctx, target, id)
 }
 
-func (c *channels) Send(ctx context.Context, p peer.ID, data []byte) error {
+func (c *channels) Send(ctx context.Context, p peer.ID, head []byte) error {
 	var id string
 	c.muSubs.RLock()
 	if ch, ok := c.subs[p]; ok {
@@ -87,7 +87,7 @@ func (c *channels) Send(ctx context.Context, p peer.ID, data []byte) error {
 	}
 	c.muSubs.RUnlock()
 
-	err := c.ipfs.PubSub().Publish(ctx, id, data)
+	err := c.ipfs.PubSub().Publish(ctx, id, head)
 	if err != nil {
 		return errors.Wrap(err, "unable to publish data on pubsub")
 	}
@@ -130,7 +130,6 @@ func (c *channels) getChannelID(p peer.ID) string {
 
 func (c *channels) monitorTopic(sub coreapi.PubSubSubscription, p peer.ID) {
 	for {
-
 		msg, err := sub.Next(c.ctx)
 		switch err {
 		case nil:
@@ -148,7 +147,7 @@ func (c *channels) monitorTopic(sub coreapi.PubSubSubscription, p peer.ID) {
 			continue
 		}
 
-		if err := c.emitter.Emit(pubsub.NewEventPayload(msg.Data())); err != nil {
+		if err := c.emitter.Emit(pubsub.NewEventPayload(msg.Data(), p)); err != nil {
 			c.logger.Warn("unable to emit event payload", zap.Error(err))
 		}
 	}
diff --git a/pubsub/pubsubcoreapi/pubsub.go b/pubsub/pubsubcoreapi/pubsub.go
index dee7cebb..4accd713 100644
--- a/pubsub/pubsubcoreapi/pubsub.go
+++ b/pubsub/pubsubcoreapi/pubsub.go
@@ -79,12 +79,12 @@ func (p *psTopic) WatchPeers(ctx context.Context) (<-chan events.Event, error) {
 				return
 			}
 
-			for _, p := range joining {
-				ch <- pubsub.NewEventPeerJoin(p)
+			for _, pid := range joining {
+				ch <- pubsub.NewEventPeerJoin(pid, p.Topic())
 			}
 
-			for _, p := range leaving {
-				ch <- pubsub.NewEventPeerLeave(p)
+			for _, pid := range leaving {
+				ch <- pubsub.NewEventPeerLeave(pid, p.Topic())
 			}
 
 			select {
diff --git a/pubsub/pubsubraw/pubsub.go b/pubsub/pubsubraw/pubsub.go
index dd493b32..c302ec81 100644
--- a/pubsub/pubsubraw/pubsub.go
+++ b/pubsub/pubsubraw/pubsub.go
@@ -51,9 +51,9 @@ func (p *psTopic) WatchPeers(ctx context.Context) (<-chan events.Event, error) {
 
 			switch evt.Type {
 			case p2ppubsub.PeerJoin:
-				ch <- pubsub.NewEventPeerJoin(evt.Peer)
+				ch <- pubsub.NewEventPeerJoin(evt.Peer, p.Topic())
 			case p2ppubsub.PeerLeave:
-				ch <- pubsub.NewEventPeerLeave(evt.Peer)
+				ch <- pubsub.NewEventPeerLeave(evt.Peer, p.Topic())
 			}
 		}
 	}()