From c85aa9130a04d04086bd57d32c437f17ae9c2884 Mon Sep 17 00:00:00 2001 From: ncabatoff Date: Wed, 8 Nov 2023 14:27:18 -0500 Subject: [PATCH] Quick and dirty hack to add an optional requirement for segment deletion: the segment must have been marked deletable. When the option is enabled, attempts to delete will block until all impacted segments are marked deletable. --- options.go | 6 +++++ types/segment.go | 3 +++ wal.go | 61 ++++++++++++++++++++++++++++++++++++++++++++---- wal_test.go | 22 +++++++++++++++++ 4 files changed, 88 insertions(+), 4 deletions(-) diff --git a/options.go b/options.go index 743e1b8..03e961e 100644 --- a/options.go +++ b/options.go @@ -46,6 +46,12 @@ func WithLogger(logger hclog.Logger) walOpt { } } +func WithRequireDeletable() walOpt { + return func(w *WAL) { + w.requireDeletable = true + } +} + // WithSegmentSize is an option that allows a custom segmentSize to be set. func WithSegmentSize(size int) walOpt { return func(w *WAL) { diff --git a/types/segment.go b/types/segment.go index 8e341d2..65e059b 100644 --- a/types/segment.go +++ b/types/segment.go @@ -64,6 +64,9 @@ type SegmentInfo struct { // limit in the sense that the final Append usually takes the segment file // past this size before it is considered full and sealed. SizeLimit uint32 + + // DeletableTime records when a sealed segment was marked as deletable. + DeletableTime time.Time } // SegmentFiler is the interface that provides access to segments to the WAL. It diff --git a/wal.go b/wal.go index 0453b25..616c016 100644 --- a/wal.go +++ b/wal.go @@ -80,8 +80,9 @@ type WAL struct { // checks if awaitRotate. If it is nil there is no rotation going on so // StoreLogs can proceed. If it is non-nil, it releases the lock and then // waits on the close before acquiring the lock and continuing. - triggerRotate chan uint64 - awaitRotate chan struct{} + triggerRotate chan uint64 + awaitRotate chan struct{} + requireDeletable bool } type walOpt func(*WAL) @@ -351,8 +352,11 @@ func (w *WAL) GetLog(index uint64, log *raft.Log) error { type ArchiverInterface interface { GetSealedLogFiles(fromIndex uint64) ([]*SealedSegmentInfo, error) + MarkSealedLogDeletable(minIndex uint64) error } +var _ ArchiverInterface = &WAL{} + type SealedSegmentInfo struct { Path string LogCount uint64 @@ -411,6 +415,20 @@ func (w *WAL) GetSealedLogFiles(fromIndex uint64) ([]*SealedSegmentInfo, error) return sealedSegInfo, nil } +func (w *WAL) MarkSealedLogDeletable(minIndex uint64) error { + txn := func(newState *state) (func(), func() error, error) { + seg, ok := newState.segments.Get(minIndex) + if !ok { + return nil, nil, ErrNotFound + } + seg.DeletableTime = time.Now() + newState.segments = newState.segments.Set(minIndex, seg) + return nil, nil, nil + } + + return w.mutateStateLocked(txn) +} + // StoreLog stores a log entry. func (w *WAL) StoreLog(log *raft.Log) error { return w.StoreLogs([]*raft.Log{log}) @@ -767,6 +785,9 @@ func (w *WAL) resetEmptyFirstSegmentBaseIndex(newBaseIndex uint64) error { // It's fine as it is, no-op return nil, nil, nil } + // TODO confirm that this cannot be a sealed segment, or if that's + // wrong, block until it's deletable. + // It needs to be removed newState.segments = newState.segments.Delete(tailSeg.BaseIndex) newState.tail = nil @@ -791,6 +812,8 @@ func (w *WAL) resetEmptyFirstSegmentBaseIndex(newBaseIndex uint64) error { return w.mutateStateLocked(txn) } +var ErrUndeletable = errors.New("some segments are still pending deletability") + func (w *WAL) truncateHeadLocked(newMin uint64) error { txn := stateTxn(func(newState *state) (func(), func() error, error) { oldLastIndex := newState.lastIndex() @@ -819,6 +842,10 @@ func (w *WAL) truncateHeadLocked(newMin uint64) error { break } + if w.requireDeletable && seg.DeletableTime.IsZero() { + return nil, nil, ErrUndeletable + } + toDelete[seg.ID] = seg.BaseIndex toClose = append(toClose, seg.r) newState.segments = newState.segments.Delete(seg.BaseIndex) @@ -858,7 +885,18 @@ func (w *WAL) truncateHeadLocked(newMin uint64) error { return fin, postCommit, nil }) - return w.mutateStateLocked(txn) + for { + err := w.mutateStateLocked(txn) + switch err { + case nil: + return nil + case ErrUndeletable: + time.Sleep(500 * time.Second) + continue + default: + return err + } + } } func (w *WAL) truncateTailLocked(newMax uint64) error { @@ -883,6 +921,10 @@ func (w *WAL) truncateTailLocked(newMax uint64) error { maxIdx = newState.lastIndex() } + if w.requireDeletable && seg.DeletableTime.IsZero() { + return nil, nil, ErrUndeletable + } + toDelete[seg.ID] = seg.BaseIndex toClose = append(toClose, seg.r) newState.segments = newState.segments.Delete(seg.BaseIndex) @@ -930,7 +972,18 @@ func (w *WAL) truncateTailLocked(newMax uint64) error { return fin, pc, nil }) - return w.mutateStateLocked(txn) + for { + err := w.mutateStateLocked(txn) + switch err { + case nil: + return nil + case ErrUndeletable: + time.Sleep(500 * time.Second) + continue + default: + return err + } + } } func (w *WAL) deleteSegments(toDelete map[uint64]uint64) { diff --git a/wal_test.go b/wal_test.go index 43da049..33be5c2 100644 --- a/wal_test.go +++ b/wal_test.go @@ -466,6 +466,7 @@ func TestDeleteRange(t *testing.T) { expectDeleted []uint64 expectNHeadTruncations uint64 expectNTailTruncations uint64 + markDeleted []uint64 }{ { name: "no-op empty range", @@ -626,6 +627,22 @@ func TestDeleteRange(t *testing.T) { // deletion. expectNHeadTruncations: 210, }, + { + name: "head truncation with deletability considered", + walOpts: []walOpt{WithRequireDeletable()}, + tsOpts: []testStorageOpt{ + firstIndex(1000), + segFull(), + segTail(10), + }, + // Delete a range before the log starts + deleteMin: 0, + deleteMax: 1101, + expectFirstIndex: 1102, + expectLastIndex: 1000 + 100 + 10 - 1, + expectNHeadTruncations: 102, + markDeleted: []uint64{1000}, + }, } for _, tc := range cases { @@ -642,6 +659,11 @@ func TestDeleteRange(t *testing.T) { ts, w, err := testOpenWAL(t, tc.tsOpts, opts, false) require.NoError(t, err) + for _, baseIndex := range tc.markDeleted { + err = w.MarkSealedLogDeletable(baseIndex) + require.NoError(t, err) + } + err = w.DeleteRange(tc.deleteMin, tc.deleteMax) if tc.expectErr != "" {