-
Notifications
You must be signed in to change notification settings - Fork 28
/
orbitdb_many_adds_berty_test.go
138 lines (104 loc) · 3.21 KB
/
orbitdb_many_adds_berty_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package weshnet
import (
"context"
"fmt"
"os"
"path"
"sync"
"testing"
"time"
sync_ds "github.com/ipfs/go-datastore/sync"
"github.com/juju/fslock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"berty.tech/go-orbit-db/iface"
"berty.tech/weshnet/v2/pkg/ipfsutil"
"berty.tech/weshnet/v2/pkg/protocoltypes"
"berty.tech/weshnet/v2/pkg/secretstore"
"berty.tech/weshnet/v2/pkg/testutil"
)
func testAddBerty(ctx context.Context, t *testing.T, node ipfsutil.CoreAPIMock, g *protocoltypes.Group, pathBase string, storageKey []byte, storageSalt []byte, amountToAdd, amountCurrentlyPresent int) {
t.Helper()
testutil.FilterSpeed(t, testutil.Fast)
t.Logf("TestAddBerty: amountToAdd: %d, amountCurrentlyPresent: %d\n", amountToAdd, amountCurrentlyPresent)
api := node.API()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
lock := fslock.New(path.Join(pathBase, "lock"))
err := lock.TryLock()
require.NoError(t, err)
defer lock.Unlock()
baseDS, err := GetRootDatastoreForPath(pathBase, storageKey, storageSalt, zap.NewNop())
require.NoError(t, err)
baseDS = sync_ds.MutexWrap(baseDS)
defer testutil.Close(t, baseDS)
secretStore, err := secretstore.NewSecretStore(baseDS, nil)
require.NoError(t, err)
defer secretStore.Close()
odb, err := NewWeshOrbitDB(ctx, api, &NewOrbitDBOptions{
Datastore: baseDS,
SecretStore: secretStore,
})
require.NoError(t, err)
defer testutil.Close(t, odb)
replicate := false
gc, err := odb.OpenGroup(ctx, g, &iface.CreateDBOptions{
Replicate: &replicate,
})
require.NoError(t, err)
defer gc.Close()
defer testutil.Close(t, gc)
wg := sync.WaitGroup{}
wg.Add(amountToAdd * 2)
amountCurrentlyFound := 0
messages, err := gc.MessageStore().ListEvents(ctx, nil, nil, false)
require.NoError(t, err)
for range messages {
amountCurrentlyFound++
}
sub, err := gc.MessageStore().EventBus().Subscribe(new(*protocoltypes.GroupMessageEvent))
require.NoError(t, err)
defer sub.Close()
// Watch for incoming new messages
go func() {
for range sub.Out() {
wg.Done()
}
}()
_, err = gc.MetadataStore().AddDeviceToGroup(ctx)
require.NoError(t, err)
for i := 0; i < amountToAdd; i++ {
_, err := gc.MessageStore().AddMessage(ctx, []byte(fmt.Sprintf("%d", i)))
require.NoError(t, err)
wg.Done()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(30 * time.Second):
}
require.Equal(t, amountCurrentlyPresent, amountCurrentlyFound)
}
func TestAddBerty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
api := ipfsutil.TestingCoreAPI(ctx, t)
pathBase, err := os.MkdirTemp("", "manyaddstest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(pathBase)
g, _, err := NewGroupMultiMember()
require.NoError(t, err)
storageKey := []byte("42424242424242424242424242424242")
storageSalt := []byte("2121212121212121")
testAddBerty(ctx, t, api, g, pathBase, storageKey, storageSalt, 20, 0)
testAddBerty(ctx, t, api, g, pathBase, storageKey, storageSalt, 0, 20)
testAddBerty(ctx, t, api, g, pathBase, storageKey, storageSalt, 20, 20)
testAddBerty(ctx, t, api, g, pathBase, storageKey, storageSalt, 0, 40)
// FIXME: use github.com/stretchr/testify/suite
}