Skip to content

Commit

Permalink
Merge branch 'feature/maximus-m1' into feature/maximus/statement
Browse files Browse the repository at this point in the history
  • Loading branch information
penghuo committed Oct 28, 2022
2 parents 62cd187 + 63f3449 commit 2422a5b
Show file tree
Hide file tree
Showing 58 changed files with 3,105 additions and 2,663 deletions.
29 changes: 18 additions & 11 deletions .github/workflows/sql-test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,32 @@ on:
jobs:
build:
strategy:
# Run all jobs
fail-fast: false
matrix:
java:
- 11
- 17
runs-on: ubuntu-latest
entry:
- { os: ubuntu-latest, java: 11 }
- { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc}
- { os: macos-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
- { os: ubuntu-latest, java: 17 }
- { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
- { os: macos-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
runs-on: ${{ matrix.entry.os }}

steps:
- uses: actions/checkout@v3

- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: ${{ matrix.java }}
java-version: ${{ matrix.entry.java }}

- name: Build with Gradle
run: ./gradlew --continue build assemble
run: ./gradlew --continue build ${{ matrix.entry.os_build_args }}

- name: Run backward compatibility tests
if: ${{ matrix.entry.os == 'ubuntu-latest' }}
run: ./scripts/bwctest.sh

- name: Create Artifact Path
Expand All @@ -48,7 +55,7 @@ jobs:
# This step uses the codecov-action Github action: https://github.com/codecov/codecov-action
- name: Upload SQL Coverage Report
if: always()
if: ${{ always() && matrix.entry.os == 'ubuntu-latest' }}
uses: codecov/codecov-action@v3
with:
flags: sql-engine
Expand All @@ -57,11 +64,11 @@ jobs:
- name: Upload Artifacts
uses: actions/upload-artifact@v2
with:
name: opensearch-sql
name: opensearch-sql-${{ matrix.entry.os }}
path: opensearch-sql-builds

- name: Upload test reports
if: always()
if: ${{ always() && matrix.entry.os == 'ubuntu-latest' }}
uses: actions/upload-artifact@v2
with:
name: test-reports
Expand Down
14 changes: 11 additions & 3 deletions .github/workflows/sql-workbench-test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@ env:

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Enable longer filenames
if: ${{ matrix.os == 'windows-latest' }}
run: git config --system core.longpaths true

- name: Checkout Plugin
uses: actions/checkout@v3

Expand Down Expand Up @@ -51,7 +58,7 @@ jobs:
yarn test:jest --coverage
- name: Upload coverage
if: always()
if: ${{ always() && matrix.os == 'ubuntu-latest' }}
uses: codecov/codecov-action@v3
with:
flags: query-workbench
Expand All @@ -68,5 +75,6 @@ jobs:
if: always()
uses: actions/upload-artifact@v1 # can't update to v3 because upload fails
with:
name: workbench
name: workbench-${{ matrix.os }}
path: ../OpenSearch-Dashboards/plugins/workbench/build

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ gen

/artifacts/
/.pid.lock
/.prom.pid.lock
31 changes: 25 additions & 6 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP;
import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD;
import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
Expand All @@ -25,6 +26,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -60,6 +62,7 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.exception.SemanticCheckException;
Expand Down Expand Up @@ -89,6 +92,7 @@
import org.opensearch.sql.planner.logical.LogicalRename;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.physical.catalog.CatalogTable;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.utils.ParseUtils;

Expand Down Expand Up @@ -129,14 +133,24 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
.stream()
.map(Catalog::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierName catalogSchemaIdentifierName
= new CatalogSchemaIdentifierName(qualifiedName.getParts(), catalogService.getCatalogs());
= new CatalogSchemaIdentifierName(qualifiedName.getParts(), allowedCatalogNames);
String tableName = catalogSchemaIdentifierName.getIdentifierName();
context.push();
TypeEnvironment curEnv = context.peek();
Table table = catalogService
.getStorageEngine(catalogSchemaIdentifierName.getCatalogName())
.getTable(catalogSchemaIdentifierName.getIdentifierName());
Table table;
if (CATALOGS_TABLE_NAME.equals(tableName)) {
table = new CatalogTable(catalogService);
} else {
table = catalogService
.getCatalog(catalogSchemaIdentifierName.getCatalogName())
.getStorageEngine()
.getTable(tableName);
}
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));

// Put index name or its alias in index namespace on type environment so qualifier
Expand All @@ -163,8 +177,12 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
QualifiedName qualifiedName = node.getFunctionName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
.stream()
.map(Catalog::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierName catalogSchemaIdentifierName
= new CatalogSchemaIdentifierName(qualifiedName.getParts(), catalogService.getCatalogs());
= new CatalogSchemaIdentifierName(qualifiedName.getParts(), allowedCatalogNames);

FunctionName functionName = FunctionName.of(catalogSchemaIdentifierName.getIdentifierName());
List<Expression> arguments = node.getArguments().stream()
Expand Down Expand Up @@ -479,7 +497,8 @@ public LogicalPlan visitAD(AD node, AnalysisContext context) {
currentEnv.define(new Symbol(Namespace.FIELD_NAME, RCF_ANOMALOUS), ExprCoreType.BOOLEAN);
} else {
currentEnv.define(new Symbol(Namespace.FIELD_NAME, RCF_ANOMALY_GRADE), ExprCoreType.DOUBLE);
currentEnv.define(new Symbol(Namespace.FIELD_NAME, RCF_TIMESTAMP), ExprCoreType.TIMESTAMP);
currentEnv.define(new Symbol(Namespace.FIELD_NAME,
(String) node.getArguments().get(TIME_FIELD).getValue()), ExprCoreType.TIMESTAMP);
}
return new LogicalAD(child, options);
}
Expand Down
29 changes: 22 additions & 7 deletions core/src/main/java/org/opensearch/sql/catalog/CatalogService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,35 @@
package org.opensearch.sql.catalog;

import java.util.Set;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.storage.StorageEngine;

/**
 * Catalog Service manages catalogs.
 */
public interface CatalogService {

  /**
   * Returns all registered catalogs.
   *
   * @return set of all catalogs.
   */
  Set<Catalog> getCatalogs();

  /**
   * Returns the catalog registered under the given name.
   *
   * @param catalogName name of the catalog.
   * @return the matching catalog.
   */
  Catalog getCatalog(String catalogName);

  /**
   * The default OpenSearch engine is not defined in catalog.json,
   * so the registration of the default catalog happens separately
   * through this method.
   *
   * @param storageEngine storage engine backing the default OpenSearch catalog.
   */
  void registerDefaultOpenSearchCatalog(StorageEngine storageEngine);

}
27 changes: 27 additions & 0 deletions core/src/main/java/org/opensearch/sql/catalog/model/Catalog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.catalog.model;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.storage.StorageEngine;

/**
 * Immutable model of a catalog: a named connector paired with the storage
 * engine that serves its tables. Lombok generates the getters, the
 * required-args constructor, and equals/hashCode.
 */
@Getter
@RequiredArgsConstructor
@EqualsAndHashCode
public class Catalog {

  // Name under which the catalog is registered and looked up.
  private final String name;

  private final ConnectorType connectorType;

  // Excluded from equals/hashCode: two catalogs with the same name and
  // connector type compare equal regardless of the engine instance.
  @EqualsAndHashCode.Exclude
  private final StorageEngine storageEngine;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor.streaming;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.lang3.tuple.Pair;

/**
 * In-memory implementation of {@link MetadataLog}.
 *
 * <p>TODO: the current implementation does not guarantee thread safety.
 * Re-evaluate when adding pipeline execution.
 *
 * @param <T> type of metadata stored in the log.
 */
public class DefaultMetadataLog<T> implements MetadataLog<T> {

  /** Smallest batch id accepted by {@link #add}. */
  private static final long MIN_ACCEPTABLE_ID = 0L;

  /** Batch id to metadata, kept sorted so range and latest queries are cheap. */
  private final SortedMap<Long, T> metadataMap = new TreeMap<>();

  /**
   * Add metadata under the given batch id.
   *
   * @param batchId batch id, must be >= 0.
   * @param metadata metadata to store.
   * @return true if added; false if an entry already exists for the id.
   * @throws IllegalArgumentException if batchId is negative.
   */
  @Override
  public boolean add(Long batchId, T metadata) {
    if (batchId < MIN_ACCEPTABLE_ID) {
      throw new IllegalArgumentException("batch id must be larger than or equal to 0");
    }

    if (metadataMap.containsKey(batchId)) {
      return false;
    }
    metadataMap.put(batchId, metadata);
    return true;
  }

  /**
   * Get metadata for a batch id, or empty if no entry exists.
   */
  @Override
  public Optional<T> get(Long batchId) {
    // Single lookup; stored values are never null, so ofNullable is safe.
    return Optional.ofNullable(metadataMap.get(batchId));
  }

  /**
   * Return metadata for ids in [startBatchId, endBatchId], both inclusive.
   * An empty bound means unbounded on that side.
   */
  @Override
  public List<T> get(Optional<Long> startBatchId, Optional<Long> endBatchId) {
    if (startBatchId.isEmpty() && endBatchId.isEmpty()) {
      return new ArrayList<>(metadataMap.values());
    }
    Long from = startBatchId.orElse(MIN_ACCEPTABLE_ID);
    // subMap's upper bound is exclusive; shift the inclusive end id by one.
    Long to = endBatchId.map(id -> id + 1).orElse(Long.MAX_VALUE);
    return new ArrayList<>(metadataMap.subMap(from, to).values());
  }

  /**
   * Get the largest batch id with its metadata, or empty if the log is empty.
   */
  @Override
  public Optional<Pair<Long, T>> getLatest() {
    if (metadataMap.isEmpty()) {
      return Optional.empty();
    }
    Long latestId = metadataMap.lastKey();
    return Optional.of(Pair.of(latestId, metadataMap.get(latestId)));
  }

  /**
   * Remove all entries with id strictly less than the given batch id.
   */
  @Override
  public void purge(Long batchId) {
    // headMap(batchId) is the view of keys < batchId; clearing it mutates the map.
    metadataMap.headMap(batchId).clear();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor.streaming;

import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.tuple.Pair;

/**
 * Write-ahead log (WAL) which allows a client to write metadata associated with an id.
 *
 * @param <T> type of metadata stored in the log.
 */
public interface MetadataLog<T> {

  /**
   * Add metadata to the WAL.
   *
   * @param id metadata index in the WAL.
   * @param metadata metadata to store.
   * @return true if the add succeeded, otherwise false.
   */
  boolean add(Long id, T metadata);

  /**
   * Get metadata from the WAL.
   *
   * @param id metadata index in the WAL.
   * @return metadata if present, otherwise empty.
   */
  Optional<T> get(Long id);

  /**
   * Return metadata for ids between [startId, endId], both inclusive.
   *
   * @param startId if empty, return all metadata up to endId (inclusive).
   * @param endId if empty, return all metadata from startId (inclusive) onward.
   * @return a list of metadata sorted by id (natural order).
   */
  List<T> get(Optional<Long> startId, Optional<Long> endId);

  /**
   * Get the latest batch id and its metadata.
   *
   * @return pair of id and metadata if the log is not empty, otherwise empty.
   */
  Optional<Pair<Long, T>> getLatest();

  /**
   * Remove all metadata with id less than the given id (exclusive).
   *
   * @param id smallest batch id to keep.
   */
  void purge(Long id);
}
Loading

0 comments on commit 2422a5b

Please sign in to comment.