diff --git a/clientv3/integration/mirror_test.go b/clientv3/integration/mirror_test.go index de74e09e6e6..df9911efb7e 100644 --- a/clientv3/integration/mirror_test.go +++ b/clientv3/integration/mirror_test.go @@ -15,7 +15,9 @@ package integration import ( + "fmt" "reflect" + "sync" "testing" "time" @@ -69,3 +71,55 @@ func TestMirrorSync(t *testing.T) { t.Fatal("failed to receive update in one second") } } + +func TestMirrorSyncBase(t *testing.T) { + cluster := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(nil) + + cli := cluster.Client(0) + ctx := context.TODO() + + keyCh := make(chan string) + var wg sync.WaitGroup + + for i := 0; i < 50; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + for key := range keyCh { + if _, err := cli.Put(ctx, key, "test"); err != nil { + t.Fatal(err) + } + } + }() + } + + for i := 0; i < 2000; i++ { + keyCh <- fmt.Sprintf("test%d", i) + } + + close(keyCh) + wg.Wait() + + syncer := mirror.NewSyncer(cli, "test", 0) + respCh, errCh := syncer.SyncBase(ctx) + + count := 0 + + for resp := range respCh { + count = count + len(resp.Kvs) + if !resp.More { + break + } + } + + for err := range errCh { + t.Fatalf("unexpected error %v", err) + } + + if count != 2000 { + t.Errorf("unexpected kv count: %d", count) + } +} diff --git a/clientv3/mirror/syncer.go b/clientv3/mirror/syncer.go index 58aae940e83..f2a8f107f67 100644 --- a/clientv3/mirror/syncer.go +++ b/clientv3/mirror/syncer.go @@ -78,7 +78,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha // If len(s.prefix) != 0, we will sync key-value space with given prefix. // We then range from the prefix to the next prefix if exists. Or we will // range from the prefix to the end if the next prefix does not exists. - opts = append(opts, clientv3.WithPrefix()) + opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix))) key = s.prefix } diff --git a/clientv3/op.go b/clientv3/op.go index 58e2c94cc15..2fc803cbbc9 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -184,6 +184,12 @@ func WithSort(target SortTarget, order SortOrder) OpOption { } } +// GetPrefixRangeEnd gets the range end of the prefix. +// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'. +func GetPrefixRangeEnd(prefix string) string { + return string(getPrefix([]byte(prefix))) +} + func getPrefix(key []byte) []byte { end := make([]byte, len(key)) copy(end, key)