Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement adaptive BloomFilter algorithm #251

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.field.bloomfilter;

import java.io.InputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import org.opensearch.flint.core.field.bloomfilter.adaptive.AdaptiveBloomFilter;
import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter;

/**
* Bloom filter factory that builds bloom filter based on algorithm parameters.
*/
public abstract class BloomFilterFactory implements Serializable {

/**
* Bloom filter adaptive key and default value.
*/
public static final String BLOOM_FILTER_ADAPTIVE_KEY = "adaptive";
public static final boolean DEFAULT_BLOOM_FILTER_ADAPTIVE = true;

/**
* Expected number of unique items key and default value.
*/
public static final String CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY = "num_items";
public static final int DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS = 10000;

/**
* False positive probability (FPP) key and default value.
*/
public static final String CLASSIC_BLOOM_FILTER_FPP_KEY = "fpp";
public static final double DEFAULT_CLASSIC_BLOOM_FILTER_FPP = 0.03;

/**
* Number of candidate key and default value.
*/
public static final String ADAPTIVE_NUMBER_CANDIDATE_KEY = "num_candidates";
public static final int DEFAULT_ADAPTIVE_NUMBER_CANDIDATE = 10;

/**
* Bloom filter algorithm parameters.
*/
private final Map<String, String> parameters;

protected BloomFilterFactory(Map<String, String> parameters) {
this.parameters = parameters;
}

/**
* @return all parameters including the default ones.
*/
public abstract Map<String, String> getParameters();

/**
* Create specific BloomFilter instance.
*
* @return BloomFilter instance
*/
public abstract BloomFilter create();

/**
* Create specific BloomFilter instance by deserialization.
*
* @param in input stream
* @return BloomFilter instance
*/
public abstract BloomFilter deserialize(InputStream in);

/**
* Create specific BloomFilter factory given the parameters.
*
* @param parameters BloomFilter parameters
* @return BloomFilter factory instance
*/
public static BloomFilterFactory of(Map<String, String> parameters) {
if (isAdaptiveEnabled(parameters)) {
return createAdaptiveBloomFilterFactory(parameters);
} else {
return createClassicBloomFilterFactory(parameters);
}
}

private static BloomFilterFactory createAdaptiveBloomFilterFactory(Map<String, String> parameters) {
return new BloomFilterFactory(parameters) {
@Override
public Map<String, String> getParameters() {
return Map.of(
BLOOM_FILTER_ADAPTIVE_KEY, "true",
ADAPTIVE_NUMBER_CANDIDATE_KEY, Integer.toString(numCandidates()),
CLASSIC_BLOOM_FILTER_FPP_KEY, Double.toString(fpp()));
}

@Override
public BloomFilter create() {
return new AdaptiveBloomFilter(numCandidates(), fpp());
}

@Override
public BloomFilter deserialize(InputStream in) {
return AdaptiveBloomFilter.readFrom(numCandidates(), in);
}
};
}

private static BloomFilterFactory createClassicBloomFilterFactory(Map<String, String> parameters) {
return new BloomFilterFactory(parameters) {
@Override
public Map<String, String> getParameters() {
return Map.of(
BLOOM_FILTER_ADAPTIVE_KEY, "false",
CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY, Integer.toString(expectedNumItems()),
CLASSIC_BLOOM_FILTER_FPP_KEY, Double.toString(fpp()));
}

@Override
public BloomFilter create() {
return new ClassicBloomFilter(expectedNumItems(), fpp());
}

@Override
public BloomFilter deserialize(InputStream in) {
return ClassicBloomFilter.readFrom(in);
}
};
}

private static boolean isAdaptiveEnabled(Map<String, String> params) {
return Optional.ofNullable(params.get(BLOOM_FILTER_ADAPTIVE_KEY))
.map(Boolean::parseBoolean)
.orElse(DEFAULT_BLOOM_FILTER_ADAPTIVE);
}

protected int expectedNumItems() {
return Optional.ofNullable(parameters.get(CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY))
.map(Integer::parseInt)
.orElse(DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS);
}

protected double fpp() {
return Optional.ofNullable(parameters.get(CLASSIC_BLOOM_FILTER_FPP_KEY))
.map(Double::parseDouble)
.orElse(DEFAULT_CLASSIC_BLOOM_FILTER_FPP);
}

protected int numCandidates() {
return Optional.ofNullable(parameters.get(ADAPTIVE_NUMBER_CANDIDATE_KEY))
.map(Integer::parseInt)
.orElse(DEFAULT_ADAPTIVE_NUMBER_CANDIDATE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.field.bloomfilter.adaptive;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Function;
import org.opensearch.flint.core.field.bloomfilter.BloomFilter;
import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter;

/**
* Adaptive BloomFilter implementation that generates a series of bloom filter candidate
* with different expected number of item (NDV) and at last choose the best one.
*/
public class AdaptiveBloomFilter implements BloomFilter {

/**
* Initial expected number of items for the first candidate.
*/
public static final int INITIAL_EXPECTED_NUM_ITEMS = 1024;

/**
* Total number of distinct items seen so far.
*/
private int cardinality = 0;

/**
* BloomFilter candidates.
*/
private final BloomFilterCandidate[] candidates;

/**
* Construct adaptive BloomFilter instance with the given algorithm parameters.
*
* @param numCandidates number of candidate
* @param fpp false positive probability
*/
public AdaptiveBloomFilter(int numCandidates, double fpp) {
this.candidates = initializeCandidates(numCandidates,
expectedNumItems -> new ClassicBloomFilter(expectedNumItems, fpp));
}

/**
* Construct adaptive BloomFilter instance from BloomFilter array deserialized from input stream.
*
* @param cardinality total number of distinct items
* @param candidates BloomFilter candidates
*/
AdaptiveBloomFilter(int cardinality, BloomFilter[] candidates) {
this.cardinality = cardinality;
Iterator<BloomFilter> it = Arrays.stream(candidates).iterator();
this.candidates = initializeCandidates(candidates.length, expectedNumItems -> it.next());
}

/**
* Deserialize adaptive BloomFilter instance from input stream.
*
* @param numCandidates number of candidates
* @param in input stream of serialized adaptive BloomFilter instance
* @return adaptive BloomFilter instance
*/
public static BloomFilter readFrom(int numCandidates, InputStream in) {
try {
// Read total distinct counter
int cardinality = new DataInputStream(in).readInt();

// Read BloomFilter candidate array
BloomFilter[] candidates = new BloomFilter[numCandidates];
for (int i = 0; i < numCandidates; i++) {
candidates[i] = ClassicBloomFilter.readFrom(in);
}
return new AdaptiveBloomFilter(cardinality, candidates);
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize adaptive BloomFilter", e);
}
}

/**
* @return best BloomFilter candidate which has expected number of item right above total distinct counter.
*/
public BloomFilterCandidate bestCandidate() {
return candidates[bestCandidateIndex()];
}

@Override
public long bitSize() {
return Arrays.stream(candidates)
.mapToLong(c -> c.bloomFilter.bitSize())
.sum();
}

@Override
public boolean put(long item) {
// Only insert into candidate with larger expectedNumItems for efficiency
boolean bitChanged = false;
for (int i = bestCandidateIndex(); i < candidates.length; i++) {
bitChanged = candidates[i].bloomFilter.put(item);
}

// Use the last candidate's put result which is the most accurate
if (bitChanged) {
cardinality++;
}
return bitChanged;
}

@Override
public BloomFilter merge(BloomFilter other) {
AdaptiveBloomFilter otherBf = (AdaptiveBloomFilter) other;
cardinality += otherBf.cardinality;

for (int i = bestCandidateIndex(); i < candidates.length; i++) {
candidates[i].bloomFilter.merge(otherBf.candidates[i].bloomFilter);
}
return this;
}

@Override
public boolean mightContain(long item) {
// Use the last candidate which is the most accurate
return candidates[candidates.length - 1].bloomFilter.mightContain(item);
}

@Override
public void writeTo(OutputStream out) throws IOException {
// Serialized cardinality counter first
new DataOutputStream(out).writeInt(cardinality);

// Serialize classic BloomFilter array
for (BloomFilterCandidate candidate : candidates) {
candidate.bloomFilter.writeTo(out);
}
}

private BloomFilterCandidate[] initializeCandidates(int numCandidates,
Function<Integer, BloomFilter> initializer) {
BloomFilterCandidate[] candidates = new BloomFilterCandidate[numCandidates];
int ndv = INITIAL_EXPECTED_NUM_ITEMS;

// Initialize candidate with NDV doubled in each iteration
for (int i = 0; i < numCandidates; i++, ndv *= 2) {
candidates[i] = new BloomFilterCandidate(ndv, initializer.apply(ndv));
}
return candidates;
}

private int bestCandidateIndex() {
int index = Arrays.binarySearch(candidates, new BloomFilterCandidate(cardinality, null));
if (index < 0) {
index = -(index + 1);
}

/*
* Now 'index' represents the position where the current cardinality should be inserted,
* indicating the best candidate to choose based on its expected number of distinct values.
* The last one is chosen if cardinality exceeds each candidate's expected number.
*/
return Math.min(index, candidates.length - 1);
}

/**
* BloomFilter candidate that records expected number of items for each candidate.
*/
public static class BloomFilterCandidate implements Comparable<BloomFilterCandidate> {
/**
* Expected number of items associated with this candidate.
*/
private final int expectedNumItems;

/**
* BloomFilter instance.
*/
private final BloomFilter bloomFilter;

BloomFilterCandidate(int expectedNumItems, BloomFilter bloomFilter) {
this.expectedNumItems = expectedNumItems;
this.bloomFilter = bloomFilter;
}

public int getExpectedNumItems() {
return expectedNumItems;
}

public BloomFilter getBloomFilter() {
return bloomFilter;
}

@Override
public int compareTo(BloomFilterCandidate other) {
return Integer.compare(expectedNumItems, other.expectedNumItems);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BloomFilterCandidate that = (BloomFilterCandidate) o;
return expectedNumItems == that.expectedNumItems && Objects.equals(bloomFilter, that.bloomFilter);
}

@Override
public int hashCode() {
return Objects.hash(expectedNumItems, bloomFilter);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AdaptiveBloomFilter that = (AdaptiveBloomFilter) o;
return cardinality == that.cardinality && Arrays.equals(candidates, that.candidates);
}

@Override
public int hashCode() {
int result = Objects.hash(cardinality);
result = 31 * result + Arrays.hashCode(candidates);
return result;
}
}
Loading
Loading