diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 7b72380934..a7f597d7aa 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -472,6 +472,8 @@ mod tests { // There are more tests in directory/mod.rs // The following tests are specific to the MmapDirectory + use std::time::Duration; + use common::HasLen; use super::*; @@ -610,7 +612,14 @@ mod tests { mmap_directory.get_cache_info().mmapped.len() ); } - assert!(mmap_directory.get_cache_info().mmapped.is_empty()); - Ok(()) + // This test failed on CI. The last Mmap is dropped from the merging thread so there might + // be a race condition indeed. + for _ in 0..10 { + if mmap_directory.get_cache_info().mmapped.is_empty() { + return Ok(()); + } + std::thread::sleep(Duration::from_millis(200)); + } + panic!("The cache still contains information. One of the Mmap has not been dropped."); } } diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 8cd4deea4e..d2726c679e 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -246,18 +246,27 @@ impl DeleteCursor { mod tests { use super::{DeleteOperation, DeleteQueue}; - use crate::schema::{Field, Term}; + use crate::query::{Explanation, Scorer, Weight}; + use crate::{DocId, Score, SegmentReader}; + + struct DummyWeight; + impl Weight for DummyWeight { + fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result> { + Err(crate::TantivyError::InternalError("dummy impl".to_owned())) + } + + fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result { + Err(crate::TantivyError::InternalError("dummy impl".to_owned())) + } + } #[test] fn test_deletequeue() { let delete_queue = DeleteQueue::new(); - let make_op = |i: usize| { - let field = Field::from_field_id(1u32); - DeleteOperation { - opstamp: i as u64, - term: Term::from_field_u64(field, i as u64), - } + let make_op = |i: usize| DeleteOperation { + opstamp: i as u64, + target: Box::new(DummyWeight), }; delete_queue.push(make_op(1)); diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 70f89490e8..065e65ec1b 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -11,7 +11,6 @@ use super::segment_updater::SegmentUpdater; use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit}; use crate::core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, SegmentReader}; use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite}; -use crate::docset::{DocSet, TERMINATED}; use crate::error::TantivyError; use crate::fastfield::write_alive_bitset; use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue}; @@ -20,8 +19,9 @@ use crate::indexer::index_writer_status::IndexWriterStatus; use crate::indexer::operation::DeleteOperation; use crate::indexer::stamper::Stamper; use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter}; +use crate::query::{Query, TermQuery}; use crate::schema::{Document, IndexRecordOption, Term}; -use crate::{FutureResult, Opstamp}; +use crate::{FutureResult, IndexReader, Opstamp}; // Size of the margin for the `memory_arena`. A segment is closed when the remaining memory // in the `memory_arena` goes below MARGIN_IN_BYTES. @@ -57,6 +57,7 @@ pub struct IndexWriter { _directory_lock: Option, index: Index, + index_reader: IndexReader, memory_arena_in_bytes_per_thread: usize, @@ -92,19 +93,14 @@ fn compute_deleted_bitset( // A delete operation should only affect // document that were inserted before it. - let inverted_index = segment_reader.inverted_index(delete_op.term.field())?; - if let Some(mut docset) = - inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)? - { - let mut doc_matching_deleted_term = docset.doc(); - while doc_matching_deleted_term != TERMINATED { - if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) { - alive_bitset.remove(doc_matching_deleted_term); + delete_op + .target + .for_each(segment_reader, &mut |doc_matching_delete_query, _| { + if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) { + alive_bitset.remove(doc_matching_delete_query); might_have_changed = true; } - doc_matching_deleted_term = docset.advance(); - } - } + })?; delete_cursor.advance(); } Ok(might_have_changed) @@ -302,6 +298,7 @@ impl IndexWriter { memory_arena_in_bytes_per_thread, index: index.clone(), + index_reader: index.reader()?, index_writer_status: IndexWriterStatus::from(document_receiver), operation_sender: document_sender, @@ -666,10 +663,33 @@ impl IndexWriter { /// Like adds, the deletion itself will be visible /// only after calling `commit()`. pub fn delete_term(&self, term: Term) -> Opstamp { + let query = TermQuery::new(term, IndexRecordOption::Basic); + // For backward compatibility, if Term is invalid for the index, do nothing but return an + // Opstamp + self.delete_query(Box::new(query)) + .unwrap_or_else(|_| self.stamper.stamp()) + } + + /// Delete all documents matching a given query. + /// Returns an `Err` if the query can't be executed. + /// + /// Delete operation only affects documents that + /// were added in previous commits, and documents + /// that were added previously in the same commit. + /// + /// Like adds, the deletion itself will be visible + /// only after calling `commit()`. + #[doc(hidden)] + pub fn delete_query(&self, query: Box) -> crate::Result { + let weight = query.weight(&self.index_reader.searcher(), false)?; + let opstamp = self.stamper.stamp(); - let delete_operation = DeleteOperation { opstamp, term }; + let delete_operation = DeleteOperation { + opstamp, + target: weight, + }; self.delete_queue.push(delete_operation); - opstamp + Ok(opstamp) } /// Returns the opstamp of the last successful commit. @@ -738,10 +758,17 @@ impl IndexWriter { let (batch_opstamp, stamps) = self.get_batch_opstamps(count); let mut adds = AddBatch::default(); + for (user_op, opstamp) in user_operations_it.zip(stamps) { match user_op { UserOperation::Delete(term) => { - let delete_operation = DeleteOperation { opstamp, term }; + let query = TermQuery::new(term, IndexRecordOption::Basic); + let weight = query.weight(&self.index_reader.searcher(), false)?; + + let delete_operation = DeleteOperation { + opstamp, + target: weight, + }; self.delete_queue.push(delete_operation); } UserOperation::Add(document) => { @@ -786,7 +813,7 @@ mod tests { use crate::directory::error::LockError; use crate::error::*; use crate::indexer::NoMergePolicy; - use crate::query::{QueryParser, TermQuery}; + use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery}; use crate::schema::{ self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions, TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT, @@ -1418,10 +1445,72 @@ mod tests { Ok(()) } + #[test] + fn test_delete_query_with_sort_by_field() -> crate::Result<()> { + let mut schema_builder = schema::Schema::builder(); + let id_field = + schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST); + let schema = schema_builder.build(); + + let settings = IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "id".to_string(), + order: Order::Desc, + }), + ..Default::default() + }; + + let index = Index::builder() + .schema(schema) + .settings(settings) + .create_in_ram()?; + let index_reader = index.reader()?; + let mut index_writer = index.writer_for_tests()?; + + // create and delete docs in same commit + for id in 0u64..5u64 { + index_writer.add_document(doc!(id_field => id))?; + } + for id in 1u64..4u64 { + let term = Term::from_field_u64(id_field, id); + let not_term = Term::from_field_u64(id_field, 2); + let term = Box::new(TermQuery::new(term, Default::default())); + let not_term = Box::new(TermQuery::new(not_term, Default::default())); + + let query: BooleanQuery = vec![ + (Occur::Must, term as Box), + (Occur::MustNot, not_term as Box), + ] + .into(); + + index_writer.delete_query(Box::new(query))?; + } + for id in 5u64..10u64 { + index_writer.add_document(doc!(id_field => id))?; + } + index_writer.commit()?; + index_reader.reload()?; + + let searcher = index_reader.searcher(); + assert_eq!(searcher.segment_readers().len(), 1); + + let segment_reader = searcher.segment_reader(0); + assert_eq!(segment_reader.num_docs(), 8); + assert_eq!(segment_reader.max_doc(), 10); + let fast_field_reader = segment_reader.fast_fields().u64(id_field)?; + let in_order_alive_ids: Vec = segment_reader + .doc_ids_alive() + .map(|doc| fast_field_reader.get_val(doc as u64)) + .collect(); + assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]); + Ok(()) + } + #[derive(Debug, Clone, Copy)] enum IndexingOp { AddDoc { id: u64 }, DeleteDoc { id: u64 }, + DeleteDocQuery { id: u64 }, Commit, Merge, } @@ -1429,6 +1518,7 @@ mod tests { fn balanced_operation_strategy() -> impl Strategy { prop_oneof![ (0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }), + (0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }), (0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }), (0u64..1u64).prop_map(|_| IndexingOp::Commit), (0u64..1u64).prop_map(|_| IndexingOp::Merge), @@ -1437,7 +1527,8 @@ mod tests { fn adding_operation_strategy() -> impl Strategy { prop_oneof![ - 10 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }), + 5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }), + 5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }), 50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }), 2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit), 1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge), @@ -1457,6 +1548,10 @@ mod tests { existing_ids.remove(&id); deleted_ids.insert(id); } + IndexingOp::DeleteDocQuery { id } => { + existing_ids.remove(&id); + deleted_ids.insert(id); + } _ => {} } } @@ -1539,6 +1634,11 @@ mod tests { IndexingOp::DeleteDoc { id } => { index_writer.delete_term(Term::from_field_u64(id_field, id)); } + IndexingOp::DeleteDocQuery { id } => { + let term = Term::from_field_u64(id_field, id); + let query = TermQuery::new(term, Default::default()); + index_writer.delete_query(Box::new(query))?; + } IndexingOp::Commit => { index_writer.commit()?; } diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs index e0505be1d7..d64f740c2a 100644 --- a/src/indexer/operation.rs +++ b/src/indexer/operation.rs @@ -1,20 +1,11 @@ +use crate::query::Weight; use crate::schema::{Document, Term}; use crate::Opstamp; /// Timestamped Delete operation. -#[derive(Clone, Eq, PartialEq, Debug)] pub struct DeleteOperation { pub opstamp: Opstamp, - pub term: Term, -} - -impl Default for DeleteOperation { - fn default() -> Self { - DeleteOperation { - opstamp: 0u64, - term: Term::new(), - } - } + pub target: Box, } /// Timestamped Add operation.