diff --git a/src/core/src/index/revindex/disk_revindex.rs b/src/core/src/index/revindex/disk_revindex.rs
index b409263932..d0b752a81f 100644
--- a/src/core/src/index/revindex/disk_revindex.rs
+++ b/src/core/src/index/revindex/disk_revindex.rs
@@ -1,7 +1,7 @@
 use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
 use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 
 use byteorder::{LittleEndian, WriteBytesExt};
 use log::{info, trace};
@@ -12,7 +12,7 @@ use crate::collection::{Collection, CollectionSet};
 use crate::encodings::{Color, Idx};
 use crate::index::revindex::{
     self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps,
-    MANIFEST, STORAGE_SPEC, VERSION,
+    MANIFEST, PROCESSED, STORAGE_SPEC, VERSION,
 };
 use crate::index::{calculate_gather_stats, GatherResult, SigCounter};
 use crate::manifest::Manifest;
@@ -20,7 +20,7 @@ use crate::prelude::*;
 use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
 use crate::sketch::Sketch;
 use crate::storage::{
-    rocksdb::{cf_descriptors, db_options, DB, HASHES, METADATA},
+    rocksdb::{cf_descriptors, db_options, ALL_CFS, DB, HASHES, METADATA},
     InnerStorage, RocksDBStorage, Storage,
 };
 use crate::Result;
@@ -38,6 +38,7 @@ fn compute_color(idxs: &Datasets) -> Color {
 pub struct RevIndex {
     db: Arc<DB>,
     collection: Arc<CollectionSet>,
+    processed: Arc<RwLock<Datasets>>,
 }
 
 pub(crate) fn merge_datasets(
@@ -78,9 +79,12 @@ impl RevIndex {
 
         let processed_sigs = AtomicUsize::new(0);
 
+        let processed = Arc::<RwLock<Datasets>>::default();
+
         let index = Self {
             db,
             collection: Arc::new(collection),
+            processed: processed.clone(),
         };
 
         index.collection.par_iter().for_each(|(dataset_id, _)| {
@@ -89,7 +93,16 @@ impl RevIndex {
                 info!("Processed {} reference sigs", i);
             }
 
-            index.map_hashes_colors(dataset_id as Idx);
+            // check if this dataset_id was processed already;
+            // call map_hashes_colors only if it wasn't
+            if !processed.read().unwrap().contains(&dataset_id) {
+                index.map_hashes_colors(dataset_id as Idx);
+
+                // newly processed: update the cached `processed`
+                // field in the RevIndex too
+
+                processed.write().unwrap().extend([dataset_id]);
+            }
         });
 
         index.save_collection().expect("Error saving collection");
@@ -127,7 +140,28 @@
             storage_spec,
         )?);
 
-        Ok(module::RevIndex::Plain(Self { db, collection }))
+        let processed = Arc::new(RwLock::new(Self::load_processed(
+            db.clone(),
+            collection.clone(),
+        )?));
+
+        Ok(module::RevIndex::Plain(Self {
+            db,
+            collection,
+            processed,
+        }))
+    }
+
+    fn load_processed(db: Arc<DB>, collection: Arc<CollectionSet>) -> Result<Datasets> {
+        let cf_metadata = db.cf_handle(METADATA).unwrap();
+        if let Some(rdr) = db.get_pinned_cf(&cf_metadata, PROCESSED)? {
+            // convert rdr to Datasets
+            Datasets::from_slice(&rdr)
+                .ok_or_else(|| todo!("throw error from deserializing Datasets"))
+        } else {
+            let all_datasets: Vec<_> = (0..collection.manifest().len()).map(|v| v as Idx).collect();
+            Ok(Datasets::new(&all_datasets))
+        }
     }
 
     fn load_collection_from_rocksdb(
@@ -211,6 +245,14 @@ impl RevIndex {
                 .merge_cf(&cf_hashes, &hash_bytes[..], colors.as_slice())
                 .expect("error merging");
         }
+
+        // finished processing this dataset:
+        // merge a record into the PROCESSED key in the
+        // metadata column family to account for that
+        let cf_metadata = self.db.cf_handle(METADATA).unwrap();
+        self.db
+            .merge_cf(&cf_metadata, PROCESSED, colors.as_slice())
+            .expect("error merging");
     }
 }
@@ -400,24 +442,32 @@ impl RevIndexOps for RevIndex {
     fn update(mut self, collection: CollectionSet) -> Result<module::RevIndex> {
         // TODO: verify new collection manifest is a superset of current one,
         // and the initial chunk is the same
-        let to_skip = self.collection.check_superset(&collection)?;
+        self.collection.check_superset(&collection)?;
 
         // process the remainder
         let processed_sigs = AtomicUsize::new(0);
 
         self.collection = Arc::new(collection);
 
-        self.collection
-            .par_iter()
-            .skip(to_skip)
-            .for_each(|(dataset_id, _)| {
-                let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
-                if i % 1000 == 0 {
-                    info!("Processed {} reference sigs", i);
-                }
+        let processed = self.processed.clone();
+        self.collection.par_iter().for_each(|(dataset_id, _)| {
+            let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
+            if i % 1000 == 0 {
+                info!("Processed {} reference sigs", i);
+            }
 
+            // check if this dataset_id was processed already;
+            // call map_hashes_colors only if it wasn't
+            if !processed.read().unwrap().contains(&dataset_id) {
                 self.map_hashes_colors(dataset_id as Idx);
-            });
+
+                // newly processed: update the cached `processed`
+                // field in the RevIndex too
+
+                processed.write().unwrap().extend([dataset_id]);
+            }
+        });
 
         self.save_collection().expect("Error saving collection");
@@ -437,7 +487,7 @@ impl RevIndexOps for RevIndex {
     }
 
     fn compact(&self) {
-        for cf_name in [HASHES, METADATA] {
+        for cf_name in ALL_CFS {
             let cf = self.db.cf_handle(cf_name).unwrap();
             self.db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>)
         }
diff --git a/src/core/src/index/revindex/mod.rs b/src/core/src/index/revindex/mod.rs
index d2a620716c..aac0d47c14 100644
--- a/src/core/src/index/revindex/mod.rs
+++ b/src/core/src/index/revindex/mod.rs
@@ -28,6 +28,7 @@ use crate::Result;
 const MANIFEST: &str = "manifest";
 const STORAGE_SPEC: &str = "storage_spec";
 const VERSION: &str = "version";
+const PROCESSED: &str = "processed";
 
 type QueryColors = HashMap<Color, Datasets>;
 type HashToColorT = HashMap<HashIntoType, Color, BuildNoHashHasher<HashIntoType>>;
@@ -351,7 +352,6 @@ impl Datasets {
         }
     }
 
-    /*
     fn contains(&self, value: &Idx) -> bool {
         match self {
             Self::Empty => false,
@@ -359,7 +359,6 @@ impl Datasets {
             Self::Many(ref v) => v.contains(*value),
         }
     }
-    */
 }
 
 #[derive(Getters, Setters, Debug)]
diff --git a/src/core/src/storage/rocksdb.rs b/src/core/src/storage/rocksdb.rs
index 4145ab4c3c..0bf540aaf1 100644
--- a/src/core/src/storage/rocksdb.rs
+++ b/src/core/src/storage/rocksdb.rs
@@ -13,6 +13,8 @@ pub(crate) const METADATA: &str = "metadata";
 // Column family for using rocksdb as a Storage
 pub(crate) const STORAGE: &str = "storage";
 
+pub(crate) const ALL_CFS: [&str; 3] = [HASHES, METADATA, STORAGE];
+
 pub type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
 
 /// Store data in RocksDB
@@ -81,6 +83,10 @@ pub(crate) fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {
     let mut cfopts = Options::default();
     cfopts.set_max_write_buffer_number(16);
+    cfopts.set_merge_operator_associative(
+        "datasets operator",
+        crate::index::revindex::disk_revindex::merge_datasets,
+    );
     // Updated default
     cfopts.set_level_compaction_dynamic_level_bytes(true);
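Note on the locking pattern above: the read-lock `contains` check and the write-lock `extend` are two separate critical sections, so two rayon workers can both pass the check for the same `dataset_id` and call `map_hashes_colors` twice. That is benign here, because `map_hashes_colors` only issues `merge_cf` writes with set-union semantics, so reprocessing is idempotent. A minimal, self-contained sketch of the pattern, with a plain `HashSet<u32>` standing in for `Datasets` and `process` standing in for `map_hashes_colors` (both names are illustrative, not sourmash APIs):

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

use rayon::prelude::*;

// stand-in for map_hashes_colors: must be safe to run twice for the
// same id, since the guard below is check-then-act, not atomic
fn process(id: u32) {
    let _ = id;
}

fn main() {
    let processed: Arc<RwLock<HashSet<u32>>> = Arc::default();

    (0..10_000u32).into_par_iter().for_each(|id| {
        // cheap shared check under the read lock; the guard is dropped
        // before the block runs, so the write lock below cannot deadlock
        if !processed.read().unwrap().contains(&id) {
            process(id);
            // exclusive write lock only on the slow path; a racing
            // worker may also reach here for the same id, harmlessly
            processed.write().unwrap().insert(id);
        }
    });

    assert_eq!(processed.read().unwrap().len(), 10_000);
}
```

Taking the write lock only on the slow path keeps the common case (dataset already processed) contention-free apart from the shared read lock.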
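On the storage side, each dataset that finishes `map_hashes_colors` is merged into the `PROCESSED` key of the metadata column family, and `load_processed` reads the accumulated union back when the index is opened; if the key is absent (an index built before this change), every dataset in the manifest is treated as processed, so existing indices are not reprocessed on `update`. A sketch of how an associative set-union merge operator behaves, assuming the `rocksdb` crate, with little-endian `u32` sets standing in for serialized `Datasets` (`merge_u32_set` is a hypothetical stand-in for `merge_datasets`):

```rust
use std::collections::BTreeSet;

use rocksdb::{MergeOperands, Options, DB};

fn decode(bytes: &[u8]) -> BTreeSet<u32> {
    bytes
        .chunks_exact(4)
        .map(|c| u32::from_le_bytes(c.try_into().unwrap()))
        .collect()
}

fn encode(set: &BTreeSet<u32>) -> Vec<u8> {
    // canonical sorted encoding keeps the merged value deterministic
    set.iter().flat_map(|v| v.to_le_bytes()).collect()
}

// set-union full merge: associative, so RocksDB may apply it in any
// grouping during reads and compactions
fn merge_u32_set(
    _key: &[u8],
    existing: Option<&[u8]>,
    operands: &MergeOperands,
) -> Option<Vec<u8>> {
    let mut set = existing.map(decode).unwrap_or_default();
    for op in operands {
        set.extend(decode(op));
    }
    Some(encode(&set))
}

fn main() {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.set_merge_operator_associative("u32 set union", merge_u32_set);

    let db = DB::open(&opts, "/tmp/processed-merge-demo").unwrap();

    // one merge per finished dataset, mirroring the merge_cf call
    // added to map_hashes_colors above
    for dataset_id in [0u32, 1, 2] {
        db.merge(b"processed", dataset_id.to_le_bytes()).unwrap();
    }

    // a load_processed-style read sees the union of all merges
    let processed = decode(&db.get(b"processed").unwrap().unwrap());
    assert_eq!(processed, BTreeSet::from([0, 1, 2]));
}
```

Associativity matters because RocksDB is free to combine operands in any grouping; encoding the set in sorted order keeps the stored value canonical regardless of merge order.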