-
-
Notifications
You must be signed in to change notification settings - Fork 3k
/
import.go
219 lines (181 loc) · 5.45 KB
/
import.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package dagcmd
import (
"errors"
"fmt"
"io"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/cmdutils"
cmds "github.com/ipfs/go-ipfs-cmds"
gocarv2 "github.com/ipld/go-car/v2"
)
func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
node, err := cmdenv.GetNode(env)
if err != nil {
return err
}
api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
}
// on import ensure we do not reach out to the network for any reason
// if a pin based on what is imported + what is in the blockstore
// isn't possible: tough luck
api, err = api.WithOptions(options.Api.Offline(true))
if err != nil {
return err
}
// grab a pinlock ( which doubles as a GC lock ) so that regardless of the
// size of the streamed-in cars nothing will disappear on us before we had
// a chance to roots that may show up at the very end
// This is especially important for use cases like dagger:
// ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
//
unlocker := node.Blockstore.PinLock(req.Context)
defer unlocker.Unlock(req.Context)
doPinRoots, _ := req.Options[pinRootsOptionName].(bool)
retCh := make(chan importResult, 1)
go importWorker(req, res, api, retCh)
done := <-retCh
if done.err != nil {
return done.err
}
// It is not guaranteed that a root in a header is actually present in the same ( or any )
// .car file. This is the case in version 1, and ideally in further versions too
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
// We will attempt a pin *only* at the end in case all car files were well formed
//
// The boolean value indicates whether we have encountered the root within the car file's
roots := done.roots
// opportunistic pinning: try whatever sticks
if doPinRoots {
var failedPins int
for c := range roots {
// We need to re-retrieve a block, convert it to ipld, and feed it
// to the Pinning interface, sigh...
//
// If we didn't have the problem of inability to take multiple pinlocks,
// we could use the api directly like so (though internally it does the same):
//
// // not ideal, but the pinning api takes only paths :(
// rp := path.NewResolvedPath(
// ipfspath.FromCid(c),
// c,
// c,
// "",
// )
//
// if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil {
ret := RootMeta{Cid: c}
if block, err := node.Blockstore.Get(req.Context, c); err != nil {
ret.PinErrorMsg = err.Error()
} else if nd, err := ipld.Decode(block); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Pin(req.Context, nd, true); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Flush(req.Context); err != nil {
ret.PinErrorMsg = err.Error()
}
if ret.PinErrorMsg != "" {
failedPins++
}
if err := res.Emit(&CarImportOutput{Root: &ret}); err != nil {
return err
}
}
if failedPins > 0 {
return fmt.Errorf(
"unable to pin all roots: %d out of %d failed",
failedPins,
len(roots),
)
}
}
stats, _ := req.Options[statsOptionName].(bool)
if stats {
err = res.Emit(&CarImportOutput{
Stats: &CarImportStats{
BlockCount: done.blockCount,
BlockBytesCount: done.blockBytesCount,
},
})
if err != nil {
return err
}
}
return nil
}
func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) {
// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag())
roots := make(map[cid.Cid]struct{})
var blockCount, blockBytesCount uint64
it := req.Files.Entries()
for it.Next() {
file := files.FileFromEntry(it)
if file == nil {
ret <- importResult{err: errors.New("expected a file handle")}
return
}
// wrap a defer-closer-scope
//
// every single file in it() is already open before we start
// just close here sooner rather than later for neatness
// and to surface potential errors writing on closed fifos
// this won't/can't help with not running out of handles
err := func() error {
defer file.Close()
car, err := gocarv2.NewBlockReader(file)
if err != nil {
return err
}
for _, c := range car.Roots {
roots[c] = struct{}{}
}
for {
block, err := car.Next()
if err != nil && err != io.EOF {
return err
} else if block == nil {
break
}
if err := cmdutils.CheckBlockSize(req, uint64(len(block.RawData()))); err != nil {
return err
}
// the double-decode is suboptimal, but we need it for batching
nd, err := ipld.Decode(block)
if err != nil {
return err
}
if err := batch.Add(req.Context, nd); err != nil {
return err
}
blockCount++
blockBytesCount += uint64(len(block.RawData()))
}
return nil
}()
if err != nil {
ret <- importResult{err: err}
return
}
}
if err := it.Err(); err != nil {
ret <- importResult{err: err}
return
}
if err := batch.Commit(); err != nil {
ret <- importResult{err: err}
return
}
ret <- importResult{
blockCount: blockCount,
blockBytesCount: blockBytesCount,
roots: roots}
}