Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle new peer by store #104

Merged
merged 1 commit into from
May 6, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions baseorbitdb/orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ type orbitDB struct {

// emitters
emitters struct {
newPeer event.Emitter
newHeads event.Emitter
}

Expand Down Expand Up @@ -395,11 +394,6 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
messageMarshaler: options.MessageMarshaler,
}

odb.emitters.newPeer, err = eventBus.Emitter(new(stores.EventNewPeer))
if err != nil {
return nil, errors.Wrap(err, "unable to create global emitter")
}

// set new heads as stateful, so newly subscriber can replay last event in case they missed it
odb.emitters.newHeads, err = eventBus.Emitter(new(EventExchangeHeads), eventbus.Stateful)
if err != nil {
Expand Down Expand Up @@ -850,10 +844,23 @@ func (o *orbitDB) pubSubChanListener(ctx context.Context, store Store, topic ifa
return err
}

newPeerEmitter, err := store.EventBus().Emitter(new(stores.EventNewPeer))
if err != nil {
return fmt.Errorf("unable to init emitter: %w", err)
}

go func() {
defer newPeerEmitter.Close()

for e := range chPeers {
switch evt := e.(type) {
case *iface.EventPubSubJoin:
// notify store that we have a new peers
if err := newPeerEmitter.Emit(stores.NewEventNewPeer(evt.Peer)); err != nil {
o.logger.Error("unable to emit event new peer", zap.Error(err))
}

// handle new peers
go o.onNewPeerJoined(ctx, evt.Peer, store)
o.logger.Debug(fmt.Sprintf("peer %s joined from %s self is %s", evt.Peer.String(), addr, o.PeerID()))

Expand Down Expand Up @@ -912,10 +919,6 @@ func (o *orbitDB) onNewPeerJoined(ctx context.Context, p peer.ID, store Store) {
}
return
}

if err := o.emitters.newPeer.Emit(stores.NewEventNewPeer(p)); err != nil {
o.logger.Error("unable emit NewPeer event", zap.Error(err))
}
}

func (o *orbitDB) exchangeHeads(ctx context.Context, p peer.ID, store Store) error {
Expand Down