Commit b59fb0f

[aptos-workspace-server] add postgres

vgao1996 committed Nov 4, 2024
1 parent e534544 commit b59fb0f
Showing 4 changed files with 222 additions and 6 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions aptos-move/aptos-workspace-server/Cargo.toml
@@ -25,3 +25,6 @@ rand = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
bollard = { workspace = true }
maplit = { workspace = true }
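
Note: the three new dependencies are declared with { workspace = true }, so their versions come from the root manifest's [workspace.dependencies] table. A minimal sketch of what those root entries might look like (version numbers and features here are illustrative assumptions, not taken from this commit):

    [workspace.dependencies]
    # Docker Engine API client used below to manage the postgres container.
    bollard = "0.15"
    # Provides the hashmap! macro used to build the port maps.
    maplit = "1.0"
    # Generates the per-instance id that names the volume, network and container.
    uuid = { version = "1", features = ["v4"] }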
220 changes: 215 additions & 5 deletions aptos-move/aptos-workspace-server/src/main.rs
@@ -2,12 +2,21 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::{anyhow, Context, Result};
use aptos::node::local_testnet::HealthChecker;
use aptos::node::local_testnet::{docker, HealthChecker};
use aptos_config::config::{NodeConfig, TableInfoServiceMode};
use aptos_faucet_core::server::{FunderKeyEnum, RunConfig};
use aptos_node::{load_node_config, start_and_report_ports};
use aptos_types::network_address::{NetworkAddress, Protocol};
use futures::{channel::oneshot, future::Shared, FutureExt};
use bollard::{
container::{
CreateContainerOptions, InspectContainerOptions, StartContainerOptions,
WaitContainerOptions,
},
network::CreateNetworkOptions,
secret::{ContainerInspectResponse, HostConfig, PortBinding},
};
use futures::{channel::oneshot, future::Shared, FutureExt, TryStreamExt};
use maplit::hashmap;
use rand::{rngs::StdRng, SeedableRng};
use std::{
future::Future,
@@ -17,6 +26,7 @@ use std::{
thread,
};
use url::Url;
use uuid::Uuid;

const IP_LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

@@ -219,29 +229,222 @@ fn start_faucet(
(fut_faucet_port, fut_faucet_finish)
}

const POSTGRES_DEFAULT_PORT: u16 = 5432;
const DATA_PATH_IN_CONTAINER: &str = "/var/lib/mydata";
const POSTGRES_IMAGE: &str = "postgres:14.11";

fn get_postgres_assigned_port(container_info: &ContainerInspectResponse) -> Option<u16> {
if let Some(port_bindings) = container_info
.network_settings
.as_ref()
.and_then(|ns| ns.ports.as_ref())
{
if let Some(Some(bindings)) = port_bindings.get(&format!("{}/tcp", POSTGRES_DEFAULT_PORT)) {
if let Some(binding) = bindings.first() {
return binding
.host_port
.as_ref()
.and_then(|port| port.parse::<u16>().ok());
}
}
}
None
}

fn start_postgres(
instance_id: Uuid,
) -> Result<(
impl Future<Output = Result<u16>>,
impl Future<Output = Result<()>>,
)> {
let (postgres_container_id_tx, postgres_container_id_rx) = oneshot::channel();

let handle_postgres = tokio::spawn(async move {
let docker = docker::get_docker().await?;

let volume_name = format!("aptos-workspace-{}", instance_id);
docker
.create_volume(bollard::volume::CreateVolumeOptions {
name: volume_name.as_str(),
..Default::default()
})
.await
.context("failed to create volume for postgres")?;

let network_name = format!("aptos-workspace-{}", instance_id);
docker
.create_network(CreateNetworkOptions {
name: network_name.as_str(),
internal: false,
check_duplicate: true,
..Default::default()
})
.await
.context("failed to create network for postgres")?;

let host_config = Some(HostConfig {
// Bind the container to the network we created above. This does
// not prevent the binary in the container from exposing itself to the host
// on 127.0.0.1. See more here: https://stackoverflow.com/a/77432636/3846032.
network_mode: Some(network_name.clone()),
port_bindings: Some(hashmap! {
POSTGRES_DEFAULT_PORT.to_string() => Some(vec![PortBinding {
host_ip: Some("127.0.0.1".to_string()),
host_port: None,
}]),
}),
// Mount the volume into the container. We use a volume because volumes are
// more performant and easier to manage via the Docker API.
binds: Some(vec![format!("{}:{}", volume_name, DATA_PATH_IN_CONTAINER,)]),
..Default::default()
});

let config = bollard::container::Config {
image: Some(POSTGRES_IMAGE.to_string()),
// We set this to false so the container keeps running after the CLI
// shuts down by default. We manually kill the container if applicable,
// for example if the user set --force-restart.
tty: Some(false),
exposed_ports: Some(hashmap! {POSTGRES_DEFAULT_PORT.to_string() => hashmap!{}}),
host_config,
env: Some(vec![
// We run postgres without any auth + no password.
"POSTGRES_HOST_AUTH_METHOD=trust".to_string(),
format!("POSTGRES_USER={}", "postgres"),
format!("POSTGRES_DB={}", "local-testnet"),
// This tells postgres where to store the DB data on disk. This is the
// directory inside the container that is mounted from the host system.
format!("PGDATA={}", DATA_PATH_IN_CONTAINER),
]),
cmd: Some(
vec![
"postgres",
"-c",
// The default is 100 as of Postgres 14.11. Given the localnet
// can be composed of many different processors all with their own
// connection pools, 100 is insufficient.
"max_connections=200",
"-c",
// The default is 128MB as of Postgres 14.11. We 2x that value to
// match the fact that we 2x'd max_connections.
"shared_buffers=256MB",
]
.into_iter()
.map(|s| s.to_string())
.collect(),
),
..Default::default()
};

let options = Some(CreateContainerOptions {
name: format!("aptos-workspace-{}-postgres", instance_id),
..Default::default()
});

let container_id = docker
.create_container(options, config)
.await
.context("failed to create postgres container")?
.id;

docker
.start_container(&container_id, None::<StartContainerOptions<&str>>)
.await
.context("failed to start postgres container")?;

let container_info = docker
.inspect_container(&container_id, Some(InspectContainerOptions::default()))
.await
.context("failed to inspect postgres container")?;

postgres_container_id_tx
.send(container_id)
.map_err(|_port| anyhow!("failed to send postgres container id"))?;

let postgres_port = get_postgres_assigned_port(&container_info)
.ok_or_else(|| anyhow!("failed to get postgres port"))?;

println!(
"Postgres is ready. Endpoint: postgres://{}:{}",
IP_LOCAL_HOST, postgres_port
);

// Wait until postgres is actually ready to accept connections.
let health_checker = HealthChecker::Postgres(format!(
"postgres://{}@{}:{}/{}",
"postgres",
IP_LOCAL_HOST,
postgres_port,
"local-testnet"
));
health_checker.wait(None).await?;

Ok(postgres_port)
});

let fut_postgres_port = async move {
handle_postgres
.await
.map_err(|err| anyhow!("failed to join handle task: {}", err))?
};

let fut_postgres_finish = async move {
let container_id = postgres_container_id_rx
.await
.context("failed to receive postgres container id")?;

let docker = docker::get_docker().await?;

// Wait for the container to stop (which it shouldn't).
let _wait = docker
.wait_container(
&container_id,
Some(WaitContainerOptions {
condition: "not-running",
}),
)
.try_collect::<Vec<_>>()
.await
.context("Failed to wait on postgres container")?;

Ok(())
};

Ok((fut_postgres_port, fut_postgres_finish))
}

async fn start_all_services(test_dir: &Path) -> Result<()> {
let instance_id = Uuid::new_v4();

// Step 1: spawn all services.
// Node
let (fut_node_api, fut_indexer_grpc, fut_node_finish) = start_node(test_dir)?;

let fut_node_api = make_shared(fut_node_api);
let fut_indexer_grpc = make_shared(fut_indexer_grpc);

// Faucet
let (fut_faucet, fut_faucet_finish) = start_faucet(
test_dir.to_owned(),
fut_node_api.clone(),
fut_indexer_grpc.clone(),
);

let (res_node_api, res_indexer_grpc, res_faucet) =
tokio::join!(fut_node_api, fut_indexer_grpc, fut_faucet);
// Postgres
let (fut_postgres, fut_postgres_finish) = start_postgres(instance_id)?;

// Step 2: wait for all services to be up.
let (res_node_api, res_indexer_grpc, res_faucet, res_postgres) =
tokio::join!(fut_node_api, fut_indexer_grpc, fut_faucet, fut_postgres);

res_node_api
.map_err(anyhow::Error::msg)
.context("failed to start node api")?;
res_indexer_grpc
.map_err(anyhow::Error::msg)
.context("failed to start indexer grpc")?;
res_faucet.context("failed to start faucet")?;
res_postgres.context("failed to start postgres")?;

println!(
"Indexer API is ready. Endpoint: http://{}:0/",
@@ -253,9 +456,10 @@ async fn start_all_services(test_dir: &Path) -> Result<()> {
// Step 3: wait for services to stop.
tokio::pin!(fut_node_finish);
tokio::pin!(fut_faucet_finish);
tokio::pin!(fut_postgres_finish);

let mut finished: u64 = 0;
while finished < 2 {
while finished < 3 {
tokio::select! {
res = &mut fut_node_finish => {
if let Err(err) = res {
@@ -269,6 +473,12 @@ async fn start_all_services(test_dir: &Path) -> Result<()> {
}
finished += 1;
}
res = &mut fut_postgres_finish => {
if let Err(err) = res {
eprintln!("Postgres existed with error: {}", err);
}
finished += 1;
}
}
}

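Once the assigned host port is printed, the containerized database can be reached with the same connection string the health checker builds (postgres://postgres@127.0.0.1:<port>/local-testnet, no password thanks to POSTGRES_HOST_AUTH_METHOD=trust). A minimal client sketch, assuming the tokio-postgres crate (not part of this change) and a hypothetical port value:

    use tokio_postgres::NoTls;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Hypothetical port; use the one printed by the workspace server.
        let port = 55432;
        let conn_str = format!("postgres://postgres@127.0.0.1:{}/local-testnet", port);

        let (client, connection) = tokio_postgres::connect(&conn_str, NoTls).await?;
        // The connection object drives the socket and must be polled on its own task.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("postgres connection error: {}", e);
            }
        });

        // Trivial round trip to confirm the container is accepting queries.
        let row = client.query_one("SELECT 1::INT4", &[]).await?;
        assert_eq!(row.get::<_, i32>(0), 1);
        Ok(())
    }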
2 changes: 1 addition & 1 deletion crates/aptos/src/node/local_testnet/mod.rs
@@ -1,7 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

mod docker;
mod indexer_api;
mod logging;
mod postgres;
@@ -10,6 +9,7 @@ mod ready_server;
mod utils;

// This is to allow external crates to use the localnode.
pub mod docker;
pub mod faucet;
pub mod health_checker;
pub mod node;
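With the docker module re-exported as pub, crates outside the CLI (such as aptos-workspace-server above) can reuse the same client bootstrap. A small sketch of the call pattern, assuming get_docker returns a bollard Docker client as its usage in main.rs suggests:

    use aptos::node::local_testnet::docker;

    async fn ping_docker() -> anyhow::Result<()> {
        // Resolves the local Docker daemon and verifies it is reachable,
        // mirroring how start_postgres obtains its client above.
        let docker = docker::get_docker().await?;
        let version = docker.version().await?;
        println!("docker server version: {:?}", version.version);
        Ok(())
    }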
