diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md index be38ac2db52..c07cd45d6b1 100644 --- a/docs/en/seatunnel-engine/deployment.md +++ b/docs/en/seatunnel-engine/deployment.md @@ -179,6 +179,7 @@ map: type: hdfs namespace: /tmp/seatunnel/imap clusterName: seatunnel-cluster + storage.type: hdfs fs.defaultFS: hdfs://localhost:9000 ``` @@ -195,9 +196,32 @@ map: type: hdfs namespace: /tmp/seatunnel/imap clusterName: seatunnel-cluster + storage.type: hdfs fs.defaultFS: file:/// ``` +if you used OSS, you can config like this: + +```yaml +map: + engine*: + map-store: + enabled: true + initial-mode: EAGER + factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory + properties: + type: hdfs + namespace: /tmp/seatunnel/imap + clusterName: seatunnel-cluster + storage.type: oss + block.size: block size(bytes) + oss.bucket: oss://bucket name/ + fs.oss.accessKeyId: OSS access key id + fs.oss.accessKeySecret: OSS access key secret + fs.oss.endpoint: OSS endpoint + fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider +``` + ## 6. Config SeaTunnel Engine Client All SeaTunnel Engine Client config in `hazelcast-client.yaml`. diff --git a/pom.xml b/pom.xml index 84b16a88ded..ba473688193 100644 --- a/pom.xml +++ b/pom.xml @@ -140,6 +140,12 @@ 3.10.0 4.2.0 true + + + 3.0.0 + 2.4.7 + 3.1.4 + 4.1.60.Final @@ -446,8 +452,40 @@ provided - + + + org.apache.hadoop + hadoop-aliyun + ${hadoop-aliyun.version} + provided + + + net.minidev + json-smart + + + + + + net.minidev + json-smart + ${json-smart.version} + + + + org.apache.hadoop + hadoop-aws + ${hadoop-aws.version} + provided + + + io.netty + netty-buffer + ${netty-buffer.version} + + + diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index de61efa9bf1..af963803fed 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -97,6 +97,14 @@ 5.13.9 17.20.00.12 2.1.0.9 + + + 3.0.0 + 2.4.7 + 3.1.4 + 1.11.271 + 4.1.89.Final + @@ -571,19 +579,28 @@ + + io.netty + netty-buffer + ${netty-buffer.version} + provided + + org.apache.hadoop - hadoop-aws - 3.1.4 + hadoop-aliyun + ${hadoop-aliyun.version} provided + - com.amazonaws - aws-java-sdk-bundle - 1.11.271 + org.apache.hadoop + hadoop-aws + ${hadoop-aws.version} provided + org.apache.seatunnel seatunnel-hadoop3-3.1.4-uber diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE index 30f463ab7a0..7895b730525 100644 --- a/seatunnel-dist/release-docs/LICENSE +++ b/seatunnel-dist/release-docs/LICENSE @@ -273,7 +273,8 @@ The text of each license is the standard Apache 2.0 license. (Apache-2.0) failureaccess (com.google.guava:failureaccess:1.0 https://mvnrepository.com/artifact/com.google.guava/failureaccess/1.0) (Apache-2.0) j2objc-annotations (com.google.j2objc:j2objc-annotations:1.1 https://mvnrepository.com/artifact/com.google.j2objc/j2objc-annotations/1.1) (Apache-2.0) listenablefuture (com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava https://mvnrepository.com/artifact/com.google.guava/listenablefuture/9999.0-empty-to-avoid-conflict-with-guava) - + (Apache-2.0) accessors-smart (com.google.guava:accessors-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/accessors-smart) + (Apache-2.0) json-smart (net.minidev:json-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/json-smart) ======================================================================== MOZILLA PUBLIC LICENSE License @@ -292,7 +293,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. (New BSD license) Protocol Buffer Java API (com.google.protobuf:protobuf-java:2.5.0 - http://code.google.com/p/protobuf) (BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.11.12 - http://www.scala-lang.org/) - + (BSD 3-Clause) Scala Library (org.ow2.asm:asm:9.1 - https://mvnrepository.com/artifact/org.ow2.asm/asm/) ======================================================================== CDDL License ======================================================================== diff --git a/seatunnel-dist/release-docs/licenses/LICENSE-asm.txt b/seatunnel-dist/release-docs/licenses/LICENSE-asm.txt new file mode 100644 index 00000000000..631ee53c53d --- /dev/null +++ b/seatunnel-dist/release-docs/licenses/LICENSE-asm.txt @@ -0,0 +1,27 @@ +ASM: a very small and fast Java bytecode manipulation framework +Copyright (c) 2000-2011 INRIA, France Telecom +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. +3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF +THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml index c6f48949ab5..08d3d759a1a 100644 --- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml +++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml @@ -170,6 +170,14 @@ org.apache.hadoop:hadoop-aws:jar com.amazonaws:aws-java-sdk-bundle:jar org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional + + org.apache.hadoop:hadoop-aliyun:jar + com.aliyun.oss:aliyun-sdk-oss:jar + org.jdom:jdom:jar + + + io.netty:netty-buffer:jar + io.netty:netty-common:jar ${artifact.file.name} /lib diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml index 4bc151db9d1..a1315565349 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml @@ -29,6 +29,8 @@ 2.4 + 3.0.0 + 4.1.89.Final @@ -69,6 +71,27 @@ + + + org.apache.hadoop + hadoop-aliyun + ${hadoop-aliyun.version} + test + + + + com.aliyun.oss + aliyun-sdk-oss + 2.8.3 + test + + + + io.netty + netty-buffer + ${netty-buffer.version} + test + diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java index e22d417ef92..f7571968e8f 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java @@ -935,4 +935,223 @@ public void testStreamJobRestoreInAllNodeDown() } } } + + @SuppressWarnings("checkstyle:RegexpSingleline") + @Test + @Disabled + public void testStreamJobRestoreFromOssInAllNodeDown() + throws ExecutionException, InterruptedException { + String OSS_BUCKET_NAME = "oss://your bucket name/"; + String OSS_ENDPOINT = "your oss endpoint"; + String OSS_ACCESS_KEY_ID = "oss accessKey id"; + String OSS_ACCESS_KEY_SECRET = "oss accessKey secret"; + + String testCaseName = "testStreamJobRestoreFromOssInAllNodeDown"; + String testClusterName = + "ClusterFaultToleranceIT_testStreamJobRestoreFromOssInAllNodeDown_" + + System.currentTimeMillis(); + int testRowNumber = 1000; + int testParallelism = 6; + HazelcastInstanceImpl node1 = null; + HazelcastInstanceImpl node2 = null; + SeaTunnelClient engineClient = null; + + try { + String yaml = + "hazelcast:\n" + + " cluster-name: seatunnel\n" + + " network:\n" + + " rest-api:\n" + + " enabled: true\n" + + " endpoint-groups:\n" + + " CLUSTER_WRITE:\n" + + " enabled: true\n" + + " join:\n" + + " tcp-ip:\n" + + " enabled: true\n" + + " member-list:\n" + + " - localhost\n" + + " port:\n" + + " auto-increment: true\n" + + " port-count: 100\n" + + " port: 5801\n" + + " map:\n" + + " engine*:\n" + + " map-store:\n" + + " enabled: true\n" + + " initial-mode: EAGER\n" + + " factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n" + + " properties:\n" + + " type: hdfs\n" + + " namespace: /seatunnel-test/imap\n" + + " storage.type: oss\n" + + " clusterName: " + + testClusterName + + "\n" + + " oss.bucket: " + + OSS_BUCKET_NAME + + "\n" + + " fs.oss.accessKeyId: " + + OSS_ACCESS_KEY_ID + + "\n" + + " fs.oss.accessKeySecret: " + + OSS_ACCESS_KEY_SECRET + + "\n" + + " fs.oss.endpoint: " + + OSS_ENDPOINT + + "\n" + + " fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider\n" + + " properties:\n" + + " hazelcast.invocation.max.retry.count: 200\n" + + " hazelcast.tcp.join.port.try.count: 30\n" + + " hazelcast.invocation.retry.pause.millis: 2000\n" + + " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n" + + " hazelcast.logging.type: log4j2\n" + + " hazelcast.operation.generic.thread.count: 200\n"; + + Config hazelcastConfig = Config.loadFromString(yaml); + hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName)); + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig.setHazelcastConfig(hazelcastConfig); + node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + // waiting all node added to cluster + HazelcastInstanceImpl finalNode = node1; + Awaitility.await() + .atMost(10000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + 2, finalNode.getCluster().getMembers().size())); + + Common.setDeployMode(DeployMode.CLIENT); + ImmutablePair testResources = + createTestResources( + testCaseName, JobMode.STREAMING, testRowNumber, testParallelism); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(testCaseName); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); + engineClient = new SeaTunnelClient(clientConfig); + JobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(testResources.getRight(), jobConfig); + ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + Long jobId = clientJobProxy.getJobId(); + + ClientJobProxy finalClientJobProxy = clientJobProxy; + Awaitility.await() + .atMost(600000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + // Wait some tasks commit finished, and we can get rows from the + // sink target dir + Thread.sleep(2000); + System.out.println( + "\n=================================" + + FileUtils.getFileLineNumberFromDir( + testResources.getLeft()) + + "=================================\n"); + Assertions.assertTrue( + JobStatus.RUNNING.equals(finalClientJobProxy.getJobStatus()) + && FileUtils.getFileLineNumberFromDir( + testResources.getLeft()) + > 1); + }); + + Thread.sleep(5000); + // shutdown all node + node1.shutdown(); + node2.shutdown(); + + log.info( + "==========================================All node is done========================================"); + Thread.sleep(10000); + + node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + log.info( + "==========================================All node is start, begin check node size ========================================"); + // waiting all node added to cluster + HazelcastInstanceImpl restoreFinalNode = node1; + Awaitility.await() + .atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + 2, restoreFinalNode.getCluster().getMembers().size())); + + log.info( + "==========================================All node is running========================================"); + engineClient = new SeaTunnelClient(clientConfig); + ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId); + CompletableFuture waitForJobCompleteFuture = + CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete); + + Thread.sleep(10000); + + Awaitility.await() + .atMost(100000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + // Wait job write all rows in file + Thread.sleep(2000); + System.out.println( + "\n=================================" + + FileUtils.getFileLineNumberFromDir( + testResources.getLeft()) + + "=================================\n"); + JobStatus jobStatus = null; + try { + jobStatus = newClientJobProxy.getJobStatus(); + } catch (Exception e) { + log.error(ExceptionUtils.getMessage(e)); + } + + Assertions.assertTrue( + JobStatus.RUNNING.equals(jobStatus) + && testRowNumber * testParallelism + == FileUtils.getFileLineNumberFromDir( + testResources.getLeft())); + }); + + // sleep 10s and expect the job don't write more rows. + Thread.sleep(10000); + log.info( + "==========================================Cancel Job========================================"); + newClientJobProxy.cancelJob(); + + Awaitility.await() + .atMost(600000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertTrue( + waitForJobCompleteFuture.isDone() + && JobStatus.CANCELED.equals( + waitForJobCompleteFuture.get()))); + // prove that the task was restarted + Long fileLineNumberFromDir = + FileUtils.getFileLineNumberFromDir(testResources.getLeft()); + Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir); + + } finally { + log.info( + "==========================================Clean test resource ========================================"); + if (engineClient != null) { + engineClient.shutdown(); + } + + if (node1 != null) { + node1.shutdown(); + } + + if (node2 != null) { + node2.shutdown(); + } + } + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 86f17a7bbb7..0b95baded64 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -47,6 +47,7 @@ import org.apache.commons.collections4.CollectionUtils; import com.google.common.collect.Lists; +import com.hazelcast.instance.impl.NodeState; import com.hazelcast.internal.metrics.DynamicMetricsProvider; import com.hazelcast.internal.metrics.MetricDescriptor; import com.hazelcast.internal.metrics.MetricsCollectionContext; @@ -127,6 +128,8 @@ public class TaskExecutionService implements DynamicMetricsProvider { private final ScheduledExecutorService scheduledExecutorService; + private CountDownLatch waitClusterStarted; + public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) { seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); this.hzInstanceName = nodeEngine.getHazelcastInstance().getName(); @@ -455,6 +458,15 @@ private synchronized void updateMetricsContextInImap() { contextMap.putAll(finishedExecutionContexts); contextMap.putAll(executionContexts); try { + if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) { + logger.warning( + String.format( + "The Node is not ready yet, Node state %s,looking forward to the next " + + "scheduling", + nodeEngine.getNode().getState())); + return; + } + IMap map = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); contextMap.forEach( diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml index c4fd403ae11..c39ddda99c8 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml @@ -36,6 +36,7 @@ serializer-protobuf ${project.version} + org.apache.seatunnel seatunnel-hadoop3-3.1.4-uber @@ -62,6 +63,28 @@ org.awaitility awaitility + + + org.apache.hadoop + hadoop-aliyun + + + + net.minidev + json-smart + + + + org.apache.hadoop + hadoop-aws + + + + io.netty + netty-buffer + provided + + diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java index c1e5ef18fc4..915981e476d 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java @@ -25,6 +25,8 @@ import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants; import org.apache.seatunnel.engine.imap.storage.file.common.WALReader; +import org.apache.seatunnel.engine.imap.storage.file.config.AbstractConfiguration; +import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALDisruptor; import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALEventType; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFuture; @@ -52,7 +54,6 @@ import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_NAMESPACE; import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.BUSINESS_KEY; import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.CLUSTER_NAME; -import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY; import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.NAMESPACE_KEY; import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY; @@ -68,6 +69,8 @@ @Slf4j public class IMapFileStorage implements IMapStorage { + private static final String STORAGE_TYPE_KEY = "storage.type"; + public FileSystem fs; public String namespace; @@ -105,6 +108,8 @@ public class IMapFileStorage implements IMapStorage { private Configuration conf; + private FileConfiguration fileConfiguration; + /** * @param configuration configuration * @see FileConstants.FileInitProperties @@ -112,7 +117,16 @@ public class IMapFileStorage implements IMapStorage { @Override public void initialize(Map configuration) { checkInitStorageProperties(configuration); - Configuration hadoopConf = (Configuration) configuration.get(HDFS_CONFIG_KEY); + + String storageType = + String.valueOf( + configuration.getOrDefault( + STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString())); + this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase()); + // build configuration + AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration(); + + Configuration hadoopConf = fileConfiguration.buildConfiguration(configuration); this.conf = hadoopConf; this.namespace = (String) configuration.getOrDefault(NAMESPACE_KEY, DEFAULT_IMAP_NAMESPACE); this.businessName = (String) configuration.get(BUSINESS_KEY); @@ -141,7 +155,10 @@ public void initialize(Map configuration) { this.serializer = new ProtoStuffSerializer(); this.walDisruptor = new WALDisruptor( - fs, businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT, serializer); + fs, + FileConfiguration.valueOf(storageType.toUpperCase()), + businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT, + serializer); } @Override @@ -211,7 +228,7 @@ public Set deleteAll(Collection keys) { @Override public Map loadAll() { try { - WALReader reader = new WALReader(fs, serializer); + WALReader reader = new WALReader(fs, fileConfiguration, serializer); return reader.loadAllData(new Path(businessRootPath), new HashSet<>()); } catch (IOException e) { throw new IMapStorageException("load all data error", e); @@ -221,7 +238,7 @@ public Map loadAll() { @Override public Set loadAllKeys() { try { - WALReader reader = new WALReader(fs, serializer); + WALReader reader = new WALReader(fs, fileConfiguration, serializer); return reader.loadAllKeys(new Path(businessRootPath)); } catch (IOException e) { throw new IMapStorageException( @@ -332,8 +349,7 @@ private void checkInitStorageProperties(Map properties) { if (properties == null || properties.isEmpty()) { throw new IllegalArgumentException("init file storage properties is empty"); } - List requiredProperties = - Arrays.asList(BUSINESS_KEY, CLUSTER_NAME, HDFS_CONFIG_KEY); + List requiredProperties = Arrays.asList(BUSINESS_KEY, CLUSTER_NAME); for (String requiredProperty : requiredProperties) { if (!properties.containsKey(requiredProperty)) { throw new IllegalArgumentException( diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java index 7309770d052..49c47a4d47f 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java @@ -24,14 +24,10 @@ import org.apache.seatunnel.engine.imap.storage.api.IMapStorageFactory; import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; -import org.apache.hadoop.conf.Configuration; - import com.google.auto.service.AutoService; import java.util.Map; -import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY; - @AutoService(IMapStorageFactory.class) public class IMapFileStorageFactory implements IMapStorageFactory { @Override @@ -42,9 +38,6 @@ public String factoryIdentifier() { @Override public IMapStorage create(Map initMap) throws IMapStorageException { IMapFileStorage iMapFileStorage = new IMapFileStorage(); - Configuration configuration = new Configuration(); - configuration.set("fs.defaultFS", (String) initMap.get("fs.defaultFS")); - initMap.put(HDFS_CONFIG_KEY, configuration); iMapFileStorage.initialize(initMap); return iMapFileStorage; } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReader.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReader.java index 94e7a839385..6feca6a16fc 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReader.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReader.java @@ -22,18 +22,17 @@ import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; +import org.apache.seatunnel.engine.imap.storage.file.wal.DiscoveryWalFileFactory; +import org.apache.seatunnel.engine.imap.storage.file.wal.reader.IFileReader; import org.apache.seatunnel.engine.serializer.api.Serializer; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.ClassUtils; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -41,28 +40,19 @@ import java.util.Map; import java.util.Set; -import static org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils.WAL_DATA_METADATA_LENGTH; - public class WALReader { - private static final int DEFAULT_QUERY_LIST_SIZE = 1024; - private FileSystem fs; private final Serializer serializer; + private final IFileReader fileReader; - public WALReader(FileSystem fs, Serializer serializer) throws IOException { + public WALReader(FileSystem fs, FileConfiguration configuration, Serializer serializer) + throws IOException { this.serializer = serializer; - this.fs = fs; + this.fileReader = DiscoveryWalFileFactory.getReader(configuration.getName()); + this.fileReader.initialize(fs, serializer); } private List readAllData(Path parentPath) throws IOException { - List fileNames = getFileNames(parentPath); - if (CollectionUtils.isEmpty(fileNames)) { - return new ArrayList<>(); - } - List result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE); - for (String fileName : fileNames) { - result.addAll(readData(new Path(parentPath, fileName))); - } - return result; + return this.fileReader.readAllData(parentPath); } public Set loadAllKeys(Path parentPath) throws IOException { @@ -121,50 +111,6 @@ public Map loadAllData(Path parentPath, Set searchKeys) return result; } - private List readData(Path path) throws IOException { - List result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE); - long length = fs.getFileStatus(path).getLen(); - try (FSDataInputStream in = fs.open(path)) { - byte[] datas = new byte[(int) length]; - in.readFully(datas); - int startIndex = 0; - while (startIndex + WAL_DATA_METADATA_LENGTH < datas.length) { - - byte[] metadata = new byte[WAL_DATA_METADATA_LENGTH]; - System.arraycopy(datas, startIndex, metadata, 0, WAL_DATA_METADATA_LENGTH); - int dataLength = WALDataUtils.byteArrayToInt(metadata); - startIndex += WAL_DATA_METADATA_LENGTH; - if (startIndex + dataLength > datas.length) { - break; - } - byte[] data = new byte[dataLength]; - System.arraycopy(datas, startIndex, data, 0, data.length); - IMapFileData fileData = serializer.deserialize(data, IMapFileData.class); - result.add(fileData); - startIndex += data.length; - } - } - return result; - } - - private List getFileNames(Path parentPath) { - try { - - RemoteIterator fileStatusRemoteIterator = - fs.listFiles(parentPath, true); - List fileNames = new ArrayList<>(); - while (fileStatusRemoteIterator.hasNext()) { - LocatedFileStatus fileStatus = fileStatusRemoteIterator.next(); - if (fileStatus.getPath().getName().endsWith("wal.txt")) { - fileNames.add(fileStatus.getPath().toString()); - } - } - return fileNames; - } catch (IOException e) { - throw new IMapStorageException(e, "get file names error,path is s%", parentPath); - } - } - private Object deserializeData(byte[] data, String className) { try { Class clazz = ClassUtils.getClass(className); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java index d769dd35f75..168c13efd58 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java @@ -21,57 +21,37 @@ package org.apache.seatunnel.engine.imap.storage.file.common; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; +import org.apache.seatunnel.engine.imap.storage.file.wal.DiscoveryWalFileFactory; +import org.apache.seatunnel.engine.imap.storage.file.wal.writer.IFileWriter; import org.apache.seatunnel.engine.serializer.api.Serializer; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSOutputStream; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import java.io.IOException; -import java.util.EnumSet; public class WALWriter implements AutoCloseable { - FSDataOutputStream out; - - Serializer serializer; - - public WALWriter(FileSystem fs, Path parentPath, Serializer serializer) throws IOException { - Path path = new Path(parentPath, "wal.txt"); - this.out = fs.create(path); - this.serializer = serializer; + IFileWriter writer; + + public WALWriter( + FileSystem fs, + FileConfiguration fileConfiguration, + Path parentPath, + Serializer serializer) + throws IOException { + this.writer = DiscoveryWalFileFactory.getWriter(fileConfiguration.getName()); + this.writer.setBlockSize(fileConfiguration.getConfiguration().getBlockSize()); + this.writer.initialize(fs, parentPath, serializer); } public void write(IMapFileData data) throws IOException { - byte[] bytes = serializer.serialize(data); - this.write(bytes); - } - - public void flush() throws IOException { - // hsync to flag - if (out instanceof HdfsDataOutputStream) { - ((HdfsDataOutputStream) out) - .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); - } - if (out.getWrappedStream() instanceof DFSOutputStream) { - ((DFSOutputStream) out.getWrappedStream()) - .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); - } else { - out.hsync(); - } - this.out.hflush(); - } - - private void write(byte[] bytes) throws IOException { - byte[] data = WALDataUtils.wrapperBytes(bytes); - this.out.write(data); - this.flush(); + this.writer.write(data); } @Override public void close() throws Exception { - out.close(); + this.writer.close(); } } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java new file mode 100644 index 00000000000..8d59ac11b80 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +public abstract class AbstractConfiguration { + public static final String BLOCK_SIZE = "block.size"; + protected static final String HDFS_IMPL_KEY = "impl"; + + private Long blockSize = 1024 * 1024L; + + public Long getBlockSize() { + return blockSize; + } + + public void setBlockSize(Long blockSize) { + this.blockSize = blockSize; + } + + /** + * check the configuration keys + * + * @param config configuration + * @param keys keys + */ + void checkConfiguration(Map config, String... keys) { + for (String key : keys) { + if (!config.containsKey(key) || null == config.get(key)) { + throw new IllegalArgumentException(key + " is required"); + } + } + } + + public abstract Configuration buildConfiguration(Map config) + throws IMapStorageException; + + /** + * set extra options for configuration + * + * @param hadoopConf + * @param config + * @param prefix + */ + void setExtraConfiguration( + Configuration hadoopConf, Map config, String prefix) { + config.forEach( + (k, v) -> { + if (config.containsKey(BLOCK_SIZE)) { + setBlockSize(Long.parseLong(config.get(BLOCK_SIZE).toString())); + } + if (k.startsWith(prefix)) { + hadoopConf.set(k, String.valueOf(v)); + } + }); + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java new file mode 100644 index 00000000000..2da8a5056f8 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +public enum FileConfiguration { + HDFS("hdfs", new HdfsConfiguration()), + S3("s3", new S3Configuration()), + OSS("oss", new OssConfiguration()); + + /** file system type */ + private final String name; + + /** file system configuration */ + private final AbstractConfiguration configuration; + + FileConfiguration(String name, AbstractConfiguration configuration) { + this.name = name; + this.configuration = configuration; + } + + public AbstractConfiguration getConfiguration() { + return configuration; + } + + public String getName() { + return name; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java new file mode 100644 index 00000000000..2f98dfa0b40 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT; +import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; + +public class HdfsConfiguration extends AbstractConfiguration { + + @Override + public Configuration buildConfiguration(Map config) { + Configuration hadoopConf = new Configuration(); + hadoopConf.set( + FS_DEFAULT_NAME_KEY, + String.valueOf(config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT))); + return hadoopConf; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java new file mode 100644 index 00000000000..d36aa341457 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; + +public class OssConfiguration extends AbstractConfiguration { + public static final String OSS_BUCKET_KEY = "oss.bucket"; + private static final String OSS_IMPL_KEY = "fs.oss.impl"; + private static final String HDFS_OSS_IMPL = + "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"; + private static final String OSS_KEY = "fs.oss."; + + @Override + public Configuration buildConfiguration(Map config) + throws IMapStorageException { + checkConfiguration(config, OSS_BUCKET_KEY); + Configuration hadoopConf = new Configuration(); + hadoopConf.set(FS_DEFAULT_NAME_KEY, String.valueOf(config.get(OSS_BUCKET_KEY))); + hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL); + setExtraConfiguration(hadoopConf, config, OSS_KEY); + return hadoopConf; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java new file mode 100644 index 00000000000..5d34f7814bb --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; + +public class S3Configuration extends AbstractConfiguration { + public static final String S3_BUCKET_KEY = "s3.bucket"; + private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem"; + private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem"; + private static final String S3A_PROTOCOL = "s3a"; + private static final String DEFAULT_PROTOCOL = "s3n"; + private static final String S3_FORMAT_KEY = "fs.%s.%s"; + private static final String SPLIT_CHAR = "."; + private static final String FS_KEY = "fs."; + + @Override + public Configuration buildConfiguration(Map config) + throws IMapStorageException { + checkConfiguration(config, S3_BUCKET_KEY); + String protocol = DEFAULT_PROTOCOL; + if (config.get(S3_BUCKET_KEY).toString().startsWith(S3A_PROTOCOL)) { + protocol = S3A_PROTOCOL; + } + String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL; + Configuration hadoopConf = new Configuration(); + hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY).toString()); + hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl); + setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + SPLIT_CHAR); + return hadoopConf; + } + + private String formatKey(String protocol, String key) { + return String.format(S3_FORMAT_KEY, protocol, key); + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java index 1130c21bedc..35db6d2842f 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.serializer.api.Serializer; import org.apache.hadoop.fs.FileSystem; @@ -58,7 +59,11 @@ public class WALDisruptor implements Closeable { event.setRequestId(requestId); }; - public WALDisruptor(FileSystem fs, String parentPath, Serializer serializer) { + public WALDisruptor( + FileSystem fs, + FileConfiguration fileConfiguration, + String parentPath, + Serializer serializer) { // todo should support multi thread producer ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE; this.disruptor = @@ -69,7 +74,8 @@ public WALDisruptor(FileSystem fs, String parentPath, Serializer serializer) { ProducerType.SINGLE, new BlockingWaitStrategy()); - disruptor.handleEventsWithWorkerPool(new WALWorkHandler(fs, parentPath, serializer)); + disruptor.handleEventsWithWorkerPool( + new WALWorkHandler(fs, fileConfiguration, parentPath, serializer)); disruptor.start(); } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java index 6255794056c..70c411e44ef 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; import org.apache.seatunnel.engine.imap.storage.file.common.WALWriter; +import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache; import org.apache.seatunnel.engine.serializer.api.Serializer; @@ -40,9 +41,13 @@ public class WALWorkHandler implements WorkHandler { private WALWriter writer; - public WALWorkHandler(FileSystem fs, String parentPath, Serializer serializer) { + public WALWorkHandler( + FileSystem fs, + FileConfiguration fileConfiguration, + String parentPath, + Serializer serializer) { try { - writer = new WALWriter(fs, new Path(parentPath), serializer); + writer = new WALWriter(fs, fileConfiguration, new Path(parentPath), serializer); } catch (IOException e) { throw new IMapStorageException( e, "create new current writer failed, parent path is %s", parentPath); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/DiscoveryWalFileFactory.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/DiscoveryWalFileFactory.java new file mode 100644 index 00000000000..7f76ae2f39e --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/DiscoveryWalFileFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.seatunnel.engine.imap.storage.file.wal; + +import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; +import org.apache.seatunnel.engine.imap.storage.file.wal.reader.DefaultReader; +import org.apache.seatunnel.engine.imap.storage.file.wal.reader.IFileReader; +import org.apache.seatunnel.engine.imap.storage.file.wal.writer.HdfsWriter; +import org.apache.seatunnel.engine.imap.storage.file.wal.writer.IFileWriter; +import org.apache.seatunnel.engine.imap.storage.file.wal.writer.OssWriter; +import org.apache.seatunnel.engine.imap.storage.file.wal.writer.S3Writer; + +public class DiscoveryWalFileFactory { + + public static IFileReader getReader(String type) { + FileConfiguration configuration = FileConfiguration.valueOf(type.toUpperCase()); + switch (configuration) { + case HDFS: + case S3: + case OSS: + return new DefaultReader(); + } + throw new UnsupportedOperationException("Unsupported type " + type); + } + + public static IFileWriter getWriter(String type) { + FileConfiguration configuration = FileConfiguration.valueOf(type.toUpperCase()); + switch (configuration) { + case HDFS: + return new HdfsWriter(); + case S3: + return new S3Writer(); + case OSS: + return new OssWriter(); + } + throw new UnsupportedOperationException("Unsupported type " + type); + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java new file mode 100644 index 00000000000..f8d5e7fadf9 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.seatunnel.engine.imap.storage.file.wal.reader; + +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils; +import org.apache.seatunnel.engine.serializer.api.Serializer; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils.WAL_DATA_METADATA_LENGTH; + +public class DefaultReader implements IFileReader { + private static final int DEFAULT_QUERY_LIST_SIZE = 1024; + FileSystem fs; + Serializer serializer; + + @Override + public String identifier() { + return "default"; + } + + @Override + public void initialize(FileSystem fs, Serializer serializer) throws IOException { + this.fs = fs; + this.serializer = serializer; + } + + @Override + public List readAllData(Path parentPath) throws IOException { + List fileNames = getFileNames(parentPath); + if (CollectionUtils.isEmpty(fileNames)) { + return new ArrayList<>(); + } + List result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE); + for (String fileName : fileNames) { + result.addAll(readData(new Path(parentPath, fileName))); + } + return result; + } + + private List getFileNames(Path parentPath) { + try { + if (!fs.exists(parentPath)) { + return new ArrayList<>(); + } + RemoteIterator fileStatusRemoteIterator = + fs.listFiles(parentPath, true); + List fileNames = new ArrayList<>(); + while (fileStatusRemoteIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusRemoteIterator.next(); + if (fileStatus.getPath().getName().endsWith("wal.txt")) { + fileNames.add(fileStatus.getPath().toString()); + } + } + return fileNames; + } catch (IOException e) { + throw new IMapStorageException(e, "get file names error,path is s%", parentPath); + } + } + + private List readData(Path path) throws IOException { + List result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE); + long length = fs.getFileStatus(path).getLen(); + try (FSDataInputStream in = fs.open(path)) { + byte[] datas = new byte[(int) length]; + in.readFully(datas); + int startIndex = 0; + while (startIndex + WAL_DATA_METADATA_LENGTH < datas.length) { + + byte[] metadata = new byte[WAL_DATA_METADATA_LENGTH]; + System.arraycopy(datas, startIndex, metadata, 0, WAL_DATA_METADATA_LENGTH); + int dataLength = WALDataUtils.byteArrayToInt(metadata); + startIndex += WAL_DATA_METADATA_LENGTH; + if (startIndex + dataLength > datas.length) { + break; + } + byte[] data = new byte[dataLength]; + System.arraycopy(datas, startIndex, data, 0, data.length); + IMapFileData fileData = serializer.deserialize(data, IMapFileData.class); + result.add(fileData); + startIndex += data.length; + } + } + return result; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/IFileReader.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/IFileReader.java new file mode 100644 index 00000000000..1fbc85706ee --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/IFileReader.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.seatunnel.engine.imap.storage.file.wal.reader; + +import org.apache.seatunnel.engine.serializer.api.Serializer; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.List; + +public interface IFileReader { + String identifier(); + + void initialize(FileSystem fs, Serializer serializer) throws IOException; + + List readAllData(Path parentPath) throws IOException; +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java new file mode 100644 index 00000000000..48c9cea0156 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.seatunnel.engine.imap.storage.file.wal.writer; + +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils; +import org.apache.seatunnel.engine.serializer.api.Serializer; + +import org.apache.curator.shaded.com.google.common.io.ByteStreams; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +@Slf4j +public abstract class CloudWriter implements IFileWriter { + private FileSystem fs; + private Path parentPath; + private Path path; + private Serializer serializer; + + private ByteBuf bf = Unpooled.buffer(1024); + + // block size, default 1024*1024 + private long blockSize = 1024 * 1024; + + private AtomicLong index = new AtomicLong(0); + + @Override + public void initialize(FileSystem fs, Path parentPath, Serializer serializer) + throws IOException { + + this.fs = fs; + this.serializer = serializer; + this.parentPath = parentPath; + this.path = createNewPath(); + if (fs.exists(path)) { + try (FSDataInputStream fsDataInputStream = fs.open(path)) { + bf.writeBytes(ByteStreams.toByteArray(fsDataInputStream)); + } + } + } + + @Override + public void setBlockSize(Long blockSize) { + if (blockSize != null && blockSize > DEFAULT_BLOCK_SIZE) { + this.blockSize = blockSize; + } + } + + // TODO Synchronous write, asynchronous write can be added in the future + @Override + public void write(IMapFileData data) throws IOException { + byte[] bytes = serializer.serialize(data); + this.write(bytes); + } + + private void write(byte[] bytes) { + try (FSDataOutputStream out = fs.create(path, true)) { + // Write to bytebuffer + byte[] data = WALDataUtils.wrapperBytes(bytes); + bf.writeBytes(data); + + // Read all bytes + byte[] allBytes = new byte[bf.readableBytes()]; + bf.readBytes(allBytes); + + // write filesystem + out.write(allBytes); + + // check and reset + checkAndSetNextScheduleRotation(allBytes.length); + + } catch (Exception ex) { + throw new IMapStorageException(ex); + } + } + + private void checkAndSetNextScheduleRotation(long allBytes) { + if (allBytes > blockSize) { + this.path = createNewPath(); + this.bf.clear(); + } else { + // reset index + bf.resetReaderIndex(); + } + } + + public Path createNewPath() { + return new Path(parentPath, index.incrementAndGet() + "_" + FILE_NAME); + } + + @Override + public void close() throws Exception { + bf.clear(); + this.bf = null; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java new file mode 100644 index 00000000000..57b329c5c97 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.wal.writer; + +import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils; +import org.apache.seatunnel.engine.serializer.api.Serializer; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; + +import java.io.IOException; +import java.util.EnumSet; + +public class HdfsWriter implements IFileWriter { + + private FSDataOutputStream out; + + private Serializer serializer; + + @Override + public String identifier() { + return "hdfs"; + } + + @Override + public void initialize(FileSystem fs, Path parentPath, Serializer serializer) + throws IOException { + Path path = new Path(parentPath, FILE_NAME); + this.out = fs.create(path); + this.serializer = serializer; + } + + @Override + public void write(IMapFileData data) throws IOException { + byte[] bytes = serializer.serialize(data); + this.write(bytes); + } + + public void flush() throws IOException { + // hsync to flag + if (out instanceof HdfsDataOutputStream) { + ((HdfsDataOutputStream) out) + .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); + } + if (out.getWrappedStream() instanceof DFSOutputStream) { + ((DFSOutputStream) out.getWrappedStream()) + .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); + } else { + out.hsync(); + } + this.out.hflush(); + } + + private void write(byte[] bytes) throws IOException { + byte[] data = WALDataUtils.wrapperBytes(bytes); + this.out.write(data); + this.flush(); + } + + @Override + public void close() throws Exception { + if (out != null) { + out.close(); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/IFileWriter.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/IFileWriter.java new file mode 100644 index 00000000000..f69fc3fae47 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/IFileWriter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.seatunnel.engine.imap.storage.file.wal.writer; + +import org.apache.seatunnel.engine.serializer.api.Serializer; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +public interface IFileWriter extends AutoCloseable { + String FILE_NAME = "wal.txt"; + Long DEFAULT_BLOCK_SIZE = 1024 * 1024L; + + String identifier(); + + void initialize(FileSystem fs, Path parentPath, Serializer serializer) throws IOException; + + default void setBlockSize(Long blockSize) {} + + void write(T data) throws IOException; +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/OssWriter.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/OssWriter.java new file mode 100644 index 00000000000..8eeb8749450 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/OssWriter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.seatunnel.engine.imap.storage.file.wal.writer; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class OssWriter extends CloudWriter { + @Override + public String identifier() { + return "oss"; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/S3Writer.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/S3Writer.java new file mode 100644 index 00000000000..8b7fad093c1 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/S3Writer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.seatunnel.engine.imap.storage.file.wal.writer; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class S3Writer extends CloudWriter { + @Override + public String identifier() { + return "s3"; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileOSSStorageTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileOSSStorageTest.java new file mode 100644 index 00000000000..dd92b65e762 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileOSSStorageTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file; + +import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledOnOs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.condition.OS.LINUX; +import static org.junit.jupiter.api.condition.OS.MAC; + +@EnabledOnOs({LINUX, MAC}) +@Disabled +public class IMapFileOSSStorageTest { + + static String OSS_BUCKET_NAME = "oss://your bucket name/"; + static String OSS_ENDPOINT = "your oss endpoint"; + static String OSS_ACCESS_KEY_ID = "oss accessKey id"; + static String OSS_ACCESS_KEY_SECRET = "oss accessKey secret"; + static String BUSINESS = "random"; + static String NAMESPACE = "/seatunnel-test/2"; + static String CLUSTER_NAME = "test-one"; + private static final Configuration CONF; + + private static final IMapFileStorage STORAGE; + + static { + CONF = new Configuration(); + CONF.set("storage.type", "oss"); + CONF.set("fs.defaultFS", OSS_BUCKET_NAME); + CONF.set("fs.oss.endpoint", OSS_ENDPOINT); + CONF.set("fs.oss.accessKeyId", OSS_ACCESS_KEY_ID); + CONF.set("fs.oss.accessKeySecret", OSS_ACCESS_KEY_SECRET); + CONF.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"); + CONF.set( + "fs.oss.credentials.provider", + "org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider"); + + STORAGE = new IMapFileStorage(); + Map properties = new HashMap<>(); + properties.put("storage.type", "oss"); + properties.put("oss.bucket", OSS_BUCKET_NAME); + properties.put("block.size", 1024 * 1024 * 2); + properties.put("fs.oss.endpoint", OSS_ENDPOINT); + properties.put("fs.oss.accessKeyId", OSS_ACCESS_KEY_ID); + properties.put("fs.oss.accessKeySecret", OSS_ACCESS_KEY_SECRET); + properties.put("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"); + properties.put( + "fs.oss.credentials.provider", + "org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider"); + properties.put(FileConstants.FileInitProperties.BUSINESS_KEY, BUSINESS); + properties.put(FileConstants.FileInitProperties.NAMESPACE_KEY, NAMESPACE); + properties.put(FileConstants.FileInitProperties.CLUSTER_NAME, CLUSTER_NAME); + properties.put(WRITE_DATA_TIMEOUT_MILLISECONDS_KEY, 6000L); + + STORAGE.initialize(properties); + } + + @Test + void testAll() { + + List keys = new ArrayList<>(); + String key1Index = "key1"; + String key2Index = "key2"; + String key50Index = "key50"; + + AtomicInteger dataSize = new AtomicInteger(); + Long keyValue = 123456789L; + for (int i = 0; i < 100; i++) { + String key = "key" + i; + Long value = System.currentTimeMillis(); + + if (i == 50) { + // delete + STORAGE.delete(key1Index); + keys.remove(key1Index); + // update + STORAGE.store(key2Index, keyValue); + keys.add(key2Index); + value = keyValue; + new Thread(() -> dataSize.set(STORAGE.loadAll().size())).start(); + } + STORAGE.store(key, value); + keys.add(key); + STORAGE.delete(key1Index); + keys.remove(key1Index); + } + + await().atMost(1, TimeUnit.SECONDS).until(dataSize::get, size -> size > 0); + Map loadAllDatas = STORAGE.loadAll(); + Assertions.assertTrue(dataSize.get() >= 50); + Assertions.assertEquals(keyValue, loadAllDatas.get(key50Index)); + Assertions.assertEquals(keyValue, loadAllDatas.get(key2Index)); + Assertions.assertNull(loadAllDatas.get(key1Index)); + + STORAGE.deleteAll(keys); + } + + @Test + void testStoreArray() { + Long[] data = new Long[10]; + data[6] = 111111111L; + STORAGE.store("array", data); + Long[] array = (Long[]) STORAGE.loadAll().get("array"); + Assertions.assertEquals(array[6], 111111111L); + } + + @AfterAll + static void afterAll() throws IOException { + FileSystem.get(CONF).delete(new Path("/seatunnel-test/2"), true); + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java index a38cdad5422..a1aebb5e478 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java @@ -56,11 +56,13 @@ public class IMapFileStorageTest { CONF.set("fs.defaultFS", "file:///"); CONF.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"); STORAGE = new IMapFileStorage(); + Map properties = new HashMap<>(); + properties.put("fs.defaultFS", "file:///"); + properties.put("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"); properties.put(FileConstants.FileInitProperties.BUSINESS_KEY, "random"); properties.put(FileConstants.FileInitProperties.NAMESPACE_KEY, "/tmp/imap-kris-test/2"); properties.put(FileConstants.FileInitProperties.CLUSTER_NAME, "test-one"); - properties.put(FileConstants.FileInitProperties.HDFS_CONFIG_KEY, CONF); properties.put(WRITE_DATA_TIMEOUT_MILLISECONDS_KEY, 60L); STORAGE.initialize(properties); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java index 48f88bd2d3a..388be848360 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java @@ -21,6 +21,7 @@ package org.apache.seatunnel.engine.imap.storage.file.common; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.serializer.api.Serializer; import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer; @@ -59,7 +60,7 @@ public static void init() throws IOException { @Test public void testWriterAndReader() throws Exception { - WALWriter writer = new WALWriter(FS, PARENT_PATH, SERIALIZER); + WALWriter writer = new WALWriter(FS, FileConfiguration.HDFS, PARENT_PATH, SERIALIZER); IMapFileData data; boolean isDelete; for (int i = 0; i < 1024; i++) { @@ -106,7 +107,7 @@ public void testWriterAndReader() throws Exception { writer.close(); await().atMost(10, java.util.concurrent.TimeUnit.SECONDS).await(); - WALReader reader = new WALReader(FS, new ProtoStuffSerializer()); + WALReader reader = new WALReader(FS, FileConfiguration.HDFS, new ProtoStuffSerializer()); Map result = reader.loadAllData(PARENT_PATH, new HashSet<>()); Assertions.assertEquals("Kristen", result.get("key511")); Assertions.assertEquals(511, result.size()); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java index fb06c91e707..a8bc80a89a7 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java @@ -21,6 +21,7 @@ package org.apache.seatunnel.engine.imap.storage.file.disruptor; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFuture; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache; import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer; @@ -59,7 +60,8 @@ public class WALDisruptorTest { @Test void testProducerAndConsumer() throws IOException { FS = FileSystem.get(CONF); - DISRUPTOR = new WALDisruptor(FS, FILEPATH, new ProtoStuffSerializer()); + DISRUPTOR = + new WALDisruptor(FS, FileConfiguration.HDFS, FILEPATH, new ProtoStuffSerializer()); IMapFileData data; for (int i = 0; i < 100; i++) { data = diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index d1db75ff191..3a1e736b68b 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -35,3 +35,6 @@ jsr305-1.3.9.jar jsr305-3.0.0.jar jsr305-3.0.2.jar listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar +accessors-smart-2.4.7.jar +asm-9.1.jar +json-smart-2.4.7.jar