Add xor filter
mgodwan committed Oct 11, 2023
1 parent baf425d commit ded4d14
Showing 9 changed files with 495 additions and 50 deletions.
2 changes: 0 additions & 2 deletions server/build.gradle
@@ -123,8 +123,6 @@ dependencies {
api "org.apache.lucene:lucene-spatial-extras:${versions.lucene}"
api "org.apache.lucene:lucene-spatial3d:${versions.lucene}"
api "org.apache.lucene:lucene-suggest:${versions.lucene}"
api 'me.k11i:xor-filter:0.1.1'
api 'io.github.fastfilter:fastfilter_java:1.0.2'

// utilities
api project(":libs:opensearch-cli")
37 changes: 28 additions & 9 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -625,7 +625,7 @@ public final class IndexSettings {
);

public static final Setting<String> DOC_ID_FUZZY_SET_TYPE_SETTING = Setting.simpleString(
"index.doc_id.fuzzy_set_type",
"index.doc_id_fuzzy_set.type",
FuzzySet.SetType.BLOOM_FILTER_V1.getAliases().get(0),
Property.IndexScope,
Property.Dynamic
@@ -673,9 +673,11 @@ public final class IndexSettings {

private volatile String defaultSearchPipeline;

private volatile boolean useBloomFilterForDocIds;
private volatile boolean useFuzzySetForDocIds;

private volatile double bloomFilterForDocIdFalsePositiveProbability;
private volatile double fuzzySetForDocIdFalsePositiveProbability;

private volatile String fuzzySetType;

/**
* The maximum age of a retention lease before it is considered expired.
@@ -873,8 +875,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));
defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE);

useBloomFilterForDocIds = scopedSettings.get(DOC_ID_FUZZY_SET_ENABLED_SETTING);
bloomFilterForDocIdFalsePositiveProbability = scopedSettings.get(DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING);
useFuzzySetForDocIds = scopedSettings.get(DOC_ID_FUZZY_SET_ENABLED_SETTING);
fuzzySetForDocIdFalsePositiveProbability = scopedSettings.get(DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING);
fuzzySetType = scopedSettings.get(DOC_ID_FUZZY_SET_TYPE_SETTING);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(
@@ -957,6 +960,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this::setUseFuzzySetForDocIdSetting);
scopedSettings.addSettingsUpdateConsumer(DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING,
this::setFuzzySetForDocIdFalsePositiveProbability);
scopedSettings.addSettingsUpdateConsumer(DOC_ID_FUZZY_SET_TYPE_SETTING,
this::setFuzzySetType);
}

private void setSearchSegmentOrderReversed(boolean reversed) {
@@ -1661,25 +1666,39 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) {
}

public boolean useFuzzySetForDocIds() {
return useBloomFilterForDocIds;
return useFuzzySetForDocIds;
}

public void setUseFuzzySetForDocIdSetting(boolean useBloomFilterForDocIds) {
if (FeatureFlags.isEnabled(FeatureFlags.BLOOM_FILTER_FOR_DOC_IDS)) {
this.useBloomFilterForDocIds = useBloomFilterForDocIds;
this.useFuzzySetForDocIds = useBloomFilterForDocIds;
} else {
throw new IllegalArgumentException("Setting cannot be updated as feature flag: " + FeatureFlags.BLOOM_FILTER_FOR_DOC_IDS +
" is not enabled.");
}
}

public double getFuzzySetForDocIdFalsePositiveProbability() {
return bloomFilterForDocIdFalsePositiveProbability;
return fuzzySetForDocIdFalsePositiveProbability;
}

public void setFuzzySetForDocIdFalsePositiveProbability(double bloomFilterForDocIdFalsePositiveProbability) {
if (FeatureFlags.isEnabled(FeatureFlags.BLOOM_FILTER_FOR_DOC_IDS)) {
this.bloomFilterForDocIdFalsePositiveProbability = bloomFilterForDocIdFalsePositiveProbability;
this.fuzzySetForDocIdFalsePositiveProbability = bloomFilterForDocIdFalsePositiveProbability;
} else {
throw new IllegalArgumentException("Setting cannot be updated as feature flag: " + FeatureFlags.BLOOM_FILTER_FOR_DOC_IDS +
" is not enabled.");
}
}

public String getFuzzySetType() {
return fuzzySetType;
}

public void setFuzzySetType(String fuzzySetType) {
if (FeatureFlags.isEnabled(FeatureFlags.BLOOM_FILTER_FOR_DOC_IDS)) {
FuzzySet.SetType.fromAlias(fuzzySetType);
this.fuzzySetType = fuzzySetType;
} else {
throw new IllegalArgumentException("Setting cannot be updated as feature flag: " + FeatureFlags.BLOOM_FILTER_FOR_DOC_IDS +
" is not enabled.");
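For context, the new `index.doc_id_fuzzy_set.type` setting follows a validate-then-assign pattern: `setFuzzySetType` resolves the alias through `FuzzySet.SetType.fromAlias` before storing the raw string, so an unknown value is rejected at update time rather than at flush time. Below is a minimal standalone sketch of that pattern; the enum, alias strings, and class names are illustrative stand-ins, not the OpenSearch types.

```java
import java.util.List;
import java.util.Locale;

// Illustrative sketch only: a stand-in for the validate-then-assign pattern used by
// IndexSettings#setFuzzySetType. Alias strings and names here are made up for the example.
public class FuzzySetTypeSettingSketch {

    enum SetType {
        BLOOM_FILTER_V1(List.of("bloom_filter_v1")),
        CUCKOO_FILTER_V1(List.of("cuckoo_filter_v1"));

        private final List<String> aliases;

        SetType(List<String> aliases) {
            this.aliases = aliases;
        }

        // Throws if no type matches, mirroring how fromAlias is used as a validator.
        static SetType fromAlias(String alias) {
            for (SetType type : values()) {
                if (type.aliases.contains(alias.toLowerCase(Locale.ROOT))) {
                    return type;
                }
            }
            throw new IllegalArgumentException("Unknown fuzzy set type: " + alias);
        }
    }

    private volatile String fuzzySetType = "bloom_filter_v1";

    void setFuzzySetType(String fuzzySetType) {
        SetType.fromAlias(fuzzySetType); // validate first; an invalid alias is never stored
        this.fuzzySetType = fuzzySetType;
    }

    public static void main(String[] args) {
        FuzzySetTypeSettingSketch settings = new FuzzySetTypeSettingSketch();
        settings.setFuzzySetType("cuckoo_filter_v1");   // accepted
        try {
            settings.setFuzzySetType("not_a_filter");   // rejected before assignment
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(settings.fuzzySetType);      // still a valid value
    }
}
```

The real setter additionally gates the update behind `FeatureFlags.BLOOM_FILTER_FOR_DOC_IDS`, as shown in the diff above.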
@@ -97,7 +97,7 @@ private PostingsFormat getFuzzyFilterPostingsFormat(String field) {
if (fuzzyFilterPostingsFormat == null) {
fuzzyFilterPostingsFormat = new FuzzyFilterPostingsFormat(super.getPostingsFormatForField(field), new FuzzySetFactory(Map.of(
IdFieldMapper.NAME, new FuzzySetParameters(mapperService.getIndexSettings().getFuzzySetForDocIdFalsePositiveProbability(),
FuzzySet.SetType.fromAlias(mapperService.getIndexSettings().getFuzzySetForDocIdFalsePositiveProbability())) // This can be replaced with a setting/mapping type
FuzzySet.SetType.fromAlias(mapperService.getIndexSettings().getFuzzySetType())) // This can be replaced with a setting/mapping type
)));
}
return fuzzyFilterPostingsFormat;
@@ -13,8 +13,10 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.CheckedSupplier;

import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;

/**
@@ -58,13 +60,7 @@ public class BloomFilter implements FuzzySet {
this.hashCount = hashCount;
}

private BloomFilter(FixedBitSet filter, int setSize, int hashCount) {
this.filter = filter;
this.setSize = setSize;
this.hashCount = hashCount;
}

public BloomFilter(int maxDocs, double maxFpp) {
public BloomFilter(int maxDocs, double maxFpp, CheckedSupplier<Iterator<BytesRef>, IOException> fieldIteratorProvider) throws IOException {
int setSize =
(int)
Math.ceil(
@@ -76,6 +72,8 @@ public BloomFilter(int maxDocs, double maxFpp) {
this.filter = new FixedBitSet(setSize + 1);
this.setSize = setSize;
this.hashCount = optimalK;

addAll(fieldIteratorProvider);
}

static int getNearestSetSize(int maxNumberOfBits) {
@@ -152,4 +150,14 @@ private boolean mayContainValue(int aHash) {
int pos = aHash & setSize;
return filter.get(pos);
}

public static void main(String[] args) {
double[] d = new double[]{0.0511, 0.1023, 0.2047};
for (double fpp: d) {
int setSize =
(int)
Math.ceil((120000000 * Math.log(fpp)) / Math.log(1 / Math.pow(2, Math.log(2))));
System.out.println(fpp + " -> " + getNearestSetSize(2 * setSize));
}
}
}
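The `setSize` expression in the constructor above is the textbook Bloom filter sizing formula in disguise: `Math.log(1 / Math.pow(2, Math.log(2)))` evaluates to `-(ln 2)^2`, so the bit count works out to `m = -n * ln(p) / (ln 2)^2`, and the matching hash count is `k = (m / n) * ln 2`. Here is a small standalone sketch of that arithmetic, using the same false-positive rates as the debug `main` above; the helper names and the sample document count are illustrative only.

```java
// Illustrative sketch of the standard Bloom filter sizing math used by the constructor above:
//   m = -n * ln(p) / (ln 2)^2  bits,   k = (m / n) * ln 2  hash functions
public class BloomSizingSketch {

    static long expectedBits(long expectedInsertions, double fpp) {
        return (long) Math.ceil(-expectedInsertions * Math.log(fpp) / (Math.log(2) * Math.log(2)));
    }

    static int optimalHashCount(long expectedInsertions, long bits) {
        return Math.max(1, (int) Math.round((double) bits / expectedInsertions * Math.log(2)));
    }

    public static void main(String[] args) {
        long n = 1_000_000; // assumed number of doc ids in a segment, for illustration
        for (double fpp : new double[] { 0.0511, 0.1023, 0.2047 }) {
            long m = expectedBits(n, fpp);
            int k = optimalHashCount(n, m);
            // Roughly 6.2 bits/key at ~5% fpp, 4.7 at ~10%, 3.3 at ~20%.
            System.out.printf("fpp=%.4f -> %d bits (%.2f bits/key), %d hashes%n", fpp, m, (double) m / n, k);
        }
    }
}
```

The filter itself additionally snaps the computed size to a value it can use as a bit mask (see `aHash & setSize` in `mayContainValue`), which is why the debug `main` prints `getNearestSetSize(2 * setSize)` rather than the raw value.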
@@ -8,28 +8,46 @@

package org.opensearch.index.codec.fuzzy;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.XPackedInts;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.hash.MurmurHash3;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;


public class CuckooFilter implements FuzzySet {

private static final Logger logger = LogManager.getLogger(CuckooFilter.class);

private final org.opensearch.common.util.CuckooFilter delegatingFilter;
private MurmurHash3.Hash128 scratchHash = new MurmurHash3.Hash128();
private static final Random RNG = new Random(0xFFFFF);
private final long seed;

public CuckooFilter(int maxDocs, double fpp) {
this.delegatingFilter = new org.opensearch.common.util.CuckooFilter(maxDocs, fpp, RNG);
public CuckooFilter(int maxDocs, double fpp, CheckedSupplier<Iterator<BytesRef>, IOException> fieldIteratorProvider) throws IOException {
this.seed = System.nanoTime();
this.delegatingFilter = new org.opensearch.common.util.CuckooFilter(maxDocs, fpp, new Random(seed));
addAll(fieldIteratorProvider);
}

CuckooFilter(DataInput in) throws IOException {
this.delegatingFilter = new org.opensearch.common.util.CuckooFilter(StreamWrapper.fromDataInput(in), RNG);
this.seed = in.readLong();
this.delegatingFilter = new org.opensearch.common.util.CuckooFilter(StreamWrapper.fromDataInput(in), new Random(seed));
}

@Override
@@ -46,7 +64,11 @@ public Result contains(BytesRef value) {
@Override
public void add(BytesRef value) {
MurmurHash3.Hash128 hash = MurmurHash3.hash128(value.bytes, value.offset, value.length, 0, scratchHash);
delegatingFilter.add(hash.h1);
boolean added = delegatingFilter.add(hash.h1);
if(!added) {
logger.error("Failed to insert into the cuckoo filter: " + value);
throw new IllegalStateException("Failed to insert into the cuckoo filter: " + value);
}
}

@Override
@@ -61,11 +83,56 @@ public Optional<FuzzySet> maybeDownsize() {

@Override
public void writeTo(DataOutput out) throws IOException {
out.writeLong(seed);
delegatingFilter.writeTo(StreamWrapper.wrap(out));
}

@Override
public long ramBytesUsed() {
return delegatingFilter.getSizeInBytes();
}

@Override
public String toString() {
return "CuckooFilter{" +
"delegatingFilter=" + delegatingFilter +
", scratchHash=" + scratchHash +
", seed=" + seed +
'}';
}

public static void main(String[] args) throws IOException {
// CuckooFilter filter = new CuckooFilter(100, 0.2047, () ->List.of(new BytesRef("item1"), new BytesRef("item2"), new BytesRef("item3")).iterator());
// System.out.println(filter.contains(new BytesRef("item1")));
// System.out.println(filter.contains(new BytesRef("item2")));
// System.out.println(filter.contains(new BytesRef("item3")));
// System.out.println(filter.contains(new BytesRef("item4")));
// System.out.println(filter.contains(new BytesRef("item5")));
// byte[] b = new byte[1000000];
// DataOutput output = new ByteArrayDataOutput(b);
// filter.writeTo(output);
//
// for (int i = 0; i < 100; i ++) {
// System.out.print(b[i] + " ");
// }
// System.out.println();
//
// CuckooFilter filter2 = new CuckooFilter(new ByteArrayDataInput(b));
// System.out.println(filter2.contains(new BytesRef("item1")));
// System.out.println(filter2.contains(new BytesRef("item2")));
// System.out.println(filter2.contains(new BytesRef("item3")));
// System.out.println(filter2.contains(new BytesRef("item4")));
// System.out.println(filter2.contains(new BytesRef("item5")));

XPackedInts.Mutable data = XPackedInts.getMutable(323, 32, PackedInts.COMPACT);
byte[] b = new byte[100000];
for (int i = 0; i < 323; i ++) {
data.set(i, i);
}
data.save(new ByteArrayDataOutput(b));
XPackedInts.Mutable data2 = (XPackedInts.Mutable)XPackedInts.getReader(new ByteArrayDataInput(b));
for (int i = 0; i < 323; i ++) {
assert data.get(i) == data2.get(i);
}
}
}
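The constructor change above drops the shared static `RNG` in favour of a per-filter seed that is written out before the filter body, so a filter read back from disk can rebuild an identical `Random`. Below is a minimal round-trip sketch of that pattern with plain `java.io` streams; `SeededSketch` is an illustrative stand-in, not the OpenSearch `CuckooFilter`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Random;

// Illustrative only: persist the RNG seed ahead of the payload so the
// deserialized instance rebuilds the exact same Random it was created with.
public class SeededSketch {
    private final long seed;
    private final Random random;
    private final int payload;          // stand-in for the real filter data

    SeededSketch(int payload) {
        this.seed = System.nanoTime();  // same idea as the CuckooFilter change above
        this.random = new Random(seed);
        this.payload = payload;
    }

    SeededSketch(DataInputStream in) throws IOException {
        this.seed = in.readLong();      // seed first, mirroring writeTo()
        this.random = new Random(seed);
        this.payload = in.readInt();
    }

    void writeTo(DataOutputStream out) throws IOException {
        out.writeLong(seed);            // must precede the payload
        out.writeInt(payload);
    }

    public static void main(String[] args) throws IOException {
        SeededSketch original = new SeededSketch(42);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.writeTo(new DataOutputStream(bytes));

        SeededSketch restored = new SeededSketch(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        // Both Randoms now produce the same sequence, so randomized decisions replay identically.
        System.out.println(original.random.nextInt(1000) == restored.random.nextInt(1000)); // true
    }
}
```

Writing the seed first matters because the deserializing constructor reads it before handing the rest of the stream to the delegate filter, exactly as in the `CuckooFilter(DataInput in)` constructor above.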
@@ -350,33 +350,73 @@ public void write(Fields fields, NormsProducer norms) throws IOException {
continue;
}
FieldInfo fieldInfo = state.fieldInfos.fieldInfo(field);
TermsEnum termsEnum = terms.iterator();
// TermsEnum termsEnum = terms.iterator();

FuzzySet fuzzySet = null;

PostingsEnum postingsEnum = null;
while (true) {
BytesRef term = termsEnum.next();
if (term == null) {
break;
}
// while (true) {
// BytesRef term = termsEnum.next();
// if (term == null) {
// break;
// }
// if (fuzzySet == null) {
// fuzzySet = fuzzySetFactory.createFuzzySet(state.segmentInfo.maxDoc(), fieldInfo.name);
// if (fuzzySet == null) {
// break;
// }
// assert fuzzySets.containsKey(fieldInfo) == false;
// fuzzySets.put(fieldInfo, fuzzySet);
// }
// // Make sure there's at least one doc for this term:
// postingsEnum = termsEnum.postings(postingsEnum, 0);
// if (postingsEnum.nextDoc() != PostingsEnum.NO_MORE_DOCS) {
// fuzzySet.add(term);
// }
// }

if (fuzzySet == null) {
fuzzySet = fuzzySetFactory.createFuzzySet(state.segmentInfo.maxDoc(), fieldInfo.name, () -> iterator(terms));
if (fuzzySet == null) {
fuzzySet = fuzzySetFactory.createFuzzySet(state.segmentInfo.maxDoc(), fieldInfo.name);
if (fuzzySet == null) {
break;
}
assert fuzzySets.containsKey(fieldInfo) == false;
fuzzySets.put(fieldInfo, fuzzySet);
}
// Make sure there's at least one doc for this term:
postingsEnum = termsEnum.postings(postingsEnum, 0);
if (postingsEnum.nextDoc() != PostingsEnum.NO_MORE_DOCS) {
fuzzySet.add(term);
break;
}
assert fuzzySets.containsKey(fieldInfo) == false;
fuzzySets.put(fieldInfo, fuzzySet);
}
}
}

private Iterator<BytesRef> iterator(Terms terms) throws IOException {
TermsEnum termIterator = terms.iterator();
return new Iterator<>() {

private BytesRef currentTerm;
private PostingsEnum postingsEnum;
@Override
public boolean hasNext() {
try {
do {
currentTerm = termIterator.next();
if (currentTerm == null) {
return false;
}
postingsEnum = termIterator.postings(postingsEnum, 0);
if (postingsEnum.nextDoc() != PostingsEnum.NO_MORE_DOCS) {
return true;
}
} while (true);
} catch (IOException ex) {
throw new IllegalStateException("Cannot read terms: " + termIterator.attributes(), ex);
}
}

@Override
public BytesRef next() {
return currentTerm;
}
};
}

private boolean closed;

@Override
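With this refactor the postings format no longer walks the `TermsEnum` eagerly: it hands the factory a `CheckedSupplier<Iterator<BytesRef>, IOException>` and each filter constructor drains it once via the `addAll` call added earlier in this commit. The sketch below shows the same hand-off shape with a plain cursor standing in for `TermsEnum`; the supplier interface, cursor, and set builder are simplified stand-ins, not the Lucene or OpenSearch types.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class LazyPopulationSketch {

    // Simplified stand-in for org.opensearch.common.CheckedSupplier<T, IOException>.
    @FunctionalInterface
    interface CheckedSupplier<T> {
        T get() throws IOException;
    }

    // Stand-in for a pull-style cursor such as TermsEnum: next() returns null when exhausted.
    static class Cursor {
        private final Iterator<String> source;
        Cursor(List<String> values) { this.source = values.iterator(); }
        String next() { return source.hasNext() ? source.next() : null; }
    }

    // Adapter in the spirit of the iterator(Terms) method above: pull from the cursor
    // inside hasNext(), skipping entries we do not want to index. As in the original,
    // hasNext() is expected to be called exactly once before each next().
    static Iterator<String> adapt(Cursor cursor) {
        return new Iterator<>() {
            private String current;

            @Override
            public boolean hasNext() {
                do {
                    current = cursor.next();
                } while (current != null && current.isEmpty()); // skip "empty" entries
                return current != null;
            }

            @Override
            public String next() {
                return current;
            }
        };
    }

    // Stand-in for a fuzzy set that fills itself from the supplier, like addAll(fieldIteratorProvider).
    static Set<String> buildSet(CheckedSupplier<Iterator<String>> provider) throws IOException {
        Set<String> set = new HashSet<>();
        Iterator<String> it = provider.get();
        while (it.hasNext()) {
            set.add(it.next());
        }
        return set;
    }

    public static void main(String[] args) throws IOException {
        Cursor cursor = new Cursor(List.of("doc-1", "", "doc-2", "doc-3"));
        Set<String> ids = buildSet(() -> adapt(cursor));
        System.out.println(ids); // [doc-1, doc-2, doc-3] in some order
    }
}
```

Deferring the iteration this way lets the concrete filter decide when and how to consume the terms, which is what allows the Bloom and cuckoo constructors above to take the supplier directly.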