Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Features/lock #153

Merged
merged 1 commit into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions xline/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt::Debug};

use curp::{client::Client as CurpClient, cmd::ProposeId};
use etcd_client::{
AuthClient, Client as EtcdClient, LeaseKeepAliveStream, LeaseKeeper, WatchClient,
AuthClient, Client as EtcdClient, KvClient, LeaseClient, LeaseKeepAliveStream, LeaseKeeper,
LockClient, WatchClient,
};
use utils::config::ClientTimeout;
use uuid::Uuid;
Expand Down Expand Up @@ -30,7 +31,6 @@ pub mod errors;
pub mod kv_types;

/// Xline client
#[allow(missing_debug_implementations)] // EtcdClient doesn't implement Debug
pub struct Client {
/// Name of the client
name: String,
Expand All @@ -42,6 +42,17 @@ pub struct Client {
use_curp_client: bool,
}

impl Debug for Client {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("name", &self.name)
.field("use_curp_client", &self.use_curp_client)
.field("curp_client", &self.curp_client)
.finish()
}
}

impl Client {
/// New `Client`
///
Expand Down Expand Up @@ -238,15 +249,33 @@ impl Client {
Ok(response.into())
}

/// Gets a kv client.
#[inline]
pub fn kv_client(&self) -> KvClient {
themanforfree marked this conversation as resolved.
Show resolved Hide resolved
self.etcd_client.kv_client()
}

/// Gets an auth client.
#[inline]
pub fn auth_client(&mut self) -> AuthClient {
pub fn auth_client(&self) -> AuthClient {
self.etcd_client.auth_client()
}

/// Gets an watch client.
/// Gets a watch client.
#[inline]
pub fn watch_client(&mut self) -> WatchClient {
pub fn watch_client(&self) -> WatchClient {
self.etcd_client.watch_client()
}

/// Gets a lock client.
#[inline]
pub fn lock_client(&self) -> LockClient {
self.etcd_client.lock_client()
}

/// Gets a lease client.
#[inline]
pub fn lease_client(&self) -> LeaseClient {
self.etcd_client.lease_client()
}
}
1 change: 1 addition & 0 deletions xline/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub(crate) use self::{
lease_server::{Lease as LeaseTrait, LeaseServer},
request_op::Request,
response_op::Response,
watch_client::WatchClient,
watch_request::RequestUnion,
watch_server::{Watch, WatchServer},
AuthDisableRequest, AuthDisableResponse, AuthEnableRequest, AuthEnableResponse,
Expand Down
2 changes: 1 addition & 1 deletion xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl Kv for KvServer {
) -> Result<tonic::Response<TxnResponse>, tonic::Status> {
debug!("Receive TxnRequest {:?}", request);
Self::check_txn_request(request.get_ref())?;
let is_fast_path = true;
let is_fast_path = false; // lock need revision of txn
let (cmd_res, sync_res) = self.propose(request, is_fast_path).await?;

themanforfree marked this conversation as resolved.
Show resolved Hide resolved
let mut res = Self::parse_response_op(cmd_res.decode().into());
Expand Down
8 changes: 4 additions & 4 deletions xline/src/server/lease_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const DEFAULT_LEASE_REQUEST_TIME: Duration = Duration::from_millis(500);

/// Lease Server
#[derive(Debug)]
#[allow(dead_code)] // Remove this after feature is completed
pub(crate) struct LeaseServer {
/// Lease storage
storage: Arc<LeaseStore>,
Expand All @@ -40,7 +39,7 @@ pub(crate) struct LeaseServer {
client: Arc<Client<Command>>,
/// Server name
name: String,
/// Current node is leader or not
/// State of current node
state: Arc<State>,
/// Id generator
id_gen: Arc<IdGenerator>,
Expand Down Expand Up @@ -72,13 +71,14 @@ impl LeaseServer {
async fn revoke_expired_leases_task(lease_server: Arc<LeaseServer>) {
loop {
// only leader will check expired lease
if lease_server.state.is_leader() {
if lease_server.is_leader() {
for id in lease_server.storage.find_expired_leases() {
let _handle = tokio::spawn({
let s = Arc::clone(&lease_server);
let token_option = lease_server.auth_storage.root_token();
async move {
let mut request = tonic::Request::new(LeaseRevokeRequest { id });
if let Ok(token) = s.auth_storage.root_token() {
if let Ok(token) = token_option {
let _ignore = request.metadata_mut().insert(
"token",
token.parse().unwrap_or_else(|e| {
Expand Down
Loading