Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for deleting all documents matching query #1535

Merged
merged 5 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/indexer/delete_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,7 @@ mod tests {

let make_op = |i: usize| {
let field = Field::from_field_id(1u32);
DeleteOperation {
opstamp: i as u64,
term: Term::from_field_u64(field, i as u64),
}
DeleteOperation::new(i as u64, Term::from_field_u64(field, i as u64))
};

delete_queue.push(make_op(1));
Expand Down
138 changes: 123 additions & 15 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::thread;
use std::thread::JoinHandle;

use common::BitSet;
use itertools::Either;
use smallvec::smallvec;

use super::operation::{AddOperation, UserOperation};
Expand All @@ -20,8 +21,9 @@ use crate::indexer::index_writer_status::IndexWriterStatus;
use crate::indexer::operation::DeleteOperation;
use crate::indexer::stamper::Stamper;
use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
use crate::query::Query;
use crate::schema::{Document, IndexRecordOption, Term};
use crate::{FutureResult, Opstamp};
use crate::{FutureResult, IndexReader, Opstamp};

// Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
// in the `memory_arena` goes below MARGIN_IN_BYTES.
Expand Down Expand Up @@ -57,6 +59,7 @@ pub struct IndexWriter {
_directory_lock: Option<DirectoryLock>,

index: Index,
index_reader: IndexReader,

memory_arena_in_bytes_per_thread: usize,

Expand Down Expand Up @@ -92,17 +95,29 @@ fn compute_deleted_bitset(

// A delete operation should only affect
// document that were inserted before it.
let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
if let Some(mut docset) =
inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
{
let mut doc_matching_deleted_term = docset.doc();
while doc_matching_deleted_term != TERMINATED {
if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
alive_bitset.remove(doc_matching_deleted_term);
might_have_changed = true;
match &delete_op.term {
Either::Left(term) => {
let inverted_index = segment_reader.inverted_index(term.field())?;
if let Some(mut docset) =
inverted_index.read_postings(term, IndexRecordOption::Basic)?
{
let mut doc_matching_deleted_term = docset.doc();
while doc_matching_deleted_term != TERMINATED {
if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
alive_bitset.remove(doc_matching_deleted_term);
might_have_changed = true;
}
doc_matching_deleted_term = docset.advance();
}
}
doc_matching_deleted_term = docset.advance();
}
Either::Right(query) => {
query.for_each(segment_reader, &mut |doc_matching_delete_query, _| {
if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
alive_bitset.remove(doc_matching_delete_query);
might_have_changed = true;
}
})?;
}
}
delete_cursor.advance();
Expand Down Expand Up @@ -302,6 +317,7 @@ impl IndexWriter {

memory_arena_in_bytes_per_thread,
index: index.clone(),
index_reader: index.reader()?,

index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,
Expand Down Expand Up @@ -667,11 +683,30 @@ impl IndexWriter {
/// only after calling `commit()`.
pub fn delete_term(&self, term: Term) -> Opstamp {
let opstamp = self.stamper.stamp();
let delete_operation = DeleteOperation { opstamp, term };
let delete_operation = DeleteOperation::new(opstamp, term);
self.delete_queue.push(delete_operation);
opstamp
}

/// Delete all documents matching a given query.
/// Returns an `Err` if the query can't be executed.
///
/// Delete operation only affects documents that
/// were added in previous commits, and documents
/// that were added previously in the same commit.
///
/// Like adds, the deletion itself will be visible
/// only after calling `commit()`.
#[doc(hidden)]
pub fn delete_query(&self, query: Box<dyn Query>) -> crate::Result<Opstamp> {
let weight = query.weight(&self.index_reader.searcher(), false)?;

let opstamp = self.stamper.stamp();
let delete_operation = DeleteOperation::new_for_query(opstamp, weight);
self.delete_queue.push(delete_operation);
Ok(opstamp)
}

/// Returns the opstamp of the last successful commit.
///
/// This is, for instance, the opstamp the index will
Expand Down Expand Up @@ -741,7 +776,7 @@ impl IndexWriter {
for (user_op, opstamp) in user_operations_it.zip(stamps) {
match user_op {
UserOperation::Delete(term) => {
let delete_operation = DeleteOperation { opstamp, term };
let delete_operation = DeleteOperation::new(opstamp, term);
self.delete_queue.push(delete_operation);
}
UserOperation::Add(document) => {
Expand Down Expand Up @@ -786,7 +821,7 @@ mod tests {
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::NoMergePolicy;
use crate::query::{QueryParser, TermQuery};
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
use crate::schema::{
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
Expand Down Expand Up @@ -1418,17 +1453,80 @@ mod tests {
Ok(())
}

#[test]
fn test_delete_query_with_sort_by_field() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let id_field =
schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
let schema = schema_builder.build();

let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Desc,
}),
..Default::default()
};

let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let index_reader = index.reader()?;
let mut index_writer = index.writer_for_tests()?;

// create and delete docs in same commit
for id in 0u64..5u64 {
index_writer.add_document(doc!(id_field => id))?;
}
for id in 1u64..4u64 {
let term = Term::from_field_u64(id_field, id);
let not_term = Term::from_field_u64(id_field, 2);
let term = Box::new(TermQuery::new(term, Default::default()));
let not_term = Box::new(TermQuery::new(not_term, Default::default()));

let query: BooleanQuery = vec![
(Occur::Must, term as Box<dyn Query>),
(Occur::MustNot, not_term as Box<dyn Query>),
]
.into();

index_writer.delete_query(Box::new(query))?;
}
for id in 5u64..10u64 {
index_writer.add_document(doc!(id_field => id))?;
}
index_writer.commit()?;
index_reader.reload()?;

let searcher = index_reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);

let segment_reader = searcher.segment_reader(0);
assert_eq!(segment_reader.num_docs(), 8);
assert_eq!(segment_reader.max_doc(), 10);
let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
let in_order_alive_ids: Vec<u64> = segment_reader
.doc_ids_alive()
.map(|doc| fast_field_reader.get_val(doc as u64))
.collect();
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]);
Ok(())
}

#[derive(Debug, Clone, Copy)]
enum IndexingOp {
AddDoc { id: u64 },
DeleteDoc { id: u64 },
DeleteDocQuery { id: u64 },
Commit,
Merge,
}

fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
(0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }),
(0u64..1u64).prop_map(|_| IndexingOp::Commit),
(0u64..1u64).prop_map(|_| IndexingOp::Merge),
Expand All @@ -1437,7 +1535,8 @@ mod tests {

fn adding_operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![
10 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }),
2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit),
1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge),
Expand All @@ -1457,6 +1556,10 @@ mod tests {
existing_ids.remove(&id);
deleted_ids.insert(id);
}
IndexingOp::DeleteDocQuery { id } => {
existing_ids.remove(&id);
deleted_ids.insert(id);
}
_ => {}
}
}
Expand Down Expand Up @@ -1539,6 +1642,11 @@ mod tests {
IndexingOp::DeleteDoc { id } => {
index_writer.delete_term(Term::from_field_u64(id_field, id));
}
IndexingOp::DeleteDocQuery { id } => {
let term = Term::from_field_u64(id_field, id);
let query = TermQuery::new(term, Default::default());
index_writer.delete_query(Box::new(query))?;
}
IndexingOp::Commit => {
index_writer.commit()?;
}
Expand Down
28 changes: 22 additions & 6 deletions src/indexer/operation.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
use itertools::Either;

use crate::query::Weight;
use crate::schema::{Document, Term};
use crate::Opstamp;

/// Timestamped Delete operation.
#[derive(Clone, Eq, PartialEq, Debug)]
//#[derive(Clone, Debug)]
pub struct DeleteOperation {
pub opstamp: Opstamp,
pub term: Term,
pub term: Either<Term, Box<dyn Weight>>,
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for DeleteOperation {
fn default() -> Self {
impl DeleteOperation {
pub fn new(opstamp: Opstamp, term: Term) -> Self {
DeleteOperation {
opstamp,
term: Either::Left(term),
}
}

pub fn new_for_query(opstamp: Opstamp, weight: Box<dyn Weight>) -> Self {
DeleteOperation {
opstamp: 0u64,
term: Term::new(),
opstamp,
term: Either::Right(weight),
}
}
}

impl Default for DeleteOperation {
fn default() -> Self {
DeleteOperation::new(0, Term::new())
}
}

/// Timestamped Add operation.
#[derive(Eq, PartialEq, Debug)]
pub struct AddOperation {
Expand Down