Add OSS/S3 to cluster-mode type apache#4621
sunxiaojian committed Apr 26, 2023
1 parent d7c2de9 commit 1055e6e
Showing 25 changed files with 1,028 additions and 122 deletions.
docs/en/seatunnel-engine/deployment.md (23 additions, 0 deletions)
@@ -179,6 +179,7 @@ map:
           type: hdfs
           namespace: /tmp/seatunnel/imap
           clusterName: seatunnel-cluster
           storage.type: hdfs
           fs.defaultFS: hdfs://localhost:9000
```

@@ -195,9 +196,31 @@ map:
           type: hdfs
           namespace: /tmp/seatunnel/imap
           clusterName: seatunnel-cluster
           storage.type: hdfs
           fs.defaultFS: file:///
```

If you use OSS, you can configure it like this:

```yaml
map:
    engine*:
       map-store:
         enabled: true
         initial-mode: EAGER
         factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
         properties:
           type: hdfs
           namespace: /tmp/seatunnel/imap
           clusterName: seatunnel-cluster
           storage.type: oss
           oss.bucket: oss://bucket name/
           fs.oss.accessKeyId: OSS access key id
           fs.oss.accessKeySecret: OSS access key secret
           fs.oss.endpoint: OSS endpoint
           fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
```
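
The commit title also covers S3, but this docs hunk only shows the OSS entry. By analogy with the OSS block, an S3 configuration would swap in the Hadoop S3A keys; the sketch below is an assumption (the `s3.bucket` key mirrors `oss.bucket`, and the `fs.s3a.*` property names come from the Hadoop S3A connector, not from this diff):

```yaml
map:
    engine*:
       map-store:
         enabled: true
         initial-mode: EAGER
         factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
         properties:
           type: hdfs
           namespace: /tmp/seatunnel/imap
           clusterName: seatunnel-cluster
           storage.type: s3
           s3.bucket: s3://bucket name/
           fs.s3a.endpoint: S3 endpoint
           fs.s3a.access.key: S3 access key
           fs.s3a.secret.key: S3 secret key
           fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
```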

## 6. Config SeaTunnel Engine Client

All SeaTunnel Engine Client configuration is in `hazelcast-client.yaml`.
TaskExecutionService.java
@@ -19,6 +19,7 @@

import org.apache.seatunnel.api.common.metrics.MetricTags;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties)
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(
                this::updateMetricsContextInImap,
                0,
                30 * 1000, // Wait for MapStore loading to complete
                seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(),
                TimeUnit.SECONDS);
    }
@@ -455,8 +456,20 @@ private synchronized void updateMetricsContextInImap() {
        contextMap.putAll(finishedExecutionContexts);
        contextMap.putAll(executionContexts);
        try {
            // Wait for MapStore loading to complete
            IMap<TaskLocation, SeaTunnelMetricsContext> map =
                    nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
                    RetryUtils.retryWithException(
                            () -> {
                                return nodeEngine
                                        .getHazelcastInstance()
                                        .getMap(Constant.IMAP_RUNNING_JOB_METRICS);
                            },
                            new RetryUtils.RetryMaterial(
                                    Constant.OPERATION_RETRY_TIME,
                                    false,
                                    exception -> true,
                                    Constant.OPERATION_RETRY_SLEEP));

            contextMap.forEach(
                    (taskGroupLocation, taskGroupContext) -> {
                        taskGroupContext
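
The hunk above wraps the initial `getMap` call in a retry because an IMap backed by a MapStore may still be loading when the metrics updater first fires. Below is a self-contained sketch of that retry pattern with local stand-in names; the real `RetryUtils.RetryMaterial` carries a retry count, an exception predicate, and a sleep interval, as the call above shows.

```java
import java.util.concurrent.Callable;

// Minimal stand-in for the RetryUtils.retryWithException(...) pattern used above.
public class RetrySketch {

    static <T> T retryWithException(Callable<T> action, int retryTimes, long sleepMillis)
            throws Exception {
        Exception last = null;
        for (int i = 0; i < retryTimes; i++) {
            try {
                return action.call(); // e.g. getMap() while the MapStore is still loading
            } catch (Exception e) {
                last = e;
                Thread.sleep(sleepMillis); // fixed sleep between attempts
            }
        }
        throw (last != null ? last : new IllegalStateException("retryTimes must be > 0"));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(retryWithException(() -> "loaded", 3, 100L));
    }
}
```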
imap-storage-file/pom.xml
@@ -30,12 +30,20 @@
    <artifactId>imap-storage-file</artifactId>
    <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>

    <properties>
        <hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
        <hadoop-aws.version>3.1.4</hadoop-aws.version>
        <aws.java.sdk.version>1.11.271</aws.java.sdk.version>
        <netty-buffer.version>4.1.89.Final</netty-buffer.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>serializer-protobuf</artifactId>
            <version>${project.version}</version>
        </dependency>
        <!-- hadoop jar -->
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
@@ -62,6 +70,32 @@
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aliyun</artifactId>
            <version>${hadoop-aliyun.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop-aws.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-bundle</artifactId>
            <version>${aws.java.sdk.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-buffer</artifactId>
            <version>${netty-buffer.version}</version>
        </dependency>

    </dependencies>

</project>
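
Note that `hadoop-aws` and `aws-java-sdk-bundle` are declared with `provided` scope, so they are not packaged with this module; the S3 jars have to be supplied separately on the engine's runtime classpath. `hadoop-aliyun` uses the default compile scope and travels with the module's own dependencies.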
IMapFileStorage.java
@@ -25,6 +25,8 @@
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants;
import org.apache.seatunnel.engine.imap.storage.file.common.WALReader;
import org.apache.seatunnel.engine.imap.storage.file.config.AbstractConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALDisruptor;
import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALEventType;
import org.apache.seatunnel.engine.imap.storage.file.future.RequestFuture;
@@ -52,7 +54,6 @@
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_NAMESPACE;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.BUSINESS_KEY;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.CLUSTER_NAME;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.NAMESPACE_KEY;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY;

@@ -68,6 +69,8 @@
@Slf4j
public class IMapFileStorage implements IMapStorage {

    private static final String STORAGE_TYPE_KEY = "storage.type";

    public FileSystem fs;

    public String namespace;
@@ -105,14 +108,26 @@ public class IMapFileStorage implements IMapStorage {

    private Configuration conf;

    private FileConfiguration fileConfiguration;

    /**
     * @param configuration configuration
     * @see FileConstants.FileInitProperties
     */
    @Override
    public void initialize(Map<String, Object> configuration) {
        checkInitStorageProperties(configuration);
        Configuration hadoopConf = (Configuration) configuration.get(HDFS_CONFIG_KEY);

        String storageType =
                String.valueOf(
                        configuration.getOrDefault(
                                STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString()));
        this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase());
        // build configuration
        AbstractConfiguration fileConfiguration =
                this.fileConfiguration.getConfiguration(storageType);

        Configuration hadoopConf = fileConfiguration.buildConfiguration(configuration);
        this.conf = hadoopConf;
        this.namespace = (String) configuration.getOrDefault(NAMESPACE_KEY, DEFAULT_IMAP_NAMESPACE);
        this.businessName = (String) configuration.get(BUSINESS_KEY);
@@ -141,7 +156,10 @@ public void initialize(Map<String, Object> configuration) {
        this.serializer = new ProtoStuffSerializer();
        this.walDisruptor =
                new WALDisruptor(
                        fs, businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT, serializer);
                        fs,
                        FileConfiguration.valueOf(storageType.toUpperCase()),
                        businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT,
                        serializer);
    }

    @Override
@@ -211,7 +229,7 @@ public Set<Object> deleteAll(Collection<Object> keys) {
    @Override
    public Map<Object, Object> loadAll() {
        try {
            WALReader reader = new WALReader(fs, serializer);
            WALReader reader = new WALReader(fs, fileConfiguration, serializer);
            return reader.loadAllData(new Path(businessRootPath), new HashSet<>());
        } catch (IOException e) {
            throw new IMapStorageException("load all data error", e);
@@ -221,7 +239,7 @@ public Map<Object, Object> loadAll() {
    @Override
    public Set<Object> loadAllKeys() {
        try {
            WALReader reader = new WALReader(fs, serializer);
            WALReader reader = new WALReader(fs, fileConfiguration, serializer);
            return reader.loadAllKeys(new Path(businessRootPath));
        } catch (IOException e) {
            throw new IMapStorageException(
@@ -332,8 +350,7 @@ private void checkInitStorageProperties(Map<String, Object> properties) {
        if (properties == null || properties.isEmpty()) {
            throw new IllegalArgumentException("init file storage properties is empty");
        }
        List<String> requiredProperties =
                Arrays.asList(BUSINESS_KEY, CLUSTER_NAME, HDFS_CONFIG_KEY);
        List<String> requiredProperties = Arrays.asList(BUSINESS_KEY, CLUSTER_NAME);
        for (String requiredProperty : requiredProperties) {
            if (!properties.containsKey(requiredProperty)) {
                throw new IllegalArgumentException(
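
Net effect of the `initialize` changes above: the backend is now selected from the `storage.type` entry of the init map (defaulting to HDFS), and the Hadoop `Configuration` is built inside the plugin rather than being passed in under `HDFS_CONFIG_KEY`. A minimal, runnable sketch of just the resolution step, with `FileStorageType` as a stand-in for the plugin's `FileConfiguration` enum:

```java
import java.util.Map;

// Stand-in sketch for the storage.type dispatch in IMapFileStorage.initialize().
public class StorageTypeSketch {

    private static final String STORAGE_TYPE_KEY = "storage.type";

    enum FileStorageType { HDFS, OSS, S3 }

    static FileStorageType resolve(Map<String, Object> configuration) {
        // Default to HDFS when the key is absent, as initialize() does above.
        String storageType =
                String.valueOf(
                        configuration.getOrDefault(
                                STORAGE_TYPE_KEY, FileStorageType.HDFS.toString()));
        return FileStorageType.valueOf(storageType.toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of(STORAGE_TYPE_KEY, "oss"))); // OSS
        System.out.println(resolve(Map.of())); // HDFS
    }
}
```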
IMapFileStorageFactory.java
@@ -24,14 +24,10 @@
import org.apache.seatunnel.engine.imap.storage.api.IMapStorageFactory;
import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;

import org.apache.hadoop.conf.Configuration;

import com.google.auto.service.AutoService;

import java.util.Map;

import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY;

@AutoService(IMapStorageFactory.class)
public class IMapFileStorageFactory implements IMapStorageFactory {
    @Override
@@ -42,9 +38,6 @@ public String factoryIdentifier() {
    @Override
    public IMapStorage create(Map<String, Object> initMap) throws IMapStorageException {
        IMapFileStorage iMapFileStorage = new IMapFileStorage();
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", (String) initMap.get("fs.defaultFS"));
        initMap.put(HDFS_CONFIG_KEY, configuration);
        iMapFileStorage.initialize(initMap);
        return iMapFileStorage;
    }
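
With the factory no longer pre-building a `Configuration` from `fs.defaultFS`, callers hand the raw properties straight to `create(...)` and `IMapFileStorage.initialize` derives the filesystem settings itself. A hypothetical caller-side sketch (the key names mirror the docs diff; the exact required keys, such as the business and cluster-name properties, are defined in `FileConstants.FileInitProperties`):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical init map for the OSS backend; values are placeholders.
public class FactoryInitSketch {
    public static void main(String[] args) {
        Map<String, Object> initMap = new HashMap<>();
        initMap.put("namespace", "/tmp/seatunnel/imap");
        initMap.put("clusterName", "seatunnel-cluster");
        initMap.put("storage.type", "oss");
        initMap.put("oss.bucket", "oss://my-bucket/");
        initMap.put("fs.oss.accessKeyId", "<access-key-id>");
        initMap.put("fs.oss.accessKeySecret", "<access-key-secret>");
        initMap.put("fs.oss.endpoint", "<oss-endpoint>");
        // IMapStorage storage = new IMapFileStorageFactory().create(initMap);
        initMap.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```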
WALReader.java
@@ -22,47 +22,37 @@

import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.wal.DiscoveryWalFileFactory;
import org.apache.seatunnel.engine.imap.storage.file.wal.reader.IFileReader;
import org.apache.seatunnel.engine.serializer.api.Serializer;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils.WAL_DATA_METADATA_LENGTH;

public class WALReader {
    private static final int DEFAULT_QUERY_LIST_SIZE = 1024;
    private FileSystem fs;
    private final Serializer serializer;
    private final IFileReader fileReader;

    public WALReader(FileSystem fs, Serializer serializer) throws IOException {
    public WALReader(FileSystem fs, FileConfiguration configuration, Serializer serializer)
            throws IOException {
        this.serializer = serializer;
        this.fs = fs;
        this.fileReader = DiscoveryWalFileFactory.getReader(configuration.getName());
        this.fileReader.initialize(fs, serializer);
    }

    private List<IMapFileData> readAllData(Path parentPath) throws IOException {
        List<String> fileNames = getFileNames(parentPath);
        if (CollectionUtils.isEmpty(fileNames)) {
            return new ArrayList<>();
        }
        List<IMapFileData> result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE);
        for (String fileName : fileNames) {
            result.addAll(readData(new Path(parentPath, fileName)));
        }
        return result;
        return this.fileReader.readAllData(parentPath);
    }

    public Set<Object> loadAllKeys(Path parentPath) throws IOException {
@@ -121,50 +111,6 @@ public Map<Object, Object> loadAllData(Path parentPath, Set<Object> searchKeys)
        return result;
    }

    private List<IMapFileData> readData(Path path) throws IOException {
        List<IMapFileData> result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE);
        long length = fs.getFileStatus(path).getLen();
        try (FSDataInputStream in = fs.open(path)) {
            byte[] datas = new byte[(int) length];
            in.readFully(datas);
            int startIndex = 0;
            while (startIndex + WAL_DATA_METADATA_LENGTH < datas.length) {

                byte[] metadata = new byte[WAL_DATA_METADATA_LENGTH];
                System.arraycopy(datas, startIndex, metadata, 0, WAL_DATA_METADATA_LENGTH);
                int dataLength = WALDataUtils.byteArrayToInt(metadata);
                startIndex += WAL_DATA_METADATA_LENGTH;
                if (startIndex + dataLength > datas.length) {
                    break;
                }
                byte[] data = new byte[dataLength];
                System.arraycopy(datas, startIndex, data, 0, data.length);
                IMapFileData fileData = serializer.deserialize(data, IMapFileData.class);
                result.add(fileData);
                startIndex += data.length;
            }
        }
        return result;
    }

    private List<String> getFileNames(Path parentPath) {
        try {
            RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator =
                    fs.listFiles(parentPath, true);
            List<String> fileNames = new ArrayList<>();
            while (fileStatusRemoteIterator.hasNext()) {
                LocatedFileStatus fileStatus = fileStatusRemoteIterator.next();
                if (fileStatus.getPath().getName().endsWith("wal.txt")) {
                    fileNames.add(fileStatus.getPath().toString());
                }
            }
            return fileNames;
        } catch (IOException e) {
            throw new IMapStorageException(e, "get file names error, path is %s", parentPath);
        }
    }

    private Object deserializeData(byte[] data, String className) {
        try {
            Class<?> clazz = ClassUtils.getClass(className);
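
`WALReader` now delegates all file parsing to a reader obtained from `DiscoveryWalFileFactory`, keyed by the `FileConfiguration` name, so backend-specific I/O lives behind `IFileReader` instead of in this class. A sketch of that lookup idea (all names are illustrative stand-ins inferred from the call sites above):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch of the lookup behind DiscoveryWalFileFactory.getReader(name): each
// storage backend registers a WAL reader under its configuration name, and
// WALReader picks one up instead of parsing files itself.
class WalReaderRegistry {
    interface Reader {
        String read(String path); // placeholder for the real read API
    }

    private static final Map<String, Supplier<Reader>> READERS = new HashMap<>();

    static void register(String name, Supplier<Reader> supplier) {
        READERS.put(name.toUpperCase(), supplier);
    }

    static Reader getReader(String name) {
        Supplier<Reader> supplier = READERS.get(name.toUpperCase());
        if (supplier == null) {
            throw new IllegalArgumentException("no WAL reader registered for: " + name);
        }
        return supplier.get();
    }

    public static void main(String[] args) {
        register("HDFS", () -> path -> "bytes from " + path); // trivial stub reader
        System.out.println(getReader("hdfs").read("/tmp/wal.txt"));
    }
}
```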