Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed May 5, 2023
1 parent 9f1cb2c commit 28c37a2
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 1 deletion.
2 changes: 1 addition & 1 deletion docs/en/seatunnel-engine/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ map:
fs.oss.accessKeyId: OSS access key id
fs.oss.accessKeySecret: OSS access key secret
fs.oss.endpoint: OSS endpoint
fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvid
fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
```

## 6. Config SeaTunnel Engine Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

<properties>
<maven-jar-plugin.version>2.4</maven-jar-plugin.version>
<hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
<netty-buffer.version>4.1.89.Final</netty-buffer.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -69,6 +71,27 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop-aliyun.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>2.8.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty-buffer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,4 +935,223 @@ public void testStreamJobRestoreInAllNodeDown()
}
}
}

@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
@Disabled
public void testStreamJobRestoreFromOssInAllNodeDown()
throws ExecutionException, InterruptedException {
String OSS_BUCKET_NAME = "oss://your bucket name/";
String OSS_ENDPOINT = "your oss endpoint";
String OSS_ACCESS_KEY_ID = "oss accessKey id";
String OSS_ACCESS_KEY_SECRET = "oss accessKey secret";

String testCaseName = "testStreamJobRestoreFromOssInAllNodeDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreFromOssInAllNodeDown_"
+ System.currentTimeMillis();
int testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
SeaTunnelClient engineClient = null;

try {
String yaml =
"hazelcast:\n"
+ " cluster-name: seatunnel\n"
+ " network:\n"
+ " rest-api:\n"
+ " enabled: true\n"
+ " endpoint-groups:\n"
+ " CLUSTER_WRITE:\n"
+ " enabled: true\n"
+ " join:\n"
+ " tcp-ip:\n"
+ " enabled: true\n"
+ " member-list:\n"
+ " - localhost\n"
+ " port:\n"
+ " auto-increment: true\n"
+ " port-count: 100\n"
+ " port: 5801\n"
+ " map:\n"
+ " engine*:\n"
+ " map-store:\n"
+ " enabled: true\n"
+ " initial-mode: EAGER\n"
+ " factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n"
+ " properties:\n"
+ " type: hdfs\n"
+ " namespace: /seatunnel-test/imap\n"
+ " storage.type: oss\n"
+ " clusterName: "
+ testClusterName
+ "\n"
+ " oss.bucket: "
+ OSS_BUCKET_NAME
+ "\n"
+ " fs.oss.accessKeyId: "
+ OSS_ACCESS_KEY_ID
+ "\n"
+ " fs.oss.accessKeySecret: "
+ OSS_ACCESS_KEY_SECRET
+ "\n"
+ " fs.oss.endpoint: "
+ OSS_ENDPOINT
+ "\n"
+ " fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider\n"
+ " properties:\n"
+ " hazelcast.invocation.max.retry.count: 200\n"
+ " hazelcast.tcp.join.port.try.count: 30\n"
+ " hazelcast.invocation.retry.pause.millis: 2000\n"
+ " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+ " hazelcast.logging.type: log4j2\n"
+ " hazelcast.operation.generic.thread.count: 200\n";

Config hazelcastConfig = Config.loadFromString(yaml);
hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await()
.atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2, finalNode.getCluster().getMembers().size()));

Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
createTestResources(
testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(testCaseName);

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Long jobId = clientJobProxy.getJobId();

ClientJobProxy finalClientJobProxy = clientJobProxy;
Awaitility.await()
.atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// Wait some tasks commit finished, and we can get rows from the
// sink target dir
Thread.sleep(2000);
System.out.println(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
+ "=================================\n");
Assertions.assertTrue(
JobStatus.RUNNING.equals(finalClientJobProxy.getJobStatus())
&& FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
> 1);
});

Thread.sleep(5000);
// shutdown all node
node1.shutdown();
node2.shutdown();

log.info(
"==========================================All node is done========================================");
Thread.sleep(10000);

node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

log.info(
"==========================================All node is start, begin check node size ========================================");
// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await()
.atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2, restoreFinalNode.getCluster().getMembers().size()));

log.info(
"==========================================All node is running========================================");
engineClient = new SeaTunnelClient(clientConfig);
ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId);
CompletableFuture<JobStatus> waitForJobCompleteFuture =
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);

Thread.sleep(10000);

Awaitility.await()
.atMost(100000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// Wait job write all rows in file
Thread.sleep(2000);
System.out.println(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
+ "=================================\n");
JobStatus jobStatus = null;
try {
jobStatus = newClientJobProxy.getJobStatus();
} catch (Exception e) {
log.error(ExceptionUtils.getMessage(e));
}

Assertions.assertTrue(
JobStatus.RUNNING.equals(jobStatus)
&& testRowNumber * testParallelism
== FileUtils.getFileLineNumberFromDir(
testResources.getLeft()));
});

// sleep 10s and expect the job don't write more rows.
Thread.sleep(10000);
log.info(
"==========================================Cancel Job========================================");
newClientJobProxy.cancelJob();

Awaitility.await()
.atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
waitForJobCompleteFuture.isDone()
&& JobStatus.CANCELED.equals(
waitForJobCompleteFuture.get())));
// prove that the task was restarted
Long fileLineNumberFromDir =
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);

} finally {
log.info(
"==========================================Clean test resource ========================================");
if (engineClient != null) {
engineClient.shutdown();
}

if (node1 != null) {
node1.shutdown();
}

if (node2 != null) {
node2.shutdown();
}
}
}
}

0 comments on commit 28c37a2

Please sign in to comment.