Skip to content

Commit

Permalink
populate vbucket map in warmup
Browse files Browse the repository at this point in the history
  • Loading branch information
jrawsthorne committed Sep 13, 2023
1 parent 6c06053 commit 0f8df21
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 8 deletions.
30 changes: 29 additions & 1 deletion ep_engine/src/ep_bucket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use parking_lot::{Mutex, MutexGuard};
use std::{ops::Deref, sync::Arc};

use crate::{
kv_store::CouchKVStore,
Expand All @@ -9,12 +10,16 @@ use crate::{

/// Bucket state: the map of all vbuckets plus one mutex per possible
/// vbucket id, used to serialise per-vbucket operations (e.g. flushing
/// and the warmup publish in `populate_vbucket_map`).
pub struct EPBucket {
    pub vbucket_map: VBucketMap,
    // Sized to config.max_vbuckets in `new`; indexed by `Vbid` in
    // `get_locked_vbucket`. The `()` payload means the mutex carries no
    // data — it only serialises access to the vbucket with that id.
    vb_mutexes: Vec<Mutex<()>>,
}

impl EPBucket {
/// Construct a bucket from `config`, pre-creating one unlocked
/// per-vbucket mutex for every possible vbucket id
/// (`config.max_vbuckets`).
pub fn new(config: Config) -> EPBucketPtr {
    let num_vbuckets = config.max_vbuckets as usize;
    // Default for Mutex<()> is an unlocked mutex; resize_with avoids
    // requiring Clone on the element type.
    let mut vb_mutexes = Vec::with_capacity(num_vbuckets);
    vb_mutexes.resize_with(num_vbuckets, Default::default);
    EPBucketPtr::new(EPBucket {
        // `config` is not used after this point, so move it instead of
        // the previous redundant `config.clone()`.
        vbucket_map: VBucketMap::new(config),
        vb_mutexes,
    })
}

Expand All @@ -29,6 +34,29 @@ impl EPBucket {
/// Borrow the bucket's vbucket map.
pub fn get_vbuckets(&self) -> &VBucketMap {
    &self.vbucket_map
}

/// Look up the given vbucket while holding its per-vbucket mutex.
///
/// The returned [`LockedVbucketPtr`] keeps the mutex held until it is
/// dropped; `vb` is `None` when no vbucket with this id exists in the
/// map yet.
pub fn get_locked_vbucket(&self, vbid: Vbid) -> LockedVbucketPtr {
    // Acquire the mutex first so the map lookup and any subsequent use
    // of the vbucket are serialised against other lock holders.
    let guard = self.vb_mutexes[usize::from(vbid)].lock();
    LockedVbucketPtr {
        vb: self.vbucket_map.get_bucket(vbid),
        _guard: guard,
    }
}

pub fn flush_vbucket_unlocked(&self, _vb: &LockedVbucketPtr) {}
}

pub type EPBucketPtr = Arc<EPBucket>;

/// A (possibly absent) vbucket paired with the held per-vbucket mutex
/// guard; the mutex is released when this value is dropped.
pub struct LockedVbucketPtr<'a> {
    // None when the vbucket id is not (yet) present in the map.
    pub vb: Option<VBucketPtr>,
    // Keeps the corresponding `vb_mutexes` entry locked for this
    // value's lifetime; never read, only held.
    _guard: MutexGuard<'a, ()>,
}

// Deref to the inner Option so callers can use Option methods
// (`is_none`, `as_ref`, …) directly on the locked pointer.
impl Deref for LockedVbucketPtr<'_> {
    type Target = Option<VBucketPtr>;

    fn deref(&self) -> &Self::Target {
        &self.vb
    }
}
18 changes: 17 additions & 1 deletion ep_engine/src/vbucket.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{failover_table::FailoverTable, hash_table::HashTable};
use crossbeam_utils::atomic::AtomicCell;
use parking_lot::Mutex;
use parking_lot::{Mutex, MutexGuard};
use serde::{Deserialize, Serializer};
use std::{
fmt::{self, Display},
Expand All @@ -15,6 +15,8 @@ pub struct VBucket {
pub hash_table: Mutex<HashTable>,
state: AtomicCell<State>,
_failover_table: FailoverTable,
// Can state just be inside the mutex??
state_lock: Mutex<()>,
}

impl VBucket {
Expand All @@ -24,12 +26,26 @@ impl VBucket {
hash_table: Mutex::new(Default::default()),
state: AtomicCell::new(state),
_failover_table: failover_table,
state_lock: Mutex::new(()),
}
}

/// Current vbucket state (lock-free read via `AtomicCell`); callers
/// needing the state to stay fixed must hold `get_state_lock`.
pub fn state(&self) -> State {
    self.state.load()
}

/// Acquire the state lock; hold the returned guard across work that
/// must not interleave with a state change (see `set_state`).
pub fn get_state_lock(&self) -> MutexGuard<'_, ()> {
    self.state_lock.lock()
}

/// Change this vbucket's state while holding the state lock, so the
/// transition cannot interleave with other state-locked work.
pub fn set_state(&self, state: State) {
    let _state_guard = self.state_lock.lock();
    self.state.store(state);
}

/// Store a new state without taking the state lock; the caller must
/// already hold the guard from `get_state_lock`.
fn set_state_unlocked(&self, state: State) {
    self.state.store(state);
}
}

pub type VBucketPtr = Arc<VBucket>;
Expand Down
48 changes: 42 additions & 6 deletions ep_engine/src/warmup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ impl Warmup {

pub fn warmup(&mut self) {
self.initialise();
self.create_vbuckets(0);
for shard_id in 0..self.store.vbucket_map.get_num_shards() {
self.create_vbuckets(shard_id);
}
// self.load_collection_counts();
// self.estimate_item_count();
// // load_prepared_sync_writes();
// self.populate_vbucket_map();
for shard_id in 0..self.store.vbucket_map.get_num_shards() {
self.populate_vbucket_map(shard_id);
}
// self.key_dump();
// // self.load_access_log();
// self.load_data();
Expand Down Expand Up @@ -134,7 +138,7 @@ impl Warmup {
let max_entries = 25;

for (&vbid, state) in &self.shard_vb_states[shard_id] {
let _vb: std::sync::Arc<VBucket> = self.store.get_vbucket(vbid).unwrap_or_else(|| {
let _vb = self.store.get_vbucket(vbid).unwrap_or_else(|| {
let table = if state.failover_table.is_null() {
FailoverTable::new_empty(max_entries)
} else {
Expand All @@ -159,8 +163,39 @@ impl Warmup {
todo!()
}

fn _populate_vbucket_map(&self) {
todo!()
/// Adds all warmed up vbuckets (for the shard) to the bucket's VBMap,
/// once added to the VBMap the rest of the system will be able to
/// locate and operate on the VBucket, so this phase must only run once
/// each vbucket is completely initialised.
fn populate_vbucket_map(&self, shard_id: usize) {
    for &vbid in &self.shard_vb_ids[shard_id] {
        // The VBucket was built in an earlier warmup phase; unwrap
        // panics if that phase did not populate this vbid.
        let vb = self.warmed_up_vbuckets.get(&vbid).unwrap().clone();
        // Take the vBucket lock to stop the flusher from racing with our
        // set vBucket state. It MUST go to disk in the first flush batch
        // or we run the risk of not rolling back replicas that we should
        let locked_vb = self.store.get_locked_vbucket(vbid);
        // The vbucket must not already be visible in the map — it is
        // only published by add_bucket() at the end of this loop body.
        assert!(locked_vb.is_none());

        // TODO: self.checkpoint_manager.queue_set_vb_state();

        {
            // Note this lock is here for correctness - the VBucket is not
            // accessible yet, so its state cannot be changed by other code.
            let _state_lock = vb.get_state_lock();
            if vb.state() == vbucket::State::Active {
                // TODO: Update collection map for vbucket
            }
        }

        // Flush while still holding the per-vbucket mutex so the state
        // lands in the first flush batch (currently a stub in EPBucket).
        self.store.flush_vbucket_unlocked(&locked_vb);

        // Publish: from here on the rest of the system can find the VB.
        self.store.vbucket_map.add_bucket(vb);
    }

    // The shard that finishes last tears down the warmup-only cache.
    if shard_id == self.store.vbucket_map.shards.len() - 1 {
        // TODO: start flusher
        self.warmed_up_vbuckets.clear();
    }
}

fn _key_dump(&self) {
Expand All @@ -178,7 +213,7 @@ mod test {
use crate::{ep_bucket::EPBucket, vbucket};

#[test]
fn test_populate_shard_vb_states() {
fn test_warmup() {
let config = Config {
max_vbuckets: 1024,
max_shards: 1,
Expand All @@ -194,5 +229,6 @@ mod test {
.state,
vbucket::State::Active
);
assert_eq!(warmup.store.vbucket_map.get_num_alive_vbuckets(), 1024);
}
}

0 comments on commit 0f8df21

Please sign in to comment.