From 8c6d35a288971cdb7f87aaaa335220ce3a99e905 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 2 Feb 2024 13:05:16 -0800 Subject: [PATCH 1/7] Add bloom filter, skipping strategy and aggregator Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexFactory.scala | 8 +- .../skipping/FlintSparkSkippingIndex.scala | 20 ++++ .../skipping/FlintSparkSkippingStrategy.scala | 2 +- .../skipping/bloomfilter/BloomFilter.scala | 63 ++++++++++++ .../skipping/bloomfilter/BloomFilterAgg.scala | 98 +++++++++++++++++++ .../BloomFilterSkippingStrategy.scala | 38 +++++++ .../bloomfilter/ClassicBloomFilter.scala | 65 ++++++++++++ .../FlintSparkSkippingIndexITSuite.scala | 44 +++++++++ 8 files changed, 336 insertions(+), 2 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index 6cd5b3352..7a783a610 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -17,7 +17,8 @@ import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind -import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET} +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy @@ -59,6 +60,11 @@ object FlintSparkIndexFactory { params = parameters) case MIN_MAX => MinMaxSkippingStrategy(columnName = columnName, columnType = columnType) + case BLOOM_FILTER => + BloomFilterSkippingStrategy( + columnName = columnName, + columnType = columnType, + params = parameters) case other => throw new IllegalStateException(s"Unknown skipping strategy: $other") } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index ae6518bf0..c27a7f7e2 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -12,6 +12,7 @@ import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex._ 
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy @@ -188,6 +189,25 @@ object FlintSparkSkippingIndex { this } + /** + * Add bloom filter skipping index column. + * + * @param colName + * indexed column name + * @param params + * bloom filter parameters + * @return + * index builder + */ + def addBloomFilter(colName: String, params: Map[String, String] = Map.empty): Builder = { + val col = findColumn(colName) + indexedColumns = indexedColumns :+ BloomFilterSkippingStrategy( + columnName = col.name, + columnType = col.dataType, + params = params) + this + } + override def buildIndex(): FlintSparkIndex = new FlintSparkSkippingIndex(tableName, indexedColumns, indexOptions) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala index 2569f06fa..6c87924e7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala @@ -69,7 +69,7 @@ object FlintSparkSkippingStrategy { type SkippingKind = Value // Use Value[s]Set because ValueSet already exists in Enumeration - val PARTITION, VALUE_SET, MIN_MAX = Value + val PARTITION, VALUE_SET, MIN_MAX, BLOOM_FILTER = Value } /** json4s doesn't serialize Enum by default */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala new file mode 100644 index 000000000..b263e7bcc --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.skipping.bloomfilter + +import java.io.{InputStream, OutputStream} +import java.util.Locale + +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.{Algorithm, CLASSIC} + +trait BloomFilter { + + def algorithm: Algorithm + + def bitSize(): Long + + def put(item: Long): Boolean + + def merge(bloomFilter: BloomFilter): BloomFilter + + def mightContain(item: Long): Boolean + + def writeTo(out: OutputStream): Unit +} + +object BloomFilter { + + object Algorithm extends Enumeration { + type Algorithm = Value + val CLASSIC = Value + } + + val BLOOM_FILTER_ALGORITHM_KEY = "algorithm" + val DEFAULT_BLOOM_FILTER_ALGORITHM = CLASSIC.toString + + class BloomFilterFactory(params: Map[String, String]) extends Serializable { + + private val algorithm: Algorithm = { + val param = params.getOrElse(BLOOM_FILTER_ALGORITHM_KEY, DEFAULT_BLOOM_FILTER_ALGORITHM) + Algorithm.withName(param.toUpperCase(Locale.ROOT)) + } + + def parameters: Map[String, String] = { + algorithm match { + case CLASSIC => ClassicBloomFilter.getParameters(params) // TODO: add algorithm param + } 
+    }
+
+    def create(): BloomFilter = {
+      algorithm match {
+        case CLASSIC => new ClassicBloomFilter(parameters)
+      }
+    }
+
+    def deserialize(in: InputStream): BloomFilter = {
+      algorithm match {
+        case CLASSIC => ClassicBloomFilter.deserialize(in)
+      }
+    }
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala
new file mode 100644
index 000000000..046c2f264
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala
@@ -0,0 +1,98 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.skipping.bloomfilter
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.BloomFilterFactory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
+/**
+ * Aggregate function that builds a bloom filter and serializes it to binary as the result.
+ * Copied from Spark's built-in BloomFilterAggregate because the built-in version: 1) accepts
+ * the number of bits as an argument instead of FPP; 2) calls the static method
+ * BloomFilter.create and thus cannot be switched to another implementation; 3) is a Scala case
+ * class that cannot be extended or overridden.
+ *
+ * @param child
+ *   child expression that generates the values to add to the bloom filter
+ * @param bloomFilterFactory
+ *   factory that creates the configured bloom filter implementation
+ * @param mutableAggBufferOffset
+ *   mutable aggregation buffer offset
+ * @param inputAggBufferOffset
+ *   input aggregation buffer offset
+ */
+case class BloomFilterAgg(
+    child: Expression,
+    bloomFilterFactory: BloomFilterFactory,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int)
+    extends TypedImperativeAggregate[BloomFilter] {
+
+  def this(child: Expression, bloomFilterFactory: BloomFilterFactory) = {
+    this(child, bloomFilterFactory, 0, 0)
+  }
+
+  override def nullable: Boolean = true
+
+  override def dataType: DataType = BinaryType
+
+  override def children: Seq[Expression] = Seq(child)
+
+  override def createAggregationBuffer(): BloomFilter = bloomFilterFactory.create()
+
+  override def update(buffer: BloomFilter, inputRow: InternalRow): BloomFilter = {
+    val value = child.eval(inputRow)
+    // Ignore null values.
+    if (value == null) {
+      return buffer
+    }
+    buffer.put(value.asInstanceOf[Long])
+    buffer
+  }
+
+  override def merge(buffer: BloomFilter, input: BloomFilter): BloomFilter = {
+    buffer.merge(input)
+    buffer
+  }
+
+  override def eval(buffer: BloomFilter): Any = {
+    if (buffer.bitSize() == 0) {
+      // No bit is set in the bloom filter, which means no non-null value was processed.
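+      // Returning null (rather than an empty serialized filter) surfaces this case as a
+      // SQL NULL in the resulting skipping index column.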
+ return null + } + serialize(buffer) + } + + override def serialize(buffer: BloomFilter): Array[Byte] = { + // BloomFilterImpl.writeTo() writes 2 integers (version number and num hash functions), hence + // the +8 + val size = (buffer.bitSize() / 8) + 8 + require(size <= Integer.MAX_VALUE, s"actual number of bits is too large $size") + val out = new ByteArrayOutputStream(size.intValue()) + buffer.writeTo(out) + out.close() + out.toByteArray + } + + override def deserialize(bytes: Array[Byte]): BloomFilter = { + val in = new ByteArrayInputStream(bytes) + val bloomFilter = bloomFilterFactory.deserialize(in) + in.close() + bloomFilter + } + + override protected def withNewChildrenInternal( + newChildren: IndexedSeq[Expression]): Expression = + copy(child = newChildren.head) + + override def withNewMutableAggBufferOffset(newOffset: Int): ImperativeAggregate = + copy(mutableAggBufferOffset = newOffset) + + override def withNewInputAggBufferOffset(newOffset: Int): ImperativeAggregate = + copy(inputAggBufferOffset = newOffset) +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala new file mode 100644 index 000000000..8cc72249d --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.skipping.bloomfilter + +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, SkippingKind} +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.BloomFilterFactory + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.functions.{col, xxhash64} + +/** + * Skipping strategy based on approximate data structure bloom filter. + */ +case class BloomFilterSkippingStrategy( + override val kind: SkippingKind = BLOOM_FILTER, + override val columnName: String, + override val columnType: String, + params: Map[String, String] = Map.empty) + extends FlintSparkSkippingStrategy { + + private val bloomFilterFactory = new BloomFilterFactory(params) + + override val parameters: Map[String, String] = bloomFilterFactory.parameters + + override def outputSchema(): Map[String, String] = Map(columnName -> "binary") // TODO: binary? 
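+  // Note: this binary column stores the serialized bloom filter bytes produced by the
+  // BloomFilterAgg aggregator below.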
+ + override def getAggregators: Seq[Expression] = { + Seq( + new BloomFilterAgg(xxhash64(col(columnName)).expr, bloomFilterFactory) + .toAggregateExpression()) + } + + override def rewritePredicate(predicate: Expression): Option[Expression] = None +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala new file mode 100644 index 000000000..ef9ded0d6 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.skipping.bloomfilter +import java.io.{InputStream, OutputStream} + +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.{Algorithm, CLASSIC} +import org.opensearch.flint.spark.skipping.bloomfilter.ClassicBloomFilter.{CLASSIC_BLOOM_FILTER_FPP_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY} + +class ClassicBloomFilter(val delegate: org.apache.spark.util.sketch.BloomFilter) + extends BloomFilter + with Serializable { + + def this(params: Map[String, String]) = { + this( + org.apache.spark.util.sketch.BloomFilter + .create( + params(CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY).toLong, + params(CLASSIC_BLOOM_FILTER_FPP_KEY).toDouble)) + } + + override def algorithm: Algorithm = CLASSIC + + override def bitSize(): Long = delegate.bitSize() + + override def put(item: Long): Boolean = delegate.putLong(item) + + override def merge(bloomFilter: BloomFilter): BloomFilter = { + delegate.mergeInPlace(bloomFilter.asInstanceOf[ClassicBloomFilter].delegate) + this + } + + override def mightContain(item: Long): Boolean = delegate.mightContainLong(item) + + override def writeTo(out: OutputStream): Unit = delegate.writeTo(out) +} + +object ClassicBloomFilter { + + val CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY = "num_items" + val DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS = 10000 + + val CLASSIC_BLOOM_FILTER_FPP_KEY = "fpp" + val DEFAULT_CLASSIC_BLOOM_FILTER_FPP = 0.01 + + def getParameters(params: Map[String, String]): Map[String, String] = { + val map = Map.newBuilder[String, String] + map ++= params + + if (!params.contains(CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY)) { + map += (CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS.toString) + } + if (!params.contains(CLASSIC_BLOOM_FILTER_FPP_KEY)) { + map += (CLASSIC_BLOOM_FILTER_FPP_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_FPP.toString) + } + map.result() + } + + def deserialize(in: InputStream): BloomFilter = { + val delegate = org.apache.spark.util.sketch.BloomFilter.readFrom(in) + new ClassicBloomFilter(delegate) + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index e4bea2013..c79d8cfad 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -47,6 +47,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .addValueSet("address") .addMinMax("age") + .addBloomFilter("name") .create() val index = flint.describeIndex(testIndex) @@ -80,6 +81,16 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | 
"parameters": {}, | "columnName": "age", | "columnType": "int" + | }, + | { + | "kind": "BLOOM_FILTER", + | "parameters": { + | "algorithm": "CLASSIC", + | "num_items": "10000", + | "fpp": "0.01" + | }, + | "columnName": "age", + | "columnType": "binary" | }], | "source": "spark_catalog.default.test", | "options": { "auto_refresh": "false" }, @@ -101,6 +112,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "MinMax_age_1" : { | "type": "integer" | }, + | "name" : { + | "type": "binary" + | }, | "file_path": { | "type": "keyword" | } @@ -313,6 +327,36 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { hasIndexFilter(col("MinMax_age_0") <= 30 && col("MinMax_age_1") >= 30)) } + test("can build bloom filter skipping index and rewrite applicable query") { + flint + .skippingIndex() + .onTable(testTable) + .addBloomFilter("age") + .create() + flint.refreshIndex(testIndex) + + // Assert index data + /* + checkAnswer( + flint.queryIndex(testIndex).select("age"), + Seq(Row(20, 30), Row(40, 60))) + */ + + // Assert query rewrite + /* + val query = sql(s""" + | SELECT name + | FROM $testTable + | WHERE age = 30 + |""".stripMargin) + + checkAnswer(query, Row("World")) + query.queryExecution.executedPlan should + useFlintSparkSkippingFileIndex( + hasIndexFilter(col("MinMax_age_0") <= 30 && col("MinMax_age_1") >= 30)) + */ + } + test("should rewrite applicable query with table name without database specified") { flint .skippingIndex() From 4c5bffedee13eceef8eea343b12c8553848d24df Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 2 Feb 2024 14:47:45 -0800 Subject: [PATCH 2/7] Add UT Signed-off-by: Chen Dai --- .../skipping/bloomfilter/BloomFilter.scala | 76 +++++++++++++++++++ .../BloomFilterSkippingStrategy.scala | 1 + .../bloomfilter/ClassicBloomFilter.scala | 32 +++++++- .../BloomFilterSkippingStrategySuite.scala | 30 ++++++++ .../bloomfilter/ClassicBloomFilterSuite.scala | 55 ++++++++++++++ .../FlintSparkSkippingIndexITSuite.scala | 4 +- 6 files changed, 195 insertions(+), 3 deletions(-) create mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategySuite.scala create mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilterSuite.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala index b263e7bcc..6a8df6b0e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala @@ -10,50 +10,126 @@ import java.util.Locale import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.{Algorithm, CLASSIC} +/** + * Bloom filter interface inspired by [[org.apache.spark.util.sketch.BloomFilter]] but adapts to + * Flint skipping index use and remove unnecessary API for now. + */ trait BloomFilter { + /** + * @return + * algorithm kind + */ def algorithm: Algorithm + /** + * @return + * the number of bits in the underlying bit array. + */ def bitSize(): Long + /** + * Put an item into this bloom filter. + * + * @param item + * Long value item to insert + * @return + * true if bits changed which means the item must be first time added to the bloom filter. + * Otherwise, it maybe the first time or not. 
+ */ def put(item: Long): Boolean + /** + * Merge this bloom filter with another bloom filter. + * + * @param bloomFilter + * bloom filter to merge + * @return + * bloom filter after merged + */ def merge(bloomFilter: BloomFilter): BloomFilter + /** + * @param item + * Long value item to check + * @return + * true if the item may exist in this bloom filter. Otherwise, it is definitely not exist. + */ def mightContain(item: Long): Boolean + /** + * Serialize this bloom filter and write it to an output stream. + * + * @param out + * output stream to write + */ def writeTo(out: OutputStream): Unit } object BloomFilter { + /** + * Bloom filter algorithm. + */ object Algorithm extends Enumeration { type Algorithm = Value val CLASSIC = Value } + /** + * Bloom filter algorithm parameter name and default value if not present. + */ val BLOOM_FILTER_ALGORITHM_KEY = "algorithm" val DEFAULT_BLOOM_FILTER_ALGORITHM = CLASSIC.toString + /** + * Bloom filter factory that instantiate concrete bloom filter implementation. + * + * @param params + * bloom filter algorithm parameters + */ class BloomFilterFactory(params: Map[String, String]) extends Serializable { + /** + * Bloom filter algorithm specified in parameters. + */ private val algorithm: Algorithm = { val param = params.getOrElse(BLOOM_FILTER_ALGORITHM_KEY, DEFAULT_BLOOM_FILTER_ALGORITHM) Algorithm.withName(param.toUpperCase(Locale.ROOT)) } + /** + * Get all bloom filter parameters used to store in index metadata. + * + * @return + * all bloom filter algorithm parameters including those not present but has default values. + */ def parameters: Map[String, String] = { algorithm match { case CLASSIC => ClassicBloomFilter.getParameters(params) // TODO: add algorithm param } } + /** + * Create a concrete bloom filter according to the parameters. + * + * @return + * bloom filter instance + */ def create(): BloomFilter = { algorithm match { case CLASSIC => new ClassicBloomFilter(parameters) } } + /** + * Deserialize to create the bloom filter. 
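+     * The input stream is expected to contain bytes previously produced by writeTo().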
+ * + * @param in + * input stream to read from + * @return + * bloom filter instance + */ def deserialize(in: InputStream): BloomFilter = { algorithm match { case CLASSIC => ClassicBloomFilter.deserialize(in) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala index 8cc72249d..47aea8858 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala @@ -22,6 +22,7 @@ case class BloomFilterSkippingStrategy( params: Map[String, String] = Map.empty) extends FlintSparkSkippingStrategy { + /** Bloom filter factory */ private val bloomFilterFactory = new BloomFilterFactory(params) override val parameters: Map[String, String] = bloomFilterFactory.parameters diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala index ef9ded0d6..00e05ee69 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala @@ -7,9 +7,16 @@ package org.opensearch.flint.spark.skipping.bloomfilter import java.io.{InputStream, OutputStream} import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.{Algorithm, CLASSIC} +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.BLOOM_FILTER_ALGORITHM_KEY import org.opensearch.flint.spark.skipping.bloomfilter.ClassicBloomFilter.{CLASSIC_BLOOM_FILTER_FPP_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY} -class ClassicBloomFilter(val delegate: org.apache.spark.util.sketch.BloomFilter) +/** + * Classic bloom filter implementation by reusing Spark built-in bloom filter. + * + * @param delegate + * Spark bloom filter instance + */ +case class ClassicBloomFilter(delegate: org.apache.spark.util.sketch.BloomFilter) extends BloomFilter with Serializable { @@ -39,16 +46,31 @@ class ClassicBloomFilter(val delegate: org.apache.spark.util.sketch.BloomFilter) object ClassicBloomFilter { + /** + * Expected number of unique items key and default value. + */ val CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY = "num_items" val DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS = 10000 + /** + * False positive probability (FPP) key and default value. + */ val CLASSIC_BLOOM_FILTER_FPP_KEY = "fpp" val DEFAULT_CLASSIC_BLOOM_FILTER_FPP = 0.01 + /** + * @param params + * given parameters + * @return + * all parameters including those not present but has default value + */ def getParameters(params: Map[String, String]): Map[String, String] = { val map = Map.newBuilder[String, String] map ++= params + if (!params.contains(BLOOM_FILTER_ALGORITHM_KEY)) { + map += (BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString) + } if (!params.contains(CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY)) { map += (CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS.toString) } @@ -58,6 +80,14 @@ object ClassicBloomFilter { map.result() } + /** + * Deserialize and instantiate a classic bloom filter instance. 
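+   * Delegates to org.apache.spark.util.sketch.BloomFilter.readFrom.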
+ * + * @param in + * input stream to read from + * @return + * classic bloom filter instance + */ def deserialize(in: InputStream): BloomFilter = { val delegate = org.apache.spark.util.sketch.BloomFilter.readFrom(in) new ClassicBloomFilter(delegate) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategySuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategySuite.scala new file mode 100644 index 000000000..6c1de90e4 --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategySuite.scala @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.skipping.bloomfilter + +import org.opensearch.flint.spark.skipping.{FlintSparkSkippingStrategy, FlintSparkSkippingStrategySuite} +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.CLASSIC +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.BLOOM_FILTER_ALGORITHM_KEY +import org.opensearch.flint.spark.skipping.bloomfilter.ClassicBloomFilter.{CLASSIC_BLOOM_FILTER_FPP_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY, DEFAULT_CLASSIC_BLOOM_FILTER_FPP, DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.FlintSuite + +class BloomFilterSkippingStrategySuite + extends FlintSuite + with FlintSparkSkippingStrategySuite + with Matchers { + + /** Subclass initializes strategy class to test */ + override val strategy: FlintSparkSkippingStrategy = + BloomFilterSkippingStrategy(columnName = "name", columnType = "string") + + test("parameters") { + strategy.parameters should contain allOf (BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString, + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS.toString, + CLASSIC_BLOOM_FILTER_FPP_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_FPP.toString) + } +} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilterSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilterSuite.scala new file mode 100644 index 000000000..6c7ea4eee --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilterSuite.scala @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.skipping.bloomfilter + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.{BLOOM_FILTER_ALGORITHM_KEY, BloomFilterFactory} +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.CLASSIC +import org.opensearch.flint.spark.skipping.bloomfilter.ClassicBloomFilter.{CLASSIC_BLOOM_FILTER_FPP_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY, DEFAULT_CLASSIC_BLOOM_FILTER_FPP, DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.FlintSuite + +class ClassicBloomFilterSuite extends FlintSuite with Matchers { + + test("parameters should return all parameters including defaults") { + val factory = new BloomFilterFactory(Map(BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString)) + + factory.parameters should contain allOf (BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString, + 
CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS.toString, + CLASSIC_BLOOM_FILTER_FPP_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_FPP.toString) + } + + test("parameters should return all specified parameters") { + val expectedNumItems = 50000 + val fpp = 0.001 + val factory = new BloomFilterFactory( + Map( + BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString, + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> expectedNumItems.toString, + CLASSIC_BLOOM_FILTER_FPP_KEY -> fpp.toString)) + + factory.parameters should contain allOf (BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString, + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> expectedNumItems.toString, + CLASSIC_BLOOM_FILTER_FPP_KEY -> fpp.toString) + } + + test("serialize and deserialize") { + val factory = new BloomFilterFactory(Map(BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString)) + val bloomFilter = factory.create() + bloomFilter.put(1L) + bloomFilter.put(2L) + bloomFilter.put(3L) + + // Serialize and then deserialize should remain the same + val out = new ByteArrayOutputStream() + bloomFilter.writeTo(out) + val in = new ByteArrayInputStream(out.toByteArray) + val newBloomFilter = factory.deserialize(in) + bloomFilter shouldBe newBloomFilter + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index c79d8cfad..3d50aaeb7 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -89,8 +89,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "num_items": "10000", | "fpp": "0.01" | }, - | "columnName": "age", - | "columnType": "binary" + | "columnName": "name", + | "columnType": "string" | }], | "source": "spark_catalog.default.test", | "options": { "auto_refresh": "false" }, From e164a909bdf73d276d33bace56393c500a0467c9 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 5 Feb 2024 10:25:38 -0800 Subject: [PATCH 3/7] Add classic bloom filter in flint core Signed-off-by: Chen Dai --- .../core/field/bloomfilter/BloomFilter.java | 50 +++++++++++ .../field/bloomfilter/classic/BitArray.java | 62 +++++++++++++ .../classic/ClassicBloomFilter.java | 90 +++++++++++++++++++ .../bloomfilter/classic/Murmur3_x86_32.java | 49 ++++++++++ .../classic/ClassicBloomFilterTest.java | 50 +++++++++++ .../bloomfilter/ClassicBloomFilter.scala | 1 + 6 files changed, 302 insertions(+) create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/BloomFilter.java create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/Murmur3_x86_32.java create mode 100644 flint-core/src/test/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilterTest.java diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/BloomFilter.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/BloomFilter.java new file mode 100644 index 000000000..4692f857e --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/BloomFilter.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: 
Apache-2.0
+ */
+
+package org.opensearch.flint.core.field.bloomfilter;
+
+import java.io.OutputStream;
+
+/**
+ * Bloom filter interface inspired by [[org.apache.spark.util.sketch.BloomFilter]] but adapts to
+ * Flint skipping index use and remove unnecessary API for now.
+ */
+public interface BloomFilter {
+
+  /**
+   * @return the number of bits in the underlying bit array.
+   */
+  long bitSize();
+
+  /**
+   * Put an item into this bloom filter.
+   *
+   * @param item Long value item to insert
+   * @return true if any bit changed, which means the item was definitely not in the bloom filter
+   * before. Otherwise, the item may or may not have been added previously.
+   */
+  boolean put(long item);
+
+  /**
+   * Merge this bloom filter with another bloom filter.
+   *
+   * @param bloomFilter bloom filter to merge
+   * @return this bloom filter after merging
+   */
+  BloomFilter merge(BloomFilter bloomFilter);
+
+  /**
+   * @param item Long value item to check
+   * @return true if the item may exist in this bloom filter. Otherwise, it definitely does not exist.
+   */
+  boolean mightContain(long item);
+
+  /**
+   * Serialize this bloom filter and write it to an output stream.
+   *
+   * @param out output stream to write
+   */
+  void writeTo(OutputStream out);
+}
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java
new file mode 100644
index 000000000..6f3823d5b
--- /dev/null
+++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core.field.bloomfilter.classic;
+
+class BitArray {
+  private final long[] data;
+  private long bitCount;
+
+  BitArray(long numBits) {
+    this(new long[numWords(numBits)]);
+  }
+
+  BitArray(long[] data) {
+    this.data = data;
+    long bitCount = 0;
+    for (long word : data) {
+      bitCount += Long.bitCount(word);
+    }
+    this.bitCount = bitCount;
+  }
+
+  long bitSize() {
+    return (long) data.length * Long.SIZE;
+  }
+
+  boolean get(long index) {
+    return (data[(int) (index >>> 6)] & (1L << index)) != 0;
+  }
+
+  boolean set(long index) {
+    if (!get(index)) {
+      data[(int) (index >>> 6)] |= (1L << index);
+      bitCount++;
+      return true;
+    }
+    return false;
+  }
+
+  void putAll(BitArray array) {
+    assert data.length == array.data.length : "BitArrays must be of equal length when merging";
+    long bitCount = 0;
+    for (int i = 0; i < data.length; i++) {
+      data[i] |= array.data[i];
+      bitCount += Long.bitCount(data[i]);
+    }
+    this.bitCount = bitCount;
+  }
+
+  private static int numWords(long numBits) {
+    if (numBits <= 0) {
+      throw new IllegalArgumentException("numBits must be positive, but got " + numBits);
+    }
+    long numWords = (long) Math.ceil(numBits / 64.0);
+    if (numWords > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("Can't allocate enough space for " + numBits + " bits");
+    }
+    return (int) numWords;
+  }
+}
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java
new file mode 100644
index 000000000..02fb25d4e
--- /dev/null
+++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package 
org.opensearch.flint.core.field.bloomfilter.classic; + +import java.io.OutputStream; +import org.opensearch.flint.core.field.bloomfilter.BloomFilter; + +/** + * Classic bloom filter implementation. + */ +public class ClassicBloomFilter implements BloomFilter { + + private final BitArray bits; + + private final int numHashFunctions; + + public ClassicBloomFilter(int expectedNumItems, double fpp) { + long numBits = optimalNumOfBits(expectedNumItems, fpp); + this.bits = new BitArray(numBits); + this.numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits); + } + + @Override + public long bitSize() { + return bits.bitSize(); + } + + @Override + public boolean put(long item) { + int h1 = Murmur3_x86_32.hashLong(item, 0); + int h2 = Murmur3_x86_32.hashLong(item, h1); + + long bitSize = bits.bitSize(); + boolean bitsChanged = false; + for (int i = 1; i <= numHashFunctions; i++) { + int combinedHash = h1 + (i * h2); + // Flip all the bits if it's negative (guaranteed positive number) + if (combinedHash < 0) { + combinedHash = ~combinedHash; + } + bitsChanged |= bits.set(combinedHash % bitSize); + } + return bitsChanged; + } + + @Override + public BloomFilter merge(BloomFilter other) { + if (!(other instanceof ClassicBloomFilter)) { + throw new IllegalStateException("Cannot merge incompatible bloom filter of class" + + other.getClass().getName()); + } + this.bits.putAll(((ClassicBloomFilter) other).bits); + return this; + } + + @Override + public boolean mightContain(long item) { + int h1 = Murmur3_x86_32.hashLong(item, 0); + int h2 = Murmur3_x86_32.hashLong(item, h1); + + long bitSize = bits.bitSize(); + for (int i = 1; i <= numHashFunctions; i++) { + int combinedHash = h1 + (i * h2); + // Flip all the bits if it's negative (guaranteed positive number) + if (combinedHash < 0) { + combinedHash = ~combinedHash; + } + if (!bits.get(combinedHash % bitSize)) { + return false; + } + } + return true; + } + + @Override + public void writeTo(OutputStream out) { + } + + private static int optimalNumOfHashFunctions(long n, long m) { + // (m / n) * log(2), but avoid truncation due to division! 
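+    // Illustrative example (numbers assumed, not from this patch): for n = 10,000 items at
+    // fpp = 0.01, optimalNumOfBits below yields m = 95,850 bits, so this returns
+    // max(1, round(95850.0 / 10000 * ln(2))) = round(6.64) = 7 hash functions.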
+ return Math.max(1, (int) Math.round((double) m / n * Math.log(2))); + } + + private static long optimalNumOfBits(long n, double p) { + return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/Murmur3_x86_32.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/Murmur3_x86_32.java new file mode 100644 index 000000000..5d0f15dbb --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/Murmur3_x86_32.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.field.bloomfilter.classic; + +class Murmur3_x86_32 { + private static final int C1 = 0xcc9e2d51; + private static final int C2 = 0x1b873593; + + static int hashLong(long input, int seed) { + int low = (int) input; + int high = (int) (input >>> 32); + + int k1 = mixK1(low); + int h1 = mixH1(seed, k1); + + k1 = mixK1(high); + h1 = mixH1(h1, k1); + + return fmix(h1, 8); + } + + private static int mixK1(int k1) { + k1 *= C1; + k1 = Integer.rotateLeft(k1, 15); + k1 *= C2; + return k1; + } + + private static int mixH1(int h1, int k1) { + h1 ^= k1; + h1 = Integer.rotateLeft(h1, 13); + h1 = h1 * 5 + 0xe6546b64; + return h1; + } + + // Finalization mix - force all bits of a hash block to avalanche + private static int fmix(int h1, int length) { + h1 ^= length; + h1 ^= h1 >>> 16; + h1 *= 0x85ebca6b; + h1 ^= h1 >>> 13; + h1 *= 0xc2b2ae35; + h1 ^= h1 >>> 16; + return h1; + } +} diff --git a/flint-core/src/test/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilterTest.java b/flint-core/src/test/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilterTest.java new file mode 100644 index 000000000..520255af4 --- /dev/null +++ b/flint-core/src/test/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilterTest.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.field.bloomfilter.classic; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.Test; + +public class ClassicBloomFilterTest { + + private final ClassicBloomFilter bloomFilter = new ClassicBloomFilter(100, 0.01); + + private static final double ACCEPTABLE_FALSE_POSITIVE_RATE = 0.2; + + @Test + public void shouldReturnNoFalseNegative() { + bloomFilter.put(123L); + bloomFilter.put(456L); + bloomFilter.put(789L); + + // For items added, expect no false negative + assertTrue(bloomFilter.mightContain(123L)); + assertTrue(bloomFilter.mightContain(456L)); + assertTrue(bloomFilter.mightContain(789L)); + } + + @Test + public void shouldReturnFalsePositiveLessThanConfigured() { + bloomFilter.put(123L); + bloomFilter.put(456L); + bloomFilter.put(789L); + + // For items not added, expect false positives much lower than configure 1% + int numElements = 1000; + int falsePositiveCount = 0; + for (int i = 0; i < numElements; i++) { + long element = 1000L + i; + if (bloomFilter.mightContain(element)) { + falsePositiveCount++; + } + } + + double actualFalsePositiveRate = (double) falsePositiveCount / numElements; + assertTrue(actualFalsePositiveRate <= ACCEPTABLE_FALSE_POSITIVE_RATE, + "Actual false positive rate is higher than expected"); + } +} \ No newline at end of file diff --git 
a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala index 00e05ee69..939984432 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala @@ -4,6 +4,7 @@ */ package org.opensearch.flint.spark.skipping.bloomfilter + import java.io.{InputStream, OutputStream} import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.{Algorithm, CLASSIC} From d6b3d107cc5e0126d2476673a335cd112a131d6f Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 5 Feb 2024 13:11:28 -0800 Subject: [PATCH 4/7] Refactor skipping index to use bloom filter in Flint core Signed-off-by: Chen Dai --- .../core/field/bloomfilter/BloomFilter.java | 22 ++- .../field/bloomfilter/classic/BitArray.java | 20 +++ .../classic/ClassicBloomFilter.java | 29 +++- .../skipping/bloomfilter/BloomFilter.scala | 139 ------------------ .../skipping/bloomfilter/BloomFilterAgg.scala | 15 +- .../BloomFilterSkippingStrategy.scala | 47 +++++- .../bloomfilter/ClassicBloomFilter.scala | 96 ------------ .../BloomFilterSkippingStrategySuite.scala | 10 +- .../bloomfilter/ClassicBloomFilterSuite.scala | 55 ------- .../FlintSparkSkippingIndexITSuite.scala | 1 - 10 files changed, 120 insertions(+), 314 deletions(-) delete mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala delete mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala delete mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilterSuite.scala diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/BloomFilter.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/BloomFilter.java index 4692f857e..60aba1d2a 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/BloomFilter.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/BloomFilter.java @@ -5,14 +5,32 @@ package org.opensearch.flint.core.field.bloomfilter; +import java.io.IOException; import java.io.OutputStream; /** * Bloom filter interface inspired by [[org.apache.spark.util.sketch.BloomFilter]] but adapts to - * Flint skipping index use and remove unnecessary API for now. + * Flint index use and remove unnecessary API. */ public interface BloomFilter { + /** + * Bloom filter binary format version. + */ + enum Version { + V1(1); + + private final int versionNumber; + + Version(int versionNumber) { + this.versionNumber = versionNumber; + } + + public int getVersionNumber() { + return versionNumber; + } + } + /** * @return the number of bits in the underlying bit array. 
*/ @@ -46,5 +64,5 @@ public interface BloomFilter { * * @param out output stream to write */ - void writeTo(OutputStream out); + void writeTo(OutputStream out) throws IOException; } diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java index 6f3823d5b..38d7e103e 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java @@ -5,6 +5,10 @@ package org.opensearch.flint.core.field.bloomfilter.classic; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + class BitArray { private final long[] data; private long bitCount; @@ -49,6 +53,22 @@ void putAll(BitArray array) { this.bitCount = bitCount; } + void writeTo(DataOutputStream out) throws IOException { + out.writeInt(data.length); + for (long datum : data) { + out.writeLong(datum); + } + } + + static BitArray readFrom(DataInputStream in) throws IOException { + int numWords = in.readInt(); + long[] data = new long[numWords]; + for (int i = 0; i < numWords; i++) { + data[i] = in.readLong(); + } + return new BitArray(data); + } + private static int numWords(long numBits) { if (numBits <= 0) { throw new IllegalArgumentException("numBits must be positive, but got " + numBits); diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java index 02fb25d4e..f6c967f76 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java @@ -5,6 +5,10 @@ package org.opensearch.flint.core.field.bloomfilter.classic; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import org.opensearch.flint.core.field.bloomfilter.BloomFilter; @@ -23,6 +27,11 @@ public ClassicBloomFilter(int expectedNumItems, double fpp) { this.numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits); } + ClassicBloomFilter(BitArray bits, int numHashFunctions) { + this.bits = bits; + this.numHashFunctions = numHashFunctions; + } + @Override public long bitSize() { return bits.bitSize(); @@ -76,7 +85,25 @@ public boolean mightContain(long item) { } @Override - public void writeTo(OutputStream out) { + public void writeTo(OutputStream out) throws IOException { + DataOutputStream dos = new DataOutputStream(out); + + dos.writeInt(Version.V1.getVersionNumber()); + dos.writeInt(numHashFunctions); + bits.writeTo(dos); + } + + public static BloomFilter readFrom(InputStream in) throws IOException { + DataInputStream dis = new DataInputStream(in); + + int version = dis.readInt(); + if (version != Version.V1.getVersionNumber()) { + throw new IOException("Unexpected Bloom filter version number (" + version + ")"); + } + + int numHashFunctions = dis.readInt(); + BitArray bits = BitArray.readFrom(dis); + return new ClassicBloomFilter(bits, numHashFunctions); } private static int optimalNumOfHashFunctions(long n, long m) { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala deleted file mode 100644 index 6a8df6b0e..000000000 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilter.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark.skipping.bloomfilter - -import java.io.{InputStream, OutputStream} -import java.util.Locale - -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.{Algorithm, CLASSIC} - -/** - * Bloom filter interface inspired by [[org.apache.spark.util.sketch.BloomFilter]] but adapts to - * Flint skipping index use and remove unnecessary API for now. - */ -trait BloomFilter { - - /** - * @return - * algorithm kind - */ - def algorithm: Algorithm - - /** - * @return - * the number of bits in the underlying bit array. - */ - def bitSize(): Long - - /** - * Put an item into this bloom filter. - * - * @param item - * Long value item to insert - * @return - * true if bits changed which means the item must be first time added to the bloom filter. - * Otherwise, it maybe the first time or not. - */ - def put(item: Long): Boolean - - /** - * Merge this bloom filter with another bloom filter. - * - * @param bloomFilter - * bloom filter to merge - * @return - * bloom filter after merged - */ - def merge(bloomFilter: BloomFilter): BloomFilter - - /** - * @param item - * Long value item to check - * @return - * true if the item may exist in this bloom filter. Otherwise, it is definitely not exist. - */ - def mightContain(item: Long): Boolean - - /** - * Serialize this bloom filter and write it to an output stream. - * - * @param out - * output stream to write - */ - def writeTo(out: OutputStream): Unit -} - -object BloomFilter { - - /** - * Bloom filter algorithm. - */ - object Algorithm extends Enumeration { - type Algorithm = Value - val CLASSIC = Value - } - - /** - * Bloom filter algorithm parameter name and default value if not present. - */ - val BLOOM_FILTER_ALGORITHM_KEY = "algorithm" - val DEFAULT_BLOOM_FILTER_ALGORITHM = CLASSIC.toString - - /** - * Bloom filter factory that instantiate concrete bloom filter implementation. - * - * @param params - * bloom filter algorithm parameters - */ - class BloomFilterFactory(params: Map[String, String]) extends Serializable { - - /** - * Bloom filter algorithm specified in parameters. - */ - private val algorithm: Algorithm = { - val param = params.getOrElse(BLOOM_FILTER_ALGORITHM_KEY, DEFAULT_BLOOM_FILTER_ALGORITHM) - Algorithm.withName(param.toUpperCase(Locale.ROOT)) - } - - /** - * Get all bloom filter parameters used to store in index metadata. - * - * @return - * all bloom filter algorithm parameters including those not present but has default values. - */ - def parameters: Map[String, String] = { - algorithm match { - case CLASSIC => ClassicBloomFilter.getParameters(params) // TODO: add algorithm param - } - } - - /** - * Create a concrete bloom filter according to the parameters. - * - * @return - * bloom filter instance - */ - def create(): BloomFilter = { - algorithm match { - case CLASSIC => new ClassicBloomFilter(parameters) - } - } - - /** - * Deserialize to create the bloom filter. 
- * - * @param in - * input stream to read from - * @return - * bloom filter instance - */ - def deserialize(in: InputStream): BloomFilter = { - algorithm match { - case CLASSIC => ClassicBloomFilter.deserialize(in) - } - } - } -} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala index 046c2f264..46151e1af 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala @@ -7,7 +7,8 @@ package org.opensearch.flint.spark.skipping.bloomfilter import java.io.{ByteArrayInputStream, ByteArrayOutputStream} -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.BloomFilterFactory +import org.opensearch.flint.core.field.bloomfilter.BloomFilter +import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression @@ -28,13 +29,14 @@ import org.apache.spark.sql.types.{BinaryType, DataType} */ case class BloomFilterAgg( child: Expression, - bloomFilterFactory: BloomFilterFactory, + expectedNumItems: Int, + fpp: Double, override val mutableAggBufferOffset: Int, override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[BloomFilter] { - def this(child: Expression, bloomFilterFactory: BloomFilterFactory) = { - this(child, bloomFilterFactory, 0, 0) + def this(child: Expression, expectedNumItems: Int, fpp: Double) = { + this(child, expectedNumItems, fpp, 0, 0) } override def nullable: Boolean = true @@ -43,7 +45,8 @@ case class BloomFilterAgg( override def children: Seq[Expression] = Seq(child) - override def createAggregationBuffer(): BloomFilter = bloomFilterFactory.create() + override def createAggregationBuffer(): BloomFilter = + new ClassicBloomFilter(expectedNumItems, fpp) override def update(buffer: BloomFilter, inputRow: InternalRow): BloomFilter = { val value = child.eval(inputRow) @@ -81,7 +84,7 @@ case class BloomFilterAgg( override def deserialize(bytes: Array[Byte]): BloomFilter = { val in = new ByteArrayInputStream(bytes) - val bloomFilter = bloomFilterFactory.deserialize(in) + val bloomFilter = ClassicBloomFilter.readFrom(in) in.close() bloomFilter } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala index 47aea8858..af90ae1f5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.skipping.bloomfilter import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, SkippingKind} -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.BloomFilterFactory +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy.{CLASSIC_BLOOM_FILTER_FPP_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY, DEFAULT_CLASSIC_BLOOM_FILTER_FPP, 
DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.functions.{col, xxhash64} @@ -22,18 +22,49 @@ case class BloomFilterSkippingStrategy( params: Map[String, String] = Map.empty) extends FlintSparkSkippingStrategy { - /** Bloom filter factory */ - private val bloomFilterFactory = new BloomFilterFactory(params) - - override val parameters: Map[String, String] = bloomFilterFactory.parameters + override val parameters: Map[String, String] = { + Map( + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> expectedNumItems.toString, + CLASSIC_BLOOM_FILTER_FPP_KEY -> fpp.toString) + } - override def outputSchema(): Map[String, String] = Map(columnName -> "binary") // TODO: binary? + override def outputSchema(): Map[String, String] = Map(columnName -> "binary") override def getAggregators: Seq[Expression] = { Seq( - new BloomFilterAgg(xxhash64(col(columnName)).expr, bloomFilterFactory) - .toAggregateExpression()) + new BloomFilterAgg(xxhash64(col(columnName)).expr, expectedNumItems, fpp) + .toAggregateExpression() + ) // TODO: always xxhash64 ? } override def rewritePredicate(predicate: Expression): Option[Expression] = None + + private def expectedNumItems: Int = { + params + .get(CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY) + .map(_.toInt) + .getOrElse(DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS) + } + + private def fpp: Double = { + params + .get(CLASSIC_BLOOM_FILTER_FPP_KEY) + .map(_.toDouble) + .getOrElse(DEFAULT_CLASSIC_BLOOM_FILTER_FPP) + } +} + +object BloomFilterSkippingStrategy { + + /** + * Expected number of unique items key and default value. + */ + val CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY = "num_items" + val DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS = 10000 + + /** + * False positive probability (FPP) key and default value. + */ + val CLASSIC_BLOOM_FILTER_FPP_KEY = "fpp" + val DEFAULT_CLASSIC_BLOOM_FILTER_FPP = 0.03 } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala deleted file mode 100644 index 939984432..000000000 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilter.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark.skipping.bloomfilter - -import java.io.{InputStream, OutputStream} - -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.{Algorithm, CLASSIC} -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.BLOOM_FILTER_ALGORITHM_KEY -import org.opensearch.flint.spark.skipping.bloomfilter.ClassicBloomFilter.{CLASSIC_BLOOM_FILTER_FPP_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY} - -/** - * Classic bloom filter implementation by reusing Spark built-in bloom filter. 
- * - * @param delegate - * Spark bloom filter instance - */ -case class ClassicBloomFilter(delegate: org.apache.spark.util.sketch.BloomFilter) - extends BloomFilter - with Serializable { - - def this(params: Map[String, String]) = { - this( - org.apache.spark.util.sketch.BloomFilter - .create( - params(CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY).toLong, - params(CLASSIC_BLOOM_FILTER_FPP_KEY).toDouble)) - } - - override def algorithm: Algorithm = CLASSIC - - override def bitSize(): Long = delegate.bitSize() - - override def put(item: Long): Boolean = delegate.putLong(item) - - override def merge(bloomFilter: BloomFilter): BloomFilter = { - delegate.mergeInPlace(bloomFilter.asInstanceOf[ClassicBloomFilter].delegate) - this - } - - override def mightContain(item: Long): Boolean = delegate.mightContainLong(item) - - override def writeTo(out: OutputStream): Unit = delegate.writeTo(out) -} - -object ClassicBloomFilter { - - /** - * Expected number of unique items key and default value. - */ - val CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY = "num_items" - val DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS = 10000 - - /** - * False positive probability (FPP) key and default value. - */ - val CLASSIC_BLOOM_FILTER_FPP_KEY = "fpp" - val DEFAULT_CLASSIC_BLOOM_FILTER_FPP = 0.01 - - /** - * @param params - * given parameters - * @return - * all parameters including those not present but has default value - */ - def getParameters(params: Map[String, String]): Map[String, String] = { - val map = Map.newBuilder[String, String] - map ++= params - - if (!params.contains(BLOOM_FILTER_ALGORITHM_KEY)) { - map += (BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString) - } - if (!params.contains(CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY)) { - map += (CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS.toString) - } - if (!params.contains(CLASSIC_BLOOM_FILTER_FPP_KEY)) { - map += (CLASSIC_BLOOM_FILTER_FPP_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_FPP.toString) - } - map.result() - } - - /** - * Deserialize and instantiate a classic bloom filter instance. 
- * - * @param in - * input stream to read from - * @return - * classic bloom filter instance - */ - def deserialize(in: InputStream): BloomFilter = { - val delegate = org.apache.spark.util.sketch.BloomFilter.readFrom(in) - new ClassicBloomFilter(delegate) - } -} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategySuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategySuite.scala index 6c1de90e4..c3db6fb1d 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategySuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategySuite.scala @@ -6,9 +6,7 @@ package org.opensearch.flint.spark.skipping.bloomfilter import org.opensearch.flint.spark.skipping.{FlintSparkSkippingStrategy, FlintSparkSkippingStrategySuite} -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.CLASSIC -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.BLOOM_FILTER_ALGORITHM_KEY -import org.opensearch.flint.spark.skipping.bloomfilter.ClassicBloomFilter.{CLASSIC_BLOOM_FILTER_FPP_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY, DEFAULT_CLASSIC_BLOOM_FILTER_FPP, DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS} +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterSkippingStrategy.{CLASSIC_BLOOM_FILTER_FPP_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY, DEFAULT_CLASSIC_BLOOM_FILTER_FPP, DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS} import org.scalatest.matchers.should.Matchers import org.apache.spark.FlintSuite @@ -23,8 +21,8 @@ class BloomFilterSkippingStrategySuite BloomFilterSkippingStrategy(columnName = "name", columnType = "string") test("parameters") { - strategy.parameters should contain allOf (BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString, - CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS.toString, - CLASSIC_BLOOM_FILTER_FPP_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_FPP.toString) + strategy.parameters shouldBe Map( + CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS.toString, + CLASSIC_BLOOM_FILTER_FPP_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_FPP.toString) } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilterSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilterSuite.scala deleted file mode 100644 index 6c7ea4eee..000000000 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/bloomfilter/ClassicBloomFilterSuite.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark.skipping.bloomfilter - -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} - -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.{BLOOM_FILTER_ALGORITHM_KEY, BloomFilterFactory} -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilter.Algorithm.CLASSIC -import org.opensearch.flint.spark.skipping.bloomfilter.ClassicBloomFilter.{CLASSIC_BLOOM_FILTER_FPP_KEY, CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY, DEFAULT_CLASSIC_BLOOM_FILTER_FPP, DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS} -import org.scalatest.matchers.should.Matchers - -import org.apache.spark.FlintSuite - -class ClassicBloomFilterSuite extends FlintSuite with 
Matchers { - - test("parameters should return all parameters including defaults") { - val factory = new BloomFilterFactory(Map(BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString)) - - factory.parameters should contain allOf (BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString, - CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_NUM_ITEMS.toString, - CLASSIC_BLOOM_FILTER_FPP_KEY -> DEFAULT_CLASSIC_BLOOM_FILTER_FPP.toString) - } - - test("parameters should return all specified parameters") { - val expectedNumItems = 50000 - val fpp = 0.001 - val factory = new BloomFilterFactory( - Map( - BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString, - CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> expectedNumItems.toString, - CLASSIC_BLOOM_FILTER_FPP_KEY -> fpp.toString)) - - factory.parameters should contain allOf (BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString, - CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY -> expectedNumItems.toString, - CLASSIC_BLOOM_FILTER_FPP_KEY -> fpp.toString) - } - - test("serialize and deserialize") { - val factory = new BloomFilterFactory(Map(BLOOM_FILTER_ALGORITHM_KEY -> CLASSIC.toString)) - val bloomFilter = factory.create() - bloomFilter.put(1L) - bloomFilter.put(2L) - bloomFilter.put(3L) - - // Serialize and then deserialize should remain the same - val out = new ByteArrayOutputStream() - bloomFilter.writeTo(out) - val in = new ByteArrayInputStream(out.toByteArray) - val newBloomFilter = factory.deserialize(in) - bloomFilter shouldBe newBloomFilter - } -} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 3d50aaeb7..a94e57bb6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -85,7 +85,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | { | "kind": "BLOOM_FILTER", | "parameters": { - | "algorithm": "CLASSIC", | "num_items": "10000", | "fpp": "0.01" | }, From 9355532ecdaddd2eb8fecd458ae7ea8624c4ae1b Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 5 Feb 2024 13:59:58 -0800 Subject: [PATCH 5/7] Update javadoc and user manual Signed-off-by: Chen Dai --- docs/index.md | 3 + .../field/bloomfilter/classic/BitArray.java | 67 +++++++++++++++++++ .../classic/ClassicBloomFilter.java | 47 ++++++++++++- .../bloomfilter/classic/Murmur3_x86_32.java | 31 +++++++++ .../classic/ClassicBloomFilterTest.java | 19 ++++++ .../skipping/bloomfilter/BloomFilterAgg.scala | 23 ++++--- .../BloomFilterSkippingStrategy.scala | 2 +- 7 files changed, 178 insertions(+), 14 deletions(-) diff --git a/docs/index.md b/docs/index.md index 5f9d594de..d8bcaf60a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -14,6 +14,7 @@ A Flint index is ... - Partition: skip data scan by maintaining and filtering partitioned column value per file. - MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file. - ValueSet: skip data scan by building a unique value set of the indexed column per file. + - BloomFilter: skip data scan by building a bloom filter of the indexed column per file. 
- Covering Index: create index for selected columns within the source dataset to improve query performance
- Materialized View: enhance query performance by storing precomputed and aggregated data from the source dataset
@@ -54,6 +55,7 @@ For now, Flint Index doesn't define its own data type and uses OpenSearch field
| **FlintDataType** |
|-------------------|
| boolean |
+| binary |
| long |
| integer |
| short |
@@ -447,6 +449,7 @@ flint.skippingIndex()
 .addPartitions("year", "month", "day")
 .addValueSet("elb_status_code")
 .addMinMax("request_processing_time")
+ .addBloomFilter("client_ip")
 .create()
flint.refreshIndex("flint_spark_catalog_default_alb_logs_skipping_index")
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java
index 38d7e103e..2bf36b360 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java
@@ -3,12 +3,37 @@
 * SPDX-License-Identifier: Apache-2.0
 */

+/*
+ * This file contains code from the Apache Spark project (original license below).
+ * It contains modifications, which are licensed as above:
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.opensearch.flint.core.field.bloomfilter.classic;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Arrays;

+/**
+ * Bit array.
+ */
class BitArray {
  private final long[] data;
  private long bitCount;
@@ -26,14 +51,27 @@ class BitArray {
    this.bitCount = bitCount;
  }

+  /**
+   * @return array length in bits
+   */
  long bitSize() {
    return (long) data.length * Long.SIZE;
  }

+  /**
+   * @param index bit index
+   * @return whether the bit at the given index is set
+   */
  boolean get(long index) {
    return (data[(int) (index >>> 6)] & (1L << index)) != 0;
  }

+  /**
+   * Set the bit at the given index.
+   *
+   * @param index bit index
+   * @return true if the bit was changed, i.e. it was previously unset
+   */
  boolean set(long index) {
    if (!get(index)) {
      data[(int) (index >>> 6)] |= (1L << index);
@@ -43,6 +81,11 @@ boolean set(long index) {
    return false;
  }

+  /**
+   * Merge another bit array into this bit array.
+   *
+   * @param array other bit array
+   */
  void putAll(BitArray array) {
    assert data.length == array.data.length : "BitArrays must be of equal length when merging";
    long bitCount = 0;
@@ -53,6 +96,11 @@ void putAll(BitArray array) {
    this.bitCount = bitCount;
  }

+  /**
+   * Serialize and write out this bit array to the given output stream.
+   *
+   * @param out output stream
+   */
  void writeTo(DataOutputStream out) throws IOException {
    out.writeInt(data.length);
    for (long datum : data) {
@@ -60,6 +108,12 @@ void writeTo(DataOutputStream out) throws IOException {
    }
  }

+  /**
+   * Deserialize and read a bit array from the given input stream.
+   *
+   * @param in input stream
+   * @return bit array
+   */
  static BitArray readFrom(DataInputStream in) throws IOException {
    int numWords = in.readInt();
    long[] data = new long[numWords];
@@ -79,4 +133,17 @@ private static int numWords(long numBits) {
    }
    return (int) numWords;
  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) return true;
+    if (!(other instanceof BitArray)) return false;
+    BitArray that = (BitArray) other;
+    return Arrays.equals(data, that.data);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(data);
+  }
}
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java
index f6c967f76..56b737e60 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java
@@ -3,6 +3,27 @@
 * SPDX-License-Identifier: Apache-2.0
 */

+/*
+ * This file contains code from the Apache Spark project (original license below).
+ * It contains modifications, which are licensed as above:
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.opensearch.flint.core.field.bloomfilter.classic;

import java.io.DataInputStream;
@@ -13,7 +34,13 @@ import org.opensearch.flint.core.field.bloomfilter.BloomFilter;

/**
- * Classic bloom filter implementation.
+ * Classic bloom filter implementation inspired by [[org.apache.spark.util.sketch.BloomFilterImpl]]
+ * but keeps only minimal functionality. The bloom filter is serialized in the following format:
+ *
+ * 1) Version number, always 1 (32 bit) + * 2) Number of hash functions (32 bit) + * 3) Total number of words of the underlying bit array (32 bit) + * 4) The words/longs (numWords * 64 bit) */ public class ClassicBloomFilter implements BloomFilter { @@ -100,7 +127,6 @@ public static BloomFilter readFrom(InputStream in) throws IOException { if (version != Version.V1.getVersionNumber()) { throw new IOException("Unexpected Bloom filter version number (" + version + ")"); } - int numHashFunctions = dis.readInt(); BitArray bits = BitArray.readFrom(dis); return new ClassicBloomFilter(bits, numHashFunctions); @@ -114,4 +140,21 @@ private static int optimalNumOfHashFunctions(long n, long m) { private static long optimalNumOfBits(long n, double p) { return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (!(other instanceof ClassicBloomFilter)) { + return false; + } + ClassicBloomFilter that = (ClassicBloomFilter) other; + return this.numHashFunctions == that.numHashFunctions && this.bits.equals(that.bits); + } + + @Override + public int hashCode() { + return bits.hashCode() * 31 + numHashFunctions; + } } diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/Murmur3_x86_32.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/Murmur3_x86_32.java index 5d0f15dbb..b76c3bd88 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/Murmur3_x86_32.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/Murmur3_x86_32.java @@ -3,12 +3,43 @@ * SPDX-License-Identifier: Apache-2.0 */ +/* + * This file contains code from the Apache Spark project (original license below). + * It contains modifications, which are licensed as above: + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.opensearch.flint.core.field.bloomfilter.classic; +/** + * 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction. + */ class Murmur3_x86_32 { private static final int C1 = 0xcc9e2d51; private static final int C2 = 0x1b873593; + /** + * Calculate hash for the given input long. 
+ * + * @param input long value + * @param seed seed + * @return hash value + */ static int hashLong(long input, int seed) { int low = (int) input; int high = (int) (input >>> 32); diff --git a/flint-core/src/test/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilterTest.java b/flint-core/src/test/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilterTest.java index 520255af4..39ca8e98d 100644 --- a/flint-core/src/test/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilterTest.java +++ b/flint-core/src/test/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilterTest.java @@ -5,9 +5,15 @@ package org.opensearch.flint.core.field.bloomfilter.classic; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import org.junit.Test; +import org.opensearch.flint.core.field.bloomfilter.BloomFilter; public class ClassicBloomFilterTest { @@ -47,4 +53,17 @@ public void shouldReturnFalsePositiveLessThanConfigured() { assertTrue(actualFalsePositiveRate <= ACCEPTABLE_FALSE_POSITIVE_RATE, "Actual false positive rate is higher than expected"); } + + @Test + public void shouldBeTheSameAfterWriteToAndReadFrom() throws IOException { + bloomFilter.put(123L); + bloomFilter.put(456L); + bloomFilter.put(789L); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + bloomFilter.writeTo(out); + InputStream in = new ByteArrayInputStream(out.toByteArray()); + BloomFilter newBloomFilter = ClassicBloomFilter.readFrom(in); + assertEquals(bloomFilter, newBloomFilter); + } } \ No newline at end of file diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala index 46151e1af..0430476b6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala @@ -16,16 +16,17 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, import org.apache.spark.sql.types.{BinaryType, DataType} /** - * Aggregate function that build bloom filter and serialize to binary as result. Copy from Spark - * built-in BloomFilterAggregate because it: 1) it accepts number of bits as argument instead of - * FPP 2) it calls static method BloomFilter.create and thus cannot change to other implementation - * 3) it is a Scala case class that cannot be extend and overridden + * An aggregate function that builds a bloom filter and serializes it to binary as the result. + * This implementation is a customized version inspired by Spark's built-in BloomFilterAggregate. + * Spark's implementation only accepts number of bits, uses BloomFilterImpl and cannot be extended + * due to Scala case class restriction. 
 *
 * @param child
- *   child expression of
- * @param bloomFilter
- * @param mutableAggBufferOffset
- * @param inputAggBufferOffset
+ *   child expression that generates Long values for creating a bloom filter
+ * @param expectedNumItems
+ *   expected maximum number of unique items
+ * @param fpp
+ *   false positive probability
 */
case class BloomFilterAgg(
    child: Expression,
@@ -45,13 +46,13 @@ case class BloomFilterAgg(

  override def children: Seq[Expression] = Seq(child)

-  override def createAggregationBuffer(): BloomFilter =
+  override def createAggregationBuffer(): BloomFilter = {
    new ClassicBloomFilter(expectedNumItems, fpp)
+  }

  override def update(buffer: BloomFilter, inputRow: InternalRow): BloomFilter = {
    val value = child.eval(inputRow)
-    // Ignore null values.
-    if (value == null) {
+    if (value == null) { // Ignore null values
      return buffer
    }
    buffer.put(value.asInstanceOf[Long])
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala
index af90ae1f5..73b03ef0f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterSkippingStrategy.scala
@@ -34,7 +34,7 @@ case class BloomFilterSkippingStrategy(
    Seq(
      new BloomFilterAgg(xxhash64(col(columnName)).expr, expectedNumItems, fpp)
        .toAggregateExpression()
-    ) // TODO: always xxhash64 ?
+    ) // TODO: use xxhash64() for now
  }

  override def rewritePredicate(predicate: Expression): Option[Expression] = None

From cb215487a1d6465640cb65ebec3a4ffbbfb26530 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Mon, 5 Feb 2024 15:37:07 -0800
Subject: [PATCH 6/7] Fix broken IT

Signed-off-by: Chen Dai
---
 docs/index.md                                        | 3 ++-
 .../flint/spark/FlintSparkSkippingIndexITSuite.scala | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index d8bcaf60a..8cf60f24a 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -24,7 +24,8 @@ Please see the following example in which Index Building Logic and Query Rewrite
|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Partition | CREATE SKIPPING INDEX
ON alb_logs
(
  year PARTITION,
  month PARTITION,
  day PARTITION,
  hour PARTITION
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  FIRST(year) AS year,
  FIRST(month) AS month,
  FIRST(day) AS day,
  FIRST(hour) AS hour,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE year = 2023 AND month = 4
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE year = 2023 AND month = 4
)
WHERE year = 2023 AND month = 4 | | ValueSet | CREATE SKIPPING INDEX
ON alb_logs
(
  elb_status_code VALUE_SET
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  COLLECT_SET(elb_status_code) AS elb_status_code,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE elb_status_code = 404
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE ARRAY_CONTAINS(elb_status_code, 404)
)
WHERE elb_status_code = 404 | -| MinMax | CREATE SKIPPING INDEX
ON alb_logs
(
  request_processing_time MIN_MAX
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  MIN(request_processing_time) AS request_processing_time_min,
  MAX(request_processing_time) AS request_processing_time_max,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE request_processing_time = 100
=>
SELECT *
FROM alb_logs (input_files =
SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE request_processing_time_min <= 100
    AND 100 <= request_processing_time_max
)
WHERE request_processing_time = 100 +| MinMax | CREATE SKIPPING INDEX
ON alb_logs
(
  request_processing_time MIN_MAX
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  MIN(request_processing_time) AS request_processing_time_min,
  MAX(request_processing_time) AS request_processing_time_max,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE request_processing_time = 100
=>
SELECT *
FROM alb_logs (input_files =
SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE request_processing_time_min <= 100
    AND 100 <= request_processing_time_max
)
WHERE request_processing_time = 100 | +| BloomFilter | CREATE SKIPPING INDEX
ON alb_logs
(
  client_ip BLOOM_FILTER
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  BLOOM_FILTER_AGG(client_ip) AS client_ip,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE client_ip = '127.0.0.1'
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE BLOOM_FILTER_MIGHT_CONTAIN(client_ip, '127.0.0.1') = true
)
WHERE client_ip = '127.0.0.1' |

### Flint Index Specification

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index a94e57bb6..cf16e4bc6 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -86,7 +86,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
 |        "kind": "BLOOM_FILTER",
 |        "parameters": {
 |          "num_items": "10000",
-|          "fpp": "0.01"
+|          "fpp": "0.03"
 |        },
 |        "columnName": "name",
 |        "columnType": "string"

From 1bb5484905f9f695798ee96272e2dbe9b441f014 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 6 Feb 2024 10:01:27 -0800
Subject: [PATCH 7/7] Updated javadoc and IT

Signed-off-by: Chen Dai
---
 .../classic/ClassicBloomFilter.java           | 12 +++++++++++
 .../skipping/bloomfilter/BloomFilterAgg.scala | 13 +++++++-----
 .../FlintSparkSkippingIndexITSuite.scala      | 20 ++-----------------
 3 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java
index 56b737e60..f6444fb09 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/field/bloomfilter/classic/ClassicBloomFilter.java
@@ -44,8 +44,14 @@
 */
public class ClassicBloomFilter implements BloomFilter {

+  /**
+   * Bit array
+   */
  private final BitArray bits;

+  /**
+   * Number of hash functions
+   */
  private final int numHashFunctions;

  public ClassicBloomFilter(int expectedNumItems, double fpp) {
@@ -120,6 +126,12 @@ public void writeTo(OutputStream out) throws IOException {
    bits.writeTo(dos);
  }

+  /**
+   * Deserialize and read a bloom filter from the given input stream.
+   *
+   * @param in input stream
+   * @return bloom filter
+   */
  public static BloomFilter readFrom(InputStream in) throws IOException {
    DataInputStream dis = new DataInputStream(in);

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala
index 0430476b6..b40554335 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterAgg.scala
@@ -17,9 +17,12 @@ import org.apache.spark.sql.types.{BinaryType, DataType}

/**
 * An aggregate function that builds a bloom filter and serializes it to binary as the result.
- * This implementation is a customized version inspired by Spark's built-in BloomFilterAggregate.
- * Spark's implementation only accepts number of bits, uses BloomFilterImpl and cannot be extended
- * due to Scala case class restriction.
+ * This implementation is a customized version inspired by Spark's built-in
+ * [[org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate]].
+ *
+ * The reasons for not reusing Spark's implementation: it accepts only the expected number of
+ * bits, it is coupled with its own BloomFilterImpl and, most importantly, it cannot be extended
+ * due to the Scala case class restriction.
 *
 * @param child
 *   child expression that generates Long values for creating a bloom filter
@@ -73,8 +76,8 @@ case class BloomFilterAgg(
  }

  override def serialize(buffer: BloomFilter): Array[Byte] = {
-    // BloomFilterImpl.writeTo() writes 2 integers (version number and num hash functions), hence
-    // the +8
+    // Preallocate space. BloomFilter.writeTo() writes 2 integers (version number and
+    // num hash functions) first, hence +8
    val size = (buffer.bitSize() / 8) + 8
    require(size <= Integer.MAX_VALUE, s"actual number of bits is too large $size")
    val out = new ByteArrayOutputStream(size.intValue())
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index cf16e4bc6..3612d3101 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -335,25 +335,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
    flint.refreshIndex(testIndex)

    // Assert index data
-    /*
-    checkAnswer(
-      flint.queryIndex(testIndex).select("age"),
-      Seq(Row(20, 30), Row(40, 60)))
-     */
+    flint.queryIndex(testIndex).collect() should have size 2

-    // Assert query rewrite
-    /*
-    val query = sql(s"""
-         | SELECT name
-         | FROM $testTable
-         | WHERE age = 30
-         |""".stripMargin)
-
-    checkAnswer(query, Row("World"))
-    query.queryExecution.executedPlan should
-      useFlintSparkSkippingFileIndex(
-        hasIndexFilter(col("MinMax_age_0") <= 30 && col("MinMax_age_1") >= 30))
-     */
+    // TODO: Assert query rewrite result
  }

  test("should rewrite applicable query with table name without database specified") {
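
---

Sizing note (illustrative sketch, not part of the patch series): ClassicBloomFilter derives its bit array size and hash function count from `num_items` and `fpp` using the standard formulas shown in the diff above, m = -n * ln(p) / (ln 2)^2 and k = max(1, round(m / n * ln 2)). The standalone Scala sketch below reproduces those two formulas to estimate the per-file footprint of the defaults (`num_items` = 10000, `fpp` = 0.03). The `BloomFilterSizing` object is a hypothetical name for illustration, not code from the patches.

```scala
// Illustrative sketch only: mirrors the sizing math in
// ClassicBloomFilter.optimalNumOfBits / optimalNumOfHashFunctions.
object BloomFilterSizing {

  // m = -n * ln(p) / (ln 2)^2, same formula as ClassicBloomFilter.optimalNumOfBits
  def optimalNumOfBits(n: Long, p: Double): Long =
    (-n * math.log(p) / (math.log(2) * math.log(2))).toLong

  // k = max(1, round(m / n * ln 2)), same as ClassicBloomFilter.optimalNumOfHashFunctions
  def optimalNumOfHashFunctions(n: Long, m: Long): Int =
    math.max(1, math.round(m.toDouble / n * math.log(2)).toInt)

  def main(args: Array[String]): Unit = {
    val n = 10000L // default num_items
    val p = 0.03   // default fpp (after PATCH 6/7)
    val m = optimalNumOfBits(n, p)
    val k = optimalNumOfHashFunctions(n, m)
    // With the defaults this prints roughly 73,000 bits (about 9 KB of
    // skipping index data per source file) and 5 hash functions.
    println(s"bits = $m (~${m / 8} bytes per file), hash functions = $k")
  }
}
```

Because m grows linearly with `num_items` and only logarithmically with 1/`fpp`, the binary skipping column stays compact even for tight false positive targets, which is the trade-off to weigh against ValueSet for high-cardinality columns such as `client_ip`.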