Skip to content

Commit

Permalink
fix: race conditions in log struct
Browse files Browse the repository at this point in the history
  • Loading branch information
glouvigny committed Nov 5, 2019
1 parent 5f02942 commit f93fb0b
Showing 1 changed file with 69 additions and 17 deletions.
86 changes: 69 additions & 17 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"strconv"
"strings"
"sync"
"time"

"berty.tech/go-ipfs-log/entry/sorting"
Expand Down Expand Up @@ -40,14 +41,19 @@ type IPFSLog struct {
heads iface.IPFSLogOrderedEntries
Next iface.IPFSLogOrderedEntries
Clock iface.IPFSLogLamportClock
lock sync.RWMutex
}

func (l *IPFSLog) SetEntries(entries iface.IPFSLogOrderedEntries) {
l.Entries = entries
}

func (l *IPFSLog) RawHeads() iface.IPFSLogOrderedEntries {
return l.heads
l.lock.RLock()
heads := l.heads
l.lock.RUnlock()

return heads
}

// maxInt Returns the larger of x or y
Expand Down Expand Up @@ -214,22 +220,26 @@ func (l *IPFSLog) traverse(rootEntries iface.IPFSLogOrderedEntries, amount int,
func (l *IPFSLog) Append(ctx context.Context, payload []byte, pointerCount int) (iface.IPFSLogEntry, error) {
// INFO: JS default value for pointerCount is 1
// Update the clock (find the latest clock)
newTime := maxClockTimeForEntries(l.heads.Slice(), 0)
l.lock.RLock()
heads := l.heads
l.lock.RUnlock()

newTime := maxClockTimeForEntries(heads.Slice(), 0)
newTime = maxInt(l.Clock.GetTime(), newTime) + 1

l.Clock = entry.NewLamportClock(l.Clock.GetID(), newTime)

// Get the required amount of hashes to next entries (as per current state of the log)
references, err := l.traverse(l.heads, maxInt(pointerCount, l.heads.Len()), "")
references, err := l.traverse(heads, maxInt(pointerCount, heads.Len()), "")
if err != nil {
return nil, errors.Wrap(err, "append failed")
}

next := []cid.Cid{}

keys := l.heads.Keys()
keys := heads.Keys()
for _, k := range keys {
e, _ := l.heads.Get(k)
e, _ := heads.Get(k)
next = append(next, e.GetHash())
}
for _, e := range references {
Expand All @@ -256,12 +266,15 @@ func (l *IPFSLog) Append(ctx context.Context, payload []byte, pointerCount int)
l.Entries.Set(e.Hash.String(), e)

for _, k := range keys {
nextEntry, _ := l.heads.Get(k)
nextEntry, _ := heads.Get(k)
l.Next.Set(nextEntry.GetHash().String(), e)
}

l.heads = entry.NewOrderedMap()
l.heads.Set(e.Hash.String(), e)
heads = entry.NewOrderedMapFromEntries([]iface.IPFSLogEntry{e})

l.lock.Lock()
l.heads = heads
l.lock.Unlock()

return e, nil
}
Expand Down Expand Up @@ -299,7 +312,10 @@ func (l *IPFSLog) Iterator(options *IteratorOptions, output chan<- iface.IPFSLog
amount = *options.Amount
}

l.lock.RLock()
start := l.heads.Slice()
l.lock.RUnlock()

if options.LTE != nil {
start = nil

Expand Down Expand Up @@ -410,7 +426,12 @@ func (l *IPFSLog) Join(otherLog iface.IPFSLog, size int) (iface.IPFSLog, error)
}
}

mergedHeads := entry.FindHeads(l.heads.Merge(otherLog.RawHeads()))
l.lock.RLock()
heads := l.heads
l.lock.RUnlock()

mergedHeads := entry.FindHeads(heads.Merge(otherLog.RawHeads()))

for idx, e := range mergedHeads {
// notReferencedByNewItems
if _, ok := nextsFromNewItems.Get(e.GetHash().String()); ok {
Expand All @@ -423,18 +444,34 @@ func (l *IPFSLog) Join(otherLog iface.IPFSLog, size int) (iface.IPFSLog, error)
}
}

l.lock.Lock()
l.heads = entry.NewOrderedMapFromEntries(mergedHeads)
l.lock.Unlock()

if size > -1 {
tmp := l.Values().Slice()
tmp = tmp[len(tmp)-size:]
l.Entries = entry.NewOrderedMapFromEntries(tmp)
l.heads = entry.NewOrderedMapFromEntries(entry.FindHeads(entry.NewOrderedMapFromEntries(tmp)))

entries := entry.NewOrderedMapFromEntries(tmp)
heads := entry.NewOrderedMapFromEntries(entry.FindHeads(entry.NewOrderedMapFromEntries(tmp)))

l.lock.Lock()
l.Entries = entries
l.heads = heads
l.lock.Unlock()
}

// Find the latest clock from the heads
maxClock := maxClockTimeForEntries(l.heads.Slice(), 0)
l.Clock = entry.NewLamportClock(l.Clock.GetID(), maxInt(l.Clock.GetTime(), maxClock))
l.lock.RLock()
headsSlice := l.heads.Slice()
clock := l.Clock
l.lock.RUnlock()

maxClock := maxClockTimeForEntries(headsSlice, 0)

l.lock.Lock()
l.Clock = entry.NewLamportClock(clock.GetID(), maxInt(clock.GetTime(), maxClock))
l.lock.Unlock()

return l, nil
}
Expand Down Expand Up @@ -513,9 +550,13 @@ func (l *IPFSLog) ToString(payloadMapper func(iface.IPFSLogEntry) string) string

// ToSnapshot exports a Snapshot-able version of the log
func (l *IPFSLog) ToSnapshot() *Snapshot {
l.lock.RLock()
heads := l.heads.Slice()
l.lock.RUnlock()

return &Snapshot{
ID: l.ID,
Heads: entrySliceToCids(l.heads.Slice()),
Heads: entrySliceToCids(heads),
Values: l.Values().Slice(),
}
}
Expand Down Expand Up @@ -689,18 +730,26 @@ func NewFromEntry(ctx context.Context, services io.IpfsServices, identity *ident
//
// The values are in linearized order according to their Lamport clocks
func (l *IPFSLog) Values() iface.IPFSLogOrderedEntries {
if l.heads == nil {
l.lock.RLock()
heads := l.heads
l.lock.RUnlock()

if heads == nil {
return entry.NewOrderedMap()
}
stack, _ := l.traverse(l.heads, -1, "")
stack, _ := l.traverse(heads, -1, "")
sorting.Reverse(stack)

return entry.NewOrderedMapFromEntries(stack)
}

// ToJSON Returns a log in a JSON serializable structure
func (l *IPFSLog) ToJSON() *JSONLog {
stack := l.heads.Slice()
l.lock.RLock()
heads := l.heads
l.lock.RUnlock()

stack := heads.Slice()
sorting.Sort(l.SortFn, stack)
sorting.Reverse(stack)

Expand All @@ -727,7 +776,10 @@ func (l *IPFSLog) GetEntries() iface.IPFSLogOrderedEntries {
//
// Heads are the entries that are not referenced by other entries in the log
func (l *IPFSLog) Heads() iface.IPFSLogOrderedEntries {
l.lock.RLock()
heads := l.heads.Slice()
l.lock.RUnlock()

sorting.Sort(l.SortFn, heads)
sorting.Reverse(heads)

Expand Down

0 comments on commit f93fb0b

Please sign in to comment.