From 3c7965f06356d1b80c10b777c86573bc910e9a09 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 19 Sep 2022 17:59:10 +0200 Subject: [PATCH 1/5] add support for deleting all documents matching query --- src/indexer/delete_queue.rs | 5 +- src/indexer/index_writer.rs | 138 ++++++++++++++++++++++++++++++++---- src/indexer/operation.rs | 28 ++++++-- 3 files changed, 146 insertions(+), 25 deletions(-) diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 8cd4deea4e..48c4bbe1d4 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -254,10 +254,7 @@ mod tests { let make_op = |i: usize| { let field = Field::from_field_id(1u32); - DeleteOperation { - opstamp: i as u64, - term: Term::from_field_u64(field, i as u64), - } + DeleteOperation::new(i as u64, Term::from_field_u64(field, i as u64)) }; delete_queue.push(make_op(1)); diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 70f89490e8..f9ba52d0d8 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -4,6 +4,7 @@ use std::thread; use std::thread::JoinHandle; use common::BitSet; +use itertools::Either; use smallvec::smallvec; use super::operation::{AddOperation, UserOperation}; @@ -20,8 +21,9 @@ use crate::indexer::index_writer_status::IndexWriterStatus; use crate::indexer::operation::DeleteOperation; use crate::indexer::stamper::Stamper; use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter}; +use crate::query::Query; use crate::schema::{Document, IndexRecordOption, Term}; -use crate::{FutureResult, Opstamp}; +use crate::{FutureResult, IndexReader, Opstamp}; // Size of the margin for the `memory_arena`. A segment is closed when the remaining memory // in the `memory_arena` goes below MARGIN_IN_BYTES. @@ -57,6 +59,7 @@ pub struct IndexWriter { _directory_lock: Option, index: Index, + index_reader: IndexReader, memory_arena_in_bytes_per_thread: usize, @@ -92,17 +95,29 @@ fn compute_deleted_bitset( // A delete operation should only affect // document that were inserted before it. - let inverted_index = segment_reader.inverted_index(delete_op.term.field())?; - if let Some(mut docset) = - inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)? - { - let mut doc_matching_deleted_term = docset.doc(); - while doc_matching_deleted_term != TERMINATED { - if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) { - alive_bitset.remove(doc_matching_deleted_term); - might_have_changed = true; + match &delete_op.term { + Either::Left(term) => { + let inverted_index = segment_reader.inverted_index(term.field())?; + if let Some(mut docset) = + inverted_index.read_postings(term, IndexRecordOption::Basic)? + { + let mut doc_matching_deleted_term = docset.doc(); + while doc_matching_deleted_term != TERMINATED { + if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) { + alive_bitset.remove(doc_matching_deleted_term); + might_have_changed = true; + } + doc_matching_deleted_term = docset.advance(); + } } - doc_matching_deleted_term = docset.advance(); + } + Either::Right(query) => { + query.for_each(segment_reader, &mut |doc_matching_delete_query, _| { + if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) { + alive_bitset.remove(doc_matching_delete_query); + might_have_changed = true; + } + })?; } } delete_cursor.advance(); @@ -302,6 +317,7 @@ impl IndexWriter { memory_arena_in_bytes_per_thread, index: index.clone(), + index_reader: index.reader()?, index_writer_status: IndexWriterStatus::from(document_receiver), operation_sender: document_sender, @@ -667,11 +683,30 @@ impl IndexWriter { /// only after calling `commit()`. pub fn delete_term(&self, term: Term) -> Opstamp { let opstamp = self.stamper.stamp(); - let delete_operation = DeleteOperation { opstamp, term }; + let delete_operation = DeleteOperation::new(opstamp, term); self.delete_queue.push(delete_operation); opstamp } + /// Delete all documents matching a given query. + /// Returns an `Err` if the query can't be executed. + /// + /// Delete operation only affects documents that + /// were added in previous commits, and documents + /// that were added previously in the same commit. + /// + /// Like adds, the deletion itself will be visible + /// only after calling `commit()`. + #[doc(hidden)] + pub fn delete_query(&self, query: Box) -> crate::Result { + let weight = query.weight(&self.index_reader.searcher(), false)?; + + let opstamp = self.stamper.stamp(); + let delete_operation = DeleteOperation::new_for_query(opstamp, weight); + self.delete_queue.push(delete_operation); + Ok(opstamp) + } + /// Returns the opstamp of the last successful commit. /// /// This is, for instance, the opstamp the index will @@ -741,7 +776,7 @@ impl IndexWriter { for (user_op, opstamp) in user_operations_it.zip(stamps) { match user_op { UserOperation::Delete(term) => { - let delete_operation = DeleteOperation { opstamp, term }; + let delete_operation = DeleteOperation::new(opstamp, term); self.delete_queue.push(delete_operation); } UserOperation::Add(document) => { @@ -786,7 +821,7 @@ mod tests { use crate::directory::error::LockError; use crate::error::*; use crate::indexer::NoMergePolicy; - use crate::query::{QueryParser, TermQuery}; + use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery}; use crate::schema::{ self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions, TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT, @@ -1418,10 +1453,72 @@ mod tests { Ok(()) } + #[test] + fn test_delete_query_with_sort_by_field() -> crate::Result<()> { + let mut schema_builder = schema::Schema::builder(); + let id_field = + schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST); + let schema = schema_builder.build(); + + let settings = IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "id".to_string(), + order: Order::Desc, + }), + ..Default::default() + }; + + let index = Index::builder() + .schema(schema) + .settings(settings) + .create_in_ram()?; + let index_reader = index.reader()?; + let mut index_writer = index.writer_for_tests()?; + + // create and delete docs in same commit + for id in 0u64..5u64 { + index_writer.add_document(doc!(id_field => id))?; + } + for id in 1u64..4u64 { + let term = Term::from_field_u64(id_field, id); + let not_term = Term::from_field_u64(id_field, 2); + let term = Box::new(TermQuery::new(term, Default::default())); + let not_term = Box::new(TermQuery::new(not_term, Default::default())); + + let query: BooleanQuery = vec![ + (Occur::Must, term as Box), + (Occur::MustNot, not_term as Box), + ] + .into(); + + index_writer.delete_query(Box::new(query))?; + } + for id in 5u64..10u64 { + index_writer.add_document(doc!(id_field => id))?; + } + index_writer.commit()?; + index_reader.reload()?; + + let searcher = index_reader.searcher(); + assert_eq!(searcher.segment_readers().len(), 1); + + let segment_reader = searcher.segment_reader(0); + assert_eq!(segment_reader.num_docs(), 8); + assert_eq!(segment_reader.max_doc(), 10); + let fast_field_reader = segment_reader.fast_fields().u64(id_field)?; + let in_order_alive_ids: Vec = segment_reader + .doc_ids_alive() + .map(|doc| fast_field_reader.get_val(doc as u64)) + .collect(); + assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]); + Ok(()) + } + #[derive(Debug, Clone, Copy)] enum IndexingOp { AddDoc { id: u64 }, DeleteDoc { id: u64 }, + DeleteDocQuery { id: u64 }, Commit, Merge, } @@ -1429,6 +1526,7 @@ mod tests { fn balanced_operation_strategy() -> impl Strategy { prop_oneof![ (0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }), + (0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }), (0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }), (0u64..1u64).prop_map(|_| IndexingOp::Commit), (0u64..1u64).prop_map(|_| IndexingOp::Merge), @@ -1437,7 +1535,8 @@ mod tests { fn adding_operation_strategy() -> impl Strategy { prop_oneof![ - 10 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }), + 5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }), + 5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }), 50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }), 2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit), 1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge), @@ -1457,6 +1556,10 @@ mod tests { existing_ids.remove(&id); deleted_ids.insert(id); } + IndexingOp::DeleteDocQuery { id } => { + existing_ids.remove(&id); + deleted_ids.insert(id); + } _ => {} } } @@ -1539,6 +1642,11 @@ mod tests { IndexingOp::DeleteDoc { id } => { index_writer.delete_term(Term::from_field_u64(id_field, id)); } + IndexingOp::DeleteDocQuery { id } => { + let term = Term::from_field_u64(id_field, id); + let query = TermQuery::new(term, Default::default()); + index_writer.delete_query(Box::new(query))?; + } IndexingOp::Commit => { index_writer.commit()?; } diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs index e0505be1d7..6e58488e00 100644 --- a/src/indexer/operation.rs +++ b/src/indexer/operation.rs @@ -1,22 +1,38 @@ +use itertools::Either; + +use crate::query::Weight; use crate::schema::{Document, Term}; use crate::Opstamp; /// Timestamped Delete operation. -#[derive(Clone, Eq, PartialEq, Debug)] +//#[derive(Clone, Debug)] pub struct DeleteOperation { pub opstamp: Opstamp, - pub term: Term, + pub term: Either>, } -impl Default for DeleteOperation { - fn default() -> Self { +impl DeleteOperation { + pub fn new(opstamp: Opstamp, term: Term) -> Self { + DeleteOperation { + opstamp, + term: Either::Left(term), + } + } + + pub fn new_for_query(opstamp: Opstamp, weight: Box) -> Self { DeleteOperation { - opstamp: 0u64, - term: Term::new(), + opstamp, + term: Either::Right(weight), } } } +impl Default for DeleteOperation { + fn default() -> Self { + DeleteOperation::new(0, Term::new()) + } +} + /// Timestamped Add operation. #[derive(Eq, PartialEq, Debug)] pub struct AddOperation { From 9ae9fa52650a2cbc26960d5b274e197f08f94eb1 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 20 Sep 2022 09:26:49 +0200 Subject: [PATCH 2/5] use dedicated enum instead of Either --- src/indexer/delete_queue.rs | 6 +++++- src/indexer/index_writer.rs | 24 ++++++++++++++++-------- src/indexer/operation.rs | 28 +++++++++------------------- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 48c4bbe1d4..d40e77b73a 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -246,6 +246,7 @@ impl DeleteCursor { mod tests { use super::{DeleteOperation, DeleteQueue}; + use crate::indexer::operation::DeleteTarget; use crate::schema::{Field, Term}; #[test] @@ -254,7 +255,10 @@ mod tests { let make_op = |i: usize| { let field = Field::from_field_id(1u32); - DeleteOperation::new(i as u64, Term::from_field_u64(field, i as u64)) + DeleteOperation { + opstamp: i as u64, + target: DeleteTarget::Term(Term::from_field_u64(field, i as u64)), + } }; delete_queue.push(make_op(1)); diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index f9ba52d0d8..e04b8e1970 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -4,7 +4,6 @@ use std::thread; use std::thread::JoinHandle; use common::BitSet; -use itertools::Either; use smallvec::smallvec; use super::operation::{AddOperation, UserOperation}; @@ -18,7 +17,7 @@ use crate::fastfield::write_alive_bitset; use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue}; use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping; use crate::indexer::index_writer_status::IndexWriterStatus; -use crate::indexer::operation::DeleteOperation; +use crate::indexer::operation::{DeleteOperation, DeleteTarget}; use crate::indexer::stamper::Stamper; use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter}; use crate::query::Query; @@ -95,8 +94,8 @@ fn compute_deleted_bitset( // A delete operation should only affect // document that were inserted before it. - match &delete_op.term { - Either::Left(term) => { + match &delete_op.target { + DeleteTarget::Term(term) => { let inverted_index = segment_reader.inverted_index(term.field())?; if let Some(mut docset) = inverted_index.read_postings(term, IndexRecordOption::Basic)? @@ -111,7 +110,7 @@ fn compute_deleted_bitset( } } } - Either::Right(query) => { + DeleteTarget::Query(query) => { query.for_each(segment_reader, &mut |doc_matching_delete_query, _| { if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) { alive_bitset.remove(doc_matching_delete_query); @@ -683,7 +682,10 @@ impl IndexWriter { /// only after calling `commit()`. pub fn delete_term(&self, term: Term) -> Opstamp { let opstamp = self.stamper.stamp(); - let delete_operation = DeleteOperation::new(opstamp, term); + let delete_operation = DeleteOperation { + opstamp, + target: DeleteTarget::Term(term), + }; self.delete_queue.push(delete_operation); opstamp } @@ -702,7 +704,10 @@ impl IndexWriter { let weight = query.weight(&self.index_reader.searcher(), false)?; let opstamp = self.stamper.stamp(); - let delete_operation = DeleteOperation::new_for_query(opstamp, weight); + let delete_operation = DeleteOperation { + opstamp, + target: DeleteTarget::Query(weight), + }; self.delete_queue.push(delete_operation); Ok(opstamp) } @@ -776,7 +781,10 @@ impl IndexWriter { for (user_op, opstamp) in user_operations_it.zip(stamps) { match user_op { UserOperation::Delete(term) => { - let delete_operation = DeleteOperation::new(opstamp, term); + let delete_operation = DeleteOperation { + opstamp, + target: DeleteTarget::Term(term), + }; self.delete_queue.push(delete_operation); } UserOperation::Add(document) => { diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs index 6e58488e00..ed89290e98 100644 --- a/src/indexer/operation.rs +++ b/src/indexer/operation.rs @@ -1,35 +1,25 @@ -use itertools::Either; - use crate::query::Weight; use crate::schema::{Document, Term}; use crate::Opstamp; /// Timestamped Delete operation. -//#[derive(Clone, Debug)] pub struct DeleteOperation { pub opstamp: Opstamp, - pub term: Either>, + pub target: DeleteTarget, } -impl DeleteOperation { - pub fn new(opstamp: Opstamp, term: Term) -> Self { - DeleteOperation { - opstamp, - term: Either::Left(term), - } - } - - pub fn new_for_query(opstamp: Opstamp, weight: Box) -> Self { - DeleteOperation { - opstamp, - term: Either::Right(weight), - } - } +/// Target of a Delete operation +pub enum DeleteTarget { + Term(Term), + Query(Box), } impl Default for DeleteOperation { fn default() -> Self { - DeleteOperation::new(0, Term::new()) + DeleteOperation { + opstamp: 0, + target: DeleteTarget::Term(Term::new().into()), + } } } From deab3c7b9821055039cf916121a2b1dfc71918b1 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 20 Sep 2022 10:29:03 +0200 Subject: [PATCH 3/5] cleanup --- src/indexer/operation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs index ed89290e98..f0ae538f01 100644 --- a/src/indexer/operation.rs +++ b/src/indexer/operation.rs @@ -18,7 +18,7 @@ impl Default for DeleteOperation { fn default() -> Self { DeleteOperation { opstamp: 0, - target: DeleteTarget::Term(Term::new().into()), + target: DeleteTarget::Term(Term::new()), } } } From c94c9c365562e33b0e754b1b4ceac3330d56af32 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 21 Sep 2022 10:36:18 +0200 Subject: [PATCH 4/5] always use Query for deletion --- src/indexer/delete_queue.rs | 24 ++++++++++------ src/indexer/index_writer.rs | 56 +++++++++++++------------------------ src/indexer/operation.rs | 17 +---------- 3 files changed, 37 insertions(+), 60 deletions(-) diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index d40e77b73a..d2726c679e 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -246,19 +246,27 @@ impl DeleteCursor { mod tests { use super::{DeleteOperation, DeleteQueue}; - use crate::indexer::operation::DeleteTarget; - use crate::schema::{Field, Term}; + use crate::query::{Explanation, Scorer, Weight}; + use crate::{DocId, Score, SegmentReader}; + + struct DummyWeight; + impl Weight for DummyWeight { + fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result> { + Err(crate::TantivyError::InternalError("dummy impl".to_owned())) + } + + fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result { + Err(crate::TantivyError::InternalError("dummy impl".to_owned())) + } + } #[test] fn test_deletequeue() { let delete_queue = DeleteQueue::new(); - let make_op = |i: usize| { - let field = Field::from_field_id(1u32); - DeleteOperation { - opstamp: i as u64, - target: DeleteTarget::Term(Term::from_field_u64(field, i as u64)), - } + let make_op = |i: usize| DeleteOperation { + opstamp: i as u64, + target: Box::new(DummyWeight), }; delete_queue.push(make_op(1)); diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index e04b8e1970..065e65ec1b 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -11,16 +11,15 @@ use super::segment_updater::SegmentUpdater; use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit}; use crate::core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, SegmentReader}; use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite}; -use crate::docset::{DocSet, TERMINATED}; use crate::error::TantivyError; use crate::fastfield::write_alive_bitset; use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue}; use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping; use crate::indexer::index_writer_status::IndexWriterStatus; -use crate::indexer::operation::{DeleteOperation, DeleteTarget}; +use crate::indexer::operation::DeleteOperation; use crate::indexer::stamper::Stamper; use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter}; -use crate::query::Query; +use crate::query::{Query, TermQuery}; use crate::schema::{Document, IndexRecordOption, Term}; use crate::{FutureResult, IndexReader, Opstamp}; @@ -94,31 +93,14 @@ fn compute_deleted_bitset( // A delete operation should only affect // document that were inserted before it. - match &delete_op.target { - DeleteTarget::Term(term) => { - let inverted_index = segment_reader.inverted_index(term.field())?; - if let Some(mut docset) = - inverted_index.read_postings(term, IndexRecordOption::Basic)? - { - let mut doc_matching_deleted_term = docset.doc(); - while doc_matching_deleted_term != TERMINATED { - if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) { - alive_bitset.remove(doc_matching_deleted_term); - might_have_changed = true; - } - doc_matching_deleted_term = docset.advance(); - } + delete_op + .target + .for_each(segment_reader, &mut |doc_matching_delete_query, _| { + if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) { + alive_bitset.remove(doc_matching_delete_query); + might_have_changed = true; } - } - DeleteTarget::Query(query) => { - query.for_each(segment_reader, &mut |doc_matching_delete_query, _| { - if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) { - alive_bitset.remove(doc_matching_delete_query); - might_have_changed = true; - } - })?; - } - } + })?; delete_cursor.advance(); } Ok(might_have_changed) @@ -681,13 +663,11 @@ impl IndexWriter { /// Like adds, the deletion itself will be visible /// only after calling `commit()`. pub fn delete_term(&self, term: Term) -> Opstamp { - let opstamp = self.stamper.stamp(); - let delete_operation = DeleteOperation { - opstamp, - target: DeleteTarget::Term(term), - }; - self.delete_queue.push(delete_operation); - opstamp + let query = TermQuery::new(term, IndexRecordOption::Basic); + // For backward compatibility, if Term is invalid for the index, do nothing but return an + // Opstamp + self.delete_query(Box::new(query)) + .unwrap_or_else(|_| self.stamper.stamp()) } /// Delete all documents matching a given query. @@ -706,7 +686,7 @@ impl IndexWriter { let opstamp = self.stamper.stamp(); let delete_operation = DeleteOperation { opstamp, - target: DeleteTarget::Query(weight), + target: weight, }; self.delete_queue.push(delete_operation); Ok(opstamp) @@ -778,12 +758,16 @@ impl IndexWriter { let (batch_opstamp, stamps) = self.get_batch_opstamps(count); let mut adds = AddBatch::default(); + for (user_op, opstamp) in user_operations_it.zip(stamps) { match user_op { UserOperation::Delete(term) => { + let query = TermQuery::new(term, IndexRecordOption::Basic); + let weight = query.weight(&self.index_reader.searcher(), false)?; + let delete_operation = DeleteOperation { opstamp, - target: DeleteTarget::Term(term), + target: weight, }; self.delete_queue.push(delete_operation); } diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs index f0ae538f01..d64f740c2a 100644 --- a/src/indexer/operation.rs +++ b/src/indexer/operation.rs @@ -5,22 +5,7 @@ use crate::Opstamp; /// Timestamped Delete operation. pub struct DeleteOperation { pub opstamp: Opstamp, - pub target: DeleteTarget, -} - -/// Target of a Delete operation -pub enum DeleteTarget { - Term(Term), - Query(Box), -} - -impl Default for DeleteOperation { - fn default() -> Self { - DeleteOperation { - opstamp: 0, - target: DeleteTarget::Term(Term::new()), - } - } + pub target: Box, } /// Timestamped Add operation. From 1dfa388f75cf00b734ded00fef2f108db1cc1b9a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 22 Sep 2022 19:05:43 +0900 Subject: [PATCH 5/5] Attempt to fix race condition in unit test --- src/directory/mmap_directory.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 7b72380934..a7f597d7aa 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -472,6 +472,8 @@ mod tests { // There are more tests in directory/mod.rs // The following tests are specific to the MmapDirectory + use std::time::Duration; + use common::HasLen; use super::*; @@ -610,7 +612,14 @@ mod tests { mmap_directory.get_cache_info().mmapped.len() ); } - assert!(mmap_directory.get_cache_info().mmapped.is_empty()); - Ok(()) + // This test failed on CI. The last Mmap is dropped from the merging thread so there might + // be a race condition indeed. + for _ in 0..10 { + if mmap_directory.get_cache_info().mmapped.is_empty() { + return Ok(()); + } + std::thread::sleep(Duration::from_millis(200)); + } + panic!("The cache still contains information. One of the Mmap has not been dropped."); } }