Skip to content

Commit

Permalink
fix: tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
conblem committed Oct 26, 2020
1 parent b171b0f commit f2375d1
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 42 deletions.
83 changes: 83 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ description = "Acme DNS implementation written in Rust"

[dependencies]
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-futures = { version = "0.2.4", features = ["futures-03"]}
sqlx = { version = "0.4.0-beta.1", default-features = false, features = [ "runtime-tokio", "macros", "migrate"] }
tokio = { version = "0.2", features = ["dns", "tcp", "udp"] }
uuid = { version = "0.8", features = ["v4"] }
Expand Down
26 changes: 22 additions & 4 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio_rustls::TlsAcceptor;
use tracing::error;
use tracing::{error, info};
use tracing_futures::Instrument;
use warp::{http::Response, reply, serve, Filter, Rejection, Reply};

use crate::cert::{Cert, CertFacade};
Expand Down Expand Up @@ -102,18 +103,24 @@ pub struct Api {
pool: PgPool,
}

#[tracing::instrument(skip(pool))]
async fn register(pool: PgPool, domain: Domain) -> Result<reply::Response, Rejection> {
let _domain = match DomainFacade::create_domain(&pool, &domain).await {
let _domain = match DomainFacade::create_domain(&pool, &domain)
.in_current_span()
.await
{
Err(e) => {
error!("{}", e);
return Ok(Response::builder()
.status(500)
.body(e.to_string())
.unwrap()
.into_response())
.into_response());
}
Ok(domain) => domain,
};

info!("Success for call");
Ok(Response::new("no error").into_response())
}

Expand All @@ -131,7 +138,10 @@ impl Api {
Ok(Api { http, https, pool })
}

#[tracing::instrument(skip(self))]
pub async fn spawn(self) -> Result<()> {
info!("Starting API spawn");

let pool = self.pool.clone();
let routes = warp::path("register")
.and(warp::post())
Expand All @@ -141,13 +151,21 @@ impl Api {

let http = self
.http
.map(|http| {
info!(?http, "Starting http");
http.in_current_span()
})
.map(|http| serve(routes.clone()).serve_incoming(http))
.map(tokio::spawn);

let pool = self.pool.clone();
let https = self
.https
.map(|https| serve(routes).serve_incoming(stream(https, pool)))
.map(|https| {
info!(?https, "Starting https");
stream(https, pool).into_stream().in_current_span()
})
.map(|https| serve(routes).serve_incoming(https))
.map(tokio::spawn);

match (https, http) {
Expand Down
14 changes: 10 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde::Deserialize;
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use tracing::info;
use tracing::{debug, info, info_span};

#[derive(Deserialize, Debug)]
pub struct Api {
Expand Down Expand Up @@ -35,17 +35,23 @@ pub struct Config {
const DEFAULT_CONFIG_PATH: &str = "config.toml";

// is not async so we can use it to load settings for tokio runtime
pub fn config(config_path: Option<String>) -> Result<Config> {
pub fn load_config(config_path: Option<String>) -> Result<Config> {
let config_path = config_path.as_deref().unwrap_or(DEFAULT_CONFIG_PATH);

let span = info_span!("load_config", config_path);
let _enter = span.enter();

let mut file = File::open(config_path)?;
debug!(?file, "Opened file");

let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
debug!(file_length = bytes.len(), "Read file");

let config = toml::de::from_slice::<Config>(&bytes)?;

// redact db information
let config_str = format!("{:?}", config).replace(&config.general.db, "******");
info!("Loaded {}", config_str);
info!(config = %config_str, "Deserialized config");

Ok(config)
}
102 changes: 68 additions & 34 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use anyhow::Result;
use futures_util::TryFutureExt;
use simplelog::{Config, LevelFilter, SimpleLogger};
use sqlx::migrate::Migrator;
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use sqlx::PgPool;
use std::env;
use std::str::FromStr;
use tokio::runtime::Runtime;
use tracing::error;
use tracing::{debug, error, info};

use crate::acme::DatabasePersist;
use crate::api::Api;
use crate::cert::CertManager;
use crate::dns::{DatabaseAuthority, DNS};
use tracing_futures::Instrument;

mod acme;
mod api;
Expand All @@ -25,51 +25,85 @@ mod util;
static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgres");

fn main() {
SimpleLogger::init(LevelFilter::Debug, Config::default()).unwrap();
tracing_subscriber::fmt::init();

if let Err(e) = run() {
error!("{:?}", e);
if run().is_err() {
std::process::exit(1);
}
}

#[tracing::instrument]
fn run() -> Result<()> {
let config_path = env::args().nth(1);
let config = config::config(config_path)?;
let config = config::load_config(config_path)?;

let runtime = match Runtime::new() {
Ok(runtime) => runtime,
Err(e) => {
error!("{}", e);
return Err(e.into());
}
};
debug!("Created runtime");

let runtime = Runtime::new()?;
// Async closure cannot be move, if runtime gets moved into it
// it gets dropped inside an async call
runtime.handle().block_on(async {
let pool = setup_database(&config.general.db).await?;
let authority = DatabaseAuthority::new(pool.clone(), &config.general.name, config.records);
let dns = DNS::new(&config.general.dns, &runtime, authority);

let api = Api::new(
config.api.http.as_deref(),
config.api.https.as_deref(),
pool.clone(),
)
.and_then(Api::spawn);

let persist = DatabasePersist::new(pool.clone(), runtime.handle());
let cert_manager =
CertManager::new(pool, persist, config.general.acme).and_then(CertManager::spawn);

tokio::try_join!(api, cert_manager, dns.spawn())?;

Ok(())
})
let res: Result<()> = runtime.handle().block_on(
async {
let pool = setup_database(&config.general.db).in_current_span().await?;
let authority =
DatabaseAuthority::new(pool.clone(), &config.general.name, config.records);
let dns = DNS::new(&config.general.dns, &runtime, authority);

let api = Api::new(
config.api.http.as_deref(),
config.api.https.as_deref(),
pool.clone(),
)
.and_then(Api::spawn);

let persist = DatabasePersist::new(pool.clone(), runtime.handle());
let cert_manager =
CertManager::new(pool, persist, config.general.acme).and_then(CertManager::spawn);

info!("Starting API Cert Manager and DNS");
tokio::try_join!(api, cert_manager, dns.spawn())?;

Ok(())
}
.in_current_span(),
);

if let Err(e) = res {
error!("{}", e);
return Err(e.into());
};

Ok(())
}

#[tracing::instrument(skip(db))]
async fn setup_database(db: &str) -> Result<PgPool, sqlx::Error> {
let options = PgConnectOptions::from_str(db)?;
let pool = PgPoolOptions::new()
.max_connections(5)
.connect_with(options)
.await?;
let pool = async {
let options = PgConnectOptions::from_str(db)?;
let pool = PgPoolOptions::new()
.max_connections(5)
.connect_with(options)
.await?;
debug!("Created DB pool");

MIGRATOR.run(&pool).await?;
MIGRATOR.run(&pool).await?;
info!("Ran migration");
Ok(pool)
}
.in_current_span()
.await;

Ok(pool)
match pool {
Ok(pool) => Ok(pool),
Err(e) => {
error!("{}", e);
Err(e)
}
}
}

0 comments on commit f2375d1

Please sign in to comment.