Skip to content

Commit

Permalink
use openssl and tokio-openssl crate
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Mar 17, 2020
1 parent 9637262 commit 34a2193
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 19 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ futures-util = { version = "0.3.1", default-features = false, features = ["io",
grpcio = { version = "0.5", default-features = false, features = ["openssl-vendored"] }
hex = "0.3"
itertools = "0.8"
openssl = "0.10"
tokio-openssl = "0.2"
hyper = { version = "0.12", default-features = false, features = ["runtime"] }
keys = { path = "components/keys" }
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false }
Expand Down Expand Up @@ -120,6 +122,7 @@ tikv_util = { path = "components/tikv_util" }
time = "0.1"
tipb = { git = "https://github.com/pingcap/tipb.git", default-features = false }
tokio = { version = "0.2", features = ["sync"] }
tokio-tcp = "0.1"
tokio-core = "0.1"
tokio-fs = "0.1.6"
tokio-io = "0.1.12"
Expand Down
5 changes: 4 additions & 1 deletion cmd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,10 @@ impl TiKVServer {
server.pd_sender.clone(),
));
// Start the status server.
if let Err(e) = status_server.start(self.config.server.status_addr.clone()) {
if let Err(e) = status_server.start(
self.config.server.status_addr.clone(),
&self.config.security,
) {
error!(
"failed to bind addr for status service";
"err" => %e
Expand Down
7 changes: 7 additions & 0 deletions src/server/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::result;

use grpcio::Error as GrpcError;
use hyper::Error as HttpError;
use openssl::error::ErrorStack as OpenSSLError;
use protobuf::ProtobufError;
use tokio_sync::oneshot::error::RecvError;

Expand Down Expand Up @@ -106,6 +107,12 @@ quick_error! {
display("{:?}", err)
description(err.description())
}
OpenSSL(err: OpenSSLError) {
from()
cause(err)
display("{:?}", err)
description(err.description())
}
}
}

Expand Down
81 changes: 63 additions & 18 deletions src/server/status_server.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use futures::future::{err, ok};
use futures::stream::Stream;
use futures::sync::oneshot;
#[cfg(feature = "failpoints")]
use futures::Stream;
use futures::{self, Future};
use hyper::server::Builder as HyperBuilder;
use hyper::service::service_fn;
use hyper::{self, header, Body, Method, Request, Response, Server, StatusCode};
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
use pprof;
use pprof::protos::Message;
use regex::Regex;
use std::sync::Arc;
use tempfile::TempDir;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::SslAcceptorExt;
use tokio_sync::oneshot::{Receiver, Sender};
use tokio_tcp::TcpListener;
use tokio_threadpool::{Builder, ThreadPool};

use std::error::Error as StdError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;

use super::Result;
use crate::config::TiKvConfig;
use raftstore::store::PdTask;
use tikv_alloc::error::ProfError;
use tikv_util::collections::HashMap;
use tikv_util::metrics::dump;
use tikv_util::security::SecurityConfig;
use tikv_util::timer::GLOBAL_TIMER_HANDLE;
use tikv_util::worker::FutureScheduler;

Expand Down Expand Up @@ -385,13 +391,13 @@ impl StatusServer {
)
}

pub fn start(&mut self, status_addr: String) -> Result<()> {
let addr = SocketAddr::from_str(&status_addr)?;

// TODO: support TLS for the status server.
let builder = Server::try_bind(&addr)?;
fn start_serve<I>(&mut self, builder: HyperBuilder<I>)
where
I: Stream + Send + 'static,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
{
let pd_sender = self.pd_sender.clone();

// Start to serve.
let server = builder.serve(move || {
let pd_sender = pd_sender.clone();
Expand Down Expand Up @@ -424,11 +430,48 @@ impl StatusServer {
},
)
});
self.addr = Some(server.local_addr());

let graceful = server
.with_graceful_shutdown(self.rx.take().unwrap())
.map_err(|e| error!("Status server error: {:?}", e));
self.thread_pool.spawn(graceful);
}

pub fn start(&mut self, status_addr: String, security_config: &SecurityConfig) -> Result<()> {
let addr = SocketAddr::from_str(&status_addr)?;

let tcp_listener = TcpListener::bind(&addr)?;
self.addr = Some(tcp_listener.local_addr()?);

if !security_config.cert_path.is_empty()
&& !security_config.key_path.is_empty()
&& !security_config.ca_path.is_empty()
{
let mut acceptor = SslAcceptor::mozilla_modern(SslMethod::tls())?;
acceptor.set_ca_file(&security_config.ca_path)?;
acceptor.set_certificate_chain_file(&security_config.cert_path)?;
acceptor.set_private_key_file(&security_config.key_path, SslFiletype::PEM)?;
let acceptor = acceptor.build();

let tls_stream = tcp_listener
.incoming()
.and_then(move |stream| {
acceptor.accept_async(stream).then(|r| match r {
Ok(stream) => Ok(Some(stream)),
Err(e) => {
error!("failed to accept TLS connection"; "err" => ?e);
Ok(None)
}
})
})
.filter_map(|x| x);
let server = Server::builder(tls_stream);
self.start_serve(server);
} else {
let tcp_stream = tcp_listener.incoming();
let server = Server::builder(tcp_stream);
self.start_serve(server);
}
Ok(())
}

Expand Down Expand Up @@ -526,19 +569,21 @@ fn handle_fail_points_request(

#[cfg(test)]
mod tests {
use crate::config::TiKvConfig;
use crate::server::status_server::StatusServer;
use futures::future::{lazy, Future};
use futures::Stream;
use hyper::{Body, Client, Method, Request, StatusCode, Uri};
use tokio_core::reactor::Handle;

use crate::config::TiKvConfig;
use crate::server::status_server::StatusServer;
use raftstore::store::PdTask;
use tikv_util::security::SecurityConfig;
use tikv_util::worker::{dummy_future_scheduler, FutureRunnable, FutureWorker};
use tokio_core::reactor::Handle;

#[test]
fn test_status_service() {
let mut status_server = StatusServer::new(1, dummy_future_scheduler());
let _ = status_server.start("127.0.0.1:0".to_string());
let _ = status_server.start("127.0.0.1:0".to_string(), &SecurityConfig::default());
let client = Client::new();
let uri = Uri::builder()
.scheme("http")
Expand Down Expand Up @@ -576,7 +621,7 @@ mod tests {
worker.start(Runner).unwrap();

let mut status_server = StatusServer::new(1, worker.scheduler());
let _ = status_server.start("127.0.0.1:0".to_string());
let _ = status_server.start("127.0.0.1:0".to_string(), &SecurityConfig::default());
let client = Client::new();
let uri = Uri::builder()
.scheme("http")
Expand Down Expand Up @@ -612,7 +657,7 @@ mod tests {
fn test_status_service_fail_endpoints() {
let _guard = fail::FailScenario::setup();
let mut status_server = StatusServer::new(1, dummy_future_scheduler());
let _ = status_server.start("127.0.0.1:0".to_string());
let _ = status_server.start("127.0.0.1:0".to_string(), &SecurityConfig::default());
let client = Client::new();
let addr = status_server.listening_addr().to_string();

Expand Down Expand Up @@ -744,7 +789,7 @@ mod tests {
fn test_status_service_fail_endpoints_can_trigger_fails() {
let _guard = fail::FailScenario::setup();
let mut status_server = StatusServer::new(1, dummy_future_scheduler());
let _ = status_server.start("127.0.0.1:0".to_string());
let _ = status_server.start("127.0.0.1:0".to_string(), &SecurityConfig::default());
let client = Client::new();
let addr = status_server.listening_addr().to_string();

Expand Down Expand Up @@ -785,7 +830,7 @@ mod tests {
fn test_status_service_fail_endpoints_should_give_404_when_failpoints_are_disable() {
let _guard = fail::FailScenario::setup();
let mut status_server = StatusServer::new(1, dummy_future_scheduler());
let _ = status_server.start("127.0.0.1:0".to_string());
let _ = status_server.start("127.0.0.1:0".to_string(), &SecurityConfig::default());
let client = Client::new();
let addr = status_server.listening_addr().to_string();

Expand Down

0 comments on commit 34a2193

Please sign in to comment.