From 28c37a24d443eca06383d2980a59162fa238fd7b Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Fri, 5 May 2023 18:34:10 +0800
Subject: [PATCH] fixed

---
 docs/en/seatunnel-engine/deployment.md        |   2 +-
 .../connector-seatunnel-e2e-base/pom.xml      |  23 ++
 .../engine/e2e/ClusterFaultToleranceIT.java   | 219 ++++++++++++++++++
 3 files changed, 243 insertions(+), 1 deletion(-)

diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md
index ddc33078011f..2c3019f7fa11 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -218,7 +218,7 @@ map:
           fs.oss.accessKeyId: OSS access key id
           fs.oss.accessKeySecret: OSS access key secret
           fs.oss.endpoint: OSS endpoint
-          fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvid
+          fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
 ```
 
 ## 6. Config SeaTunnel Engine Client

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index 4bc151db9d1f..a1315565349f 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -29,6 +29,8 @@
         2.4
+        <hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
+        <netty-buffer.version>4.1.89.Final</netty-buffer.version>
@@ -69,6 +71,27 @@
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aliyun</artifactId>
+            <version>${hadoop-aliyun.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.oss</groupId>
+            <artifactId>aliyun-sdk-oss</artifactId>
+            <version>2.8.3</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+            <version>${netty-buffer.version}</version>
+            <scope>test</scope>
+        </dependency>

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index e22d417ef920..f7571968e8ff 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -935,4 +935,223 @@ public void testStreamJobRestoreInAllNodeDown()
             }
         }
     }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    @Test
+    @Disabled
+    public void testStreamJobRestoreFromOssInAllNodeDown()
+            throws ExecutionException, InterruptedException {
+        String OSS_BUCKET_NAME = "oss://your bucket name/";
+        String OSS_ENDPOINT = "your oss endpoint";
+        String OSS_ACCESS_KEY_ID = "oss accessKey id";
+        String OSS_ACCESS_KEY_SECRET = "oss accessKey secret";
+
+        String testCaseName = "testStreamJobRestoreFromOssInAllNodeDown";
+        String testClusterName =
+                "ClusterFaultToleranceIT_testStreamJobRestoreFromOssInAllNodeDown_"
+                        + System.currentTimeMillis();
+        int testRowNumber = 1000;
+        int testParallelism = 6;
+        HazelcastInstanceImpl node1 = null;
+        HazelcastInstanceImpl node2 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            String yaml =
+                    "hazelcast:\n"
+                            + "  cluster-name: seatunnel\n"
+                            + "  network:\n"
+                            + "    rest-api:\n"
+                            + "      enabled: true\n"
+                            + "      endpoint-groups:\n"
+                            + "        CLUSTER_WRITE:\n"
+                            + "          enabled: true\n"
+                            + "    join:\n"
+                            + "      tcp-ip:\n"
+                            + "        enabled: true\n"
+                            + "        member-list:\n"
+                            + "          - localhost\n"
+                            + "    port:\n"
+                            + "      auto-increment: true\n"
+                            + "      port-count: 100\n"
+                            + "      port: 5801\n"
+                            + "  map:\n"
+                            + "    engine*:\n"
+                            + "      map-store:\n"
+                            + "        enabled: true\n"
+                            + "        initial-mode: EAGER\n"
+                            + "        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n"
+                            + "        properties:\n"
+                            + "          type: hdfs\n"
+                            + "          namespace: /seatunnel-test/imap\n"
+                            + "          storage.type: oss\n"
+                            + "          clusterName: "
+                            + testClusterName
+                            + "\n"
+                            + "          oss.bucket: "
+                            + OSS_BUCKET_NAME
+                            + "\n"
+                            + "          fs.oss.accessKeyId: "
+                            + OSS_ACCESS_KEY_ID
+                            + "\n"
+                            + "          fs.oss.accessKeySecret: "
+                            + OSS_ACCESS_KEY_SECRET
+                            + "\n"
+                            + "          fs.oss.endpoint: "
+                            + OSS_ENDPOINT
+                            + "\n"
+                            + "          fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider\n"
+                            + "  properties:\n"
+                            + "    hazelcast.invocation.max.retry.count: 200\n"
+                            + "    hazelcast.tcp.join.port.try.count: 30\n"
+                            + "    hazelcast.invocation.retry.pause.millis: 2000\n"
+                            + "    hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+                            + "    hazelcast.logging.type: log4j2\n"
+                            + "    hazelcast.operation.generic.thread.count: 200\n";
+
+            Config hazelcastConfig = Config.loadFromString(yaml);
+            hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+            SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+            seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            // wait for all nodes to join the cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            2, finalNode.getCluster().getMembers().size()));
+
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                    createTestResources(
+                            testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testCaseName);
+
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            Long jobId = clientJobProxy.getJobId();
+
+            ClientJobProxy finalClientJobProxy = clientJobProxy;
+            Awaitility.await()
+                    .atMost(600000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                // Wait for some tasks to finish committing, so we can read rows
+                                // from the sink target dir
+                                Thread.sleep(2000);
+                                System.out.println(
+                                        "\n================================="
+                                                + FileUtils.getFileLineNumberFromDir(
+                                                        testResources.getLeft())
+                                                + "=================================\n");
+                                Assertions.assertTrue(
+                                        JobStatus.RUNNING.equals(finalClientJobProxy.getJobStatus())
+                                                && FileUtils.getFileLineNumberFromDir(
+                                                                testResources.getLeft())
+                                                        > 1);
+                            });
+
+            Thread.sleep(5000);
+            // shut down all nodes
+            node1.shutdown();
+            node2.shutdown();
+
+            log.info(
+                    "==========================================All node is done========================================");
+            Thread.sleep(10000);
+
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            log.info(
+                    "==========================================All node is start, begin check node size ========================================");
+            // wait for all nodes to join the cluster
+            HazelcastInstanceImpl restoreFinalNode = node1;
+            Awaitility.await()
+                    .atMost(60000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            2, restoreFinalNode.getCluster().getMembers().size()));
+
+            log.info(
+                    "==========================================All node is running========================================");
+            engineClient = new SeaTunnelClient(clientConfig);
+            ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId);
+            CompletableFuture<JobStatus> waitForJobCompleteFuture =
+                    CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
+            Thread.sleep(10000);
+
+            Awaitility.await()
+                    .atMost(100000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                // Wait for the job to write all rows to the file
+                                Thread.sleep(2000);
+                                System.out.println(
+                                        "\n================================="
+                                                + FileUtils.getFileLineNumberFromDir(
+                                                        testResources.getLeft())
+                                                + "=================================\n");
+                                JobStatus jobStatus = null;
+                                try {
+                                    jobStatus = newClientJobProxy.getJobStatus();
+                                } catch (Exception e) {
+                                    log.error(ExceptionUtils.getMessage(e));
+                                }
+
+                                Assertions.assertTrue(
+                                        JobStatus.RUNNING.equals(jobStatus)
+                                                && testRowNumber * testParallelism
+                                                        == FileUtils.getFileLineNumberFromDir(
+                                                                testResources.getLeft()));
+                            });
+
+            // sleep 10s and expect that the job doesn't write more rows.
+            Thread.sleep(10000);
+            log.info(
+                    "==========================================Cancel Job========================================");
+            newClientJobProxy.cancelJob();
+
+            Awaitility.await()
+                    .atMost(600000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertTrue(
+                                            waitForJobCompleteFuture.isDone()
+                                                    && JobStatus.CANCELED.equals(
+                                                            waitForJobCompleteFuture.get())));
+            // prove that the task was restarted
+            Long fileLineNumberFromDir =
+                    FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+            Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+        } finally {
+            log.info(
+                    "==========================================Clean test resource ========================================");
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+
+            if (node2 != null) {
+                node2.shutdown();
+            }
+        }
+    }
 }
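
For reference, the Hazelcast YAML that the new test builds inline corresponds to the `map-store` example touched in docs/en/seatunnel-engine/deployment.md. Below is a minimal standalone sketch of that same configuration as it would appear in a cluster config file; it is an assumed rendering, not part of this patch, and the bucket, endpoint, access keys, and cluster name are placeholders that simply mirror the property keys used in the test and docs hunks above.

```yaml
map:
  engine*:
    map-store:
      enabled: true
      initial-mode: EAGER
      factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
      properties:
        type: hdfs
        namespace: /seatunnel-test/imap
        storage.type: oss
        clusterName: seatunnel-cluster                 # placeholder
        oss.bucket: oss://your-bucket-name/            # placeholder
        fs.oss.accessKeyId: your-access-key-id         # placeholder
        fs.oss.accessKeySecret: your-access-key-secret # placeholder
        fs.oss.endpoint: your-oss-endpoint             # placeholder
        fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
```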