Skip to content

Commit

Permalink
perf: batch updates from wasm to reduce main thread blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-aksamentov committed Sep 7, 2023
1 parent 94578f4 commit 2a97561
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion packages_rs/nextclade-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@ crate-type = ["cdylib", "rlib"]

[dependencies]
assert2 = "=0.3.11"
chrono = { version = "=0.4.26", default-features = false, features = ["clock", "std", "wasmbind"] }
console_error_panic_hook = "=0.1.7"
eyre = "=0.6.8"
getrandom = { version = "=0.2.10", features = ["js"] }
itertools = "=0.11.0"
js-sys = { version = "=0.3.64", features = [] }
log = "=0.4.19"
nextclade = { path = "../nextclade" }
schemars = { version = "=0.8.12", features = ["chrono", "either", "enumset", "indexmap1"] }
serde = { version = "=1.0.164", features = ["derive"] }
serde-wasm-bindgen = { version = "=0.5.0" }
wasm-bindgen = { version = "=0.2.87", features = ["serde-serialize"] }
wasm-logger = "=0.2.0"
web-sys = { version = "=0.3.64", features = ["console"] }
schemars = { version = "=0.8.12", features = ["chrono", "either", "enumset", "indexmap1"] }

[build-dependencies]
nextclade = { path = "../nextclade" }
Expand Down
12 changes: 7 additions & 5 deletions packages_rs/nextclade-web/src/hooks/useRunSeqAutodetect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ export function useRunSeqAutodetect() {
reset(minimizerIndexAtom)
reset(autodetectResultsAtom)

function onResult(res: MinimizerSearchRecord) {
set(autodetectResultByIndexAtom(res.fastaRecord.index), res)
function onResult(results: MinimizerSearchRecord[]) {
results.forEach((res) => {
set(autodetectResultByIndexAtom(res.fastaRecord.index), res)
})
}

Promise.all([getPromise(qrySeqInputsStorageAtom), getPromise(minimizerIndexVersionAtom)])
Expand All @@ -42,7 +44,7 @@ export function useRunSeqAutodetect() {
async function runAutodetect(
fasta: string,
minimizerIndex: MinimizerIndexJson,
onResult: (res: MinimizerSearchRecord) => void,
onResult: (res: MinimizerSearchRecord[]) => void,
) {
const worker = await SeqAutodetectWasmWorker.create(minimizerIndex)
await worker.autodetect(fasta, { onResult })
Expand All @@ -51,7 +53,7 @@ async function runAutodetect(

export class SeqAutodetectWasmWorker {
private thread!: NextcladeSeqAutodetectWasmWorker
private subscription?: Subscription<MinimizerSearchRecord>
private subscription?: Subscription<MinimizerSearchRecord[]>

private constructor() {}

Expand All @@ -78,7 +80,7 @@ export class SeqAutodetectWasmWorker {
onError,
onComplete,
}: {
onResult: (r: MinimizerSearchRecord) => void
onResult: (r: MinimizerSearchRecord[]) => void
onError?: (error: Error) => void
onComplete?: () => void
},
Expand Down
2 changes: 1 addition & 1 deletion packages_rs/nextclade-web/src/state/autodetect.state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export const autodetectResultByIndexAtom = selectorFamily<MinimizerSearchRecord,

// Add to the list of indices
set(autodetectResultIndicesAtom, (prev) => {
if (result && !prev.includes(result.fastaRecord.index)) {
if (result) {
return [...prev, result.fastaRecord.index]
}
return prev
Expand Down
50 changes: 45 additions & 5 deletions packages_rs/nextclade-web/src/wasm/seq_autodetect.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,59 @@
use crate::wasm::jserr::jserr;
use chrono::Duration;
use eyre::WrapErr;
use nextclade::io::fasta::{FastaReader, FastaRecord};
use nextclade::io::json::json_parse;
use nextclade::sort::minimizer_index::MinimizerIndexJson;
use nextclade::sort::minimizer_search::{run_minimizer_search, MinimizerSearchRecord};
use nextclade::sort::params::NextcladeSeqSortParams;
use nextclade::utils::datetime::date_now;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::io::Read;
use std::str::FromStr;
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NextcladeSeqAutodetectWasmParams {
batch_interval_ms: i64,
max_batch_size: usize,
}

impl Default for NextcladeSeqAutodetectWasmParams {
fn default() -> Self {
Self {
batch_interval_ms: 500,
max_batch_size: 100,
}
}
}

#[wasm_bindgen]
pub struct NextcladeSeqAutodetectWasm {
minimizer_index: MinimizerIndexJson,
search_params: NextcladeSeqSortParams,
run_params: NextcladeSeqAutodetectWasmParams,
}

#[wasm_bindgen]
impl NextcladeSeqAutodetectWasm {
pub fn new(minimizer_index_json_str: &str) -> Result<NextcladeSeqAutodetectWasm, JsError> {
pub fn new(minimizer_index_json_str: &str, params: &str) -> Result<NextcladeSeqAutodetectWasm, JsError> {
let minimizer_index = jserr(MinimizerIndexJson::from_str(minimizer_index_json_str))?;
Ok(Self {
minimizer_index,
search_params: NextcladeSeqSortParams::default(),
run_params: jserr(json_parse(params))?,
})
}

pub fn autodetect(&self, qry_fasta_str: &str, callback: &js_sys::Function) -> Result<(), JsError> {
let mut reader = jserr(FastaReader::from_str(&qry_fasta_str).wrap_err_with(|| "When creating fasta reader"))?;

let mut batch = vec![];
let mut last_flush = date_now();

loop {
let mut fasta_record = FastaRecord::default();
jserr(reader.read(&mut fasta_record).wrap_err("When reading a fasta record"))?;
Expand All @@ -43,13 +70,26 @@ impl NextcladeSeqAutodetectWasm {
}),
)?;

let result_js = serde_wasm_bindgen::to_value(&MinimizerSearchRecord { fasta_record, result })?;
batch.push(MinimizerSearchRecord { fasta_record, result });

callback
.call1(&JsValue::null(), &result_js)
.map_err(|err_val| JsError::new(&format!("{err_val:#?}")))?;
if (date_now() - last_flush >= Duration::milliseconds(self.run_params.batch_interval_ms))
|| batch.len() >= self.run_params.max_batch_size
{
let result_js = serde_wasm_bindgen::to_value(&batch)?;
callback
.call1(&JsValue::null(), &result_js)
.map_err(|err_val| JsError::new(&format!("{err_val:#?}")))?;
last_flush = date_now();
batch.clear();
}
}

let result_js = serde_wasm_bindgen::to_value(&batch)?;
callback
.call1(&JsValue::null(), &result_js)
.map_err(|err_val| JsError::new(&format!("{err_val:#?}")))?;
batch.clear();

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import type { Thread } from 'threads'
import { expose } from 'threads/worker'
import { NextcladeSeqAutodetectWasm } from 'src/gen/nextclade-wasm'

const gSubject = new Subject<MinimizerSearchRecord>()
const gSubject = new Subject<MinimizerSearchRecord[]>()

function onResultParsed(resStr: MinimizerSearchRecord) {
function onResultParsed(resStr: MinimizerSearchRecord[]) {
gSubject.next(resStr)
}

Expand All @@ -23,7 +23,10 @@ let nextcladeAutodetect: NextcladeSeqAutodetectWasm | undefined

/** Creates the underlying WebAssembly module. */
async function create(minimizerIndexJsonStr: MinimizerIndexJson) {
nextcladeAutodetect = NextcladeSeqAutodetectWasm.new(JSON.stringify(minimizerIndexJsonStr))
nextcladeAutodetect = NextcladeSeqAutodetectWasm.new(
JSON.stringify(minimizerIndexJsonStr),
JSON.stringify({ batchIntervalMs: 700, maxBatchSize: 1000 }),
)
}

/** Destroys the underlying WebAssembly module. */
Expand Down Expand Up @@ -54,7 +57,7 @@ const worker = {
create,
destroy,
autodetect,
values(): ThreadsObservable<MinimizerSearchRecord> {
values(): ThreadsObservable<MinimizerSearchRecord[]> {
return ThreadsObservable.from(gSubject)
},
}
Expand Down

0 comments on commit 2a97561

Please sign in to comment.