Skip to content

Commit

Permalink
feat: CAR offset writer
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored and masih committed Jul 7, 2022
1 parent 68fe60d commit 89de813
Show file tree
Hide file tree
Showing 6 changed files with 677 additions and 7 deletions.
168 changes: 168 additions & 0 deletions v2/car_offset_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package car

import (
"bytes"
"context"
"fmt"
"io"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipld/go-car/v2/internal/carv1"
"github.com/ipld/go-car/v2/internal/carv1/util"
)

type blockInfo struct {
offset uint64
// Note: size is the size of the block and metadata
size uint64
links []*format.Link
}

// CarOffsetWriter turns a blockstore and a root CID into a CAR file stream,
// starting from an offset
type CarOffsetWriter struct {
payloadCid cid.Cid
nodeGetter format.NodeGetter
blockInfos map[cid.Cid]*blockInfo
header carv1.CarHeader
}

func NewCarOffsetWriter(payloadCid cid.Cid, bstore blockstore.Blockstore) *CarOffsetWriter {
ng := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore)))
return &CarOffsetWriter{
payloadCid: payloadCid,
nodeGetter: ng,
blockInfos: make(map[cid.Cid]*blockInfo),
header: carHeader(payloadCid),
}
}

func carHeader(payloadCid cid.Cid) carv1.CarHeader {
return carv1.CarHeader{
Roots: []cid.Cid{payloadCid},
Version: 1,
}
}

func (s *CarOffsetWriter) Write(ctx context.Context, w io.Writer, offset uint64) error {
headerSize, err := s.writeHeader(w, offset)
if err != nil {
return err
}

return s.writeBlocks(ctx, w, headerSize, offset)
}

func (s *CarOffsetWriter) writeHeader(w io.Writer, offset uint64) (uint64, error) {
headerSize, err := carv1.HeaderSize(&s.header)
if err != nil {
return 0, fmt.Errorf("failed to size car header: %w", err)
}

// Check if the offset from which to start writing is after the header
if offset >= headerSize {
return headerSize, nil
}

// Write out the header, starting at the offset
_, err = skipWrite(w, offset, func(sw io.Writer) (int, error) {
return 0, carv1.WriteHeader(&s.header, sw)
})
if err != nil {
return 0, fmt.Errorf("failed to write car header: %w", err)
}

return headerSize, nil
}

func (s *CarOffsetWriter) writeBlocks(ctx context.Context, w io.Writer, headerSize uint64, writeOffset uint64) error {
// The first block's offset is the size of the header
offset := headerSize

// This function gets called for each CID during the merkle DAG walk
nextCid := func(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
// There will be an item in the cache if writeBlocks has already been
// called before, and the DAG traversal reached this CID
cached, ok := s.blockInfos[c]
if ok {
// Check if the offset from which to start writing is after this
// block
nextBlockOffset := cached.offset + cached.size
if writeOffset >= nextBlockOffset {
// The offset from which to start writing is after this block
// so don't write anything, just skip over this block
offset = nextBlockOffset
return cached.links, nil
}
}

// Get the block from the blockstore
nd, err := s.nodeGetter.Get(ctx, c)
if err != nil {
return nil, fmt.Errorf("getting block %s: %w", c, err)
}

// Get the size of the block and metadata
ldsize := util.LdSize(nd.Cid().Bytes(), nd.RawData())

// Check if the offset from which to start writing is in or before this
// block
nextBlockOffset := offset + ldsize
if writeOffset < nextBlockOffset {
// Check if the offset from which to start writing is in this block
var blockWriteOffset uint64
if writeOffset >= offset {
blockWriteOffset = writeOffset - offset
}

// Write the block data to the writer, starting at the block offset
_, err = skipWrite(w, blockWriteOffset, func(sw io.Writer) (int, error) {
return 0, util.LdWrite(sw, nd.Cid().Bytes(), nd.RawData())
})
if err != nil {
return nil, fmt.Errorf("writing CAR block %s: %w", c, err)
}
}

// Add the block to the cache
s.blockInfos[nd.Cid()] = &blockInfo{
offset: offset,
size: ldsize,
links: nd.Links(),
}

offset = nextBlockOffset

// Return any links from this block to other DAG blocks
return nd.Links(), nil
}

seen := cid.NewSet()
return merkledag.Walk(ctx, nextCid, s.payloadCid, seen.Visit)
}

// Write data to the writer, skipping the first skip bytes
func skipWrite(w io.Writer, skip uint64, write func(sw io.Writer) (int, error)) (int, error) {
// If there's nothing to skip, just do a normal write
if skip == 0 {
return write(w)
}

// Write to a buffer
var buff bytes.Buffer
if count, err := write(&buff); err != nil {
return count, err
}

// Write the buffer to the writer, skipping the first skip bytes
bz := buff.Bytes()
if skip >= uint64(len(bz)) {
return 0, nil
}
return w.Write(bz[skip:])
}
207 changes: 207 additions & 0 deletions v2/car_offset_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package car

import (
"bytes"
"context"
"io"
"math/rand"
"testing"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cidutil"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
chunk "github.com/ipfs/go-ipfs-chunker"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
"github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipld/go-car/v2/internal/carv1"
mh "github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)

func TestCarOffsetWriter(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := bstore.NewBlockstore(ds)
bserv := blockservice.New(bs, nil)
dserv := merkledag.NewDAGService(bserv)

rseed := 5
size := 2 * 1024 * 1024
source := io.LimitReader(rand.New(rand.NewSource(int64(rseed))), int64(size))
nd, err := DAGImport(dserv, source)
require.NoError(t, err)

// Write the CAR to a buffer from offset 0 so the buffer can be used for
// comparison
payloadCid := nd.Cid()
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
var fullBuff bytes.Buffer
err = fullCarCow.Write(context.Background(), &fullBuff, 0)
require.NoError(t, err)

fullCar := fullBuff.Bytes()
header := carHeader(nd.Cid())
headerSize, err := carv1.HeaderSize(&header)

testCases := []struct {
name string
offset uint64
}{{
name: "1 byte offset",
offset: 1,
}, {
name: "offset < header size",
offset: headerSize - 1,
}, {
name: "offset == header size",
offset: headerSize,
}, {
name: "offset > header size",
offset: headerSize + 1,
}, {
name: "offset > header + one block size",
offset: headerSize + 1024*1024 + 512*1024,
}}

runTestCases := func(name string, runTCWithCow func() *CarOffsetWriter) {
for _, tc := range testCases {
t.Run(name+" - "+tc.name, func(t *testing.T) {
cow := runTCWithCow()
var buff bytes.Buffer
err = cow.Write(context.Background(), &buff, tc.offset)
require.NoError(t, err)
require.Equal(t, len(fullCar)-int(tc.offset), len(buff.Bytes()))
require.Equal(t, fullCar[tc.offset:], buff.Bytes())
})
}
}

// Run tests with a new CarOffsetWriter
runTestCases("new car offset writer", func() *CarOffsetWriter {
return NewCarOffsetWriter(payloadCid, bs)
})

// Run tests with a CarOffsetWriter that has already been used to write
// a CAR starting at offset 0
runTestCases("fully written car offset writer", func() *CarOffsetWriter {
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
var buff bytes.Buffer
err = fullCarCow.Write(context.Background(), &buff, 0)
require.NoError(t, err)
return fullCarCow
})

// Run tests with a CarOffsetWriter that has already been used to write
// a CAR starting at offset 1
runTestCases("car offset writer written from offset 1", func() *CarOffsetWriter {
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
var buff bytes.Buffer
err = fullCarCow.Write(context.Background(), &buff, 1)
require.NoError(t, err)
return fullCarCow
})

// Run tests with a CarOffsetWriter that has already been used to write
// a CAR starting part way through the second block
runTestCases("car offset writer written from offset 1.5 blocks", func() *CarOffsetWriter {
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
var buff bytes.Buffer
err = fullCarCow.Write(context.Background(), &buff, 1024*1024+512*1024)
require.NoError(t, err)
return fullCarCow
})

// Run tests with a CarOffsetWriter that has already been used to write
// a CAR repeatedly
runTestCases("car offset writer written from offset repeatedly", func() *CarOffsetWriter {
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
var buff bytes.Buffer
err = fullCarCow.Write(context.Background(), &buff, 1024)
require.NoError(t, err)
fullCarCow = NewCarOffsetWriter(payloadCid, bs)
var buff2 bytes.Buffer
err = fullCarCow.Write(context.Background(), &buff2, 10)
require.NoError(t, err)
fullCarCow = NewCarOffsetWriter(payloadCid, bs)
var buff3 bytes.Buffer
err = fullCarCow.Write(context.Background(), &buff3, 1024*1024+512*1024)
require.NoError(t, err)
return fullCarCow
})
}

func TestSkipWriter(t *testing.T) {
testCases := []struct {
name string
size int
skip int
expected int
}{{
name: "no skip",
size: 1024,
skip: 0,
expected: 1024,
}, {
name: "skip 1",
size: 1024,
skip: 1,
expected: 1023,
}, {
name: "skip all",
size: 1024,
skip: 1024,
expected: 0,
}, {
name: "skip overflow",
size: 1024,
skip: 1025,
expected: 0,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var buff bytes.Buffer
write := func(sw io.Writer) (int, error) {
bz := make([]byte, tc.size)
return sw.Write(bz)
}
count, err := skipWrite(&buff, uint64(tc.skip), write)
require.NoError(t, err)
require.Equal(t, tc.expected, count)
require.Equal(t, tc.expected, len(buff.Bytes()))
})
}
}

var DefaultHashFunction = uint64(mh.SHA2_256)

func DAGImport(dserv format.DAGService, fi io.Reader) (format.Node, error) {
prefix, err := merkledag.PrefixForCidVersion(1)
if err != nil {
return nil, err
}
prefix.MhType = DefaultHashFunction

spl := chunk.NewSizeSplitter(fi, 1024*1024)
dbp := helpers.DagBuilderParams{
Maxlinks: 1024,
RawLeaves: true,

CidBuilder: cidutil.InlineBuilder{
Builder: prefix,
Limit: 32,
},

Dagserv: dserv,
}

db, err := dbp.New(spl)
if err != nil {
return nil, err
}

return balanced.Layout(db)
}
Loading

0 comments on commit 89de813

Please sign in to comment.