dagstore_async.go

package dagstore

import (
	"context"

	"github.com/filecoin-project/dagstore/index"
	"github.com/filecoin-project/dagstore/mount"

	carindex "github.com/ipld/go-car/v2/index"
	"github.com/multiformats/go-multihash"
)

//
// This file contains methods that are called from the event loop
// but are run asynchronously in dedicated goroutines.
//
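// Illustrative sketch (an assumption about the caller, not code in this file):
// the event loop is expected to hand work off to these methods in their own
// goroutines, roughly:
//
//	go d.acquireAsync(ctx, w, s, s.mount)
//	go d.initializeShard(ctx, s, s.mount)
//
// so that slow mount fetches and indexing never block event-loop processing.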

// acquireAsync acquires a shard by fetching its data, obtaining its index, and
// joining them to form a ShardAccessor.
func (d *DAGStore) acquireAsync(ctx context.Context, w *waiter, s *Shard, mnt mount.Mount) {
	k := s.key

	reader, err := mnt.Fetch(ctx)
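
	// The cancellation check below runs before the fetch error check: if the
	// caller's context was cancelled, the shard only needs to be released,
	// whereas a genuine fetch failure also marks the shard as failed.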
	if err := ctx.Err(); err != nil {
		log.Warnw("context cancelled while fetching shard; releasing", "shard", s.key, "error", err)

		// release the shard to decrement the refcount that's incremented before `acquireAsync` is called.
		_ = d.queueTask(&task{op: OpShardRelease, shard: s}, d.completionCh)

		// send the shard error to the caller for correctness
		// since the context is cancelled, the result will be discarded.
		d.dispatchResult(&ShardResult{Key: k, Error: err}, w)
		return
	}

	if err != nil {
		log.Warnw("acquire: failed to fetch from mount upgrader", "shard", s.key, "error", err)

		// release the shard to decrement the refcount that's incremented before `acquireAsync` is called.
		_ = d.queueTask(&task{op: OpShardRelease, shard: s}, d.completionCh)

		// fail the shard
		_ = d.failShard(s, d.completionCh, "failed to acquire reader of mount so we can return the accessor: %w", err)

		// send the shard error to the caller.
		d.dispatchResult(&ShardResult{Key: k, Error: err}, w)
		return
	}

	log.Debugw("acquire: successfully fetched from mount upgrader", "shard", s.key)

	// acquire the index.
	idx, err := d.indices.GetFullIndex(k)

	if err := ctx.Err(); err != nil {
		log.Warnw("context cancelled while indexing shard; releasing", "shard", s.key, "error", err)

		// release the shard to decrement the refcount that's incremented before `acquireAsync` is called.
		_ = d.queueTask(&task{op: OpShardRelease, shard: s}, d.completionCh)

		// send the shard error to the caller for correctness
		// since the context is cancelled, the result will be discarded.
		d.dispatchResult(&ShardResult{Key: k, Error: err}, w)
		return
	}

	if err != nil {
		log.Warnw("acquire: failed to get index for shard", "shard", s.key, "error", err)

		if err := reader.Close(); err != nil {
			log.Errorf("failed to close mount reader: %s", err)
		}

		// release the shard to decrement the refcount that's incremented before `acquireAsync` is called.
		_ = d.queueTask(&task{op: OpShardRelease, shard: s}, d.completionCh)

		// fail the shard
		_ = d.failShard(s, d.completionCh, "failed to recover index for shard %s: %w", k, err)

		// send the shard error to the caller.
		d.dispatchResult(&ShardResult{Key: k, Error: err}, w)
		return
	}

	log.Debugw("acquire: successful; returning accessor", "shard", s.key)

	// build the accessor.
	sa, err := NewShardAccessor(reader, idx, s)

	// send the shard accessor to the caller, adding a notifyDead function that
	// will be called to release the shard if we were unable to deliver
	// the accessor.
	w.notifyDead = func() {
		log.Warnw("context cancelled while delivering accessor; releasing", "shard", s.key)

		// release the shard to decrement the refcount that's incremented before `acquireAsync` is called.
		_ = d.queueTask(&task{op: OpShardRelease, shard: s}, d.completionCh)
	}

	d.dispatchResult(&ShardResult{Key: k, Accessor: sa, Error: err}, w)
}
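
// Illustrative sketch (assumptions about API that lives outside this file):
// the waiter above ultimately delivers the ShardResult on a caller-supplied
// channel, so a typical consumer looks roughly like
//
//	res := <-resCh // hypothetical channel the caller passed when acquiring
//	if res.Error != nil {
//		return res.Error
//	}
//	defer res.Accessor.Close() // assumed to release the refcount taken for this acquire
//	// ... read blocks through the accessor ...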

// initializeShard initializes a shard asynchronously by fetching its data and
// performing indexing.
func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Mount) {
	reader, err := mnt.Fetch(ctx)
	if err != nil {
		log.Warnw("initialize: failed to fetch from mount upgrader", "shard", s.key, "error", err)
		_ = d.failShard(s, d.completionCh, "failed to acquire reader of mount on initialization: %w", err)
		return
	}
	defer reader.Close()

	log.Debugw("initialize: successfully fetched from mount upgrader", "shard", s.key)

	// works for both CARv1 and CARv2.
	var idx carindex.Index
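	// throttleIndex presumably bounds how many shards are indexed at once, so
	// a burst of shard initializations does not overwhelm the mounts or the CPU.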
	err = d.throttleIndex.Do(ctx, func(_ context.Context) error {
		var err error
		idx, err = d.indexer(ctx, s.key, reader)
		if err == nil {
			log.Debugw("initialize: finished generating index for shard", "shard", s.key)
		} else {
			log.Warnw("initialize: failed to generate index for shard", "shard", s.key, "error", err)
		}
		return err
	})
	if err != nil {
		_ = d.failShard(s, d.completionCh, "failed to read/generate CAR Index: %w", err)
		return
	}
	if err := d.indices.AddFullIndex(s.key, idx); err != nil {
		_ = d.failShard(s, d.completionCh, "failed to add index for shard: %w", err)
		return
	}

	// add all cids in the shard to the inverted (cid -> []Shard Keys) index.
	iterableIdx, ok := idx.(carindex.IterableIndex)
	if ok {
		mhIter := &mhIdx{iterableIdx: iterableIdx}
		if err := d.TopLevelIndex.AddMultihashesForShard(ctx, mhIter, s.key); err != nil {
			log.Errorw("failed to add shard multihashes to the inverted index", "shard", s.key, "error", err)
		}
	} else {
		log.Errorw("shard index is not iterable", "shard", s.key)
	}

	_ = d.queueTask(&task{op: OpShardMakeAvailable, shard: s}, d.completionCh)
}

// Convenience struct for converting from CAR index.IterableIndex to the
// iterator required by the dag store inverted index.
type mhIdx struct {
	iterableIdx carindex.IterableIndex
}

var _ index.MultihashIterator = (*mhIdx)(nil)

func (it *mhIdx) ForEach(fn func(mh multihash.Multihash) error) error {
	return it.iterableIdx.ForEach(func(mh multihash.Multihash, _ uint64) error {
		return fn(mh)
	})
}
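
// Illustrative usage, mirroring initializeShard above: wrap an iterable CAR
// index so it satisfies index.MultihashIterator, then feed it to the inverted
// index:
//
//	mhIter := &mhIdx{iterableIdx: iterableIdx}
//	err := d.TopLevelIndex.AddMultihashesForShard(ctx, mhIter, s.key)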