Skip to content
This repository has been archived by the owner on Oct 22, 2021. It is now read-only.

fix: Some error in storage is not handled #23

Merged
merged 8 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/intergration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ jobs:
matrix:
go: [ "1.15", "1.16" ]
hdfs-version:
- 3.2.2
- 3.3.0
- 3.3.1
os: [ubuntu-latest]
Expand Down
89 changes: 63 additions & 26 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,43 @@ func (s *Storage) create(path string, opt pairStorageCreate) (o *Object) {
func (s *Storage) createAppend(ctx context.Context, path string, opt pairStorageCreateAppend) (o *Object, err error) {
rp := s.getAbsPath(path)
dir := filepath.Dir(rp)
err = s.hdfs.MkdirAll(dir, 0666)

// If dirname is already a directory,
// MkdirAll does nothing and returns nil.
err = s.hdfs.MkdirAll(dir, 0755)
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
// If dirname is not exist ,it will create a Mkdir rpc communication
// So we just need to catch other errors
if err != nil {
return
return nil, err
}

_, err = s.hdfs.Stat(rp)

// The error returned by Stat can only be nil or not os.ErrNotExist
if err == nil {
err = s.hdfs.Remove(rp)
if err != nil && errors.Is(err, os.ErrNotExist) {
// Omit `file not exist` error here
// ref: [GSP-46](https://github.com/beyondstorage/specs/blob/master/rfcs/46-idempotent-delete.md)
err = nil
// If the path does not exist,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the error returned by Stat is nil, the path must exist. I think we don't need to explain the detailed behavior of our hdfs SDK. Instead, we need to comment on why we need to do remove it here.

// RemoveAll returns nil (no error).
err = s.hdfs.RemoveAll(rp)
if err != nil {
return nil, err
}
}

// This ensures that err can only be os.ErrNotExist
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, err
}

f, err := s.hdfs.Create(rp)

if err != nil {
return
return nil, err
}
defer func() {
f.Close()
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()

o = s.newObject(true)
Expand All @@ -69,9 +84,13 @@ func (s *Storage) createAppend(ctx context.Context, path string, opt pairStorage
func (s *Storage) createDir(ctx context.Context, path string, opt pairStorageCreateDir) (o *Object, err error) {
rp := s.getAbsPath(path)

// If dirname is already a directory,
// MkdirAll does nothing and returns nil.
err = s.hdfs.MkdirAll(rp, 0755)
// If dirname is not exist ,it will create a Mkdir rpc communication
// So we just need to catch other errors
if err != nil {
return
return nil, err
}

o = s.newObject(true)
Expand All @@ -83,12 +102,11 @@ func (s *Storage) createDir(ctx context.Context, path string, opt pairStorageCre

func (s *Storage) delete(ctx context.Context, path string, opt pairStorageDelete) (err error) {
rp := s.getAbsPath(path)
err = s.hdfs.Remove(rp)
if err != nil && errors.Is(err, os.ErrNotExist) {
// Omit `file not exist` error here
// ref: [GSP-46](https://github.com/beyondstorage/specs/blob/master/rfcs/46-idempotent-delete.md)
err = nil
}

// If the path does not exist,
// RemoveAll returns nil (no error).
// If there is an error here other than os.ErrNotExist is also thrown directly
err = s.hdfs.RemoveAll(rp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoveAll is not a good idea. As we only allow remove one object at a time.

return err
}

Expand Down Expand Up @@ -120,12 +138,16 @@ func (s *Storage) move(ctx context.Context, src string, dst string, opt pairStor
return services.ErrObjectModeInvalid
}
}

return s.hdfs.Rename(rs, rd)
}

func (s *Storage) read(ctx context.Context, path string, w io.Writer, opt pairStorageRead) (n int64, err error) {
rp := s.getAbsPath(path)
f, err := s.hdfs.Open(rp)
if err != nil {
return 0, err
}

defer func() {
closeErr := f.Close()
Expand All @@ -134,9 +156,6 @@ func (s *Storage) read(ctx context.Context, path string, w io.Writer, opt pairSt
}
}()

if err != nil {
return 0, err
}
if opt.HasOffset {
_, err := f.Seek(opt.Offset, 0)
if err != nil {
Expand Down Expand Up @@ -186,24 +205,37 @@ func (s *Storage) stat(ctx context.Context, path string, opt pairStorageStat) (o
func (s *Storage) write(ctx context.Context, path string, r io.Reader, size int64, opt pairStorageWrite) (n int64, err error) {
rp := s.getAbsPath(path)
dir := filepath.Dir(rp)
err = s.hdfs.MkdirAll(dir, 0666)

// If dirname is already a directory,
// MkdirAll does nothing and returns nil.
err = s.hdfs.MkdirAll(dir, 0755)
// If dirname is not exist ,it will create a Mkdir rpc communication
// So we just need to catch other errors
if err != nil {
return 0, err
}

_, err = s.hdfs.Stat(rp)
if err == nil {
err = s.hdfs.Remove(rp)
if err != nil && errors.Is(err, os.ErrNotExist) {
// Omit `file not exist` error here
// ref: [GSP-46](https://github.com/beyondstorage/specs/blob/master/rfcs/46-idempotent-delete.md)
err = nil
// If the path does not exist,
// RemoveAll returns nil (no error).
err = s.hdfs.RemoveAll(rp)

if err != nil {
return 0, err
}
}

// This ensures that err can only be os.ErrNotExist
if err != nil && !errors.Is(err, os.ErrNotExist) {
return 0, err
}

f, err := s.hdfs.Create(rp)
if err != nil {
return 0, err
}

defer func() {
closeErr := f.Close()
if err == nil {
Expand All @@ -220,11 +252,16 @@ func (s *Storage) write(ctx context.Context, path string, r io.Reader, size int6

func (s *Storage) writeAppend(ctx context.Context, o *Object, r io.Reader, size int64, opt pairStorageWriteAppend) (n int64, err error) {
f, err := s.hdfs.Append(o.ID)

if err != nil {
return
}

defer func() {
f.Close()
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()

return io.CopyN(f, r, size)
Expand Down