diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 5fe10d8fc684f..90420369513ae 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -837,7 +837,8 @@ public int length() { } /** - * Wraps a directory reader to include all live docs. + * Wraps a directory reader to make all documents live except those were rolled back + * or hard-deleted due to non-aborting exceptions during indexing. * The wrapped reader can be used to query all documents. * * @param in the input directory reader @@ -848,17 +849,21 @@ public static DirectoryReader wrapAllDocsLive(DirectoryReader in) throws IOExcep } private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader { - static final class SubReaderWithAllLiveDocs extends FilterLeafReader { - SubReaderWithAllLiveDocs(LeafReader in) { + static final class LeafReaderWithLiveDocs extends FilterLeafReader { + final Bits liveDocs; + final int numDocs; + LeafReaderWithLiveDocs(LeafReader in, Bits liveDocs, int numDocs) { super(in); + this.liveDocs = liveDocs; + this.numDocs = numDocs; } @Override public Bits getLiveDocs() { - return null; + return liveDocs; } @Override public int numDocs() { - return maxDoc(); + return numDocs; } @Override public CacheHelper getCoreCacheHelper() { @@ -869,14 +874,28 @@ public CacheHelper getReaderCacheHelper() { return null; // Modifying liveDocs } } + DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException { - super(in, new FilterDirectoryReader.SubReaderWrapper() { + super(in, new SubReaderWrapper() { @Override public LeafReader wrap(LeafReader leaf) { - return new SubReaderWithAllLiveDocs(leaf); + SegmentReader segmentReader = segmentReader(leaf); + Bits hardLiveDocs = segmentReader.getHardLiveDocs(); + if (hardLiveDocs == null) { + return new LeafReaderWithLiveDocs(leaf, null, leaf.maxDoc()); + } + // TODO: Avoid recalculate numDocs everytime. + int numDocs = 0; + for (int i = 0; i < hardLiveDocs.length(); i++) { + if (hardLiveDocs.get(i)) { + numDocs++; + } + } + return new LeafReaderWithLiveDocs(segmentReader, hardLiveDocs, numDocs); } }); } + @Override protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { return wrapAllDocsLive(in); diff --git a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java index 753aedea01e02..5d1e10956d5a2 100644 --- a/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java +++ b/server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java @@ -33,18 +33,23 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.io.StringReader; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -53,6 +58,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.equalTo; + public class LuceneTests extends ESTestCase { public void testWaitForIndex() throws Exception { final MockDirectoryWrapper dir = newMockDirectory(); @@ -406,4 +413,88 @@ public void testMMapHackSupported() throws Exception { // add assume's here if needed for certain platforms, but we should know if it does not work. assertTrue("MMapDirectory does not support unmapping: " + MMapDirectory.UNMAP_NOT_SUPPORTED_REASON, MMapDirectory.UNMAP_SUPPORTED); } + + public void testWrapAllDocsLive() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETE_FIELD) + .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, MatchAllDocsQuery::new, newMergePolicy())); + IndexWriter writer = new IndexWriter(dir, config); + int numDocs = between(1, 10); + Set liveDocs = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + Document doc = new Document(); + doc.add(new StringField("id", id, Store.YES)); + writer.addDocument(doc); + liveDocs.add(id); + } + for (int i = 0; i < numDocs; i++) { + if (randomBoolean()) { + String id = Integer.toString(i); + Document doc = new Document(); + doc.add(new StringField("id", "v2-" + id, Store.YES)); + if (randomBoolean()) { + doc.add(Lucene.newSoftDeleteField()); + } + writer.softUpdateDocument(new Term("id", id), doc, Lucene.newSoftDeleteField()); + liveDocs.add("v2-" + id); + } + } + try (DirectoryReader unwrapped = DirectoryReader.open(writer)) { + DirectoryReader reader = Lucene.wrapAllDocsLive(unwrapped); + assertThat(reader.numDocs(), equalTo(liveDocs.size())); + IndexSearcher searcher = new IndexSearcher(reader); + Set actualDocs = new HashSet<>(); + TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + actualDocs.add(reader.document(scoreDoc.doc).get("id")); + } + assertThat(actualDocs, equalTo(liveDocs)); + } + IOUtils.close(writer, dir); + } + + public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETE_FIELD) + .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, MatchAllDocsQuery::new, newMergePolicy())); + IndexWriter writer = new IndexWriter(dir, config); + int numDocs = between(1, 10); + List liveDocs = new ArrayList<>(); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + Document doc = new Document(); + doc.add(new StringField("id", id, Store.YES)); + if (randomBoolean()) { + doc.add(Lucene.newSoftDeleteField()); + } + writer.addDocument(doc); + liveDocs.add(id); + } + int abortedDocs = between(1, 10); + for (int i = 0; i < abortedDocs; i++) { + try { + Document doc = new Document(); + doc.add(new StringField("id", "aborted-" + i, Store.YES)); + StringReader reader = new StringReader(""); + doc.add(new TextField("other", reader)); + reader.close(); // mark the indexing hit non-aborting error + writer.addDocument(doc); + fail("index should have failed"); + } catch (Exception ignored) { } + } + try (DirectoryReader unwrapped = DirectoryReader.open(writer)) { + DirectoryReader reader = Lucene.wrapAllDocsLive(unwrapped); + assertThat(reader.maxDoc(), equalTo(numDocs + abortedDocs)); + assertThat(reader.numDocs(), equalTo(liveDocs.size())); + IndexSearcher searcher = new IndexSearcher(reader); + List actualDocs = new ArrayList<>(); + TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + actualDocs.add(reader.document(scoreDoc.doc).get("id")); + } + assertThat(actualDocs, equalTo(liveDocs)); + } + IOUtils.close(writer, dir); + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 229a218fa0da9..c7c74377667e7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -65,6 +65,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -3087,30 +3088,4 @@ public void testSupplyTombstoneDoc() throws Exception { closeShards(shard); } - - public void testSearcherIncludesSoftDeletes() throws Exception { - Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .build(); - IndexMetaData metaData = IndexMetaData.builder("test") - .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") - .settings(settings) - .primaryTerm(0, 1).build(); - IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); - recoverShardFromStore(shard); - indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); - indexDoc(shard, "test", "1", "{\"foo\" : \"baz\"}"); - deleteDoc(shard, "test", "0"); - shard.refresh("test"); - try (Engine.Searcher searcher = shard.acquireSearcher("test")) { - IndexSearcher searchWithSoftDeletes = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); - assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(0L)); - assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(1L)); - assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L)); - assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L)); - } - closeShards(shard); - } }