Removes all usage of block_on and uses a oneshot channel instead.
Calling `block_on` panics in certain contexts. For instance, it panics when
it is called in the context of another call to `block_on`.

Using it in tantivy is unnecessary. We replace it with a thin wrapper around
a oneshot channel that supports both async and sync use.
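
As a rough illustration of that wrapper pattern (not the code added by this commit), a oneshot-backed handle can expose a blocking `wait()` while still being awaitable from async code. The sketch below borrows the `ScheduledResult` name that appears in the diff, but its `create` and `wait_async` helpers and all implementation details are hypothetical; it assumes the `oneshot` crate added to Cargo.toml, with its "async" feature enabled for the awaitable path.

// Hypothetical sketch of a sync/async oneshot wrapper; not tantivy's actual code.
use std::thread;

/// Handle to a value produced later (for example, by a merge thread).
/// It can be consumed synchronously with `wait()` or awaited from async code.
pub struct ScheduledResult<T> {
    receiver: oneshot::Receiver<T>,
}

impl<T> ScheduledResult<T> {
    /// Creates the handle together with the sender used to fulfill it.
    pub fn create() -> (oneshot::Sender<T>, ScheduledResult<T>) {
        let (sender, receiver) = oneshot::channel();
        (sender, ScheduledResult { receiver })
    }

    /// Blocks the current thread until the value arrives. Unlike
    /// `futures::executor::block_on`, this only parks on a channel, so it
    /// cannot trigger the nested-executor panic mentioned above.
    pub fn wait(self) -> T {
        // For brevity the sketch panics if the sender is dropped early;
        // real code would surface an error instead.
        self.receiver.recv().expect("sender dropped")
    }

    /// Awaits the value from async code: with its "async" feature,
    /// oneshot's `Receiver` is itself a `Future`.
    pub async fn wait_async(self) -> T {
        self.receiver.await.expect("sender dropped")
    }
}

fn main() {
    let (sender, scheduled) = ScheduledResult::create();
    thread::spawn(move || {
        // Simulate background work (e.g. a segment merge) completing.
        let _ = sender.send(42u64);
    });
    assert_eq!(scheduled.wait(), 42);
}
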
fulmicoton committed Mar 17, 2022
1 parent 2e255c4 commit 261f6cc
Showing 17 changed files with 221 additions and 145 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -13,6 +13,7 @@ keywords = ["search", "information", "retrieval"]
edition = "2018"

[dependencies]
oneshot = "0.1"
base64 = "0.13"
byteorder = "1.4.3"
crc32fast = "1.2.1"
8 changes: 3 additions & 5 deletions src/aggregation/mod.rs
@@ -284,8 +284,6 @@ pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> {

#[cfg(test)]
mod tests {

use futures::executor::block_on;
use serde_json::Value;

use super::agg_req::{Aggregation, Aggregations, BucketAggregation};
@@ -350,7 +348,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

@@ -553,7 +551,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

@@ -990,7 +988,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

2 changes: 1 addition & 1 deletion src/directory/file_watcher.rs
@@ -53,7 +53,7 @@ impl FileWatcher {
if metafile_has_changed {
info!("Meta file {:?} was modified", path);
current_checksum_opt = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
callbacks.broadcast().wait();
}
}

9 changes: 3 additions & 6 deletions src/directory/tests.rs
@@ -6,9 +6,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;

use futures::channel::oneshot;
use futures::executor::block_on;

use super::*;

#[cfg(feature = "mmap")]
@@ -249,8 +246,8 @@ fn test_lock_blocking(directory: &dyn Directory) {
std::thread::spawn(move || {
//< lock_a_res is sent to the thread.
in_thread_clone.store(true, SeqCst);
let _just_sync = block_on(receiver);
// explicitely droping lock_a_res. It would have been sufficient to just force it
let _just_sync = receiver.recv();
// explicitely dropping lock_a_res. It would have been sufficient to just force it
// to be part of the move, but the intent seems clearer that way.
drop(lock_a_res);
});
@@ -273,7 +270,7 @@ fn test_lock_blocking(directory: &dyn Directory) {
assert!(in_thread.load(SeqCst));
assert!(lock_a_res.is_ok());
});
assert!(block_on(receiver2).is_ok());
assert!(receiver2.recv().is_ok());
assert!(sender.send(()).is_ok());
assert!(join_handle.join().is_ok());
}
62 changes: 28 additions & 34 deletions src/directory/watch_event_router.rs
@@ -1,8 +1,5 @@
use std::sync::{Arc, RwLock, Weak};

use futures::channel::oneshot;
use futures::{Future, TryFutureExt};

/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
pub struct WatchCallback(Arc<dyn Fn() + Sync + Send>);
@@ -74,13 +71,13 @@ impl WatchCallbackList {
}

/// Triggers all callbacks
pub fn broadcast(&self) -> impl Future<Output = ()> {
pub fn broadcast(&self) -> BroadcastCallback {
let callbacks = self.list_callback();
let (sender, receiver) = oneshot::channel();
let result = receiver.unwrap_or_else(|_| ());
let broadcast_cb = BroadcastCallback { receiver };
if callbacks.is_empty() {
let _ = sender.send(());
return result;
return broadcast_cb;
}
let spawn_res = std::thread::Builder::new()
.name("watch-callbacks".to_string())
@@ -96,7 +93,17 @@ impl WatchCallbackList {
err
);
}
result
broadcast_cb
}
}

pub struct BroadcastCallback {
receiver: oneshot::Receiver<()>,
}

impl BroadcastCallback {
pub fn wait(self) {
let _ = self.receiver.recv();
}
}

@@ -106,8 +113,6 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use futures::executor::block_on;

use crate::directory::{WatchCallback, WatchCallbackList};

#[test]
@@ -118,22 +123,18 @@ mod tests {
let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait();
assert_eq!(0, counter.load(Ordering::SeqCst));
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait();
assert_eq!(1, counter.load(Ordering::SeqCst));
block_on(async {
(
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
)
});
watch_event_router.broadcast().wait();
watch_event_router.broadcast().wait();
watch_event_router.broadcast().wait();
assert_eq!(4, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait();
assert_eq!(4, counter.load(Ordering::SeqCst));
}

@@ -150,19 +151,15 @@ mod tests {
let handle_a = watch_event_router.subscribe(inc_callback(1));
let handle_a2 = watch_event_router.subscribe(inc_callback(10));
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
futures::join!(
watch_event_router.broadcast(),
watch_event_router.broadcast()
)
});
watch_event_router.broadcast().wait();
watch_event_router.broadcast().wait();
assert_eq!(22, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait();
assert_eq!(32, counter.load(Ordering::SeqCst));
mem::drop(handle_a2);
block_on(watch_event_router.broadcast());
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait();
watch_event_router.broadcast().wait();
assert_eq!(32, counter.load(Ordering::SeqCst));
}

@@ -176,15 +173,12 @@ mod tests {
});
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
let future1 = watch_event_router.broadcast();
let future2 = watch_event_router.broadcast();
futures::join!(future1, future2)
});
watch_event_router.broadcast().wait();
watch_event_router.broadcast().wait();
assert_eq!(2, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
let _ = watch_event_router.broadcast();
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait();
assert_eq!(2, counter.load(Ordering::SeqCst));
}
}
3 changes: 1 addition & 2 deletions src/fastfield/mod.rs
@@ -501,8 +501,7 @@ mod tests {
.map(SegmentReader::segment_id)
.collect();
assert_eq!(segment_ids.len(), 2);
let merge_future = index_writer.merge(&segment_ids[..]);
futures::executor::block_on(merge_future)?;
index_writer.merge(&segment_ids[..]).wait().unwrap();
reader.reload()?;
assert_eq!(reader.searcher().segment_readers().len(), 1);
Ok(())
5 changes: 2 additions & 3 deletions src/fastfield/multivalued/mod.rs
@@ -8,7 +8,6 @@ pub use self::writer::MultiValuedFastFieldWriter;
mod tests {

use chrono::Duration;
use futures::executor::block_on;
use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest};
use test_log::test;
@@ -268,7 +267,7 @@ mod tests {
IndexingOp::Merge => {
let segment_ids = index.searchable_segment_ids()?;
if segment_ids.len() >= 2 {
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.segment_updater().wait_merging_thread()?;
}
}
@@ -283,7 +282,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
if !segment_ids.is_empty() {
block_on(index_writer.merge(&segment_ids)).unwrap();
index_writer.merge(&segment_ids).wait()?;
assert!(index_writer.wait_merging_threads().is_ok());
}
}
22 changes: 9 additions & 13 deletions src/indexer/index_writer.rs
@@ -5,8 +5,6 @@ use std::thread::JoinHandle;

use common::BitSet;
use crossbeam::channel;
use futures::executor::block_on;
use futures::future::Future;
use smallvec::smallvec;

use super::operation::{AddOperation, UserOperation};
@@ -22,7 +20,7 @@ use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping;
use crate::indexer::index_writer_status::IndexWriterStatus;
use crate::indexer::operation::DeleteOperation;
use crate::indexer::stamper::Stamper;
use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
use crate::indexer::{MergePolicy, ScheduledResult, SegmentEntry, SegmentWriter};
use crate::schema::{Document, IndexRecordOption, Term};
use crate::Opstamp;

@@ -214,7 +212,7 @@ fn index_documents(
meta.untrack_temp_docstore();
// update segment_updater inventory to remove tempstore
let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt);
block_on(segment_updater.schedule_add_segment(segment_entry))?;
segment_updater.schedule_add_segment(segment_entry).wait()?;
Ok(())
}

@@ -368,7 +366,7 @@ impl IndexWriter {
pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
block_on(self.segment_updater.schedule_add_segment(segment_entry))
self.segment_updater
.schedule_add_segment(segment_entry)
.wait()
}

/// Creates a new segment.
@@ -516,13 +516,10 @@ impl IndexWriter {
/// Merges a given list of segments
///
/// `segment_ids` is required to be non-empty.
pub fn merge(
&mut self,
segment_ids: &[SegmentId],
) -> impl Future<Output = crate::Result<SegmentMeta>> {
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> ScheduledResult<SegmentMeta> {
let merge_operation = self.segment_updater.make_merge_operation(segment_ids);
let segment_updater = self.segment_updater.clone();
async move { segment_updater.start_merge(merge_operation)?.await }
segment_updater.start_merge(merge_operation)
}

/// Closes the current document channel send.
@@ -781,7 +778,6 @@ impl Drop for IndexWriter {
mod tests {
use std::collections::{HashMap, HashSet};

use futures::executor::block_on;
use proptest::prelude::*;
use proptest::prop_oneof;
use proptest::strategy::Strategy;
@@ -1456,7 +1452,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
if segment_ids.len() >= 2 {
block_on(index_writer.merge(&segment_ids)).unwrap();
index_writer.merge(&segment_ids).wait().unwrap();
assert!(index_writer.segment_updater().wait_merging_thread().is_ok());
}
}
@@ -1472,7 +1468,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
if segment_ids.len() >= 2 {
block_on(index_writer.merge(&segment_ids)).unwrap();
index_writer.merge(&segment_ids).wait().unwrap();
assert!(index_writer.wait_merging_threads().is_ok());
}
}
