Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add option to create blockstore that writes a plain CARv1 #288

Merged
merged 3 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 72 additions & 38 deletions v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ type ReadWrite struct {
opts carv2.Options
}

// WriteAsCarV1 returns a write option that causes a CAR blockstore to emit a
// bare CARv1 payload: no CARv2 pragma, header, or trailing index is written.
// An index is still maintained in memory while writing but is thrown away when
// the blockstore is finalized.
//
// This option is interpreted only by the blockstore package; the root
// go-car/v2 package ignores it.
func WriteAsCarV1(asCarV1 bool) carv2.Option {
	return func(opts *carv2.Options) {
		opts.WriteAsCarV1 = asCarV1
	}
}

// AllowDuplicatePuts is a write option which makes a CAR blockstore not
// deduplicate blocks in Put and PutMany. The default is to deduplicate,
// which matches the current semantics of go-ipfs-blockstore v1.
Expand Down Expand Up @@ -122,33 +134,39 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.Option) (*ReadWri
rwbs.header = rwbs.header.WithIndexPadding(p)
}

rwbs.dataWriter = internalio.NewOffsetWriter(rwbs.f, int64(rwbs.header.DataOffset))
v1r := internalio.NewOffsetReadSeeker(rwbs.f, int64(rwbs.header.DataOffset))
offset := int64(rwbs.header.DataOffset)
if rwbs.opts.WriteAsCarV1 {
offset = 0
}
rwbs.dataWriter = internalio.NewOffsetWriter(rwbs.f, offset)
v1r := internalio.NewOffsetReadSeeker(rwbs.f, offset)
rwbs.ronly.backing = v1r
rwbs.ronly.idx = rwbs.idx
rwbs.ronly.carv2Closer = rwbs.f

if resume {
if err = rwbs.resumeWithRoots(roots); err != nil {
if err = rwbs.resumeWithRoots(!rwbs.opts.WriteAsCarV1, roots); err != nil {
return nil, err
}
} else {
if err = rwbs.initWithRoots(roots); err != nil {
if err = rwbs.initWithRoots(!rwbs.opts.WriteAsCarV1, roots); err != nil {
return nil, err
}
}

return rwbs, nil
}

func (b *ReadWrite) initWithRoots(roots []cid.Cid) error {
if _, err := b.f.WriteAt(carv2.Pragma, 0); err != nil {
return err
// initWithRoots writes the file headers for a brand-new CAR. When v2 is true,
// the CARv2 pragma is written at the very start of the file before the inner
// CARv1 header; when false, only the CARv1 header is written through the
// offset data writer. The given roots are recorded in the CARv1 header.
func (b *ReadWrite) initWithRoots(v2 bool, roots []cid.Cid) error {
	if v2 {
		// The pragma identifies the file as CARv2; it always sits at offset 0.
		if _, err := b.f.WriteAt(carv2.Pragma, 0); err != nil {
			return err
		}
	}
	header := &carv1.CarHeader{Roots: roots, Version: 1}
	return carv1.WriteHeader(header, b.dataWriter)
}

func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error {
func (b *ReadWrite) resumeWithRoots(v2 bool, roots []cid.Cid) error {
// On resumption it is expected that the CARv2 Pragma, and the CARv1 header is successfully written.
// Otherwise we cannot resume from the file.
// Read pragma to assert if b.f is indeed a CARv2.
Expand All @@ -158,36 +176,42 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error {
// Or the write must have failed before pragma was written.
return err
}
if version != 2 {
// The file is not a CARv2 and we cannot resume from it.
switch {
case version == 1 && !v2:
case version == 2 && v2:
default:
// The file is not the expected version and we cannot resume from it.
return fmt.Errorf("cannot resume on CAR file with version %v", version)
}

// Check if file was finalized by trying to read the CARv2 header.
// We check because if finalized the CARv1 reader behaviour needs to be adjusted since
// EOF will not signify end of CARv1 payload. i.e. index is most likely present.
var headerInFile carv2.Header
_, err = headerInFile.ReadFrom(internalio.NewOffsetReadSeeker(b.f, carv2.PragmaSize))

// If reading CARv2 header succeeded, and CARv1 offset in header is not zero then the file is
// most-likely finalized. Check padding and truncate the file to remove index.
// Otherwise, carry on reading the v1 payload at offset determined from b.header.
if err == nil && headerInFile.DataOffset != 0 {
if headerInFile.DataOffset != b.header.DataOffset {
// Assert that the padding on file matches the given WithDataPadding option.
wantPadding := headerInFile.DataOffset - carv2.PragmaSize - carv2.HeaderSize
gotPadding := b.header.DataOffset - carv2.PragmaSize - carv2.HeaderSize
return fmt.Errorf(
"cannot resume from file with mismatched CARv1 offset; "+
"`WithDataPadding` option must match the padding on file. "+
"Expected padding value of %v but got %v", wantPadding, gotPadding,
)
} else if headerInFile.DataSize == 0 {
// If CARv1 size is zero, since CARv1 offset wasn't, then the CARv2 header was
// most-likely partially written. Since we write the header last in Finalize then the
// file most-likely contains the index and we cannot know where it starts, therefore
// can't resume.
return errors.New("corrupt CARv2 header; cannot resume from file")

if v2 {
// Check if file was finalized by trying to read the CARv2 header.
// We check because if finalized the CARv1 reader behaviour needs to be adjusted since
// EOF will not signify end of CARv1 payload. i.e. index is most likely present.
_, err = headerInFile.ReadFrom(internalio.NewOffsetReadSeeker(b.f, carv2.PragmaSize))

// If reading CARv2 header succeeded, and CARv1 offset in header is not zero then the file is
// most-likely finalized. Check padding and truncate the file to remove index.
// Otherwise, carry on reading the v1 payload at offset determined from b.header.
if err == nil && headerInFile.DataOffset != 0 {
if headerInFile.DataOffset != b.header.DataOffset {
// Assert that the padding on file matches the given WithDataPadding option.
wantPadding := headerInFile.DataOffset - carv2.PragmaSize - carv2.HeaderSize
gotPadding := b.header.DataOffset - carv2.PragmaSize - carv2.HeaderSize
return fmt.Errorf(
"cannot resume from file with mismatched CARv1 offset; "+
"`WithDataPadding` option must match the padding on file. "+
"Expected padding value of %v but got %v", wantPadding, gotPadding,
)
} else if headerInFile.DataSize == 0 {
// If CARv1 size is zero, since CARv1 offset wasn't, then the CARv2 header was
// most-likely partially written. Since we write the header last in Finalize then the
// file most-likely contains the index and we cannot know where it starts, therefore
// can't resume.
return errors.New("corrupt CARv2 header; cannot resume from file")
}
}
}

Expand All @@ -213,10 +237,13 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error {
return err
}
}
// Now that CARv2 header is present on file, clear it to avoid incorrect size and offset in
// header in case blockstore is closed without finalization and is resumed from.
if err := b.unfinalize(); err != nil {
return fmt.Errorf("could not un-finalize: %w", err)

if v2 {
// Now that CARv2 header is present on file, clear it to avoid incorrect size and offset in
// header in case blocksotre is closed without finalization and is resumed from.
if err := b.unfinalize(); err != nil {
return fmt.Errorf("could not un-finalize: %w", err)
}
}

// TODO See how we can reduce duplicate code here.
Expand Down Expand Up @@ -354,6 +381,13 @@ func (b *ReadWrite) Discard() {
// for more efficient subsequent read.
// After this call, the blockstore can no longer be used.
func (b *ReadWrite) Finalize() error {
if b.opts.WriteAsCarV1 {
// all blocks are already properly written to the CARv1 inner container and there's
// no additional finalization required at the end of the file for a complete v1
b.ronly.Close()
return nil
}

b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()

Expand Down
171 changes: 106 additions & 65 deletions v2/blockstore/readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,81 +47,122 @@ func TestReadWriteGetReturnsBlockstoreNotFoundWhenCidDoesNotExist(t *testing.T)
require.Nil(t, gotBlock)
}

func TestBlockstore(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
func TestBlockstoreX(t *testing.T) {
originalCARv1Path := "../testdata/sample-v1.car"
originalCARv1ComparePath := "../testdata/sample-v1-noidentity.car"
originalCARv1ComparePathStat, err := os.Stat(originalCARv1ComparePath)
require.NoError(t, err)

variants := []struct {
name string
options []carv2.Option
expectedV1StartOffset int64
}{
// no options, expect a standard CARv2 with the noidentity inner CARv1
{"noopt_carv2", []carv2.Option{}, int64(carv2.PragmaSize + carv2.HeaderSize)},
// option to only write as a CARv1, expect the noidentity inner CARv1
{"carv1", []carv2.Option{blockstore.WriteAsCarV1(true)}, int64(0)},
}

f, err := os.Open("../testdata/sample-v1.car")
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, f.Close()) })
r, err := carv1.NewCarReader(f)
require.NoError(t, err)
for _, variant := range variants {
t.Run(variant.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

path := filepath.Join(t.TempDir(), "readwrite.car")
ingester, err := blockstore.OpenReadWrite(path, r.Header.Roots)
require.NoError(t, err)
t.Cleanup(func() { ingester.Finalize() })
f, err := os.Open(originalCARv1Path)
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, f.Close()) })
r, err := carv1.NewCarReader(f)
require.NoError(t, err)

cids := make([]cid.Cid, 0)
var idCidCount int
for {
b, err := r.Next()
if err == io.EOF {
break
}
require.NoError(t, err)
path := filepath.Join(t.TempDir(), fmt.Sprintf("readwrite_%s.car", variant.name))
ingester, err := blockstore.OpenReadWrite(path, r.Header.Roots, variant.options...)
require.NoError(t, err)
t.Cleanup(func() { ingester.Finalize() })

cids := make([]cid.Cid, 0)
var idCidCount int
for {
b, err := r.Next()
if err == io.EOF {
break
}
require.NoError(t, err)

err = ingester.Put(ctx, b)
require.NoError(t, err)
cids = append(cids, b.Cid())
err = ingester.Put(ctx, b)
require.NoError(t, err)
cids = append(cids, b.Cid())

// try reading a random one:
candidate := cids[rng.Intn(len(cids))]
if has, err := ingester.Has(ctx, candidate); !has || err != nil {
t.Fatalf("expected to find %s but didn't: %s", candidate, err)
}
// try reading a random one:
candidate := cids[rng.Intn(len(cids))]
if has, err := ingester.Has(ctx, candidate); !has || err != nil {
t.Fatalf("expected to find %s but didn't: %s", candidate, err)
}

dmh, err := multihash.Decode(b.Cid().Hash())
require.NoError(t, err)
if dmh.Code == multihash.IDENTITY {
idCidCount++
}
}
dmh, err := multihash.Decode(b.Cid().Hash())
require.NoError(t, err)
if dmh.Code == multihash.IDENTITY {
idCidCount++
}
}

for _, c := range cids {
b, err := ingester.Get(ctx, c)
require.NoError(t, err)
if !b.Cid().Equals(c) {
t.Fatal("wrong item returned")
}
}
for _, c := range cids {
b, err := ingester.Get(ctx, c)
require.NoError(t, err)
if !b.Cid().Equals(c) {
t.Fatal("wrong item returned")
}
}

err = ingester.Finalize()
require.NoError(t, err)
robs, err := blockstore.OpenReadOnly(path)
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, robs.Close()) })
err = ingester.Finalize()
require.NoError(t, err)
robs, err := blockstore.OpenReadOnly(path)
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, robs.Close()) })

allKeysCh, err := robs.AllKeysChan(ctx)
require.NoError(t, err)
numKeysCh := 0
for c := range allKeysCh {
b, err := robs.Get(ctx, c)
require.NoError(t, err)
if !b.Cid().Equals(c) {
t.Fatal("wrong item returned")
}
numKeysCh++
}
expectedCidCount := len(cids) - idCidCount
require.Equal(t, expectedCidCount, numKeysCh, "AllKeysChan returned an unexpected amount of keys; expected %v but got %v", expectedCidCount, numKeysCh)
allKeysCh, err := robs.AllKeysChan(ctx)
require.NoError(t, err)
numKeysCh := 0
for c := range allKeysCh {
b, err := robs.Get(ctx, c)
require.NoError(t, err)
if !b.Cid().Equals(c) {
t.Fatal("wrong item returned")
}
numKeysCh++
}
expectedCidCount := len(cids) - idCidCount
require.Equal(t, expectedCidCount, numKeysCh, "AllKeysChan returned an unexpected amount of keys; expected %v but got %v", expectedCidCount, numKeysCh)

for _, c := range cids {
b, err := robs.Get(ctx, c)
require.NoError(t, err)
if !b.Cid().Equals(c) {
t.Fatal("wrong item returned")
}
for _, c := range cids {
b, err := robs.Get(ctx, c)
require.NoError(t, err)
if !b.Cid().Equals(c) {
t.Fatal("wrong item returned")
}
}

wrote, err := os.Open(path)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, wrote.Close()) })
_, err = wrote.Seek(variant.expectedV1StartOffset, io.SeekStart)
require.NoError(t, err)
hasher := sha512.New()
gotWritten, err := io.Copy(hasher, io.LimitReader(wrote, originalCARv1ComparePathStat.Size()))
require.NoError(t, err)
gotSum := hasher.Sum(nil)

hasher.Reset()
originalCarV1, err := os.Open(originalCARv1ComparePath)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, originalCarV1.Close()) })
wantWritten, err := io.Copy(hasher, originalCarV1)
require.NoError(t, err)
wantSum := hasher.Sum(nil)

require.Equal(t, wantWritten, gotWritten)
require.Equal(t, wantSum, gotSum)
})
}
}

Expand Down
1 change: 1 addition & 0 deletions v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Options struct {
BlockstoreAllowDuplicatePuts bool
BlockstoreUseWholeCIDs bool
MaxTraversalLinks uint64
WriteAsCarV1 bool
}

// ApplyOptions applies given opts and returns the resulting Options.
Expand Down
Binary file added v2/testdata/sample-v1-noidentity.car
Binary file not shown.