Skip to content

Commit

Permalink
services, design: Propose and implement 21-split-segmenter (#270)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored Apr 27, 2020
1 parent 714da1f commit 089c595
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 167 deletions.
72 changes: 72 additions & 0 deletions docs/design/21-split-segmenter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
---
author: Xuanwo <github@xuanwo.io>
status: candidate
updated_at: 2020-04-27
---

# Proposal: Split Segmenter

## Background

In proposal [18-return-segment-interface-instead], we introduced `segment.Segmenter` interface, but also introduce a new problem: user don't know which type of segment that service supported.

In the `Rationale` section of [18-return-segment-interface-instead], we have already discussed two implementations:

1. `OffsetBasedSegmenter` vs `IndexBasedSegmenter`
2. `WriteSegmentViaIndex` vs `WriteSegmentViaOffset`

We said:

> No.1 implementation's problem is these add too much work for services to support both of them.
But maybe this is the way, we can build new segmenter upon this idea.

## Proposal

Based on the idea of `OffsetBasedSegmenter` and `IndexBasedSegmenter`, I propose following changes:

Add new interface `IndexSegmenter` that implement following functions:

- `InitIndexSegment(path string, pairs ...*types.Pair) (seg segment.Segment, err error)`
- `WriteIndexSegment(seg segment.Segment, r io.Reader, index int, size int64, pairs ...*types.Pair) (err error)`
- `CompleteSegment(seg segment.Segment, pairs ...*types.Pair) (err error)`
- `AbortSegment(seg segment.Segment, pairs ...*types.Pair) (err error)`

Extract `CompleteSegment` and `AbortSegment` into a non-exported interface `segmenter`.

Embed `segmenter` into `IndexSegmenter`, `DirSegmentsLister` and `PrefixSegmentsLister`

Remove `Segmenter`

### Add new interface `IndexSegmenter`

It's easy to find out that we don't need to implement an entire new segmenter interface. After introducing `segment.Segmenter` interface, we can reuse already implemented `CompleteSegment` and `AbortSegment`.

After this change, the workload for service implementer reduced as the same. If new segment method needs to implemented, we only need to add `OffsetSegmenter`.

### Extract non-exported interface and embed into others

We already added `DirSegmentsLister` and `PrefixSegmentsLister`, but list only interface is not useful, we always need to convert to `Segmenter`. After extracting `CompleteSegment` and `AbortSegment` into a non-exported interface, we can make `DirSegmentsLister` and `PrefixSegmentsLister` more useful.

### Remove `Segmenter`

Yes, I know, I introduced an API breaking changes here.

I'm sorry, but I don't have the time and energy to maintain both v1 and v2 for now. Since [storage] is nearly no one except me used, I think it's safe to introduce this change here and not break others.

I'm aware the risk, and I will be responsible for this.

## Rationale

None.

## Compatibility

All API call related `Segmenter` will be borken.

## Implementation

Most of the work would be done by the author of this proposal.

[18-return-segment-interface-instead]: (./18-return-segment-interface-instead.md)
[storage]: https://github.com/Xuanwo/storage
68 changes: 68 additions & 0 deletions segmenter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package storage

import (
"context"
"io"

"github.com/Xuanwo/storage/pkg/segment"
"github.com/Xuanwo/storage/types"
)

type segmenter interface {
// CompleteSegment will complete a segment and merge them into a File.
//
// Implementer:
// - SHOULD return error while caller call CompleteSegment without init.
// Caller:
// - SHOULD call InitIndexSegment before CompleteSegment.
CompleteSegment(seg segment.Segment, pairs ...*types.Pair) (err error)
// CompleteSegmentWithContext will complete a segment and merge them into a File.
CompleteSegmentWithContext(ctx context.Context, seg segment.Segment, pairs ...*types.Pair) (err error)
// AbortSegment will abort a segment.
//
// Implementer:
// - SHOULD return error while caller call AbortSegment without init.
// Caller:
// - SHOULD call InitIndexSegment before AbortSegment.
AbortSegment(seg segment.Segment, pairs ...*types.Pair) (err error)
// AbortSegmentWithContext will abort a segment.
AbortSegmentWithContext(ctx context.Context, seg segment.Segment, pairs ...*types.Pair) (err error)
}

// DirSegmentsLister is used for directory based storage service to list segments under a dir.
type DirSegmentsLister interface {
segmenter

// ListDirSegments will list segments via dir.
ListDirSegments(path string, pairs ...*types.Pair) (err error)
// ListDirSegmentsWithContext will list segments via dir.
ListDirSegmentsWithContext(ctx context.Context, path string, pairs ...*types.Pair) (err error)
}

// PrefixSegmentsLister is used for prefix based storage service to list segments under a prefix.
type PrefixSegmentsLister interface {
segmenter

// ListSegments will list segments.
//
// Implementer:
// - If prefix == "", services should return all segments.
ListPrefixSegments(prefix string, pairs ...*types.Pair) (err error)
// ListSegmentsWithContext will list segments.
ListPrefixSegmentsWithContext(ctx context.Context, prefix string, pairs ...*types.Pair) (err error)
}

// IndexSegmenter is the interface for index based segment.
type IndexSegmenter interface {
segmenter

// InitIndexSegment will init an index based segment
InitIndexSegment(path string, pairs ...*types.Pair) (seg segment.Segment, err error)
// InitIndexSegmentWithContext will init an index based segment
InitIndexSegmentWithContext(ctx context.Context, path string, pairs ...*types.Pair) (seg segment.Segment, err error)

// WriteIndexSegment will write a part into an index based segment.
WriteIndexSegment(seg segment.Segment, r io.Reader, index int, size int64, pairs ...*types.Pair) (err error)
// WriteIndexSegmentWithContext will write a part into an index based segment.
WriteIndexSegmentWithContext(ctx context.Context, seg segment.Segment, r io.Reader, index int, size int64, pairs ...*types.Pair) (err error)
}
4 changes: 2 additions & 2 deletions services/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ const (

// Segment related op
OpListPrefixSegments = "list_prefix_segments"
OpInitSegment = "init_segment"
OpWriteSegment = "write_segment"
OpInitIndexSegment = "init_index_segment"
OpWriteIndexSegment = "write_index_segment"
OpCompleteSegment = "complete_segment"
OpAbortSegment = "abort_segment"
)
Expand Down
54 changes: 18 additions & 36 deletions services/qingstor/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions services/qingstor/meta.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@
"checksum": false,
"size": true,
"storage_class": false
},
"write_segment": {
"index": true,
"size": true
}
}
}
20 changes: 10 additions & 10 deletions services/qingstor/storager.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,13 @@ func (s *Storage) ListPrefixSegments(prefix string, pairs ...*types.Pair) (err e
return
}

// InitSegment implements Storager.InitSegment
func (s *Storage) InitSegment(path string, pairs ...*types.Pair) (seg segment.Segment, err error) {
// InitIndexSegment implements Storager.InitIndexSegment
func (s *Storage) InitIndexSegment(path string, pairs ...*types.Pair) (seg segment.Segment, err error) {
defer func() {
err = s.formatError(services.OpInitSegment, err, path)
err = s.formatError(services.OpInitIndexSegment, err, path)
}()

_, err = s.parsePairInitSegment(pairs...)
_, err = s.parsePairInitIndexSegment(pairs...)
if err != nil {
return
}
Expand All @@ -438,18 +438,18 @@ func (s *Storage) InitSegment(path string, pairs ...*types.Pair) (seg segment.Se
return seg, nil
}

// WriteSegment implements Storager.WriteSegment
func (s *Storage) WriteSegment(seg segment.Segment, r io.Reader, pairs ...*types.Pair) (err error) {
// WriteIndexSegment implements Storager.WriteIndexSegment
func (s *Storage) WriteIndexSegment(seg segment.Segment, r io.Reader, index int, size int64, pairs ...*types.Pair) (err error) {
defer func() {
err = s.formatError(services.OpWriteSegment, err, seg.Path(), seg.ID())
err = s.formatError(services.OpWriteIndexSegment, err, seg.Path(), seg.ID())
}()

opt, err := s.parsePairWriteSegment(pairs...)
opt, err := s.parsePairWriteIndexSegment(pairs...)
if err != nil {
return
}

p, err := seg.(*segment.IndexBasedSegment).InsertPart(opt.Index, opt.Size)
p, err := seg.(*segment.IndexBasedSegment).InsertPart(index, size)
if err != nil {
return
}
Expand All @@ -463,7 +463,7 @@ func (s *Storage) WriteSegment(seg segment.Segment, r io.Reader, pairs ...*types
_, err = s.bucket.UploadMultipart(rp, &service.UploadMultipartInput{
PartNumber: service.Int(p.Index),
UploadID: service.String(seg.ID()),
ContentLength: &opt.Size,
ContentLength: &size,
Body: r,
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions services/qingstor/storager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func TestStorage_InitSegment(t *testing.T) {
bucket: mockBucket,
}

_, err := client.InitSegment(v.path)
_, err := client.InitIndexSegment(v.path)
if v.hasError {
assert.Error(t, err)
assert.True(t, errors.Is(err, v.wantErr))
Expand Down Expand Up @@ -673,7 +673,7 @@ func TestStorage_WriteSegment(t *testing.T) {
assert.Equal(t, id, *input.UploadID)
})

err := client.WriteSegment(seg, nil, pairs.WithSize(100), pairs.WithIndex(0))
err := client.WriteIndexSegment(seg, nil, 0, 100)
assert.NoError(t, err)
})
}
Expand Down
Loading

0 comments on commit 089c595

Please sign in to comment.