Skip to content
This repository has been archived by the owner on Oct 22, 2021. It is now read-only.

Commit

Permalink
fix: Some error in storage is not handled (#23)
Browse files Browse the repository at this point in the history
* ci:Fix dependabot automatic update merge error

Signed-off-by: bokket <3100563328@qq.com>

* remove hdfs version 3.2.2

Signed-off-by: bokket <3100563328@qq.com>

* fix appender bug

Signed-off-by: bokket <3100563328@qq.com>

* remove version 3.2.2

Signed-off-by: bokket <3100563328@qq.com>

* fix error Handling

Signed-off-by: bokket <3100563328@qq.com>

* remove some unwanted errors

Signed-off-by: bokket <3100563328@qq.com>

* fix createAppend operator mistake

Signed-off-by: bokket <3100563328@qq.com>

* modify delete operator

Signed-off-by: bokket <3100563328@qq.com>
  • Loading branch information
bokket authored Sep 8, 2021
1 parent 2e84277 commit ceaedae
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 11 deletions.
1 change: 0 additions & 1 deletion .github/workflows/intergration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ jobs:
matrix:
go: [ "1.15", "1.16" ]
hdfs-version:
- 3.2.2
- 3.3.0
- 3.3.1
os: [ubuntu-latest]
Expand Down
62 changes: 52 additions & 10 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,21 @@ func (s *Storage) create(path string, opt pairStorageCreate) (o *Object) {
func (s *Storage) createAppend(ctx context.Context, path string, opt pairStorageCreateAppend) (o *Object, err error) {
rp := s.getAbsPath(path)
dir := filepath.Dir(rp)
err = s.hdfs.MkdirAll(dir, 0666)

// If dirname is already a directory,
// MkdirAll does nothing and returns nil.
err = s.hdfs.MkdirAll(dir, 0755)
// If dirname is not exist ,it will create a Mkdir rpc communication
// So we just need to catch other errors
if err != nil {
return
return nil, err
}

_, err = s.hdfs.Stat(rp)

// The error returned by Stat can only be nil or not os.ErrNotExist
if err == nil {
// If the error returned by Stat is nil, the path must exist.
err = s.hdfs.Remove(rp)
if err != nil && errors.Is(err, os.ErrNotExist) {
// Omit `file not exist` error here
Expand All @@ -50,12 +57,21 @@ func (s *Storage) createAppend(ctx context.Context, path string, opt pairStorage
}
}

// This ensures that err can only be os.ErrNotExist
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, err
}

f, err := s.hdfs.Create(rp)

if err != nil {
return
return nil, err
}
defer func() {
f.Close()
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()

o = s.newObject(true)
Expand All @@ -69,9 +85,13 @@ func (s *Storage) createAppend(ctx context.Context, path string, opt pairStorage
func (s *Storage) createDir(ctx context.Context, path string, opt pairStorageCreateDir) (o *Object, err error) {
rp := s.getAbsPath(path)

// If dirname is already a directory,
// MkdirAll does nothing and returns nil.
err = s.hdfs.MkdirAll(rp, 0755)
// If dirname is not exist ,it will create a Mkdir rpc communication
// So we just need to catch other errors
if err != nil {
return
return nil, err
}

o = s.newObject(true)
Expand All @@ -83,12 +103,14 @@ func (s *Storage) createDir(ctx context.Context, path string, opt pairStorageCre

func (s *Storage) delete(ctx context.Context, path string, opt pairStorageDelete) (err error) {
rp := s.getAbsPath(path)

err = s.hdfs.Remove(rp)
if err != nil && errors.Is(err, os.ErrNotExist) {
// Omit `file not exist` error here
// ref: [GSP-46](https://github.com/beyondstorage/specs/blob/master/rfcs/46-idempotent-delete.md)
err = nil
}

return err
}

Expand Down Expand Up @@ -120,12 +142,16 @@ func (s *Storage) move(ctx context.Context, src string, dst string, opt pairStor
return services.ErrObjectModeInvalid
}
}

return s.hdfs.Rename(rs, rd)
}

func (s *Storage) read(ctx context.Context, path string, w io.Writer, opt pairStorageRead) (n int64, err error) {
rp := s.getAbsPath(path)
f, err := s.hdfs.Open(rp)
if err != nil {
return 0, err
}

defer func() {
closeErr := f.Close()
Expand All @@ -134,9 +160,6 @@ func (s *Storage) read(ctx context.Context, path string, w io.Writer, opt pairSt
}
}()

if err != nil {
return 0, err
}
if opt.HasOffset {
_, err := f.Seek(opt.Offset, 0)
if err != nil {
Expand Down Expand Up @@ -186,24 +209,38 @@ func (s *Storage) stat(ctx context.Context, path string, opt pairStorageStat) (o
func (s *Storage) write(ctx context.Context, path string, r io.Reader, size int64, opt pairStorageWrite) (n int64, err error) {
rp := s.getAbsPath(path)
dir := filepath.Dir(rp)
err = s.hdfs.MkdirAll(dir, 0666)

// If dirname is already a directory,
// MkdirAll does nothing and returns nil.
err = s.hdfs.MkdirAll(dir, 0755)
// If dirname is not exist ,it will create a Mkdir rpc communication
// So we just need to catch other errors
if err != nil {
return 0, err
}

_, err = s.hdfs.Stat(rp)
if err == nil {
// If the error returned by Stat is nil, the path must exist.
err = s.hdfs.Remove(rp)

if err != nil && errors.Is(err, os.ErrNotExist) {
// Omit `file not exist` error here
// ref: [GSP-46](https://github.com/beyondstorage/specs/blob/master/rfcs/46-idempotent-delete.md)
err = nil
}
}

// This ensures that err can only be os.ErrNotExist
if err != nil && !errors.Is(err, os.ErrNotExist) {
return 0, err
}

f, err := s.hdfs.Create(rp)
if err != nil {
return 0, err
}

defer func() {
closeErr := f.Close()
if err == nil {
Expand All @@ -220,11 +257,16 @@ func (s *Storage) write(ctx context.Context, path string, r io.Reader, size int6

func (s *Storage) writeAppend(ctx context.Context, o *Object, r io.Reader, size int64, opt pairStorageWriteAppend) (n int64, err error) {
f, err := s.hdfs.Append(o.ID)

if err != nil {
return
}

defer func() {
f.Close()
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()

return io.CopyN(f, r, size)
Expand Down

0 comments on commit ceaedae

Please sign in to comment.