WIP Fix DagModifier for raw-leaves nodes #3901

Closed · wants to merge 3 commits
2 changes: 1 addition & 1 deletion mfs/fd.go
@@ -27,7 +27,7 @@ type FileDescriptor interface {

type fileDescriptor struct {
inode *File
mod *mod.DagModifier
mod mod.DagModifier
perms int
sync bool
hasChanges bool
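With this change the file descriptor stores a DagModifier interface value instead of a pointer to the concrete struct, so mfs code depends only on the interface's method set. A minimal caller-side sketch of what that buys (the helper name `flushAndGet` is hypothetical and not part of this PR; it assumes the `mod` and `node` imports the mfs package already uses):

```go
// flushAndGet is a hypothetical helper: it works against any DagModifier
// implementation, flushing unsynced writes before returning the root node.
func flushAndGet(dm mod.DagModifier) (node.Node, error) {
	if dm.HasChanges() {
		if err := dm.Sync(); err != nil {
			return nil, err
		}
	}
	return dm.GetNode()
}
```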
35 changes: 28 additions & 7 deletions test/sharness/t0250-files-api.sh
@@ -10,16 +10,21 @@ test_description="test the unix files api"

test_init_ipfs

# setup files for testing
test_expect_success "can create some files for testing" '
FILE1=$(echo foo | ipfs add -q) &&
FILE2=$(echo bar | ipfs add -q) &&
FILE3=$(echo baz | ipfs add -q) &&
mkdir stuff_test &&
create_files() {
add_args=( "$@" )
FILE1=$(echo foo | ipfs add ${add_args[@]} -q) &&
FILE2=$(echo bar | ipfs add ${add_args[@]} -q) &&
FILE3=$(echo baz | ipfs add ${add_args[@]} -q) &&
mkdir -p stuff_test &&
echo cats > stuff_test/a &&
echo dogs > stuff_test/b &&
echo giraffes > stuff_test/c &&
DIR1=$(ipfs add -q stuff_test | tail -n1)
DIR1=$(ipfs add -r ${add_args[@]} -q stuff_test | tail -n1)
}

# setup files for testing
test_expect_success "can create some files for testing" '
create_files
'

verify_path_exists() {
@@ -534,4 +539,20 @@ test_launch_ipfs_daemon
test_sharding
test_kill_ipfs_daemon


test_expect_success "enable sharding in config" '
ipfs config --json Experimental.ShardingEnabled false
'


test_launch_ipfs_daemon

test_expect_success "can create some files for testing with raw-leaves" '
create_files --raw-leaves
'

test_files_api

test_kill_ipfs_daemon

test_done
52 changes: 35 additions & 17 deletions unixfs/mod/dagmodifier.go
@@ -28,10 +28,26 @@ var writebufferSize = 1 << 21

var log = logging.Logger("dagio")

// DagModifier allows for reading, writing, and modifying a DAG 'file' in place.
type DagModifier interface {
Read([]byte) (int, error)
Write(b []byte) (int, error)
Seek(offset int64, whence int) (int64, error)

WriteAt(b []byte, offset int64) (int, error)
CtxReadFull(ctx context.Context, b []byte) (int, error)
Sync() error
Truncate(int64) error

Size() (int64, error)
GetNode() (node.Node, error)
HasChanges() bool
}

// protoDagModifier is the ProtoNode-backed implementation of DagModifier,
// and the only struct licensed and able to correctly perform surgery on a
// DAG 'file'
// Dear god, please rename this to something more pleasant
type protoDagModifier struct {
dagserv mdag.DAGService
curNode *mdag.ProtoNode

@@ -46,13 +62,15 @@ type DagModifier struct {
read uio.DagReader
}

func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
// NewDagModifier creates a new DagModifier that allows reading, writing,
// and other operations on a DAG.
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (DagModifier, error) {
pbn, ok := from.(*mdag.ProtoNode)
if !ok {
return nil, mdag.ErrNotProtobuf
}

return &DagModifier{
return &protoDagModifier{
curNode: pbn.Copy().(*mdag.ProtoNode),
dagserv: serv,
splitter: spl,
@@ -61,7 +79,7 @@ func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, s
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
func (dm *protoDagModifier) WriteAt(b []byte, offset int64) (int, error) {
// TODO: this is currently VERY inefficient
// each write that happens at an offset other than the current one causes a
// flush to disk, and dag rewrite
@@ -104,7 +122,7 @@ func (zr zeroReader) Read(b []byte) (int, error) {

// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
func (dm *DagModifier) expandSparse(size int64) error {
func (dm *protoDagModifier) expandSparse(size int64) error {
r := io.LimitReader(zeroReader{}, size)
spl := chunk.NewSizeSplitter(r, 4096)
nnode, err := dm.appendData(dm.curNode, spl)
@@ -126,7 +144,7 @@ func (dm *DagModifier) expandSparse(size int64) error {
}

// Write continues writing to the dag at the current offset
func (dm *DagModifier) Write(b []byte) (int, error) {
func (dm *protoDagModifier) Write(b []byte) (int, error) {
if dm.read != nil {
dm.read = nil
}
@@ -148,7 +166,7 @@ func (dm *DagModifier) Write(b []byte) (int, error) {
return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
func (dm *protoDagModifier) Size() (int64, error) {
pbn, err := ft.FromBytes(dm.curNode.Data())
if err != nil {
return 0, err
@@ -164,7 +182,7 @@ func (dm *DagModifier) Size() (int64, error) {
}

// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
func (dm *protoDagModifier) Sync() error {
// No buffer? Nothing to do
if dm.wrBuf == nil {
return nil
@@ -226,7 +244,7 @@ func (dm *DagModifier) Sync() error {
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'.
// It returns the new key of the passed-in node and whether or not all the data in
// the reader has been consumed.
func (dm *DagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
func (dm *protoDagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
f, err := ft.FromBytes(node.Data())
if err != nil {
return nil, false, err
@@ -305,7 +323,7 @@ func (dm *DagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Re
}

// appendData appends the blocks from the given chan to the end of this dag
func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (node.Node, error) {
func (dm *protoDagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (node.Node, error) {
dbp := &help.DagBuilderParams{
Dagserv: dm.dagserv,
Maxlinks: help.DefaultLinksPerBlock,
@@ -315,7 +333,7 @@ func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (nod
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) Read(b []byte) (int, error) {
func (dm *protoDagModifier) Read(b []byte) (int, error) {
err := dm.readPrep()
if err != nil {
return 0, err
@@ -326,7 +344,7 @@ func (dm *DagModifier) Read(b []byte) (int, error) {
return n, err
}

func (dm *DagModifier) readPrep() error {
func (dm *protoDagModifier) readPrep() error {
err := dm.Sync()
if err != nil {
return err
@@ -359,7 +377,7 @@ func (dm *DagModifier) readPrep() error {
}

// CtxReadFull reads data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
func (dm *protoDagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
err := dm.readPrep()
if err != nil {
return 0, err
@@ -371,7 +389,7 @@ func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.ProtoNode, error) {
func (dm *protoDagModifier) GetNode() (node.Node, error) {
err := dm.Sync()
if err != nil {
return nil, err
@@ -380,11 +398,11 @@ func (dm *DagModifier) GetNode() (*mdag.ProtoNode, error) {
}

// HasChanges returns whether or not there are unflushed changes to this dag
func (dm *DagModifier) HasChanges() bool {
func (dm *protoDagModifier) HasChanges() bool {
return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
func (dm *protoDagModifier) Seek(offset int64, whence int) (int64, error) {
err := dm.Sync()
if err != nil {
return 0, err
@@ -425,7 +443,7 @@ func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
func (dm *protoDagModifier) Truncate(size int64) error {
err := dm.Sync()
if err != nil {
return err
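Extracting the DagModifier interface is what opens the door to a raw-leaves capable modifier: once callers hold a `mod.DagModifier` value, a second implementation can be returned from `NewDagModifier` (or a sibling constructor) without touching mfs or the files API. The skeleton below only illustrates that direction; the name `rawDagModifier`, its fields, and the stub methods are hypothetical and not part of this PR's commits. It assumes the same `mod` package as the interface, so the existing `context`, `node`, and `mdag` imports from dagmodifier.go apply.

```go
// rawDagModifier is a hypothetical second implementation of DagModifier for
// files whose leaves are raw blocks rather than unixfs ProtoNodes. Only the
// method set matters for this sketch; the bodies are stubs.
type rawDagModifier struct {
	dagserv mdag.DAGService
	curNode node.Node // may be a raw leaf or a ProtoNode root
	// write buffer, offsets, splitter, etc. would go here
}

func (dm *rawDagModifier) Read(b []byte) (int, error)                   { panic("not implemented") }
func (dm *rawDagModifier) Write(b []byte) (int, error)                  { panic("not implemented") }
func (dm *rawDagModifier) Seek(offset int64, whence int) (int64, error) { panic("not implemented") }
func (dm *rawDagModifier) WriteAt(b []byte, offset int64) (int, error)  { panic("not implemented") }
func (dm *rawDagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	panic("not implemented")
}
func (dm *rawDagModifier) Sync() error                 { panic("not implemented") }
func (dm *rawDagModifier) Truncate(size int64) error   { panic("not implemented") }
func (dm *rawDagModifier) Size() (int64, error)        { panic("not implemented") }
func (dm *rawDagModifier) GetNode() (node.Node, error) { panic("not implemented") }
func (dm *rawDagModifier) HasChanges() bool            { return false }

// Compile-time check that the stub satisfies the interface.
var _ DagModifier = (*rawDagModifier)(nil)
```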
20 changes: 11 additions & 9 deletions unixfs/mod/dagmodifier_test.go
@@ -31,7 +31,9 @@ func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blocksto
return dserv, bstore
}

func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte {
func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm DagModifier,
dserv mdag.DAGService) []byte {

newdata := make([]byte, size)
r := u.NewTimeSeededRand()
r.Read(newdata)
@@ -55,12 +57,12 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier)
t.Fatal(err)
}

err = trickle.VerifyTrickleDagStructure(nd, dm.dagserv, h.DefaultLinksPerBlock, 4)
err = trickle.VerifyTrickleDagStructure(nd, dserv, h.DefaultLinksPerBlock, 4)
if err != nil {
t.Fatal(err)
}

rd, err := uio.NewDagReader(context.Background(), nd, dm.dagserv)
rd, err := uio.NewDagReader(context.Background(), nd, dserv)
if err != nil {
t.Fatal(err)
}
@@ -93,34 +95,34 @@ func TestDagModifierBasic(t *testing.T) {
length := uint64(60)

t.Log("Testing mod within zero block")
b = testModWrite(t, beg, length, b, dagmod)
b = testModWrite(t, beg, length, b, dagmod, dserv)

// Within bounds of existing file
beg = 1000
length = 4000
t.Log("Testing mod within bounds of existing multiblock file.")
b = testModWrite(t, beg, length, b, dagmod)
b = testModWrite(t, beg, length, b, dagmod, dserv)

// Extend bounds
beg = 49500
length = 4000

t.Log("Testing mod that extends file.")
b = testModWrite(t, beg, length, b, dagmod)
b = testModWrite(t, beg, length, b, dagmod, dserv)

// "Append"
beg = uint64(len(b))
length = 3000
t.Log("Testing pure append")
b = testModWrite(t, beg, length, b, dagmod)
b = testModWrite(t, beg, length, b, dagmod, dserv)

// Verify reported length
node, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}

size, err := ft.DataSize(node.Data())
size, err := ft.DataSize(node.(*mdag.ProtoNode).Data())
if err != nil {
t.Fatal(err)
}
@@ -638,7 +640,7 @@ func TestReadAndSeek(t *testing.T) {
// skip 4
_, err = dagmod.Seek(1, os.SEEK_CUR)
if err != nil {
t.Fatalf("error: %s, offset %d, reader offset %d", err, dagmod.curWrOff, dagmod.read.Offset())
t.Fatal(err)
}

//read 5,6,7
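One consequence visible in the test changes above: the DAGService is now threaded into testModWrite explicitly because dm.dagserv is unreachable through the interface, and GetNode() returns a node.Node that must be type-asserted back to *mdag.ProtoNode wherever protobuf-specific data is needed. A hedged sketch of that assertion pattern, with `t`, `dagmod`, and `b` taken to be the variables from TestDagModifierBasic and the imports already present in dagmodifier_test.go:

```go
nd, err := dagmod.GetNode() // node.Node via the DagModifier interface
if err != nil {
	t.Fatal(err)
}
pbnd, ok := nd.(*mdag.ProtoNode) // protobuf-specific accessors need an assertion now
if !ok {
	t.Fatal("expected the root to be a ProtoNode")
}
size, err := ft.DataSize(pbnd.Data())
if err != nil {
	t.Fatal(err)
}
if size != uint64(len(b)) { // mirrors the reported-length check in the test above
	t.Fatalf("reported size (%d) does not match buffer length (%d)", size, len(b))
}
```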