Showing 6 changed files with 677 additions and 7 deletions.
@@ -0,0 +1,168 @@
package car

import (
    "bytes"
    "context"
    "fmt"
    "io"

    "github.com/ipfs/go-blockservice"
    "github.com/ipfs/go-cid"
    blockstore "github.com/ipfs/go-ipfs-blockstore"
    offline "github.com/ipfs/go-ipfs-exchange-offline"
    format "github.com/ipfs/go-ipld-format"
    "github.com/ipfs/go-merkledag"
    "github.com/ipld/go-car/v2/internal/carv1"
    "github.com/ipld/go-car/v2/internal/carv1/util"
)

type blockInfo struct {
    offset uint64
    // Note: size is the size of the block and metadata
    size  uint64
    links []*format.Link
}

// CarOffsetWriter turns a blockstore and a root CID into a CAR file stream,
// starting from an offset
type CarOffsetWriter struct {
    payloadCid cid.Cid
    nodeGetter format.NodeGetter
    blockInfos map[cid.Cid]*blockInfo
    header     carv1.CarHeader
}

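// NewCarOffsetWriter creates a CarOffsetWriter for the given root CID, reading
// blocks from the given blockstore through an offline DAG service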
func NewCarOffsetWriter(payloadCid cid.Cid, bstore blockstore.Blockstore) *CarOffsetWriter {
    ng := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore)))
    return &CarOffsetWriter{
        payloadCid: payloadCid,
        nodeGetter: ng,
        blockInfos: make(map[cid.Cid]*blockInfo),
        header:     carHeader(payloadCid),
    }
}

func carHeader(payloadCid cid.Cid) carv1.CarHeader {
    return carv1.CarHeader{
        Roots:   []cid.Cid{payloadCid},
        Version: 1,
    }
}

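// Write writes the CAR for the root CID to w, starting from the given byte
// offset into the CAR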
func (s *CarOffsetWriter) Write(ctx context.Context, w io.Writer, offset uint64) error {
    headerSize, err := s.writeHeader(w, offset)
    if err != nil {
        return err
    }

    return s.writeBlocks(ctx, w, headerSize, offset)
}

func (s *CarOffsetWriter) writeHeader(w io.Writer, offset uint64) (uint64, error) {
    headerSize, err := carv1.HeaderSize(&s.header)
    if err != nil {
        return 0, fmt.Errorf("failed to size car header: %w", err)
    }

    // Check if the offset from which to start writing is after the header
    if offset >= headerSize {
        return headerSize, nil
    }

    // Write out the header, starting at the offset
    _, err = skipWrite(w, offset, func(sw io.Writer) (int, error) {
        return 0, carv1.WriteHeader(&s.header, sw)
    })
    if err != nil {
        return 0, fmt.Errorf("failed to write car header: %w", err)
    }

    return headerSize, nil
}

func (s *CarOffsetWriter) writeBlocks(ctx context.Context, w io.Writer, headerSize uint64, writeOffset uint64) error {
    // The first block's offset is the size of the header
    offset := headerSize

    // This function gets called for each CID during the merkle DAG walk
    nextCid := func(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
        // There will be an item in the cache if writeBlocks has already been
        // called before, and the DAG traversal reached this CID
        cached, ok := s.blockInfos[c]
        if ok {
            // Check if the offset from which to start writing is after this
            // block
            nextBlockOffset := cached.offset + cached.size
            if writeOffset >= nextBlockOffset {
                // The offset from which to start writing is after this block
                // so don't write anything, just skip over this block
                offset = nextBlockOffset
                return cached.links, nil
            }
        }

        // Get the block from the blockstore
        nd, err := s.nodeGetter.Get(ctx, c)
        if err != nil {
            return nil, fmt.Errorf("getting block %s: %w", c, err)
        }

        // Get the size of the block and metadata
        ldsize := util.LdSize(nd.Cid().Bytes(), nd.RawData())

        // Check if the offset from which to start writing is in or before this
        // block
        nextBlockOffset := offset + ldsize
        if writeOffset < nextBlockOffset {
            // Check if the offset from which to start writing is in this block
            var blockWriteOffset uint64
            if writeOffset >= offset {
                blockWriteOffset = writeOffset - offset
            }

            // Write the block data to the writer, starting at the block offset
            _, err = skipWrite(w, blockWriteOffset, func(sw io.Writer) (int, error) {
                return 0, util.LdWrite(sw, nd.Cid().Bytes(), nd.RawData())
            })
            if err != nil {
                return nil, fmt.Errorf("writing CAR block %s: %w", c, err)
            }
        }

        // Add the block to the cache
        s.blockInfos[nd.Cid()] = &blockInfo{
            offset: offset,
            size:   ldsize,
            links:  nd.Links(),
        }

        offset = nextBlockOffset

        // Return any links from this block to other DAG blocks
        return nd.Links(), nil
    }

    seen := cid.NewSet()
    return merkledag.Walk(ctx, nextCid, s.payloadCid, seen.Visit)
}

// Write data to the writer, skipping the first skip bytes
func skipWrite(w io.Writer, skip uint64, write func(sw io.Writer) (int, error)) (int, error) {
    // If there's nothing to skip, just do a normal write
    if skip == 0 {
        return write(w)
    }

    // Write to a buffer
    var buff bytes.Buffer
    if count, err := write(&buff); err != nil {
        return count, err
    }

    // Write the buffer to the writer, skipping the first skip bytes
    bz := buff.Bytes()
    if skip >= uint64(len(bz)) {
        return 0, nil
    }
    return w.Write(bz[skip:])
}
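A minimal usage sketch, not part of this diff, showing the resume pattern the writer above is built for: stream the CAR once, and on failure retry from the byte count the receiver acknowledges. The streamWithResume function, conn writer and acked callback are illustrative assumptions.

// streamWithResume is a hypothetical caller: it streams the full CAR, and if
// the first attempt fails partway it retries from the byte count the receiver
// acknowledges. Reusing the same CarOffsetWriter means blocks that fall
// entirely before the resume point are skipped using the cached block offsets,
// without being fetched or written again.
func streamWithResume(ctx context.Context, bs blockstore.Blockstore, root cid.Cid, conn io.Writer, acked func() uint64) error {
    cow := NewCarOffsetWriter(root, bs)
    if err := cow.Write(ctx, conn, 0); err != nil {
        // acked is an assumed callback reporting how many bytes arrived
        return cow.Write(ctx, conn, acked())
    }
    return nil
}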
@@ -0,0 +1,207 @@
package car

import (
    "bytes"
    "context"
    "io"
    "math/rand"
    "testing"

    "github.com/ipfs/go-blockservice"
    "github.com/ipfs/go-cidutil"
    "github.com/ipfs/go-datastore"
    dss "github.com/ipfs/go-datastore/sync"
    bstore "github.com/ipfs/go-ipfs-blockstore"
    chunk "github.com/ipfs/go-ipfs-chunker"
    format "github.com/ipfs/go-ipld-format"
    "github.com/ipfs/go-merkledag"
    "github.com/ipfs/go-unixfs/importer/balanced"
    "github.com/ipfs/go-unixfs/importer/helpers"
    "github.com/ipld/go-car/v2/internal/carv1"
    mh "github.com/multiformats/go-multihash"
    "github.com/stretchr/testify/require"
)

func TestCarOffsetWriter(t *testing.T) {
    ds := dss.MutexWrap(datastore.NewMapDatastore())
    bs := bstore.NewBlockstore(ds)
    bserv := blockservice.New(bs, nil)
    dserv := merkledag.NewDAGService(bserv)

    rseed := 5
    size := 2 * 1024 * 1024
    source := io.LimitReader(rand.New(rand.NewSource(int64(rseed))), int64(size))
    nd, err := DAGImport(dserv, source)
    require.NoError(t, err)

    // Write the CAR to a buffer from offset 0 so the buffer can be used for
    // comparison
    payloadCid := nd.Cid()
    fullCarCow := NewCarOffsetWriter(payloadCid, bs)
    var fullBuff bytes.Buffer
    err = fullCarCow.Write(context.Background(), &fullBuff, 0)
    require.NoError(t, err)

    fullCar := fullBuff.Bytes()
    header := carHeader(nd.Cid())
    headerSize, err := carv1.HeaderSize(&header)
    require.NoError(t, err)

    testCases := []struct {
        name   string
        offset uint64
    }{{
        name:   "1 byte offset",
        offset: 1,
    }, {
        name:   "offset < header size",
        offset: headerSize - 1,
    }, {
        name:   "offset == header size",
        offset: headerSize,
    }, {
        name:   "offset > header size",
        offset: headerSize + 1,
    }, {
        name:   "offset > header + one block size",
        offset: headerSize + 1024*1024 + 512*1024,
    }}

    runTestCases := func(name string, runTCWithCow func() *CarOffsetWriter) {
        for _, tc := range testCases {
            t.Run(name+" - "+tc.name, func(t *testing.T) {
                cow := runTCWithCow()
                var buff bytes.Buffer
                err = cow.Write(context.Background(), &buff, tc.offset)
                require.NoError(t, err)
                require.Equal(t, len(fullCar)-int(tc.offset), len(buff.Bytes()))
                require.Equal(t, fullCar[tc.offset:], buff.Bytes())
            })
        }
    }

    // Run tests with a new CarOffsetWriter
    runTestCases("new car offset writer", func() *CarOffsetWriter {
        return NewCarOffsetWriter(payloadCid, bs)
    })

    // Run tests with a CarOffsetWriter that has already been used to write
    // a CAR starting at offset 0
    runTestCases("fully written car offset writer", func() *CarOffsetWriter {
        fullCarCow := NewCarOffsetWriter(payloadCid, bs)
        var buff bytes.Buffer
        err = fullCarCow.Write(context.Background(), &buff, 0)
        require.NoError(t, err)
        return fullCarCow
    })

    // Run tests with a CarOffsetWriter that has already been used to write
    // a CAR starting at offset 1
    runTestCases("car offset writer written from offset 1", func() *CarOffsetWriter {
        fullCarCow := NewCarOffsetWriter(payloadCid, bs)
        var buff bytes.Buffer
        err = fullCarCow.Write(context.Background(), &buff, 1)
        require.NoError(t, err)
        return fullCarCow
    })

    // Run tests with a CarOffsetWriter that has already been used to write
    // a CAR starting part way through the second block
    runTestCases("car offset writer written from offset 1.5 blocks", func() *CarOffsetWriter {
        fullCarCow := NewCarOffsetWriter(payloadCid, bs)
        var buff bytes.Buffer
        err = fullCarCow.Write(context.Background(), &buff, 1024*1024+512*1024)
        require.NoError(t, err)
        return fullCarCow
    })

    // Run tests with a CarOffsetWriter that has already been used to write
    // a CAR repeatedly
runTestCases("car offset writer written from offset repeatedly", func() *CarOffsetWriter { | ||
fullCarCow := NewCarOffsetWriter(payloadCid, bs) | ||
var buff bytes.Buffer | ||
err = fullCarCow.Write(context.Background(), &buff, 1024) | ||
require.NoError(t, err) | ||
fullCarCow = NewCarOffsetWriter(payloadCid, bs) | ||
var buff2 bytes.Buffer | ||
err = fullCarCow.Write(context.Background(), &buff2, 10) | ||
require.NoError(t, err) | ||
fullCarCow = NewCarOffsetWriter(payloadCid, bs) | ||
var buff3 bytes.Buffer | ||
err = fullCarCow.Write(context.Background(), &buff3, 1024*1024+512*1024) | ||
require.NoError(t, err) | ||
return fullCarCow | ||
}) | ||
}

func TestSkipWriter(t *testing.T) {
    testCases := []struct {
        name     string
        size     int
        skip     int
        expected int
    }{{
        name:     "no skip",
        size:     1024,
        skip:     0,
        expected: 1024,
    }, {
        name:     "skip 1",
        size:     1024,
        skip:     1,
        expected: 1023,
    }, {
        name:     "skip all",
        size:     1024,
        skip:     1024,
        expected: 0,
    }, {
        name:     "skip overflow",
        size:     1024,
        skip:     1025,
        expected: 0,
    }}

    for _, tc := range testCases {
        t.Run(tc.name, func(t *testing.T) {
            var buff bytes.Buffer
            write := func(sw io.Writer) (int, error) {
                bz := make([]byte, tc.size)
                return sw.Write(bz)
            }
            count, err := skipWrite(&buff, uint64(tc.skip), write)
            require.NoError(t, err)
            require.Equal(t, tc.expected, count)
            require.Equal(t, tc.expected, len(buff.Bytes()))
        })
    }
}

var DefaultHashFunction = uint64(mh.SHA2_256)

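// DAGImport builds a balanced UnixFS DAG from the reader, chunking it into
// 1 MiB raw-leaf blocks with CIDv1 SHA2-256 CIDs, and returns the root node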
func DAGImport(dserv format.DAGService, fi io.Reader) (format.Node, error) {
    prefix, err := merkledag.PrefixForCidVersion(1)
    if err != nil {
        return nil, err
    }
    prefix.MhType = DefaultHashFunction

    spl := chunk.NewSizeSplitter(fi, 1024*1024)
    dbp := helpers.DagBuilderParams{
        Maxlinks:  1024,
        RawLeaves: true,

        CidBuilder: cidutil.InlineBuilder{
            Builder: prefix,
            Limit:   32,
        },

        Dagserv: dserv,
    }

    db, err := dbp.New(spl)
    if err != nil {
        return nil, err
    }

    return balanced.Layout(db)
}