Skip to content

Commit

Permalink
bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
eric committed Mar 10, 2024
1 parent f57ca3d commit 1f27e8d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 14 deletions.
11 changes: 8 additions & 3 deletions ncbi/src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ impl NcbiFile {
&self,
site: &str,
data_dir: &PathBuf,
asm_levels: &Vec<&str>,
callback: F,
) -> Result<()>
where
Expand All @@ -208,11 +209,15 @@ impl NcbiFile {
if fields.len() > 19 {
let (_, asm_level, ftp_path) = (fields[5], fields[11], fields[19]);

if !["Complete Genome", "Chromosome"].contains(&asm_level)
|| ftp_path == "na"
{
if ftp_path == "na" {
continue;
}
if !asm_levels.contains(&asm_level) {
continue;
}
// if !["Complete Genome", "Chromosome"].contains(&asm_level) {
// continue;
// }

let fna_file_name = format!(
"{}_genomic.fna.gz",
Expand Down
53 changes: 47 additions & 6 deletions ncbi/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use anyhow::Result;
use clap::{Parser, Subcommand, ValueEnum};
use lazy_static::lazy_static;
use ncbi::fna::write_to_fna;
use ncbi::meta::{init_meta, save_meta};
use ncbi::task;
use ncbi::utils;
use std::collections::HashMap;
use std::fmt;
use std::path::PathBuf;
use tokio::runtime::Builder;
Expand All @@ -12,6 +14,23 @@ const NCBI_LIBRARY: &'static [&str] = &[
"archaea", "bacteria", "viral", "fungi", "plant", "human", "protozoa",
];

lazy_static! {
static ref NCBI_ASM_LEVELS: HashMap<String, Vec<&'static str>> = {
let mut m = HashMap::new();
m.insert("complete_genome".to_string(), vec!["Complete Genome"]);
m.insert("chromosome".to_string(), vec!["Chromosome"]);
m.insert("scaffold".to_string(), vec!["Scaffold"]);
m.insert("contig".into(), vec!["Contig"]);
m.insert("basic".into(), vec!["Complete Genome", "Chromosome"]);
m.insert("uncomplete".into(), vec!["Scaffold", "Contig"]);
m.insert(
"all".into(),
vec!["Complete Genome", "Chromosome", "Scaffold", "Contig"],
);
m
};
}

fn validate_group(group: &str) -> Result<String, String> {
let groups = utils::parse_comma_separated_list(&group);
for grp in &groups {
Expand Down Expand Up @@ -94,6 +113,9 @@ enum Commands {
#[arg(long, value_enum, default_value_t = Site::Refseq)]
site: Site,

#[arg(long, default_value = "basic")]
asm_level: String,

/// 从 NCBI 站点上下载某个种类的数据信息,可以是逗号分隔的多个, archaea,bacteria,viral,fungi,plant,human,protozoa
#[arg(short, long, value_parser = validate_group)]
group: String,
Expand All @@ -114,7 +136,12 @@ async fn async_run(args: Args) -> Result<()> {
utils::create_dir(&data_dir)?;
let _ = task::run_taxo(&data_dir).await;
}
Commands::Genomes { site, group, mode } => {
Commands::Genomes {
site,
group,
asm_level,
mode,
} => {
let site = site.to_string();
let groups = utils::parse_comma_separated_list(&group);
for grp in groups {
Expand All @@ -127,18 +154,26 @@ async fn async_run(args: Args) -> Result<()> {
grp.to_string()
};

let levels = NCBI_ASM_LEVELS.get(&asm_level).unwrap();

match &mode {
Some(Mode::Md5) => {
let _ =
task::run_check(&site, &trans_group, &data_dir, args.num_threads).await;
let _ = task::run_check(
&site,
&trans_group,
&data_dir,
&levels,
args.num_threads,
)
.await;
}
Some(Mode::Fna { out_dir }) => {
let fna_out_dir = out_dir.join("library").join(grp.clone());
utils::create_dir(&fna_out_dir)?;
let _ = write_to_fna(&site, &trans_group, &data_dir, &fna_out_dir).await;
}
Some(Mode::Assembly) => {
let _ = task::run_assembly(&site, &trans_group, &data_dir).await;
let _ = task::run_assembly(&site, &trans_group, &levels, &data_dir).await;
}
Some(Mode::Url { url }) => {
let result = task::run_download_file(&site, &data_dir, &url).await;
Expand All @@ -147,8 +182,14 @@ async fn async_run(args: Args) -> Result<()> {
}
}
None => {
let _ =
task::run_task(&site, &trans_group, &data_dir, args.num_threads).await;
let _ = task::run_task(
&site,
&trans_group,
&data_dir,
&&levels,
args.num_threads,
)
.await;
}
}
}
Expand Down
18 changes: 13 additions & 5 deletions ncbi/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ async fn process_assembly_tasks(
site: &str,
group: &str,
data_dir: &PathBuf,
asm_levels: &Vec<&str>,
tx: mpsc::Sender<NcbiFile>,
) -> Result<usize> {
let counter = Arc::new(AtomicUsize::new(0));
let assembly = NcbiFile::from_group(group, data_dir, site).await;
match assembly.run().await {
Ok(_) => {
let result = assembly
.process_summary_and_apply(site, data_dir, |file: NcbiFile| {
.process_summary_and_apply(site, data_dir, asm_levels, |file: NcbiFile| {
let tx_clone = tx.clone();
let counter_clone = counter.clone();
async move {
Expand All @@ -90,12 +91,13 @@ pub async fn run_task(
site: &str,
group: &str,
data_dir: &PathBuf,
asm_levels: &Vec<&str>,
num_threads: usize,
) -> Result<()> {
log::info!("{} {} download assembly file start...", group, site);
let (tx, rx) = mpsc::channel(4096); // 通道大小可以根据需要调整
let (tx1, rx1) = mpsc::channel(4096); // 通道大小可以根据需要调整
let assembly_tasks = process_assembly_tasks(site, group, data_dir, tx);
let assembly_tasks = process_assembly_tasks(site, group, data_dir, asm_levels, tx);
let download_handle = process_tasks("run".to_string(), num_threads, rx, Some(tx1));
let md5_handle = process_tasks("check".to_string(), num_threads, rx1, None);
// // 等待处理任务完成
Expand All @@ -116,11 +118,12 @@ pub async fn run_check(
site: &str,
group: &str,
data_dir: &PathBuf,
asm_levels: &Vec<&str>,
num_threads: usize,
) -> Result<()> {
log::info!("{} {} check md5 start...", group, site);
let (tx, rx) = mpsc::channel(4096); // 通道大小可以根据需要调整
let assembly_tasks = process_assembly_tasks(site, group, data_dir, tx);
let assembly_tasks = process_assembly_tasks(site, group, data_dir, asm_levels, tx);
let md5_handle = process_tasks("check".to_string(), num_threads, rx, None);
// // 等待处理任务完成
let (ably_res, md5_res) = tokio::join!(assembly_tasks, md5_handle);
Expand Down Expand Up @@ -162,15 +165,20 @@ pub async fn run_download_file(site: &str, data_dir: &PathBuf, fna_url: &str) ->
Ok(())
}

pub async fn run_assembly(site: &str, group: &str, data_dir: &PathBuf) -> Result<()> {
pub async fn run_assembly(
site: &str,
group: &str,
asm_levels: &Vec<&str>,
data_dir: &PathBuf,
) -> Result<()> {
let assembly = NcbiFile::from_group(group, data_dir, site).await;
if !assembly.file_exists() {
let _ = assembly.run().await;
}
let total_counter = Arc::new(AtomicUsize::new(0));
let counter = Arc::new(AtomicUsize::new(0));
let result = assembly
.process_summary_and_apply(site, data_dir, |file: NcbiFile| {
.process_summary_and_apply(site, data_dir, asm_levels, |file: NcbiFile| {
let counter_clone = counter.clone();
let total_counter_clone = total_counter.clone();
async move {
Expand Down

0 comments on commit 1f27e8d

Please sign in to comment.