diff --git a/Cargo.lock b/Cargo.lock index 70a948561..3da8135d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1820,6 +1820,7 @@ name = "nextclade-web" version = "3.0.0-alpha.0" dependencies = [ "assert2", + "chrono", "console_error_panic_hook", "eyre", "getrandom", diff --git a/packages_rs/nextclade-web/Cargo.toml b/packages_rs/nextclade-web/Cargo.toml index 8288ad8f1..08f8b8da2 100644 --- a/packages_rs/nextclade-web/Cargo.toml +++ b/packages_rs/nextclade-web/Cargo.toml @@ -12,6 +12,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] assert2 = "=0.3.11" +chrono = { version = "=0.4.26", default-features = false, features = ["clock", "std", "wasmbind"] } console_error_panic_hook = "=0.1.7" eyre = "=0.6.8" getrandom = { version = "=0.2.10", features = ["js"] } @@ -19,12 +20,12 @@ itertools = "=0.11.0" js-sys = { version = "=0.3.64", features = [] } log = "=0.4.19" nextclade = { path = "../nextclade" } +schemars = { version = "=0.8.12", features = ["chrono", "either", "enumset", "indexmap1"] } serde = { version = "=1.0.164", features = ["derive"] } serde-wasm-bindgen = { version = "=0.5.0" } wasm-bindgen = { version = "=0.2.87", features = ["serde-serialize"] } wasm-logger = "=0.2.0" web-sys = { version = "=0.3.64", features = ["console"] } -schemars = { version = "=0.8.12", features = ["chrono", "either", "enumset", "indexmap1"] } [build-dependencies] nextclade = { path = "../nextclade" } diff --git a/packages_rs/nextclade-web/src/hooks/useRunSeqAutodetect.ts b/packages_rs/nextclade-web/src/hooks/useRunSeqAutodetect.ts index 80ae6590c..ce0af4768 100644 --- a/packages_rs/nextclade-web/src/hooks/useRunSeqAutodetect.ts +++ b/packages_rs/nextclade-web/src/hooks/useRunSeqAutodetect.ts @@ -17,8 +17,10 @@ export function useRunSeqAutodetect() { reset(minimizerIndexAtom) reset(autodetectResultsAtom) - function onResult(res: MinimizerSearchRecord) { - set(autodetectResultByIndexAtom(res.fastaRecord.index), res) + function onResult(results: MinimizerSearchRecord[]) { + results.forEach((res) => { + set(autodetectResultByIndexAtom(res.fastaRecord.index), res) + }) } Promise.all([getPromise(qrySeqInputsStorageAtom), getPromise(minimizerIndexVersionAtom)]) @@ -42,7 +44,7 @@ export function useRunSeqAutodetect() { async function runAutodetect( fasta: string, minimizerIndex: MinimizerIndexJson, - onResult: (res: MinimizerSearchRecord) => void, + onResult: (res: MinimizerSearchRecord[]) => void, ) { const worker = await SeqAutodetectWasmWorker.create(minimizerIndex) await worker.autodetect(fasta, { onResult }) @@ -51,7 +53,7 @@ async function runAutodetect( export class SeqAutodetectWasmWorker { private thread!: NextcladeSeqAutodetectWasmWorker - private subscription?: Subscription + private subscription?: Subscription private constructor() {} @@ -78,7 +80,7 @@ export class SeqAutodetectWasmWorker { onError, onComplete, }: { - onResult: (r: MinimizerSearchRecord) => void + onResult: (r: MinimizerSearchRecord[]) => void onError?: (error: Error) => void onComplete?: () => void }, diff --git a/packages_rs/nextclade-web/src/state/autodetect.state.ts b/packages_rs/nextclade-web/src/state/autodetect.state.ts index 2f793a3e4..11913819c 100644 --- a/packages_rs/nextclade-web/src/state/autodetect.state.ts +++ b/packages_rs/nextclade-web/src/state/autodetect.state.ts @@ -36,7 +36,7 @@ export const autodetectResultByIndexAtom = selectorFamily { - if (result && !prev.includes(result.fastaRecord.index)) { + if (result) { return [...prev, result.fastaRecord.index] } return prev diff --git a/packages_rs/nextclade-web/src/wasm/seq_autodetect.rs b/packages_rs/nextclade-web/src/wasm/seq_autodetect.rs index ce000bec9..5219ba526 100644 --- a/packages_rs/nextclade-web/src/wasm/seq_autodetect.rs +++ b/packages_rs/nextclade-web/src/wasm/seq_autodetect.rs @@ -1,32 +1,59 @@ use crate::wasm::jserr::jserr; +use chrono::Duration; use eyre::WrapErr; use nextclade::io::fasta::{FastaReader, FastaRecord}; +use nextclade::io::json::json_parse; use nextclade::sort::minimizer_index::MinimizerIndexJson; use nextclade::sort::minimizer_search::{run_minimizer_search, MinimizerSearchRecord}; use nextclade::sort::params::NextcladeSeqSortParams; +use nextclade::utils::datetime::date_now; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; use std::io::Read; use std::str::FromStr; use wasm_bindgen::prelude::*; +#[wasm_bindgen] +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct NextcladeSeqAutodetectWasmParams { + batch_interval_ms: i64, + max_batch_size: usize, +} + +impl Default for NextcladeSeqAutodetectWasmParams { + fn default() -> Self { + Self { + batch_interval_ms: 500, + max_batch_size: 100, + } + } +} + #[wasm_bindgen] pub struct NextcladeSeqAutodetectWasm { minimizer_index: MinimizerIndexJson, search_params: NextcladeSeqSortParams, + run_params: NextcladeSeqAutodetectWasmParams, } #[wasm_bindgen] impl NextcladeSeqAutodetectWasm { - pub fn new(minimizer_index_json_str: &str) -> Result { + pub fn new(minimizer_index_json_str: &str, params: &str) -> Result { let minimizer_index = jserr(MinimizerIndexJson::from_str(minimizer_index_json_str))?; Ok(Self { minimizer_index, search_params: NextcladeSeqSortParams::default(), + run_params: jserr(json_parse(params))?, }) } pub fn autodetect(&self, qry_fasta_str: &str, callback: &js_sys::Function) -> Result<(), JsError> { let mut reader = jserr(FastaReader::from_str(&qry_fasta_str).wrap_err_with(|| "When creating fasta reader"))?; + let mut batch = vec![]; + let mut last_flush = date_now(); + loop { let mut fasta_record = FastaRecord::default(); jserr(reader.read(&mut fasta_record).wrap_err("When reading a fasta record"))?; @@ -43,13 +70,26 @@ impl NextcladeSeqAutodetectWasm { }), )?; - let result_js = serde_wasm_bindgen::to_value(&MinimizerSearchRecord { fasta_record, result })?; + batch.push(MinimizerSearchRecord { fasta_record, result }); - callback - .call1(&JsValue::null(), &result_js) - .map_err(|err_val| JsError::new(&format!("{err_val:#?}")))?; + if (date_now() - last_flush >= Duration::milliseconds(self.run_params.batch_interval_ms)) + || batch.len() >= self.run_params.max_batch_size + { + let result_js = serde_wasm_bindgen::to_value(&batch)?; + callback + .call1(&JsValue::null(), &result_js) + .map_err(|err_val| JsError::new(&format!("{err_val:#?}")))?; + last_flush = date_now(); + batch.clear(); + } } + let result_js = serde_wasm_bindgen::to_value(&batch)?; + callback + .call1(&JsValue::null(), &result_js) + .map_err(|err_val| JsError::new(&format!("{err_val:#?}")))?; + batch.clear(); + Ok(()) } } diff --git a/packages_rs/nextclade-web/src/workers/nextcladeAutodetect.worker.ts b/packages_rs/nextclade-web/src/workers/nextcladeAutodetect.worker.ts index 802e022e3..3343331b7 100644 --- a/packages_rs/nextclade-web/src/workers/nextcladeAutodetect.worker.ts +++ b/packages_rs/nextclade-web/src/workers/nextcladeAutodetect.worker.ts @@ -8,9 +8,9 @@ import type { Thread } from 'threads' import { expose } from 'threads/worker' import { NextcladeSeqAutodetectWasm } from 'src/gen/nextclade-wasm' -const gSubject = new Subject() +const gSubject = new Subject() -function onResultParsed(resStr: MinimizerSearchRecord) { +function onResultParsed(resStr: MinimizerSearchRecord[]) { gSubject.next(resStr) } @@ -23,7 +23,10 @@ let nextcladeAutodetect: NextcladeSeqAutodetectWasm | undefined /** Creates the underlying WebAssembly module. */ async function create(minimizerIndexJsonStr: MinimizerIndexJson) { - nextcladeAutodetect = NextcladeSeqAutodetectWasm.new(JSON.stringify(minimizerIndexJsonStr)) + nextcladeAutodetect = NextcladeSeqAutodetectWasm.new( + JSON.stringify(minimizerIndexJsonStr), + JSON.stringify({ batchIntervalMs: 700, maxBatchSize: 1000 }), + ) } /** Destroys the underlying WebAssembly module. */ @@ -54,7 +57,7 @@ const worker = { create, destroy, autodetect, - values(): ThreadsObservable { + values(): ThreadsObservable { return ThreadsObservable.from(gSubject) }, }