diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index c899dd0e8bf..4b1d7dd86ce 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.e2e.connector.paimon; import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.core.starter.utils.CompressionUtils; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; @@ -25,6 +26,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.commons.compress.archivers.ArchiveException; import org.apache.commons.lang3.StringUtils; import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; @@ -52,6 +54,7 @@ import lombok.extern.slf4j.Slf4j; +import java.io.File; import java.io.IOException; import java.time.LocalDate; import java.util.ArrayList; @@ -68,7 +71,8 @@ "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error") @Slf4j public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource { - private static final String CATALOG_ROOT_DIR = "/tmp/"; + + private static String CATALOG_ROOT_DIR = "/tmp/"; private static final String NAMESPACE = "paimon"; private static final String NAMESPACE_TAR = "paimon.tar.gz"; private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + "/"; @@ -77,10 +81,18 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource { private static final String FAKE_DATABASE1 = "FakeDatabase1"; private static final String FAKE_TABLE2 = "FakeTable1"; private static final String FAKE_DATABASE2 = "FakeDatabase2"; + private String CATALOG_ROOT_DIR_WIN = "C:/Users/"; + private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/"; + private boolean isWindows; @BeforeAll @Override - public void startUp() throws Exception {} + public void startUp() throws Exception { + this.isWindows = + System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS"); + CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN + System.getProperty("user.name") + "/tmp/"; + CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/"; + } @AfterAll @Override @@ -498,8 +510,15 @@ public void testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer contai protected final ContainerExtendedFactory containerExtendedFactory = container -> { - FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR); - FileUtils.createNewDir(CATALOG_DIR); + if (isWindows) { + FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR); + FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar"); + FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN); + } else { + FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR); + FileUtils.createNewDir(CATALOG_DIR); + } + container.execInContainer( "sh", "-c", @@ -510,8 +529,13 @@ public void testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer contai + " " + NAMESPACE); container.copyFileFromContainer( - CATALOG_ROOT_DIR + NAMESPACE_TAR, CATALOG_ROOT_DIR + NAMESPACE_TAR); - extractFiles(); + CATALOG_ROOT_DIR + NAMESPACE_TAR, + (isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR) + NAMESPACE_TAR); + if (isWindows) { + extractFilesWin(); + } else { + extractFiles(); + } }; private void extractFiles() { @@ -532,6 +556,17 @@ private void extractFiles() { } } + private void extractFilesWin() { + try { + CompressionUtils.unGzip( + new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new File(CATALOG_ROOT_DIR_WIN)); + CompressionUtils.unTar( + new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new File(CATALOG_ROOT_DIR_WIN)); + } catch (IOException | ArchiveException e) { + throw new RuntimeException(e); + } + } + private List loadPaimonData(String dbName, String tbName) throws Exception { Table table = getTable(dbName, tbName); ReadBuilder readBuilder = table.newReadBuilder(); @@ -575,7 +610,11 @@ private Identifier getIdentifier(String dbName, String tbName) { private Catalog getCatalog() { Options options = new Options(); - options.set("warehouse", "file://" + CATALOG_DIR); + if (isWindows) { + options.set("warehouse", "file://" + CATALOG_DIR_WIN); + } else { + options.set("warehouse", "file://" + CATALOG_DIR); + } Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); return catalog; }