diff --git a/jetstream/kv.go b/jetstream/kv.go index c94f11c36..7c234a423 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -67,7 +67,10 @@ type ( // WatchAll will invoke the callback for all updates. WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error) // Keys will return all keys. + // DEPRECATED: Use ListKeys instead to avoid memory issues. Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) + // ListKeys will return all keys in a channel. + ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error) // History will return all historical values for the key. History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error) // Bucket returns the current bucket name. @@ -78,6 +81,12 @@ type ( Status(ctx context.Context) (KeyValueStatus, error) } + // KeyLister is used to retrieve a list of key value store keys + KeyLister interface { + Keys() <-chan string + Stop() error + } + // KeyValueConfig is for configuring a KeyValue store. KeyValueConfig struct { Bucket string @@ -923,6 +932,46 @@ func (kv *kvs) Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) { return keys, nil } +type keyLister struct { + watcher KeyWatcher + keys chan string +} + +// Keys will return all keys. +func (kv *kvs) ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error) { + opts = append(opts, IgnoreDeletes(), MetaOnly()) + watcher, err := kv.WatchAll(ctx, opts...) + if err != nil { + return nil, err + } + kl := &keyLister{watcher: watcher, keys: make(chan string, 256)} + + go func() { + defer close(kl.keys) + defer watcher.Stop() + for { + select { + case entry := <-watcher.Updates(): + if entry == nil { + return + } + kl.keys <- entry.Key() + case <-ctx.Done(): + return + } + } + }() + return kl, nil +} + +func (kl *keyLister) Keys() <-chan string { + return kl.keys +} + +func (kl *keyLister) Stop() error { + return kl.watcher.Stop() +} + // History will return all historical values for the key. func (kv *kvs) History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error) { opts = append(opts, IncludeHistory()) diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go index e559758b0..f3a9288ce 100644 --- a/jetstream/test/kv_test.go +++ b/jetstream/test/kv_test.go @@ -746,6 +746,80 @@ func TestKeyValueKeys(t *testing.T) { } } +func TestKeyValueListKeys(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", History: 2}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(ctx, key, []byte(value)) + expectOk(t, err) + } + + // Put in a few names and ages. + put("name", "derek") + put("age", "22") + put("country", "US") + put("name", "ivan") + put("age", "33") + put("country", "US") + put("name", "rip") + put("age", "44") + put("country", "MT") + + keys, err := kv.ListKeys(ctx) + expectOk(t, err) + + kmap := make(map[string]struct{}) + for key := range keys.Keys() { + if _, ok := kmap[key]; ok { + t.Fatalf("Already saw %q", key) + } + kmap[key] = struct{}{} + } + if len(kmap) != 3 { + t.Fatalf("Expected 3 total keys, got %d", len(kmap)) + } + expected := map[string]struct{}{ + "name": struct{}{}, + "age": struct{}{}, + "country": struct{}{}, + } + if !reflect.DeepEqual(kmap, expected) { + t.Fatalf("Expected %+v but got %+v", expected, kmap) + } + // Make sure delete and purge do the right thing and not return the keys. + err = kv.Delete(ctx, "name") + expectOk(t, err) + err = kv.Purge(ctx, "country") + expectOk(t, err) + + keys, err = kv.ListKeys(ctx) + expectOk(t, err) + + kmap = make(map[string]struct{}) + for key := range keys.Keys() { + if _, ok := kmap[key]; ok { + t.Fatalf("Already saw %q", key) + } + kmap[key] = struct{}{} + } + if len(kmap) != 1 { + t.Fatalf("Expected 1 total key, got %d", len(kmap)) + } + if _, ok := kmap["age"]; !ok { + t.Fatalf("Expected %q to be only key present", "age") + } +} + func TestKeyValueCrossAccounts(t *testing.T) { conf := createConfFile(t, []byte(` jetstream: enabled diff --git a/kv.go b/kv.go index efecc4fa5..29566a34b 100644 --- a/kv.go +++ b/kv.go @@ -65,7 +65,10 @@ type KeyValue interface { // WatchAll will invoke the callback for all updates. WatchAll(opts ...WatchOpt) (KeyWatcher, error) // Keys will return all keys. + // DEPRECATED: Use ListKeys instead to avoid memory issues. Keys(opts ...WatchOpt) ([]string, error) + // ListKeys will return all keys in a channel. + ListKeys(opts ...WatchOpt) (KeyLister, error) // History will return all historical values for the key. History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) // Bucket returns the current bucket name. @@ -110,6 +113,12 @@ type KeyWatcher interface { Stop() error } +// KeyLister is used to retrieve a list of key value store keys +type KeyLister interface { + Keys() <-chan string + Stop() error +} + type WatchOpt interface { configureWatcher(opts *watchOpts) error } @@ -842,6 +851,41 @@ func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) { return keys, nil } +type keyLister struct { + watcher KeyWatcher + keys chan string +} + +// ListKeys will return all keys. +func (kv *kvs) ListKeys(opts ...WatchOpt) (KeyLister, error) { + opts = append(opts, IgnoreDeletes(), MetaOnly()) + watcher, err := kv.WatchAll(opts...) + if err != nil { + return nil, err + } + kl := &keyLister{watcher: watcher, keys: make(chan string, 256)} + + go func() { + defer close(kl.keys) + defer watcher.Stop() + for entry := range watcher.Updates() { + if entry == nil { + return + } + kl.keys <- entry.Key() + } + }() + return kl, nil +} + +func (kl *keyLister) Keys() <-chan string { + return kl.keys +} + +func (kl *keyLister) Stop() error { + return kl.watcher.Stop() +} + // History will return all values for the key. func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) { opts = append(opts, IncludeHistory()) diff --git a/test/kv_test.go b/test/kv_test.go index d4d68a357..5759f5d17 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -694,6 +694,78 @@ func TestKeyValueKeys(t *testing.T) { } } +func TestKeyValueListKeys(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 2}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(key, []byte(value)) + expectOk(t, err) + } + + // Put in a few names and ages. + put("name", "derek") + put("age", "22") + put("country", "US") + put("name", "ivan") + put("age", "33") + put("country", "US") + put("name", "rip") + put("age", "44") + put("country", "MT") + + keys, err := kv.ListKeys() + expectOk(t, err) + + kmap := make(map[string]struct{}) + for key := range keys.Keys() { + if _, ok := kmap[key]; ok { + t.Fatalf("Already saw %q", key) + } + kmap[key] = struct{}{} + } + if len(kmap) != 3 { + t.Fatalf("Expected 3 total keys, got %d", len(kmap)) + } + expected := map[string]struct{}{ + "name": struct{}{}, + "age": struct{}{}, + "country": struct{}{}, + } + if !reflect.DeepEqual(kmap, expected) { + t.Fatalf("Expected %+v but got %+v", expected, kmap) + } + // Make sure delete and purge do the right thing and not return the keys. + err = kv.Delete("name") + expectOk(t, err) + err = kv.Purge("country") + expectOk(t, err) + + keys, err = kv.ListKeys() + expectOk(t, err) + + kmap = make(map[string]struct{}) + for key := range keys.Keys() { + if _, ok := kmap[key]; ok { + t.Fatalf("Already saw %q", key) + } + kmap[key] = struct{}{} + } + if len(kmap) != 1 { + t.Fatalf("Expected 1 total key, got %d", len(kmap)) + } + if _, ok := kmap["age"]; !ok { + t.Fatalf("Expected %q to be only key present", "age") + } +} + func TestKeyValueCrossAccounts(t *testing.T) { conf := createConfFile(t, []byte(` jetstream: enabled