Skip to content

Commit

Permalink
fix: orbitdb pubsub payload handler
Browse files Browse the repository at this point in the history
- add an optional Topic field on orbitdb pubsub message

Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
  • Loading branch information
gfanton committed Mar 24, 2022
1 parent 748476e commit e4f3729
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 99 deletions.
15 changes: 15 additions & 0 deletions baseorbitdb/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package baseorbitdb

import "github.com/libp2p/go-libp2p-core/peer"

type EventExchangeHeads struct {
Peer peer.ID
Message *MessageExchangeHeads
}

func NewEventExchangeHeads(p peer.ID, msg *MessageExchangeHeads) EventExchangeHeads {
return EventExchangeHeads{
Peer: p,
Message: msg,
}
}
52 changes: 32 additions & 20 deletions baseorbitdb/events_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,54 @@ import (

ipfslog "berty.tech/go-ipfs-log"
"berty.tech/go-ipfs-log/enc"
"berty.tech/go-ipfs-log/entry"
"berty.tech/go-orbit-db/iface"
"berty.tech/go-orbit-db/stores"
"go.uber.org/zap"
)

func (o *orbitDB) handleEventPubSubPayload(ctx context.Context, e *iface.EventPubSubPayload, sharedKey enc.SharedKey) error {
heads := &exchangedHeads{}
payload := e.Payload
func (o *orbitDB) handleEventExchangeHeads(ctx context.Context, e *EventExchangeHeads, sharedKey enc.SharedKey) error {
message := e.Message

if sharedKey != nil {
var err error
var (
address string
rawHeads []byte
)

payload, err = sharedKey.Open(payload)
if sharedKey != nil {
// open address
rawAddress, err := sharedKey.Open(message.Address)
if err != nil {
return fmt.Errorf("unable to decrypt payload: %w", err)
return fmt.Errorf("unable to decrypt address: %w", err)
}
address = string(rawAddress)

// open heads
if rawHeads, err = sharedKey.Open(message.Heads); err != nil {
return fmt.Errorf("unable to decrypt heads: %w", err)
}
} else {
address = string(message.Address)
rawHeads = message.Heads
}

err := json.Unmarshal(payload, &heads)
if err != nil {
o.logger.Error("unable to unmarshal heads", zap.Error(err))
store, ok := o.getStore(address)
if !ok {
return fmt.Errorf("receiving heads from unknown store")
}

o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(heads.Heads), heads.Address))
store, ok := o.getStore(heads.Address)
heads := []*entry.Entry{}
if err := json.Unmarshal(rawHeads, &heads); err != nil {
return fmt.Errorf("unable to parse heads: %w", err)
}

if !ok {
return fmt.Errorf("heads from unknown store, skipping")
untypedHeads := make([]ipfslog.Entry, len(heads))
for i, h := range heads {
untypedHeads[i] = h
}

if len(heads.Heads) > 0 {
untypedHeads := make([]ipfslog.Entry, len(heads.Heads))
for i := range heads.Heads {
untypedHeads[i] = heads.Heads[i]
}
o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(heads), address))

if len(heads) > 0 {
if err := store.Sync(ctx, untypedHeads); err != nil {
return fmt.Errorf("unable to sync heads: %w", err)
}
Expand Down
Loading

0 comments on commit e4f3729

Please sign in to comment.