Skip to content

Commit

Permalink
[CLI] Add support for running postgres in a container to the local te…
Browse files Browse the repository at this point in the history
…stnet
  • Loading branch information
banool committed Sep 27, 2023
1 parent f59af3e commit fcb0465
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/aptos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ clap = { workspace = true, features = ["env", "unstable-styles"] }
clap_complete = { workspace = true }
codespan-reporting = { workspace = true }
dashmap = { workspace = true }
diesel = { workspace = true }
dirs = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
Expand Down
34 changes: 18 additions & 16 deletions crates/aptos/src/node/local_testnet/health_checker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::types::{CliError, CliTypedResult};
use anyhow::Context;
use anyhow::{anyhow, Context, Result};
use aptos_protos::indexer::v1::GetTransactionsRequest;
use diesel::{pg::PgConnection, Connection};
use futures::StreamExt;
use reqwest::Url;
use serde::Serialize;
Expand All @@ -23,10 +23,12 @@ pub enum HealthChecker {
NodeApi(Url),
/// Check that a data service GRPC stream is up.
DataServiceGrpc(Url),
/// Check that a postgres instance is up.
Postgres(String),
}

impl HealthChecker {
pub async fn check(&self) -> CliTypedResult<()> {
pub async fn check(&self) -> Result<()> {
match self {
HealthChecker::Http(url, _) => {
reqwest::get(Url::clone(url))
Expand Down Expand Up @@ -54,19 +56,17 @@ impl HealthChecker {
client
.get_transactions(request)
.await
.map_err(|err| {
CliError::UnexpectedError(format!("GRPC connection error: {:#}", err))
})?
.context("GRPC connection error")?
.into_inner()
.next()
.await
.context("Did not receive init signal from data service GRPC stream")?
.map_err(|err| {
CliError::UnexpectedError(format!(
"Error processing first message from GRPC stream: {:#}",
err
))
})?;
.context("Error processing first message from GRPC stream")?;
Ok(())
},
HealthChecker::Postgres(connection_string) => {
PgConnection::establish(connection_string)
.context("Failed to connect to postgres")?;
Ok(())
},
}
Expand All @@ -77,7 +77,7 @@ impl HealthChecker {
&self,
// The service, if any, waiting for this service to start up.
waiting_service: Option<&str>,
) -> CliTypedResult<()> {
) -> Result<()> {
let prefix = self.to_string();
wait_for_startup(|| self.check(), match waiting_service {
Some(waiting_service) => {
Expand All @@ -98,6 +98,7 @@ impl HealthChecker {
HealthChecker::Http(url, _) => url.as_str(),
HealthChecker::NodeApi(url) => url.as_str(),
HealthChecker::DataServiceGrpc(url) => url.as_str(),
HealthChecker::Postgres(url) => url.as_str(),
}
}

Expand All @@ -116,14 +117,15 @@ impl std::fmt::Display for HealthChecker {
HealthChecker::Http(_, name) => write!(f, "{}", name),
HealthChecker::NodeApi(_) => write!(f, "Node API"),
HealthChecker::DataServiceGrpc(_) => write!(f, "Transaction stream"),
HealthChecker::Postgres(_) => write!(f, "Postgres"),
}
}
}

async fn wait_for_startup<F, Fut>(check_fn: F, error_message: String) -> CliTypedResult<()>
async fn wait_for_startup<F, Fut>(check_fn: F, error_message: String) -> Result<()>
where
F: Fn() -> Fut,
Fut: futures::Future<Output = CliTypedResult<()>>,
Fut: futures::Future<Output = Result<()>>,
{
let max_wait = Duration::from_secs(MAX_WAIT_S);
let wait_interval = Duration::from_millis(WAIT_INTERVAL_MS);
Expand All @@ -150,7 +152,7 @@ where
Some(last_error_message) => format!("{}: {}", error_message, last_error_message),
None => error_message,
};
return Err(CliError::UnexpectedError(error_message));
return Err(anyhow!(error_message));
}

Ok(())
Expand Down
16 changes: 16 additions & 0 deletions crates/aptos/src/node/local_testnet/indexer_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use clap::Parser;

/// Args related to running an indexer API for the local testnet.
#[derive(Debug, Parser)]
pub struct IndexerApiArgs {
/// If set, we will run a postgres DB using Docker (unless
/// --use-host-postgres is set), run the standard set of indexer processors (see
/// --processors) and configure them to write to this DB, and run an API that lets
/// you access the data they write to storage. This is opt in because it requires
/// Docker to be installed in the host system.
#[clap(long, conflicts_with = "no_txn_stream")]
pub with_indexer_api: bool,
}
22 changes: 19 additions & 3 deletions crates/aptos/src/node/local_testnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@

mod faucet;
mod health_checker;
mod indexer_api;
mod logging;
mod node;
mod postgres;
mod ready_server;
mod traits;
mod utils;

use self::{
faucet::FaucetArgs,
health_checker::HealthChecker,
indexer_api::IndexerApiArgs,
logging::ThreadNameMakeWriter,
node::NodeArgs,
postgres::PostgresArgs,
ready_server::ReadyServerArgs,
traits::{PostHealthyStep, ServiceManager},
};
Expand Down Expand Up @@ -69,6 +73,12 @@ pub struct RunLocalTestnet {
#[clap(flatten)]
faucet_args: FaucetArgs,

#[clap(flatten)]
postgres_args: PostgresArgs,

#[clap(flatten)]
indexer_api_args: IndexerApiArgs,

#[clap(flatten)]
ready_server_args: ReadyServerArgs,

Expand All @@ -78,7 +88,7 @@ pub struct RunLocalTestnet {

impl RunLocalTestnet {
/// Wait for many services to start up. This prints a message like "X is starting,
/// please wait..." for each service and then "X is running. Endpoint: <url>"
/// please wait..." for each service and then "X is ready. Endpoint: <url>"
/// when it's ready.
async fn wait_for_startup<'a>(
&self,
Expand All @@ -88,11 +98,12 @@ impl RunLocalTestnet {
Vec::new();

for health_checker in health_checkers {
// We don't want to print anything for the processors, it'd be too spammy.
let silent = match health_checker {
HealthChecker::NodeApi(_) => false,
// We don't want to print anything for the processors, it'd be too spammy.
HealthChecker::Http(_, name) => name.contains("processor"),
HealthChecker::DataServiceGrpc(_) => false,
HealthChecker::Postgres(_) => false,
};
if !silent {
eprintln!("{} is starting, please wait...", health_checker);
Expand All @@ -101,7 +112,7 @@ impl RunLocalTestnet {
health_checker.wait(None).await?;
if !silent {
eprintln!(
"{} is running. Endpoint: {}",
"{} is ready. Endpoint: {}",
health_checker,
health_checker.address_str()
);
Expand Down Expand Up @@ -188,6 +199,11 @@ impl CliCommand<()> for RunLocalTestnet {
managers.push(Box::new(faucet_manager));
}

if self.indexer_api_args.with_indexer_api {
let postgres_manager = postgres::PostgresManager::new(&self, test_dir.clone());
managers.push(Box::new(postgres_manager));
}

// Now we put the node manager into managers, just so we have access to it
// before this so we can call things like `node_manager.get_node_api_url()`.
managers.push(Box::new(node_manager));
Expand Down
Loading

0 comments on commit fcb0465

Please sign in to comment.