From f3ef1965ad83ee11ed7c6801c867ca668bccc8ad Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 13 Jun 2023 09:59:26 -0700 Subject: [PATCH] Fix output schema issue and IT Signed-off-by: Chen Dai --- .../spark/skipping/FlintSparkSkippingIndex.scala | 14 +++++++------- .../flint/spark/FlintSparkSkippingIndexSuite.scala | 5 ++++- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index fe66df7cc7..b1dc4ce550 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -71,13 +71,13 @@ class FlintSparkSkippingIndex( } private def getSchema: String = { - val indexFieldTypes = indexedColumns.map { indexCol => - val columnName = indexCol.columnName - // Data type INT from catalog is not recognized by Spark DataType.fromJson() - val columnType = if (indexCol.columnType == "int") "integer" else indexCol.columnType - val sparkType = DataType.fromJson("\"" + columnType + "\"") - StructField(columnName, sparkType, nullable = false) - } + val indexFieldTypes = + indexedColumns.flatMap(_.outputSchema()).map { case (colName, colType) => + // Data type INT from catalog is not recognized by Spark DataType.fromJson() + val columnType = if (colType == "int") "integer" else colType + val sparkType = DataType.fromJson("\"" + columnType + "\"") + StructField(colName, sparkType, nullable = false) + } val allFieldTypes = indexFieldTypes :+ StructField(FILE_PATH_COLUMN, StringType, nullable = false) diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala index 9b274c4f39..f8d6b67d8b 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala @@ -139,7 +139,10 @@ class FlintSparkSkippingIndexSuite | "address": { | "type": "keyword" | }, - | "age": { + | "MinMax_age_0": { + | "type": "integer" + | }, + | "MinMax_age_1" : { | "type": "integer" | }, | "file_path": {