From 3882beeb8223e226f8e172ed46979125abcde4ac Mon Sep 17 00:00:00 2001 From: taobo Date: Thu, 11 Apr 2024 23:23:50 +0800 Subject: [PATCH 1/6] test: add etcd --- src/common/meta/src/kv_backend/etcd.rs | 78 ++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index 56323d12f8c3..d9551718ad2d 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -626,4 +626,82 @@ mod tests { assert_eq!(b"test_key".to_vec(), delete.key); let _ = delete.options.unwrap(); } + + use crate::kv_backend::test::{ + prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, + test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, + }; + + async fn build_kv_backend() -> EtcdStore { + let endpoints = std::env::var("GT_ETCD_ENDPOINTS").unwrap_or_default(); + let endpoints = endpoints + .split(',') + .map(|s| s.to_string()) + .collect::>(); + println!("etcd endpoints: {:?}", endpoints); + + let client = Client::connect(endpoints, None) + .await + .expect("malformed endpoints"); + EtcdStore { + client, + max_txn_ops: 128, + } + } + + async fn mock_etcd_store_with_data() -> EtcdStore { + let kv_backend = build_kv_backend().await; + prepare_kv(&kv_backend).await; + + kv_backend + } + + #[tokio::test] + async fn test_put() { + let kv_backend = mock_etcd_store_with_data().await; + + test_kv_put(kv_backend).await; + } + + #[tokio::test] + async fn test_range() { + let kv_backend = mock_etcd_store_with_data().await; + + test_kv_range(kv_backend).await; + } + + #[tokio::test] + async fn test_range_2() { + let kv = build_kv_backend().await; + + test_kv_range_2(kv).await; + } + + #[tokio::test] + async fn test_batch_get() { + let kv_backend = mock_etcd_store_with_data().await; + + test_kv_batch_get(kv_backend).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_compare_and_put() { + let kv_backend = Arc::new(build_kv_backend().await); + + test_kv_compare_and_put(kv_backend).await; + } + + #[tokio::test] + async fn test_delete_range() { + let kv_backend = mock_etcd_store_with_data().await; + + test_kv_delete_range(kv_backend).await; + } + + #[tokio::test] + async fn test_batch_delete() { + let kv_backend = mock_etcd_store_with_data().await; + + test_kv_batch_delete(kv_backend).await; + } } From 50b3d7ddf9439608b2fef70b9fc765d2be3fffb0 Mon Sep 17 00:00:00 2001 From: taobo Date: Fri, 12 Apr 2024 01:42:51 +0800 Subject: [PATCH 2/6] optimize code --- src/catalog/Cargo.toml | 2 +- src/common/meta/src/kv_backend/etcd.rs | 34 +++++++++++++----------- src/common/meta/src/kv_backend/memory.rs | 4 +-- src/common/meta/src/kv_backend/test.rs | 23 ++++++++++++---- src/log-store/Cargo.toml | 3 ++- src/log-store/src/raft_engine/backend.rs | 4 +-- 6 files changed, 43 insertions(+), 27 deletions(-) diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index ff57e0cb0add..962e62304d45 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -12,8 +12,8 @@ workspace = true [dependencies] api.workspace = true -arrow.workspace = true arrow-schema.workspace = true +arrow.workspace = true async-stream.workspace = true async-trait = "0.1" common-catalog.workspace = true diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index d9551718ad2d..766eebf1db03 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -629,7 +629,7 @@ mod tests { use crate::kv_backend::test::{ prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, - test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, + test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, unprepare_kv, }; async fn build_kv_backend() -> EtcdStore { @@ -638,7 +638,6 @@ mod tests { .split(',') .map(|s| s.to_string()) .collect::>(); - println!("etcd endpoints: {:?}", endpoints); let client = Client::connect(endpoints, None) .await @@ -649,25 +648,22 @@ mod tests { } } - async fn mock_etcd_store_with_data() -> EtcdStore { - let kv_backend = build_kv_backend().await; - prepare_kv(&kv_backend).await; - - kv_backend - } - #[tokio::test] async fn test_put() { - let kv_backend = mock_etcd_store_with_data().await; + let kv_backend = build_kv_backend().await; - test_kv_put(kv_backend).await; + prepare_kv(&kv_backend).await; + test_kv_put(&kv_backend).await; + unprepare_kv(&kv_backend).await; } #[tokio::test] async fn test_range() { - let kv_backend = mock_etcd_store_with_data().await; + let kv_backend = build_kv_backend().await; - test_kv_range(kv_backend).await; + prepare_kv(&kv_backend).await; + test_kv_range(&kv_backend).await; + unprepare_kv(&kv_backend).await; } #[tokio::test] @@ -679,7 +675,9 @@ mod tests { #[tokio::test] async fn test_batch_get() { - let kv_backend = mock_etcd_store_with_data().await; + let kv_backend = build_kv_backend().await; + + prepare_kv(&kv_backend).await; test_kv_batch_get(kv_backend).await; } @@ -693,14 +691,18 @@ mod tests { #[tokio::test] async fn test_delete_range() { - let kv_backend = mock_etcd_store_with_data().await; + let kv_backend = build_kv_backend().await; + + prepare_kv(&kv_backend).await; test_kv_delete_range(kv_backend).await; } #[tokio::test] async fn test_batch_delete() { - let kv_backend = mock_etcd_store_with_data().await; + let kv_backend = build_kv_backend().await; + + prepare_kv(&kv_backend).await; test_kv_batch_delete(kv_backend).await; } diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index bb8cf2d95d1e..48d0d39863e2 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -357,14 +357,14 @@ mod tests { async fn test_put() { let kv_backend = mock_mem_store_with_data().await; - test_kv_put(kv_backend).await; + test_kv_put(&kv_backend).await; } #[tokio::test] async fn test_range() { let kv_backend = mock_mem_store_with_data().await; - test_kv_range(kv_backend).await; + test_kv_range(&kv_backend).await; } #[tokio::test] diff --git a/src/common/meta/src/kv_backend/test.rs b/src/common/meta/src/kv_backend/test.rs index b079a3dc2d52..6aa8f3cfd117 100644 --- a/src/common/meta/src/kv_backend/test.rs +++ b/src/common/meta/src/kv_backend/test.rs @@ -35,6 +35,10 @@ pub fn mock_kvs() -> Vec { key: b"key3".to_vec(), value: b"val3".to_vec(), }, + KeyValue { + key: b"key11".to_vec(), + value: b"val11".to_vec(), + }, ] } @@ -47,18 +51,20 @@ pub async fn prepare_kv(kv_backend: &impl KvBackend) { }) .await .is_ok()); +} +pub async fn unprepare_kv(kv_backend: &impl KvBackend) { + let keys = mock_kvs().iter().map(|kv| kv.key.clone()).collect(); assert!(kv_backend - .put(PutRequest { - key: b"key11".to_vec(), - value: b"val11".to_vec(), + .batch_delete(BatchDeleteRequest { + keys, ..Default::default() }) .await .is_ok()); } -pub async fn test_kv_put(kv_backend: impl KvBackend) { +pub async fn test_kv_put(kv_backend: &impl KvBackend) { let resp = kv_backend .put(PutRequest { key: b"key11".to_vec(), @@ -82,7 +88,7 @@ pub async fn test_kv_put(kv_backend: impl KvBackend) { assert_eq!(b"val12", prev_kv.value()); } -pub async fn test_kv_range(kv_backend: impl KvBackend) { +pub async fn test_kv_range(kv_backend: &impl KvBackend) { let key = b"key1".to_vec(); let range_end = util::get_prefix_end_key(b"key1"); @@ -218,6 +224,13 @@ pub async fn test_kv_range_2(kv_backend: impl KvBackend) { .unwrap(); assert_eq!(result.kvs.len(), 2); assert!(!result.more); + + let req = BatchDeleteRequest { + keys: vec![b"atest".to_vec(), b"test".to_vec()], + prev_kv: false, + }; + let resp = kv_backend.batch_delete(req).await.unwrap(); + assert!(resp.prev_kvs.is_empty()); } pub async fn test_kv_batch_get(kv_backend: impl KvBackend) { diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 851c3f94f0b9..be66a8281977 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -24,8 +24,8 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true -futures.workspace = true futures-util.workspace = true +futures.workspace = true lazy_static.workspace = true prometheus.workspace = true protobuf = { version = "2", features = ["bytes"] } @@ -40,6 +40,7 @@ tokio.workspace = true [dev-dependencies] common-meta = { workspace = true, features = ["testing"] } common-test-util.workspace = true +common-wal = { workspace = true, features = ["testing"] } itertools.workspace = true rand.workspace = true rand_distr = "0.4" diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 86d6ae39d759..55c430001389 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -675,7 +675,7 @@ mod tests { let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); prepare_kv(&backend).await; - test_kv_range(backend).await; + test_kv_range(&backend).await; } #[tokio::test] @@ -692,7 +692,7 @@ mod tests { let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); prepare_kv(&backend).await; - test_kv_put(backend).await; + test_kv_put(&backend).await; } #[tokio::test] From b7dbe5ddd560c7f2b52418a419e44969028c3e2a Mon Sep 17 00:00:00 2001 From: taobo Date: Fri, 12 Apr 2024 20:02:24 +0800 Subject: [PATCH 3/6] test: add etcd tests --- src/common/meta/src/kv_backend/etcd.rs | 105 ++++++++----- src/common/meta/src/kv_backend/memory.rs | 2 +- src/common/meta/src/kv_backend/test.rs | 182 ++++++++++++++++------- src/log-store/src/raft_engine/backend.rs | 2 +- 4 files changed, 198 insertions(+), 93 deletions(-) diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index 766eebf1db03..5a726c17bff1 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -628,82 +628,115 @@ mod tests { } use crate::kv_backend::test::{ - prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, - test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, unprepare_kv, + prepare_kv_with_perfix, test_kv_batch_delete_with_perfix, test_kv_batch_get_with_perfix, + test_kv_compare_and_put_with_perfix, test_kv_delete_range_with_perfix, + test_kv_put_with_perfix, test_kv_range_2_with_perfix, test_kv_range_with_perfix, + unprepare_kv, }; - async fn build_kv_backend() -> EtcdStore { + async fn build_kv_backend() -> Option { let endpoints = std::env::var("GT_ETCD_ENDPOINTS").unwrap_or_default(); + if endpoints.is_empty() { + return None; + } let endpoints = endpoints .split(',') .map(|s| s.to_string()) .collect::>(); + std::panic::set_hook(Box::new(|_| { + println!("panic: malformed endpoints"); + })); let client = Client::connect(endpoints, None) .await .expect("malformed endpoints"); - EtcdStore { + Some(EtcdStore { client, max_txn_ops: 128, - } + }) } #[tokio::test] async fn test_put() { - let kv_backend = build_kv_backend().await; - - prepare_kv(&kv_backend).await; - test_kv_put(&kv_backend).await; - unprepare_kv(&kv_backend).await; + match build_kv_backend().await { + Some(kv_backend) => { + let perfix = b"put/"; + prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; + test_kv_put_with_perfix(&kv_backend, perfix.to_vec()).await; + unprepare_kv(&kv_backend, perfix).await; + } + None => {} + } } #[tokio::test] async fn test_range() { - let kv_backend = build_kv_backend().await; - - prepare_kv(&kv_backend).await; - test_kv_range(&kv_backend).await; - unprepare_kv(&kv_backend).await; + match build_kv_backend().await { + Some(kv_backend) => { + let perfix = b"range/"; + prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; + test_kv_range_with_perfix(&kv_backend, perfix.to_vec()).await; + unprepare_kv(&kv_backend, perfix).await; + } + None => {} + } } #[tokio::test] async fn test_range_2() { - let kv = build_kv_backend().await; - - test_kv_range_2(kv).await; + match build_kv_backend().await { + Some(kv_backend) => { + test_kv_range_2_with_perfix(kv_backend, b"range2/".to_vec()).await; + } + None => {} + } } #[tokio::test] async fn test_batch_get() { - let kv_backend = build_kv_backend().await; - - prepare_kv(&kv_backend).await; - - test_kv_batch_get(kv_backend).await; + match build_kv_backend().await { + Some(kv_backend) => { + let perfix = b"batchGet/"; + prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; + test_kv_batch_get_with_perfix(&kv_backend, perfix.to_vec()).await; + unprepare_kv(&kv_backend, perfix).await; + } + None => {} + } } #[tokio::test(flavor = "multi_thread")] async fn test_compare_and_put() { - let kv_backend = Arc::new(build_kv_backend().await); - - test_kv_compare_and_put(kv_backend).await; + match build_kv_backend().await { + Some(kv_backend) => { + let kv_backend = Arc::new(kv_backend); + test_kv_compare_and_put_with_perfix(kv_backend, b"compareAndPut/".to_vec()).await; + } + None => {} + } } #[tokio::test] async fn test_delete_range() { - let kv_backend = build_kv_backend().await; - - prepare_kv(&kv_backend).await; - - test_kv_delete_range(kv_backend).await; + match build_kv_backend().await { + Some(kv_backend) => { + let perfix = b"deleteRange/"; + prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; + test_kv_delete_range_with_perfix(kv_backend, perfix.to_vec()).await; + } + None => {} + } } #[tokio::test] async fn test_batch_delete() { - let kv_backend = build_kv_backend().await; - - prepare_kv(&kv_backend).await; - - test_kv_batch_delete(kv_backend).await; + match build_kv_backend().await { + Some(kv_backend) => { + let perfix = b"batchDelete/"; + prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; + test_kv_batch_delete_with_perfix(kv_backend, perfix.to_vec()).await; + } + None => {} + } } } diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 48d0d39863e2..6c95bb646991 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -378,7 +378,7 @@ mod tests { async fn test_batch_get() { let kv_backend = mock_mem_store_with_data().await; - test_kv_batch_get(kv_backend).await; + test_kv_batch_get(&kv_backend).await; } #[tokio::test(flavor = "multi_thread")] diff --git a/src/common/meta/src/kv_backend/test.rs b/src/common/meta/src/kv_backend/test.rs index 6aa8f3cfd117..bbc0b182abe6 100644 --- a/src/common/meta/src/kv_backend/test.rs +++ b/src/common/meta/src/kv_backend/test.rs @@ -21,29 +21,33 @@ use crate::rpc::store::{BatchGetRequest, PutRequest}; use crate::rpc::KeyValue; use crate::util; -pub fn mock_kvs() -> Vec { +pub fn mock_kvs(perfix: Vec) -> Vec { vec![ KeyValue { - key: b"key1".to_vec(), + key: [perfix.clone(), b"key1".to_vec()].concat(), value: b"val1".to_vec(), }, KeyValue { - key: b"key2".to_vec(), + key: [perfix.clone(), b"key2".to_vec()].concat(), value: b"val2".to_vec(), }, KeyValue { - key: b"key3".to_vec(), + key: [perfix.clone(), b"key3".to_vec()].concat(), value: b"val3".to_vec(), }, KeyValue { - key: b"key11".to_vec(), + key: [perfix.clone(), b"key11".to_vec()].concat(), value: b"val11".to_vec(), }, ] } pub async fn prepare_kv(kv_backend: &impl KvBackend) { - let kvs = mock_kvs(); + prepare_kv_with_perfix(kv_backend, vec![]).await; +} + +pub async fn prepare_kv_with_perfix(kv_backend: &impl KvBackend, perfix: Vec) { + let kvs = mock_kvs(perfix); assert!(kv_backend .batch_put(BatchPutRequest { kvs, @@ -53,11 +57,12 @@ pub async fn prepare_kv(kv_backend: &impl KvBackend) { .is_ok()); } -pub async fn unprepare_kv(kv_backend: &impl KvBackend) { - let keys = mock_kvs().iter().map(|kv| kv.key.clone()).collect(); +pub async fn unprepare_kv(kv_backend: &impl KvBackend, perfix: &[u8]) { + let range_end = util::get_prefix_end_key(perfix); assert!(kv_backend - .batch_delete(BatchDeleteRequest { - keys, + .delete_range(DeleteRangeRequest { + key: perfix.to_vec(), + range_end, ..Default::default() }) .await @@ -65,9 +70,14 @@ pub async fn unprepare_kv(kv_backend: &impl KvBackend) { } pub async fn test_kv_put(kv_backend: &impl KvBackend) { + test_kv_put_with_perfix(kv_backend, vec![]).await; +} + +pub async fn test_kv_put_with_perfix(kv_backend: &impl KvBackend, perfix: Vec) { + let put_key = [perfix.clone(), b"key11".to_vec()].concat(); let resp = kv_backend .put(PutRequest { - key: b"key11".to_vec(), + key: put_key.clone(), value: b"val12".to_vec(), prev_kv: false, }) @@ -77,20 +87,25 @@ pub async fn test_kv_put(kv_backend: &impl KvBackend) { let resp = kv_backend .put(PutRequest { - key: b"key11".to_vec(), + key: put_key.clone(), value: b"val13".to_vec(), prev_kv: true, }) .await .unwrap(); let prev_kv = resp.prev_kv.unwrap(); - assert_eq!(b"key11", prev_kv.key()); + assert_eq!(put_key, prev_kv.key()); assert_eq!(b"val12", prev_kv.value()); } pub async fn test_kv_range(kv_backend: &impl KvBackend) { - let key = b"key1".to_vec(); - let range_end = util::get_prefix_end_key(b"key1"); + test_kv_range_with_perfix(kv_backend, vec![]).await; +} + +pub async fn test_kv_range_with_perfix(kv_backend: &impl KvBackend, perfix: Vec) { + let key = [perfix.clone(), b"key1".to_vec()].concat(); + let key11 = [perfix.clone(), b"key11".to_vec()].concat(); + let range_end = util::get_prefix_end_key(&key); let resp = kv_backend .range(RangeRequest { @@ -103,9 +118,9 @@ pub async fn test_kv_range(kv_backend: &impl KvBackend) { .unwrap(); assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key, resp.kvs[0].key); assert_eq!(b"val1", resp.kvs[0].value()); - assert_eq!(b"key11", resp.kvs[1].key()); + assert_eq!(key11, resp.kvs[1].key); assert_eq!(b"val11", resp.kvs[1].value()); let resp = kv_backend @@ -119,9 +134,9 @@ pub async fn test_kv_range(kv_backend: &impl KvBackend) { .unwrap(); assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key, resp.kvs[0].key); assert_eq!(b"", resp.kvs[0].value()); - assert_eq!(b"key11", resp.kvs[1].key()); + assert_eq!(key11, resp.kvs[1].key); assert_eq!(b"", resp.kvs[1].value()); let resp = kv_backend @@ -135,12 +150,12 @@ pub async fn test_kv_range(kv_backend: &impl KvBackend) { .unwrap(); assert_eq!(1, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key, resp.kvs[0].key); assert_eq!(b"val1", resp.kvs[0].value()); let resp = kv_backend .range(RangeRequest { - key, + key: key.clone(), range_end, limit: 1, keys_only: false, @@ -149,24 +164,41 @@ pub async fn test_kv_range(kv_backend: &impl KvBackend) { .unwrap(); assert_eq!(1, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key, resp.kvs[0].key); assert_eq!(b"val1", resp.kvs[0].value()); } pub async fn test_kv_range_2(kv_backend: impl KvBackend) { + test_kv_range_2_with_perfix(kv_backend, vec![]).await; +} + +pub async fn test_kv_range_2_with_perfix(kv_backend: impl KvBackend, perfix: Vec) { + let atest = [perfix.clone(), b"atest".to_vec()].concat(); + let test = [perfix.clone(), b"test".to_vec()].concat(); + kv_backend - .put(PutRequest::new().with_key("atest").with_value("value")) + .put( + PutRequest::new() + .with_key(atest.clone()) + .with_value("value"), + ) .await .unwrap(); kv_backend - .put(PutRequest::new().with_key("test").with_value("value")) + .put(PutRequest::new().with_key(test.clone()).with_value("value")) .await .unwrap(); // If both key and range_end are ‘\0’, then range represents all keys. + let all_start = [perfix.clone(), b"\0".to_vec()].concat(); + let all_end = if perfix.is_empty() { + b"\0".to_vec() + } else { + util::get_prefix_end_key(&perfix) + }; let result = kv_backend - .range(RangeRequest::new().with_range(b"\0".to_vec(), b"\0".to_vec())) + .range(RangeRequest::new().with_range(all_start, all_end.clone())) .await .unwrap(); @@ -174,26 +206,28 @@ pub async fn test_kv_range_2(kv_backend: impl KvBackend) { assert!(!result.more); // If range_end is ‘\0’, the range is all keys greater than or equal to the key argument. + let a_start = [perfix.clone(), b"a".to_vec()].concat(); let result = kv_backend - .range(RangeRequest::new().with_range(b"a".to_vec(), b"\0".to_vec())) + .range(RangeRequest::new().with_range(a_start.clone(), all_end.clone())) .await .unwrap(); assert_eq!(result.kvs.len(), 2); + let b_start = [perfix.clone(), b"b".to_vec()].concat(); let result = kv_backend - .range(RangeRequest::new().with_range(b"b".to_vec(), b"\0".to_vec())) + .range(RangeRequest::new().with_range(b_start, all_end.clone())) .await .unwrap(); assert_eq!(result.kvs.len(), 1); - assert_eq!(result.kvs[0].key, b"test"); + assert_eq!(result.kvs[0].key, test); // Fetches the keys >= "a", set limit to 1, the `more` should be true. let result = kv_backend .range( RangeRequest::new() - .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_range(a_start.clone(), all_end.clone()) .with_limit(1), ) .await @@ -205,7 +239,7 @@ pub async fn test_kv_range_2(kv_backend: impl KvBackend) { let result = kv_backend .range( RangeRequest::new() - .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_range(a_start.clone(), all_end.clone()) .with_limit(2), ) .await @@ -217,7 +251,7 @@ pub async fn test_kv_range_2(kv_backend: impl KvBackend) { let result = kv_backend .range( RangeRequest::new() - .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_range(a_start.clone(), all_end.clone()) .with_limit(3), ) .await @@ -226,14 +260,18 @@ pub async fn test_kv_range_2(kv_backend: impl KvBackend) { assert!(!result.more); let req = BatchDeleteRequest { - keys: vec![b"atest".to_vec(), b"test".to_vec()], + keys: vec![atest, test], prev_kv: false, }; let resp = kv_backend.batch_delete(req).await.unwrap(); assert!(resp.prev_kvs.is_empty()); } -pub async fn test_kv_batch_get(kv_backend: impl KvBackend) { +pub async fn test_kv_batch_get(kv_backend: &impl KvBackend) { + test_kv_batch_get_with_perfix(kv_backend, vec![]).await; +} + +pub async fn test_kv_batch_get_with_perfix(kv_backend: &impl KvBackend, perfix: Vec) { let keys = vec![]; let resp = kv_backend .batch_get(BatchGetRequest { keys }) @@ -242,7 +280,8 @@ pub async fn test_kv_batch_get(kv_backend: impl KvBackend) { assert!(resp.kvs.is_empty()); - let keys = vec![b"key10".to_vec()]; + let key10 = [perfix.clone(), b"key10".to_vec()].concat(); + let keys = vec![key10]; let resp = kv_backend .batch_get(BatchGetRequest { keys }) .await @@ -250,29 +289,42 @@ pub async fn test_kv_batch_get(kv_backend: impl KvBackend) { assert!(resp.kvs.is_empty()); - let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()]; + let key1 = [perfix.clone(), b"key1".to_vec()].concat(); + let key3 = [perfix.clone(), b"key3".to_vec()].concat(); + let key4 = [perfix.clone(), b"key4".to_vec()].concat(); + let keys = vec![key1.clone(), key3.clone(), key4]; let resp = kv_backend .batch_get(BatchGetRequest { keys }) .await .unwrap(); assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(key1, resp.kvs[0].key); assert_eq!(b"val1", resp.kvs[0].value()); - assert_eq!(b"key3", resp.kvs[1].key()); + assert_eq!(key3, resp.kvs[1].key); assert_eq!(b"val3", resp.kvs[1].value()); } pub async fn test_kv_compare_and_put(kv_backend: Arc>) { + test_kv_compare_and_put_with_perfix(kv_backend, vec![]).await; +} + +pub async fn test_kv_compare_and_put_with_perfix( + kv_backend: Arc>, + perfix: Vec, +) { let success = Arc::new(AtomicU8::new(0)); + let key = [perfix.clone(), b"key".to_vec()].concat(); let mut joins = vec![]; for _ in 0..20 { let kv_backend_clone = kv_backend.clone(); let success_clone = success.clone(); + let key_clone = key.clone(); + let join = tokio::spawn(async move { let req = CompareAndPutRequest { - key: b"key".to_vec(), + key: key_clone, expect: vec![], value: b"val_new".to_vec(), }; @@ -289,11 +341,19 @@ pub async fn test_kv_compare_and_put(kv_backend: Arc) { + let key3 = [perfix.clone(), b"key3".to_vec()].concat(); let req = DeleteRangeRequest { - key: b"key3".to_vec(), + key: key3.clone(), range_end: vec![], prev_kv: true, }; @@ -301,14 +361,15 @@ pub async fn test_kv_delete_range(kv_backend: impl KvBackend) { let resp = kv_backend.delete_range(req).await.unwrap(); assert_eq!(1, resp.prev_kvs.len()); assert_eq!(1, resp.deleted); - assert_eq!(b"key3", resp.prev_kvs[0].key()); + assert_eq!(key3, resp.prev_kvs[0].key); assert_eq!(b"val3", resp.prev_kvs[0].value()); - let resp = kv_backend.get(b"key3").await.unwrap(); + let resp = kv_backend.get(&key3).await.unwrap(); assert!(resp.is_none()); + let key2 = [perfix.clone(), b"key2".to_vec()].concat(); let req = DeleteRangeRequest { - key: b"key2".to_vec(), + key: key2.clone(), range_end: vec![], prev_kv: false, }; @@ -317,11 +378,11 @@ pub async fn test_kv_delete_range(kv_backend: impl KvBackend) { assert_eq!(1, resp.deleted); assert!(resp.prev_kvs.is_empty()); - let resp = kv_backend.get(b"key2").await.unwrap(); + let resp = kv_backend.get(&key2).await.unwrap(); assert!(resp.is_none()); - let key = b"key1".to_vec(); - let range_end = util::get_prefix_end_key(b"key1"); + let key = [perfix.clone(), b"key1".to_vec()].concat(); + let range_end = util::get_prefix_end_key(&key); let req = DeleteRangeRequest { key: key.clone(), @@ -341,34 +402,45 @@ pub async fn test_kv_delete_range(kv_backend: impl KvBackend) { } pub async fn test_kv_batch_delete(kv_backend: impl KvBackend) { - assert!(kv_backend.get(b"key1").await.unwrap().is_some()); - assert!(kv_backend.get(b"key100").await.unwrap().is_none()); + test_kv_batch_delete_with_perfix(kv_backend, vec![]).await; +} + +pub async fn test_kv_batch_delete_with_perfix(kv_backend: impl KvBackend, perfix: Vec) { + let key1 = [perfix.clone(), b"key1".to_vec()].concat(); + let key100 = [perfix.clone(), b"key100".to_vec()].concat(); + assert!(kv_backend.get(&key1).await.unwrap().is_some()); + assert!(kv_backend.get(&key100).await.unwrap().is_none()); let req = BatchDeleteRequest { - keys: vec![b"key1".to_vec(), b"key100".to_vec()], + keys: vec![key1.clone(), key100.clone()], prev_kv: true, }; let resp = kv_backend.batch_delete(req).await.unwrap(); assert_eq!(1, resp.prev_kvs.len()); assert_eq!( vec![KeyValue { - key: b"key1".to_vec(), + key: key1.clone(), value: b"val1".to_vec() }], resp.prev_kvs ); - assert!(kv_backend.get(b"key1").await.unwrap().is_none()); + assert!(kv_backend.get(&key1).await.unwrap().is_none()); - assert!(kv_backend.get(b"key2").await.unwrap().is_some()); - assert!(kv_backend.get(b"key3").await.unwrap().is_some()); + let key2 = [perfix.clone(), b"key2".to_vec()].concat(); + let key3 = [perfix.clone(), b"key3".to_vec()].concat(); + let key11 = [perfix.clone(), b"key11".to_vec()].concat(); + assert!(kv_backend.get(&key2).await.unwrap().is_some()); + assert!(kv_backend.get(&key3).await.unwrap().is_some()); + assert!(kv_backend.get(&key11).await.unwrap().is_some()); let req = BatchDeleteRequest { - keys: vec![b"key2".to_vec(), b"key3".to_vec()], + keys: vec![key2.clone(), key3.clone(), key11.clone()], prev_kv: false, }; let resp = kv_backend.batch_delete(req).await.unwrap(); assert!(resp.prev_kvs.is_empty()); - assert!(kv_backend.get(b"key2").await.unwrap().is_none()); - assert!(kv_backend.get(b"key3").await.unwrap().is_none()); + assert!(kv_backend.get(&key2).await.unwrap().is_none()); + assert!(kv_backend.get(&key3).await.unwrap().is_none()); + assert!(kv_backend.get(&key11).await.unwrap().is_none()); } diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 55c430001389..b0abc76095f4 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -701,7 +701,7 @@ mod tests { let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); prepare_kv(&backend).await; - test_kv_batch_get(backend).await; + test_kv_batch_get(&backend).await; } #[tokio::test] From 543f4bbcc52a1396e42c792c60e0cdce7540d940 Mon Sep 17 00:00:00 2001 From: taobo Date: Fri, 12 Apr 2024 20:21:29 +0800 Subject: [PATCH 4/6] fix: typos --- src/common/meta/src/kv_backend/etcd.rs | 46 ++++++------ src/common/meta/src/kv_backend/test.rs | 98 +++++++++++++------------- 2 files changed, 72 insertions(+), 72 deletions(-) diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index 5a726c17bff1..23f9d7ea9824 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -628,9 +628,9 @@ mod tests { } use crate::kv_backend::test::{ - prepare_kv_with_perfix, test_kv_batch_delete_with_perfix, test_kv_batch_get_with_perfix, - test_kv_compare_and_put_with_perfix, test_kv_delete_range_with_perfix, - test_kv_put_with_perfix, test_kv_range_2_with_perfix, test_kv_range_with_perfix, + prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix, + test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix, + test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix, unprepare_kv, }; @@ -660,10 +660,10 @@ mod tests { async fn test_put() { match build_kv_backend().await { Some(kv_backend) => { - let perfix = b"put/"; - prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; - test_kv_put_with_perfix(&kv_backend, perfix.to_vec()).await; - unprepare_kv(&kv_backend, perfix).await; + let prefix = b"put/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } None => {} } @@ -673,10 +673,10 @@ mod tests { async fn test_range() { match build_kv_backend().await { Some(kv_backend) => { - let perfix = b"range/"; - prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; - test_kv_range_with_perfix(&kv_backend, perfix.to_vec()).await; - unprepare_kv(&kv_backend, perfix).await; + let prefix = b"range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } None => {} } @@ -686,7 +686,7 @@ mod tests { async fn test_range_2() { match build_kv_backend().await { Some(kv_backend) => { - test_kv_range_2_with_perfix(kv_backend, b"range2/".to_vec()).await; + test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; } None => {} } @@ -696,10 +696,10 @@ mod tests { async fn test_batch_get() { match build_kv_backend().await { Some(kv_backend) => { - let perfix = b"batchGet/"; - prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; - test_kv_batch_get_with_perfix(&kv_backend, perfix.to_vec()).await; - unprepare_kv(&kv_backend, perfix).await; + let prefix = b"batchGet/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } None => {} } @@ -710,7 +710,7 @@ mod tests { match build_kv_backend().await { Some(kv_backend) => { let kv_backend = Arc::new(kv_backend); - test_kv_compare_and_put_with_perfix(kv_backend, b"compareAndPut/".to_vec()).await; + test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await; } None => {} } @@ -720,9 +720,9 @@ mod tests { async fn test_delete_range() { match build_kv_backend().await { Some(kv_backend) => { - let perfix = b"deleteRange/"; - prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; - test_kv_delete_range_with_perfix(kv_backend, perfix.to_vec()).await; + let prefix = b"deleteRange/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; } None => {} } @@ -732,9 +732,9 @@ mod tests { async fn test_batch_delete() { match build_kv_backend().await { Some(kv_backend) => { - let perfix = b"batchDelete/"; - prepare_kv_with_perfix(&kv_backend, perfix.to_vec()).await; - test_kv_batch_delete_with_perfix(kv_backend, perfix.to_vec()).await; + let prefix = b"batchDelete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; } None => {} } diff --git a/src/common/meta/src/kv_backend/test.rs b/src/common/meta/src/kv_backend/test.rs index bbc0b182abe6..2f0216dfdfcb 100644 --- a/src/common/meta/src/kv_backend/test.rs +++ b/src/common/meta/src/kv_backend/test.rs @@ -21,33 +21,33 @@ use crate::rpc::store::{BatchGetRequest, PutRequest}; use crate::rpc::KeyValue; use crate::util; -pub fn mock_kvs(perfix: Vec) -> Vec { +pub fn mock_kvs(prefix: Vec) -> Vec { vec![ KeyValue { - key: [perfix.clone(), b"key1".to_vec()].concat(), + key: [prefix.clone(), b"key1".to_vec()].concat(), value: b"val1".to_vec(), }, KeyValue { - key: [perfix.clone(), b"key2".to_vec()].concat(), + key: [prefix.clone(), b"key2".to_vec()].concat(), value: b"val2".to_vec(), }, KeyValue { - key: [perfix.clone(), b"key3".to_vec()].concat(), + key: [prefix.clone(), b"key3".to_vec()].concat(), value: b"val3".to_vec(), }, KeyValue { - key: [perfix.clone(), b"key11".to_vec()].concat(), + key: [prefix.clone(), b"key11".to_vec()].concat(), value: b"val11".to_vec(), }, ] } pub async fn prepare_kv(kv_backend: &impl KvBackend) { - prepare_kv_with_perfix(kv_backend, vec![]).await; + prepare_kv_with_prefix(kv_backend, vec![]).await; } -pub async fn prepare_kv_with_perfix(kv_backend: &impl KvBackend, perfix: Vec) { - let kvs = mock_kvs(perfix); +pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec) { + let kvs = mock_kvs(prefix); assert!(kv_backend .batch_put(BatchPutRequest { kvs, @@ -57,11 +57,11 @@ pub async fn prepare_kv_with_perfix(kv_backend: &impl KvBackend, perfix: Vec .is_ok()); } -pub async fn unprepare_kv(kv_backend: &impl KvBackend, perfix: &[u8]) { - let range_end = util::get_prefix_end_key(perfix); +pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) { + let range_end = util::get_prefix_end_key(prefix); assert!(kv_backend .delete_range(DeleteRangeRequest { - key: perfix.to_vec(), + key: prefix.to_vec(), range_end, ..Default::default() }) @@ -70,11 +70,11 @@ pub async fn unprepare_kv(kv_backend: &impl KvBackend, perfix: &[u8]) { } pub async fn test_kv_put(kv_backend: &impl KvBackend) { - test_kv_put_with_perfix(kv_backend, vec![]).await; + test_kv_put_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_put_with_perfix(kv_backend: &impl KvBackend, perfix: Vec) { - let put_key = [perfix.clone(), b"key11".to_vec()].concat(); +pub async fn test_kv_put_with_prefix(kv_backend: &impl KvBackend, prefix: Vec) { + let put_key = [prefix.clone(), b"key11".to_vec()].concat(); let resp = kv_backend .put(PutRequest { key: put_key.clone(), @@ -99,12 +99,12 @@ pub async fn test_kv_put_with_perfix(kv_backend: &impl KvBackend, perfix: Vec) { - let key = [perfix.clone(), b"key1".to_vec()].concat(); - let key11 = [perfix.clone(), b"key11".to_vec()].concat(); +pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec) { + let key = [prefix.clone(), b"key1".to_vec()].concat(); + let key11 = [prefix.clone(), b"key11".to_vec()].concat(); let range_end = util::get_prefix_end_key(&key); let resp = kv_backend @@ -169,12 +169,12 @@ pub async fn test_kv_range_with_perfix(kv_backend: &impl KvBackend, perfix: Vec< } pub async fn test_kv_range_2(kv_backend: impl KvBackend) { - test_kv_range_2_with_perfix(kv_backend, vec![]).await; + test_kv_range_2_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_range_2_with_perfix(kv_backend: impl KvBackend, perfix: Vec) { - let atest = [perfix.clone(), b"atest".to_vec()].concat(); - let test = [perfix.clone(), b"test".to_vec()].concat(); +pub async fn test_kv_range_2_with_prefix(kv_backend: impl KvBackend, prefix: Vec) { + let atest = [prefix.clone(), b"atest".to_vec()].concat(); + let test = [prefix.clone(), b"test".to_vec()].concat(); kv_backend .put( @@ -191,11 +191,11 @@ pub async fn test_kv_range_2_with_perfix(kv_backend: impl KvBackend, perfix: Vec .unwrap(); // If both key and range_end are ‘\0’, then range represents all keys. - let all_start = [perfix.clone(), b"\0".to_vec()].concat(); - let all_end = if perfix.is_empty() { + let all_start = [prefix.clone(), b"\0".to_vec()].concat(); + let all_end = if prefix.is_empty() { b"\0".to_vec() } else { - util::get_prefix_end_key(&perfix) + util::get_prefix_end_key(&prefix) }; let result = kv_backend .range(RangeRequest::new().with_range(all_start, all_end.clone())) @@ -206,7 +206,7 @@ pub async fn test_kv_range_2_with_perfix(kv_backend: impl KvBackend, perfix: Vec assert!(!result.more); // If range_end is ‘\0’, the range is all keys greater than or equal to the key argument. - let a_start = [perfix.clone(), b"a".to_vec()].concat(); + let a_start = [prefix.clone(), b"a".to_vec()].concat(); let result = kv_backend .range(RangeRequest::new().with_range(a_start.clone(), all_end.clone())) .await @@ -214,7 +214,7 @@ pub async fn test_kv_range_2_with_perfix(kv_backend: impl KvBackend, perfix: Vec assert_eq!(result.kvs.len(), 2); - let b_start = [perfix.clone(), b"b".to_vec()].concat(); + let b_start = [prefix.clone(), b"b".to_vec()].concat(); let result = kv_backend .range(RangeRequest::new().with_range(b_start, all_end.clone())) .await @@ -268,10 +268,10 @@ pub async fn test_kv_range_2_with_perfix(kv_backend: impl KvBackend, perfix: Vec } pub async fn test_kv_batch_get(kv_backend: &impl KvBackend) { - test_kv_batch_get_with_perfix(kv_backend, vec![]).await; + test_kv_batch_get_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_batch_get_with_perfix(kv_backend: &impl KvBackend, perfix: Vec) { +pub async fn test_kv_batch_get_with_prefix(kv_backend: &impl KvBackend, prefix: Vec) { let keys = vec![]; let resp = kv_backend .batch_get(BatchGetRequest { keys }) @@ -280,7 +280,7 @@ pub async fn test_kv_batch_get_with_perfix(kv_backend: &impl KvBackend, perfix: assert!(resp.kvs.is_empty()); - let key10 = [perfix.clone(), b"key10".to_vec()].concat(); + let key10 = [prefix.clone(), b"key10".to_vec()].concat(); let keys = vec![key10]; let resp = kv_backend .batch_get(BatchGetRequest { keys }) @@ -289,9 +289,9 @@ pub async fn test_kv_batch_get_with_perfix(kv_backend: &impl KvBackend, perfix: assert!(resp.kvs.is_empty()); - let key1 = [perfix.clone(), b"key1".to_vec()].concat(); - let key3 = [perfix.clone(), b"key3".to_vec()].concat(); - let key4 = [perfix.clone(), b"key4".to_vec()].concat(); + let key1 = [prefix.clone(), b"key1".to_vec()].concat(); + let key3 = [prefix.clone(), b"key3".to_vec()].concat(); + let key4 = [prefix.clone(), b"key4".to_vec()].concat(); let keys = vec![key1.clone(), key3.clone(), key4]; let resp = kv_backend .batch_get(BatchGetRequest { keys }) @@ -306,15 +306,15 @@ pub async fn test_kv_batch_get_with_perfix(kv_backend: &impl KvBackend, perfix: } pub async fn test_kv_compare_and_put(kv_backend: Arc>) { - test_kv_compare_and_put_with_perfix(kv_backend, vec![]).await; + test_kv_compare_and_put_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_compare_and_put_with_perfix( +pub async fn test_kv_compare_and_put_with_prefix( kv_backend: Arc>, - perfix: Vec, + prefix: Vec, ) { let success = Arc::new(AtomicU8::new(0)); - let key = [perfix.clone(), b"key".to_vec()].concat(); + let key = [prefix.clone(), b"key".to_vec()].concat(); let mut joins = vec![]; for _ in 0..20 { @@ -347,11 +347,11 @@ pub async fn test_kv_compare_and_put_with_perfix( } pub async fn test_kv_delete_range(kv_backend: impl KvBackend) { - test_kv_delete_range_with_perfix(kv_backend, vec![]).await; + test_kv_delete_range_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_delete_range_with_perfix(kv_backend: impl KvBackend, perfix: Vec) { - let key3 = [perfix.clone(), b"key3".to_vec()].concat(); +pub async fn test_kv_delete_range_with_prefix(kv_backend: impl KvBackend, prefix: Vec) { + let key3 = [prefix.clone(), b"key3".to_vec()].concat(); let req = DeleteRangeRequest { key: key3.clone(), range_end: vec![], @@ -367,7 +367,7 @@ pub async fn test_kv_delete_range_with_perfix(kv_backend: impl KvBackend, perfix let resp = kv_backend.get(&key3).await.unwrap(); assert!(resp.is_none()); - let key2 = [perfix.clone(), b"key2".to_vec()].concat(); + let key2 = [prefix.clone(), b"key2".to_vec()].concat(); let req = DeleteRangeRequest { key: key2.clone(), range_end: vec![], @@ -381,7 +381,7 @@ pub async fn test_kv_delete_range_with_perfix(kv_backend: impl KvBackend, perfix let resp = kv_backend.get(&key2).await.unwrap(); assert!(resp.is_none()); - let key = [perfix.clone(), b"key1".to_vec()].concat(); + let key = [prefix.clone(), b"key1".to_vec()].concat(); let range_end = util::get_prefix_end_key(&key); let req = DeleteRangeRequest { @@ -402,12 +402,12 @@ pub async fn test_kv_delete_range_with_perfix(kv_backend: impl KvBackend, perfix } pub async fn test_kv_batch_delete(kv_backend: impl KvBackend) { - test_kv_batch_delete_with_perfix(kv_backend, vec![]).await; + test_kv_batch_delete_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_batch_delete_with_perfix(kv_backend: impl KvBackend, perfix: Vec) { - let key1 = [perfix.clone(), b"key1".to_vec()].concat(); - let key100 = [perfix.clone(), b"key100".to_vec()].concat(); +pub async fn test_kv_batch_delete_with_prefix(kv_backend: impl KvBackend, prefix: Vec) { + let key1 = [prefix.clone(), b"key1".to_vec()].concat(); + let key100 = [prefix.clone(), b"key100".to_vec()].concat(); assert!(kv_backend.get(&key1).await.unwrap().is_some()); assert!(kv_backend.get(&key100).await.unwrap().is_none()); @@ -426,9 +426,9 @@ pub async fn test_kv_batch_delete_with_perfix(kv_backend: impl KvBackend, perfix ); assert!(kv_backend.get(&key1).await.unwrap().is_none()); - let key2 = [perfix.clone(), b"key2".to_vec()].concat(); - let key3 = [perfix.clone(), b"key3".to_vec()].concat(); - let key11 = [perfix.clone(), b"key11".to_vec()].concat(); + let key2 = [prefix.clone(), b"key2".to_vec()].concat(); + let key3 = [prefix.clone(), b"key3".to_vec()].concat(); + let key11 = [prefix.clone(), b"key11".to_vec()].concat(); assert!(kv_backend.get(&key2).await.unwrap().is_some()); assert!(kv_backend.get(&key3).await.unwrap().is_some()); assert!(kv_backend.get(&key11).await.unwrap().is_some()); From 71f6ebb63eea7d54259ec21813e71dda60c3c67b Mon Sep 17 00:00:00 2001 From: taobo Date: Fri, 12 Apr 2024 21:23:54 +0800 Subject: [PATCH 5/6] fix: taplo error and clippy --- src/catalog/Cargo.toml | 2 +- src/common/meta/src/kv_backend/etcd.rs | 78 ++++++++++---------------- src/log-store/Cargo.toml | 2 +- 3 files changed, 31 insertions(+), 51 deletions(-) diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 962e62304d45..ff57e0cb0add 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -12,8 +12,8 @@ workspace = true [dependencies] api.workspace = true -arrow-schema.workspace = true arrow.workspace = true +arrow-schema.workspace = true async-stream.workspace = true async-trait = "0.1" common-catalog.workspace = true diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index 23f9d7ea9824..963c9d7d6b9a 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -516,6 +516,7 @@ impl TryFrom for Delete { } #[cfg(test)] +#[allow(clippy::print_stdout)] mod tests { use super::*; @@ -658,85 +659,64 @@ mod tests { #[tokio::test] async fn test_put() { - match build_kv_backend().await { - Some(kv_backend) => { - let prefix = b"put/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; - } - None => {} + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"put/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } } #[tokio::test] async fn test_range() { - match build_kv_backend().await { - Some(kv_backend) => { - let prefix = b"range/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; - } - None => {} + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } } #[tokio::test] async fn test_range_2() { - match build_kv_backend().await { - Some(kv_backend) => { - test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; - } - None => {} + if let Some(kv_backend) = build_kv_backend().await { + test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; } } #[tokio::test] async fn test_batch_get() { - match build_kv_backend().await { - Some(kv_backend) => { - let prefix = b"batchGet/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; - } - None => {} + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"batchGet/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } } #[tokio::test(flavor = "multi_thread")] async fn test_compare_and_put() { - match build_kv_backend().await { - Some(kv_backend) => { - let kv_backend = Arc::new(kv_backend); - test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await; - } - None => {} + if let Some(kv_backend) = build_kv_backend().await { + let kv_backend = Arc::new(kv_backend); + test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await; } } #[tokio::test] async fn test_delete_range() { - match build_kv_backend().await { - Some(kv_backend) => { - let prefix = b"deleteRange/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; - } - None => {} + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"deleteRange/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; } } #[tokio::test] async fn test_batch_delete() { - match build_kv_backend().await { - Some(kv_backend) => { - let prefix = b"batchDelete/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; - } - None => {} + if let Some(kv_backend) = build_kv_backend().await { + let prefix = b"batchDelete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; } } } diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index be66a8281977..f3c26b50df13 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -24,8 +24,8 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true -futures-util.workspace = true futures.workspace = true +futures-util.workspace = true lazy_static.workspace = true prometheus.workspace = true protobuf = { version = "2", features = ["bytes"] } From 9644cb3aca7b297a04ea2f7783b7ee30008375a9 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 15 Apr 2024 15:36:16 +0800 Subject: [PATCH 6/6] avoid print Signed-off-by: tison --- src/common/meta/src/kv_backend/etcd.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index 963c9d7d6b9a..9bd98b826820 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -516,7 +516,6 @@ impl TryFrom for Delete { } #[cfg(test)] -#[allow(clippy::print_stdout)] mod tests { use super::*; @@ -640,17 +639,16 @@ mod tests { if endpoints.is_empty() { return None; } + let endpoints = endpoints .split(',') .map(|s| s.to_string()) .collect::>(); - std::panic::set_hook(Box::new(|_| { - println!("panic: malformed endpoints"); - })); let client = Client::connect(endpoints, None) .await .expect("malformed endpoints"); + Some(EtcdStore { client, max_txn_ops: 128,