Skip to content

Commit

Permalink
[Improve][E2E] Support windows for the e2e of paimon (apache#7329)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 authored Aug 7, 2024
1 parent 855254e commit a4db64d
Showing 1 changed file with 46 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.seatunnel.e2e.connector.paimon;

import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
Expand Down Expand Up @@ -52,6 +54,7 @@

import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
Expand All @@ -68,7 +71,8 @@
"Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
@Slf4j
public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
private static final String CATALOG_ROOT_DIR = "/tmp/";

private static String CATALOG_ROOT_DIR = "/tmp/";
private static final String NAMESPACE = "paimon";
private static final String NAMESPACE_TAR = "paimon.tar.gz";
private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + "/";
Expand All @@ -77,10 +81,18 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
private static final String FAKE_DATABASE1 = "FakeDatabase1";
private static final String FAKE_TABLE2 = "FakeTable1";
private static final String FAKE_DATABASE2 = "FakeDatabase2";
private String CATALOG_ROOT_DIR_WIN = "C:/Users/";
private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
private boolean isWindows;

@BeforeAll
@Override
public void startUp() throws Exception {}
public void startUp() throws Exception {
this.isWindows =
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN + System.getProperty("user.name") + "/tmp/";
CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
}

@AfterAll
@Override
Expand Down Expand Up @@ -498,8 +510,15 @@ public void testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer contai

protected final ContainerExtendedFactory containerExtendedFactory =
container -> {
FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
FileUtils.createNewDir(CATALOG_DIR);
if (isWindows) {
FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR);
FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar");
FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN);
} else {
FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
FileUtils.createNewDir(CATALOG_DIR);
}

container.execInContainer(
"sh",
"-c",
Expand All @@ -510,8 +529,13 @@ public void testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer contai
+ " "
+ NAMESPACE);
container.copyFileFromContainer(
CATALOG_ROOT_DIR + NAMESPACE_TAR, CATALOG_ROOT_DIR + NAMESPACE_TAR);
extractFiles();
CATALOG_ROOT_DIR + NAMESPACE_TAR,
(isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR) + NAMESPACE_TAR);
if (isWindows) {
extractFilesWin();
} else {
extractFiles();
}
};

private void extractFiles() {
Expand All @@ -532,6 +556,17 @@ private void extractFiles() {
}
}

private void extractFilesWin() {
try {
CompressionUtils.unGzip(
new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new File(CATALOG_ROOT_DIR_WIN));
CompressionUtils.unTar(
new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new File(CATALOG_ROOT_DIR_WIN));
} catch (IOException | ArchiveException e) {
throw new RuntimeException(e);
}
}

private List<PaimonRecord> loadPaimonData(String dbName, String tbName) throws Exception {
Table table = getTable(dbName, tbName);
ReadBuilder readBuilder = table.newReadBuilder();
Expand Down Expand Up @@ -575,7 +610,11 @@ private Identifier getIdentifier(String dbName, String tbName) {

private Catalog getCatalog() {
Options options = new Options();
options.set("warehouse", "file://" + CATALOG_DIR);
if (isWindows) {
options.set("warehouse", "file://" + CATALOG_DIR_WIN);
} else {
options.set("warehouse", "file://" + CATALOG_DIR);
}
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
return catalog;
}
Expand Down

0 comments on commit a4db64d

Please sign in to comment.