From 7b6a1e266f1c987a09ef0ef2b282e5881afa0953 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 1 Nov 2022 09:17:43 -0700 Subject: [PATCH 01/11] Add Streaming Source Impl Signed-off-by: Peng Huo --- .../sql/executor/streaming/Batch.java | 18 ++ .../sql/executor/streaming/Offset.java | 17 ++ .../executor/streaming/StreamingSource.java | 29 ++++ .../opensearch/sql/storage/split/Split.java | 21 +++ filesystem/build.gradle | 69 ++++++++ .../storage/split/FileSystemSplit.java | 24 +++ .../filesystem/streaming/FileMetaData.java | 21 +++ .../streaming/FileSystemStreamSource.java | 103 ++++++++++++ .../streaming/FileSystemStreamSourceTest.java | 155 ++++++++++++++++++ settings.gradle | 1 + 10 files changed, 458 insertions(+) create mode 100644 core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java create mode 100644 core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java create mode 100644 core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java create mode 100644 core/src/main/java/org/opensearch/sql/storage/split/Split.java create mode 100644 filesystem/build.gradle create mode 100644 filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java create mode 100644 filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java create mode 100644 filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java create mode 100644 filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java new file mode 100644 index 0000000000..7c27ab4622 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import java.util.List; +import lombok.Data; +import org.opensearch.sql.storage.split.Split; + +/** + * A batch of streaming execution. + */ +@Data +public class Batch { + private final List splits; +} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java b/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java new file mode 100644 index 0000000000..00f040e437 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import lombok.Data; + +/** + * Offset. + */ +@Data +public class Offset { + + private final Long offset; +} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java b/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java new file mode 100644 index 0000000000..ebd3fa714b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import java.util.Optional; + +/** + * Streaming source. + */ +public interface StreamingSource { + /** + * Get current {@link Offset} of stream data. + * + * @return empty if the stream does not has new data. + */ + Optional getLatestOffset(); + + /** + * Get a {@link Batch} from source between (start, end]. 
+ * + * @param start start offset. + * @param end end offset. + * @return @link Batch}. + */ + Batch getBatch(Optional start, Offset end); +} diff --git a/core/src/main/java/org/opensearch/sql/storage/split/Split.java b/core/src/main/java/org/opensearch/sql/storage/split/Split.java new file mode 100644 index 0000000000..e9e0c6fcc1 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/storage/split/Split.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.storage.split; + +import org.opensearch.sql.storage.StorageEngine; + +/** + * Split is a sections of a data set. Each {@link StorageEngine} should have specific + * implementation of Split. + */ +public interface Split { + + /** + * Get the split id. + * @return split id. + */ + String getSplitId(); +} diff --git a/filesystem/build.gradle b/filesystem/build.gradle new file mode 100644 index 0000000000..64659d85d3 --- /dev/null +++ b/filesystem/build.gradle @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java-library' + id "io.freefair.lombok" + id 'jacoco' +} + +ext { + hadoop = "3.3.4" + aws = "1.12.330" +} + + +dependencies { + implementation project(':core') + + testImplementation "org.junit.jupiter:junit-jupiter:${junit_jupiter}" + testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' +} + +test { + useJUnitPlatform() + testLogging { + events "passed", "skipped", "failed" + exceptionFormat "full" + } +} + +jacocoTestReport { + reports { + html.enabled true + xml.enabled true + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +test.finalizedBy(project.tasks.jacocoTestReport) + +jacocoTestCoverageVerification { + violationRules { + rule { + element = 'CLASS' + limit { + counter = 'LINE' + minimum = 1.0 + } + limit { + counter = 'BRANCH' + minimum = 1.0 + } + } + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it) + })) + } +} +check.dependsOn jacocoTestCoverageVerification +jacocoTestCoverageVerification.dependsOn jacocoTestReport diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java new file mode 100644 index 0000000000..695af94fe4 --- /dev/null +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.storage.split; + +import java.nio.file.Path; +import java.util.Set; +import java.util.UUID; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.opensearch.sql.storage.split.Split; + +@Data +public class FileSystemSplit implements Split { + + @Getter + @EqualsAndHashCode.Exclude + private final String splitId = UUID.randomUUID().toString(); + + private final Set paths; +} diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java new file mode 100644 index 0000000000..24d2a822cd 
--- /dev/null +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.streaming; + +import java.nio.file.Path; +import java.util.Set; +import lombok.Data; + +/** + * File metadata. Batch id associate with the set of {@link Path}. + */ +@Data +public class FileMetaData { + + private final Long batchId; + + private final Set paths; +} diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java new file mode 100644 index 0000000000..9207583c5b --- /dev/null +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.streaming; + +import com.google.common.collect.Sets; +import java.io.File; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.executor.streaming.Batch; +import org.opensearch.sql.executor.streaming.DefaultMetadataLog; +import org.opensearch.sql.executor.streaming.MetadataLog; +import org.opensearch.sql.executor.streaming.Offset; +import org.opensearch.sql.executor.streaming.StreamingSource; +import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; + +/** + * FileSystem Streaming Source use Hadoop FileSystem. + */ +public class FileSystemStreamSource implements StreamingSource { + + private static final Logger log = LogManager.getLogger(FileSystemStreamSource.class); + + private final MetadataLog fileMetaDataLog; + + private Set seenFiles; + + private final FileSystem fs; + + private final String basePath; + + /** + * Constructor of FileSystemStreamSource. + */ + public FileSystemStreamSource(FileSystem fs, String basePath) { + this.fs = fs; + this.basePath = basePath; + // todo, need to add state recovery + this.fileMetaDataLog = new DefaultMetadataLog<>(); + // todo, need to add state recovery + this.seenFiles = new HashSet<>(); + } + + @Override + public Optional getLatestOffset() { + // list all files. todo. improvement list performance. + Set allFiles = + Arrays.stream(fs.getPath(basePath).toFile().listFiles()) + .filter(file -> !file.isDirectory()) + .map(File::toPath) + .collect(Collectors.toSet()); + + // find unread files. + log.debug("all files {}", allFiles); + Set unread = Sets.difference(allFiles, seenFiles); + + // update seenFiles. + seenFiles = allFiles; + log.debug("seen files {}", seenFiles); + + Optional latestBatchIdOptional = fileMetaDataLog.getLatest().map(Pair::getKey); + if (!unread.isEmpty()) { + long latestBatchId = latestBatchIdOptional.map(id -> id + 1).orElse(0L); + fileMetaDataLog.add(latestBatchId, new FileMetaData(latestBatchId, unread)); + log.debug("latestBatchId {}", latestBatchId); + return Optional.of(new Offset(latestBatchId)); + } else { + log.debug("no unread data"); + Optional offset = + latestBatchIdOptional.isEmpty() + ? 
Optional.empty() + : Optional.of(new Offset(latestBatchIdOptional.get())); + log.debug("return empty offset {}", offset); + return offset; + } + } + + @Override + public Batch getBatch(Optional start, Offset end) { + Long startBatchId = start.map(Offset::getOffset).map(id -> id + 1).orElse(0L); + Long endBatchId = end.getOffset(); + + Set paths = + fileMetaDataLog.get(Optional.of(startBatchId), Optional.of(endBatchId)).stream() + .map(FileMetaData::getPaths) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + + log.debug("fetch files {} with id from: {} to: {}.", paths, start, end); + return new Batch(Collections.singletonList(new FileSystemSplit(paths))); + } +} diff --git a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java new file mode 100644 index 0000000000..537fd10c9f --- /dev/null +++ b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java @@ -0,0 +1,155 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.streaming; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.executor.streaming.Batch; +import org.opensearch.sql.executor.streaming.Offset; +import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; + +@ExtendWith(MockitoExtension.class) +class FileSystemStreamSourceTest { + + @TempDir + Path perTestTempDir; + + FileSystemStreamSource streamSource; + + @BeforeEach + void setup() { + streamSource = + new FileSystemStreamSource( + FileSystems.getDefault(), + perTestTempDir.toString()); + } + + @Test + void getBatchFromFolder() throws IOException { + Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file.toFile().exists()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + // fetch batch (empty, latestOffset] + assertEquals( + Collections.singletonList( + new FileSystemSplit(ImmutableSet.of(file))), + streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits()); + } + + @Test + void latestOffsetShouldIncreaseIfNoNewFileAdded() throws IOException { + Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file1.toFile().exists()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + Path file2 = Files.createFile(perTestTempDir.resolve("log.2022.01.02")); + assertTrue(file2.toFile().exists()); + + latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new 
Offset(1L), latestOffset.get()); + + // fetch batch (empty, 1L] + assertBatchEquals( + ImmutableList.of(file1, file2), + streamSource.getBatch(Optional.empty(), latestOffset.get())); + + // fetch batch (empty, 0L] + assertBatchEquals( + ImmutableList.of(file1), streamSource.getBatch(Optional.empty(), new Offset(0L))); + + // fetch batch (0L, 1L] + assertBatchEquals( + ImmutableList.of(file2), + streamSource.getBatch(Optional.of(new Offset(0L)), new Offset(1L))); + } + + @Test + void latestOffsetShouldSameIfNoNewFileAdded() throws IOException { + Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file1.toFile().exists()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + // no new files. + latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + } + + @Test + void latestOffsetIsEmptyIfNoFilesInSource() { + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isEmpty()); + } + + @Test + void getBatchOutOfRange() throws IOException { + Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file.toFile().exists()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + assertEquals( + Collections.singletonList( + new FileSystemSplit(ImmutableSet.of(file))), + streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits()); + } + + @Test + void dirIsFiltered() throws IOException { + Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); + assertTrue(file.toFile().exists()); + + Path dir = Files.createDirectory(perTestTempDir.resolve("logDir")); + assertTrue(dir.toFile().isDirectory()); + + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(0L), latestOffset.get()); + + // fetch batch (empty, latestOffset] + assertEquals( + Collections.singletonList( + new FileSystemSplit(ImmutableSet.of(file))), + streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits()); + } + + void assertBatchEquals(List expectedFiles, Batch batch) { + assertEquals(1, batch.getSplits().size()); + assertThat( + ((FileSystemSplit) batch.getSplits().get(0)).getPaths(), + containsInAnyOrder(expectedFiles.toArray())); + } +} diff --git a/settings.gradle b/settings.gradle index 2f850f422b..7650959451 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,4 +18,5 @@ include 'doctest' include 'legacy' include 'sql' include 'prometheus' +include 'filesystem' From 3231dfe345adf8023005d2a7079532b403d8c830 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 1 Nov 2022 09:30:05 -0700 Subject: [PATCH 02/11] update build.gradle Signed-off-by: Peng Huo --- filesystem/build.gradle | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/filesystem/build.gradle b/filesystem/build.gradle index 64659d85d3..d7fc3e02a8 100644 --- a/filesystem/build.gradle +++ b/filesystem/build.gradle @@ -9,16 +9,10 @@ plugins { id 'jacoco' } -ext { - hadoop = "3.3.4" - aws = "1.12.330" -} - - dependencies { implementation project(':core') - testImplementation "org.junit.jupiter:junit-jupiter:${junit_jupiter}" + testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' 
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' From 752168ff68092d0d37a78bc70db22142031dceb4 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Thu, 3 Nov 2022 10:24:36 -0700 Subject: [PATCH 03/11] change to hadoop-fs Signed-off-by: Peng Huo --- .../workflows/sql-test-and-build-workflow.yml | 4 +- doctest/build.gradle | 2 +- filesystem/build.gradle | 29 +++ .../storage/split/FileSystemSplit.java | 2 +- .../filesystem/streaming/FileMetaData.java | 2 +- .../streaming/FileSystemStreamSource.java | 19 +- .../streaming/FileSystemStreamSourceTest.java | 203 ++++++++++-------- 7 files changed, 153 insertions(+), 108 deletions(-) diff --git a/.github/workflows/sql-test-and-build-workflow.yml b/.github/workflows/sql-test-and-build-workflow.yml index 3d063a2bfc..25e0387cf3 100644 --- a/.github/workflows/sql-test-and-build-workflow.yml +++ b/.github/workflows/sql-test-and-build-workflow.yml @@ -25,10 +25,10 @@ jobs: matrix: entry: - { os: ubuntu-latest, java: 11 } - - { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc} + - { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows } - { os: macos-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc } - { os: ubuntu-latest, java: 17 } - - { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc } + - { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows } - { os: macos-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc } runs-on: ${{ matrix.entry.os }} diff --git a/doctest/build.gradle b/doctest/build.gradle index 69fac44d95..cf2329d9d3 100644 --- a/doctest/build.gradle +++ b/doctest/build.gradle @@ -86,7 +86,7 @@ task stopOpenSearch(type: KillProcessTask) { doctest.dependsOn startOpenSearch startOpenSearch.dependsOn startPrometheus doctest.finalizedBy stopOpenSearch -build.dependsOn doctest +check.dependsOn doctest clean.dependsOn(cleanBootstrap) // 2.0.0-alpha1-SNAPSHOT -> 2.0.0.0-alpha1-SNAPSHOT diff --git a/filesystem/build.gradle b/filesystem/build.gradle index d7fc3e02a8..b4be876507 100644 --- a/filesystem/build.gradle +++ b/filesystem/build.gradle @@ -9,8 +9,29 @@ plugins { id 'jacoco' } +ext { + hadoop = "3.3.4" + aws = "1.12.330" +} + dependencies { implementation project(':core') + // required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html. 
+ implementation("org.apache.hadoop:hadoop-common:${hadoop}") { + exclude group: 'org.apache.zookeeper', module: 'zookeeper' + exclude group: 'com.sun.jersey', module: 'jersey-json' + exclude group: 'com.google.protobuf', module: 'protobuf-java' + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.eclipse.jetty', module: 'jetty-server' + } + constraints { + implementation('com.fasterxml.woodstox:woodstox-core:6.4.0') { + because 'https://www.mend.io/vulnerability-database/CVE-2022-40156' + } + } + // required https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html + implementation("org.apache.hadoop:hadoop-aws:${hadoop}") + implementation "com.amazonaws:aws-java-sdk-bundle:${aws}" testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' @@ -24,6 +45,14 @@ test { events "passed", "skipped", "failed" exceptionFormat "full" } + + // hadoop-fs depend on native library which is missing on windows. + // https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library + if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) { + excludes = [ + '**/FileSystemStreamSourceTest.class' + ] + } } jacocoTestReport { diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java index 695af94fe4..7fefb11a85 100644 --- a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java @@ -5,12 +5,12 @@ package org.opensearch.sql.filesystem.storage.split; -import java.nio.file.Path; import java.util.Set; import java.util.UUID; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.Getter; +import org.apache.hadoop.fs.Path; import org.opensearch.sql.storage.split.Split; @Data diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java index 24d2a822cd..6a8c90ee80 100644 --- a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java @@ -5,9 +5,9 @@ package org.opensearch.sql.filesystem.streaming; -import java.nio.file.Path; import java.util.Set; import lombok.Data; +import org.apache.hadoop.fs.Path; /** * File metadata. Batch id associate with the set of {@link Path}. 
diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java index 9207583c5b..0a1d032c53 100644 --- a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java +++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java @@ -6,16 +6,18 @@ package org.opensearch.sql.filesystem.streaming; import com.google.common.collect.Sets; -import java.io.File; -import java.nio.file.FileSystem; -import java.nio.file.Path; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import lombok.SneakyThrows; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.sql.executor.streaming.Batch; @@ -38,12 +40,12 @@ public class FileSystemStreamSource implements StreamingSource { private final FileSystem fs; - private final String basePath; + private final Path basePath; /** * Constructor of FileSystemStreamSource. */ - public FileSystemStreamSource(FileSystem fs, String basePath) { + public FileSystemStreamSource(FileSystem fs, Path basePath) { this.fs = fs; this.basePath = basePath; // todo, need to add state recovery @@ -52,13 +54,14 @@ public FileSystemStreamSource(FileSystem fs, String basePath) { this.seenFiles = new HashSet<>(); } + @SneakyThrows(value = IOException.class) @Override public Optional getLatestOffset() { // list all files. todo. improvement list performance. Set allFiles = - Arrays.stream(fs.getPath(basePath).toFile().listFiles()) - .filter(file -> !file.isDirectory()) - .map(File::toPath) + Arrays.stream(fs.listStatus(basePath)) + .filter(status -> !status.isDirectory()) + .map(FileStatus::getPath) .collect(Collectors.toSet()); // find unread files. 
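The diff above reworks getLatestOffset() around Hadoop's FileSystem.listStatus(). A single poll cycle against the reworked source still follows the StreamingSource contract from the first patch; a minimal sketch, assuming the local default filesystem and a hypothetical input directory:

    import java.util.Optional;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    FileSystem fs = FileSystem.get(new Configuration());          // local fs by default
    FileSystemStreamSource source =
        new FileSystemStreamSource(fs, new Path("/tmp/stream"));  // hypothetical dir

    // Empty until the directory contains at least one file; a returned offset is
    // the id of the batch covering the half-open interval (start, end].
    Optional<Offset> latest = source.getLatestOffset();
    latest.ifPresent(end ->
        source.getBatch(Optional.empty(), end)
            .getSplits()
            .forEach(split -> System.out.println(split.getSplitId())));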
diff --git a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java index 537fd10c9f..75c494ec8c 100644 --- a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java +++ b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java @@ -8,148 +8,161 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import java.io.IOException; -import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.executor.streaming.Batch; import org.opensearch.sql.executor.streaming.Offset; import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; +import org.opensearch.sql.storage.split.Split; @ExtendWith(MockitoExtension.class) class FileSystemStreamSourceTest { - @TempDir - Path perTestTempDir; + @TempDir Path perTestTempDir; FileSystemStreamSource streamSource; + /** + * use hadoop default filesystem. it only works on unix-like system. for running on windows, it + * require native library. Reference. 
+ * https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html + */ @BeforeEach - void setup() { + void setup() throws IOException { streamSource = new FileSystemStreamSource( - FileSystems.getDefault(), - perTestTempDir.toString()); + FileSystem.get(new Configuration()), + new org.apache.hadoop.fs.Path(perTestTempDir.toUri())); } @Test - void getBatchFromFolder() throws IOException { - Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); - assertTrue(file.toFile().exists()); - - Optional latestOffset = streamSource.getLatestOffset(); - assertTrue(latestOffset.isPresent()); - assertEquals(new Offset(0L), latestOffset.get()); - - // fetch batch (empty, latestOffset] - assertEquals( - Collections.singletonList( - new FileSystemSplit(ImmutableSet.of(file))), - streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits()); + void addOneFileToSource() throws IOException { + emptySource().addFile("log1").latestOffsetShouldBe(0L).batchFromStart("log1"); } @Test - void latestOffsetShouldIncreaseIfNoNewFileAdded() throws IOException { - Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); - assertTrue(file1.toFile().exists()); - - Optional latestOffset = streamSource.getLatestOffset(); - assertTrue(latestOffset.isPresent()); - assertEquals(new Offset(0L), latestOffset.get()); - - Path file2 = Files.createFile(perTestTempDir.resolve("log.2022.01.02")); - assertTrue(file2.toFile().exists()); - - latestOffset = streamSource.getLatestOffset(); - assertTrue(latestOffset.isPresent()); - assertEquals(new Offset(1L), latestOffset.get()); - - // fetch batch (empty, 1L] - assertBatchEquals( - ImmutableList.of(file1, file2), - streamSource.getBatch(Optional.empty(), latestOffset.get())); - - // fetch batch (empty, 0L] - assertBatchEquals( - ImmutableList.of(file1), streamSource.getBatch(Optional.empty(), new Offset(0L))); - - // fetch batch (0L, 1L] - assertBatchEquals( - ImmutableList.of(file2), - streamSource.getBatch(Optional.of(new Offset(0L)), new Offset(1L))); + void addMultipleFileInSequence() throws IOException { + emptySource() + .addFile("log1") + .latestOffsetShouldBe(0L) + .batchFromStart("log1") + .addFile("log2") + .latestOffsetShouldBe(1L) + .batchFromStart("log1", "log2") + .batchInBetween(0L, 1L, "log2"); } @Test void latestOffsetShouldSameIfNoNewFileAdded() throws IOException { - Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); - assertTrue(file1.toFile().exists()); - - Optional latestOffset = streamSource.getLatestOffset(); - assertTrue(latestOffset.isPresent()); - assertEquals(new Offset(0L), latestOffset.get()); - - // no new files. 
- latestOffset = streamSource.getLatestOffset(); - assertTrue(latestOffset.isPresent()); - assertEquals(new Offset(0L), latestOffset.get()); + emptySource() + .addFile("log1") + .latestOffsetShouldBe(0L) + .batchFromStart("log1") + .latestOffsetShouldBe(0L) + .batchFromStart("log1"); } @Test void latestOffsetIsEmptyIfNoFilesInSource() { - Optional latestOffset = streamSource.getLatestOffset(); - assertTrue(latestOffset.isEmpty()); + emptySource().noOffset(); } @Test - void getBatchOutOfRange() throws IOException { - Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); - assertTrue(file.toFile().exists()); - - Optional latestOffset = streamSource.getLatestOffset(); - assertTrue(latestOffset.isPresent()); - assertEquals(new Offset(0L), latestOffset.get()); - - assertEquals( - Collections.singletonList( - new FileSystemSplit(ImmutableSet.of(file))), - streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits()); + void dirIsFiltered() throws IOException { + emptySource() + .addFile("log1") + .latestOffsetShouldBe(0L) + .addDir("dir1") + .latestOffsetShouldBe(0L) + .batchFromStart("log1"); } @Test - void dirIsFiltered() throws IOException { - Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01")); - assertTrue(file.toFile().exists()); - - Path dir = Files.createDirectory(perTestTempDir.resolve("logDir")); - assertTrue(dir.toFile().isDirectory()); + void sneakThrowException() throws IOException { + FileSystem fs = Mockito.mock(FileSystem.class); + doThrow(IOException.class).when(fs).listStatus(any(org.apache.hadoop.fs.Path.class)); - Optional latestOffset = streamSource.getLatestOffset(); - assertTrue(latestOffset.isPresent()); - assertEquals(new Offset(0L), latestOffset.get()); + streamSource = + new FileSystemStreamSource(fs, + new org.apache.hadoop.fs.Path(perTestTempDir.toUri())); + assertThrows(IOException.class, () -> streamSource.getLatestOffset()); + } - // fetch batch (empty, latestOffset] - assertEquals( - Collections.singletonList( - new FileSystemSplit(ImmutableSet.of(file))), - streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits()); + StreamSource emptySource() { + return new StreamSource(); } - void assertBatchEquals(List expectedFiles, Batch batch) { - assertEquals(1, batch.getSplits().size()); - assertThat( - ((FileSystemSplit) batch.getSplits().get(0)).getPaths(), - containsInAnyOrder(expectedFiles.toArray())); + private class StreamSource { + + StreamSource addFile(String filename) throws IOException { + Path file = Files.createFile(perTestTempDir.resolve(filename)); + assertTrue(file.toFile().exists()); + + return this; + } + + StreamSource addDir(String dirname) throws IOException { + Path dir = Files.createDirectory(perTestTempDir.resolve(dirname)); + assertTrue(dir.toFile().isDirectory()); + + return this; + } + + StreamSource noOffset() { + assertFalse(streamSource.getLatestOffset().isPresent()); + + return this; + } + + StreamSource latestOffsetShouldBe(Long offset) { + Optional latestOffset = streamSource.getLatestOffset(); + assertTrue(latestOffset.isPresent()); + assertEquals(new Offset(offset), latestOffset.get()); + + return this; + } + + StreamSource batchFromStart(String... uris) { + assertTrue(streamSource.getLatestOffset().isPresent()); + internalBatchInBetween(Optional.empty(), streamSource.getLatestOffset().get(), uris); + + return this; + } + + StreamSource batchInBetween(Long start, Long end, String... 
uris) { + internalBatchInBetween(Optional.of(new Offset(start)), new Offset(end), uris); + + return this; + } + + private StreamSource internalBatchInBetween( + Optional start, Offset end, String... uris) { + List splits = streamSource.getBatch(start, end).getSplits(); + assertEquals(1, splits.size()); + assertThat( + ((FileSystemSplit) splits.get(0)).getPaths(), + containsInAnyOrder( + Arrays.stream(uris) + .map(name -> new org.apache.hadoop.fs.Path(perTestTempDir.resolve(name).toUri())) + .toArray())); + return this; + } } } From 91ef9b36b024aa9f8a594e8352f2509024137353 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Thu, 3 Nov 2022 11:15:31 -0700 Subject: [PATCH 04/11] exclude FileSystemStreamSource from jacoco Signed-off-by: Peng Huo --- filesystem/build.gradle | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/filesystem/build.gradle b/filesystem/build.gradle index b4be876507..37baec959e 100644 --- a/filesystem/build.gradle +++ b/filesystem/build.gradle @@ -71,6 +71,13 @@ test.finalizedBy(project.tasks.jacocoTestReport) jacocoTestCoverageVerification { violationRules { rule { + // hadoop-fs depend on native library which is missing on windows. + // https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library + if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) { + excludes = [ + 'org.opensearch.sql.filesystem.streaming.FileSystemStreamSource' + ] + } element = 'CLASS' limit { counter = 'LINE' From 5fc1eb579101dc944e0abd77d1f2fced9dadbc91 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Thu, 3 Nov 2022 16:49:22 -0700 Subject: [PATCH 05/11] exclude unnecessary depedency Signed-off-by: Peng Huo --- filesystem/build.gradle | 42 +++++++++++++++++++++++++++++++++++------ plugin/build.gradle | 5 +++++ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/filesystem/build.gradle b/filesystem/build.gradle index 37baec959e..0571088132 100644 --- a/filesystem/build.gradle +++ b/filesystem/build.gradle @@ -14,24 +14,54 @@ ext { aws = "1.12.330" } +configurations.all { + resolutionStrategy.force "commons-io:commons-io:2.8.0" +} + dependencies { implementation project(':core') // required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html. implementation("org.apache.hadoop:hadoop-common:${hadoop}") { - exclude group: 'org.apache.zookeeper', module: 'zookeeper' - exclude group: 'com.sun.jersey', module: 'jersey-json' + exclude group: 'org.apache.zookeeper' + exclude group: 'org.eclipse.jetty' + exclude group: 'com.sun.jersey' + exclude group: 'javax.servlet.jsp' + exclude group: 'javax.servlet' + exclude group: 'org.apache.kerby' + exclude group: 'org.apache.curator' exclude group: 'com.google.protobuf', module: 'protobuf-java' exclude group: 'org.apache.avro', module: 'avro' - exclude group: 'org.eclipse.jetty', module: 'jetty-server' + exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt' + // enforce version. 
+ exclude group: 'com.fasterxml.woodstox', module: 'woodstox-core' + exclude group: 'commons-io', module: 'commons-io' + exclude group: 'ch.qos.reload4j', module: 'reload4j' + exclude group: 'org.apache.httpcomponents', module: 'httpcore' } + implementation('com.fasterxml.woodstox:woodstox-core') constraints { implementation('com.fasterxml.woodstox:woodstox-core:6.4.0') { because 'https://www.mend.io/vulnerability-database/CVE-2022-40156' } } - // required https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html - implementation("org.apache.hadoop:hadoop-aws:${hadoop}") - implementation "com.amazonaws:aws-java-sdk-bundle:${aws}" + implementation('commons-io:commons-io') + constraints { + implementation('commons-io:commons-io:2.8.0') { + because 'between versions 2.8.0 and 2.5' + } + } + implementation('ch.qos.reload4j:reload4j') + constraints { + implementation('ch.qos.reload4j:reload4j:1.2.22') { + because 'between versions 1.2.22 and 1.2.19' + } + } + implementation('org.apache.httpcomponents:httpcore') + constraints { + implementation('org.apache.httpcomponents:httpcore:4.4.15') { + because 'between versions 4.4.15 and 4.4.13' + } + } testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' diff --git a/plugin/build.gradle b/plugin/build.gradle index d170b72a95..4295854e37 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -91,6 +91,10 @@ configurations.all { resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${jackson_version}" resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.6.0" resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.6.0" + resolutionStrategy.force "org.apache.commons:commons-math3:3.6.1" + resolutionStrategy.force "org.apache.commons:commons-lang3:3.12.0" + resolutionStrategy.force "joda-time:joda-time:2.10.12" + resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" } compileJava { options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor']) @@ -110,6 +114,7 @@ dependencies { api project(':legacy') api project(':opensearch') api project(':prometheus') + api project(':filesystem') } test { From c8b0118638633e2bb85f2132773c516aa18553a9 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Thu, 3 Nov 2022 20:02:09 -0700 Subject: [PATCH 06/11] Update integ-test depedency Signed-off-by: Peng Huo --- integ-test/build.gradle | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 5e0a53bf1a..66b3c1c94c 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -58,6 +58,10 @@ configurations.all { resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${jackson_version}" resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.6.0" resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.6.0" + resolutionStrategy.force "org.apache.commons:commons-math3:3.6.1" + resolutionStrategy.force "org.apache.commons:commons-lang3:3.12.0" + resolutionStrategy.force "joda-time:joda-time:2.10.12" + resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" } dependencies { From 40535d76d967d731a8f91268d7331d0664abed5e Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 7 Nov 2022 19:17:26 -0800 Subject: [PATCH 07/11] Add micro batch streaming execution Signed-off-by: Peng Huo --- .../opensearch/sql/executor/QueryService.java | 29 ++- .../MicroBatchStreamingExecution.java | 127 ++++++++++ 
.../sql/executor/QueryServiceTest.java | 30 ++- .../MicroBatchStreamingExecutionTest.java | 235 ++++++++++++++++++ 4 files changed, 414 insertions(+), 7 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java create mode 100644 core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index 423d152e06..0d1604efa7 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -37,6 +37,17 @@ public class QueryService { */ public void execute(UnresolvedPlan plan, ResponseListener listener) { + executePlan(analyze(plan), listener); + } + + /** + * Todo. + * + * @param plan {@link LogicalPlan} + * @param listener {@link ResponseListener} + */ + public void executePlan(LogicalPlan plan, + ResponseListener listener) { try { executionEngine.execute(plan(plan), listener); } catch (Exception e) { @@ -54,17 +65,23 @@ public void execute(UnresolvedPlan plan, public void explain(UnresolvedPlan plan, ResponseListener listener) { try { - executionEngine.explain(plan(plan), listener); + executionEngine.explain(plan(analyze(plan)), listener); } catch (Exception e) { listener.onFailure(e); } } - private PhysicalPlan plan(UnresolvedPlan plan) { - // 1.Analyze abstract syntax to generate logical plan - LogicalPlan logicalPlan = analyzer.analyze(plan, new AnalysisContext()); + /** + * Todo. + */ + public LogicalPlan analyze(UnresolvedPlan plan) { + return analyzer.analyze(plan, new AnalysisContext()); + } - // 2.Generate optimal physical plan from logical plan - return planner.plan(logicalPlan); + /** + * Todo. + */ + public PhysicalPlan plan(LogicalPlan plan) { + return planner.plan(plan); } } diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java new file mode 100644 index 0000000000..6d398bfa7c --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java @@ -0,0 +1,127 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.streaming; + +import com.google.common.base.Preconditions; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.planner.logical.LogicalPlan; + +public class MicroBatchStreamingExecution { + + private static final Logger log = LogManager.getLogger(MicroBatchStreamingExecution.class); + + static final long INITIAL_LATEST_BATCH_ID = -1L; + + private final StreamingSource source; + + private final LogicalPlan batchPlan; + + private final QueryService queryService; + + /** + * A write-ahead-log that records the offsets that are present in each batch. 
In order to ensure + * that a given batch will always consist of the same data, we write to this log before any + * processing is done. Thus, the Nth record in this log indicated data that is currently being + * processed and the N-1th entry indicates which offsets have been durably committed to the sink. + */ + private final MetadataLog offsetLog; + + /** keep track the latest commit batchId. */ + private final MetadataLog committedLog; + + /** + * Constructor. + */ + public MicroBatchStreamingExecution( + StreamingSource source, + LogicalPlan batchPlan, + QueryService queryService, + MetadataLog offsetLog, + MetadataLog committedLog) { + this.source = source; + this.batchPlan = batchPlan; + this.queryService = queryService; + // todo. add offsetLog and committedLog offset recovery. + this.offsetLog = offsetLog; + this.committedLog = committedLog; + } + + /** + * Execute micro-batch streaming execution. + */ + public void execute() { + Long latestBatchId = offsetLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID); + Long latestCommittedBatchId = + committedLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID); + Optional committedOffset = offsetLog.get(latestCommittedBatchId); + AtomicLong currentBatchId = new AtomicLong(INITIAL_LATEST_BATCH_ID); + + if (latestBatchId.equals(latestCommittedBatchId)) { + // there are no unhandled Offset. + currentBatchId.set(latestCommittedBatchId + 1L); + } else { + Preconditions.checkArgument( + latestBatchId.equals(latestCommittedBatchId + 1L), + "[BUG] Expected latestBatchId - latestCommittedBatchId = 0 or 1, " + + "but latestBatchId=%d, latestCommittedBatchId=%d", + latestBatchId, + latestCommittedBatchId); + + // latestBatchId is not committed yet. + currentBatchId.set(latestBatchId); + } + + Optional availableOffsets = source.getLatestOffset(); + if (hasNewData(availableOffsets, committedOffset)) { + // todo, add batch to execution context. + Batch batch = source.getBatch(committedOffset, availableOffsets.get()); + offsetLog.add(currentBatchId.get(), availableOffsets.get()); + + queryService.executePlan( + batchPlan, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse response) { + long finalBatchId = currentBatchId.get(); + Offset finalAvailableOffsets = availableOffsets.get(); + committedLog.add(finalBatchId, finalAvailableOffsets); + } + + @Override + public void onFailure(Exception e) { + log.error("streaming processing failed. source = {}", source); + } + }); + } + } + + private boolean hasNewData(Optional availableOffsets, Optional committedOffset) { + if (availableOffsets.equals(committedOffset)) { + log.debug("source does not have new data, exit. source = {}", source); + return false; + } else { + Preconditions.checkArgument( + availableOffsets.isPresent(), "[BUG] available offsets must be no empty"); + + log.debug( + "source has new data. 
source = {}, availableOffsets:{}, committedOffset:{}", + source, + availableOffsets, + committedOffset); + return true; + } + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java index 2884544dd0..2c70cf9f92 100644 --- a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java @@ -13,6 +13,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; import java.util.Collections; @@ -58,7 +59,7 @@ class QueryServiceTest { @BeforeEach public void setUp() { - when(analyzer.analyze(any(), any())).thenReturn(logicalPlan); + lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan); when(planner.plan(any())).thenReturn(plan); queryService = new QueryService(analyzer, executionEngine, planner); @@ -161,4 +162,31 @@ public void onFailure(Exception e) { } }); } + + @Test + public void testExecutePlanShouldPass() { + doAnswer( + invocation -> { + ResponseListener listener = invocation.getArgument(1); + listener.onResponse( + new ExecutionEngine.QueryResponse(schema, Collections.emptyList())); + return null; + }) + .when(executionEngine) + .execute(any(), any()); + + queryService.executePlan( + logicalPlan, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) { + + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }); + } } diff --git a/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java b/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java new file mode 100644 index 0000000000..4c0ca308bd --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java @@ -0,0 +1,235 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.streaming; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.planner.logical.LogicalPlan; + +@ExtendWith(MockitoExtension.class) +class MicroBatchStreamingExecutionTest { + + @Test + void executedSuccess() { + streamingQuery() + .addData() + .executeSuccess() + .latestOffsetLogShouldBe(0L) + .latestCommittedLogShouldBe(0L); + } + + @Test + void executedFailed() { + streamingQuery() + .addData() + .executeFailed() + .latestOffsetLogShouldBe(0L) + .noCommittedLog(); + } + + @Test + void noDataInSource() { + streamingQuery() + .executeSuccess() + .noOffsetLog() + .noCommittedLog(); + } + + @Test + void noNewDataInSource() { + streamingQuery() + .addData() + .executeSuccess() + 
.latestOffsetLogShouldBe(0L) + .latestCommittedLogShouldBe(0L) + .executeSuccess() + .latestOffsetLogShouldBe(0L) + .latestCommittedLogShouldBe(0L); + } + + @Test + void addNewDataInSequenceAllExecuteSuccess() { + streamingQuery() + .addData() + .executeSuccess() + .latestOffsetLogShouldBe(0L) + .latestCommittedLogShouldBe(0L) + .addData() + .executeSuccess() + .latestOffsetLogShouldBe(1L) + .latestCommittedLogShouldBe(1L); + } + + @Test + void addNewDataInSequenceExecuteFailedInBetween() { + streamingQuery() + .addData() + .executeSuccess() + .latestOffsetLogShouldBe(0L) + .latestCommittedLogShouldBe(0L) + .addData() + .executeFailed() + .latestOffsetLogShouldBe(1L) + .latestCommittedLogShouldBe(0L) + .executeSuccess() + .latestOffsetLogShouldBe(1L) + .latestCommittedLogShouldBe(1L); + } + + @Test + void addNewDataInSequenceExecuteFailed() { + streamingQuery() + .addData() + .executeSuccess() + .latestOffsetLogShouldBe(0L) + .latestCommittedLogShouldBe(0L) + .addData() + .executeFailed() + .latestOffsetLogShouldBe(1L) + .latestCommittedLogShouldBe(0L) + .executeFailed() + .latestOffsetLogShouldBe(1L) + .latestCommittedLogShouldBe(0L); + } + + Helper streamingQuery() { + return new Helper(); + } + + private static class Helper { + + private final MicroBatchStreamingExecution execution; + + private final MetadataLog offsetLog; + + private final MetadataLog committedLog; + + private final LogicalPlan batchPlan; + + private final QueryService queryService; + + private final TestStreamingSource source = new TestStreamingSource(); + + public Helper() { + this.offsetLog = new DefaultMetadataLog<>(); + this.committedLog = new DefaultMetadataLog<>(); + this.batchPlan = Mockito.mock(LogicalPlan.class); + this.queryService = Mockito.mock(QueryService.class); + this.execution = + new MicroBatchStreamingExecution( + source, batchPlan, queryService, offsetLog, committedLog); + } + + Helper addData() { + source.addData(); + return this; + } + + Helper executeSuccess() { + lenient().doAnswer( + invocation -> { + ResponseListener listener = + invocation.getArgument(1); + listener.onResponse( + new ExecutionEngine.QueryResponse(null, Collections.emptyList())); + return null; + }) + .when(queryService) + .executePlan(any(), any()); + execution.execute(); + + return this; + } + + Helper executeFailed() { + lenient().doAnswer( + invocation -> { + ResponseListener listener = invocation.getArgument(1); + listener.onFailure(new RuntimeException()); + return null; + }) + .when(queryService) + .executePlan(any(), any()); + execution.execute(); + + return this; + } + + Helper noCommittedLog() { + assertTrue(committedLog.getLatest().isEmpty()); + return this; + } + + Helper latestCommittedLogShouldBe(Long offsetId) { + assertTrue(committedLog.getLatest().isPresent()); + assertEquals(offsetId, committedLog.getLatest().get().getRight().getOffset()); + return this; + } + + Helper noOffsetLog() { + assertTrue(offsetLog.getLatest().isEmpty()); + return this; + } + + Helper latestOffsetLogShouldBe(Long offsetId) { + assertTrue(offsetLog.getLatest().isPresent()); + assertEquals(offsetId, offsetLog.getLatest().get().getRight().getOffset()); + return this; + } + } + + /** + * StreamingSource impl only for testing. + * + *

* initially, offset is -1, getLatestOffset() will return Optional.empty(). +

* call addData() add offset by one. + */ + static class TestStreamingSource implements StreamingSource { + + private final AtomicLong offset = new AtomicLong(-1L); + + /** + * add offset by one. + */ + void addData() { + offset.incrementAndGet(); + } + + /** + * return offset if addData was called. + */ + @Override + public Optional getLatestOffset() { + if (offset.get() == -1) { + return Optional.empty(); + } else { + return Optional.of(new Offset(offset.get())); + } + } + + /** + * always return `empty` Batch regardless start and end offset. + */ + @Override + public Batch getBatch(Optional start, Offset end) { + return new Batch(new ArrayList<>()); + } + } +} From d63345b46c72338eae3fe3a60741e8720870a4be Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 8 Nov 2022 17:04:44 -0800 Subject: [PATCH 08/11] fix compile issue Signed-off-by: Peng Huo --- .../MicroBatchStreamingExecution.java | 5 +- .../MicroBatchStreamingExecutionTest.java | 48 ++++++++----------- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java index 6d398bfa7c..190387676d 100644 --- a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java @@ -19,6 +19,9 @@ import org.opensearch.sql.executor.QueryService; import org.opensearch.sql.planner.logical.LogicalPlan; +/** + * Micro batch streaming execution. + */ public class MicroBatchStreamingExecution { private static final Logger log = LogManager.getLogger(MicroBatchStreamingExecution.class); @@ -60,7 +63,7 @@ public MicroBatchStreamingExecution( } /** - * Execute micro-batch streaming execution. + * Pull the {@link Batch} from {@link StreamingSource} and execute the {@link Batch}. 
*/ public void execute() { Long latestBatchId = offsetLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID); diff --git a/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java b/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java index 4c0ca308bd..7fa8f4d670 100644 --- a/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java @@ -10,7 +10,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; -import java.util.ArrayList; import java.util.Collections; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -37,19 +36,12 @@ void executedSuccess() { @Test void executedFailed() { - streamingQuery() - .addData() - .executeFailed() - .latestOffsetLogShouldBe(0L) - .noCommittedLog(); + streamingQuery().addData().executeFailed().latestOffsetLogShouldBe(0L).noCommittedLog(); } @Test void noDataInSource() { - streamingQuery() - .executeSuccess() - .noOffsetLog() - .noCommittedLog(); + streamingQuery().executeSuccess().noOffsetLog().noCommittedLog(); } @Test @@ -143,7 +135,8 @@ Helper addData() { } Helper executeSuccess() { - lenient().doAnswer( + lenient() + .doAnswer( invocation -> { ResponseListener listener = invocation.getArgument(1); @@ -159,12 +152,14 @@ Helper executeSuccess() { } Helper executeFailed() { - lenient().doAnswer( - invocation -> { - ResponseListener listener = invocation.getArgument(1); - listener.onFailure(new RuntimeException()); - return null; - }) + lenient() + .doAnswer( + invocation -> { + ResponseListener listener = + invocation.getArgument(1); + listener.onFailure(new RuntimeException()); + return null; + }) .when(queryService) .executePlan(any(), any()); execution.execute(); @@ -198,23 +193,20 @@ Helper latestOffsetLogShouldBe(Long offsetId) { /** * StreamingSource impl only for testing. * - *
<p> - * initially, offset is -1, getLatestOffset() will return Optional.emtpy(). - * <p> - * call addData() add offset by one. + * <p>initially, offset is -1, getLatestOffset() will return Optional.empty(). + * + * <p>
call addData() add offset by one. */ static class TestStreamingSource implements StreamingSource { private final AtomicLong offset = new AtomicLong(-1L); - /** - * add offset by one. - */ + /** add offset by one. */ void addData() { offset.incrementAndGet(); } - /** - * return offset if addData was called. - */ + /** return offset if addData was called. */ @Override public Optional getLatestOffset() { if (offset.get() == -1) { return Optional.empty(); } else { return Optional.of(new Offset(offset.get())); } } - /** - * always return `empty` Batch regardless start and end offset. - */ + /** always return `empty` Batch regardless start and end offset. */ @Override public Batch getBatch(Optional start, Offset end) { - return new Batch(new ArrayList<>()); + return new Batch(() -> "id"); } } } From 6aee254892b92d50db57bc3131bcae9172a8573d Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 9 Nov 2022 10:31:18 -0800 Subject: [PATCH 09/11] add PlanContext Signed-off-by: Peng Huo --- .../opensearch/sql/executor/QueryService.java | 14 ++- .../MicroBatchStreamingExecution.java | 4 +- .../opensearch/sql/planner/PlanContext.java | 31 +++++++ .../sql/executor/QueryServiceTest.java | 5 + .../MicroBatchStreamingExecutionTest.java | 91 +++++++++++++++---- .../sql/planner/PlanContextTest.java | 31 +++++++ 6 files changed, 154 insertions(+), 22 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/planner/PlanContext.java create mode 100644 core/src/test/java/org/opensearch/sql/planner/PlanContextTest.java diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index 0d1604efa7..32fda1ab2c 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -13,6 +13,7 @@ import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; @@ -31,22 +32,27 @@ public class QueryService { /** * Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response. + * Todo: deprecate this interface once {@link PlanContext} is finalized. * * @param plan {@link UnresolvedPlan} * @param listener {@link ResponseListener} */ public void execute(UnresolvedPlan plan, ResponseListener listener) { - executePlan(analyze(plan), listener); + executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); } /** - * Todo. + * Execute the {@link LogicalPlan}, with {@link PlanContext}, using {@link ResponseListener} + * to get the response. + * Todo: pass the split from PlanContext to ExecutionEngine in a following PR. * * @param plan {@link LogicalPlan} + * @param planContext {@link PlanContext} * @param listener {@link ResponseListener} */ public void executePlan(LogicalPlan plan, + PlanContext planContext, ResponseListener listener) { try { executionEngine.execute(plan(plan), listener); @@ -72,14 +78,14 @@ public void explain(UnresolvedPlan plan, } /** - * Todo. + * Analyze {@link UnresolvedPlan}. */ public LogicalPlan analyze(UnresolvedPlan plan) { return analyzer.analyze(plan, new AnalysisContext()); } /** - * Todo. + * Translate {@link LogicalPlan} to {@link PhysicalPlan}. 
*/ public PhysicalPlan plan(LogicalPlan plan) { return planner.plan(plan); } diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java index 190387676d..4f25b9433f 100644 --- a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java @@ -17,6 +17,7 @@ import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalPlan; /** * Micro batch streaming execution. */ public class MicroBatchStreamingExecution { @@ -89,12 +90,11 @@ public void execute() { Optional availableOffsets = source.getLatestOffset(); if (hasNewData(availableOffsets, committedOffset)) { - // todo, add batch to execution context. Batch batch = source.getBatch(committedOffset, availableOffsets.get()); offsetLog.add(currentBatchId.get(), availableOffsets.get()); - queryService.executePlan( batchPlan, + new PlanContext(batch.getSplit()), new ResponseListener<>() { @Override public void onResponse(ExecutionEngine.QueryResponse response) { diff --git a/core/src/main/java/org/opensearch/sql/planner/PlanContext.java b/core/src/main/java/org/opensearch/sql/planner/PlanContext.java new file mode 100644 index 0000000000..3d43c02d61 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/PlanContext.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner; + +import java.util.Optional; +import lombok.Getter; +import org.opensearch.sql.storage.split.Split; + +/** + * Plan context holds planning-related information. 
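+ * + * <p>Currently it only carries the optional {@link Split} of the micro batch under execution; passing the split on to the ExecutionEngine is left to a following PR.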
+ */ +public class PlanContext { + + @Getter + private final Optional<Split> split; + + public PlanContext(Split split) { + this.split = Optional.of(split); + } + + private PlanContext(Optional<Split> split) { + this.split = split; + } + + public static PlanContext emptyPlanContext() { + return new PlanContext(Optional.empty()); + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java index 2c70cf9f92..c65210e97e 100644 --- a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java @@ -25,6 +25,7 @@ import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; @@ -57,6 +58,9 @@ class QueryServiceTest { @Mock private ExecutionEngine.Schema schema; + @Mock + private PlanContext planContext; + @BeforeEach public void setUp() { lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan); @@ -177,6 +181,7 @@ public void testExecutePlanShouldPass() { queryService.executePlan( logicalPlan, + planContext, new ResponseListener<>() { @Override public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) { diff --git a/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java b/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java index 7fa8f4d670..1a2b6e3f2a 100644 --- a/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java @@ -7,12 +7,18 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import lombok.EqualsAndHashCode; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; @@ -20,7 +26,9 @@ import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.storage.split.Split; @ExtendWith(MockitoExtension.class) class MicroBatchStreamingExecutionTest { @@ -29,29 +37,36 @@ void executedSuccess() { streamingQuery() .addData() - .executeSuccess() + .executeSuccess(0L) .latestOffsetLogShouldBe(0L) .latestCommittedLogShouldBe(0L); } @Test void executedFailed() { - streamingQuery().addData().executeFailed().latestOffsetLogShouldBe(0L).noCommittedLog(); + streamingQuery() + .addData() + .executeFailed() + .latestOffsetLogShouldBe(0L) + .noCommittedLog(); } @Test void noDataInSource() { - streamingQuery().executeSuccess().noOffsetLog().noCommittedLog(); + 
streamingQuery() + .neverProcess() + .noOffsetLog() + .noCommittedLog(); } @Test void noNewDataInSource() { streamingQuery() .addData() - .executeSuccess() + .executeSuccess(0L) .latestOffsetLogShouldBe(0L) .latestCommittedLogShouldBe(0L) - .executeSuccess() + .neverProcess() .latestOffsetLogShouldBe(0L) .latestCommittedLogShouldBe(0L); } @@ -60,11 +75,11 @@ void noNewDataInSource() { void addNewDataInSequenceAllExecuteSuccess() { streamingQuery() .addData() - .executeSuccess() + .executeSuccess(0L) .latestOffsetLogShouldBe(0L) .latestCommittedLogShouldBe(0L) .addData() - .executeSuccess() + .executeSuccess(1L) .latestOffsetLogShouldBe(1L) .latestCommittedLogShouldBe(1L); } @@ -73,14 +88,14 @@ void addNewDataInSequenceAllExecuteSuccess() { void addNewDataInSequenceExecuteFailedInBetween() { streamingQuery() .addData() - .executeSuccess() + .executeSuccess(0L) .latestOffsetLogShouldBe(0L) .latestCommittedLogShouldBe(0L) .addData() .executeFailed() .latestOffsetLogShouldBe(1L) .latestCommittedLogShouldBe(0L) - .executeSuccess() + .executeSuccess(1L) .latestOffsetLogShouldBe(1L) .latestCommittedLogShouldBe(1L); } @@ -89,7 +104,7 @@ void addNewDataInSequenceExecuteFailedInBetween() { void addNewDataInSequenceExecuteFailed() { streamingQuery() .addData() - .executeSuccess() + .executeSuccess(0L) .latestOffsetLogShouldBe(0L) .latestCommittedLogShouldBe(0L) .addData() @@ -134,18 +149,36 @@ Helper addData() { return this; } - Helper executeSuccess() { + Helper neverProcess() { + lenient() + .doAnswer( + invocation -> { + fail(); + return null; + }) + .when(queryService) + .executePlan(any(), any(), any()); + execution.execute(); + return this; + } + + Helper executeSuccess(Long... offsets) { lenient() .doAnswer( invocation -> { ResponseListener listener = - invocation.getArgument(1); + invocation.getArgument(2); listener.onResponse( new ExecutionEngine.QueryResponse(null, Collections.emptyList())); + + PlanContext planContext = invocation.getArgument(1); + assertTrue(planContext.getSplit().isPresent()); + assertEquals(new TestOffsetSplit(offsets), planContext.getSplit().get()); + return null; }) .when(queryService) - .executePlan(any(), any()); + .executePlan(any(), any(), any()); execution.execute(); return this; @@ -156,12 +189,13 @@ Helper executeFailed() { .doAnswer( invocation -> { ResponseListener listener = - invocation.getArgument(1); + invocation.getArgument(2); listener.onFailure(new RuntimeException()); + return null; }) .when(queryService) - .executePlan(any(), any()); + .executePlan(any(), any(), any()); execution.execute(); return this; @@ -219,7 +253,32 @@ public Optional getLatestOffset() { /** always return `empty` Batch regardless start and end offset. */ @Override public Batch getBatch(Optional start, Offset end) { - return new Batch(() -> "id"); + return new Batch( + new TestOffsetSplit( + start.map(v -> v.getOffset() + 1).orElse(0L), Long.min(offset.get(), + end.getOffset()))); + } + } + + @EqualsAndHashCode + static class TestOffsetSplit implements Split { + + private final List offsets; + + public TestOffsetSplit(Long start, Long end) { + this.offsets = new ArrayList<>(); + for (long l = start; l <= end; l++) { + this.offsets.add(l); + } + } + + public TestOffsetSplit(Long... 
offsets) { + this.offsets = Arrays.stream(offsets).collect(Collectors.toList()); + } + + @Override + public String getSplitId() { + return "id"; } } } diff --git a/core/src/test/java/org/opensearch/sql/planner/PlanContextTest.java b/core/src/test/java/org/opensearch/sql/planner/PlanContextTest.java new file mode 100644 index 0000000000..77ae78f77e --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/PlanContextTest.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.storage.split.Split; + +@ExtendWith(MockitoExtension.class) +class PlanContextTest { + + @Mock + private Split split; + + @Test + void createEmptyPlanContext() { + assertTrue(PlanContext.emptyPlanContext().getSplit().isEmpty()); + } + + @Test + void createPlanContextWithSplit() { + assertTrue(new PlanContext(split).getSplit().isPresent()); + } +} From 5b99c082e2a97af744b397f9538b37c9c792881b Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 9 Nov 2022 10:43:36 -0800 Subject: [PATCH 10/11] bump jackson version Signed-off-by: Peng Huo --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index b25c36f687..bbfc122005 100644 --- a/build.gradle +++ b/build.gradle @@ -8,8 +8,8 @@ buildscript { ext { opensearch_version = System.getProperty("opensearch.version", "2.4.0-SNAPSHOT") spring_version = "5.3.22" - jackson_version = "2.13.4" - jackson_databind_version = "2.13.4.2" + jackson_version = "2.14.0" + jackson_databind_version = "2.14.0" isSnapshot = "true" == System.getProperty("build.snapshot", "true") buildVersionQualifier = System.getProperty("build.version_qualifier", "") version_tokens = opensearch_version.tokenize('-') From faff812bafe3a36ee868d48dc574c1079dc9b5cd Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 11 Nov 2022 19:31:54 -0800 Subject: [PATCH 11/11] fix bug Signed-off-by: Peng Huo --- .../opensearch/sql/executor/QueryService.java | 6 +++- .../sql/executor/QueryServiceTest.java | 28 ++++++++++++++++--- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index 32fda1ab2c..dcdf6bc010 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -39,7 +39,11 @@ public class QueryService { */ public void execute(UnresolvedPlan plan, ResponseListener listener) { - executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); + try { + executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); + } catch (Exception e) { + listener.onFailure(e); + } } /** diff --git a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java index c65210e97e..d1ffa51fcc 100644 --- a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java @@ -8,6 +8,7 @@ package org.opensearch.sql.executor; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static 
org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; @@ -64,7 +65,7 @@ class QueryServiceTest { @BeforeEach public void setUp() { lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan); - when(planner.plan(any())).thenReturn(plan); + lenient().when(planner.plan(any())).thenReturn(plan); queryService = new QueryService(analyzer, executionEngine, planner); } @@ -86,7 +87,7 @@ public void testExecuteShouldPass() { new ResponseListener<>() { @Override public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) { - + assertNotNull(pplQueryResponse); } @Override @@ -115,7 +116,7 @@ public void testExplainShouldPass() { new ResponseListener<>() { @Override public void onResponse(ExecutionEngine.ExplainResponse pplQueryResponse) { - + assertNotNull(pplQueryResponse); } @Override @@ -185,7 +186,7 @@ public void testExecutePlanShouldPass() { new ResponseListener<>() { @Override public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) { - + assertNotNull(pplQueryResponse); } @Override @@ -194,4 +195,23 @@ public void onFailure(Exception e) { } }); } + + @Test + public void analyzeExceptionShouldBeCaught() { + when(analyzer.analyze(any(), any())).thenThrow(IllegalStateException.class); + + queryService.execute( + ast, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) { + fail(); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalStateException); + } + }); + } }
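---

For reference, the (start, end] batch contract that MicroBatchStreamingExecution relies on can be sketched with a toy source. The class below is illustrative only and is not part of this patch set: the name InMemorySource, its package, and the list-backed data are assumptions, while StreamingSource, Offset, Batch, and the lambda-as-Split idiom all come from the patches above.

package org.opensearch.sql.example; // hypothetical package, for illustration only

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.opensearch.sql.executor.streaming.Batch;
import org.opensearch.sql.executor.streaming.Offset;
import org.opensearch.sql.executor.streaming.StreamingSource;

/** Minimal sketch of a StreamingSource backed by an in-memory list of rows. */
public class InMemorySource implements StreamingSource {

  private final List<String> rows = new ArrayList<>();

  /** Appending a row advances the latest offset by one. */
  public void addRow(String row) {
    rows.add(row);
  }

  @Override
  public Optional<Offset> getLatestOffset() {
    // Empty until the first row arrives; afterwards the index of the last row.
    return rows.isEmpty()
        ? Optional.empty()
        : Optional.of(new Offset((long) rows.size() - 1));
  }

  @Override
  public Batch getBatch(Optional<Offset> start, Offset end) {
    // The batch covers (start, end]: everything after the committed offset up to end.
    long from = start.map(o -> o.getOffset() + 1).orElse(0L);
    // Split is a single-method interface here, so a lambda serves as the split id,
    // mirroring `new Batch(() -> "id")` in the tests above.
    return new Batch(() -> "rows[" + from + "," + end.getOffset() + "]");
  }
}

Like TestStreamingSource in the tests, the sketch reports no offset until data arrives, and it derives a batch from the last committed offset (exclusive) up to the requested end (inclusive), which is exactly the window MicroBatchStreamingExecution passes to getBatch.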