Skip to content

Commit

Permalink
Support TLS for etcd connections
Browse files Browse the repository at this point in the history
Closes #16
Closes #8

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
  • Loading branch information
LeeSmet committed Mar 13, 2021
1 parent 77a2fdb commit cd6349b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ simple_logger = "1.11"
pretty_env_logger = "0.4"
structopt = "0.3"
tokio = { version = "1", features = ["rt", "macros"] }
etcd-rs = "0.5"
futures = "0.3"
blake2 = "0.9"
gray-codes = "0.1"
etcd-client = { version = "0.6", features = ["tls", "tls-roots"] }

[dev-dependencies]
rand = "0.7"
Expand Down
62 changes: 28 additions & 34 deletions src/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use blake2::{
digest::{Update, VariableOutput},
VarBlake2b,
};
use etcd_rs::{Client, ClientConfig, KeyRange, PutRequest, RangeRequest};
use etcd_client::{Client, ConnectOptions};
use log::{debug, trace};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
Expand Down Expand Up @@ -33,23 +33,22 @@ pub struct EtcdConfig {
impl Etcd {
/// Create a new client connecting to the cluster with the given endpoints
pub async fn new(cfg: &EtcdConfig, virtual_root: Option<PathBuf>) -> EtcdResult<Self> {
let client = Client::connect(ClientConfig {
endpoints: cfg.endpoints.clone(),
auth: match cfg {
EtcdConfig {
username: Some(username),
password: Some(password),
..
} => Some((username.clone(), password.clone())),
_ => None,
},
tls: None,
})
.await
.map_err(|e| EtcdError {
kind: EtcdErrorKind::Connect,
internal: InternalError::Etcd(e),
})?;
// Don't set TLS client options, etcd client lib can figure this out for us.
let mut co = ConnectOptions::new();
match cfg {
EtcdConfig {
username: Some(username),
password: Some(password),
..
} => co = co.with_user(username, password),
_ => {}
}
let client = Client::connect(&cfg.endpoints, Some(co))
.await
.map_err(|e| EtcdError {
kind: EtcdErrorKind::Connect,
internal: InternalError::Etcd(e),
})?;
Ok(Etcd {
client,
prefix: cfg.prefix.clone(),
Expand All @@ -58,7 +57,7 @@ impl Etcd {
}

/// Save the metadata for the file identified by `path` with a given prefix
pub async fn save_meta(&self, path: &PathBuf, meta: &MetaData) -> EtcdResult<()> {
pub async fn save_meta(&mut self, path: &PathBuf, meta: &MetaData) -> EtcdResult<()> {
// for now save metadata human readable
trace!("encoding metadata");
let enc_meta = toml::to_vec(meta).map_err(|e| EtcdError {
Expand All @@ -71,7 +70,7 @@ impl Etcd {
}

/// loads the metadata for a given path and prefix
pub async fn load_meta(&self, path: &PathBuf) -> EtcdResult<Option<MetaData>> {
pub async fn load_meta(&mut self, path: &PathBuf) -> EtcdResult<Option<MetaData>> {
let key = self.build_key(path)?;
Ok(if let Some(value) = self.read_value(&key).await? {
Some(toml::from_slice(&value).map_err(|e| EtcdError {
Expand All @@ -84,10 +83,9 @@ impl Etcd {
}

// helper functions to read and write a value
async fn write_value(&self, key: &str, value: &[u8]) -> EtcdResult<()> {
async fn write_value(&mut self, key: &str, value: &[u8]) -> EtcdResult<()> {
self.client
.kv()
.put(PutRequest::new(key, value))
.put(key, value, None)
.await
.map(|_| ()) // ignore result
.map_err(|e| EtcdError {
Expand All @@ -96,20 +94,19 @@ impl Etcd {
})
}

async fn read_value(&self, key: &str) -> EtcdResult<Option<Vec<u8>>> {
async fn read_value(&mut self, key: &str) -> EtcdResult<Option<Vec<u8>>> {
self.client
.kv()
.range(RangeRequest::new(KeyRange::key(key)))
.get(key, None)
.await
.map_err(|e| EtcdError {
kind: EtcdErrorKind::Read,
internal: InternalError::Etcd(e),
})
.and_then(|mut resp| {
let mut kvs = resp.take_kvs();
.and_then(|resp| {
let kvs = resp.kvs();
match kvs.len() {
0 => Ok(None),
1 => Ok(Some(kvs[0].take_value())),
1 => Ok(Some(kvs[0].value().to_vec())),
keys => Err(EtcdError {
kind: EtcdErrorKind::Meta,
internal: InternalError::Other(format!(
Expand Down Expand Up @@ -211,9 +208,6 @@ pub struct EtcdError {
internal: InternalError,
}

// TODO
unsafe impl Send for EtcdError {}

impl fmt::Display for EtcdError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "EtcdError: {}: {}", self.kind, self.internal)
Expand All @@ -233,8 +227,8 @@ impl std::error::Error for EtcdError {

#[derive(Debug)]
enum InternalError {
Etcd(etcd_rs::Error),
Meta(Box<dyn std::error::Error>),
Etcd(etcd_client::Error),
Meta(Box<dyn std::error::Error + Send>),
IO(io::Error),
Other(String),
}
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fn main() -> ZstorResult<()> {
let cfg = read_cfg(&opts.config)?;

// Get from config if not present
let cluster = match cfg.meta() {
let mut cluster = match cfg.meta() {
Meta::ETCD(etcdconf) => Etcd::new(etcdconf, cfg.virtual_root().clone()).await?,
};

Expand Down
2 changes: 1 addition & 1 deletion src/test_etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
"http://127.0.0.1:32379".to_owned(),
];

let cluster = etcd::Etcd::new(
let mut cluster = etcd::Etcd::new(
&etcd::EtcdConfig::new(nodes, "prefix".to_string(), None, None),
None,
)
Expand Down

0 comments on commit cd6349b

Please sign in to comment.