Implement BloomFilter skipping index building logic #242

Merged

Changes from 7 commits
docs/index.md (6 changes: 5 additions & 1 deletion)

@@ -14,6 +14,7 @@ A Flint index is ...
- Partition: skip data scan by maintaining and filtering partitioned column value per file.
- MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file.
- ValueSet: skip data scan by building a unique value set of the indexed column per file.
- BloomFilter: skip data scan by building a bloom filter of the indexed column per file.
- Covering Index: create index for selected columns within the source dataset to improve query performance
- Materialized View: enhance query performance by storing precomputed and aggregated data from the source dataset

@@ -23,7 +24,8 @@ Please see the following example in which Index Building Logic and Query Rewrite
| Skipping Index | Create Index Statement | Index Building Logic | Query Rewrite Logic |
|----------------|------------------------|----------------------|---------------------|
| Partition | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;year PARTITION,<br>&nbsp;&nbsp;month PARTITION,<br>&nbsp;&nbsp;day PARTITION,<br>&nbsp;&nbsp;hour PARTITION<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;FIRST(year) AS year,<br>&nbsp;&nbsp;FIRST(month) AS month,<br>&nbsp;&nbsp;FIRST(day) AS day,<br>&nbsp;&nbsp;FIRST(hour) AS hour,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE year = 2023 AND month = 4<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE year = 2023 AND month = 4<br>)<br>WHERE year = 2023 AND month = 4 |
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;elb_status_code VALUE_SET<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100 |
| BloomFilter | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;client_ip BLOOM_FILTER<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;BLOOM_FILTER_AGG(client_ip) AS client_ip,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE client_ip = '127.0.0.1'<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE BLOOM_FILTER_MIGHT_CONTAIN(client_ip, '127.0.0.1') = true<br>)<br>WHERE client_ip = '127.0.0.1' |

### Flint Index Specification

@@ -54,6 +56,7 @@ For now, Flint Index doesn't define its own data type and uses OpenSearch field
| **FlintDataType** |
|-------------------|
| boolean |
| binary |
| long |
| integer |
| short |
@@ -447,6 +450,7 @@ flint.skippingIndex()
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
.addBloomFilter("client_ip")
.create()

flint.refreshIndex("flint_spark_catalog_default_alb_logs_skipping_index")
org/opensearch/flint/core/field/bloomfilter/BloomFilter.java (new file)
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.field.bloomfilter;

import java.io.IOException;
import java.io.OutputStream;

/**
* Bloom filter interface inspired by {@link org.apache.spark.util.sketch.BloomFilter}, adapted to
* Flint index use with unnecessary APIs removed.
*/
public interface BloomFilter {
Member:

Curious, aren't there any open-source library implementations we can leverage? Is this because we need custom serialization to write to OpenSearch?

@dai-chen (Collaborator, Author) replied on Feb 7, 2024:

Yes, as the Javadoc notes, we're porting Spark's built-in BloomFilter, BitArray, and Murmur3_x86_32 into our flint-core library. We keep only the minimal API needed and implement it within flint-core, with the following future possibilities in mind:

  1. Integration with other query engines: the bloom filter index can be implemented in another query engine using the flint-core library.
  2. Users creating Flint indexes with our library in an ingestion pipeline: we can add a BloomFilter field type so users can generate a Flint index at ingestion time.


/**
* Bloom filter binary format version.
*/
enum Version {
V1(1);

private final int versionNumber;

Version(int versionNumber) {
this.versionNumber = versionNumber;
}

public int getVersionNumber() {
return versionNumber;
}
}

/**
* @return the number of bits in the underlying bit array.
*/
long bitSize();

/**
* Put an item into this bloom filter.
*
* @param item long value item to insert
* @return true if any bits changed, which means this is definitely the first time the item was
* added to the bloom filter; false means the item may or may not have been added before.
*/
boolean put(long item);

/**
* Merge this bloom filter with another bloom filter.
*
* @param bloomFilter bloom filter to merge
* @return the merged bloom filter
*/
BloomFilter merge(BloomFilter bloomFilter);

/**
* @param item long value item to check
* @return true if the item may exist in this bloom filter; false if it definitely does not.
*/
boolean mightContain(long item);

/**
* Serialize this bloom filter and write it to an output stream.
*
* @param out output stream to write
*/
void writeTo(OutputStream out) throws IOException;
}
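
For illustration, here is a minimal sketch of how a caller might exercise this interface. The createFilter() factory is a placeholder assumption, since the concrete implementation class (in the classic package) is not shown in this excerpt:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.opensearch.flint.core.field.bloomfilter.BloomFilter;

public class BloomFilterUsageSketch {
  public static void main(String[] args) throws IOException {
    BloomFilter filter = createFilter();

    // The interface operates on long values, so callers hash raw column values
    // first (e.g. with the ported Murmur3_x86_32 mentioned in the review thread).
    filter.put(42L);
    filter.put(1024L);

    // mightContain() has no false negatives: a false result means definitely absent.
    if (filter.mightContain(42L)) {
      System.out.println("42 may be present");
    }

    // Serialize for storage, e.g. as an OpenSearch binary field
    // (note the new "binary" FlintDataType added in docs/index.md above).
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    filter.writeTo(out);
    System.out.println("serialized " + out.toByteArray().length + " bytes");
  }

  private static BloomFilter createFilter() {
    // Placeholder: construct a concrete BloomFilter implementation here.
    throw new UnsupportedOperationException("supply a concrete BloomFilter");
  }
}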
org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java (new file)
@@ -0,0 +1,149 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

/*
* This file contains code from the Apache Spark project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.flint.core.field.bloomfilter.classic;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

/**
* Bit array.
*/
class BitArray {
private final long[] data;
private long bitCount;

BitArray(long numBits) {
this(new long[numWords(numBits)]);
}

BitArray(long[] data) {
this.data = data;
long bitCount = 0;
for (long word : data) {
bitCount += Long.bitCount(word);
}
this.bitCount = bitCount;
}

/**
* @return array length in bits
*/
long bitSize() {
return (long) data.length * Long.SIZE;
}

/**
* @param index bit index
* @return whether the bit at the given index is set
*/
boolean get(long index) {
// index >>> 6 selects the 64-bit word; (1L << index) relies on Java masking
// the shift count to the low 6 bits, i.e. bit (index % 64) within that word.
return (data[(int) (index >>> 6)] & (1L << index)) != 0;
}

/**
* Set the bit at the given index.
*
* @param index bit index
* @return true if the bit changed, i.e. it was previously unset
*/
boolean set(long index) {
if (!get(index)) {
data[(int) (index >>> 6)] |= (1L << index);
bitCount++;
return true;
}
return false;
}

/**
* Merge another bit array into this one by bitwise OR.
*
* @param array other bit array, which must be of equal length
*/
void putAll(BitArray array) {
assert data.length == array.data.length : "BitArrays must be of equal length when merging";
long bitCount = 0;
for (int i = 0; i < data.length; i++) {
data[i] |= array.data[i];
bitCount += Long.bitCount(data[i]);
}
this.bitCount = bitCount;
}

/**
* Serialize and write out this bit array to the given output stream.
*
* @param out output stream
*/
void writeTo(DataOutputStream out) throws IOException {
out.writeInt(data.length);
for (long datum : data) {
out.writeLong(datum);
}
}

/**
* Deserialize and read bit array from the given input stream.
*
* @param in input stream
* @return bit array
*/
static BitArray readFrom(DataInputStream in) throws IOException {
int numWords = in.readInt();
long[] data = new long[numWords];
for (int i = 0; i < numWords; i++) {
data[i] = in.readLong();
}
return new BitArray(data);
}

private static int numWords(long numBits) {
if (numBits <= 0) {
throw new IllegalArgumentException("numBits must be positive, but got " + numBits);
}
long numWords = (long) Math.ceil(numBits / 64.0);
if (numWords > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Can't allocate enough space for " + numBits + " bits");
}
return (int) numWords;
}

@Override
public boolean equals(Object other) {
if (this == other) return true;
if (!(other instanceof BitArray)) return false;
BitArray that = (BitArray) other;
return Arrays.equals(data, that.data);
}

@Override
public int hashCode() {
return Arrays.hashCode(data);
}
}
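
A small standalone sketch (illustrative only; the class name is hypothetical) of the indexing arithmetic that get() and set() above rely on:

public class BitArrayIndexDemo {
  public static void main(String[] args) {
    long index = 130;
    // Word selection: 130 >>> 6 == 2, so the bit lives in data[2].
    int word = (int) (index >>> 6);
    // Java masks a long shift count to 6 bits: 130 & 63 == 2,
    // so (1L << 130) == (1L << 2), i.e. bit 2 within that word.
    long mask = 1L << index;
    System.out.println("word=" + word + ", bit=" + Long.numberOfTrailingZeros(mask));
    // Output: word=2, bit=2
  }
}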