Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type #4683

Merged
merged 5 commits into from
May 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/seatunnel-engine/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ map:
namespace: /tmp/seatunnel/imap
clusterName: seatunnel-cluster
storage.type: oss
block.size: block size(bytes)
oss.bucket: oss://bucket name/
fs.oss.accessKeyId: OSS access key id
fs.oss.accessKeySecret: OSS access key secret
Expand Down
40 changes: 39 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@
<checker.qual.version>3.10.0</checker.qual.version>
<awaitility.version>4.2.0</awaitility.version>
<e2e.dependency.skip>true</e2e.dependency.skip>

<!-- Imap storage dependency package -->
<hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
<json-smart.version>2.4.7</json-smart.version>
<hadoop-aws.version>3.1.4</hadoop-aws.version>
<netty-buffer.version>4.1.60.Final</netty-buffer.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -446,8 +452,40 @@
<scope>provided</scope>
</dependency>

</dependencies>
<!-- Imap storage dependency package -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop-aliyun.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
<version>${json-smart.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop-aws.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty-buffer.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

<dependencies>
Expand Down
35 changes: 12 additions & 23 deletions seatunnel-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@
<tablestore.version>5.13.9</tablestore.version>
<teradata.version>17.20.00.12</teradata.version>
<redshift.version>2.1.0.9</redshift.version>

<!-- Imap storage dependency package -->
<hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
<json-smart.version>2.4.7</json-smart.version>
<hadoop-aws.version>3.1.4</hadoop-aws.version>
<aws-java-sdk.version>1.11.271</aws-java-sdk.version>
<netty-buffer.version>4.1.89.Final</netty-buffer.version>

</properties>
<dependencies>
<!-- starters -->
Expand Down Expand Up @@ -567,44 +575,25 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.89.Final</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.89.Final</version>
<version>${netty-buffer.version}</version>
<scope>provided</scope>
</dependency>

<!-- hadoop jar -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>2.8.3</version>
<version>${hadoop-aliyun.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>3.1.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>1.11.271</version>
<version>${hadoop-aws.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.commons.collections4.CollectionUtils;

import com.google.common.collect.Lists;
import com.hazelcast.instance.impl.NodeState;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
Expand Down Expand Up @@ -127,6 +128,8 @@ public class TaskExecutionService implements DynamicMetricsProvider {

private final ScheduledExecutorService scheduledExecutorService;

private CountDownLatch waitClusterStarted;

public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
Expand All @@ -142,7 +145,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(
this::updateMetricsContextInImap,
30, // Wait for MapStore loading to complete, wait 30s
0,
seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(),
TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -455,6 +458,15 @@ private synchronized void updateMetricsContextInImap() {
contextMap.putAll(finishedExecutionContexts);
contextMap.putAll(executionContexts);
try {
if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
logger.warning(
String.format(
"The Node is not ready yet, Node state %s,looking forward to the next "
+ "scheduling",
nodeEngine.getNode().getState()));
return;
}

IMap<TaskLocation, SeaTunnelMetricsContext> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
contextMap.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@
<artifactId>imap-storage-file</artifactId>
<name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>

<properties>
<hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
<hadoop-aws.version>3.1.4</hadoop-aws.version>
<aws.java.sdk.version>1.11.271</aws.java.sdk.version>
<netty-buffer.version>4.1.89.Final</netty-buffer.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
Expand Down Expand Up @@ -74,34 +67,21 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop-aliyun.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>2.8.3</version>
<scope>provided</scope>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop-aws.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.java.sdk.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty-buffer.version}</version>
<scope>provided</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public WALWriter(
Serializer serializer)
throws IOException {
this.writer = DiscoveryWalFileFactory.getWriter(fileConfiguration.getName());
this.writer.setBlockSize(fileConfiguration.getConfiguration().getBlockSize());
this.writer.initialize(fs, parentPath, serializer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,19 @@
import java.util.Map;

public abstract class AbstractConfiguration {

public static final String BLOCK_SIZE = "block.size";
protected static final String HDFS_IMPL_KEY = "impl";

private Long blockSize = 1024 * 1024L;

public Long getBlockSize() {
return blockSize;
}

public void setBlockSize(Long blockSize) {
this.blockSize = blockSize;
}

/**
* check the configuration keys
*
Expand Down Expand Up @@ -58,6 +68,9 @@ void setExtraConfiguration(
Configuration hadoopConf, Map<String, Object> config, String prefix) {
config.forEach(
(k, v) -> {
if (config.containsKey(BLOCK_SIZE)) {
setBlockSize(Long.parseLong(config.get(BLOCK_SIZE).toString()));
}
if (k.startsWith(prefix)) {
hadoopConf.set(k, String.valueOf(v));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.seatunnel.engine.imap.storage.file.wal.writer;

import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils;
import org.apache.seatunnel.engine.serializer.api.Serializer;

import org.apache.curator.shaded.com.google.common.io.ByteStreams;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Base {@link IFileWriter} for WAL files stored on a Hadoop {@link FileSystem}.
 *
 * <p>Every record of the current block is kept in an in-memory buffer and the whole block file is
 * rewritten (created with overwrite) on each write. Once the block grows beyond the configured
 * block size, the writer switches to a new, index-prefixed file and starts with an empty buffer.
 *
 * <p>NOTE(review): the class does no synchronization of its own; presumably callers serialize
 * access to a writer instance - confirm against the call sites.
 */
@Slf4j
public abstract class CloudWriter implements IFileWriter<IMapFileData> {
    private FileSystem fs;
    private Path parentPath;
    private Path path;
    private Serializer serializer;

    // Holds every wrapped record of the block currently being written.
    private ByteBuf bf = Unpooled.buffer(1024);

    // Rotation threshold in bytes; starts at the interface-wide default.
    private long blockSize = DEFAULT_BLOCK_SIZE;

    // Sequence number used as the file-name prefix of each block file.
    private final AtomicLong index = new AtomicLong(0);

    /**
     * Binds the writer to a file system location and selects the first block file.
     *
     * <p>If that block file already exists, its content is loaded into the buffer so that the
     * next write rewrites the file with the old content followed by the new record.
     *
     * @param fs file system used for all reads and writes
     * @param parentPath directory that contains the block files
     * @param serializer serializer used to turn records into bytes
     * @throws IOException if the existing block file cannot be checked or read
     */
    @Override
    public void initialize(FileSystem fs, Path parentPath, Serializer serializer)
            throws IOException {
        this.fs = fs;
        this.serializer = serializer;
        this.parentPath = parentPath;
        this.path = createNewPath();
        if (fs.exists(path)) {
            try (FSDataInputStream fsDataInputStream = fs.open(path)) {
                bf.writeBytes(ByteStreams.toByteArray(fsDataInputStream));
            }
        }
    }

    /**
     * Sets the rotation threshold.
     *
     * <p>{@code null} and values not greater than {@link #DEFAULT_BLOCK_SIZE} are ignored, so the
     * effective block size never drops below the default.
     *
     * @param blockSize requested block size in bytes, may be {@code null}
     */
    @Override
    public void setBlockSize(Long blockSize) {
        if (blockSize != null && blockSize > DEFAULT_BLOCK_SIZE) {
            this.blockSize = blockSize;
        }
    }

    /**
     * Serializes the record and persists it synchronously.
     *
     * @param data record to append to the current block
     * @throws IOException if serialization fails
     * @throws IMapStorageException if the block file cannot be written
     */
    // TODO Synchronous write, asynchronous write can be added in the future
    @Override
    public void write(IMapFileData data) throws IOException {
        byte[] bytes = serializer.serialize(data);
        this.write(bytes);
    }

    /**
     * Appends the wrapped record to the buffer and rewrites the current block file with the full
     * buffer content.
     *
     * <p>The buffer is only considered updated once the output stream has been closed
     * successfully. On any failure the record is removed from the buffer again, so a failed write
     * neither leaks into a later write nor corrupts the content of the next rewrite.
     */
    private void write(byte[] bytes) {
        if (bf == null) {
            throw new IMapStorageException(new IllegalStateException("writer is closed"));
        }
        // Remember where the buffer ended so a failed write can be rolled back.
        int rollbackIndex = bf.writerIndex();
        int persistedLength;
        try {
            // Write to bytebuffer
            bf.writeBytes(WALDataUtils.wrapperBytes(bytes));

            // Snapshot all bytes without moving the reader index, so the buffer stays
            // consistent even when the upload below fails.
            byte[] allBytes = new byte[bf.readableBytes()];
            bf.getBytes(bf.readerIndex(), allBytes);

            // write filesystem; the stream is closed before the write counts as successful
            try (FSDataOutputStream out = fs.create(path, true)) {
                out.write(allBytes);
            }
            persistedLength = allBytes.length;
        } catch (Exception ex) {
            // Drop the record that was not persisted.
            bf.writerIndex(rollbackIndex);
            throw new IMapStorageException(ex);
        }

        // check and reset, only after the data has been persisted
        checkAndSetNextScheduleRotation(persistedLength);
    }

    /**
     * Starts a new block file with an empty buffer once the persisted block exceeds the block
     * size; otherwise the current block keeps growing.
     *
     * @param allBytes number of bytes just persisted for the current block
     */
    private void checkAndSetNextScheduleRotation(long allBytes) {
        if (allBytes > blockSize) {
            this.path = createNewPath();
            this.bf.clear();
        }
    }

    /**
     * Builds the path of the next block file: {@code <next index>_<FILE_NAME>} under the parent
     * directory. Each call advances the index.
     *
     * @return path of the next block file
     */
    public Path createNewPath() {
        return new Path(parentPath, index.incrementAndGet() + "_" + FILE_NAME);
    }

    /**
     * Releases the in-memory buffer. Calling this method more than once is safe; writes after
     * close fail with an {@link IMapStorageException}.
     */
    @Override
    public void close() throws Exception {
        if (bf != null) {
            bf.release();
            this.bf = null;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@

/**
 * Writer for WAL files on a Hadoop file system.
 *
 * @param <T> type of the records accepted by {@link #write(Object)}
 */
public interface IFileWriter<T> extends AutoCloseable {
/** Base file name of a WAL file. */
String FILE_NAME = "wal.txt";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@EricJoy2048 @Hisoka-X please check

/** Default block size in bytes (1024 * 1024). */
Long DEFAULT_BLOCK_SIZE = 1024 * 1024L;

/**
 * Returns the identifier of this writer implementation.
 * NOTE(review): presumably the name used to look the writer up by configuration - confirm.
 */
String identifier();

/**
 * Prepares the writer for use.
 *
 * @param fs file system to write to
 * @param parentPath parent directory of the WAL file(s)
 * @param serializer serializer for the records
 * @throws IOException if initialization fails
 */
void initialize(FileSystem fs, Path parentPath, Serializer serializer) throws IOException;

/**
 * Sets the block size in bytes. The default implementation does nothing, so writers without a
 * block concept can ignore it.
 */
default void setBlockSize(Long blockSize) {}

/**
 * Writes one record.
 *
 * @param data record to write
 * @throws IOException if the record cannot be written
 */
void write(T data) throws IOException;
}
Loading