diff --git a/inlong-manager/manager-plugins/base/pom.xml b/inlong-manager/manager-plugins/base/pom.xml
index f49388b5bae..9c738b7e69c 100644
--- a/inlong-manager/manager-plugins/base/pom.xml
+++ b/inlong-manager/manager-plugins/base/pom.xml
@@ -45,17 +45,6 @@
provided
-
- org.apache.inlong
- manager-plugins-flink-v1.13
- ${project.version}
-
-
- org.apache.inlong
- manager-plugins-flink-v1.15
- ${project.version}
-
-
org.projectlombok
lombok
@@ -116,4 +105,40 @@
+
+
+
+ v1.13
+
+ true
+
+
+
+ org.apache.inlong
+ manager-plugins-flink-v1.13
+ ${project.version}
+
+
+
+
+ v1.15
+
+
+ org.apache.inlong
+ manager-plugins-flink-v1.15
+ ${project.version}
+
+
+
+
+ v1.18
+
+
+ org.apache.inlong
+ manager-plugins-flink-v1.18
+ ${project.version}
+
+
+
+
diff --git a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
index 2779612f404..bc9c164d6ad 100644
--- a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
+++ b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
@@ -52,6 +52,15 @@
+
+
+ ../manager-plugins-flink-v1.18/target
+ ./
+
+ manager-plugins-flink-v1.18.jar
+
+
+
../manager-plugins-flink-v1.13/target
@@ -75,5 +84,17 @@
manager-plugins-flink-v1.15.jar
+
+
+
+ ../manager-plugins-flink-v1.18/target
+ ./flink-v1.18
+
+ flink-*.jar
+ sort-flink-*.jar
+ scala-*.jar
+ manager-plugins-flink-v1.18.jar
+
+
\ No newline at end of file
diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml
new file mode 100644
index 00000000000..6ad32208c54
--- /dev/null
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml
@@ -0,0 +1,95 @@
+
+
+
+ 4.0.0
+
+ org.apache.inlong
+ manager-plugins
+ 1.13.0-SNAPSHOT
+
+
+ manager-plugins-flink-v1.18
+ Apache InLong - Manager Plugins Flink v1.18
+
+
+ ${project.parent.parent.parent.basedir}
+ 1.18.1
+
+
+
+
+ org.apache.inlong
+ sort-flink-dependencies-v1.18
+ ${project.version}
+
+
+ org.apache.flink
+ flink-file-sink-common
+
+
+
+
+ org.apache.flink
+ flink-table-api-java
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-shaded-jackson
+ 2.15.3-18.0
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+ manager-plugins-flink-v1.18
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-dependencies
+
+ copy-dependencies
+
+ package
+
+ target/
+
+
+
+
+
+
+
diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
new file mode 100644
index 00000000000..58094e9bf72
--- /dev/null
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Flink service, such as save or get flink config info, etc.
+ */
+@Slf4j
+public class FlinkClientService {
+
+ private final Configuration configuration;
+ private final RestClusterClient flinkClient;
+
+ public FlinkClientService(Configuration configuration) throws Exception {
+ this.configuration = configuration;
+ this.flinkClient = getFlinkClient();
+ }
+
+ /**
+ * Get the Flink Client.
+ */
+ public RestClusterClient getFlinkClient() throws Exception {
+ try {
+ return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
+ } catch (Exception e) {
+ log.error("get flink client failed: ", e);
+ throw new Exception("get flink client failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Get the job status by the given job id.
+ */
+ public JobStatus getJobStatus(String jobId) throws Exception {
+ try {
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture jobStatus = flinkClient.getJobStatus(jobID);
+ return jobStatus.get();
+ } catch (Exception e) {
+ log.error("get job status by jobId={} failed: ", jobId, e);
+ throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Get job detail by the given job id.
+ */
+ public JobDetailsInfo getJobDetail(String jobId) throws Exception {
+ try {
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture jobDetails = flinkClient.getJobDetails(jobID);
+ return jobDetails.get();
+ } catch (Exception e) {
+ log.error("get job detail by jobId={} failed: ", jobId, e);
+ throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Stop the Flink job with the savepoint.
+ */
+ public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception {
+ try {
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture stopResult = flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory,
+ SavepointFormatType.CANONICAL);
+ return stopResult.get();
+ } catch (Exception e) {
+ log.error("stop job {} failed and savepoint directory is {} : ", jobId, savepointDirectory, e);
+ throw new Exception("stop job " + jobId + " failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Cancel the Flink job.
+ */
+ public void cancelJob(String jobId) throws Exception {
+ try {
+ JobID jobID = JobID.fromHexString(jobId);
+ flinkClient.cancel(jobID);
+ } catch (Exception e) {
+ log.error("cancel job {} failed: ", jobId, e);
+ throw new Exception("cancel job " + jobId + " failed: " + e.getMessage());
+ }
+ }
+}
diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/pom.xml
index 6d3e1cd764b..4bb92e96e08 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/pom.xml
@@ -29,8 +29,6 @@
Apache InLong - Manager Plugins
- manager-plugins-flink-v1.13
- manager-plugins-flink-v1.15
base
@@ -38,4 +36,35 @@
${project.parent.parent.basedir}
+
+
+ flink-all-version
+
+ true
+
+
+ manager-plugins-flink-v1.13
+ manager-plugins-flink-v1.15
+ manager-plugins-flink-v1.18
+
+
+
+ v1.13
+
+ manager-plugins-flink-v1.13
+
+
+
+ v1.15
+
+ manager-plugins-flink-v1.15
+
+
+
+ v1.18
+
+ manager-plugins-flink-v1.18
+
+
+