diff --git a/Cargo.toml b/Cargo.toml index 15bc7831b5..7abffddc35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ keywords = ["search", "information", "retrieval"] edition = "2018" [dependencies] +oneshot = "0.1" base64 = "0.13" byteorder = "1.4.3" crc32fast = "1.2.1" @@ -32,7 +33,6 @@ fs2={ version = "0.4.3", optional = true } levenshtein_automata = "0.2" uuid = { version = "0.8.2", features = ["v4", "serde"] } crossbeam = "0.8.1" -futures = { version = "0.3.15", features = ["thread-pool"] } tantivy-query-grammar = { version="0.15.0", path="./query-grammar" } tantivy-bitpacker = { version="0.1", path="./bitpacker" } common = { version = "0.2", path = "./common/", package = "tantivy-common" } @@ -71,6 +71,7 @@ criterion = "0.3.5" test-log = "0.2.8" env_logger = "0.9.0" pprof = {version= "0.7", features=["flamegraph", "criterion"]} +futures = "0.3.15" [dev-dependencies.fail] version = "0.5" diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index adb59b6567..6ff4ac4d37 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -284,8 +284,6 @@ pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> { #[cfg(test)] mod tests { - - use futures::executor::block_on; use serde_json::Value; use super::agg_req::{Aggregation, Aggregations, BucketAggregation}; @@ -348,7 +346,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } @@ -549,7 +547,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } @@ -984,7 +982,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } diff --git a/src/core/index.rs b/src/core/index.rs index cffa15f08b..bf181713e7 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -781,24 +781,24 @@ mod tests { for i in 0u64..8_000u64 { writer.add_document(doc!(field => i))?; } - let (sender, receiver) = crossbeam::channel::unbounded(); - let _handle = directory.watch(WatchCallback::new(move || { - let _ = sender.send(()); - })); + writer.commit()?; let mem_right_after_commit = directory.total_mem_usage(); - assert!(receiver.recv().is_ok()); + let reader = index .reader_builder() .reload_policy(ReloadPolicy::Manual) .try_into()?; - assert_eq!(reader.searcher().num_docs(), 8_000); + assert_eq!(reader.searcher().segment_readers().len(), 8); + writer.wait_merging_threads()?; + let mem_right_after_merge_finished = directory.total_mem_usage(); reader.reload().unwrap(); let searcher = reader.searcher(); + assert_eq!(searcher.segment_readers().len(), 1); assert_eq!(searcher.num_docs(), 8_000); assert!( mem_right_after_merge_finished < mem_right_after_commit, diff --git a/src/directory/file_watcher.rs b/src/directory/file_watcher.rs index ddf384a0c2..23a3204420 100644 --- a/src/directory/file_watcher.rs +++ b/src/directory/file_watcher.rs @@ -53,7 +53,9 @@ impl FileWatcher { if metafile_has_changed { info!("Meta file {:?} was modified", path); current_checksum_opt = Some(checksum); -
futures::executor::block_on(callbacks.broadcast()); + // We deliberately ignore callbacks failing here. + // We just wait for the end of their execution. + let _ = callbacks.broadcast().wait(); } } diff --git a/src/directory/tests.rs b/src/directory/tests.rs index 45b4631290..5d1d9dd460 100644 --- a/src/directory/tests.rs +++ b/src/directory/tests.rs @@ -6,9 +6,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::Arc; use std::time::Duration; -use futures::channel::oneshot; -use futures::executor::block_on; - use super::*; #[cfg(feature = "mmap")] @@ -249,8 +246,8 @@ fn test_lock_blocking(directory: &dyn Directory) { std::thread::spawn(move || { //< lock_a_res is sent to the thread. in_thread_clone.store(true, SeqCst); - let _just_sync = block_on(receiver); - // explicitely droping lock_a_res. It would have been sufficient to just force it + let _just_sync = receiver.recv(); + // explicitly dropping lock_a_res. It would have been sufficient to just force it // to be part of the move, but the intent seems clearer that way. drop(lock_a_res); }); @@ -273,7 +270,7 @@ fn test_lock_blocking(directory: &dyn Directory) { assert!(in_thread.load(SeqCst)); assert!(lock_a_res.is_ok()); }); - assert!(block_on(receiver2).is_ok()); + assert!(receiver2.recv().is_ok()); assert!(sender.send(()).is_ok()); assert!(join_handle.join().is_ok()); } diff --git a/src/directory/watch_event_router.rs b/src/directory/watch_event_router.rs index 4e828f269b..0c8e604238 100644 --- a/src/directory/watch_event_router.rs +++ b/src/directory/watch_event_router.rs @@ -1,7 +1,6 @@ use std::sync::{Arc, RwLock, Weak}; -use futures::channel::oneshot; -use futures::{Future, TryFutureExt}; +use crate::FutureResult; /// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)] @@ -74,12 +73,11 @@ impl WatchCallbackList { } /// Triggers all callbacks - pub fn broadcast(&self) -> impl Future<Output = ()> { + pub fn broadcast(&self) -> FutureResult<()> { let callbacks = self.list_callback(); - let (sender, receiver) = oneshot::channel(); - let result = receiver.unwrap_or_else(|_| ()); + let (result, sender) = FutureResult::create("One of the callbacks panicked."); if callbacks.is_empty() { - let _ = sender.send(()); + let _ = sender.send(Ok(())); return result; } let spawn_res = std::thread::Builder::new() @@ -88,7 +86,7 @@ for callback in callbacks { callback.call(); } - let _ = sender.send(()); + let _ = sender.send(Ok(())); }); if let Err(err) = spawn_res { error!( @@ -106,8 +104,6 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - use futures::executor::block_on; - use crate::directory::{WatchCallback, WatchCallbackList}; #[test] @@ -118,22 +114,18 @@ let inc_callback = WatchCallback::new(move || { counter_clone.fetch_add(1, Ordering::SeqCst); }); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait().unwrap(); assert_eq!(0, counter.load(Ordering::SeqCst)); let handle_a = watch_event_router.subscribe(inc_callback); assert_eq!(0, counter.load(Ordering::SeqCst)); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait().unwrap(); assert_eq!(1, counter.load(Ordering::SeqCst)); - block_on(async { - ( - watch_event_router.broadcast().await, - watch_event_router.broadcast().await, - watch_event_router.broadcast().await, - ) - }); + watch_event_router.broadcast().wait().unwrap(); + watch_event_router.broadcast().wait().unwrap(); + watch_event_router.broadcast().wait().unwrap(); assert_eq!(4, counter.load(Ordering::SeqCst)); mem::drop(handle_a); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait().unwrap(); assert_eq!(4, counter.load(Ordering::SeqCst)); } @@ -150,19 +142,15 @@ let handle_a = watch_event_router.subscribe(inc_callback(1)); let handle_a2 = watch_event_router.subscribe(inc_callback(10)); assert_eq!(0, counter.load(Ordering::SeqCst)); - block_on(async { - futures::join!( - watch_event_router.broadcast(), - watch_event_router.broadcast() - ) - }); + watch_event_router.broadcast().wait().unwrap(); + watch_event_router.broadcast().wait().unwrap(); assert_eq!(22, counter.load(Ordering::SeqCst)); mem::drop(handle_a); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait().unwrap(); assert_eq!(32, counter.load(Ordering::SeqCst)); mem::drop(handle_a2); - block_on(watch_event_router.broadcast()); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait().unwrap(); + watch_event_router.broadcast().wait().unwrap(); assert_eq!(32, counter.load(Ordering::SeqCst)); } @@ -176,15 +164,12 @@ }); let handle_a = watch_event_router.subscribe(inc_callback); assert_eq!(0, counter.load(Ordering::SeqCst)); - block_on(async { - let future1 = watch_event_router.broadcast(); - let future2 = watch_event_router.broadcast(); - futures::join!(future1, future2) - }); + watch_event_router.broadcast().wait().unwrap(); + watch_event_router.broadcast().wait().unwrap(); assert_eq!(2, counter.load(Ordering::SeqCst)); mem::drop(handle_a); let _ = watch_event_router.broadcast(); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait().unwrap(); assert_eq!(2, counter.load(Ordering::SeqCst)); } } diff --git a/src/fastfield/mod.rs
b/src/fastfield/mod.rs index be5285d39e..95080f44ca 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -501,8 +501,7 @@ mod tests { .map(SegmentReader::segment_id) .collect(); assert_eq!(segment_ids.len(), 2); - let merge_future = index_writer.merge(&segment_ids[..]); - futures::executor::block_on(merge_future)?; + index_writer.merge(&segment_ids[..]).wait().unwrap(); reader.reload()?; assert_eq!(reader.searcher().segment_readers().len(), 1); Ok(()) diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index 9d28beb5db..6d17276375 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -8,7 +8,6 @@ pub use self::writer::MultiValuedFastFieldWriter; mod tests { use chrono::Duration; - use futures::executor::block_on; use proptest::strategy::Strategy; use proptest::{prop_oneof, proptest}; use test_log::test; @@ -268,7 +267,7 @@ mod tests { IndexingOp::Merge => { let segment_ids = index.searchable_segment_ids()?; if segment_ids.len() >= 2 { - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.segment_updater().wait_merging_thread()?; } } @@ -283,7 +282,7 @@ .searchable_segment_ids() .expect("Searchable segments failed."); if !segment_ids.is_empty() { - block_on(index_writer.merge(&segment_ids)).unwrap(); + index_writer.merge(&segment_ids).wait()?; assert!(index_writer.wait_merging_threads().is_ok()); } } diff --git a/src/future_result.rs b/src/future_result.rs new file mode 100644 index 0000000000..d1d43f7e28 --- /dev/null +++ b/src/future_result.rs @@ -0,0 +1,130 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::Poll; + +use crate::TantivyError; + +/// `FutureResult` is a handle that makes it possible to wait for the completion +/// of an ongoing task. +/// +/// Contrary to an ordinary `Future`, it does not need to be polled for the task to +/// progress. Dropping the `FutureResult` does not cancel the task being executed +/// either. +/// +/// - In a sync context, you can call `FutureResult::wait()`. The function +/// does not rely on `block_on`. +/// - In an async context, you can simply use `FutureResult` as a future. +pub struct FutureResult<T> { + inner: Inner<T>, +} + +enum Inner<T> { + FailedBeforeStart(Option<TantivyError>), + InProgress { + receiver: oneshot::Receiver<crate::Result<T>>, + error_msg_if_failure: &'static str, + }, +} + +impl<T> From<TantivyError> for FutureResult<T> { + fn from(err: TantivyError) -> Self { + FutureResult { + inner: Inner::FailedBeforeStart(Some(err)), + } + } } + +impl<T> FutureResult<T> { + pub(crate) fn create( + error_msg_if_failure: &'static str, + ) -> (Self, oneshot::Sender<crate::Result<T>>) { + let (sender, receiver) = oneshot::channel(); + let inner: Inner<T> = Inner::InProgress { + receiver, + error_msg_if_failure, + }; + (FutureResult { inner }, sender) + } + + /// Blocks until the scheduled result is available. + /// + /// In an async context, you should simply use `FutureResult` as a future.
+ pub fn wait(self) -> crate::Result<T> { + match self.inner { + Inner::FailedBeforeStart(err) => Err(err.unwrap()), + Inner::InProgress { + receiver, + error_msg_if_failure, + } => receiver.recv().unwrap_or_else(|_| { + Err(crate::TantivyError::SystemError( + error_msg_if_failure.to_string(), + )) + }), + } + } +} + +impl<T> Future for FutureResult<T> { + type Output = crate::Result<T>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { + unsafe { + match &mut Pin::get_unchecked_mut(self).inner { + Inner::FailedBeforeStart(err) => Poll::Ready(Err(err.take().unwrap())), + Inner::InProgress { + receiver, + error_msg_if_failure, + } => match Future::poll(Pin::new_unchecked(receiver), cx) { + Poll::Ready(oneshot_res) => { + let res = oneshot_res.unwrap_or_else(|_| { + Err(crate::TantivyError::SystemError( + error_msg_if_failure.to_string(), + )) + }); + Poll::Ready(res) + } + Poll::Pending => Poll::Pending, + }, + } + } + } +} + +#[cfg(test)] +mod tests { + use futures::executor::block_on; + + use super::FutureResult; + use crate::TantivyError; + + #[test] + fn test_scheduled_result_failed_to_schedule() { + let scheduled_result: FutureResult<()> = FutureResult::from(TantivyError::Poisoned); + let res = block_on(scheduled_result); + assert!(matches!(res, Err(TantivyError::Poisoned))); + } + + #[test] + fn test_scheduled_result_error() { + let (scheduled_result, tx): (FutureResult<()>, _) = FutureResult::create("failed"); + drop(tx); + let res = block_on(scheduled_result); + assert!(matches!(res, Err(TantivyError::SystemError(_)))); + } + + #[test] + fn test_scheduled_result_sent_success() { + let (scheduled_result, tx): (FutureResult<u64>, _) = FutureResult::create("failed"); + tx.send(Ok(2u64)).unwrap(); + assert_eq!(block_on(scheduled_result).unwrap(), 2u64); + } + + #[test] + fn test_scheduled_result_sent_error() { + let (scheduled_result, tx): (FutureResult<u64>, _) = FutureResult::create("failed"); + tx.send(Err(TantivyError::Poisoned)).unwrap(); + let res = block_on(scheduled_result); + assert!(matches!(res, Err(TantivyError::Poisoned))); + } +} diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 646ea60e06..ff71742d4c 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -5,8 +5,6 @@ use std::thread::JoinHandle; use common::BitSet; use crossbeam::channel; -use futures::executor::block_on; -use futures::future::Future; use smallvec::smallvec; use super::operation::{AddOperation, UserOperation}; @@ -24,7 +22,7 @@ use crate::indexer::operation::DeleteOperation; use crate::indexer::stamper::Stamper; use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter}; use crate::schema::{Document, IndexRecordOption, Term}; -use crate::Opstamp; +use crate::{FutureResult, Opstamp}; // Size of the margin for the `memory_arena`. A segment is closed when the remaining memory // in the `memory_arena` goes below MARGIN_IN_BYTES.
@@ -214,7 +212,7 @@ fn index_documents( meta.untrack_temp_docstore(); // update segment_updater inventory to remove tempstore let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt); - block_on(segment_updater.schedule_add_segment(segment_entry))?; + segment_updater.schedule_add_segment(segment_entry).wait()?; Ok(()) } @@ -368,7 +366,9 @@ impl IndexWriter { pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> { let delete_cursor = self.delete_queue.cursor(); let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None); - block_on(self.segment_updater.schedule_add_segment(segment_entry)) + self.segment_updater + .schedule_add_segment(segment_entry) + .wait() } /// Creates a new segment. @@ -465,8 +465,8 @@ impl IndexWriter { } /// Detects and removes the files that are not used by the index anymore. - pub async fn garbage_collect_files(&self) -> crate::Result<GarbageCollectionResult> { - self.segment_updater.schedule_garbage_collect().await + pub fn garbage_collect_files(&self) -> FutureResult<GarbageCollectionResult> { + self.segment_updater.schedule_garbage_collect() } /// Deletes all documents from the index @@ -516,13 +516,10 @@ impl IndexWriter { /// Merges a given list of segments /// /// `segment_ids` is required to be non-empty. - pub fn merge( - &mut self, - segment_ids: &[SegmentId], - ) -> impl Future<Output = crate::Result<SegmentMeta>> { + pub fn merge(&mut self, segment_ids: &[SegmentId]) -> FutureResult<SegmentMeta> { let merge_operation = self.segment_updater.make_merge_operation(segment_ids); let segment_updater = self.segment_updater.clone(); - async move { segment_updater.start_merge(merge_operation)?.await } + segment_updater.start_merge(merge_operation) } /// Closes the current document channel send. @@ -781,7 +778,6 @@ impl Drop for IndexWriter { mod tests { use std::collections::{HashMap, HashSet}; - use futures::executor::block_on; use proptest::prelude::*; use proptest::prop_oneof; use proptest::strategy::Strategy; @@ -1456,7 +1452,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); if segment_ids.len() >= 2 { - block_on(index_writer.merge(&segment_ids)).unwrap(); + index_writer.merge(&segment_ids).wait().unwrap(); assert!(index_writer.segment_updater().wait_merging_thread().is_ok()); } } @@ -1472,7 +1468,7 @@ .searchable_segment_ids() .expect("Searchable segments failed."); if segment_ids.len() >= 2 { - block_on(index_writer.merge(&segment_ids)).unwrap(); + index_writer.merge(&segment_ids).wait().unwrap(); assert!(index_writer.wait_merging_threads().is_ok()); } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 7327a70f4c..3006ece678 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1133,7 +1133,6 @@ impl IndexMerger { #[cfg(test)] mod tests { use byteorder::{BigEndian, ReadBytesExt}; - use futures::executor::block_on; use schema::FAST; use crate::collector::tests::{ @@ -1207,7 +1206,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } { @@ -1456,7 +1455,7 @@ { // merging the segments let segment_ids = index.searchable_segment_ids()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; reader.reload()?; let searcher = reader.searcher(); assert_eq!(searcher.segment_readers().len(), 1); @@ -1549,7 +1548,7 @@ { // Test merging a single segment in order to remove
deletes. let segment_ids = index.searchable_segment_ids()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; reader.reload()?; let searcher = reader.searcher(); @@ -1769,7 +1768,10 @@ .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests().unwrap(); - block_on(index_writer.merge(&segment_ids)).expect("Merging failed"); + index_writer + .merge(&segment_ids) + .wait() + .expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); reader.reload().unwrap(); test_searcher( @@ -1824,7 +1826,7 @@ let segment_ids = index .searchable_segment_ids() .expect("Searchable segments failed."); - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; reader.reload()?; // commit has not been called yet. The document should still be // there. @@ -1851,7 +1853,7 @@ index_writer.commit()?; index_writer.delete_term(Term::from_field_u64(int_field, 1)); let segment_ids = index.searchable_segment_ids()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; // assert delete has not been committed reader.reload()?; @@ -1952,7 +1954,7 @@ { let segment_ids = index.searchable_segment_ids()?; let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } reader.reload()?; @@ -2080,7 +2082,7 @@ .iter() .map(|reader| reader.segment_id()) .collect(); - block_on(writer.merge(&segment_ids[..]))?; + writer.merge(&segment_ids[..]).wait()?; reader.reload()?; let searcher = reader.searcher(); diff --git a/src/indexer/merger_sorted_index_test.rs b/src/indexer/merger_sorted_index_test.rs index b07f5bf61a..cd3ee2f038 100644 --- a/src/indexer/merger_sorted_index_test.rs +++ b/src/indexer/merger_sorted_index_test.rs @@ -1,7 +1,5 @@ #[cfg(test)] mod tests { - use futures::executor::block_on; - use crate::collector::TopDocs; use crate::core::Index; use crate::fastfield::{AliveBitSet, FastFieldReader, MultiValuedFastFieldReader}; @@ -50,7 +48,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests().unwrap(); - assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); + assert!(index_writer.merge(&segment_ids).wait().is_ok()); assert!(index_writer.wait_merging_threads().is_ok()); } index @@ -140,7 +138,7 @@ { let segment_ids = index.searchable_segment_ids()?; let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } Ok(index) diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index 13d2cfa06a..96b1baef0a 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -1,7 +1,5 @@ -use futures::executor::block_on; - use super::IndexWriter; -use crate::Opstamp; +use crate::{FutureResult, Opstamp}; /// A prepared commit pub struct PreparedCommit<'a> { @@ -35,9 +33,9 @@ impl<'a> PreparedCommit<'a> { } /// Proceeds to commit. - /// See `.commit_async()`. + /// See `.commit_future()`. pub fn commit(self) -> crate::Result<Opstamp> { - block_on(self.commit_async()) + self.commit_future().wait() } /// Proceeds to commit.
@@ -45,12 +43,10 @@ /// Unfortunately, contrary to what `PrepareCommit` may suggest, /// this operation is not at all really light. /// At this point deletes have not been flushed yet. - pub async fn commit_async(self) -> crate::Result<Opstamp> { + pub fn commit_future(self) -> FutureResult<Opstamp> { info!("committing {}", self.opstamp); self.index_writer .segment_updater() .schedule_commit(self.opstamp, self.payload) - .await?; - Ok(self.opstamp) } } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index c1255eec63..4ca751d8db 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -8,9 +8,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use fail::fail_point; -use futures::channel::oneshot; -use futures::executor::{ThreadPool, ThreadPoolBuilder}; -use futures::future::{Future, TryFutureExt}; +use rayon::{ThreadPool, ThreadPoolBuilder}; use super::segment_manager::SegmentManager; use crate::core::{ @@ -29,7 +27,7 @@ use crate::indexer::{ SegmentSerializer, }; use crate::schema::Schema; -use crate::{Opstamp, TantivyError}; +use crate::{FutureResult, Opstamp, TantivyError}; const NUM_MERGE_THREADS: usize = 4; @@ -105,7 +103,7 @@ impl Deref for SegmentUpdater { } } -async fn garbage_collect_files( +fn garbage_collect_files( segment_updater: SegmentUpdater, ) -> crate::Result<GarbageCollectionResult> { info!("Running garbage collection"); @@ -309,18 +307,18 @@ impl SegmentUpdater { let segments = index.searchable_segment_metas()?; let segment_manager = SegmentManager::from_segments(segments, delete_cursor); let pool = ThreadPoolBuilder::new() - .name_prefix("segment_updater") - .pool_size(1) - .create() + .thread_name(|_| "segment_updater".to_string()) + .num_threads(1) + .build() .map_err(|_| { crate::TantivyError::SystemError( "Failed to spawn segment updater thread".to_string(), ) })?; let merge_thread_pool = ThreadPoolBuilder::new() - .name_prefix("merge_thread") - .pool_size(NUM_MERGE_THREADS) - .create() + .thread_name(|i| format!("merge_thread_{i}")) + .num_threads(NUM_MERGE_THREADS) + .build() .map_err(|_| { crate::TantivyError::SystemError( "Failed to spawn segment merging thread".to_string(), @@ -349,39 +347,30 @@ *self.merge_policy.write().unwrap() = arc_merge_policy; } - async fn schedule_task< - T: 'static + Send, - F: Future<Output = crate::Result<T>> + 'static + Send, - >( + fn schedule_task<T: 'static + Send, F: FnOnce() -> crate::Result<T> + 'static + Send>( &self, task: F, - ) -> crate::Result<T> { + ) -> FutureResult<T> { if !self.is_alive() { - return Err(crate::TantivyError::SystemError( - "Segment updater killed".to_string(), - )); + return crate::TantivyError::SystemError("Segment updater killed".to_string()).into(); } - let (sender, receiver) = oneshot::channel(); - self.pool.spawn_ok(async move { - let task_result = task.await; + let (scheduled_result, sender) = FutureResult::create( + "A segment_updater future did not succeed. This should never happen.", + ); + self.pool.spawn(|| { + let task_result = task(); let _ = sender.send(task_result); }); - let task_result = receiver.await; - task_result.unwrap_or_else(|_| { - let err_msg = - "A segment_updater future did not success.
This should never happen.".to_string(); - Err(crate::TantivyError::SystemError(err_msg)) - }) + scheduled_result } - pub async fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> crate::Result<()> { + pub fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> FutureResult<()> { let segment_updater = self.clone(); - self.schedule_task(async move { + self.schedule_task(move || { segment_updater.segment_manager.add_segment(segment_entry); - segment_updater.consider_merge_options().await; + segment_updater.consider_merge_options(); Ok(()) }) - .await } /// Orders `SegmentManager` to remove all segments @@ -448,9 +437,9 @@ Ok(()) } - pub async fn schedule_garbage_collect(&self) -> crate::Result<GarbageCollectionResult> { - let garbage_collect_future = garbage_collect_files(self.clone()); - self.schedule_task(garbage_collect_future).await + pub fn schedule_garbage_collect(&self) -> FutureResult<GarbageCollectionResult> { + let self_clone = self.clone(); + self.schedule_task(move || garbage_collect_files(self_clone)) } /// List the files that are useful to the index. @@ -468,21 +457,20 @@ files } - pub(crate) async fn schedule_commit( + pub(crate) fn schedule_commit( &self, opstamp: Opstamp, payload: Option<String>, - ) -> crate::Result<()> { + ) -> FutureResult<Opstamp> { let segment_updater: SegmentUpdater = self.clone(); - self.schedule_task(async move { + self.schedule_task(move || { let segment_entries = segment_updater.purge_deletes(opstamp)?; segment_updater.segment_manager.commit(segment_entries); segment_updater.save_metas(opstamp, payload)?; - let _ = garbage_collect_files(segment_updater.clone()).await; - segment_updater.consider_merge_options().await; - Ok(()) + let _ = garbage_collect_files(segment_updater.clone()); + segment_updater.consider_merge_options(); + Ok(opstamp) }) - .await } fn store_meta(&self, index_meta: &IndexMeta) { @@ -515,26 +503,33 @@ // suggested and the moment when it ended up being executed.) // // `segment_ids` is required to be non-empty. - pub fn start_merge( - &self, - merge_operation: MergeOperation, - ) -> crate::Result<impl Future<Output = crate::Result<SegmentMeta>>> { + pub fn start_merge(&self, merge_operation: MergeOperation) -> FutureResult<SegmentMeta> { assert!( !merge_operation.segment_ids().is_empty(), "Segment_ids cannot be empty." ); let segment_updater = self.clone(); - let segment_entries: Vec<SegmentEntry> = self + let segment_entries: Vec<SegmentEntry> = match self .segment_manager - .start_merge(merge_operation.segment_ids())?; + .start_merge(merge_operation.segment_ids()) + { + Ok(segment_entries) => segment_entries, + Err(err) => { + warn!( + "Starting the merge failed for the following reason. This is not fatal. {}", + err + ); + return err.into(); + } + }; info!("Starting merge - {:?}", merge_operation.segment_ids()); - let (merging_future_send, merging_future_recv) = - oneshot::channel::<crate::Result<SegmentMeta>>(); + let (scheduled_result, merging_future_send) = + FutureResult::create("Merge operation failed."); - self.merge_thread_pool.spawn_ok(async move { + self.merge_thread_pool.spawn(move || { // The fact that `merge_operation` is moved here is important.
// Its lifetime is used to track how many merging threads are currently running, // as well as which segment is currently in merge and therefore should not be // @@ -545,28 +540,23 @@ merge_operation.target_opstamp(), ) { Ok(after_merge_segment_entry) => { - let segment_meta = segment_updater - .end_merge(merge_operation, after_merge_segment_entry) - .await; - let _send_result = merging_future_send.send(segment_meta); + let segment_meta_res = + segment_updater.end_merge(merge_operation, after_merge_segment_entry); + let _send_result = merging_future_send.send(segment_meta_res); } - Err(e) => { + Err(merge_error) => { warn!( "Merge of {:?} was cancelled: {:?}", merge_operation.segment_ids().to_vec(), - e + merge_error ); - // ... cancel merge + let _send_result = merging_future_send.send(Err(merge_error)); assert!(!cfg!(test), "Merge failed."); } } }); - Ok(merging_future_recv.unwrap_or_else(|e| { - Err(crate::TantivyError::SystemError( - "Merge failed:".to_string() + &e.to_string(), - )) - })) + scheduled_result } pub(crate) fn get_mergeable_segments(&self) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) { @@ -575,7 +565,7 @@ .get_mergeable_segments(&merge_segment_ids) } - async fn consider_merge_options(&self) { + fn consider_merge_options(&self) { let (committed_segments, uncommitted_segments) = self.get_mergeable_segments(); // Committed segments cannot be merged with uncommitted_segments. @@ -601,23 +591,21 @@ merge_candidates.extend(committed_merge_candidates); for merge_operation in merge_candidates { - if let Err(err) = self.start_merge(merge_operation) { - warn!( - "Starting the merge failed for the following reason. This is not fatal. {}", - err - ); - } + // If a merge cannot be started this is not a fatal error. + // We do log a warning in `start_merge`. + let _ = self.start_merge(merge_operation); } } - async fn end_merge( + /// Queues an `end_merge` in the segment updater and blocks until it is successfully processed. + fn end_merge( &self, merge_operation: MergeOperation, mut after_merge_segment_entry: SegmentEntry, ) -> crate::Result<SegmentMeta> { let segment_updater = self.clone(); let after_merge_segment_meta = after_merge_segment_entry.meta().clone(); - self.schedule_task(async move { + self.schedule_task(move || { info!("End merge {:?}", after_merge_segment_entry.meta()); { let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); @@ -655,13 +643,13 @@ .save_metas(previous_metas.opstamp, previous_metas.payload.clone())?; } - segment_updater.consider_merge_options().await; + segment_updater.consider_merge_options(); } // we drop all possible handles to a now useless `SegmentMeta`. - let _ = garbage_collect_files(segment_updater).await; + let _ = garbage_collect_files(segment_updater); Ok(()) }) - .await?; + .wait()?; Ok(after_merge_segment_meta) } diff --git a/src/lib.rs b/src/lib.rs index aa70cbdc03..47f2ff8d19 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -123,10 +123,12 @@ mod functional_test; #[macro_use] mod macros; +mod future_result; pub use chrono; pub use crate::error::TantivyError; +pub use crate::future_result::FutureResult; /// Tantivy result.
/// @@ -308,6 +310,7 @@ pub mod tests { use crate::core::SegmentReader; use crate::docset::{DocSet, TERMINATED}; use crate::fastfield::FastFieldReader; + use crate::merge_policy::NoMergePolicy; use crate::query::BooleanQuery; use crate::schema::*; use crate::{DocAddress, Index, Postings, ReloadPolicy}; @@ -935,8 +938,6 @@ pub mod tests { // motivated by #729 #[test] fn test_update_via_delete_insert() -> crate::Result<()> { - use futures::executor::block_on; - use crate::collector::Count; use crate::indexer::NoMergePolicy; use crate::query::AllQuery; @@ -990,8 +991,7 @@ pub mod tests { .iter() .map(|reader| reader.segment_id()) .collect(); - block_on(index_writer.merge(&segment_ids)).unwrap(); - + index_writer.merge(&segment_ids).wait()?; index_reader.reload()?; let searcher = index_reader.searcher(); assert_eq!(searcher.search(&AllQuery, &Count)?, DOC_COUNT as usize); @@ -1006,6 +1006,7 @@ pub mod tests { let schema = builder.build(); let index = Index::create_in_dir(&index_path, schema)?; let mut writer = index.writer(50_000_000)?; + writer.set_merge_policy(Box::new(NoMergePolicy)); for _ in 0..5000 { writer.add_document(doc!(body => "foo"))?; writer.add_document(doc!(body => "boo"))?; @@ -1017,8 +1018,7 @@ pub mod tests { writer.delete_term(Term::from_field_text(body, "foo")); writer.commit()?; let segment_ids = index.searchable_segment_ids()?; - let _ = futures::executor::block_on(writer.merge(&segment_ids)); - + writer.merge(&segment_ids).wait()?; assert!(index.validate_checksum()?.is_empty()); Ok(()) } diff --git a/src/query/term_query/term_scorer.rs b/src/query/term_query/term_scorer.rs index 5badb55cf1..bb952e222b 100644 --- a/src/query/term_query/term_scorer.rs +++ b/src/query/term_query/term_scorer.rs @@ -125,7 +125,6 @@ impl Scorer for TermScorer { #[cfg(test)] mod tests { - use futures::executor::block_on; use proptest::prelude::*; use crate::merge_policy::NoMergePolicy; @@ -321,9 +320,7 @@ mod tests { .collect(); test_block_wand_aux(&term_query, &searcher)?; } - { - let _ = block_on(writer.merge(&segment_ids[..])); - } + writer.merge(&segment_ids[..]).wait().unwrap(); { reader.reload()?; let searcher = reader.searcher(); diff --git a/src/schema/text_options.rs b/src/schema/text_options.rs index 8b7161e035..5a380138c9 100644 --- a/src/schema/text_options.rs +++ b/src/schema/text_options.rs @@ -106,7 +106,7 @@ impl TextFieldIndexing { /// Returns the tokenizer that will be used for this field. 
pub fn tokenizer(&self) -> &str { - &self.tokenizer.name() + self.tokenizer.name() } /// Sets fieldnorms diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index 7d8b8606a9..2401e23d6d 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -41,7 +41,6 @@ mod tests { use std::io; - use futures::executor::block_on; use proptest::strategy::{BoxedStrategy, Strategy}; use super::{SkipIndex, SkipIndexBuilder}; @@ -145,7 +144,7 @@ mod tests { index_writer.delete_term(Term::from_field_text(text, "testb")); index_writer.commit()?; let segment_ids = index.searchable_segment_ids()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait().unwrap(); let reader = index.reader()?; let searcher = reader.searcher(); assert_eq!(searcher.num_docs(), 30); diff --git a/src/store/mod.rs b/src/store/mod.rs index ed142ab3c5..b378d1db5d 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -55,8 +55,6 @@ pub mod tests { use std::path::Path; - use futures::executor::block_on; - use super::*; use crate::directory::{Directory, RamDirectory, WritePtr}; use crate::fastfield::AliveBitSet; @@ -269,7 +267,7 @@ pub mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests().unwrap(); - assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); + assert!(index_writer.merge(&segment_ids).wait().is_ok()); assert!(index_writer.wait_merging_threads().is_ok()); } @@ -316,7 +314,7 @@ pub mod tests { { let segment_ids = index.searchable_segment_ids()?; let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } diff --git a/src/tokenizer/ascii_folding_filter.rs b/src/tokenizer/ascii_folding_filter.rs index 56b4dfac77..21103e7776 100644 --- a/src/tokenizer/ascii_folding_filter.rs +++ b/src/tokenizer/ascii_folding_filter.rs @@ -1625,9 +1625,9 @@ mod tests { #[test] fn test_to_ascii() { - let mut input = "Rámon".to_string(); + let input = "Rámon".to_string(); let mut buffer = String::new(); - to_ascii(&mut input, &mut buffer); + to_ascii(&input, &mut buffer); assert_eq!("Ramon", buffer); }
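For reference, the sketch below shows how a call site migrates under this change. It is illustrative only: `index_writer` and `segment_ids` are assumed to be in scope, and the `?` error handling is an assumption about the surrounding function.

    // Before: merge() returned an `impl Future`, so synchronous callers
    // needed an executor just to block on it:
    //     let meta = futures::executor::block_on(index_writer.merge(&segment_ids))?;

    // After: merge() returns a FutureResult<SegmentMeta>. In synchronous code,
    // .wait() blocks on the underlying oneshot channel -- no executor needed:
    let meta = index_writer.merge(&segment_ids).wait()?;

    // FutureResult still implements Future, so async callers can await it
    // directly instead of calling .wait():
    //     let meta = index_writer.merge(&segment_ids).await?;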
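The executor removal rests on the pattern visible in `schedule_task`: run a closure on a rayon pool and hand its `crate::Result` back through a `oneshot` channel, which `FutureResult` then wraps. A minimal, self-contained sketch of that pattern, using the same `rayon` and `oneshot` crates this patch depends on (the task body and result types are placeholders):

    fn main() {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(1)
            .build()
            .expect("failed to build thread pool");
        // The sender half moves into the pool; the receiver half becomes the
        // caller's handle, as in FutureResult::create.
        let (sender, receiver) = oneshot::channel::<Result<u64, String>>();
        pool.spawn(move || {
            // Stand-in for task() inside schedule_task.
            let _ = sender.send(Ok(42));
        });
        // FutureResult::wait reduces to this blocking recv. If the task panics,
        // the sender is dropped and recv returns an error instead of hanging,
        // which is what the error_msg_if_failure path reports.
        match receiver.recv() {
            Ok(Ok(val)) => println!("task succeeded: {}", val),
            Ok(Err(err)) => println!("task failed: {}", err),
            Err(_) => println!("task dropped its sender"),
        }
    }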