Skip to content

Commit

Permalink
[Flink-33859] Support OpenSearch v2
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Apr 16, 2024
1 parent bdbaee7 commit 5ebe41c
Show file tree
Hide file tree
Showing 23 changed files with 623 additions and 205 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Constructor <org.apache.flink.connector.opensearch.sink.BulkProcessorConfig.<init>(int, int, long, org.apache.flink.connector.opensearch.sink.FlushBackoffType, int, long)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (BulkProcessorConfig.java:44)
Constructor <org.apache.flink.connector.opensearch.table.OpensearchConfiguration.<init>(org.apache.flink.configuration.ReadableConfig)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (OpensearchConfiguration.java:61)
Method <org.apache.flink.connector.opensearch.table.IndexGeneratorFactory.createRuntimeIndexGenerator(java.lang.String, [Ljava.lang.String;, [Lorg.apache.flink.table.types.DataType;, org.apache.flink.connector.opensearch.table.IndexGeneratorFactory$IndexHelper, java.time.ZoneId)> has parameter of type <[Lorg.apache.flink.table.types.DataType;> in (IndexGeneratorFactory.java:0)
Method <org.apache.flink.connector.opensearch.table.IndexGeneratorFactory.createRuntimeIndexGenerator(java.lang.String, [Ljava.lang.String;, [Lorg.apache.flink.table.types.DataType;, org.apache.flink.connector.opensearch.table.IndexGeneratorFactory$IndexHelper, java.time.ZoneId)> has parameter of type <[Lorg.apache.flink.table.types.DataType;> in (IndexGeneratorFactory.java:0)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.apache.flink.connector.opensearch.sink;

import org.apache.flink.util.FlinkRuntimeException;

class DefaultFailureHandler implements FailureHandler {

@Override
public void onFailure(Throwable failure) {
if (failure instanceof FlinkRuntimeException) {
throw (FlinkRuntimeException) failure;
}
throw new FlinkRuntimeException(failure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does not
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.opensearch.sink;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkRuntimeException;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.rest.RestStatus;

import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A strict implementation that fails if either the whole bulk request failed or any of its actions.
*/
class DefaultBulkResponseInspector implements BulkResponseInspector {

@VisibleForTesting final FailureHandler failureHandler;

DefaultBulkResponseInspector() {
this(new DefaultFailureHandler());
}

DefaultBulkResponseInspector(FailureHandler failureHandler) {
this.failureHandler = checkNotNull(failureHandler);
}

@Override
public void inspect(BulkRequest request, BulkResponse response) {
if (!response.hasFailures()) {
return;
}

Throwable chainedFailures = null;
for (int i = 0; i < response.getItems().length; i++) {
final BulkItemResponse itemResponse = response.getItems()[i];
if (!itemResponse.isFailed()) {
continue;
}
final Throwable failure = itemResponse.getFailure().getCause();
if (failure == null) {
continue;
}
final RestStatus restStatus = itemResponse.getFailure().getStatus();
final DocWriteRequest<?> actionRequest = request.requests().get(i);

chainedFailures =
firstOrSuppressed(
wrapException(restStatus, failure, actionRequest), chainedFailures);
}
if (chainedFailures == null) {
return;
}
failureHandler.onFailure(chainedFailures);
}

private static Throwable wrapException(
RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?> actionRequest) {
if (restStatus == null) {
return new FlinkRuntimeException(
String.format("Single action %s of bulk request failed.", actionRequest),
rootFailure);
} else {
return new FlinkRuntimeException(
String.format(
"Single action %s of bulk request failed with status %s.",
actionRequest, restStatus.getStatus()),
rootFailure);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
import org.apache.flink.util.InstantiationUtil;

import org.apache.http.HttpHost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
Expand All @@ -53,7 +52,6 @@
import java.util.List;
import java.util.function.BiConsumer;

import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
import static org.apache.flink.util.Preconditions.checkNotNull;

class OpensearchWriter<IN> implements SinkWriter<IN> {
Expand Down Expand Up @@ -325,63 +323,6 @@ public void add(UpdateRequest... updateRequests) {
}
}

/**
* A strict implementation that fails if either the whole bulk request failed or any of its
* actions.
*/
static class DefaultBulkResponseInspector implements BulkResponseInspector {

@VisibleForTesting final FailureHandler failureHandler;

DefaultBulkResponseInspector() {
this(new DefaultFailureHandler());
}

DefaultBulkResponseInspector(FailureHandler failureHandler) {
this.failureHandler = checkNotNull(failureHandler);
}

@Override
public void inspect(BulkRequest request, BulkResponse response) {
if (!response.hasFailures()) {
return;
}

Throwable chainedFailures = null;
for (int i = 0; i < response.getItems().length; i++) {
final BulkItemResponse itemResponse = response.getItems()[i];
if (!itemResponse.isFailed()) {
continue;
}
final Throwable failure = itemResponse.getFailure().getCause();
if (failure == null) {
continue;
}
final RestStatus restStatus = itemResponse.getFailure().getStatus();
final DocWriteRequest<?> actionRequest = request.requests().get(i);

chainedFailures =
firstOrSuppressed(
wrapException(restStatus, failure, actionRequest), chainedFailures);
}
if (chainedFailures == null) {
return;
}
failureHandler.onFailure(chainedFailures);
}
}

static class DefaultFailureHandler implements FailureHandler {

@Override
public void onFailure(Throwable failure) {
if (failure instanceof FlinkRuntimeException) {
throw (FlinkRuntimeException) failure;
}
throw new FlinkRuntimeException(failure);
}
}

@Internal
interface BulkRequestConsumerFactory
extends BiConsumer<BulkRequest, ActionListener<BulkResponse>> {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.connector.opensearch.sink;

import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.opensearch.OpensearchUtil;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
import org.apache.flink.connector.opensearch.test.DockerImageVersions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.execution;

import org.apache.flink.streaming.tests.OpensearchSinkE2ECase;

/**
* This is a copy of {@link CheckpointingMode} from flink-core module introduced in Flink 1.20. We
* need it here to make {@link OpensearchSinkE2ECase} compatible with earlier releases. Could be
* removed together with dropping support of Flink 1.19.
*/
public enum CheckpointingMode {
EXACTLY_ONCE,
AT_LEAST_ONCE;

private CheckpointingMode() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public OpensearchSinkE2ECase() throws Exception {}
.toUri()
.toURL()));

@Override
/** Could be removed together with dropping support of Flink 1.19. */
@Deprecated
protected void checkResultWithSemantic(
ExternalSystemDataReader<ComparableTuple2<Integer, String>> reader,
List<ComparableTuple2<Integer, String>> testData,
Expand All @@ -109,8 +110,46 @@ protected void checkResultWithSemantic(
READER_RETRY_ATTEMPTS);
}

protected void checkResultWithSemantic(
ExternalSystemDataReader<ComparableTuple2<Integer, String>> reader,
List<ComparableTuple2<Integer, String>> testData,
org.apache.flink.core.execution.CheckpointingMode semantic)
throws Exception {
waitUntilCondition(
() -> {
try {
List<ComparableTuple2<Integer, String>> result =
reader.poll(Duration.ofMillis(READER_TIMEOUT));
assertThat(sort(result).iterator())
.matchesRecordsFromSource(
Collections.singletonList(sort(testData)),
convertFromCheckpointingMode(semantic));
return true;
} catch (Throwable t) {
LOG.warn("Polled results not as expected", t);
return false;
}
},
5000,
READER_RETRY_ATTEMPTS);
}

private static <T extends Comparable<T>> List<T> sort(List<T> list) {
Collections.sort(list);
return list;
}

/** Could be removed together with dropping support of Flink 1.19. */
@Deprecated
private static org.apache.flink.streaming.api.CheckpointingMode convertFromCheckpointingMode(
org.apache.flink.core.execution.CheckpointingMode semantic) {
switch (semantic) {
case EXACTLY_ONCE:
return org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
case AT_LEAST_ONCE:
return org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
default:
throw new IllegalArgumentException("Unsupported semantic: " + semantic);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
org.apache.flink.connector.opensearch.sink.Opensearch2SinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.opensearch.sink.Opensearch2WriterITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.opensearch.table.Opensearch2DynamicSinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
Loading

0 comments on commit 5ebe41c

Please sign in to comment.