From 261f6cc88091f35b01baba8840d24a75916682ca Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 17 Mar 2022 18:52:11 +0900 Subject: [PATCH] Removes all usage of block_on, and use a oneshot channel instead. Calling `block_on` panics in certain context. For instance, it panics when it is called in a the context of another call to block. Using it in tantivy is unnecessary. We replace it by a thin wrapper around a oneshot channel that supports both async/sync. --- Cargo.toml | 1 + src/aggregation/mod.rs | 8 +- src/directory/file_watcher.rs | 2 +- src/directory/tests.rs | 9 +- src/directory/watch_event_router.rs | 62 ++++---- src/fastfield/mod.rs | 3 +- src/fastfield/multivalued/mod.rs | 5 +- src/indexer/index_writer.rs | 22 ++- src/indexer/merger.rs | 20 +-- src/indexer/merger_sorted_index_test.rs | 6 +- src/indexer/mod.rs | 2 +- src/indexer/prepared_commit.rs | 9 +- src/indexer/segment_updater.rs | 192 ++++++++++++++++++------ src/lib.rs | 11 +- src/query/term_query/term_scorer.rs | 5 +- src/store/index/mod.rs | 3 +- src/store/mod.rs | 6 +- 17 files changed, 221 insertions(+), 145 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 66cb907bd6..cd3bf8fc08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ keywords = ["search", "information", "retrieval"] edition = "2018" [dependencies] +oneshot = "0.1" base64 = "0.13" byteorder = "1.4.3" crc32fast = "1.2.1" diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index d4f77fa946..c776b07549 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -284,8 +284,6 @@ pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option { #[cfg(test)] mod tests { - - use futures::executor::block_on; use serde_json::Value; use super::agg_req::{Aggregation, Aggregations, BucketAggregation}; @@ -350,7 +348,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } @@ -553,7 +551,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } @@ -990,7 +988,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } diff --git a/src/directory/file_watcher.rs b/src/directory/file_watcher.rs index ddf384a0c2..4483e001fa 100644 --- a/src/directory/file_watcher.rs +++ b/src/directory/file_watcher.rs @@ -53,7 +53,7 @@ impl FileWatcher { if metafile_has_changed { info!("Meta file {:?} was modified", path); current_checksum_opt = Some(checksum); - futures::executor::block_on(callbacks.broadcast()); + callbacks.broadcast().wait(); } } diff --git a/src/directory/tests.rs b/src/directory/tests.rs index 45b4631290..5d1d9dd460 100644 --- a/src/directory/tests.rs +++ b/src/directory/tests.rs @@ -6,9 +6,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::Arc; use std::time::Duration; -use futures::channel::oneshot; -use futures::executor::block_on; - use super::*; #[cfg(feature = "mmap")] @@ -249,8 +246,8 @@ fn test_lock_blocking(directory: &dyn Directory) { std::thread::spawn(move || { //< lock_a_res is sent to the thread. in_thread_clone.store(true, SeqCst); - let _just_sync = block_on(receiver); - // explicitely droping lock_a_res. It would have been sufficient to just force it + let _just_sync = receiver.recv(); + // explicitely dropping lock_a_res. It would have been sufficient to just force it // to be part of the move, but the intent seems clearer that way. drop(lock_a_res); }); @@ -273,7 +270,7 @@ fn test_lock_blocking(directory: &dyn Directory) { assert!(in_thread.load(SeqCst)); assert!(lock_a_res.is_ok()); }); - assert!(block_on(receiver2).is_ok()); + assert!(receiver2.recv().is_ok()); assert!(sender.send(()).is_ok()); assert!(join_handle.join().is_ok()); } diff --git a/src/directory/watch_event_router.rs b/src/directory/watch_event_router.rs index 4e828f269b..c66b1efcc0 100644 --- a/src/directory/watch_event_router.rs +++ b/src/directory/watch_event_router.rs @@ -1,8 +1,5 @@ use std::sync::{Arc, RwLock, Weak}; -use futures::channel::oneshot; -use futures::{Future, TryFutureExt}; - /// Cloneable wrapper for callbacks registered when watching files of a `Directory`. #[derive(Clone)] pub struct WatchCallback(Arc); @@ -74,13 +71,13 @@ impl WatchCallbackList { } /// Triggers all callbacks - pub fn broadcast(&self) -> impl Future { + pub fn broadcast(&self) -> BroadcastCallback { let callbacks = self.list_callback(); let (sender, receiver) = oneshot::channel(); - let result = receiver.unwrap_or_else(|_| ()); + let broadcast_cb = BroadcastCallback { receiver }; if callbacks.is_empty() { let _ = sender.send(()); - return result; + return broadcast_cb; } let spawn_res = std::thread::Builder::new() .name("watch-callbacks".to_string()) @@ -96,7 +93,17 @@ impl WatchCallbackList { err ); } - result + broadcast_cb + } +} + +pub struct BroadcastCallback { + receiver: oneshot::Receiver<()>, +} + +impl BroadcastCallback { + pub fn wait(self) { + let _ = self.receiver.recv(); } } @@ -106,8 +113,6 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - use futures::executor::block_on; - use crate::directory::{WatchCallback, WatchCallbackList}; #[test] @@ -118,22 +123,18 @@ mod tests { let inc_callback = WatchCallback::new(move || { counter_clone.fetch_add(1, Ordering::SeqCst); }); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait(); assert_eq!(0, counter.load(Ordering::SeqCst)); let handle_a = watch_event_router.subscribe(inc_callback); assert_eq!(0, counter.load(Ordering::SeqCst)); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait(); assert_eq!(1, counter.load(Ordering::SeqCst)); - block_on(async { - ( - watch_event_router.broadcast().await, - watch_event_router.broadcast().await, - watch_event_router.broadcast().await, - ) - }); + watch_event_router.broadcast().wait(); + watch_event_router.broadcast().wait(); + watch_event_router.broadcast().wait(); assert_eq!(4, counter.load(Ordering::SeqCst)); mem::drop(handle_a); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait(); assert_eq!(4, counter.load(Ordering::SeqCst)); } @@ -150,19 +151,15 @@ mod tests { let handle_a = watch_event_router.subscribe(inc_callback(1)); let handle_a2 = watch_event_router.subscribe(inc_callback(10)); assert_eq!(0, counter.load(Ordering::SeqCst)); - block_on(async { - futures::join!( - watch_event_router.broadcast(), - watch_event_router.broadcast() - ) - }); + watch_event_router.broadcast().wait(); + watch_event_router.broadcast().wait(); assert_eq!(22, counter.load(Ordering::SeqCst)); mem::drop(handle_a); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait(); assert_eq!(32, counter.load(Ordering::SeqCst)); mem::drop(handle_a2); - block_on(watch_event_router.broadcast()); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait(); + watch_event_router.broadcast().wait(); assert_eq!(32, counter.load(Ordering::SeqCst)); } @@ -176,15 +173,12 @@ mod tests { }); let handle_a = watch_event_router.subscribe(inc_callback); assert_eq!(0, counter.load(Ordering::SeqCst)); - block_on(async { - let future1 = watch_event_router.broadcast(); - let future2 = watch_event_router.broadcast(); - futures::join!(future1, future2) - }); + watch_event_router.broadcast().wait(); + watch_event_router.broadcast().wait(); assert_eq!(2, counter.load(Ordering::SeqCst)); mem::drop(handle_a); let _ = watch_event_router.broadcast(); - block_on(watch_event_router.broadcast()); + watch_event_router.broadcast().wait(); assert_eq!(2, counter.load(Ordering::SeqCst)); } } diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index be5285d39e..95080f44ca 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -501,8 +501,7 @@ mod tests { .map(SegmentReader::segment_id) .collect(); assert_eq!(segment_ids.len(), 2); - let merge_future = index_writer.merge(&segment_ids[..]); - futures::executor::block_on(merge_future)?; + index_writer.merge(&segment_ids[..]).wait().unwrap(); reader.reload()?; assert_eq!(reader.searcher().segment_readers().len(), 1); Ok(()) diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index 9d28beb5db..6d17276375 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -8,7 +8,6 @@ pub use self::writer::MultiValuedFastFieldWriter; mod tests { use chrono::Duration; - use futures::executor::block_on; use proptest::strategy::Strategy; use proptest::{prop_oneof, proptest}; use test_log::test; @@ -268,7 +267,7 @@ mod tests { IndexingOp::Merge => { let segment_ids = index.searchable_segment_ids()?; if segment_ids.len() >= 2 { - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.segment_updater().wait_merging_thread()?; } } @@ -283,7 +282,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); if !segment_ids.is_empty() { - block_on(index_writer.merge(&segment_ids)).unwrap(); + index_writer.merge(&segment_ids).wait()?; assert!(index_writer.wait_merging_threads().is_ok()); } } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 646ea60e06..c4df14bf45 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -5,8 +5,6 @@ use std::thread::JoinHandle; use common::BitSet; use crossbeam::channel; -use futures::executor::block_on; -use futures::future::Future; use smallvec::smallvec; use super::operation::{AddOperation, UserOperation}; @@ -22,7 +20,7 @@ use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping; use crate::indexer::index_writer_status::IndexWriterStatus; use crate::indexer::operation::DeleteOperation; use crate::indexer::stamper::Stamper; -use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter}; +use crate::indexer::{MergePolicy, ScheduledResult, SegmentEntry, SegmentWriter}; use crate::schema::{Document, IndexRecordOption, Term}; use crate::Opstamp; @@ -214,7 +212,7 @@ fn index_documents( meta.untrack_temp_docstore(); // update segment_updater inventory to remove tempstore let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt); - block_on(segment_updater.schedule_add_segment(segment_entry))?; + segment_updater.schedule_add_segment(segment_entry).wait()?; Ok(()) } @@ -368,7 +366,9 @@ impl IndexWriter { pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> { let delete_cursor = self.delete_queue.cursor(); let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None); - block_on(self.segment_updater.schedule_add_segment(segment_entry)) + self.segment_updater + .schedule_add_segment(segment_entry) + .wait() } /// Creates a new segment. @@ -516,13 +516,10 @@ impl IndexWriter { /// Merges a given list of segments /// /// `segment_ids` is required to be non-empty. - pub fn merge( - &mut self, - segment_ids: &[SegmentId], - ) -> impl Future> { + pub fn merge(&mut self, segment_ids: &[SegmentId]) -> ScheduledResult { let merge_operation = self.segment_updater.make_merge_operation(segment_ids); let segment_updater = self.segment_updater.clone(); - async move { segment_updater.start_merge(merge_operation)?.await } + segment_updater.start_merge(merge_operation) } /// Closes the current document channel send. @@ -781,7 +778,6 @@ impl Drop for IndexWriter { mod tests { use std::collections::{HashMap, HashSet}; - use futures::executor::block_on; use proptest::prelude::*; use proptest::prop_oneof; use proptest::strategy::Strategy; @@ -1456,7 +1452,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); if segment_ids.len() >= 2 { - block_on(index_writer.merge(&segment_ids)).unwrap(); + index_writer.merge(&segment_ids).wait().unwrap(); assert!(index_writer.segment_updater().wait_merging_thread().is_ok()); } } @@ -1472,7 +1468,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); if segment_ids.len() >= 2 { - block_on(index_writer.merge(&segment_ids)).unwrap(); + index_writer.merge(&segment_ids).wait().unwrap(); assert!(index_writer.wait_merging_threads().is_ok()); } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 775baca74a..84e88efc3e 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1133,7 +1133,6 @@ impl IndexMerger { #[cfg(test)] mod tests { use byteorder::{BigEndian, ReadBytesExt}; - use futures::executor::block_on; use schema::FAST; use crate::collector::tests::{ @@ -1209,7 +1208,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } { @@ -1458,7 +1457,7 @@ mod tests { { // merging the segments let segment_ids = index.searchable_segment_ids()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; reader.reload()?; let searcher = reader.searcher(); assert_eq!(searcher.segment_readers().len(), 1); @@ -1551,7 +1550,7 @@ mod tests { { // Test merging a single segment in order to remove deletes. let segment_ids = index.searchable_segment_ids()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; reader.reload()?; let searcher = reader.searcher(); @@ -1771,7 +1770,10 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests().unwrap(); - block_on(index_writer.merge(&segment_ids)).expect("Merging failed"); + index_writer + .merge(&segment_ids) + .wait() + .expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); reader.reload().unwrap(); test_searcher( @@ -1826,7 +1828,7 @@ mod tests { let segment_ids = index .searchable_segment_ids() .expect("Searchable segments failed."); - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; reader.reload()?; // commit has not been called yet. The document should still be // there. @@ -1853,7 +1855,7 @@ mod tests { index_writer.commit()?; index_writer.delete_term(Term::from_field_u64(int_field, 1)); let segment_ids = index.searchable_segment_ids()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; // assert delete has not been committed reader.reload()?; @@ -1954,7 +1956,7 @@ mod tests { { let segment_ids = index.searchable_segment_ids()?; let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } reader.reload()?; @@ -2082,7 +2084,7 @@ mod tests { .iter() .map(|reader| reader.segment_id()) .collect(); - block_on(writer.merge(&segment_ids[..]))?; + writer.merge(&segment_ids[..]).wait()?; reader.reload()?; let searcher = reader.searcher(); diff --git a/src/indexer/merger_sorted_index_test.rs b/src/indexer/merger_sorted_index_test.rs index b07f5bf61a..cd3ee2f038 100644 --- a/src/indexer/merger_sorted_index_test.rs +++ b/src/indexer/merger_sorted_index_test.rs @@ -1,7 +1,5 @@ #[cfg(test)] mod tests { - use futures::executor::block_on; - use crate::collector::TopDocs; use crate::core::Index; use crate::fastfield::{AliveBitSet, FastFieldReader, MultiValuedFastFieldReader}; @@ -50,7 +48,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests().unwrap(); - assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); + assert!(index_writer.merge(&segment_ids).wait().is_ok()); assert!(index_writer.wait_merging_threads().is_ok()); } index @@ -140,7 +138,7 @@ mod tests { { let segment_ids = index.searchable_segment_ids()?; let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; } Ok(index) diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index b0634d2bac..abffd733ca 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -33,7 +33,7 @@ pub use self::prepared_commit::PreparedCommit; pub use self::segment_entry::SegmentEntry; pub use self::segment_manager::SegmentManager; pub use self::segment_serializer::SegmentSerializer; -pub use self::segment_updater::{merge_filtered_segments, merge_indices}; +pub use self::segment_updater::{merge_filtered_segments, merge_indices, ScheduledResult}; pub use self::segment_writer::SegmentWriter; use crate::indexer::operation::AddOperation; diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index 13d2cfa06a..10369ca092 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -1,6 +1,5 @@ -use futures::executor::block_on; - use super::IndexWriter; +use crate::indexer::segment_updater::ScheduledResult; use crate::Opstamp; /// A prepared commit @@ -37,7 +36,7 @@ impl<'a> PreparedCommit<'a> { /// Proceeds to commit. /// See `.commit_async()`. pub fn commit(self) -> crate::Result { - block_on(self.commit_async()) + self.commit_async().wait() } /// Proceeds to commit. @@ -45,12 +44,10 @@ impl<'a> PreparedCommit<'a> { /// Unfortunately, contrary to what `PrepareCommit` may suggests, /// this operation is not at all really light. /// At this point deletes have not been flushed yet. - pub async fn commit_async(self) -> crate::Result { + pub fn commit_async(self) -> ScheduledResult { info!("committing {}", self.opstamp); self.index_writer .segment_updater() .schedule_commit(self.opstamp, self.payload) - .await?; - Ok(self.opstamp) } } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index c1255eec63..b4df666033 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -4,13 +4,14 @@ use std::io; use std::io::Write; use std::ops::Deref; use std::path::PathBuf; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; +use std::task::Poll; use fail::fail_point; -use futures::channel::oneshot; use futures::executor::{ThreadPool, ThreadPoolBuilder}; -use futures::future::{Future, TryFutureExt}; +use futures::future::Future; use super::segment_manager::SegmentManager; use crate::core::{ @@ -349,39 +350,30 @@ impl SegmentUpdater { *self.merge_policy.write().unwrap() = arc_merge_policy; } - async fn schedule_task< - T: 'static + Send, - F: Future> + 'static + Send, - >( + fn schedule_task> + 'static + Send>( &self, task: F, - ) -> crate::Result { + ) -> ScheduledResult { if !self.is_alive() { - return Err(crate::TantivyError::SystemError( - "Segment updater killed".to_string(), - )); + return crate::TantivyError::SystemError("Segment updater killed".to_string()).into(); } - let (sender, receiver) = oneshot::channel(); + let (scheduled_result, sender) = ScheduledResult::create( + "A segment_updater future did not success. This never happen.", + ); self.pool.spawn_ok(async move { let task_result = task.await; let _ = sender.send(task_result); }); - let task_result = receiver.await; - task_result.unwrap_or_else(|_| { - let err_msg = - "A segment_updater future did not success. This should never happen.".to_string(); - Err(crate::TantivyError::SystemError(err_msg)) - }) + scheduled_result } - pub async fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> crate::Result<()> { + pub fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> ScheduledResult<()> { let segment_updater = self.clone(); self.schedule_task(async move { segment_updater.segment_manager.add_segment(segment_entry); segment_updater.consider_merge_options().await; Ok(()) }) - .await } /// Orders `SegmentManager` to remove all segments @@ -450,7 +442,7 @@ impl SegmentUpdater { pub async fn schedule_garbage_collect(&self) -> crate::Result { let garbage_collect_future = garbage_collect_files(self.clone()); - self.schedule_task(garbage_collect_future).await + self.schedule_task(garbage_collect_future).wait() } /// List the files that are useful to the index. @@ -468,11 +460,11 @@ impl SegmentUpdater { files } - pub(crate) async fn schedule_commit( + pub(crate) fn schedule_commit( &self, opstamp: Opstamp, payload: Option, - ) -> crate::Result<()> { + ) -> ScheduledResult { let segment_updater: SegmentUpdater = self.clone(); self.schedule_task(async move { let segment_entries = segment_updater.purge_deletes(opstamp)?; @@ -480,9 +472,8 @@ impl SegmentUpdater { segment_updater.save_metas(opstamp, payload)?; let _ = garbage_collect_files(segment_updater.clone()).await; segment_updater.consider_merge_options().await; - Ok(()) + Ok(opstamp) }) - .await } fn store_meta(&self, index_meta: &IndexMeta) { @@ -515,24 +506,30 @@ impl SegmentUpdater { // suggested and the moment when it ended up being executed.) // // `segment_ids` is required to be non-empty. - pub fn start_merge( - &self, - merge_operation: MergeOperation, - ) -> crate::Result>> { + pub fn start_merge(&self, merge_operation: MergeOperation) -> ScheduledResult { assert!( !merge_operation.segment_ids().is_empty(), "Segment_ids cannot be empty." ); let segment_updater = self.clone(); - let segment_entries: Vec = self + let segment_entries: Vec = match self .segment_manager - .start_merge(merge_operation.segment_ids())?; + .start_merge(merge_operation.segment_ids()) + { + Ok(segment_entries) => segment_entries, + Err(err) => { + warn!( + "Starting the merge failed for the following reason. This is not fatal. {}", + err + ); + return err.into(); + } + }; info!("Starting merge - {:?}", merge_operation.segment_ids()); - let (merging_future_send, merging_future_recv) = - oneshot::channel::>(); + let (scheduled_result, merging_future_send) = ScheduledResult::create("Merge operation failed."); self.merge_thread_pool.spawn_ok(async move { // The fact that `merge_operation` is moved here is important. @@ -562,11 +559,7 @@ impl SegmentUpdater { } }); - Ok(merging_future_recv.unwrap_or_else(|e| { - Err(crate::TantivyError::SystemError( - "Merge failed:".to_string() + &e.to_string(), - )) - })) + scheduled_result } pub(crate) fn get_mergeable_segments(&self) -> (Vec, Vec) { @@ -601,12 +594,9 @@ impl SegmentUpdater { merge_candidates.extend(committed_merge_candidates); for merge_operation in merge_candidates { - if let Err(err) = self.start_merge(merge_operation) { - warn!( - "Starting the merge failed for the following reason. This is not fatal. {}", - err - ); - } + // If a merge cannot be started this is not a fatal error. + // We do log a warning in `start_merge`. + let _ = self.start_merge(merge_operation); } } @@ -661,7 +651,7 @@ impl SegmentUpdater { let _ = garbage_collect_files(segment_updater).await; Ok(()) }) - .await?; + .wait()?; Ok(after_merge_segment_meta) } @@ -686,8 +676,90 @@ impl SegmentUpdater { } } +/// Scheduled result is the future equivalent of a `tantivy::Result`. +/// +/// It wraps a oneshot receiver channel that allows consumption in both a sync or async context. +/// In a sync context, you can call `ScheduledResult::wait`. +pub struct ScheduledResult { + inner: ScheduledResultInner +} + +impl ScheduledResult { + +} + +enum ScheduledResultInner { + FailedToSchedule(Option), + Scheduled { + receiver: oneshot::Receiver>, + error_msg_if_failure: &'static str, + }, +} + +impl From for ScheduledResult { + fn from(err: TantivyError) -> Self { + ScheduledResult { inner: ScheduledResultInner::FailedToSchedule(Some(err)) } + } +} + +impl ScheduledResult { + fn create(error_msg_if_failure: &'static str) -> (Self, oneshot::Sender>) { + let (sender, receiver) = oneshot::channel(); + let inner: ScheduledResultInner = ScheduledResultInner::Scheduled { + receiver, + error_msg_if_failure, + }; + (ScheduledResult { inner }, sender) + } + + /// Blocks until the scheduled result is available. + /// + /// In an async context, you should simply use `ScheduledResult` as a future. + pub fn wait(self) -> crate::Result { + match self.inner { + ScheduledResultInner::FailedToSchedule(err) => Err(err.unwrap()), + ScheduledResultInner::Scheduled { + receiver, + error_msg_if_failure, + } => receiver.recv().unwrap_or_else(|_| { + Err(crate::TantivyError::SystemError( + error_msg_if_failure.to_string(), + )) + }), + } + } +} + +impl Future for ScheduledResult { + type Output = crate::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + unsafe { + match &mut Pin::get_unchecked_mut(self).inner { + ScheduledResultInner::FailedToSchedule(err) => Poll::Ready(Err(err.take().unwrap())), + ScheduledResultInner::Scheduled { + receiver, + error_msg_if_failure, + } => match Future::poll(Pin::new_unchecked(receiver), cx) { + Poll::Ready(oneshot_res) => { + let res = oneshot_res.unwrap_or_else(|_| { + Err(crate::TantivyError::SystemError( + error_msg_if_failure.to_string(), + )) + }); + Poll::Ready(res) + } + Poll::Pending => Poll::Pending, + }, + } + } + } +} + #[cfg(test)] mod tests { + use futures::executor::block_on; + use super::merge_indices; use crate::collector::TopDocs; use crate::directory::RamDirectory; @@ -697,7 +769,7 @@ mod tests { use crate::indexer::segment_updater::merge_filtered_segments; use crate::query::QueryParser; use crate::schema::*; - use crate::{Directory, DocAddress, Index, Segment}; + use crate::{Directory, DocAddress, Index, ScheduledResult, Segment, TantivyError}; #[test] fn test_delete_during_merge() -> crate::Result<()> { @@ -1113,4 +1185,34 @@ mod tests { Ok(()) } + + #[test] + fn test_scheduled_result_failed_to_schedule() { + let scheduled_result: ScheduledResult<()> = ScheduledResult::from(TantivyError::Poisoned); + let res = block_on(scheduled_result); + assert!(matches!(res, Err(TantivyError::Poisoned))); + } + + #[test] + fn test_scheduled_result_error() { + let (scheduled_result, tx): (ScheduledResult<()>, _) = ScheduledResult::create("failed"); + drop(tx); + let res = block_on(scheduled_result); + assert!(matches!(res, Err(TantivyError::SystemError(_)))); + } + + #[test] + fn test_scheduled_result_sent_success() { + let (scheduled_result, tx): (ScheduledResult, _) = ScheduledResult::create("failed"); + tx.send(Ok(2u64)); + assert_eq!(block_on(scheduled_result).unwrap(), 2u64); + } + + #[test] + fn test_scheduled_result_sent_error() { + let (scheduled_result, tx): (ScheduledResult, _) = ScheduledResult::create("failed"); + tx.send(Err(TantivyError::Poisoned)); + let res = block_on(scheduled_result); + assert!(matches!(res, Err(TantivyError::Poisoned))); + } } diff --git a/src/lib.rs b/src/lib.rs index aa70cbdc03..32bfecd63e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -127,6 +127,7 @@ mod macros; pub use chrono; pub use crate::error::TantivyError; +pub use crate::indexer::ScheduledResult; /// Tantivy result. /// @@ -308,6 +309,7 @@ pub mod tests { use crate::core::SegmentReader; use crate::docset::{DocSet, TERMINATED}; use crate::fastfield::FastFieldReader; + use crate::merge_policy::NoMergePolicy; use crate::query::BooleanQuery; use crate::schema::*; use crate::{DocAddress, Index, Postings, ReloadPolicy}; @@ -935,8 +937,6 @@ pub mod tests { // motivated by #729 #[test] fn test_update_via_delete_insert() -> crate::Result<()> { - use futures::executor::block_on; - use crate::collector::Count; use crate::indexer::NoMergePolicy; use crate::query::AllQuery; @@ -990,8 +990,7 @@ pub mod tests { .iter() .map(|reader| reader.segment_id()) .collect(); - block_on(index_writer.merge(&segment_ids)).unwrap(); - + index_writer.merge(&segment_ids).wait()?; index_reader.reload()?; let searcher = index_reader.searcher(); assert_eq!(searcher.search(&AllQuery, &Count)?, DOC_COUNT as usize); @@ -1006,6 +1005,7 @@ pub mod tests { let schema = builder.build(); let index = Index::create_in_dir(&index_path, schema)?; let mut writer = index.writer(50_000_000)?; + writer.set_merge_policy(Box::new(NoMergePolicy)); for _ in 0..5000 { writer.add_document(doc!(body => "foo"))?; writer.add_document(doc!(body => "boo"))?; @@ -1017,8 +1017,7 @@ pub mod tests { writer.delete_term(Term::from_field_text(body, "foo")); writer.commit()?; let segment_ids = index.searchable_segment_ids()?; - let _ = futures::executor::block_on(writer.merge(&segment_ids)); - + writer.merge(&segment_ids).wait()?; assert!(index.validate_checksum()?.is_empty()); Ok(()) } diff --git a/src/query/term_query/term_scorer.rs b/src/query/term_query/term_scorer.rs index 5badb55cf1..bb952e222b 100644 --- a/src/query/term_query/term_scorer.rs +++ b/src/query/term_query/term_scorer.rs @@ -125,7 +125,6 @@ impl Scorer for TermScorer { #[cfg(test)] mod tests { - use futures::executor::block_on; use proptest::prelude::*; use crate::merge_policy::NoMergePolicy; @@ -321,9 +320,7 @@ mod tests { .collect(); test_block_wand_aux(&term_query, &searcher)?; } - { - let _ = block_on(writer.merge(&segment_ids[..])); - } + writer.merge(&segment_ids[..]).wait().unwrap(); { reader.reload()?; let searcher = reader.searcher(); diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index 7d8b8606a9..2401e23d6d 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -41,7 +41,6 @@ mod tests { use std::io; - use futures::executor::block_on; use proptest::strategy::{BoxedStrategy, Strategy}; use super::{SkipIndex, SkipIndexBuilder}; @@ -145,7 +144,7 @@ mod tests { index_writer.delete_term(Term::from_field_text(text, "testb")); index_writer.commit()?; let segment_ids = index.searchable_segment_ids()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait().unwrap(); let reader = index.reader()?; let searcher = reader.searcher(); assert_eq!(searcher.num_docs(), 30); diff --git a/src/store/mod.rs b/src/store/mod.rs index ed142ab3c5..b378d1db5d 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -55,8 +55,6 @@ pub mod tests { use std::path::Path; - use futures::executor::block_on; - use super::*; use crate::directory::{Directory, RamDirectory, WritePtr}; use crate::fastfield::AliveBitSet; @@ -269,7 +267,7 @@ pub mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_for_tests().unwrap(); - assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); + assert!(index_writer.merge(&segment_ids).wait().is_ok()); assert!(index_writer.wait_merging_threads().is_ok()); } @@ -316,7 +314,7 @@ pub mod tests { { let segment_ids = index.searchable_segment_ids()?; let mut index_writer = index.writer_for_tests()?; - block_on(index_writer.merge(&segment_ids))?; + index_writer.merge(&segment_ids).wait()?; index_writer.wait_merging_threads()?; }