[v14] Introduce a dedicated backend key type #45713

Merged 1 commit on Aug 28, 2024.
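Note: the diff below never shows the definition of the new backend.Key type itself. The call sites that follow (Key{0}, Key("/a"), append(NewKey(parts...), Separator), copying a Key into a []byte) all behave as if it were a named byte slice, so that is what the sketches in these notes assume:

package backend

// Key is the dedicated key type this PR introduces. Its actual definition
// is not part of this diff and may carry extra methods; this sketch only
// captures the byte-slice shape implied by the call sites below.
type Key []byte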
2 changes: 1 addition & 1 deletion lib/auth/storage/storage.go
@@ -63,7 +63,7 @@ type stateBackend interface {
// exists, updates it otherwise)
Put(ctx context.Context, i backend.Item) (*backend.Lease, error)
// Get returns a single item or not found error
Get(ctx context.Context, key []byte) (*backend.Item, error)
Get(ctx context.Context, key backend.Key) (*backend.Item, error)
}

// ProcessStorage is a backend for local process state,
38 changes: 19 additions & 19 deletions lib/backend/backend.go
@@ -60,17 +60,17 @@ type Backend interface {
Update(ctx context.Context, i Item) (*Lease, error)

// Get returns a single item or not found error
Get(ctx context.Context, key []byte) (*Item, error)
Get(ctx context.Context, key Key) (*Item, error)

// GetRange returns query range
GetRange(ctx context.Context, startKey []byte, endKey []byte, limit int) (*GetResult, error)
GetRange(ctx context.Context, startKey, endKey Key, limit int) (*GetResult, error)

// Delete deletes item by key, returns NotFound error
// if item does not exist
Delete(ctx context.Context, key []byte) error
Delete(ctx context.Context, key Key) error

// DeleteRange deletes range of items with keys between startKey and endKey
DeleteRange(ctx context.Context, startKey, endKey []byte) error
DeleteRange(ctx context.Context, startKey, endKey Key) error

// KeepAlive keeps object from expiring, updates lease on the existing object,
// expires contains the new expiry to set on the lease,
@@ -93,7 +93,7 @@ type Backend interface {
}

// IterateRange is a helper for stepping over a range
func IterateRange(ctx context.Context, bk Backend, startKey []byte, endKey []byte, limit int, fn func([]Item) (stop bool, err error)) error {
func IterateRange(ctx context.Context, bk Backend, startKey, endKey Key, limit int, fn func([]Item) (stop bool, err error)) error {
if limit == 0 || limit > 10_000 {
limit = 10_000
}
@@ -130,7 +130,7 @@ func IterateRange(ctx context.Context, bk Backend, startKey []byte, endKey []byt
//
// 2. allow individual backends to expose custom streaming methods s.t. the most performant
// impl for a given backend may be used.
func StreamRange(ctx context.Context, bk Backend, startKey, endKey []byte, pageSize int) stream.Stream[Item] {
func StreamRange(ctx context.Context, bk Backend, startKey, endKey Key, pageSize int) stream.Stream[Item] {
return stream.PageFunc[Item](func() ([]Item, error) {
if startKey == nil {
return nil, io.EOF
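Note: a sketch of consuming the Key-based StreamRange. The Next/Item/Done accessors are assumed from the stream.Stream type in the signature (they are not part of this diff), and the page size of 100 is arbitrary:

package example

import (
	"context"

	"github.com/gravitational/teleport/lib/backend"
)

// drainPrefix pages through every item under the given prefix using
// StreamRange and collects the results. Next, Item, and Done are assumed
// stream.Stream accessors; adjust to the real interface if it differs.
func drainPrefix(ctx context.Context, bk backend.Backend, prefix backend.Key) ([]backend.Item, error) {
	s := backend.StreamRange(ctx, bk, prefix, backend.RangeEnd(prefix), 100)
	var items []backend.Item
	for s.Next() {
		items = append(items, s.Item())
	}
	return items, s.Done()
}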
@@ -167,7 +167,7 @@ type Batch interface {
// err = backend.KeepAlive(ctx, lease, expires)
type Lease struct {
// Key is an object representing lease
Key []byte
Key Key
// ID is a lease ID, could be empty
// Deprecated: use Revision instead
ID int64
@@ -187,7 +187,7 @@ type Watch struct {
Name string
// Prefixes specifies prefixes to watch,
// passed to the backend implementation
Prefixes [][]byte
Prefixes []Key
// QueueSize is an optional queue size
QueueSize int
// MetricComponent if set will start reporting
@@ -231,7 +231,7 @@ type Event struct {
// Item is a key value item
type Item struct {
// Key is a key of the key value item
Key []byte
Key Key
// Value is a value of the key value item
Value []byte
// Expires is an optional record expiry time
@@ -287,7 +287,7 @@ const NoLimit = 0
// nextKey returns the next possible key.
// If used with a key prefix, this will return
// the end of the range for that key prefix.
func nextKey(key []byte) []byte {
func nextKey(key Key) Key {
end := make([]byte, len(key))
copy(end, key)
for i := len(end) - 1; i >= 0; i-- {
@@ -301,10 +301,10 @@ func nextKey(key []byte) []byte {
return noEnd
}

var noEnd = []byte{0}
var noEnd = Key{0}

// RangeEnd returns end of the range for given key.
func RangeEnd(key []byte) []byte {
func RangeEnd(key Key) Key {
return nextKey(key)
}
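Note: a sketch of how RangeEnd pairs with GetRange for prefix scans under the new signatures. The "/nodes/" prefix is only an example, and GetResult exposing an Items slice is assumed rather than shown in this diff:

package example

import (
	"context"

	"github.com/gravitational/teleport/lib/backend"
)

// listUnderPrefix fetches every item stored under "/nodes/". Assuming the
// byte-backed Key sketched earlier, RangeEnd("/nodes/") is "/nodes0"
// ('/'+1 == '0'), the usual end marker for scanning everything that has
// "/nodes/" as a prefix.
func listUnderPrefix(ctx context.Context, bk backend.Backend) ([]backend.Item, error) {
	start := backend.ExactKey("nodes") // "/nodes/"
	end := backend.RangeEnd(start)
	result, err := bk.GetRange(ctx, start, end, backend.NoLimit)
	if err != nil {
		return nil, err
	}
	return result.Items, nil
}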

@@ -325,7 +325,7 @@ type KeyedItem interface {
// have the HostID part.
func NextPaginationKey(ki KeyedItem) string {
key := GetPaginationKey(ki)
return string(nextKey([]byte(key)))
return string(nextKey(Key(key)))
}

// GetPaginationKey returns the pagination key given item.
@@ -341,13 +341,13 @@ func GetPaginationKey(ki KeyedItem) string {

// MaskKeyName masks the given key name.
// e.g "123456789" -> "******789"
func MaskKeyName(keyName string) []byte {
func MaskKeyName(keyName string) string {
maskedBytes := []byte(keyName)
hiddenBefore := int(0.75 * float64(len(keyName)))
for i := 0; i < hiddenBefore; i++ {
maskedBytes[i] = '*'
}
return maskedBytes
return string(maskedBytes)
}

// Items is a sortable list of backend items
@@ -421,18 +421,18 @@ const Separator = '/'

// NewKey joins parts into path separated by Separator,
// makes sure path always starts with Separator ("/")
func NewKey(parts ...string) []byte {
func NewKey(parts ...string) Key {
return internalKey("", parts...)
}

// ExactKey is like Key, except a Separator is appended to the result
// path of Key. This is to ensure range matching of a path will only
// math child paths and not other paths that have the resulting path
// as a prefix.
func ExactKey(parts ...string) []byte {
func ExactKey(parts ...string) Key {
return append(NewKey(parts...), Separator)
}

func internalKey(internalPrefix string, parts ...string) []byte {
return []byte(strings.Join(append([]string{internalPrefix}, parts...), string(Separator)))
func internalKey(internalPrefix string, parts ...string) Key {
return Key(strings.Join(append([]string{internalPrefix}, parts...), string(Separator)))
}
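Note: a small round-trip sketch using the Key-returning helpers above. The "/example/<name>" layout is hypothetical and only illustrates how NewKey and the Separator compose:

package example

import (
	"context"

	"github.com/gravitational/teleport/lib/backend"
)

// putAndGet writes one item and reads it back with the Key-based
// signatures. NewKey("example", name) builds "/example/<name>", while
// ExactKey("example") would add a trailing Separator so a range scan over
// "/example/" never matches a sibling prefix such as "/example-archive/".
func putAndGet(ctx context.Context, bk backend.Backend, name string, value []byte) (*backend.Item, error) {
	key := backend.NewKey("example", name)
	if _, err := bk.Put(ctx, backend.Item{Key: key, Value: value}); err != nil {
		return nil, err
	}
	return bk.Get(ctx, key)
}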
2 changes: 1 addition & 1 deletion lib/backend/buffer.go
@@ -217,7 +217,7 @@ func (c *CircularBuffer) fanOutEvent(r Event) {
}
}

func removeRedundantPrefixes(prefixes [][]byte) [][]byte {
func removeRedundantPrefixes(prefixes []Key) []Key {
if len(prefixes) == 0 {
return prefixes
}
65 changes: 33 additions & 32 deletions lib/backend/buffer_test.go
@@ -18,6 +18,7 @@ package backend

import (
"context"
"fmt"
"testing"
"time"

@@ -48,17 +49,17 @@ func TestWatcherSimple(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: 1}})
b.Emit(Event{Item: Item{Key: Key("/1")}})

select {
case e := <-w.Events():
require.Equal(t, e.Item.ID, int64(1))
require.Equal(t, Key("/1"), e.Item.Key)
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout waiting for event.")
}

b.Close()
b.Emit(Event{Item: Item{ID: 2}})
b.Emit(Event{Item: Item{Key: Key("/2")}})

select {
case <-w.Done():
@@ -101,12 +102,12 @@ func TestWatcherCapacity(t *testing.T) {
// emit and then consume 10 events. this is much larger than our queue size,
// but should succeed since we consume within our grace period.
for i := 0; i < 10; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(i + 1)}})
b.Emit(Event{Item: Item{Key: Key(fmt.Sprintf("/%d", i+1))}})
}
for i := 0; i < 10; i++ {
select {
case e := <-w.Events():
require.Equal(t, e.Item.ID, int64(i+1))
require.Equal(t, fmt.Sprintf("/%d", i+1), string(e.Item.Key))
default:
t.Fatalf("Expected events to be immediately available")
}
@@ -116,7 +117,7 @@ func TestWatcherCapacity(t *testing.T) {
clock.Advance(gracePeriod + time.Second)

// emit another event, which will cause buffer to reevaluate the grace period.
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(11)}})
b.Emit(Event{Item: Item{Key: Key("/11")}})

// ensure that buffer did not close watcher, since previously created backlog
// was drained within grace period.
@@ -128,13 +129,13 @@ func TestWatcherCapacity(t *testing.T) {

// create backlog again, and this time advance past grace period without draining it.
for i := 0; i < 10; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(i + 12)}})
b.Emit(Event{Item: Item{Key: Key(fmt.Sprintf("/%d", i+12))}})
}
clock.Advance(gracePeriod + time.Second)

// emit another event, which will cause buffer to realize that watcher is past
// its grace period.
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(22)}})
b.Emit(Event{Item: Item{Key: Key("/22")}})

select {
case <-w.Done():
@@ -174,7 +175,7 @@ func TestWatcherCreationGracePeriod(t *testing.T) {

// emit enough events to create a backlog
for i := 0; i < queueSize*2; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}}})
b.Emit(Event{Item: Item{Key: Key{Separator}}})
}

select {
@@ -189,7 +190,7 @@ func TestWatcherCreationGracePeriod(t *testing.T) {
// advance well past the backlog grace period, but not past the creation grace period
clock.Advance(backlogGracePeriod * 2)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})
b.Emit(Event{Item: Item{Key: Key{Separator}}})

select {
case <-w.Done():
@@ -200,7 +201,7 @@ func TestWatcherCreationGracePeriod(t *testing.T) {
// advance well past creation grace period
clock.Advance(creationGracePeriod)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})
b.Emit(Event{Item: Item{Key: Key{Separator}}})
select {
case <-w.Done():
default:
@@ -236,29 +237,29 @@ func TestWatcherClose(t *testing.T) {
// TestRemoveRedundantPrefixes removes redundant prefixes
func TestRemoveRedundantPrefixes(t *testing.T) {
type tc struct {
in [][]byte
out [][]byte
in []Key
out []Key
}
tcs := []tc{
{
in: [][]byte{},
out: [][]byte{},
in: []Key{},
out: []Key{},
},
{
in: [][]byte{[]byte("/a")},
out: [][]byte{[]byte("/a")},
in: []Key{Key("/a")},
out: []Key{Key("/a")},
},
{
in: [][]byte{[]byte("/a"), []byte("/")},
out: [][]byte{[]byte("/")},
in: []Key{Key("/a"), Key("/")},
out: []Key{Key("/")},
},
{
in: [][]byte{[]byte("/b"), []byte("/a")},
out: [][]byte{[]byte("/a"), []byte("/b")},
in: []Key{Key("/b"), Key("/a")},
out: []Key{Key("/a"), Key("/b")},
},
{
in: [][]byte{[]byte("/a/b"), []byte("/a"), []byte("/a/b/c"), []byte("/d")},
out: [][]byte{[]byte("/a"), []byte("/d")},
in: []Key{Key("/a/b"), Key("/a"), Key("/a/b/c"), Key("/d")},
out: []Key{Key("/a"), Key("/d")},
},
}
for _, tc := range tcs {
@@ -276,7 +277,7 @@ func TestWatcherMulti(t *testing.T) {
defer b.Close()
b.SetInit()

w, err := b.NewWatcher(ctx, Watch{Prefixes: [][]byte{[]byte("/a"), []byte("/a/b")}})
w, err := b.NewWatcher(ctx, Watch{Prefixes: []Key{Key("/a"), Key("/a/b")}})
require.NoError(t, err)
defer w.Close()

@@ -287,11 +288,11 @@ func TestWatcherMulti(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte("/a/b/c"), ID: 1}})
b.Emit(Event{Item: Item{Key: Key("/a/b/c")}})

select {
case e := <-w.Events():
require.Equal(t, e.Item.ID, int64(1))
require.Equal(t, Key("/a/b/c"), e.Item.Key)
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout waiting for event.")
}
@@ -320,7 +321,7 @@ func TestWatcherReset(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: 1}})
b.Emit(Event{Item: Item{Key: Key("/1")}})
b.Clear()

// make sure watcher has been closed
@@ -341,11 +342,11 @@ func TestWatcherReset(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: 2}})
b.Emit(Event{Item: Item{Key: Key("/2")}})

select {
case e := <-w2.Events():
require.Equal(t, e.Item.ID, int64(2))
require.Equal(t, Key("/2"), e.Item.Key)
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout waiting for event.")
}
@@ -356,10 +357,10 @@ func TestWatcherTree(t *testing.T) {
wt := newWatcherTree()
require.Equal(t, wt.rm(nil), false)

w1 := &BufferWatcher{Watch: Watch{Prefixes: [][]byte{[]byte("/a"), []byte("/a/a1"), []byte("/c")}}}
require.Equal(t, wt.rm(w1), false)
w1 := &BufferWatcher{Watch: Watch{Prefixes: []Key{Key("/a"), Key("/a/a1"), Key("/c")}}}
require.False(t, wt.rm(w1))

w2 := &BufferWatcher{Watch: Watch{Prefixes: [][]byte{[]byte("/a")}}}
w2 := &BufferWatcher{Watch: Watch{Prefixes: []Key{Key("/a")}}}

wt.add(w1)
wt.add(w2)