fix: improve message cache

Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
gfanton committed Apr 4, 2022
1 parent 96637fe commit 0c02ed0
Showing 4 changed files with 83 additions and 69 deletions.
9 changes: 7 additions & 2 deletions go/pkg/bertyprotocol/group.go
@@ -169,9 +169,14 @@ func FillMessageKeysHolderUsingPreviousData(ctx context.Context, gc *GroupContex
for pk, sec := range publishedSecrets {
if err := gc.MessageKeystore().RegisterChainKey(ctx, gc.Group(), pk, sec, gc.DevicePubKey().Equals(pk)); err != nil {
gc.logger.Error("unable to register chain key", zap.Error(err))
} else {
ch <- pk
continue
}
// A new chainKey is registered, check if cached messages can be opened with it
if rawPK, err := pk.Raw(); err == nil {
gc.MessageStore().ProcessMessageQueueForDevicePK(ctx, rawPK)
}

ch <- pk
}

close(ch)
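
The group.go hunk above reorders the loop so that a failed RegisterChainKey skips to the next device (continue), while a successful registration first retries any cached messages for that device via ProcessMessageQueueForDevicePK and only then forwards the key on ch. A minimal, self-contained sketch of that register-then-drain pattern follows; secretStore, registerChainKey and the queued map are illustrative names, not Berty's types:

```go
package main

import "fmt"

// secretStore tracks which devices have a registered chain key and caches
// messages that arrived before the key was known.
type secretStore struct {
	keys   map[string]bool     // devicePK -> chain key registered
	queued map[string][]string // devicePK -> messages waiting for that key
}

// registerChainKey records the key, then retries everything cached for the
// device — the same "register, then reprocess the queue" ordering as the diff.
func (s *secretStore) registerChainKey(devicePK, key string) {
	s.keys[devicePK] = true

	// A new chain key is registered: check if cached messages can be opened with it.
	for _, msg := range s.queued[devicePK] {
		fmt.Printf("reprocessing cached message %q from %s\n", msg, devicePK)
	}
	delete(s.queued, devicePK)
}

func main() {
	s := &secretStore{keys: map[string]bool{}, queued: map[string][]string{}}

	// A message shows up before the sender's chain key is known: cache it.
	s.queued["device-A"] = append(s.queued["device-A"], "hello")

	// Later the secret is published and registered; the cache is drained.
	s.registerChainKey("device-A", "chain-key-A")
}
```
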
125 changes: 67 additions & 58 deletions go/pkg/bertyprotocol/store_message.go
@@ -51,7 +51,7 @@ type MessageStore struct {

deviceCaches map[string]*groupCache
muDeviceCaches sync.RWMutex
cmessage chan *messageCacheItem
cmessage chan *messageItem
// muProcess sync.RWMutex

cache map[string]*ring.Ring
@@ -83,24 +83,26 @@ func (m *MessageStore) openMessage(ctx context.Context, e ipfslog.Entry) (*proto
ownPK = md.PrivateDevice().GetPublic()
}

headers, msg, attachmentsCIDs, err := m.mks.OpenEnvelope(ctx, m.g, ownPK, op.GetValue(), e.GetHash())
env, headers, err := cryptoutil.OpenEnvelopeHeaders(op.GetValue(), m.g)
if err != nil {
m.logger.Error("unable to open envelope", zap.Error(err))
return nil, err
return nil, errcode.ErrCryptoDecrypt.Wrap(err)
}

err = m.devKS.AttachmentSecretSlicePut(attachmentsCIDs, msg.GetProtocolMetadata().GetAttachmentsSecrets())
if err != nil {
m.logger.Error("unable to put attachments keys in keystore", zap.Error(err))
return nil, err
if !m.mks.HasSecretForRawDevicePK(ctx, m.g.PublicKey, headers.DevicePK) {
if err := m.addToMessageQueue(ctx, e); err != nil {
m.logger.Error("unable to add message to cache", zap.Error(err))
}

return nil, fmt.Errorf("no secret for device")
}

eventContext := newEventContext(e.GetHash(), e.GetNext(), m.g, attachmentsCIDs)
return &protocoltypes.GroupMessageEvent{
EventContext: eventContext,
Headers: headers,
Message: msg.GetPlaintext(),
}, err
return m.processMessage(ctx, ownPK, &messageItem{
op: op,
env: env,
headers: headers,
ownPK: ownPK,
hash: e.GetHash(),
})
}

type groupCache struct {
@@ -121,24 +123,21 @@ func (m *MessageStore) CacheSizeForDevicePK(devicePK []byte) (size int, ok bool)

func (m *MessageStore) ProcessMessageQueueForDevicePK(ctx context.Context, devicePK []byte) {
m.muDeviceCaches.Lock()
device, ok := m.deviceCaches[string(devicePK)]
if ok {
if device.hasSecret = m.mks.HasSecretForRawDevicePK(ctx, m.g.PublicKey, devicePK); device.hasSecret {
if next := device.queue.Next(); next != nil {
m.cmessage <- next
}
} else {
if device, ok := m.deviceCaches[string(devicePK)]; ok {
if device.hasSecret = m.mks.HasSecretForRawDevicePK(ctx, m.g.PublicKey, devicePK); !device.hasSecret {
m.logger.Error("unable to process message, no secret found for device pk", logutil.PrivateBinary("devicepk", devicePK))
} else if next := device.queue.Next(); next != nil {
m.cmessage <- next
}
}
m.muDeviceCaches.Unlock()
}

func (m *MessageStore) processMessage(ctx context.Context, ownPK crypto.PubKey, message *messageCacheItem) error {
func (m *MessageStore) processMessage(ctx context.Context, ownPK crypto.PubKey, message *messageItem) (*protocoltypes.GroupMessageEvent, error) {
// process message
msg, attachmentsCIDs, err := m.mks.OpenEnvelopePayload(ctx, message.env, message.headers, m.g, ownPK, message.hash)
if err != nil {
return err
return nil, err
}

err = m.devKS.AttachmentSecretSlicePut(attachmentsCIDs, msg.GetProtocolMetadata().GetAttachmentsSecrets())
@@ -153,17 +152,11 @@ func (m *MessageStore) processMessage(ctx context.Context, ownPK crypto.PubKey,

entry := message.op.GetEntry()
eventContext := newEventContext(entry.GetHash(), entry.GetNext(), m.g, attachmentsCIDs)
evt := protocoltypes.GroupMessageEvent{
return &protocoltypes.GroupMessageEvent{
EventContext: eventContext,
Headers: message.headers,
Message: msg.GetPlaintext(),
}

if err := m.emitters.groupMessage.Emit(evt); err != nil {
m.logger.Warn("unable to emit group message event", zap.Error(err))
}

return nil
}, nil
}

func (m *MessageStore) processMessageLoop(ctx context.Context) {
@@ -178,7 +171,7 @@ func (m *MessageStore) processMessageLoop(ctx context.Context) {
semProcess := semaphore.NewWeighted(32)

for {
var message *messageCacheItem
var message *messageItem
select {
case message = <-m.cmessage:
case <-ctx.Done():
@@ -200,36 +193,52 @@ func (m *MessageStore) processMessageLoop(ctx context.Context) {
m.deviceCaches[devicepk] = device
}

// check for device secret, if unavailable add message to cache queue
if !device.hasSecret {
device.queue.Add(message)
_ = m.emitters.groupCacheMessage.Emit(*message)
} else {
go func() {
// wait for a process slot
if err := semProcess.Acquire(ctx, 1); err != nil {
m.logger.Error("unable to acquire lock", zap.Error(err))
return
}
m.muDeviceCaches.Unlock()
continue
}

// start process routine
go func() {
// wait for a process slot
if err := semProcess.Acquire(ctx, 1); err != nil {
m.logger.Error("unable to acquire lock", zap.Error(err))
return
}

if err := m.processMessage(ctx, ownPK, message); err != nil {
if errcode.Is(err, errcode.ErrCryptoDecryptPayload) {
// @FIXME(gfanton): this should not happen
m.logger.Warn("unable to open envelope, adding envelope to cache for later process", zap.Error(err))
// defer release a process slot
defer semProcess.Release(1)

// if failed to decrypt add to queue, for later process
device.queue.Add(message)
_ = m.emitters.groupCacheMessage.Emit(*message)
} else {
m.logger.Error("unable to process message", zap.Error(err))
}
} else if next := device.queue.Next(); next != nil {
m.cmessage <- next
// process the message
evt, err := m.processMessage(ctx, ownPK, message)
if err != nil {
if errcode.Is(err, errcode.ErrCryptoDecryptPayload) {
// @FIXME(gfanton): this should not happen
m.logger.Warn("unable to open envelope, adding envelope to cache for later process", zap.Error(err))

// if failed to decrypt add to queue, for later process
device.queue.Add(message)
_ = m.emitters.groupCacheMessage.Emit(*message)
} else {
m.logger.Error("unable to process message", zap.Error(err))
}

// release a process slot
semProcess.Release(1)
}()
}
return
}

// emit new message event
if err := m.emitters.groupMessage.Emit(*evt); err != nil {
m.logger.Warn("unable to emit group message event", zap.Error(err))
}

if next := device.queue.Next(); next != nil {
m.cmessage <- next
}
}()

m.muDeviceCaches.Unlock()
}
}
@@ -255,7 +264,7 @@ func (m *MessageStore) addToMessageQueue(ctx context.Context, e ipfslog.Entry) e
return errcode.ErrCryptoDecrypt.Wrap(err)
}

msg := &messageCacheItem{
msg := &messageItem{
hash: e.GetHash(),
env: env,
headers: headers,
@@ -405,7 +414,7 @@ func constructorFactoryGroupMessage(s *BertyOrbitDB, logger *zap.Logger) iface.S
eventBus: options.EventBus,
devKS: s.deviceKeystore,
mks: s.messageKeystore,
cmessage: make(chan *messageCacheItem),
cmessage: make(chan *messageItem),
g: g,
logger: logger,
deviceCaches: make(map[string]*groupCache),
@@ -422,7 +431,7 @@
}

// for debug/test purpose
if store.emitters.groupCacheMessage, err = store.eventBus.Emitter(new(messageCacheItem)); err != nil {
if store.emitters.groupCacheMessage, err = store.eventBus.Emitter(new(messageItem)); err != nil {
return nil, errcode.ErrOrbitDBInit.Wrap(err)
}

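
In the reworked processMessageLoop above, a message whose device has no secret is parked in the per-device queue and the loop moves on; otherwise the message is handled in its own goroutine, with semProcess (a weighted semaphore of capacity 32) bounding concurrency and the device queue drained one entry at a time after each success. Below is a stripped-down sketch of that bounded fan-out loop, using golang.org/x/sync/semaphore as the diff does; the item type and everything in main are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

// item stands in for *messageItem; only an id is needed for the sketch.
type item struct{ id int }

// processLoop mirrors the shape of the store's loop: read one message at a
// time, then hand it to a goroutine gated by a 32-slot weighted semaphore.
func processLoop(ctx context.Context, in <-chan item) {
	sem := semaphore.NewWeighted(32)

	for {
		var msg item
		select {
		case msg = <-in:
		case <-ctx.Done():
			return
		}

		go func(msg item) {
			// wait for a process slot, release it once the message is handled
			if err := sem.Acquire(ctx, 1); err != nil {
				return // context cancelled while waiting
			}
			defer sem.Release(1)

			fmt.Println("processing message", msg.id)
		}(msg)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan item)
	go processLoop(ctx, in)

	for i := 0; i < 5; i++ {
		in <- item{id: i}
	}
	time.Sleep(100 * time.Millisecond) // give the workers a moment before exiting
}
```
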
16 changes: 8 additions & 8 deletions go/pkg/bertyprotocol/store_message_queue.go
@@ -12,42 +12,42 @@ import (
)

// An Item is something we manage in a priority queue.
type messageCacheItem struct {
type messageItem struct {
op operation.Operation
env *protocoltypes.MessageEnvelope
headers *protocoltypes.MessageHeaders
ownPK crypto.PubKey
hash cid.Cid
}

func (m *messageCacheItem) Counter() uint64 {
func (m *messageItem) Counter() uint64 {
return m.headers.Counter
}

// A priorityMessageQueue implements heap.Interface and holds Items.
type priorityMessageQueue struct {
messages []*messageCacheItem
messages []*messageItem
muMessages sync.RWMutex
}

func newPriorityMessageQueue() *priorityMessageQueue {
queue := &priorityMessageQueue{
messages: []*messageCacheItem{},
messages: []*messageItem{},
}
heap.Init(queue)
return queue
}

func (pq *priorityMessageQueue) Add(m *messageCacheItem) {
func (pq *priorityMessageQueue) Add(m *messageItem) {
pq.muMessages.Lock()
heap.Push(pq, m)
pq.muMessages.Unlock()
}

func (pq *priorityMessageQueue) Next() (item *messageCacheItem) {
func (pq *priorityMessageQueue) Next() (item *messageItem) {
pq.muMessages.Lock()
if len(pq.messages) > 0 {
item = heap.Pop(pq).(*messageCacheItem)
item = heap.Pop(pq).(*messageItem)
}
pq.muMessages.Unlock()
return
@@ -75,7 +75,7 @@ func (pq *priorityMessageQueue) Swap(i, j int) {
}

func (pq *priorityMessageQueue) Push(x interface{}) {
pq.messages = append(pq.messages, x.(*messageCacheItem))
pq.messages = append(pq.messages, x.(*messageItem))
}

func (pq *priorityMessageQueue) Pop() (item interface{}) {
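
In store_message_queue.go the change is only the rename from messageCacheItem to messageItem, but the structure it lives in is worth seeing whole: priorityMessageQueue wraps container/heap so that cached messages come back ordered by their header counter when the queue is drained. A compact, self-contained version of the same idea (queuedMessage and counterQueue are illustrative names, and the mutex the real queue holds around heap operations is omitted):

```go
package main

import (
	"container/heap"
	"fmt"
)

// queuedMessage is a stand-in for messageItem: only the counter used for ordering.
type queuedMessage struct {
	counter uint64
	payload string
}

// counterQueue implements heap.Interface; Less orders by counter, so Pop
// always returns the pending message with the lowest counter first.
type counterQueue []*queuedMessage

func (q counterQueue) Len() int            { return len(q) }
func (q counterQueue) Less(i, j int) bool  { return q[i].counter < q[j].counter }
func (q counterQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *counterQueue) Push(x interface{}) { *q = append(*q, x.(*queuedMessage)) }
func (q *counterQueue) Pop() interface{} {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]
	return item
}

func main() {
	q := &counterQueue{}
	heap.Init(q)

	// Messages can arrive out of order while the chain key is missing…
	heap.Push(q, &queuedMessage{counter: 3, payload: "third"})
	heap.Push(q, &queuedMessage{counter: 1, payload: "first"})
	heap.Push(q, &queuedMessage{counter: 2, payload: "second"})

	// …but they come back out ordered by counter when the queue is drained.
	for q.Len() > 0 {
		m := heap.Pop(q).(*queuedMessage)
		fmt.Println(m.counter, m.payload)
	}
}
```
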
2 changes: 1 addition & 1 deletion go/pkg/bertyprotocol/store_message_test.go
@@ -146,7 +146,7 @@ func Test_Add_Messages_To_Cache(t *testing.T) {
require.NoError(t, err)

cadded, err := peers[1].GC.MessageStore().EventBus().Subscribe(
new(messageCacheItem), eventbus.BufSize(entriesCount))
new(messageItem), eventbus.BufSize(entriesCount))
require.NoError(t, err)

for i := 0; i < entriesCount; i++ {
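
The test adaptation above subscribes to the debug emitter that constructorFactoryGroupMessage registers for cached messages, which now carries messageItem values. A small sketch of that emitter/subscriber round-trip, assuming the github.com/libp2p/go-eventbus package that the eventbus.BufSize option in the test appears to come from; cachedMessage is an illustrative stand-in for messageItem:

```go
package main

import (
	"fmt"

	eventbus "github.com/libp2p/go-eventbus"
)

// cachedMessage stands in for messageItem: the concrete type is what ties
// emitters and subscribers together on the bus.
type cachedMessage struct {
	DevicePK string
	Counter  uint64
}

func main() {
	bus := eventbus.NewBus()

	// The store registers an emitter for the event type (debug/test purpose).
	em, err := bus.Emitter(new(cachedMessage))
	if err != nil {
		panic(err)
	}
	defer em.Close()

	// The test subscribes with a buffer large enough for all expected events.
	sub, err := bus.Subscribe(new(cachedMessage), eventbus.BufSize(8))
	if err != nil {
		panic(err)
	}
	defer sub.Close()

	// Emitting a value delivers a copy to every subscriber of that type.
	_ = em.Emit(cachedMessage{DevicePK: "device-A", Counter: 1})

	evt := <-sub.Out()
	fmt.Printf("%+v\n", evt.(cachedMessage))
}
```
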
