Implement resumability for revindex
luizirber committed Aug 5, 2024
1 parent 69fd616 commit f7b43f3
Showing 3 changed files with 73 additions and 18 deletions.
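
The changes below make index construction resumable: a PROCESSED key in the RocksDB metadata column family records which dataset ids have already been run through map_hashes_colors, so an interrupted index build or update() can be re-run and skip work that already landed on disk. The following is a minimal, self-contained sketch of that pattern using only std and rayon; Datasets, the RocksDB persistence, and map_hashes_colors from the actual diff are replaced with plain stand-ins.

    use std::collections::HashSet;
    use std::sync::{Arc, RwLock};

    use rayon::prelude::*;

    // `already_done` plays the role of the PROCESSED entry loaded from RocksDB.
    fn build_resumable(total: u32, already_done: HashSet<u32>) -> HashSet<u32> {
        let processed = Arc::new(RwLock::new(already_done));

        (0..total).into_par_iter().for_each(|dataset_id| {
            // Skip datasets a previous (interrupted) run already handled.
            if processed.read().unwrap().contains(&dataset_id) {
                return;
            }

            // ... expensive per-dataset work goes here (map_hashes_colors in the diff) ...

            // Record completion so a later resume can skip this id.
            processed.write().unwrap().insert(dataset_id);
        });

        Arc::try_unwrap(processed).unwrap().into_inner().unwrap()
    }

Re-running build_resumable with the set returned (or persisted) by an earlier run only performs the missing work, which is the behavior this commit adds for the on-disk revindex.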
82 changes: 66 additions & 16 deletions src/core/src/index/revindex/disk_revindex.rs
@@ -1,7 +1,7 @@
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use byteorder::{LittleEndian, WriteBytesExt};
use log::{info, trace};
@@ -12,15 +12,15 @@ use crate::collection::{Collection, CollectionSet};
use crate::encodings::{Color, Idx};
use crate::index::revindex::{
self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps,
MANIFEST, STORAGE_SPEC, VERSION,
MANIFEST, PROCESSED, STORAGE_SPEC, VERSION,
};
use crate::index::{calculate_gather_stats, GatherResult, SigCounter};
use crate::manifest::Manifest;
use crate::prelude::*;
use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
use crate::sketch::Sketch;
use crate::storage::{
rocksdb::{cf_descriptors, db_options, DB, HASHES, METADATA},
rocksdb::{cf_descriptors, db_options, ALL_CFS, DB, HASHES, METADATA},
InnerStorage, RocksDBStorage, Storage,
};
use crate::Result;
@@ -38,6 +38,7 @@ fn compute_color(idxs: &Datasets) -> Color {
pub struct RevIndex {
db: Arc<DB>,
collection: Arc<CollectionSet>,
processed: Arc<RwLock<Datasets>>,
}

pub(crate) fn merge_datasets(
@@ -78,9 +79,12 @@ impl RevIndex {

let processed_sigs = AtomicUsize::new(0);

let processed = Arc::<RwLock<Datasets>>::default();

let index = Self {
db,
collection: Arc::new(collection),
processed: processed.clone(),
};

index.collection.par_iter().for_each(|(dataset_id, _)| {
@@ -89,7 +93,16 @@ impl RevIndex {
info!("Processed {} reference sigs", i);
}

index.map_hashes_colors(dataset_id as Idx);
// check if this dataset_id was processed already
// call map_hashes_colors only if not already processed
if !processed.read().unwrap().contains(&dataset_id) {
index.map_hashes_colors(dataset_id as Idx);

// if cached in a new field in the RevIndex,
// then update the cache too

processed.write().unwrap().extend([dataset_id]);
}
});

index.save_collection().expect("Error saving collection");
@@ -127,7 +140,28 @@ impl RevIndex {
storage_spec,
)?);

Ok(module::RevIndex::Plain(Self { db, collection }))
let processed = Arc::new(RwLock::new(Self::load_processed(
db.clone(),
collection.clone(),
)?));

Ok(module::RevIndex::Plain(Self {
db,
collection,
processed,
}))
}

fn load_processed(db: Arc<DB>, collection: Arc<CollectionSet>) -> Result<Datasets> {
let cf_metadata = db.cf_handle(METADATA).unwrap();
if let Some(rdr) = db.get_pinned_cf(&cf_metadata, PROCESSED)? {
// convert rdr to Datasets
Datasets::from_slice(&rdr)
.ok_or_else(|| todo!("throw error from deserializing Datasets"))
} else {
let all_datasets: Vec<_> = (0..collection.manifest().len()).map(|v| v as Idx).collect();
Ok(Datasets::new(&all_datasets))
}
}

fn load_collection_from_rocksdb(
@@ -211,6 +245,14 @@ impl RevIndex {
.merge_cf(&cf_hashes, &hash_bytes[..], colors.as_slice())
.expect("error merging");
}

// finished processing this dataset,
// do a merge_cf in the PROCESSED key in metadata
// to account for that.
let cf_metadata = self.db.cf_handle(METADATA).unwrap();
self.db
.merge_cf(&cf_metadata, PROCESSED, colors.as_slice())
.expect("error merging");
}
}

@@ -400,24 +442,32 @@ impl RevIndexOps for RevIndex {
fn update(mut self, collection: CollectionSet) -> Result<module::RevIndex> {
// TODO: verify new collection manifest is a superset of current one,
// and the initial chunk is the same
let to_skip = self.collection.check_superset(&collection)?;
self.collection.check_superset(&collection)?;

// process the remainder
let processed_sigs = AtomicUsize::new(0);

self.collection = Arc::new(collection);

self.collection
.par_iter()
.skip(to_skip)
.for_each(|(dataset_id, _)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}
let processed = self.processed.clone();

self.collection.par_iter().for_each(|(dataset_id, _)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

// check if this dataset_id was processed already
// call map_hashes_colors only if not already processed
if !processed.read().unwrap().contains(&dataset_id) {
self.map_hashes_colors(dataset_id as Idx);
});

// if cached in a new field in the RevIndex,
// then update the cache too

processed.write().unwrap().extend([dataset_id]);
}
});

self.save_collection().expect("Error saving collection");

@@ -437,7 +487,7 @@ impl RevIndexOps for RevIndex {
}

fn compact(&self) {
for cf_name in [HASHES, METADATA] {
for cf_name in ALL_CFS {
let cf = self.db.cf_handle(cf_name).unwrap();
self.db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>)
}
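
A note on load_processed above: it degrades gracefully for databases written before this change. When the metadata column family has no PROCESSED entry, every dataset currently in the manifest is assumed to have been processed, which matches the old behavior. A std-only sketch of that fallback, with hypothetical types standing in for the serialized Datasets read from RocksDB:

    use std::collections::HashSet;

    // `stored` plays the role of the optional PROCESSED value read from the
    // metadata column family; `manifest_len` is the number of datasets in the
    // collection manifest.
    fn load_processed(stored: Option<Vec<u32>>, manifest_len: usize) -> HashSet<u32> {
        match stored {
            // Resume case: trust the persisted set of finished dataset ids.
            Some(ids) => ids.into_iter().collect(),
            // Legacy database: no PROCESSED key, so treat everything as done.
            None => (0..manifest_len as u32).collect(),
        }
    }
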
3 changes: 1 addition & 2 deletions src/core/src/index/revindex/mod.rs
@@ -28,6 +28,7 @@ use crate::Result;
const MANIFEST: &str = "manifest";
const STORAGE_SPEC: &str = "storage_spec";
const VERSION: &str = "version";
const PROCESSED: &str = "processed";

type QueryColors = HashMap<Color, Datasets>;
type HashToColorT = HashMap<HashIntoType, Color, BuildNoHashHasher<HashIntoType>>;
@@ -351,15 +352,13 @@ impl Datasets {
}
}

/*
fn contains(&self, value: &Idx) -> bool {
match self {
Self::Empty => false,
Self::Unique(v) => v == value,
Self::Many(ref v) => v.contains(*value),
}
}
*/
}

#[derive(Getters, Setters, Debug)]
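
The mod.rs change simply un-comments Datasets::contains, which the resumability check in disk_revindex.rs now relies on. A std-only stand-in for the three variants (in the real type, Many holds a compressed bitmap, which is why the diff passes *value by copy):

    use std::collections::HashSet;

    enum Datasets {
        Empty,
        Unique(u32),
        Many(HashSet<u32>), // simplified; the real variant holds a bitmap
    }

    impl Datasets {
        fn contains(&self, value: &u32) -> bool {
            match self {
                Self::Empty => false,
                Self::Unique(v) => v == value,
                Self::Many(v) => v.contains(value),
            }
        }
    }

    fn main() {
        let d = Datasets::Many([0, 2, 5].into_iter().collect());
        assert!(d.contains(&2) && !d.contains(&7));
    }
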
6 changes: 6 additions & 0 deletions src/core/src/storage/rocksdb.rs
@@ -13,6 +13,8 @@ pub(crate) const METADATA: &str = "metadata";
// Column family for using rocksdb as a Storage
pub(crate) const STORAGE: &str = "storage";

pub(crate) const ALL_CFS: [&str; 3] = [HASHES, METADATA, STORAGE];

pub type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;

/// Store data in RocksDB
@@ -81,6 +83,10 @@ pub(crate) fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {

let mut cfopts = Options::default();
cfopts.set_max_write_buffer_number(16);
cfopts.set_merge_operator_associative(
"datasets operator",
crate::index::revindex::disk_revindex::merge_datasets,
);
// Updated default
cfopts.set_level_compaction_dynamic_level_bytes(true);

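
Two things change in rocksdb.rs: every column family descriptor now registers the "datasets operator" merge operator (so the merge_cf calls against the PROCESSED key union dataset ids into the stored value instead of overwriting it), and compact() iterates over ALL_CFS. A merge operator registered with set_merge_operator_associative must be associative and order-insensitive, because RocksDB may combine operands in any grouping during compaction. Below is a std-only mock of that union behavior; the real merge_datasets works on serialized Datasets values, not HashSets.

    use std::collections::HashSet;

    // Union of the existing value with any number of pending merge operands.
    // Associativity and commutativity mean the result is the same no matter
    // how RocksDB groups or orders the operands.
    fn union_merge(existing: Option<HashSet<u32>>, operands: &[HashSet<u32>]) -> HashSet<u32> {
        let mut acc = existing.unwrap_or_default();
        for op in operands {
            acc.extend(op.iter().copied());
        }
        acc
    }

    fn main() {
        let a: HashSet<u32> = [1, 2].into_iter().collect();
        let b: HashSet<u32> = [2, 3].into_iter().collect();
        let merged = union_merge(Some([0].into_iter().collect()), &[a, b]);
        let expected: HashSet<u32> = [0, 1, 2, 3].into_iter().collect();
        assert_eq!(merged, expected);
    }
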
