Skip to content

Commit

Permalink
Fix index name with uppercase letter issue (#60)
Browse files Browse the repository at this point in the history
* Lowercase index name in Flint client

Signed-off-by: Chen Dai <daichen@amazon.com>

* Update doc

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add IT for Flint client

Signed-off-by: Chen Dai <daichen@amazon.com>

---------

Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen authored Oct 4, 2023
1 parent 71d67a0 commit d3f72e6
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 20 deletions.
40 changes: 36 additions & 4 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ Please see the following example in which Index Building Logic and Query Rewrite

| Skipping Index | Create Index Statement | Index Building Logic | Query Rewrite Logic |
|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Partition | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;year PARTITION,<br>&nbsp;&nbsp;month PARTITION,<br>&nbsp;&nbsp;day PARTITION,<br>&nbsp;&nbsp;hour PARTITION<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;FIRST(year) AS year,<br>&nbsp;&nbsp;FIRST(month) AS month,<br>&nbsp;&nbsp;FIRST(day) AS day,<br>&nbsp;&nbsp;FIRST(hour) AS hour,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE year = 2023 AND month = 4<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE year = 2023 AND month = 4<br>)<br>WHERE year = 2023 AND month = 4 |
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;elb_status_code VALUE_SET<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br>FOR COLUMNS (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100
| Partition | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;year PARTITION,<br>&nbsp;&nbsp;month PARTITION,<br>&nbsp;&nbsp;day PARTITION,<br>&nbsp;&nbsp;hour PARTITION<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;FIRST(year) AS year,<br>&nbsp;&nbsp;FIRST(month) AS month,<br>&nbsp;&nbsp;FIRST(day) AS day,<br>&nbsp;&nbsp;FIRST(hour) AS hour,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE year = 2023 AND month = 4<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE year = 2023 AND month = 4<br>)<br>WHERE year = 2023 AND month = 4 |
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;elb_status_code VALUE_SET<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100 |

### Flint Index Specification

Expand Down Expand Up @@ -223,7 +223,23 @@ WITH (

### OpenSearch

OpenSearch stores the Flint index in an OpenSearch index of the given name.
The OpenSearch index corresponding to a Flint index follows the naming convention below:

1. Skipping index: `flint_[catalog_database_table]_skipping_index`
2. Covering index: `flint_[catalog_database_table]_[index_name]_index`

It's important to note that any uppercase letters in the index name and table name (catalog, database and table) in the SQL statement are automatically converted to lowercase, because OpenSearch requires index names to be all lowercase.

Examples:

```sql
-- OpenSearch index name is `flint_spark_catalog_default_alb_logs_skipping_index`
CREATE SKIPPING INDEX ON spark_catalog.default.alb_logs ...

-- OpenSearch index name is `flint_spark_catalog_default_alb_logs_elb_and_requesturi_index`
CREATE INDEX elb_and_requestUri ON spark_catalog.default.alb_logs ...
```

In the index mapping, the `_meta` and `properties` fields store the metadata and schema info of a Flint index.

```json
Expand Down Expand Up @@ -390,6 +406,22 @@ TODO

## Limitations

### Flint Index Naming

Because uppercase letters are converted to lowercase in OpenSearch index names, it is not possible to create two Flint indexes whose table names or index names differ only by case.

For instance, only one of the statements in each group below can succeed:

```sql
-- myGlue vs myglue
CREATE SKIPPING INDEX ON myGlue.default.alb_logs ...
CREATE SKIPPING INDEX ON myglue.default.alb_logs ...

-- idx_elb vs Idx_elb
CREATE INDEX idx_elb ON alb_logs ...
CREATE INDEX Idx_elb ON alb_logs ...
```

### Query Optimization

For now, only single or conjunct conditions (conditions connected by AND) in WHERE clause can be optimized by skipping index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
Expand All @@ -30,8 +32,6 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.GetMappingsRequest;
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -68,30 +68,33 @@ public FlintOpenSearchClient(FlintOptions options) {
}

/**
 * Creates an index for the given Flint index name with the mapping (and, when present,
 * the settings) carried by {@code metadata}.
 *
 * NOTE(review): this block is rendered from a diff without +/- markers. Each line that
 * uses {@code indexName} directly appears to be the superseded version of the line right
 * below it that uses {@code osIndexName} -- confirm against the committed file.
 *
 * @param indexName Flint index name; passed through toLowercase before use
 * @param metadata  provides the mapping content and optional index settings, both sent as JSON
 * @throws IllegalStateException wrapping any exception raised inside the try block
 */
@Override public void createIndex(String indexName, FlintMetadata metadata) {
// Lowercased form of the name; used for the request and the error message below.
String osIndexName = toLowercase(indexName);
// try-with-resources closes the client created for this call.
try (RestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(metadata.getContent(), XContentType.JSON);

// Settings are only applied when the metadata carries them.
if (metadata.getIndexSettings() != null) {
request.settings(metadata.getIndexSettings(), XContentType.JSON);
}
client.indices().create(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to create Flint index " + indexName, e);
throw new IllegalStateException("Failed to create Flint index " + osIndexName, e);
}
}

/**
 * Checks whether an index exists for the given Flint index name.
 *
 * NOTE(review): this block is rendered from a diff without +/- markers. Each line that
 * uses {@code indexName} directly appears to be the superseded version of the line right
 * below it that uses {@code osIndexName} -- confirm against the committed file.
 *
 * @param indexName Flint index name; passed through toLowercase before use
 * @return the result of the client's index-exists call
 * @throws IllegalStateException wrapping an IOException raised inside the try block
 */
@Override public boolean exists(String indexName) {
// Lowercased form of the name; used for the request and the error message below.
String osIndexName = toLowercase(indexName);
// try-with-resources closes the client created for this call.
try (RestHighLevelClient client = createClient()) {
return client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + indexName, e);
throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e);
}
}

@Override public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
String osIndexNamePattern = toLowercase(indexNamePattern);
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(indexNamePattern);
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
Expand All @@ -100,30 +103,32 @@ public FlintOpenSearchClient(FlintOptions options) {
response.getSettings().get(index).toString()))
.collect(Collectors.toList());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + indexNamePattern, e);
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e);
}
}

/**
 * Fetches the mapping and settings of the index for the given Flint index name and
 * wraps them in a {@code FlintMetadata}.
 *
 * NOTE(review): this block is rendered from a diff without +/- markers. Each line that
 * uses {@code indexName} directly appears to be the superseded version of the line(s)
 * right below it that use {@code osIndexName} -- confirm against the committed file.
 *
 * @param indexName Flint index name; passed through toLowercase before use
 * @return metadata built from the mapping source string and the settings string
 * @throws IllegalStateException wrapping any exception raised inside the try block
 */
@Override public FlintMetadata getIndexMetadata(String indexName) {
// Lowercased form of the name; used for the request, the response lookups and the error message.
String osIndexName = toLowercase(indexName);
// try-with-resources closes the client created for this call.
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(indexName);
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);

// The response maps are looked up by the same name that was sent in the request.
MappingMetadata mapping = response.getMappings().get(indexName);
Settings settings = response.getSettings().get(indexName);
MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
return new FlintMetadata(mapping.source().string(), settings.toString());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + indexName, e);
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e);
}
}

/**
 * Deletes the index for the given Flint index name.
 *
 * NOTE(review): this block is rendered from a diff without +/- markers. Each line that
 * uses {@code indexName} directly appears to be the superseded version of the line right
 * below it that uses {@code osIndexName} -- confirm against the committed file.
 *
 * @param indexName Flint index name; passed through toLowercase before use
 * @throws IllegalStateException wrapping any exception raised inside the try block
 */
@Override public void deleteIndex(String indexName) {
// Lowercased form of the name; used for the request and the error message below.
String osIndexName = toLowercase(indexName);
// try-with-resources closes the client created for this call.
try (RestHighLevelClient client = createClient()) {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);

client.indices().delete(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to delete Flint index " + indexName, e);
throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e);
}
}

Expand All @@ -144,7 +149,7 @@ public FlintOpenSearchClient(FlintOptions options) {
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
}
return new OpenSearchScrollReader(createClient(),
indexName,
toLowercase(indexName),
new SearchSourceBuilder().query(queryBuilder),
options);
} catch (IOException e) {
Expand All @@ -153,7 +158,7 @@ public FlintOpenSearchClient(FlintOptions options) {
}

/**
 * Creates a writer for the given Flint index, configured with the refresh policy
 * from {@code options}.
 *
 * NOTE(review): this block is rendered from a diff without +/- markers. The first
 * {@code return} appears to be the superseded line and the second (which lowercases the
 * name via toLowercase) its replacement -- confirm against the committed file.
 * The client created here is handed to the writer rather than closed in this method.
 *
 * @param indexName Flint index name
 * @return a new {@code OpenSearchWriter}
 */
public FlintWriter createWriter(String indexName) {
return new OpenSearchWriter(createClient(), indexName, options.getRefreshPolicy());
return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
}

private RestHighLevelClient createClient() {
Expand Down Expand Up @@ -194,4 +199,14 @@ private RestHighLevelClient createClient() {
}
return new RestHighLevelClient(restClientBuilder);
}

/**
 * Converts a Flint index name to all lowercase, because OpenSearch requires
 * index names to contain only lowercase letters.
 *
 * @param indexName Flint index name; must not be null
 * @return the name lowercased with {@link Locale#ROOT}
 * @throws NullPointerException if {@code indexName} is null
 */
private String toLowercase(String indexName) {
  // requireNonNull returns its argument, so the null check and the conversion chain.
  return Objects.requireNonNull(indexName).toLowerCase(Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
allMetadata.forEach(metadata => metadata.getIndexSettings should not be empty)
}

it should "convert index name to all lowercase" in {
  // The same mixed-case name is handed to every client call below.
  val mixedCaseName = "flint_ELB_logs_index"
  val metadata = new FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")
  flintClient.createIndex(mixedCaseName, metadata)

  flintClient.exists(mixedCaseName) shouldBe true
  flintClient.getIndexMetadata(mixedCaseName) should not be null
  flintClient.getAllIndexMetadata("flint_ELB_*") should not be empty

  // Read write test: write two JSON lines, each followed by a newline, then read back.
  val indexWriter = flintClient.createWriter(mixedCaseName)
  Seq("""{"create":{}}""", "\n", """{"test":1}""", "\n")
    .foreach(chunk => indexWriter.write(chunk))
  indexWriter.flush()
  indexWriter.close()

  val indexReader = flintClient.createReader(mixedCaseName, "")
  indexReader.hasNext shouldBe true

  // Deleting by the mixed-case name must remove the index again.
  flintClient.deleteIndex(mixedCaseName)
  flintClient.exists(mixedCaseName) shouldBe false
}

it should "return false if index not exist" in {
flintClient.exists("non-exist-index") shouldBe false
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import scala.Option.empty

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.have
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.Row

/**
 * Integration tests checking that Flint skipping and covering indexes can be created,
 * described, refreshed, queried and dropped when the table name, column name and index
 * name contain uppercase letters.
 */
class FlintSparkIndexNameITSuite extends FlintSparkSuite {

  /** Test table that has table name and column name with uppercase letter */
  private val testTable = "spark_catalog.default.Test"

  /** Creates the test table and inserts a single row before any test runs. */
  override def beforeAll(): Unit = {
    super.beforeAll()

    sql(s"""
           | CREATE TABLE $testTable
           | (
           | Name STRING
           | )
           | USING CSV
           | OPTIONS (
           | header 'false',
           | delimiter '\t'
           | )
           |""".stripMargin)

    sql(s"""
           | INSERT INTO $testTable
           | VALUES ('Hello')
           | """.stripMargin)
  }

  /**
   * Drops the table created in `beforeAll` so it does not outlive this suite.
   * `super.afterAll()` runs in `finally` so parent teardown still happens if the drop fails.
   */
  override def afterAll(): Unit = {
    try {
      sql(s"DROP TABLE IF EXISTS $testTable")
    } finally {
      super.afterAll()
    }
  }

  test("skipping index with table and column name with uppercase letter") {
    sql(s"""
           | CREATE SKIPPING INDEX ON $testTable
           | ( Name VALUE_SET)
           |""".stripMargin)

    // DESC is expected to report the column name with its original casing.
    checkAnswer(
      sql(s"DESC SKIPPING INDEX ON $testTable"),
      Seq(Row("Name", "string", "VALUE_SET")))

    sql(s"REFRESH SKIPPING INDEX ON $testTable")
    val flintIndexName = getSkippingIndexName(testTable)
    val indexData = flint.queryIndex(flintIndexName).collect().toSet
    indexData should have size 1

    sql(s"DROP SKIPPING INDEX ON $testTable")
    flint.describeIndex(flintIndexName) shouldBe empty
  }

  test("covering index with index, table and column name with uppercase letter") {
    val testIndex = "Idx_Name"
    sql(s"""
           | CREATE INDEX $testIndex ON $testTable (Name)
           |""".stripMargin)

    // SHOW and DESC are expected to report the index and column names with their original casing.
    checkAnswer(sql(s"SHOW INDEX ON $testTable"), Seq(Row(testIndex)))
    checkAnswer(
      sql(s"DESC INDEX $testIndex ON $testTable"),
      Seq(Row("Name", "string", "indexed")))

    sql(s"REFRESH INDEX $testIndex ON $testTable")
    val flintIndexName = getFlintIndexName(testIndex, testTable)
    val indexData = flint.queryIndex(flintIndexName).collect().toSet
    indexData should have size 1

    sql(s"DROP INDEX $testIndex ON $testTable")
    flint.describeIndex(flintIndexName) shouldBe empty
  }
}

0 comments on commit d3f72e6

Please sign in to comment.