From 68d109cda0f07e5dbce13d7cca317b63f011a371 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Tue, 16 Apr 2024 18:20:44 +0200 Subject: [PATCH] [FLINK-35094] Apply fix for Opensearch2 --- .../core/execution/CheckpointingMode.java | 32 +++++++++++++++ .../tests/OpensearchSinkE2ECase.java | 41 ++++++++++++++++++- 2 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 flink-connector-opensearch2-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java diff --git a/flink-connector-opensearch2-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java b/flink-connector-opensearch2-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java new file mode 100644 index 0000000..f003dd5 --- /dev/null +++ b/flink-connector-opensearch2-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.streaming.tests.OpensearchSinkE2ECase; + +/** + * This is a copy of {@link CheckpointingMode} from flink-core module introduced in Flink 1.20. We + * need it here to make {@link OpensearchSinkE2ECase} compatible with earlier releases. Could be + * removed together with dropping support of Flink 1.19. + */ +public enum CheckpointingMode { + EXACTLY_ONCE, + AT_LEAST_ONCE; + + private CheckpointingMode() {} +} diff --git a/flink-connector-opensearch2-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java b/flink-connector-opensearch2-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java index 44ed74d..eb14b0f 100644 --- a/flink-connector-opensearch2-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java +++ b/flink-connector-opensearch2-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java @@ -85,7 +85,8 @@ public OpensearchSinkE2ECase() throws Exception {} .toUri() .toURL())); - @Override + /** Could be removed together with dropping support of Flink 1.19. */ + @Deprecated protected void checkResultWithSemantic( ExternalSystemDataReader> reader, List> testData, @@ -109,8 +110,46 @@ protected void checkResultWithSemantic( READER_RETRY_ATTEMPTS); } + protected void checkResultWithSemantic( + ExternalSystemDataReader> reader, + List> testData, + org.apache.flink.core.execution.CheckpointingMode semantic) + throws Exception { + waitUntilCondition( + () -> { + try { + List> result = + reader.poll(Duration.ofMillis(READER_TIMEOUT)); + assertThat(sort(result).iterator()) + .matchesRecordsFromSource( + Collections.singletonList(sort(testData)), + convertFromCheckpointingMode(semantic)); + return true; + } catch (Throwable t) { + LOG.warn("Polled results not as expected", t); + return false; + } + }, + 5000, + READER_RETRY_ATTEMPTS); + } + private static > List sort(List list) { Collections.sort(list); return list; } + + /** Could be removed together with dropping support of Flink 1.19. */ + @Deprecated + private static org.apache.flink.streaming.api.CheckpointingMode convertFromCheckpointingMode( + org.apache.flink.core.execution.CheckpointingMode semantic) { + switch (semantic) { + case EXACTLY_ONCE: + return org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; + case AT_LEAST_ONCE: + return org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE; + default: + throw new IllegalArgumentException("Unsupported semantic: " + semantic); + } + } }