diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index 56323d12f8c3..9bd98b826820 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -626,4 +626,95 @@ mod tests { assert_eq!(b"test_key".to_vec(), delete.key); let _ = delete.options.unwrap(); } + + use crate::kv_backend::test::{ + prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix, + test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix, + test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix, + unprepare_kv, + }; + + async fn build_kv_backend() -> Option { + let endpoints = std::env::var("GT_ETCD_ENDPOINTS").unwrap_or_default(); + if endpoints.is_empty() { + return None; + } + + let endpoints = endpoints + .split(',') + .map(|s| s.to_string()) + .collect::>(); + + let client = Client::connect(endpoints, None) + .await + .expect("malformed endpoints"); + + Some(EtcdStore { + client, + max_txn_ops: 128, + }) + } + + #[tokio::test] + async fn test_put() { + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"put/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test] + async fn test_range() { + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test] + async fn test_range_2() { + if let Some(kv_backend) = build_kv_backend().await { + test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; + } + } + + #[tokio::test] + async fn test_batch_get() { + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"batchGet/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_compare_and_put() { + if let Some(kv_backend) = build_kv_backend().await { + let kv_backend = Arc::new(kv_backend); + test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await; + } + } + + #[tokio::test] + async fn test_delete_range() { + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"deleteRange/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; + } + } + + #[tokio::test] + async fn test_batch_delete() { + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"batchDelete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; + } + } } diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index bb8cf2d95d1e..6c95bb646991 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -357,14 +357,14 @@ mod tests { async fn test_put() { let kv_backend = mock_mem_store_with_data().await; - test_kv_put(kv_backend).await; + test_kv_put(&kv_backend).await; } #[tokio::test] async fn test_range() { let kv_backend = mock_mem_store_with_data().await; - test_kv_range(kv_backend).await; + test_kv_range(&kv_backend).await; } #[tokio::test] @@ -378,7 +378,7 @@ mod tests { async fn test_batch_get() { let kv_backend = mock_mem_store_with_data().await; - test_kv_batch_get(kv_backend).await; + test_kv_batch_get(&kv_backend).await; } #[tokio::test(flavor = "multi_thread")] diff --git a/src/common/meta/src/kv_backend/test.rs b/src/common/meta/src/kv_backend/test.rs index b079a3dc2d52..2f0216dfdfcb 100644 --- a/src/common/meta/src/kv_backend/test.rs +++ b/src/common/meta/src/kv_backend/test.rs @@ -21,25 +21,33 @@ use crate::rpc::store::{BatchGetRequest, PutRequest}; use crate::rpc::KeyValue; use crate::util; -pub fn mock_kvs() -> Vec { +pub fn mock_kvs(prefix: Vec) -> Vec { vec![ KeyValue { - key: b"key1".to_vec(), + key: [prefix.clone(), b"key1".to_vec()].concat(), value: b"val1".to_vec(), }, KeyValue { - key: b"key2".to_vec(), + key: [prefix.clone(), b"key2".to_vec()].concat(), value: b"val2".to_vec(), }, KeyValue { - key: b"key3".to_vec(), + key: [prefix.clone(), b"key3".to_vec()].concat(), value: b"val3".to_vec(), }, + KeyValue { + key: [prefix.clone(), b"key11".to_vec()].concat(), + value: b"val11".to_vec(), + }, ] } pub async fn prepare_kv(kv_backend: &impl KvBackend) { - let kvs = mock_kvs(); + prepare_kv_with_prefix(kv_backend, vec![]).await; +} + +pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec) { + let kvs = mock_kvs(prefix); assert!(kv_backend .batch_put(BatchPutRequest { kvs, @@ -47,21 +55,29 @@ pub async fn prepare_kv(kv_backend: &impl KvBackend) { }) .await .is_ok()); +} +pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) { + let range_end = util::get_prefix_end_key(prefix); assert!(kv_backend - .put(PutRequest { - key: b"key11".to_vec(), - value: b"val11".to_vec(), + .delete_range(DeleteRangeRequest { + key: prefix.to_vec(), + range_end, ..Default::default() }) .await .is_ok()); } -pub async fn test_kv_put(kv_backend: impl KvBackend) { +pub async fn test_kv_put(kv_backend: &impl KvBackend) { + test_kv_put_with_prefix(kv_backend, vec![]).await; +} + +pub async fn test_kv_put_with_prefix(kv_backend: &impl KvBackend, prefix: Vec) { + let put_key = [prefix.clone(), b"key11".to_vec()].concat(); let resp = kv_backend .put(PutRequest { - key: b"key11".to_vec(), + key: put_key.clone(), value: b"val12".to_vec(), prev_kv: false, }) @@ -71,20 +87,25 @@ pub async fn test_kv_put(kv_backend: impl KvBackend) { let resp = kv_backend .put(PutRequest { - key: b"key11".to_vec(), + key: put_key.clone(), value: b"val13".to_vec(), prev_kv: true, }) .await .unwrap(); let prev_kv = resp.prev_kv.unwrap(); - assert_eq!(b"key11", prev_kv.key()); + assert_eq!(put_key, prev_kv.key()); assert_eq!(b"val12", prev_kv.value()); } -pub async fn test_kv_range(kv_backend: impl KvBackend) { - let key = b"key1".to_vec(); - let range_end = util::get_prefix_end_key(b"key1"); +pub async fn test_kv_range(kv_backend: &impl KvBackend) { + test_kv_range_with_prefix(kv_backend, vec![]).await; +} + +pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec) { + let key = [prefix.clone(), b"key1".to_vec()].concat(); + let key11 = [prefix.clone(), b"key11".to_vec()].concat(); + let range_end = util::get_prefix_end_key(&key); let resp = kv_backend .range(RangeRequest { @@ -97,9 +118,9 @@ pub async fn test_kv_range(kv_backend: impl KvBackend) { .unwrap(); assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key, resp.kvs[0].key); assert_eq!(b"val1", resp.kvs[0].value()); - assert_eq!(b"key11", resp.kvs[1].key()); + assert_eq!(key11, resp.kvs[1].key); assert_eq!(b"val11", resp.kvs[1].value()); let resp = kv_backend @@ -113,9 +134,9 @@ pub async fn test_kv_range(kv_backend: impl KvBackend) { .unwrap(); assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key, resp.kvs[0].key); assert_eq!(b"", resp.kvs[0].value()); - assert_eq!(b"key11", resp.kvs[1].key()); + assert_eq!(key11, resp.kvs[1].key); assert_eq!(b"", resp.kvs[1].value()); let resp = kv_backend @@ -129,12 +150,12 @@ pub async fn test_kv_range(kv_backend: impl KvBackend) { .unwrap(); assert_eq!(1, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key, resp.kvs[0].key); assert_eq!(b"val1", resp.kvs[0].value()); let resp = kv_backend .range(RangeRequest { - key, + key: key.clone(), range_end, limit: 1, keys_only: false, @@ -143,24 +164,41 @@ pub async fn test_kv_range(kv_backend: impl KvBackend) { .unwrap(); assert_eq!(1, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key, resp.kvs[0].key); assert_eq!(b"val1", resp.kvs[0].value()); } pub async fn test_kv_range_2(kv_backend: impl KvBackend) { + test_kv_range_2_with_prefix(kv_backend, vec![]).await; +} + +pub async fn test_kv_range_2_with_prefix(kv_backend: impl KvBackend, prefix: Vec) { + let atest = [prefix.clone(), b"atest".to_vec()].concat(); + let test = [prefix.clone(), b"test".to_vec()].concat(); + kv_backend - .put(PutRequest::new().with_key("atest").with_value("value")) + .put( + PutRequest::new() + .with_key(atest.clone()) + .with_value("value"), + ) .await .unwrap(); kv_backend - .put(PutRequest::new().with_key("test").with_value("value")) + .put(PutRequest::new().with_key(test.clone()).with_value("value")) .await .unwrap(); // If both key and range_end are ‘\0’, then range represents all keys. + let all_start = [prefix.clone(), b"\0".to_vec()].concat(); + let all_end = if prefix.is_empty() { + b"\0".to_vec() + } else { + util::get_prefix_end_key(&prefix) + }; let result = kv_backend - .range(RangeRequest::new().with_range(b"\0".to_vec(), b"\0".to_vec())) + .range(RangeRequest::new().with_range(all_start, all_end.clone())) .await .unwrap(); @@ -168,26 +206,28 @@ pub async fn test_kv_range_2(kv_backend: impl KvBackend) { assert!(!result.more); // If range_end is ‘\0’, the range is all keys greater than or equal to the key argument. + let a_start = [prefix.clone(), b"a".to_vec()].concat(); let result = kv_backend - .range(RangeRequest::new().with_range(b"a".to_vec(), b"\0".to_vec())) + .range(RangeRequest::new().with_range(a_start.clone(), all_end.clone())) .await .unwrap(); assert_eq!(result.kvs.len(), 2); + let b_start = [prefix.clone(), b"b".to_vec()].concat(); let result = kv_backend - .range(RangeRequest::new().with_range(b"b".to_vec(), b"\0".to_vec())) + .range(RangeRequest::new().with_range(b_start, all_end.clone())) .await .unwrap(); assert_eq!(result.kvs.len(), 1); - assert_eq!(result.kvs[0].key, b"test"); + assert_eq!(result.kvs[0].key, test); // Fetches the keys >= "a", set limit to 1, the `more` should be true. let result = kv_backend .range( RangeRequest::new() - .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_range(a_start.clone(), all_end.clone()) .with_limit(1), ) .await @@ -199,7 +239,7 @@ pub async fn test_kv_range_2(kv_backend: impl KvBackend) { let result = kv_backend .range( RangeRequest::new() - .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_range(a_start.clone(), all_end.clone()) .with_limit(2), ) .await @@ -211,16 +251,27 @@ pub async fn test_kv_range_2(kv_backend: impl KvBackend) { let result = kv_backend .range( RangeRequest::new() - .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_range(a_start.clone(), all_end.clone()) .with_limit(3), ) .await .unwrap(); assert_eq!(result.kvs.len(), 2); assert!(!result.more); + + let req = BatchDeleteRequest { + keys: vec![atest, test], + prev_kv: false, + }; + let resp = kv_backend.batch_delete(req).await.unwrap(); + assert!(resp.prev_kvs.is_empty()); +} + +pub async fn test_kv_batch_get(kv_backend: &impl KvBackend) { + test_kv_batch_get_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_batch_get(kv_backend: impl KvBackend) { +pub async fn test_kv_batch_get_with_prefix(kv_backend: &impl KvBackend, prefix: Vec) { let keys = vec![]; let resp = kv_backend .batch_get(BatchGetRequest { keys }) @@ -229,7 +280,8 @@ pub async fn test_kv_batch_get(kv_backend: impl KvBackend) { assert!(resp.kvs.is_empty()); - let keys = vec![b"key10".to_vec()]; + let key10 = [prefix.clone(), b"key10".to_vec()].concat(); + let keys = vec![key10]; let resp = kv_backend .batch_get(BatchGetRequest { keys }) .await @@ -237,29 +289,42 @@ pub async fn test_kv_batch_get(kv_backend: impl KvBackend) { assert!(resp.kvs.is_empty()); - let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()]; + let key1 = [prefix.clone(), b"key1".to_vec()].concat(); + let key3 = [prefix.clone(), b"key3".to_vec()].concat(); + let key4 = [prefix.clone(), b"key4".to_vec()].concat(); + let keys = vec![key1.clone(), key3.clone(), key4]; let resp = kv_backend .batch_get(BatchGetRequest { keys }) .await .unwrap(); assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key1, resp.kvs[0].key); assert_eq!(b"val1", resp.kvs[0].value()); - assert_eq!(b"key3", resp.kvs[1].key()); + assert_eq!(key3, resp.kvs[1].key); assert_eq!(b"val3", resp.kvs[1].value()); } pub async fn test_kv_compare_and_put(kv_backend: Arc>) { + test_kv_compare_and_put_with_prefix(kv_backend, vec![]).await; +} + +pub async fn test_kv_compare_and_put_with_prefix( + kv_backend: Arc>, + prefix: Vec, +) { let success = Arc::new(AtomicU8::new(0)); + let key = [prefix.clone(), b"key".to_vec()].concat(); let mut joins = vec![]; for _ in 0..20 { let kv_backend_clone = kv_backend.clone(); let success_clone = success.clone(); + let key_clone = key.clone(); + let join = tokio::spawn(async move { let req = CompareAndPutRequest { - key: b"key".to_vec(), + key: key_clone, expect: vec![], value: b"val_new".to_vec(), }; @@ -276,11 +341,19 @@ pub async fn test_kv_compare_and_put(kv_backend: Arc) { + let key3 = [prefix.clone(), b"key3".to_vec()].concat(); let req = DeleteRangeRequest { - key: b"key3".to_vec(), + key: key3.clone(), range_end: vec![], prev_kv: true, }; @@ -288,14 +361,15 @@ pub async fn test_kv_delete_range(kv_backend: impl KvBackend) { let resp = kv_backend.delete_range(req).await.unwrap(); assert_eq!(1, resp.prev_kvs.len()); assert_eq!(1, resp.deleted); - assert_eq!(b"key3", resp.prev_kvs[0].key()); + assert_eq!(key3, resp.prev_kvs[0].key); assert_eq!(b"val3", resp.prev_kvs[0].value()); - let resp = kv_backend.get(b"key3").await.unwrap(); + let resp = kv_backend.get(&key3).await.unwrap(); assert!(resp.is_none()); + let key2 = [prefix.clone(), b"key2".to_vec()].concat(); let req = DeleteRangeRequest { - key: b"key2".to_vec(), + key: key2.clone(), range_end: vec![], prev_kv: false, }; @@ -304,11 +378,11 @@ pub async fn test_kv_delete_range(kv_backend: impl KvBackend) { assert_eq!(1, resp.deleted); assert!(resp.prev_kvs.is_empty()); - let resp = kv_backend.get(b"key2").await.unwrap(); + let resp = kv_backend.get(&key2).await.unwrap(); assert!(resp.is_none()); - let key = b"key1".to_vec(); - let range_end = util::get_prefix_end_key(b"key1"); + let key = [prefix.clone(), b"key1".to_vec()].concat(); + let range_end = util::get_prefix_end_key(&key); let req = DeleteRangeRequest { key: key.clone(), @@ -328,34 +402,45 @@ pub async fn test_kv_delete_range(kv_backend: impl KvBackend) { } pub async fn test_kv_batch_delete(kv_backend: impl KvBackend) { - assert!(kv_backend.get(b"key1").await.unwrap().is_some()); - assert!(kv_backend.get(b"key100").await.unwrap().is_none()); + test_kv_batch_delete_with_prefix(kv_backend, vec![]).await; +} + +pub async fn test_kv_batch_delete_with_prefix(kv_backend: impl KvBackend, prefix: Vec) { + let key1 = [prefix.clone(), b"key1".to_vec()].concat(); + let key100 = [prefix.clone(), b"key100".to_vec()].concat(); + assert!(kv_backend.get(&key1).await.unwrap().is_some()); + assert!(kv_backend.get(&key100).await.unwrap().is_none()); let req = BatchDeleteRequest { - keys: vec![b"key1".to_vec(), b"key100".to_vec()], + keys: vec![key1.clone(), key100.clone()], prev_kv: true, }; let resp = kv_backend.batch_delete(req).await.unwrap(); assert_eq!(1, resp.prev_kvs.len()); assert_eq!( vec![KeyValue { - key: b"key1".to_vec(), + key: key1.clone(), value: b"val1".to_vec() }], resp.prev_kvs ); - assert!(kv_backend.get(b"key1").await.unwrap().is_none()); + assert!(kv_backend.get(&key1).await.unwrap().is_none()); - assert!(kv_backend.get(b"key2").await.unwrap().is_some()); - assert!(kv_backend.get(b"key3").await.unwrap().is_some()); + let key2 = [prefix.clone(), b"key2".to_vec()].concat(); + let key3 = [prefix.clone(), b"key3".to_vec()].concat(); + let key11 = [prefix.clone(), b"key11".to_vec()].concat(); + assert!(kv_backend.get(&key2).await.unwrap().is_some()); + assert!(kv_backend.get(&key3).await.unwrap().is_some()); + assert!(kv_backend.get(&key11).await.unwrap().is_some()); let req = BatchDeleteRequest { - keys: vec![b"key2".to_vec(), b"key3".to_vec()], + keys: vec![key2.clone(), key3.clone(), key11.clone()], prev_kv: false, }; let resp = kv_backend.batch_delete(req).await.unwrap(); assert!(resp.prev_kvs.is_empty()); - assert!(kv_backend.get(b"key2").await.unwrap().is_none()); - assert!(kv_backend.get(b"key3").await.unwrap().is_none()); + assert!(kv_backend.get(&key2).await.unwrap().is_none()); + assert!(kv_backend.get(&key3).await.unwrap().is_none()); + assert!(kv_backend.get(&key11).await.unwrap().is_none()); } diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 851c3f94f0b9..f3c26b50df13 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -40,6 +40,7 @@ tokio.workspace = true [dev-dependencies] common-meta = { workspace = true, features = ["testing"] } common-test-util.workspace = true +common-wal = { workspace = true, features = ["testing"] } itertools.workspace = true rand.workspace = true rand_distr = "0.4" diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 86d6ae39d759..b0abc76095f4 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -675,7 +675,7 @@ mod tests { let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); prepare_kv(&backend).await; - test_kv_range(backend).await; + test_kv_range(&backend).await; } #[tokio::test] @@ -692,7 +692,7 @@ mod tests { let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); prepare_kv(&backend).await; - test_kv_put(backend).await; + test_kv_put(&backend).await; } #[tokio::test] @@ -701,7 +701,7 @@ mod tests { let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); prepare_kv(&backend).await; - test_kv_batch_get(backend).await; + test_kv_batch_get(&backend).await; } #[tokio::test]