Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAR offset writer #290

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions v2/car_offset_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package car

import (
"bytes"
"context"
"fmt"
"io"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipld/go-car/v2/internal/carv1"
"github.com/ipld/go-car/v2/internal/carv1/util"
)

// blockInfo records where a block landed in the CAR stream, so that a
// subsequent Write starting at a later offset can skip it without
// re-reading the block from the blockstore.
type blockInfo struct {
	// offset is the byte position of this block's section within the CAR
	// stream (the header counts toward the offset).
	offset uint64
	// Note: size is the size of the block and metadata
	size uint64
	// links are the block's child links, cached so a repeat traversal can
	// walk the DAG without fetching the node again.
	links []*format.Link
}

// CarOffsetWriter turns a blockstore and a root CID into a CAR file stream,
// starting from an offset
type CarOffsetWriter struct {
payloadCid cid.Cid
nodeGetter format.NodeGetter
blockInfos map[cid.Cid]*blockInfo
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
header carv1.CarHeader
}

// NewCarOffsetWriter creates a CarOffsetWriter for the DAG rooted at
// payloadCid, reading blocks from bstore via an offline DAG service.
func NewCarOffsetWriter(payloadCid cid.Cid, bstore blockstore.Blockstore) *CarOffsetWriter {
	blockSvc := blockservice.New(bstore, offline.Exchange(bstore))
	getter := merkledag.NewDAGService(blockSvc)
	cow := &CarOffsetWriter{
		payloadCid: payloadCid,
		nodeGetter: getter,
		blockInfos: make(map[cid.Cid]*blockInfo),
		header:     carHeader(payloadCid),
	}
	return cow
}

// carHeader builds the CARv1 header for a stream rooted at payloadCid.
func carHeader(payloadCid cid.Cid) carv1.CarHeader {
	header := carv1.CarHeader{
		Version: 1,
		Roots:   []cid.Cid{payloadCid},
	}
	return header
}

// Write streams the CAR for the writer's payload CID to w, emitting only
// the bytes of the stream at or after offset.
func (s *CarOffsetWriter) Write(ctx context.Context, w io.Writer, offset uint64) error {
	// Emit whatever portion of the header lies at or after the offset.
	headerSize, headerErr := s.writeHeader(w, offset)
	if headerErr != nil {
		return headerErr
	}
	// Then emit the blocks, still honouring the requested offset.
	return s.writeBlocks(ctx, w, headerSize, offset)
}

// writeHeader writes the part of the CARv1 header that falls at or after
// offset, and returns the full (unskipped) header size in bytes.
func (s *CarOffsetWriter) writeHeader(w io.Writer, offset uint64) (uint64, error) {
	headerSize, err := carv1.HeaderSize(&s.header)
	if err != nil {
		return 0, fmt.Errorf("failed to size car header: %w", err)
	}

	// Only emit header bytes when the offset lands inside the header;
	// otherwise there is nothing of the header left to write.
	if offset < headerSize {
		emit := func(sw io.Writer) (int, error) {
			return 0, carv1.WriteHeader(&s.header, sw)
		}
		if _, err := skipWrite(w, offset, emit); err != nil {
			return 0, fmt.Errorf("failed to write car header: %w", err)
		}
	}

	return headerSize, nil
}

// writeBlocks walks the DAG rooted at s.payloadCid and writes each block's
// length-delimited CAR section to w, emitting only the bytes of the stream
// at or after writeOffset. headerSize is the full CAR header size, which is
// where the first block's section begins. Offsets of visited blocks are
// cached in s.blockInfos so later calls can skip them without re-fetching.
func (s *CarOffsetWriter) writeBlocks(ctx context.Context, w io.Writer, headerSize uint64, writeOffset uint64) error {
	// The first block's offset is the size of the header
	offset := headerSize

	// This function gets called for each CID during the merkle DAG walk
	nextCid := func(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
		// There will be an item in the cache if writeBlocks has already been
		// called before, and the DAG traversal reached this CID
		cached, ok := s.blockInfos[c]
		if ok {
			// Check if the offset from which to start writing is after this
			// block
			nextBlockOffset := cached.offset + cached.size
			if writeOffset >= nextBlockOffset {
				// The offset from which to start writing is after this block
				// so don't write anything, just skip over this block
				offset = nextBlockOffset
				return cached.links, nil
			}
			// Otherwise fall through: the block (or part of it) must be
			// written, so fetch it from the blockstore below.
		}

		// Get the block from the blockstore
		nd, err := s.nodeGetter.Get(ctx, c)
		if err != nil {
			return nil, fmt.Errorf("getting block %s: %w", c, err)
		}

		// Get the size of the block and metadata
		ldsize := util.LdSize(nd.Cid().Bytes(), nd.RawData())

		// Check if the offset from which to start writing is in or before this
		// block
		nextBlockOffset := offset + ldsize
		if writeOffset < nextBlockOffset {
			// Check if the offset from which to start writing is in this block
			var blockWriteOffset uint64
			if writeOffset >= offset {
				blockWriteOffset = writeOffset - offset
			}

			// Write the block data to the writer, starting at the block offset
			_, err = skipWrite(w, blockWriteOffset, func(sw io.Writer) (int, error) {
				return 0, util.LdWrite(sw, nd.Cid().Bytes(), nd.RawData())
			})
			if err != nil {
				return nil, fmt.Errorf("writing CAR block %s: %w", c, err)
			}
		}

		// Add the block to the cache
		s.blockInfos[nd.Cid()] = &blockInfo{
			offset: offset,
			size:   ldsize,
			links:  nd.Links(),
		}

		offset = nextBlockOffset

		// Return any links from this block to other DAG blocks
		return nd.Links(), nil
	}

	// Walk visits each CID at most once per call (seen.Visit dedupes).
	seen := cid.NewSet()
	return merkledag.Walk(ctx, nextCid, s.payloadCid, seen.Visit)
}

// Write data to the writer, skipping the first skip bytes
func skipWrite(w io.Writer, skip uint64, write func(sw io.Writer) (int, error)) (int, error) {
// If there's nothing to skip, just do a normal write
if skip == 0 {
return write(w)
}

// Write to a buffer
var buff bytes.Buffer
if count, err := write(&buff); err != nil {
return count, err
}

// Write the buffer to the writer, skipping the first skip bytes
bz := buff.Bytes()
if skip >= uint64(len(bz)) {
return 0, nil
}
return w.Write(bz[skip:])
}
207 changes: 207 additions & 0 deletions v2/car_offset_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package car

import (
"bytes"
"context"
"io"
"math/rand"
"testing"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cidutil"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
chunk "github.com/ipfs/go-ipfs-chunker"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
"github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipld/go-car/v2/internal/carv1"
mh "github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)

// TestCarOffsetWriter verifies that writing a CAR from any offset produces
// exactly the tail of the CAR written from offset 0, for both fresh writers
// and writers whose block-info cache was warmed by a previous Write.
func TestCarOffsetWriter(t *testing.T) {
	ds := dss.MutexWrap(datastore.NewMapDatastore())
	bs := bstore.NewBlockstore(ds)
	bserv := blockservice.New(bs, nil)
	dserv := merkledag.NewDAGService(bserv)

	// Import ~2MiB of deterministic random data as a multi-block DAG.
	rseed := 5
	size := 2 * 1024 * 1024
	source := io.LimitReader(rand.New(rand.NewSource(int64(rseed))), int64(size))
	nd, err := DAGImport(dserv, source)
	require.NoError(t, err)

	// Write the CAR to a buffer from offset 0 so the buffer can be used for
	// comparison
	payloadCid := nd.Cid()
	fullCarCow := NewCarOffsetWriter(payloadCid, bs)
	var fullBuff bytes.Buffer
	err = fullCarCow.Write(context.Background(), &fullBuff, 0)
	require.NoError(t, err)

	fullCar := fullBuff.Bytes()
	header := carHeader(nd.Cid())
	headerSize, err := carv1.HeaderSize(&header)
	// Fix: the HeaderSize error was previously ignored.
	require.NoError(t, err)

	testCases := []struct {
		name   string
		offset uint64
	}{{
		name:   "1 byte offset",
		offset: 1,
	}, {
		name:   "offset < header size",
		offset: headerSize - 1,
	}, {
		name:   "offset == header size",
		offset: headerSize,
	}, {
		name:   "offset > header size",
		offset: headerSize + 1,
	}, {
		name:   "offset > header + one block size",
		offset: headerSize + 1024*1024 + 512*1024,
	}}

	runTestCases := func(name string, runTCWithCow func() *CarOffsetWriter) {
		for _, tc := range testCases {
			t.Run(name+" - "+tc.name, func(t *testing.T) {
				cow := runTCWithCow()
				var buff bytes.Buffer
				err = cow.Write(context.Background(), &buff, tc.offset)
				require.NoError(t, err)
				require.Equal(t, len(fullCar)-int(tc.offset), len(buff.Bytes()))
				require.Equal(t, fullCar[tc.offset:], buff.Bytes())
			})
		}
	}

	// Run tests with a new CarOffsetWriter
	runTestCases("new car offset writer", func() *CarOffsetWriter {
		return NewCarOffsetWriter(payloadCid, bs)
	})

	// Run tests with a CarOffsetWriter that has already been used to write
	// a CAR starting at offset 0
	runTestCases("fully written car offset writer", func() *CarOffsetWriter {
		cow := NewCarOffsetWriter(payloadCid, bs)
		var buff bytes.Buffer
		err = cow.Write(context.Background(), &buff, 0)
		require.NoError(t, err)
		return cow
	})

	// Run tests with a CarOffsetWriter that has already been used to write
	// a CAR starting at offset 1
	runTestCases("car offset writer written from offset 1", func() *CarOffsetWriter {
		cow := NewCarOffsetWriter(payloadCid, bs)
		var buff bytes.Buffer
		err = cow.Write(context.Background(), &buff, 1)
		require.NoError(t, err)
		return cow
	})

	// Run tests with a CarOffsetWriter that has already been used to write
	// a CAR starting part way through the second block
	runTestCases("car offset writer written from offset 1.5 blocks", func() *CarOffsetWriter {
		cow := NewCarOffsetWriter(payloadCid, bs)
		var buff bytes.Buffer
		err = cow.Write(context.Background(), &buff, 1024*1024+512*1024)
		require.NoError(t, err)
		return cow
	})

	// Run tests with a CarOffsetWriter that has already been used to write
	// a CAR repeatedly.
	// Fix: previously each write here went to a brand-new writer, so the
	// "repeated use" scenario was never actually exercised; reuse one writer.
	runTestCases("car offset writer written from offset repeatedly", func() *CarOffsetWriter {
		cow := NewCarOffsetWriter(payloadCid, bs)
		for _, off := range []uint64{1024, 10, 1024*1024 + 512*1024} {
			var buff bytes.Buffer
			err = cow.Write(context.Background(), &buff, off)
			require.NoError(t, err)
		}
		return cow
	})
}

// TestSkipWriter checks skipWrite's byte accounting for the no-skip,
// partial-skip, exact-skip and over-skip cases.
func TestSkipWriter(t *testing.T) {
	for _, tc := range []struct {
		name      string
		writeLen  int
		skipBytes int
		wantLen   int
	}{
		{name: "no skip", writeLen: 1024, skipBytes: 0, wantLen: 1024},
		{name: "skip 1", writeLen: 1024, skipBytes: 1, wantLen: 1023},
		{name: "skip all", writeLen: 1024, skipBytes: 1024, wantLen: 0},
		{name: "skip overflow", writeLen: 1024, skipBytes: 1025, wantLen: 0},
	} {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			var out bytes.Buffer
			produce := func(sw io.Writer) (int, error) {
				return sw.Write(make([]byte, tc.writeLen))
			}
			count, err := skipWrite(&out, uint64(tc.skipBytes), produce)
			require.NoError(t, err)
			require.Equal(t, tc.wantLen, count)
			require.Equal(t, tc.wantLen, len(out.Bytes()))
		})
	}
}

var DefaultHashFunction = uint64(mh.SHA2_256)

// DAGImport chunks fi into 1MiB pieces and builds a balanced unixfs DAG
// (CIDv1, raw leaves, inline CIDs up to 32 bytes) in dserv, returning the
// root node.
func DAGImport(dserv format.DAGService, fi io.Reader) (format.Node, error) {
	cidPrefix, err := merkledag.PrefixForCidVersion(1)
	if err != nil {
		return nil, err
	}
	cidPrefix.MhType = DefaultHashFunction

	params := helpers.DagBuilderParams{
		Maxlinks:  1024,
		RawLeaves: true,
		CidBuilder: cidutil.InlineBuilder{
			Builder: cidPrefix,
			Limit:   32,
		},
		Dagserv: dserv,
	}

	splitter := chunk.NewSizeSplitter(fi, 1024*1024)
	builder, err := params.New(splitter)
	if err != nil {
		return nil, err
	}
	return balanced.Layout(builder)
}
Loading