fixed bug & add e2e test
sunxiaojian committed May 6, 2023
1 parent c4c5b99 commit 2375378
Showing 9 changed files with 262 additions and 32 deletions.
2 changes: 1 addition & 1 deletion docs/en/seatunnel-engine/deployment.md
@@ -218,7 +218,7 @@ map:
fs.oss.accessKeyId: OSS access key id
fs.oss.accessKeySecret: OSS access key secret
fs.oss.endpoint: OSS endpoint
-fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvid
+fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
```

## 6. Config SeaTunnel Engine Client
@@ -29,6 +29,8 @@

<properties>
<maven-jar-plugin.version>2.4</maven-jar-plugin.version>
<hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
<netty-buffer.version>4.1.89.Final</netty-buffer.version>
</properties>

<dependencies>
@@ -69,6 +71,27 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop-aliyun.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>2.8.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty-buffer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -935,4 +935,223 @@ public void testStreamJobRestoreInAllNodeDown()
}
}
}

@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
@Disabled
public void testStreamJobRestoreFromOssInAllNodeDown()
throws ExecutionException, InterruptedException {
String OSS_BUCKET_NAME = "oss://your bucket name/";
String OSS_ENDPOINT = "your oss endpoint";
String OSS_ACCESS_KEY_ID = "oss accessKey id";
String OSS_ACCESS_KEY_SECRET = "oss accessKey secret";

String testCaseName = "testStreamJobRestoreFromOssInAllNodeDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreFromOssInAllNodeDown_"
+ System.currentTimeMillis();
int testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
SeaTunnelClient engineClient = null;

try {
String yaml =
"hazelcast:\n"
+ " cluster-name: seatunnel\n"
+ " network:\n"
+ " rest-api:\n"
+ " enabled: true\n"
+ " endpoint-groups:\n"
+ " CLUSTER_WRITE:\n"
+ " enabled: true\n"
+ " join:\n"
+ " tcp-ip:\n"
+ " enabled: true\n"
+ " member-list:\n"
+ " - localhost\n"
+ " port:\n"
+ " auto-increment: true\n"
+ " port-count: 100\n"
+ " port: 5801\n"
+ " map:\n"
+ " engine*:\n"
+ " map-store:\n"
+ " enabled: true\n"
+ " initial-mode: EAGER\n"
+ " factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n"
+ " properties:\n"
+ " type: hdfs\n"
+ " namespace: /seatunnel-test/imap\n"
+ " storage.type: oss\n"
+ " clusterName: "
+ testClusterName
+ "\n"
+ " oss.bucket: "
+ OSS_BUCKET_NAME
+ "\n"
+ " fs.oss.accessKeyId: "
+ OSS_ACCESS_KEY_ID
+ "\n"
+ " fs.oss.accessKeySecret: "
+ OSS_ACCESS_KEY_SECRET
+ "\n"
+ " fs.oss.endpoint: "
+ OSS_ENDPOINT
+ "\n"
+ " fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider\n"
+ " properties:\n"
+ " hazelcast.invocation.max.retry.count: 200\n"
+ " hazelcast.tcp.join.port.try.count: 30\n"
+ " hazelcast.invocation.retry.pause.millis: 2000\n"
+ " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+ " hazelcast.logging.type: log4j2\n"
+ " hazelcast.operation.generic.thread.count: 200\n";

Config hazelcastConfig = Config.loadFromString(yaml);
hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

// wait until all nodes have joined the cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await()
.atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2, finalNode.getCluster().getMembers().size()));

Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
createTestResources(
testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(testCaseName);

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Long jobId = clientJobProxy.getJobId();

ClientJobProxy finalClientJobProxy = clientJobProxy;
Awaitility.await()
.atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// Wait until some tasks have committed, so rows appear in
// the sink target dir
Thread.sleep(2000);
System.out.println(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
+ "=================================\n");
Assertions.assertTrue(
JobStatus.RUNNING.equals(finalClientJobProxy.getJobStatus())
&& FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
> 1);
});

Thread.sleep(5000);
// shut down all nodes
node1.shutdown();
node2.shutdown();

log.info(
"==========================================All node is done========================================");
Thread.sleep(10000);

node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

log.info(
"==========================================All node is start, begin check node size ========================================");
// wait until all nodes have joined the cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await()
.atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2, restoreFinalNode.getCluster().getMembers().size()));

log.info(
"==========================================All node is running========================================");
engineClient = new SeaTunnelClient(clientConfig);
ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId);
CompletableFuture<JobStatus> waitForJobCompleteFuture =
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);

Thread.sleep(10000);

Awaitility.await()
.atMost(100000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// Wait until the job has written all rows to the file
Thread.sleep(2000);
System.out.println(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
+ "=================================\n");
JobStatus jobStatus = null;
try {
jobStatus = newClientJobProxy.getJobStatus();
} catch (Exception e) {
log.error(ExceptionUtils.getMessage(e));
}

Assertions.assertTrue(
JobStatus.RUNNING.equals(jobStatus)
&& testRowNumber * testParallelism
== FileUtils.getFileLineNumberFromDir(
testResources.getLeft()));
});

// sleep 10s and expect the job not to write any more rows.
Thread.sleep(10000);
log.info(
"==========================================Cancel Job========================================");
newClientJobProxy.cancelJob();

Awaitility.await()
.atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
waitForJobCompleteFuture.isDone()
&& JobStatus.CANCELED.equals(
waitForJobCompleteFuture.get())));
// prove that the task was restarted
Long fileLineNumberFromDir =
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);

} finally {
log.info(
"==========================================Clean test resource ========================================");
if (engineClient != null) {
engineClient.shutdown();
}

if (node1 != null) {
node1.shutdown();
}

if (node2 != null) {
node2.shutdown();
}
}
}
}
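Note: the test above is annotated `@Disabled` because it needs a real OSS bucket. To run it locally, fill in the four placeholder strings (bucket, endpoint, access key id/secret), remove `@Disabled`, and select the single test via Surefire, e.g. `mvn -Dtest=ClusterFaultToleranceIT#testStreamJobRestoreFromOssInAllNodeDown test` (assuming the enclosing class is `ClusterFaultToleranceIT`, as the generated cluster-name prefix suggests).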
@@ -19,7 +19,6 @@

import org.apache.seatunnel.api.common.metrics.MetricTags;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -143,7 +142,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(
this::updateMetricsContextInImap,
-30 * 1000, // Wait for MapStore loading to complete
+30, // Wait for MapStore loading to complete, wait 30s
seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(),
TimeUnit.SECONDS);
}
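For context on the bug fixed above: `scheduleAtFixedRate` measures both the initial delay and the period in the `TimeUnit` passed as the last argument. With `TimeUnit.SECONDS`, the old initial delay of `30 * 1000` meant 30,000 seconds (over eight hours) before the first metrics update, not 30 seconds. A minimal self-contained sketch of the corrected idiom; the task body and the 60-second period are placeholders, not SeaTunnel code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateUnitsDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // scheduleAtFixedRate(command, initialDelay, period, unit):
        // initialDelay and period are both interpreted in `unit`.
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("update metrics"), // placeholder task
                30, // first run after 30 seconds; 30 * 1000 would wait ~8.3 hours
                60, // placeholder period; SeaTunnel reads its own backup interval here
                TimeUnit.SECONDS);
        // A real service would keep the scheduler and shut it down on close.
    }
}
```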
@@ -456,20 +455,8 @@ private synchronized void updateMetricsContextInImap() {
contextMap.putAll(finishedExecutionContexts);
contextMap.putAll(executionContexts);
try {
-// Wait for MapStore loading to complete
-IMap<TaskLocation, SeaTunnelMetricsContext> map =
-        RetryUtils.retryWithException(
-                () -> {
-                    return nodeEngine
-                            .getHazelcastInstance()
-                            .getMap(Constant.IMAP_RUNNING_JOB_METRICS);
-                },
-                new RetryUtils.RetryMaterial(
-                        Constant.OPERATION_RETRY_TIME,
-                        false,
-                        exception -> true,
-                        Constant.OPERATION_RETRY_SLEEP));
+nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
contextMap.forEach(
(taskGroupLocation, taskGroupContext) -> {
taskGroupContext
@@ -124,8 +124,7 @@ public void initialize(Map<String, Object> configuration) {
STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString()));
this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase());
// build configuration
-AbstractConfiguration fileConfiguration =
-        this.fileConfiguration.getConfiguration(storageType);
+AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration();

Configuration hadoopConf = fileConfiguration.buildConfiguration(configuration);
this.conf = hadoopConf;
@@ -36,7 +36,7 @@ public enum FileConfiguration {
this.configuration = configuration;
}

-public AbstractConfiguration getConfiguration(String name) {
+public AbstractConfiguration getConfiguration() {
return configuration;
}
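The dropped `name` parameter was never used: each enum constant is constructed with its own configuration object, so callers resolve the constant first (via `valueOf`, as in the `initialize` method above) and then ask it directly. A minimal self-contained sketch of this enum-carries-its-strategy pattern, with hypothetical type names standing in for SeaTunnel's `AbstractConfiguration` hierarchy:

```java
// Hypothetical stand-ins for SeaTunnel's AbstractConfiguration subclasses.
interface StorageConfiguration {}

class HdfsConfiguration implements StorageConfiguration {}

class OssConfiguration implements StorageConfiguration {}

enum StorageKind {
    HDFS(new HdfsConfiguration()),
    OSS(new OssConfiguration());

    private final StorageConfiguration configuration;

    StorageKind(StorageConfiguration configuration) {
        this.configuration = configuration;
    }

    // Each constant already knows its configuration, so no lookup key is needed.
    public StorageConfiguration getConfiguration() {
        return configuration;
    }
}

class StorageKindDemo {
    public static void main(String[] args) {
        String storageType = "oss"; // e.g. read from the map-store properties
        StorageConfiguration conf =
                StorageKind.valueOf(storageType.toUpperCase()).getConfiguration();
        System.out.println(conf.getClass().getSimpleName()); // prints OssConfiguration
    }
}
```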

@@ -35,9 +35,9 @@

public class HdfsWriter implements IFileWriter<IMapFileData> {

-FSDataOutputStream out;
+private FSDataOutputStream out;

-Serializer serializer;
+private Serializer serializer;

@Override
public String identifier() {
@@ -39,17 +39,17 @@

@Slf4j
public class OssWriter implements IFileWriter<IMapFileData> {
-FileSystem fs;
-Path parentPath;
-Path path;
-Serializer serializer;
+private FileSystem fs;
+private Path parentPath;
+private Path path;
+private Serializer serializer;

-ByteBuf bf = Unpooled.buffer(1024);
+private ByteBuf bf = Unpooled.buffer(1024);

// block size
-long blockSize = 1024 * 1024;
+private long blockSize = 1024 * 1024;

-AtomicLong index = new AtomicLong(0);
+private AtomicLong index = new AtomicLong(0);

@Override
public void initialize(FileSystem fs, Path parentPath, Serializer serializer)
@@ -114,7 +114,6 @@ public Path createNewPath() {

@Override
public void close() throws Exception {
-// No-op
bf.clear();
this.bf = null;
}
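This buffer is presumably why `netty-buffer` was added as a test dependency in the pom hunk above: `OssWriter` accumulates serialized rows in a Netty `ByteBuf` and flushes once `blockSize` is reached, favoring fewer, larger writes to OSS. A minimal sketch of that accumulate-and-flush idiom; the in-memory sink stands in for the real OSS output stream:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class BufferedBlockWriteDemo {
    private static final int BLOCK_SIZE = 1024 * 1024; // mirrors OssWriter's default

    public static void main(String[] args) throws IOException {
        ByteBuf buffer = Unpooled.buffer(1024);
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // placeholder for the OSS stream

        for (int i = 0; i < 200_000; i++) {
            buffer.writeBytes(("row-" + i + "\n").getBytes(StandardCharsets.UTF_8));
            if (buffer.readableBytes() >= BLOCK_SIZE) {
                flush(buffer, sink); // one large write instead of many small ones
            }
        }
        flush(buffer, sink); // drain the remainder, as close() would
    }

    private static void flush(ByteBuf buffer, ByteArrayOutputStream sink) throws IOException {
        byte[] block = new byte[buffer.readableBytes()];
        buffer.readBytes(block);
        sink.write(block);
        buffer.clear();
    }
}
```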
@@ -53,6 +53,9 @@ public class IMapFileOSSStorageTest {
static String OSS_ENDPOINT = "your oss endpoint";
static String OSS_ACCESS_KEY_ID = "oss accessKey id";
static String OSS_ACCESS_KEY_SECRET = "oss accessKey secret";
static String BUSINESS = "random";
static String NAMESPACE = "/seatunnel-test/2";
static String CLUSTER_NAME = "test-one";
private static final Configuration CONF;

private static final IMapFileStorage STORAGE;
@@ -80,9 +83,9 @@ public class IMapFileOSSStorageTest {
properties.put(
"fs.oss.credentials.provider",
"org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider");
-properties.put(FileConstants.FileInitProperties.BUSINESS_KEY, "random");
-properties.put(FileConstants.FileInitProperties.NAMESPACE_KEY, "/seatunnel-test/2");
-properties.put(FileConstants.FileInitProperties.CLUSTER_NAME, "test-one");
+properties.put(FileConstants.FileInitProperties.BUSINESS_KEY, BUSINESS);
+properties.put(FileConstants.FileInitProperties.NAMESPACE_KEY, NAMESPACE);
+properties.put(FileConstants.FileInitProperties.CLUSTER_NAME, CLUSTER_NAME);
properties.put(WRITE_DATA_TIMEOUT_MILLISECONDS_KEY, 6000L);

STORAGE.initialize(properties);