Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propose traversal-based car creation #269

Merged
merged 10 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions v2/blockstore/insertionindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,13 @@ func (ii *insertionIndex) GetAll(c cid.Cid, fn func(uint64) bool) error {
return nil
}

func (ii *insertionIndex) Marshal(w io.Writer) error {
func (ii *insertionIndex) Marshal(w io.Writer) (uint64, error) {
l := uint64(0)
if err := binary.Write(w, binary.LittleEndian, int64(ii.items.Len())); err != nil {
return err
return l, err
}
l += 8

var err error
iter := func(i llrb.Item) bool {
if err = cbor.Encode(w, i.(recordDigest).Record); err != nil {
Expand All @@ -115,7 +118,7 @@ func (ii *insertionIndex) Marshal(w io.Writer) error {
return true
}
ii.items.AscendGreaterOrEqual(ii.items.Min(), iter)
return err
return l, err
}

func (ii *insertionIndex) Unmarshal(r io.Reader) error {
Expand Down
2 changes: 1 addition & 1 deletion v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (b *ReadWrite) Finalize() error {
if err != nil {
return err
}
if err := index.WriteTo(fi, internalio.NewOffsetWriter(b.f, int64(b.header.IndexOffset))); err != nil {
if _, err := index.WriteTo(fi, internalio.NewOffsetWriter(b.f, int64(b.header.IndexOffset))); err != nil {
return err
}
if _, err := b.header.WriteTo(internalio.NewOffsetWriter(b.f, carv2.PragmaSize)); err != nil {
Expand Down
12 changes: 5 additions & 7 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@ go 1.16

require (
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.0.8-0.20210716091050-de6c03deae1c
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.2
github.com/klauspost/cpuid/v2 v2.0.8 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.14.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211119015904-892d2f92ba8b
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61
github.com/multiformats/go-multihash v0.0.15
github.com/multiformats/go-multihash v0.1.0
github.com/multiformats/go-varint v0.0.6
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect
github.com/stretchr/testify v1.7.0
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
golang.org/x/exp v0.0.0-20210615023648-acb5c1269671
Expand Down
104 changes: 82 additions & 22 deletions v2/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion v2/index/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func ExampleWriteTo() {
panic(err)
}
}()
err = index.WriteTo(idx, f)
_, err = index.WriteTo(idx, f)
if err != nil {
panic(err)
}
Expand Down
16 changes: 11 additions & 5 deletions v2/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/ipfs/go-cid"
)

// CarIndexNoIndex is a sentinal value used as a multicodec code for the index indicating no index.
const CarIndexNoIndex = 0x300000
willscott marked this conversation as resolved.
Show resolved Hide resolved

type (
// Record is a pre-processed record of a car item and location.
Record struct {
Expand All @@ -40,7 +43,7 @@ type (
Codec() multicodec.Code

// Marshal encodes the index in serial form.
Marshal(w io.Writer) error
Marshal(w io.Writer) (uint64, error)
masih marked this conversation as resolved.
Show resolved Hide resolved
// Unmarshal decodes the index from its serial form.
Unmarshal(r io.Reader) error

Expand Down Expand Up @@ -118,13 +121,16 @@ func New(codec multicodec.Code) (Index, error) {
// WriteTo writes the given idx into w.
// The written bytes include the index encoding.
// This can then be read back using index.ReadFrom
func WriteTo(idx Index, w io.Writer) error {
func WriteTo(idx Index, w io.Writer) (uint64, error) {
buf := make([]byte, binary.MaxVarintLen64)
b := varint.PutUvarint(buf, uint64(idx.Codec()))
if _, err := w.Write(buf[:b]); err != nil {
return err
n, err := w.Write(buf[:b])
if err != nil {
return uint64(n), err
}
return idx.Marshal(w)

l, err := idx.Marshal(w)
return uint64(n) + l, err
}

// ReadFrom reads index from r.
Expand Down
6 changes: 4 additions & 2 deletions v2/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func TestWriteTo(t *testing.T) {
destF, err := os.Create(dest)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, destF.Close()) })
require.NoError(t, WriteTo(wantIdx, destF))
_, err = WriteTo(wantIdx, destF)
require.NoError(t, err)

// Seek to the beginning of the written out file.
_, err = destF.Seek(0, io.SeekStart)
Expand All @@ -126,6 +127,7 @@ func TestMarshalledIndexStartsWithCodec(t *testing.T) {

// Assert the first two bytes are the corresponding multicodec code.
buf := new(bytes.Buffer)
require.NoError(t, WriteTo(wantIdx, buf))
_, err = WriteTo(wantIdx, buf)
require.NoError(t, err)
require.Equal(t, varint.ToUvarint(uint64(multicodec.CarIndexSorted)), buf.Bytes()[:2])
}
28 changes: 17 additions & 11 deletions v2/index/indexsorted.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,18 @@ func (r recordSet) Swap(i, j int) {
r[i], r[j] = r[j], r[i]
}

func (s *singleWidthIndex) Marshal(w io.Writer) error {
func (s *singleWidthIndex) Marshal(w io.Writer) (uint64, error) {
l := uint64(0)
if err := binary.Write(w, binary.LittleEndian, s.width); err != nil {
return err
return 0, err
}
l += 4
if err := binary.Write(w, binary.LittleEndian, int64(len(s.index))); err != nil {
return err
return l, err
}
// TODO: we could just w.Write(s.index) here and avoid overhead
_, err := io.Copy(w, bytes.NewBuffer(s.index))
return err
l += 8
n, err := w.Write(s.index)
return l + uint64(n), err
}

func (s *singleWidthIndex) Unmarshal(r io.Reader) error {
Expand Down Expand Up @@ -158,10 +160,12 @@ func (m *multiWidthIndex) Codec() multicodec.Code {
return multicodec.CarIndexSorted
}

func (m *multiWidthIndex) Marshal(w io.Writer) error {
func (m *multiWidthIndex) Marshal(w io.Writer) (uint64, error) {
l := uint64(0)
if err := binary.Write(w, binary.LittleEndian, int32(len(*m))); err != nil {
return err
return l, err
}
l += 4

// The widths are unique, but ranging over a map isn't deterministic.
// As per the CARv2 spec, we must order buckets by digest length.
Expand All @@ -176,11 +180,13 @@ func (m *multiWidthIndex) Marshal(w io.Writer) error {

for _, width := range widths {
bucket := (*m)[width]
if err := bucket.Marshal(w); err != nil {
return err
n, err := bucket.Marshal(w)
l += n
if err != nil {
return l, err
}
}
return nil
return l, nil
}

func (m *multiWidthIndex) Unmarshal(r io.Reader) error {
Expand Down
20 changes: 12 additions & 8 deletions v2/index/mhindexsorted.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ func newMultiWidthCodedIndex() *multiWidthCodedIndex {
}
}

func (m *multiWidthCodedIndex) Marshal(w io.Writer) error {
func (m *multiWidthCodedIndex) Marshal(w io.Writer) (uint64, error) {
if err := binary.Write(w, binary.LittleEndian, m.code); err != nil {
return err
return 8, err
}
return m.multiWidthIndex.Marshal(w)
n, err := m.multiWidthIndex.Marshal(w)
return 8 + n, err
}

func (m *multiWidthCodedIndex) Unmarshal(r io.Reader) error {
Expand All @@ -59,22 +60,25 @@ func (m *MultihashIndexSorted) Codec() multicodec.Code {
return multicodec.CarMultihashIndexSorted
}

func (m *MultihashIndexSorted) Marshal(w io.Writer) error {
func (m *MultihashIndexSorted) Marshal(w io.Writer) (uint64, error) {
if err := binary.Write(w, binary.LittleEndian, int32(len(*m))); err != nil {
return err
return 4, err
}
// The codes are unique, but ranging over a map isn't deterministic.
// As per the CARv2 spec, we must order buckets by digest length.
// TODO update CARv2 spec to reflect this for the new index type.
codes := m.sortedMultihashCodes()
l := uint64(4)

for _, code := range codes {
mwci := (*m)[code]
if err := mwci.Marshal(w); err != nil {
return err
n, err := mwci.Marshal(w)
l += n
if err != nil {
return l, err
}
}
return nil
return l, nil
}

func (m *MultihashIndexSorted) sortedMultihashCodes() []uint64 {
Expand Down
2 changes: 1 addition & 1 deletion v2/index/mhindexsorted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestMultiWidthCodedIndex_MarshalUnmarshal(t *testing.T) {

// Marshal the index.
buf := new(bytes.Buffer)
err = subject.Marshal(buf)
_, err = subject.Marshal(buf)
require.NoError(t, err)

// Unmarshal it back to another instance of mh sorted index.
Expand Down
67 changes: 67 additions & 0 deletions v2/internal/loader/counting_loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package loader

import (
"bytes"
"io"

"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/linking"
"github.com/multiformats/go-varint"
)

// counter tracks how much data has been read.
type counter struct {
totalRead uint64
}

func (c *counter) Size() uint64 {
return c.totalRead
}

// ReadCounter provides an externally consumable interface to the
// additional data tracked about the linksystem.
type ReadCounter interface {
Size() uint64
}

type countingReader struct {
r io.Reader
c *counter
}

func (c *countingReader) Read(p []byte) (int, error) {
n, err := c.r.Read(p)
c.c.totalRead += uint64(n)
return n, err
}

// CountingLinkSystem wraps an ipld linksystem with to track the size of
// data loaded in a `counter` object. Each time nodes are loaded from the
// link system which trigger block reads, the size of the block as it would
// appear in a CAR file is added to the counter (included the size of the
// CID and the varint length for the block data).
func CountingLinkSystem(ls ipld.LinkSystem) (ipld.LinkSystem, ReadCounter) {
c := counter{}
return linking.LinkSystem{
EncoderChooser: ls.EncoderChooser,
DecoderChooser: ls.DecoderChooser,
HasherChooser: ls.HasherChooser,
StorageWriteOpener: ls.StorageWriteOpener,
willscott marked this conversation as resolved.
Show resolved Hide resolved
StorageReadOpener: func(lc linking.LinkContext, l ipld.Link) (io.Reader, error) {
r, err := ls.StorageReadOpener(lc, l)
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(nil)
n, err := buf.ReadFrom(r)
if err != nil {
return nil, err
}
size := varint.ToUvarint(uint64(n) + uint64(len(l.Binary())))
c.totalRead += uint64(len(size)) + uint64(len(l.Binary()))
return &countingReader{buf, &c}, nil
},
TrustedStorage: ls.TrustedStorage,
NodeReifier: ls.NodeReifier,
}, &c
}
Loading