-
Notifications
You must be signed in to change notification settings - Fork 26
/
daghelpers.go
118 lines (103 loc) · 2.42 KB
/
daghelpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package format
import (
"context"
cid "github.com/ipfs/go-cid"
)
// GetLinks returns the CIDs of the children of the given node. Prefer this
// method over looking up the node itself and calling `Links()` on it as this
// method may be able to use a link cache.
func GetLinks(ctx context.Context, ng NodeGetter, c cid.Cid) ([]*Link, error) {
if c.Type() == cid.Raw {
return nil, nil
}
if gl, ok := ng.(LinkGetter); ok {
return gl.GetLinks(ctx, c)
}
node, err := ng.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links(), nil
}
// GetDAG will fill out all of the links of the given Node.
// It returns an array of NodePromise with the linked nodes all in the proper
// order.
func GetDAG(ctx context.Context, ds NodeGetter, root Node) []*NodePromise {
var cids []cid.Cid
for _, lnk := range root.Links() {
cids = append(cids, lnk.Cid)
}
return GetNodes(ctx, ds, cids)
}
// GetNodes returns an array of 'FutureNode' promises, with each corresponding
// to the key with the same index as the passed in keys
func GetNodes(ctx context.Context, ds NodeGetter, keys []cid.Cid) []*NodePromise {
// Early out if no work to do
if len(keys) == 0 {
return nil
}
promises := make([]*NodePromise, len(keys))
for i := range keys {
promises[i] = NewNodePromise(ctx)
}
dedupedKeys := dedupeKeys(keys)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodechan := ds.GetMany(ctx, dedupedKeys)
for count := 0; count < len(keys); {
select {
case opt, ok := <-nodechan:
if !ok {
for _, p := range promises {
p.Fail(ErrNotFound{})
}
return
}
if opt.Err != nil {
for _, p := range promises {
p.Fail(opt.Err)
}
return
}
nd := opt.Node
c := nd.Cid()
for i, lnk_c := range keys {
if c.Equals(lnk_c) {
count++
promises[i].Send(nd)
}
}
case <-ctx.Done():
return
}
}
}()
return promises
}
func Copy(ctx context.Context, from, to DAGService, root cid.Cid) error {
node, err := from.Get(ctx, root)
if err != nil {
return err
}
links := node.Links()
for _, link := range links {
err := Copy(ctx, from, to, link.Cid)
if err != nil {
return err
}
}
err = to.Add(ctx, node)
if err != nil {
return err
}
return nil
}
// Remove duplicates from a list of keys
func dedupeKeys(cids []cid.Cid) []cid.Cid {
set := cid.NewSet()
for _, c := range cids {
set.Add(c)
}
return set.Keys()
}