diff --git a/xline/src/client/mod.rs b/xline/src/client/mod.rs
index 3aa56e813..4ab850f17 100644
--- a/xline/src/client/mod.rs
+++ b/xline/src/client/mod.rs
@@ -1,8 +1,9 @@
-use std::collections::HashMap;
+use std::{collections::HashMap, fmt::Debug};

 use curp::{client::Client as CurpClient, cmd::ProposeId};
 use etcd_client::{
-    AuthClient, Client as EtcdClient, LeaseKeepAliveStream, LeaseKeeper, WatchClient,
+    AuthClient, Client as EtcdClient, KvClient, LeaseClient, LeaseKeepAliveStream, LeaseKeeper,
+    LockClient, WatchClient,
 };
 use utils::config::ClientTimeout;
 use uuid::Uuid;
@@ -30,7 +31,6 @@ pub mod errors;
 pub mod kv_types;

 /// Xline client
-#[allow(missing_debug_implementations)] // EtcdClient doesn't implement Debug
 pub struct Client {
     /// Name of the client
     name: String,
@@ -42,6 +42,17 @@ pub struct Client {
     use_curp_client: bool,
 }

+impl Debug for Client {
+    #[inline]
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Client")
+            .field("name", &self.name)
+            .field("use_curp_client", &self.use_curp_client)
+            .field("curp_client", &self.curp_client)
+            .finish()
+    }
+}
+
 impl Client {
     /// New `Client`
     ///
@@ -238,15 +249,33 @@ impl Client {
         Ok(response.into())
     }

+    /// Gets a kv client.
+    #[inline]
+    pub fn kv_client(&self) -> KvClient {
+        self.etcd_client.kv_client()
+    }
+
     /// Gets an auth client.
     #[inline]
-    pub fn auth_client(&mut self) -> AuthClient {
+    pub fn auth_client(&self) -> AuthClient {
         self.etcd_client.auth_client()
     }

-    /// Gets an watch client.
+    /// Gets a watch client.
     #[inline]
-    pub fn watch_client(&mut self) -> WatchClient {
+    pub fn watch_client(&self) -> WatchClient {
         self.etcd_client.watch_client()
     }
+
+    /// Gets a lock client.
+    #[inline]
+    pub fn lock_client(&self) -> LockClient {
+        self.etcd_client.lock_client()
+    }
+
+    /// Gets a lease client.
+    #[inline]
+    pub fn lease_client(&self) -> LeaseClient {
+        self.etcd_client.lease_client()
+    }
 }
diff --git a/xline/src/rpc/mod.rs b/xline/src/rpc/mod.rs
index fe87ff4fb..2f564a2d3 100644
--- a/xline/src/rpc/mod.rs
+++ b/xline/src/rpc/mod.rs
@@ -83,6 +83,7 @@ pub(crate) use self::{
     lease_server::{Lease as LeaseTrait, LeaseServer},
     request_op::Request,
     response_op::Response,
+    watch_client::WatchClient,
     watch_request::RequestUnion,
     watch_server::{Watch, WatchServer},
     AuthDisableRequest, AuthDisableResponse, AuthEnableRequest, AuthEnableResponse,
diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs
index 5f8b2dc93..fdb60ced7 100644
--- a/xline/src/server/kv_server.rs
+++ b/xline/src/server/kv_server.rs
@@ -433,7 +433,7 @@ impl Kv for KvServer {
     ) -> Result<tonic::Response<TxnResponse>, tonic::Status> {
         debug!("Receive TxnRequest {:?}", request);
         Self::check_txn_request(request.get_ref())?;
-        let is_fast_path = true;
+        let is_fast_path = false; // lock needs the revision of the txn
         let (cmd_res, sync_res) = self.propose(request, is_fast_path).await?;
         let mut res = Self::parse_response_op(cmd_res.decode().into());
diff --git a/xline/src/server/lease_server.rs b/xline/src/server/lease_server.rs
index 5a33c1fb7..e440134c2 100644
--- a/xline/src/server/lease_server.rs
+++ b/xline/src/server/lease_server.rs
@@ -30,7 +30,6 @@ const DEFAULT_LEASE_REQUEST_TIME: Duration = Duration::from_millis(500);

 /// Lease Server
 #[derive(Debug)]
-#[allow(dead_code)] // Remove this after feature is completed
 pub(crate) struct LeaseServer {
     /// Lease storage
     storage: Arc<LeaseStore>,
@@ -40,7 +39,7 @@ pub(crate) struct LeaseServer {
     client: Arc<Client<Command>>,
     /// Server name
     name: String,
-    /// Current node is leader or not
+    /// State of current node
     state: Arc<State>,
     /// Id generator
     id_gen: Arc<IdGenerator>,
@@ -72,13 +71,14 @@ impl LeaseServer {
     async fn revoke_expired_leases_task(lease_server: Arc<LeaseServer>) {
         loop {
             // only leader will check expired lease
-            if lease_server.state.is_leader() {
+            if lease_server.is_leader() {
                 for id in lease_server.storage.find_expired_leases() {
                     let _handle = tokio::spawn({
                         let s = Arc::clone(&lease_server);
+                        let token_option = lease_server.auth_storage.root_token();
                         async move {
                             let mut request = tonic::Request::new(LeaseRevokeRequest { id });
-                            if let Ok(token) = s.auth_storage.root_token() {
+                            if let Ok(token) = token_option {
                                 let _ignore = request.metadata_mut().insert(
                                     "token",
                                     token.parse().unwrap_or_else(|e| {
diff --git a/xline/src/server/lock_server.rs b/xline/src/server/lock_server.rs
index 640cf01bd..9a1f3e72d 100644
--- a/xline/src/server/lock_server.rs
+++ b/xline/src/server/lock_server.rs
@@ -1,23 +1,39 @@
-use std::sync::Arc;
+#![allow(unused)]
+use std::{
+    sync::Arc,
+    time::{SystemTime, UNIX_EPOCH},
+};

+use clippy_utilities::{Cast, OverflowArithmetic};
 use curp::{client::Client, cmd::ProposeId, error::ProposeError};
-use tokio::time::Duration;
+use etcd_client::{EventType, WatchOptions};
+use parking_lot::Mutex;
+use tokio::{sync::mpsc, time::Duration};
+use tokio_stream::wrappers::ReceiverStream;
 use tracing::debug;
 use uuid::Uuid;

 use super::{
+    auth_server::get_token,
     command::{Command, CommandResponse, KeyRange, SyncResponse},
     kv_server::KvServer,
 };
 use crate::{
+    client::errors::ClientError,
     rpc::{
-        Compare, CompareResult, CompareTarget, DeleteRangeRequest, Lock, LockRequest, LockResponse,
-        PutRequest, Request, RequestOp, RequestWithToken, Response, TargetUnion, TxnRequest,
-        UnlockRequest, UnlockResponse,
+        Compare, CompareResult, CompareTarget, DeleteRangeRequest, DeleteRangeResponse,
+        LeaseGrantRequest, LeaseGrantResponse, Lock, LockRequest, LockResponse, PutRequest,
+        RangeRequest, RangeResponse, Request, RequestOp, RequestUnion, RequestWithToken,
+        RequestWrapper, Response, ResponseHeader, SortOrder, SortTarget, TargetUnion, TxnRequest,
+        TxnResponse, UnlockRequest, UnlockResponse, WatchClient, WatchCreateRequest, WatchRequest,
     },
+    state::State,
     storage::KvStore,
 };

+/// Default session ttl
+const DEFAULT_SESSION_TTL: i64 = 60;
+
 /// Lock Server
 //#[derive(Debug)]
 #[allow(dead_code)] // Remove this after feature is completed
@@ -26,54 +42,223 @@ pub(crate) struct LockServer {
     storage: Arc<KvStore>,
     /// Consensus client
     client: Arc<Client<Command>>,
+    /// State of current node
+    state: Arc<State>,
     /// Server name
     name: String,
 }

 impl LockServer {
     /// New `LockServer`
-    pub(crate) fn new(storage: Arc<KvStore>, client: Arc<Client<Command>>, name: String) -> Self {
+    pub(crate) fn new(
+        storage: Arc<KvStore>,
+        client: Arc<Client<Command>>,
+        state: Arc<State>,
+        name: String,
+    ) -> Self {
         Self {
             storage,
             client,
+            state,
             name,
         }
     }

-    /// Propose request and get result
-    async fn propose(
-        &self,
-        propose_id: ProposeId,
-        request: Request,
-    ) -> Result<(CommandResponse, SyncResponse), ProposeError> {
-        let key_ranges = match request {
-            Request::RequestRange(_) | Request::RequestPut(_) => {
-                unreachable!("Propose RequestRange and RequestPut from LockServer is not allowed")
+    /// Generate propose id
+    fn generate_propose_id(&self) -> ProposeId {
+        ProposeId::new(format!("{}-{}", self.name, Uuid::new_v4()))
+    }
+
+    /// Generate `Command` proposal from `Request`
+    fn command_from_request_wrapper(propose_id: ProposeId, wrapper: RequestWithToken) -> Command {
+        #[allow(clippy::wildcard_enum_match_arm)]
+        let keys = match wrapper.request {
+            RequestWrapper::DeleteRangeRequest(ref req) => {
+                vec![KeyRange::new(req.key.as_slice(), "")]
             }
-            Request::RequestDeleteRange(ref req) => vec![KeyRange {
-                start: req.key.clone(),
-                end: req.range_end.clone(),
-            }],
-            Request::RequestTxn(ref req) => req
+            RequestWrapper::RangeRequest(ref req) => {
+                vec![KeyRange::new(req.key.as_slice(), req.range_end.as_slice())]
+            }
+            RequestWrapper::TxnRequest(ref req) => req
                 .compare
                 .iter()
-                .map(|cmp| KeyRange {
-                    start: cmp.key.clone(),
-                    end: cmp.range_end.clone(),
-                })
+                .map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
                 .collect(),
+            _ => vec![],
         };
-        let request_op = RequestOp {
-            request: Some(request),
+        Command::new(keys, wrapper, propose_id)
+    }
+
+    /// Propose request and get result with fast/slow path
+    async fn propose<T>(
+        &self,
+        request: T,
+        token: Option<String>,
+        use_fast_path: bool,
+    ) -> Result<(CommandResponse, Option<SyncResponse>), tonic::Status>
+    where
+        T: Into<RequestWrapper>,
+    {
+        let wrapper = match token {
+            Some(token) => RequestWithToken::new_with_token(request.into(), token),
+            None => RequestWithToken::new(request.into()),
         };
-        let wrapper = RequestWithToken::new(request_op.into());
-        let cmd = Command::new(key_ranges, wrapper, propose_id);
-        self.client.propose_indexed(cmd.clone()).await
+        let propose_id = self.generate_propose_id();
+        let cmd = Self::command_from_request_wrapper(propose_id, wrapper);
+        if use_fast_path {
+            let cmd_res = self.client.propose(cmd).await.map_err(|err| {
+                if let ProposeError::ExecutionError(e) = err {
+                    tonic::Status::invalid_argument(e)
+                } else {
+                    panic!("propose err {err:?}")
+                }
+            })?;
+            Ok((cmd_res, None))
+        } else {
+            let (cmd_res, sync_res) = self.client.propose_indexed(cmd).await.map_err(|err| {
+                if let ProposeError::ExecutionError(e) = err {
+                    tonic::Status::invalid_argument(e)
+                } else {
+                    panic!("propose err {err:?}")
+                }
+            })?;
+            Ok((cmd_res, Some(sync_res)))
+        }
     }

-    /// Generate propose id
-    fn generate_propose_id(&self) -> ProposeId {
-        ProposeId::new(format!("{}-{}", self.name, Uuid::new_v4()))
+    /// Create txn for trying to acquire the lock
+    fn create_acquire_txn(prefix: &str, lease_id: i64) -> TxnRequest {
+        let key = format!("{}{:x}", prefix, lease_id);
+        #[allow(clippy::as_conversions)] // this cast is always safe
+        let cmp = Compare {
+            result: CompareResult::Equal as i32,
+            target: CompareTarget::Create as i32,
+            key: key.as_bytes().to_vec(),
+            range_end: vec![],
+            target_union: Some(TargetUnion::CreateRevision(0)),
+        };
+        let put = RequestOp {
+            request: Some(Request::RequestPut(PutRequest {
+                key: key.as_bytes().to_vec(),
+                value: vec![],
+                lease: lease_id,
+                ..Default::default()
+            })),
+        };
+        let get = RequestOp {
+            request: Some(Request::RequestRange(RangeRequest {
+                key: key.as_bytes().to_vec(),
+                ..Default::default()
+            })),
+        };
+        let range_end = KeyRange::get_prefix(prefix.as_bytes());
+        #[allow(clippy::as_conversions)] // this cast is always safe
+        let get_owner = RequestOp {
+            request: Some(Request::RequestRange(RangeRequest {
+                key: prefix.as_bytes().to_vec(),
+                range_end,
+                sort_order: SortOrder::Ascend as i32,
+                sort_target: SortTarget::Create as i32,
+                limit: 1,
+                ..Default::default()
+            })),
+        };
+        TxnRequest {
+            compare: vec![cmp],
+            success: vec![put, get_owner.clone()],
+            failure: vec![get, get_owner],
+        }
+    }
+
+    /// Wait until last key deleted
+    async fn wait_delete(
+        &self,
+        pfx: String,
+        my_rev: i64,
+        token: Option<&String>,
+    ) -> Result<(), tonic::Status> {
+        let rev = my_rev.overflow_sub(1);
+        let self_addr = self.state.self_address();
+        let mut watch_client = WatchClient::connect(format!("http://{self_addr}"))
+            .await
+            .map_err(|e| tonic::Status::internal(format!("Connect error: {e}")))?;
+        loop {
+            let range_end = KeyRange::get_prefix(pfx.as_bytes());
+            #[allow(clippy::as_conversions)] // this cast is always safe
+            let get_req = RangeRequest {
+                key: pfx.as_bytes().to_vec(),
+                range_end,
+                limit: 1,
+                sort_order: SortOrder::Descend as i32,
+                sort_target: SortTarget::Create as i32,
+                max_create_revision: rev,
+                ..Default::default()
+            };
+            let (cmd_res, sync_res) = self.propose(get_req, token.cloned(), false).await?;
+            let response = Into::<RangeResponse>::into(cmd_res.decode());
+            let last_key = match response.kvs.first() {
+                Some(kv) => kv.key.as_slice(),
+                None => return Ok(()),
+            };
+            #[allow(clippy::unwrap_used)] // sync_res always has value when using slow path
+            let response_revision = sync_res.unwrap().revision();
+
+            let (request_sender, request_receiver) = mpsc::channel(100);
+            let request_stream = ReceiverStream::new(request_receiver);
+            request_sender
+                .send(WatchRequest {
+                    request_union: Some(RequestUnion::CreateRequest(WatchCreateRequest {
+                        key: last_key.to_vec(),
+                        ..Default::default()
+                    })),
+                })
+                .await
+                .unwrap_or_else(|e| panic!("failed to send watch request: {}", e));
+
+            let mut response_stream = watch_client.watch(request_stream).await?.into_inner();
+            while let Some(watch_res) = response_stream.message().await? {
+                #[allow(clippy::as_conversions)] // this cast is always safe
+                if watch_res
+                    .events
+                    .iter()
+                    .any(|e| e.r#type == EventType::Delete as i32)
+                {
+                    break;
+                }
+            }
+        }
+    }
+
+    /// Delete key
+    async fn delete_key(
+        &self,
+        key: &[u8],
+        token: Option<String>,
+    ) -> Result<Option<ResponseHeader>, tonic::Status> {
+        let keys = vec![KeyRange::new(key, "")];
+        let del_req = DeleteRangeRequest {
+            key: key.into(),
+            ..Default::default()
+        };
+        let (cmd_res, _) = self.propose(del_req, token, true).await?;
+        let res = Into::<DeleteRangeResponse>::into(cmd_res.decode());
+        Ok(res.header)
+    }
+
+    /// Lease grant
+    async fn lease_grant(&self, token: Option<String>) -> Result<i64, tonic::Status> {
+        let lease_id = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_else(|e| panic!("SystemTime before UNIX EPOCH! {}", e))
+            .as_secs()
+            .cast(); // TODO: generate lease unique id
+        let lease_grant_req = LeaseGrantRequest {
+            ttl: DEFAULT_SESSION_TTL,
+            id: lease_id,
+        };
+        let (cmd_res, _) = self.propose(lease_grant_req, token, true).await?;
+        let res = Into::<LeaseGrantResponse>::into(cmd_res.decode());
+        Ok(res.id)
+    }
 }
@@ -90,61 +275,72 @@ impl Lock for LockServer {
         request: tonic::Request<LockRequest>,
     ) -> Result<tonic::Response<LockResponse>, tonic::Status> {
         debug!("Receive LockRequest {:?}", request);
-        let lock_request = request.into_inner();
-        let key = lock_request.name;
-
-        #[allow(clippy::as_conversions)] // Converting Enum to i32 is safe.
-        let compare = Compare {
-            result: CompareResult::Equal as i32,
-            target: CompareTarget::Create as i32,
-            key: key.clone(),
-            range_end: vec![],
-            target_union: Some(TargetUnion::CreateRevision(0)),
+        let token = get_token(request.metadata());
+        let lock_req = request.into_inner();
+        let lease_id = if lock_req.lease == 0 {
+            self.lease_grant(token.clone()).await?
+        } else {
+            lock_req.lease
         };

-        let success = RequestOp {
-            request: Some(Request::RequestPut(PutRequest {
-                key: key.clone(),
-                value: vec![0],
-                ..PutRequest::default()
-            })),
-        };
+        let prefix = format!("{}/", String::from_utf8_lossy(&lock_req.name).into_owned());
+        let key = format!("{}{:x}", prefix, lease_id);

-        let txn_request = TxnRequest {
-            compare: vec![compare],
-            success: vec![success],
-            failure: vec![],
-        };
-        loop {
-            let result = self
-                .propose(
-                    self.generate_propose_id(),
-                    Request::RequestTxn(txn_request.clone()),
-                )
-                .await;
+        let txn = Self::create_acquire_txn(&prefix, lease_id);
+        let (cmd_res, sync_res) = self.propose(txn, token.clone(), false).await?;
+        let mut txn_res = Into::<TxnResponse>::into(cmd_res.decode());
+        #[allow(clippy::unwrap_used)] // sync_res always has value when using slow path
+        let my_rev = sync_res.unwrap().revision();
+        let owner_res = txn_res
+            .responses
+            .swap_remove(1)
+            .response
+            .and_then(|r| {
+                if let Response::ResponseRange(res) = r {
+                    Some(res)
+                } else {
+                    None
+                }
+            })
+            .unwrap_or_else(|| unreachable!("owner_resp should be a Get response"));
+
+        let owner_key = owner_res.kvs;
+        let header = if owner_key
+            .get(0)
+            .map_or(false, |kv| kv.create_revision == my_rev)
+        {
+            owner_res.header
+        } else {
+            if let Err(e) = self.wait_delete(prefix, my_rev, token.as_ref()).await {
+                let _ignore = self.delete_key(key.as_bytes(), token).await;
+                return Err(e);
+            }
+            let range_req = RangeRequest {
+                key: key.as_bytes().to_vec(),
+                ..Default::default()
+            };
+            let result = self.propose(range_req, token.clone(), true).await;
             match result {
-                Ok((res_op, sync_res)) => {
-                    let mut res = KvServer::parse_response_op(res_op.decode().into());
-                    let revision = sync_res.revision();
-                    debug!("Get revision {:?} for LockRequest", revision);
-                    KvServer::update_header_revision(&mut res, revision);
-                    if let Response::ResponseTxn(response) = res {
-                        if response.succeeded {
-                            let resp = LockResponse {
-                                header: response.header,
-                                key: key.clone(),
-                            };
-                            return Ok(tonic::Response::new(resp));
-                        }
-                    } else {
-                        panic!("Receive wrong response {:?} for LockRequest", res);
+                Ok(res) => {
+                    let res = Into::<RangeResponse>::into(res.0.decode());
+                    if res.kvs.is_empty() {
+                        return Err(tonic::Status::internal("session expired"));
                     }
+                    res.header
+                }
+                Err(e) => {
+                    let _ignore = self.delete_key(key.as_bytes(), token).await;
+                    return Err(e);
                 }
-                Err(e) => panic!("Failed to receive response from KV storage, {e}"),
             }
-            tokio::time::sleep(Duration::from_millis(1)).await;
-        }
+        };
+        let res = LockResponse {
+            header,
+            key: key.into_bytes(),
+        };
+        Ok(tonic::Response::new(res))
     }
+
     /// Unlock takes a key returned by Lock and releases the hold on lock. The
     /// next Lock caller waiting for the lock will then be woken up and given
     /// ownership of the lock.
@@ -153,35 +349,8 @@
         request: tonic::Request<UnlockRequest>,
     ) -> Result<tonic::Response<UnlockResponse>, tonic::Status> {
         debug!("Receive UnlockRequest {:?}", request);
-        let unlock_request = request.into_inner();
-        let key = unlock_request.key;
-
-        let delete_request = DeleteRangeRequest {
-            key: key.clone(),
-            range_end: vec![],
-            ..DeleteRangeRequest::default()
-        };
-        let result = self
-            .propose(
-                self.generate_propose_id(),
-                Request::RequestDeleteRange(delete_request.clone()),
-            )
-            .await;
-        match result {
-            Ok((res_op, sync_res)) => {
-                let mut res = KvServer::parse_response_op(res_op.decode().into());
-                let revision = sync_res.revision();
-                debug!("Get revision {:?} for UnlockRequest", revision);
-                KvServer::update_header_revision(&mut res, revision);
-                if let Response::ResponseDeleteRange(response) = res {
-                    Ok(tonic::Response::new(UnlockResponse {
-                        header: response.header,
-                    }))
-                } else {
-                    panic!("Receive wrong response {:?} for LockRequest", res);
-                }
-            }
-            Err(e) => panic!("Failed to receive response from KV storage, {e}"),
-        }
+        let token = get_token(request.metadata());
+        let header = self.delete_key(&request.get_ref().key, token).await?;
+        Ok(tonic::Response::new(UnlockResponse { header }))
     }
 }
diff --git a/xline/src/server/watch_server.rs b/xline/src/server/watch_server.rs
index a2063d05e..9a7570999 100644
--- a/xline/src/server/watch_server.rs
+++ b/xline/src/server/watch_server.rs
@@ -148,16 +148,13 @@ where
             end: req.range_end,
         };
-        let (watcher, events, revision) = self
-            .kv_watcher
-            .watch(
-                watch_id,
-                key_range,
-                req.start_revision,
-                req.filters,
-                self.event_tx.clone(),
-            )
-            .await;
+        let (watcher, events, revision) = self.kv_watcher.watch(
+            watch_id,
+            key_range,
+            req.start_revision,
+            req.filters,
+            self.event_tx.clone(),
+        );
         assert!(
             self.watcher_map.insert(watch_id, watcher).is_none(),
             "WatchId {} already exists in watcher_map",
diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs
index 4ab3c385b..bc4f33bce 100644
--- a/xline/src/server/xline_server.rs
+++ b/xline/src/server/xline_server.rs
@@ -99,7 +99,6 @@ impl XlineServer {
             Arc::clone(&header_gen),
         ));
         let client = Arc::new(Client::<Command>::new(all_members.clone(), client_timeout).await);
-
         Self {
             state,
             kv_storage,
@@ -229,6 +228,7 @@ impl XlineServer {
             LockServer::new(
                 Arc::clone(&self.kv_storage),
                 Arc::clone(&self.client),
+                Arc::clone(&self.state),
                 self.id(),
             ),
             LeaseServer::new(
diff --git a/xline/src/state.rs b/xline/src/state.rs
index 36bb5ab7f..cd438e4a3 100644
--- a/xline/src/state.rs
+++ b/xline/src/state.rs
@@ -36,6 +36,16 @@ impl State {
         &self.id
     }

+    /// Get self address
+    pub(crate) fn self_address(&self) -> &str {
+        self.members.get(&self.id).unwrap_or_else(|| {
+            panic!(
+                "Self address not found, id: {}, members: {:?}",
+                self.id, self.members
+            )
+        })
+    }
+
     /// Get leader address
     pub(crate) fn leader_address(&self) -> Option<&str> {
         self.leader_id
@@ -92,3 +102,44 @@ impl State {
             .ok_or_else(|| tonic::Status::internal("Get leader address error"))
     }
 }
+
+#[cfg(test)]
+mod test {
+    use std::{sync::Arc, time::Duration};
+
+    use super::*;
+    use tokio::time::timeout;
+
+    #[tokio::test]
+    async fn test_state() -> Result<(), Box<dyn std::error::Error>> {
+        let state = Arc::new(State::new(
+            "1".to_owned(),
+            None,
+            vec![
+                ("1".to_owned(), "1".to_owned()),
+                ("2".to_owned(), "2".to_owned()),
+            ]
+            .into_iter()
+            .collect(),
+        ));
+        let handle = tokio::spawn({
+            let state = Arc::clone(&state);
+            async move {
+                #[allow(clippy::unwrap_used)]
+                let leader = state.wait_leader().await.unwrap();
+                assert_eq!(leader, "2");
+            }
+        });
+        assert!(!state.set_leader_id(Some("2".to_owned())));
+        assert_eq!(state.id(), "1");
+        assert_eq!(state.self_address(), "1");
+        assert_eq!(state.leader_address(), Some("2"));
+        assert!(!state.is_leader());
+        assert_eq!(
+            state.others(),
+            vec![("2".to_owned(), "2".to_owned())].into_iter().collect()
+        );
+        timeout(Duration::from_secs(1), handle).await??;
+        Ok(())
+    }
+}
diff --git a/xline/src/storage/auth_store/perms.rs b/xline/src/storage/auth_store/perms.rs
index 5f7d59623..1f79b9a52 100644
--- a/xline/src/storage/auth_store/perms.rs
+++ b/xline/src/storage/auth_store/perms.rs
@@ -1,5 +1,6 @@
 use std::{
     collections::HashMap,
+    fmt::Debug,
     time::{SystemTime, UNIX_EPOCH},
 };

@@ -47,6 +48,15 @@ pub(crate) struct JwtTokenManager {
     decoding_key: DecodingKey,
 }

+impl Debug for JwtTokenManager {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("JwtTokenManager")
+            .field("encoding_key", &"EncodingKey")
+            .field("decoding_key", &"DecodingKey")
+            .finish()
+    }
+}
+
 impl JwtTokenManager {
     /// New `JwtTokenManager`
     pub(crate) fn new(encoding_key: EncodingKey, decoding_key: DecodingKey) -> Self {
diff --git a/xline/src/storage/auth_store/store.rs b/xline/src/storage/auth_store/store.rs
index b755e86ec..259b7286b 100644
--- a/xline/src/storage/auth_store/store.rs
+++ b/xline/src/storage/auth_store/store.rs
@@ -67,6 +67,27 @@ impl AuthStore {
         self.inner.check_password(username, password)
     }

+    /// Check if the request needs admin permission
+    fn need_admin_permission(wrapper: &RequestWithToken) -> bool {
+        matches!(
+            wrapper.request,
+            RequestWrapper::AuthEnableRequest(_)
+                | RequestWrapper::AuthDisableRequest(_)
+                | RequestWrapper::AuthStatusRequest(_)
+                | RequestWrapper::AuthUserAddRequest(_)
+                | RequestWrapper::AuthUserDeleteRequest(_)
+                | RequestWrapper::AuthUserChangePasswordRequest(_)
+                | RequestWrapper::AuthUserGrantRoleRequest(_)
+                | RequestWrapper::AuthUserRevokeRoleRequest(_)
+                | RequestWrapper::AuthRoleAddRequest(_)
+                | RequestWrapper::AuthRoleGrantPermissionRequest(_)
+                | RequestWrapper::AuthRoleRevokePermissionRequest(_)
+                | RequestWrapper::AuthRoleDeleteRequest(_)
+                | RequestWrapper::AuthUserListRequest(_)
+                | RequestWrapper::AuthRoleListRequest(_)
+        )
+    }
+
     /// check if the request is permitted
     pub(crate) async fn check_permission(
         &self,
@@ -93,54 +114,55 @@ impl AuthStore {
             ));
         }
         let username = claims.username;
-        #[allow(clippy::wildcard_enum_match_arm)]
-        match wrapper.request {
-            RequestWrapper::RangeRequest(ref range_req) => {
-                self.check_range_permission(&username, range_req)?;
-            }
-            RequestWrapper::PutRequest(ref put_req) => {
-                self.check_put_permission(&username, put_req).await?;
-            }
-            RequestWrapper::DeleteRangeRequest(ref del_range_req) => {
-                self.check_delete_permission(&username, del_range_req)?;
-            }
-            RequestWrapper::TxnRequest(ref txn_req) => {
-                self.check_txn_permission(&username, txn_req).await?;
-            }
-            RequestWrapper::LeaseRevokeRequest(ref lease_revoke_req) => {
-                self.check_lease_revoke_permission(&username, lease_revoke_req)
-                    .await?;
-            }
-            RequestWrapper::AuthUserGetRequest(ref user_get_req) => {
-                self.check_admin_permission(&username).map_or_else(
-                    |e| {
-                        if user_get_req.name == username {
-                            Ok(())
-                        } else {
-                            Err(e)
-                        }
-                    },
-                    |_| Ok(()),
-                )?;
-            }
-            RequestWrapper::AuthRoleGetRequest(ref role_get_req) => {
-                self.check_admin_permission(&username).map_or_else(
-                    |e| {
-                        let user = self.inner.get_user(&username)?;
-                        if user.has_role(&role_get_req.role) {
-                            Ok(())
-                        } else {
-                            Err(e)
-                        }
-                    },
-                    |_| Ok(()),
-                )?;
-            }
-            _ => {
-                self.check_admin_permission(&username)?;
+        if Self::need_admin_permission(wrapper) {
+            self.check_admin_permission(&username)?;
+        } else {
+            #[allow(clippy::wildcard_enum_match_arm)]
+            match wrapper.request {
+                RequestWrapper::RangeRequest(ref range_req) => {
+                    self.check_range_permission(&username, range_req)?;
+                }
+                RequestWrapper::PutRequest(ref put_req) => {
+                    self.check_put_permission(&username, put_req).await?;
+                }
+                RequestWrapper::DeleteRangeRequest(ref del_range_req) => {
+                    self.check_delete_permission(&username, del_range_req)?;
+                }
+                RequestWrapper::TxnRequest(ref txn_req) => {
+                    self.check_txn_permission(&username, txn_req).await?;
+                }
+                RequestWrapper::LeaseRevokeRequest(ref lease_revoke_req) => {
+                    self.check_lease_revoke_permission(&username, lease_revoke_req)
+                        .await?;
+                }
+                RequestWrapper::AuthUserGetRequest(ref user_get_req) => {
+                    self.check_admin_permission(&username).map_or_else(
+                        |e| {
+                            if user_get_req.name == username {
+                                Ok(())
+                            } else {
+                                Err(e)
+                            }
+                        },
+                        |_| Ok(()),
+                    )?;
+                }
+                RequestWrapper::AuthRoleGetRequest(ref role_get_req) => {
+                    self.check_admin_permission(&username).map_or_else(
+                        |e| {
+                            let user = self.inner.get_user(&username)?;
+                            if user.has_role(&role_get_req.role) {
+                                Ok(())
+                            } else {
+                                Err(e)
+                            }
+                        },
+                        |_| Ok(()),
+                    )?;
+                }
+                _ => {}
             }
         }
-        Ok(())
     }
diff --git a/xline/src/storage/kvwatcher.rs b/xline/src/storage/kvwatcher.rs
index 5c92f7eda..5a0bc3f7b 100644
--- a/xline/src/storage/kvwatcher.rs
+++ b/xline/src/storage/kvwatcher.rs
@@ -187,10 +187,9 @@ impl KvWatcher {
 /// Operations of KV watcher
 #[allow(clippy::integer_arithmetic, clippy::indexing_slicing)] // Introduced by mockall::automock
 #[cfg_attr(test, mockall::automock)]
-#[async_trait::async_trait]
 pub(crate) trait KvWatcherOps {
     /// Create a watch to KV store
-    async fn watch(
+    fn watch(
         &self,
         id: WatchId,
         key_range: KeyRange,
@@ -203,10 +202,9 @@
     fn cancel(&self, watcher: &Watcher) -> i64;
 }

-#[async_trait::async_trait]
 impl KvWatcherOps for KvWatcher {
     /// Create a watch to KV store
-    async fn watch(
+    fn watch(
         &self,
         id: WatchId,
         key_range: KeyRange,
@@ -216,7 +214,6 @@ impl KvWatcherOps for KvWatcher {
     ) -> (Watcher, Vec<Event>, i64) {
         self.inner
             .watch(id, key_range, start_rev, filters, event_tx)
-            .await
     }

     /// Cancel a watch from KV store
@@ -235,7 +232,7 @@ impl KvWatcherInner {
     }

     /// Create a watch to KV store
-    async fn watch(
+    fn watch(
         &self,
         id: WatchId,
         key_range: KeyRange,
diff --git a/xline/src/storage/lease_store/mod.rs b/xline/src/storage/lease_store/mod.rs
index 69da9a1d3..46121ef9c 100644
--- a/xline/src/storage/lease_store/mod.rs
+++ b/xline/src/storage/lease_store/mod.rs
@@ -75,15 +75,14 @@ impl LeaseCollection {
             if *expiry <= Instant::now() {
                 #[allow(clippy::unwrap_used)] // queue.peek() returns Some
                 let id = self.expired_queue.pop().unwrap();
-                expired_leases.push(id);
+                if self.lease_map.contains_key(&id) {
+                    expired_leases.push(id);
+                }
             } else {
                 break;
             }
         }
         expired_leases
-            .into_iter()
-            .filter(|id| self.lease_map.contains_key(id))
-            .collect::<Vec<_>>()
     }

     /// Renew lease
diff --git a/xline/tests/lock_test.rs b/xline/tests/lock_test.rs
new file mode 100644
index 000000000..88901a244
--- /dev/null
+++ b/xline/tests/lock_test.rs
@@ -0,0 +1,52 @@
+mod common;
+
+use std::{error::Error, time::Duration};
+
+use common::Cluster;
+use etcd_client::LockOptions;
+use tokio::time::{self, timeout};
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
+async fn test_lock() -> Result<(), Box<dyn Error>> {
+    let mut cluster = Cluster::new(3).await;
+    cluster.start().await;
+    let client = cluster.client().await;
+    let mut lock_client = client.lock_client();
+
+    let lock_handle = tokio::spawn({
+        let mut c = lock_client.clone();
+        async move {
+            let res = c.lock("test", None).await.unwrap();
+            time::sleep(Duration::from_secs(3)).await;
+            let _res = c.unlock(res.key()).await.unwrap();
+        }
+    });
+
+    time::sleep(Duration::from_secs(1)).await;
+    let now = time::Instant::now();
+    let res = lock_client.lock("test", None).await?;
+    let elapsed = now.elapsed();
+    assert!(res.key().starts_with(b"test"));
+    assert!(elapsed >= Duration::from_secs(1));
+    let _ignore = lock_handle.await;
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
+async fn test_lock_timeout() -> Result<(), Box<dyn Error>> {
+    let mut cluster = Cluster::new(3).await;
+    cluster.start().await;
+    let client = cluster.client().await;
+    let mut lock_client = client.lock_client();
+
+    let lease_id = client.lease_client().grant(1, None).await?.id();
+    let _res = lock_client
+        .lock("test", Some(LockOptions::new().with_lease(lease_id)))
+        .await?;
+
+    let res = timeout(Duration::from_secs(3), lock_client.lock("test", None)).await??;
+    assert!(res.key().starts_with(b"test"));
+
+    Ok(())
+}
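Usage sketch: a minimal example of how the new `lock_client()`/`lease_client()` getters can be used together, mirroring the flow exercised by `xline/tests/lock_test.rs` above. The function name `with_lock`, the lock name `my-lock`, and the 60-second TTL are illustrative assumptions; the `grant`/`lock`/`unlock`/`with_lease` calls are the same `etcd_client` APIs the tests call.

```rust
use std::error::Error;

use etcd_client::{LeaseClient, LockClient, LockOptions};

/// Acquire a lock bound to a fresh lease, run a critical section, then release it.
/// `lock_client`/`lease_client` would come from the new getters on the xline
/// `Client` (`client.lock_client()` / `client.lease_client()`).
async fn with_lock(
    mut lock_client: LockClient,
    mut lease_client: LeaseClient,
) -> Result<(), Box<dyn Error>> {
    // Bind the lock to a lease so it is released automatically if the holder
    // disappears; `test_lock_timeout` exercises the same pattern with a 1s lease.
    let lease_id = lease_client.grant(60, None).await?.id();
    let resp = lock_client
        .lock("my-lock", Some(LockOptions::new().with_lease(lease_id)))
        .await?;

    // ... critical section ...

    // The key returned by `lock` identifies this holder; hand it back to `unlock`.
    let _ = lock_client.unlock(resp.key()).await?;
    Ok(())
}
```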