use gcs for reading replay-verify
areshand committed Oct 15, 2023
1 parent eae267c commit 7c49f09
Showing 12 changed files with 210 additions and 163 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/replay-verify.yaml
```diff
@@ -56,11 +56,11 @@ jobs:
     with:
       GIT_SHA: ${{ inputs.GIT_SHA }}
       # replay-verify config
-      BUCKET: aptos-testnet-backup-2223d95b
+      BUCKET: aptos-testnet-backup-b7b1ad7a
       SUB_DIR: e1
       HISTORY_START: 250000000 # TODO: We need an exhaustive list of txns_to_skip before we can set this to 0.
       TXNS_TO_SKIP: 46874937 151020059 409163615 409163669 409163708 409163774 409163845 409163955 409164059 409164191 414625832
-      BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/s5cmd-public.yaml
+      BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/gcs.yaml
       # workflow config
       RUNS_ON: "high-perf-docker-with-local-ssd"
       TIMEOUT_MINUTES: 840
@@ -76,11 +76,11 @@ jobs:
     with:
       GIT_SHA: ${{ inputs.GIT_SHA }}
       # replay-verify config
-      BUCKET: aptos-mainnet-backup-backup-831a69a8
+      BUCKET: aptos-mainnet-backup-backup-e098483d
       SUB_DIR: e1
       HISTORY_START: 0
       TXNS_TO_SKIP: 12253479 12277499 148358668
-      BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/s5cmd-public.yaml
+      BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/gcs.yaml
       # workflow config
       RUNS_ON: "high-perf-docker-with-local-ssd"
       TIMEOUT_MINUTES: 300
@@ -93,11 +93,11 @@ jobs:
     with:
       GIT_SHA: ${{ github.event.pull_request.head.sha }}
       # replay-verify config
-      BUCKET: aptos-testnet-backup-2223d95b
+      BUCKET: aptos-testnet-backup-b7b1ad7a
       SUB_DIR: e1
       HISTORY_START: 250000000 # TODO: We need an exhaustive list of txns_to_skip before we can set this to 0.
       TXNS_TO_SKIP: 46874937 151020059 409163615 409163669 409163708 409163774 409163845 409163955 409164059 409164191 414625832
-      BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/s5cmd-public.yaml
+      BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/gcs.yaml
       # workflow config
       RUNS_ON: "high-perf-docker-with-local-ssd"
       TIMEOUT_MINUTES: 120 # increase test replay timeout to capture more flaky errors
```
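For context on the config swap above: `BACKUP_CONFIG_TEMPLATE_PATH` points the backup CLI at a command-adapter template describing how to shell out to a storage backend. The sketch below is only an illustration of what a GCS-flavored template could look like; the key names and commands are assumptions, not the contents of the real `terraform/helm/fullnode/files/backup/gcs.yaml`:

```yaml
# Illustrative sketch only: assumed schema, not the actual gcs.yaml shipped
# in terraform/helm/fullnode/files/backup/. It mimics a command-adapter
# config where each operation is a shell command run by the backup CLI.
env_vars: []
commands:
  # Stream a file handle out of the bucket to stdout for reading.
  open_for_read: 'gsutil cp "gs://$BUCKET/$SUB_DIR/$FILE_HANDLE" -'
  # Enumerate metadata files under the backup's metadata prefix.
  list_metadata_files: 'gsutil ls "gs://$BUCKET/$SUB_DIR/metadata/"'
```

Since replay-verify only reads backups, a read-only template like this, together with the GCloud SDK installed in the next file, is plausibly all the workflow needs.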
16 changes: 5 additions & 11 deletions .github/workflows/workflow-run-replay-verify.yaml
```diff
@@ -95,17 +95,11 @@ jobs:
         with:
           GIT_CREDENTIALS: ${{ secrets.GIT_CREDENTIALS }}

-      - name: Install AWS CLI
-        shell: bash
-        run: |
-          scripts/dev_setup.sh -b -i awscli
-          echo "${HOME}/bin/" >> $GITHUB_PATH # default INSTALL_DIR to path
-
-      - name: Install s5cmd
-        shell: bash
-        run: |
-          scripts/dev_setup.sh -b -i s5cmd
-          echo "${HOME}/bin/" >> $GITHUB_PATH # default INSTALL_DIR to path
+      - name: Install GCloud SDK
+        uses: "google-github-actions/setup-gcloud@62d4898025f6041e16b1068643bfc5a696863587" # pin@v1
+        with:
+          version: ">= 418.0.0"
+          install_components: "kubectl,gke-gcloud-auth-plugin"

       - name: Build CLI binaries in release mode
         shell: bash
```
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
```diff
@@ -632,7 +632,7 @@ tokio = { version = "1.21.0", features = ["full"] }
 tokio-io-timeout = "1.2.0"
 tokio-metrics = "0.1.0"
 tokio-retry = "0.3.0"
-tokio-stream = "0.1.8"
+tokio-stream = { version = "0.1.14", features = ["fs"] }
 tokio-test = "0.4.1"
 tokio-util = { version = "0.7.2", features = ["compat", "codec"] }
 toml = "0.7.4"
```
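The `fs` feature is the point of this bump: tokio-stream only exposes its filesystem wrappers (notably `ReadDirStream`, which `metadata/cache.rs` starts using below) behind that flag. A minimal sketch of the pattern it enables:

```rust
use std::io;

use tokio_stream::{wrappers::ReadDirStream, StreamExt};

// Collect file names in a directory by driving tokio::fs::ReadDir as a
// Stream. ReadDirStream lives behind tokio-stream's "fs" feature, which is
// exactly what the dependency bump above turns on (tokio itself also needs
// its "fs" feature for read_dir).
async fn list_file_names(dir: &str) -> io::Result<Vec<String>> {
    let read_dir = tokio::fs::read_dir(dir).await?;
    let names = ReadDirStream::new(read_dir)
        .filter_map(|entry| entry.ok()) // drop unreadable entries
        .map(|e| e.file_name().to_string_lossy().into_owned())
        .collect()
        .await;
    Ok(names)
}
```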
3 changes: 2 additions & 1 deletion storage/backup/backup-cli/Cargo.toml
```diff
@@ -49,9 +49,10 @@ reqwest = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 serde_yaml = { workspace = true }
+thiserror = { workspace = true }
 tokio = { workspace = true }
 tokio-io-timeout = { workspace = true }
-tokio-stream = { workspace = true }
+tokio-stream = { workspace = true, features = ["fs"] }
 tokio-util = { workspace = true }

 [dev-dependencies]
```
119 changes: 72 additions & 47 deletions storage/backup/backup-cli/src/coordinators/replay_verify.rs
```diff
@@ -12,14 +12,28 @@ use crate::{
     storage::BackupStorage,
     utils::{GlobalRestoreOptions, RestoreRunMode, TrustedWaypointOpt},
 };
-use anyhow::{bail, ensure, Result};
+use anyhow::Result;
 use aptos_db::backup::restore_handler::RestoreHandler;
 use aptos_executor_types::VerifyExecutionMode;
 use aptos_logger::prelude::*;
 use aptos_types::{on_chain_config::TimedFeatureOverride, transaction::Version};
 use aptos_vm::AptosVM;
 use std::sync::Arc;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum ReplayError {
+    #[error("Txn mismatch error")]
+    TxnMismatch,
+    #[error("Other Replay error {0}")]
+    OtherError(String),
+}
+
+impl From<anyhow::Error> for ReplayError {
+    fn from(error: anyhow::Error) -> Self {
+        ReplayError::OtherError(error.to_string())
+    }
+}
 pub struct ReplayVerifyCoordinator {
     storage: Arc<dyn BackupStorage>,
     metadata_cache_opt: MetadataCacheOpt,
```
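The `From<anyhow::Error>` impl is what keeps the many existing `?` operators compiling once `run`/`run_impl` switch to `Result<_, ReplayError>` in the next hunks: any `anyhow::Error` is silently folded into `OtherError`. A small sketch of the mechanics, assuming the `ReplayError` definition above (`parse_version` is a made-up helper, not code from this PR):

```rust
use anyhow::anyhow;

// `parse_version` is a hypothetical anyhow-based helper; the point is that
// `?` applies the From<anyhow::Error> conversion defined above on its own.
fn parse_version(s: &str) -> anyhow::Result<u64> {
    s.parse().map_err(|e| anyhow!("bad version {s:?}: {e}"))
}

fn first_version(raw: &str) -> Result<u64, ReplayError> {
    let v = parse_version(raw)?; // anyhow::Error -> ReplayError::OtherError
    Ok(v)
}
```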
```diff
@@ -60,9 +74,8 @@ impl ReplayVerifyCoordinator {
         })
     }

-    pub async fn run(self) -> Result<()> {
+    pub async fn run(self) -> Result<(), ReplayError> {
         info!("ReplayVerify coordinator started.");
-
         let ret = self.run_impl().await;

         if let Err(e) = &ret {
@@ -77,7 +90,7 @@ impl ReplayVerifyCoordinator {
         ret
     }

-    async fn run_impl(self) -> Result<()> {
+    async fn run_impl(self) -> Result<(), ReplayError> {
         AptosVM::set_concurrency_level_once(self.replay_concurrency_level);
         AptosVM::set_timed_feature_override(TimedFeatureOverride::Replay);

```
```diff
@@ -87,45 +100,56 @@ impl ReplayVerifyCoordinator {
             self.concurrent_downloads,
         )
         .await?;
-        ensure!(
-            self.start_version <= self.end_version,
-            "start_version should precede end_version."
-        );
+        if self.start_version > self.end_version {
+            return Err(ReplayError::OtherError(format!(
+                "start_version {} should precede end_version {}.",
+                self.start_version, self.end_version
+            )));
+        }

         let run_mode = Arc::new(RestoreRunMode::Restore {
             restore_handler: self.restore_handler,
         });
-        let next_txn_version = run_mode.get_next_expected_transaction_version()?;
-        let (state_snapshot, replay_transactions_from_version) = if next_txn_version != 0 {
-            // DB is already in workable state
-            info!(
-                next_txn_version = next_txn_version,
-                "DB already has non-empty State DB.",
-            );
-            (None, next_txn_version)
-        } else if let Some(version) = run_mode.get_in_progress_state_kv_snapshot()? {
+        let mut next_txn_version = run_mode.get_next_expected_transaction_version()?;
+        let (state_snapshot, snapshot_version) = if let Some(version) =
+            run_mode.get_in_progress_state_kv_snapshot()?
+        {
             info!(
                 version = version,
                 "Found in progress state snapshot restore",
             );
-            (Some(metadata_view.expect_state_snapshot(version)?), version)
-        } else if self.start_version == 0 {
-            (None, 0)
+            (
+                Some(metadata_view.expect_state_snapshot(version)?),
+                Some(version),
+            )
+        } else if let Some(snapshot) = metadata_view.select_state_snapshot(self.start_version)? {
+            let snapshot_version = snapshot.version;
+            info!(
+                "Found state snapshot backup at epoch {}, will replay from version {}.",
+                snapshot.epoch,
+                snapshot_version + 1
+            );
+            (Some(snapshot), Some(snapshot_version))
         } else {
-            let state_snapshot = metadata_view.select_state_snapshot(self.start_version - 1)?;
-            let replay_transactions_from_version =
-                state_snapshot.as_ref().map(|b| b.version + 1).unwrap_or(0);
-            (state_snapshot, replay_transactions_from_version)
+            (None, None)
         };
-        ensure!(
-            next_txn_version <= self.start_version,
-            "DB version is already beyond start_version requested.",
-        );

+        let skip_snapshot: bool =
+            snapshot_version.is_none() || next_txn_version > snapshot_version.unwrap();
+        if skip_snapshot {
+            info!(
+                next_txn_version = next_txn_version,
+                snapshot_version = snapshot_version,
+                "found in progress replay and skip the state snapshot restore",
+            );
+        }
+
+        next_txn_version = std::cmp::max(next_txn_version, snapshot_version.map_or(0, |v| v + 1));
+
         let transactions = metadata_view.select_transaction_backups(
             // transaction info at the snapshot must be restored otherwise the db will be confused
             // about the latest version after snapshot is restored.
-            replay_transactions_from_version.saturating_sub(1),
+            next_txn_version.saturating_sub(1),
             self.end_version,
         )?;
         let global_opt = GlobalRestoreOptions {
```
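The heart of this hunk is the interaction between `next_txn_version` (how far the DB has already replayed) and `snapshot_version` (the restore point a snapshot would establish): the snapshot restore is skipped when the DB has already moved past it, and replay resumes from whichever point is further along. A standalone sketch of just that decision, with worked cases (`plan_replay` is an illustrative name, not a function in this PR):

```rust
/// Mirrors the hunk above in isolation: decide whether the snapshot restore
/// can be skipped, and from which version transaction replay resumes.
fn plan_replay(next_txn_version: u64, snapshot_version: Option<u64>) -> (bool, u64) {
    // Skip the snapshot if there is none, or if the DB already progressed
    // past it (e.g. an earlier run restored it and replayed further).
    let skip_snapshot =
        snapshot_version.is_none() || next_txn_version > snapshot_version.unwrap();
    // Replay resumes at whichever is further along: the DB's own progress,
    // or the first version after the snapshot.
    let resume_from = std::cmp::max(next_txn_version, snapshot_version.map_or(0, |v| v + 1));
    (skip_snapshot, resume_from)
}

#[test]
fn plan_replay_cases() {
    // Fresh DB, snapshot at version 99: restore it, replay from 100.
    assert_eq!(plan_replay(0, Some(99)), (false, 100));
    // DB already at 150, snapshot at 99: skip restore, keep replaying at 150.
    assert_eq!(plan_replay(150, Some(99)), (true, 150));
    // No snapshot at all: replay everything from the DB's current tip.
    assert_eq!(plan_replay(42, None), (true, 42));
}
```

Note the boundary case: when `next_txn_version` equals `snapshot_version`, the comparison is strict, so the snapshot is still restored and replay resumes at `snapshot_version + 1`.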
```diff
@@ -135,21 +159,22 @@ impl ReplayVerifyCoordinator {
             concurrent_downloads: self.concurrent_downloads,
             replay_concurrency_level: 0, // won't replay, doesn't matter
         };

-        if let Some(backup) = state_snapshot {
-            StateSnapshotRestoreController::new(
-                StateSnapshotRestoreOpt {
-                    manifest_handle: backup.manifest,
-                    version: backup.version,
-                    validate_modules: self.validate_modules,
-                    restore_mode: Default::default(),
-                },
-                global_opt.clone(),
-                Arc::clone(&self.storage),
-                None, /* epoch_history */
-            )
-            .run()
-            .await?;
+        if !skip_snapshot {
+            if let Some(backup) = state_snapshot {
+                StateSnapshotRestoreController::new(
+                    StateSnapshotRestoreOpt {
+                        manifest_handle: backup.manifest,
+                        version: backup.version,
+                        validate_modules: self.validate_modules,
+                        restore_mode: Default::default(),
+                    },
+                    global_opt.clone(),
+                    Arc::clone(&self.storage),
+                    None, /* epoch_history */
+                )
+                .run()
+                .await?;
+            }
         }

         let txn_manifests = transactions.into_iter().map(|b| b.manifest).collect();
```
```diff
@@ -158,16 +183,16 @@ impl ReplayVerifyCoordinator {
             self.storage,
             txn_manifests,
             None,
-            Some((replay_transactions_from_version, false)), /* replay_from_version */
-            None,                                            /* epoch_history */
+            Some((next_txn_version, false)), /* replay_from_version */
+            None,                            /* epoch_history */
             self.verify_execution_mode.clone(),
             None,
         )
         .run()
         .await?;

         if self.verify_execution_mode.seen_error() {
-            bail!("Seen replay errors, check out logs.")
+            Err(ReplayError::TxnMismatch)
         } else {
             Ok(())
         }
```
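Replacing `bail!` with a typed error means callers can branch on the failure kind instead of grepping logs, for example to distinguish a genuine transaction mismatch from an infrastructure flake. A hypothetical sketch of such caller-side mapping, assuming the `ReplayError` above (not part of this PR):

```rust
// Hypothetical harness-side mapping; not code from this commit.
fn exit_code(result: Result<(), ReplayError>) -> i32 {
    match result {
        Ok(()) => 0,
        // Replayed transactions diverged from the chain history in the
        // backup: a true verification failure.
        Err(ReplayError::TxnMismatch) => 2,
        // Everything funneled through From<anyhow::Error>: metadata, IO,
        // storage errors, etc. Plausible candidates for a retry.
        Err(ReplayError::OtherError(_)) => 1,
    }
}
```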
37 changes: 16 additions & 21 deletions storage/backup/backup-cli/src/metadata/cache.rs
```diff
@@ -13,7 +13,6 @@ use aptos_logger::prelude::*;
 use aptos_temppath::TempPath;
 use async_trait::async_trait;
 use clap::Parser;
-use futures::stream::poll_fn;
 use std::{
     collections::{HashMap, HashSet},
     path::{Path, PathBuf},
@@ -24,7 +23,7 @@ use tokio::{
     fs::{create_dir_all, read_dir, remove_file, OpenOptions},
     io::{AsyncRead, AsyncReadExt},
 };
-use tokio_stream::StreamExt;
+use tokio_stream::{wrappers::ReadDirStream, StreamExt};

 #[derive(Clone, Parser)]
 pub struct MetadataCacheOpt {
```
```diff
@@ -99,25 +98,19 @@ pub async fn sync_and_load(
     create_dir_all(&cache_dir).await.err_notes(&cache_dir)?; // create if not present already

     // List cached metadata files.
-    let mut dir = read_dir(&cache_dir).await.err_notes(&cache_dir)?;
-    let local_entries = poll_fn(|ctx| {
-        ::std::task::Poll::Ready(match futures::ready!(dir.poll_next_entry(ctx)) {
-            Ok(Some(entry)) => Some(Ok(entry)),
-            Ok(None) => None,
-            Err(err) => Some(Err(err)),
+    let dir = read_dir(&cache_dir).await.err_notes(&cache_dir)?;
+    let local_hashes_vec: Vec<String> = ReadDirStream::new(dir)
+        .filter_map(|entry| match entry {
+            Ok(e) => {
+                let path = e.path();
+                let file_name = path.file_name()?.to_str()?;
+                Some(file_name.to_string())
+            },
+            Err(_) => None,
         })
-    })
-    .collect::<tokio::io::Result<Vec<_>>>()
-    .await?;
-    let local_hashes = local_entries
-        .iter()
-        .map(|e| {
-            e.file_name()
-                .into_string()
-                .map_err(|s| anyhow!("into_string() failed for file name {:?}", s))
-        })
-        .collect::<Result<HashSet<_>>>()?;
-
+        .collect()
+        .await;
+    let local_hashes: HashSet<_> = local_hashes_vec.into_iter().collect();
     // List remote metadata files.
     let mut remote_file_handles = storage.list_metadata_files().await?;
     if remote_file_handles.is_empty() {
```
```diff
@@ -164,7 +157,9 @@ pub async fn sync_and_load(
                 download_file(storage_ref, file_handle, &local_tmp_file).await?;
                 // rename to target file only if successful; stale tmp file caused by failure will be
                 // reclaimed on next run
-                tokio::fs::rename(local_tmp_file, local_file).await?;
+                tokio::fs::rename(local_tmp_file.clone(), local_file)
+                    .await
+                    .err_notes(local_tmp_file)?;
                 info!(
                     file_handle = file_handle,
                     processed = i + 1,
```
2 changes: 1 addition & 1 deletion storage/backup/backup-cli/src/storage/mod.rs
```diff
@@ -209,7 +209,7 @@ impl StorageOpt {
         }
     }

-#[derive(Parser)]
+#[derive(Parser, Clone, Debug)]
 #[clap(group(
     ArgGroup::new("storage")
         .required(true)
```