Skip to content

Commit

Permalink
Quick and dirty hack to add an optional requirement for segment delet…
Browse files Browse the repository at this point in the history
…ion: the segment must have been marked deletable. When the option is enabled, attempts to delete will block until all impacted segments are marked deletable.
  • Loading branch information
ncabatoff committed Nov 8, 2023
1 parent af6cc18 commit c85aa91
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 4 deletions.
6 changes: 6 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ func WithLogger(logger hclog.Logger) walOpt {
}
}

func WithRequireDeletable() walOpt {
return func(w *WAL) {
w.requireDeletable = true
}
}

// WithSegmentSize is an option that allows a custom segmentSize to be set.
func WithSegmentSize(size int) walOpt {
return func(w *WAL) {
Expand Down
3 changes: 3 additions & 0 deletions types/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type SegmentInfo struct {
// limit in the sense that the final Append usually takes the segment file
// past this size before it is considered full and sealed.
SizeLimit uint32

// DeletableTime records when a sealed segment was marked as deletable.
DeletableTime time.Time
}

// SegmentFiler is the interface that provides access to segments to the WAL. It
Expand Down
61 changes: 57 additions & 4 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ type WAL struct {
// checks if awaitRotate. If it is nil there is no rotation going on so
// StoreLogs can proceed. If it is non-nil, it releases the lock and then
// waits on the close before acquiring the lock and continuing.
triggerRotate chan uint64
awaitRotate chan struct{}
triggerRotate chan uint64
awaitRotate chan struct{}
requireDeletable bool
}

type walOpt func(*WAL)
Expand Down Expand Up @@ -351,8 +352,11 @@ func (w *WAL) GetLog(index uint64, log *raft.Log) error {

type ArchiverInterface interface {
GetSealedLogFiles(fromIndex uint64) ([]*SealedSegmentInfo, error)
MarkSealedLogDeletable(minIndex uint64) error
}

var _ ArchiverInterface = &WAL{}

type SealedSegmentInfo struct {
Path string
LogCount uint64
Expand Down Expand Up @@ -411,6 +415,20 @@ func (w *WAL) GetSealedLogFiles(fromIndex uint64) ([]*SealedSegmentInfo, error)
return sealedSegInfo, nil
}

func (w *WAL) MarkSealedLogDeletable(minIndex uint64) error {
txn := func(newState *state) (func(), func() error, error) {
seg, ok := newState.segments.Get(minIndex)
if !ok {
return nil, nil, ErrNotFound
}
seg.DeletableTime = time.Now()
newState.segments = newState.segments.Set(minIndex, seg)
return nil, nil, nil
}

return w.mutateStateLocked(txn)
}

// StoreLog stores a log entry.
func (w *WAL) StoreLog(log *raft.Log) error {
return w.StoreLogs([]*raft.Log{log})
Expand Down Expand Up @@ -767,6 +785,9 @@ func (w *WAL) resetEmptyFirstSegmentBaseIndex(newBaseIndex uint64) error {
// It's fine as it is, no-op
return nil, nil, nil
}
// TODO confirm that this cannot be a sealed segment, or if that's
// wrong, block until it's deletable.

// It needs to be removed
newState.segments = newState.segments.Delete(tailSeg.BaseIndex)
newState.tail = nil
Expand All @@ -791,6 +812,8 @@ func (w *WAL) resetEmptyFirstSegmentBaseIndex(newBaseIndex uint64) error {
return w.mutateStateLocked(txn)
}

var ErrUndeletable = errors.New("some segments are still pending deletability")

func (w *WAL) truncateHeadLocked(newMin uint64) error {
txn := stateTxn(func(newState *state) (func(), func() error, error) {
oldLastIndex := newState.lastIndex()
Expand Down Expand Up @@ -819,6 +842,10 @@ func (w *WAL) truncateHeadLocked(newMin uint64) error {
break
}

if w.requireDeletable && seg.DeletableTime.IsZero() {
return nil, nil, ErrUndeletable
}

toDelete[seg.ID] = seg.BaseIndex
toClose = append(toClose, seg.r)
newState.segments = newState.segments.Delete(seg.BaseIndex)
Expand Down Expand Up @@ -858,7 +885,18 @@ func (w *WAL) truncateHeadLocked(newMin uint64) error {
return fin, postCommit, nil
})

return w.mutateStateLocked(txn)
for {
err := w.mutateStateLocked(txn)
switch err {
case nil:
return nil
case ErrUndeletable:
time.Sleep(500 * time.Second)
continue
default:
return err
}
}
}

func (w *WAL) truncateTailLocked(newMax uint64) error {
Expand All @@ -883,6 +921,10 @@ func (w *WAL) truncateTailLocked(newMax uint64) error {
maxIdx = newState.lastIndex()
}

if w.requireDeletable && seg.DeletableTime.IsZero() {
return nil, nil, ErrUndeletable
}

toDelete[seg.ID] = seg.BaseIndex
toClose = append(toClose, seg.r)
newState.segments = newState.segments.Delete(seg.BaseIndex)
Expand Down Expand Up @@ -930,7 +972,18 @@ func (w *WAL) truncateTailLocked(newMax uint64) error {
return fin, pc, nil
})

return w.mutateStateLocked(txn)
for {
err := w.mutateStateLocked(txn)
switch err {
case nil:
return nil
case ErrUndeletable:
time.Sleep(500 * time.Second)
continue
default:
return err
}
}
}

func (w *WAL) deleteSegments(toDelete map[uint64]uint64) {
Expand Down
22 changes: 22 additions & 0 deletions wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ func TestDeleteRange(t *testing.T) {
expectDeleted []uint64
expectNHeadTruncations uint64
expectNTailTruncations uint64
markDeleted []uint64
}{
{
name: "no-op empty range",
Expand Down Expand Up @@ -626,6 +627,22 @@ func TestDeleteRange(t *testing.T) {
// deletion.
expectNHeadTruncations: 210,
},
{
name: "head truncation with deletability considered",
walOpts: []walOpt{WithRequireDeletable()},
tsOpts: []testStorageOpt{
firstIndex(1000),
segFull(),
segTail(10),
},
// Delete a range before the log starts
deleteMin: 0,
deleteMax: 1101,
expectFirstIndex: 1102,
expectLastIndex: 1000 + 100 + 10 - 1,
expectNHeadTruncations: 102,
markDeleted: []uint64{1000},
},
}

for _, tc := range cases {
Expand All @@ -642,6 +659,11 @@ func TestDeleteRange(t *testing.T) {
ts, w, err := testOpenWAL(t, tc.tsOpts, opts, false)
require.NoError(t, err)

for _, baseIndex := range tc.markDeleted {
err = w.MarkSealedLogDeletable(baseIndex)
require.NoError(t, err)
}

err = w.DeleteRange(tc.deleteMin, tc.deleteMax)

if tc.expectErr != "" {
Expand Down

0 comments on commit c85aa91

Please sign in to comment.