From 4ac05e4b6e730d976a51e92a518cfdbdeb10b51b Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Tue, 24 Dec 2024 16:07:44 +0800 Subject: [PATCH] [core] External Path in DataFileMeta should be the file path (#4766) --- .../org/apache/paimon/io/DataFileMeta.java | 16 ++++-- .../paimon/io/DataFileMetaSerializer.java | 2 +- .../apache/paimon/io/DataFilePathFactory.java | 54 +++++++++---------- .../apache/paimon/io/FileIndexEvaluator.java | 2 +- .../paimon/io/KeyValueFileReaderFactory.java | 40 ++++---------- .../paimon/io/KeyValueFileWriterFactory.java | 15 +++--- .../org/apache/paimon/manifest/FileEntry.java | 5 +- .../apache/paimon/manifest/ManifestEntry.java | 7 ++- .../paimon/manifest/SimpleFileEntry.java | 1 + .../paimon/mergetree/MergeTreeWriter.java | 16 +++--- .../apache/paimon/table/source/DataSplit.java | 4 +- .../paimon/table/system/FilesTable.java | 53 ++++++++---------- .../paimon/append/AppendOnlyWriterTest.java | 7 ++- .../paimon/io/DataFilePathFactoryTest.java | 12 +++-- .../paimon/io/KeyValueFileReadWriteTest.java | 14 +++-- .../paimon/mergetree/ContainsLevelsTest.java | 9 +--- .../paimon/mergetree/LookupLevelsTest.java | 9 +--- .../paimon/mergetree/MergeTreeTestBase.java | 8 +-- .../utils/FileStorePathFactoryTest.java | 12 +++-- .../paimon/flink/clone/CloneFileInfo.java | 15 ++++-- .../paimon/flink/clone/CopyFileOperator.java | 2 +- .../clone/PickFilesForCloneOperator.java | 6 ++- .../paimon/flink/clone/PickFilesUtil.java | 2 +- .../changelog/ChangelogCompactTask.java | 10 ++-- .../flink/sink/RewriteFileIndexSink.java | 5 +- .../action/RewriteFileIndexActionITCase.java | 2 +- .../RewriteFileIndexProcedureITCase.java | 2 +- ...ngleTableCompactionWorkerOperatorTest.java | 7 +-- .../paimon/spark/SparkFileIndexITCase.java | 3 +- 29 files changed, 157 insertions(+), 183 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 459cd788de53..b164b60fe525 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -368,9 +368,14 @@ public String fileFormat() { return split[split.length - 1]; } - @Nullable - public String externalPath() { - return externalPath; + public Optional externalPath() { + return Optional.ofNullable(externalPath); + } + + public Optional externalPathDir() { + return Optional.ofNullable(externalPath) + .map(Path::new) + .map(p -> p.getParent().toUri().toString()); } public Optional fileSource() { @@ -405,7 +410,8 @@ public DataFileMeta upgrade(int newLevel) { externalPath); } - public DataFileMeta rename(String newExternalPath, String newFileName) { + public DataFileMeta rename(String newFileName) { + String newExternalPath = externalPathDir().map(dir -> dir + "/" + newFileName).orElse(null); return new DataFileMeta( newFileName, fileSize, @@ -452,7 +458,7 @@ public DataFileMeta copyWithoutStats() { public List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); paths.add(pathFactory.toPath(this)); - extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f))); + extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this))); return paths; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index c8a5e326b0a1..a316f897ff73 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -59,7 +59,7 @@ public InternalRow toRow(DataFileMeta meta) { meta.embeddedIndex(), meta.fileSource().map(FileSource::toByteValue).orElse(null), toStringArrayData(meta.valueStatsCols()), - BinaryString.fromString(meta.externalPath())); + meta.externalPath().map(BinaryString::fromString).orElse(null)); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index daeb9f52eada..19525ab6cd91 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -20,9 +20,11 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileEntry; import javax.annotation.concurrent.ThreadSafe; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -67,47 +69,36 @@ public Path newChangelogPath() { return newPath(changelogFilePrefix); } - private Path newPath(String prefix) { + public String newChangelogFileName() { + return newFileName(changelogFilePrefix); + } + + public Path newPath(String prefix) { + return new Path(parent, newFileName(prefix)); + } + + private String newFileName(String prefix) { String extension; if (fileSuffixIncludeCompression) { extension = "." + fileCompression + "." + formatIdentifier; } else { extension = "." + formatIdentifier; } - String name = prefix + uuid + "-" + pathCount.getAndIncrement() + extension; - return new Path(parent, name); + return prefix + uuid + "-" + pathCount.getAndIncrement() + extension; } - @VisibleForTesting - public Path toPath(String fileName) { - return new Path(parent + "/" + fileName); - } - - /** - * for read purpose. - * - * @param fileName the file name - * @param externalPath the external path, if null, it will use the parent path - * @return the file's path - */ - public Path toPath(String fileName, String externalPath) { - return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); + public Path toPath(DataFileMeta file) { + return file.externalPath().map(Path::new).orElse(new Path(parent, file.fileName())); } - public Path toPath(DataFileMeta dataFileMeta) { - String externalPath = dataFileMeta.externalPath(); - String fileName = dataFileMeta.fileName(); - return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); + public Path toPath(FileEntry file) { + return Optional.ofNullable(file.externalPath()) + .map(Path::new) + .orElse(new Path(parent, file.fileName())); } - public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) { - String externalPath = dataFileMeta.externalPath(); - return new Path((externalPath == null ? parent : externalPath) + "/" + extraFile); - } - - @VisibleForTesting - public String uuid() { - return uuid; + public Path toAlignedPath(String fileName, DataFileMeta aligned) { + return new Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName); } public static Path dataFileToFileIndexPath(Path dataFilePath) { @@ -141,4 +132,9 @@ public static String formatIdentifier(String fileName) { return fileName.substring(index + 1); } + + @VisibleForTesting + String uuid() { + return uuid; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java index 9055097d3718..3ed4c278d9d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java @@ -62,7 +62,7 @@ public static FileIndexResult evaluate( // go to file index check try (FileIndexPredicate predicate = new FileIndexPredicate( - dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)), + dataFilePathFactory.toAlignedPath(indexFiles.get(0), file), fileIO, dataSchema.logicalRowType())) { return predicate.evaluate( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 9d65a5411364..14221d50beb4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -20,7 +20,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; -import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; @@ -98,37 +97,17 @@ private KeyValueFileReaderFactory( @Override public RecordReader createRecordReader(DataFileMeta file) throws IOException { - return createRecordReader( - file.schemaId(), - file.fileName(), - file.fileSize(), - file.level(), - file.externalPath()); - } - - @VisibleForTesting - public RecordReader createRecordReader( - long schemaId, String fileName, long fileSize, int level, String externalPath) - throws IOException { - if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) { - return new AsyncRecordReader<>( - () -> - createRecordReader( - schemaId, fileName, level, false, 2, fileSize, externalPath)); + if (file.fileSize() >= asyncThreshold && file.fileName().endsWith(".orc")) { + return new AsyncRecordReader<>(() -> createRecordReader(file, false, 2)); } - return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath); + return createRecordReader(file, true, null); } private FileRecordReader createRecordReader( - long schemaId, - String fileName, - int level, - boolean reuseFormat, - @Nullable Integer orcPoolSize, - long fileSize, - String externalPath) + DataFileMeta file, boolean reuseFormat, @Nullable Integer orcPoolSize) throws IOException { - String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName); + String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName()); + long schemaId = file.schemaId(); Supplier formatSupplier = () -> @@ -143,8 +122,9 @@ private FileRecordReader createRecordReader( new FormatKey(schemaId, formatIdentifier), key -> formatSupplier.get()) : formatSupplier.get(); - Path filePath = pathFactory.toPath(fileName, externalPath); + Path filePath = pathFactory.toPath(file); + long fileSize = file.fileSize(); FileRecordReader fileRecordReader = new DataFileRecordReader( formatReaderMapping.getReaderFactory(), @@ -156,13 +136,13 @@ private FileRecordReader createRecordReader( formatReaderMapping.getCastMapping(), PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition)); - Optional deletionVector = dvFactory.create(fileName); + Optional deletionVector = dvFactory.create(file.fileName()); if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { fileRecordReader = new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get()); } - return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level); + return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level()); } public static Builder builder( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 500320c24947..7b6f4f0e3c56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -142,14 +142,13 @@ private KeyValueDataFileWriter createDataFileWriter( fileIndexOptions); } - public void deleteFile(DataFileMeta meta, int level) { - fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta)); + public void deleteFile(DataFileMeta file) { + fileIO.deleteQuietly(formatContext.pathFactory(file.level()).toPath(file)); } - public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level) - throws IOException { - Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta); - Path targetPath = formatContext.pathFactory(level).toPath(targetMeta); + public void copyFile(DataFileMeta sourceFile, DataFileMeta targetFile) throws IOException { + Path sourcePath = formatContext.pathFactory(sourceFile.level()).toPath(sourceFile); + Path targetPath = formatContext.pathFactory(targetFile.level()).toPath(targetFile); fileIO.copyFile(sourcePath, targetPath, true); } @@ -157,8 +156,8 @@ public FileIO getFileIO() { return fileIO; } - public Path newChangelogPath(int level) { - return formatContext.pathFactory(level).newChangelogPath(); + public String newChangelogFileName(int level) { + return formatContext.pathFactory(level).newChangelogFileName(); } public static Builder builder( diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index 738776438be7..dd77759de1d5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -54,6 +54,7 @@ public interface FileEntry { String fileName(); + @Nullable String externalPath(); Identifier identifier(); @@ -161,7 +162,9 @@ public String toString(FileStorePathFactory pathFactory) { + ", extraFiles " + extraFiles + ", embeddedIndex " - + Arrays.toString(embeddedIndex); + + Arrays.toString(embeddedIndex) + + ", externalPath " + + externalPath; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index d4748451d8ca..3cb5733a38d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -26,6 +26,8 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.TinyIntType; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -92,9 +94,10 @@ public String fileName() { return file.fileName(); } + @Nullable @Override public String externalPath() { - return file.externalPath(); + return file.externalPath().orElse(null); } @Override @@ -129,7 +132,7 @@ public Identifier identifier() { file.fileName(), file.extraFiles(), file.embeddedIndex(), - file.externalPath()); + externalPath()); } public ManifestEntry copyWithoutStats() { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index f86bded52d46..c8708db0b8f4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -106,6 +106,7 @@ public String fileName() { return fileName; } + @Nullable @Override public String externalPath() { return externalPath; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index df4855922360..1c805e764a77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -27,7 +27,6 @@ import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.fs.Path; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -242,10 +241,9 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul } else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) { List changelogMetas = new ArrayList<>(); for (DataFileMeta dataMeta : dataMetas) { - Path newPath = writerFactory.newChangelogPath(0); - DataFileMeta changelogMeta = - dataMeta.rename(newPath.getParent().getName(), newPath.getName()); - writerFactory.copyFile(dataMeta, changelogMeta, 0); + String newFileName = writerFactory.newChangelogFileName(0); + DataFileMeta changelogMeta = dataMeta.rename(newFileName); + writerFactory.copyFile(dataMeta, changelogMeta); changelogMetas.add(changelogMeta); } newFilesChangelog.addAll(changelogMetas); @@ -343,7 +341,7 @@ private void updateCompactResult(CompactResult result) { // 2. This file is not the input of upgraded. if (!compactBefore.containsKey(file.fileName()) && !afterFiles.contains(file.fileName())) { - writerFactory.deleteFile(file, file.level()); + writerFactory.deleteFile(file); } } else { compactBefore.put(file.fileName(), file); @@ -377,7 +375,7 @@ public void close() throws Exception { deletedFiles.clear(); for (DataFileMeta file : newFilesChangelog) { - writerFactory.deleteFile(file, file.level()); + writerFactory.deleteFile(file); } newFilesChangelog.clear(); @@ -392,12 +390,12 @@ public void close() throws Exception { compactAfter.clear(); for (DataFileMeta file : compactChangelog) { - writerFactory.deleteFile(file, file.level()); + writerFactory.deleteFile(file); } compactChangelog.clear(); for (DataFileMeta file : delete) { - writerFactory.deleteFile(file, file.level()); + writerFactory.deleteFile(file); } if (compactDeletionFile != null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 9178d25a912e..39f9269f41ef 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -180,10 +180,8 @@ public Optional> convertToRawFiles() { } private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) { - String path = file.externalPath() != null ? file.externalPath() : bucketPath; - path += "/" + file.fileName(); return new RawFile( - path, + file.externalPath().orElse(bucketPath + "/" + file.fileName()), file.fileSize(), 0, file.fileSize(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 3107ebe150e3..5c7ccd4809c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -370,9 +370,9 @@ private LazyGenericRow toRow( DataSplit dataSplit, RowDataToObjectArrayConverter partitionConverter, Function keyConverters, - DataFileMeta dataFileMeta, + DataFileMeta file, SimpleStatsEvolutions simpleStatsEvolutions) { - StatsLazyGetter statsGetter = new StatsLazyGetter(dataFileMeta, simpleStatsEvolutions); + StatsLazyGetter statsGetter = new StatsLazyGetter(file, simpleStatsEvolutions); @SuppressWarnings("unchecked") Supplier[] fields = new Supplier[] { @@ -385,51 +385,44 @@ private LazyGenericRow toRow( dataSplit.partition()))), dataSplit::bucket, () -> - dataFileMeta.externalPath() == null - ? BinaryString.fromString( - dataSplit.bucketPath() - + "/" - + dataFileMeta.fileName()) - : BinaryString.fromString( - dataFileMeta.externalPath() - + "/" - + dataFileMeta.fileName()), + BinaryString.fromString( + file.externalPath() + .orElse( + dataSplit.bucketPath() + + "/" + + file.fileName())), () -> BinaryString.fromString( - DataFilePathFactory.formatIdentifier( - dataFileMeta.fileName())), - dataFileMeta::schemaId, - dataFileMeta::level, - dataFileMeta::rowCount, - dataFileMeta::fileSize, + DataFilePathFactory.formatIdentifier(file.fileName())), + file::schemaId, + file::level, + file::rowCount, + file::fileSize, () -> - dataFileMeta.minKey().getFieldCount() <= 0 + file.minKey().getFieldCount() <= 0 ? null : BinaryString.fromString( Arrays.toString( keyConverters - .apply(dataFileMeta.schemaId()) - .convert(dataFileMeta.minKey()))), + .apply(file.schemaId()) + .convert(file.minKey()))), () -> - dataFileMeta.maxKey().getFieldCount() <= 0 + file.maxKey().getFieldCount() <= 0 ? null : BinaryString.fromString( Arrays.toString( keyConverters - .apply(dataFileMeta.schemaId()) - .convert(dataFileMeta.maxKey()))), + .apply(file.schemaId()) + .convert(file.maxKey()))), () -> BinaryString.fromString(statsGetter.nullValueCounts().toString()), () -> BinaryString.fromString(statsGetter.lowerValueBounds().toString()), () -> BinaryString.fromString(statsGetter.upperValueBounds().toString()), - dataFileMeta::minSequenceNumber, - dataFileMeta::maxSequenceNumber, - dataFileMeta::creationTime, + file::minSequenceNumber, + file::maxSequenceNumber, + file::creationTime, () -> BinaryString.fromString( - dataFileMeta - .fileSource() - .map(FileSource::toString) - .orElse(null)) + file.fileSource().map(FileSource::toString).orElse(null)) }; return new LazyGenericRow(fields); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 3f752be13e73..77570205327e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -66,7 +66,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -646,10 +645,10 @@ private DataFileMeta generateCompactAfter(List toCompact) throws I int size = toCompact.size(); long minSeq = toCompact.get(0).minSequenceNumber(); long maxSeq = toCompact.get(size - 1).maxSequenceNumber(); - String fileName = "compact-" + UUID.randomUUID(); - LocalFileIO.create().newOutputStream(pathFactory.toPath(fileName), false).close(); + Path path = pathFactory.newPath("compact-"); + LocalFileIO.create().newOutputStream(path, false).close(); return DataFileMeta.forAppend( - fileName, + path.getName(), toCompact.stream().mapToLong(DataFileMeta::fileSize).sum(), toCompact.stream().mapToLong(DataFileMeta::rowCount).sum(), STATS_SERIALIZER.toBinaryAllMode( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java index d36966c55a0e..109f33c3dc1b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java @@ -55,8 +55,9 @@ public void testNoPartition() { + "." + CoreOptions.FILE_FORMAT.defaultValue())); } - assertThat(pathFactory.toPath("my-data-file-name")) - .isEqualTo(new Path(tempDir.toString() + "/bucket-123/my-data-file-name")); + assertThat(pathFactory.newPath("my-data-file-name").toString()) + .startsWith( + new Path(tempDir.toString() + "/bucket-123/my-data-file-name").toString()); } @Test @@ -83,8 +84,9 @@ public void testWithPartition() { + "." + CoreOptions.FILE_FORMAT.defaultValue())); } - assertThat(pathFactory.toPath("my-data-file-name")) - .isEqualTo( - new Path(tempDir.toString() + "/dt=20211224/bucket-123/my-data-file-name")); + assertThat(pathFactory.newPath("my-data-file-name").toString()) + .startsWith( + new Path(tempDir.toString() + "/dt=20211224/bucket-123/my-data-file-name") + .toString()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index e81756268980..8f2c815404cf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -61,6 +61,7 @@ import static org.apache.paimon.TestKeyValueGenerator.DEFAULT_ROW_TYPE; import static org.apache.paimon.TestKeyValueGenerator.KEY_TYPE; import static org.apache.paimon.TestKeyValueGenerator.createTestSchemaManager; +import static org.apache.paimon.io.DataFileTestUtils.newFile; import static org.apache.paimon.stats.StatsTestUtils.convertWithoutSchemaEvolution; import static org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory; import static org.assertj.core.api.Assertions.assertThat; @@ -78,7 +79,10 @@ public class KeyValueFileReadWriteTest { public void testReadNonExistentFile() { KeyValueFileReaderFactory readerFactory = createReaderFactory(tempDir.toString(), "avro", null, null); - assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0, null)) + assertThatThrownBy( + () -> + readerFactory.createRecordReader( + newFile("non_avro_file.avro", 0, 0, 1, 0))) .hasMessageContaining( "you can configure 'snapshot.time-retained' option with a larger value."); } @@ -307,13 +311,7 @@ private void assertData( for (DataFileMeta meta : actualMetas) { // check the contents of data file CloseableIterator actualKvsIterator = - new RecordReaderIterator<>( - readerFactory.createRecordReader( - meta.schemaId(), - meta.fileName(), - meta.fileSize(), - meta.level(), - meta.externalPath())); + new RecordReaderIterator<>(readerFactory.createRecordReader(meta)); while (actualKvsIterator.hasNext()) { assertThat(expectedIterator.hasNext()).isTrue(); KeyValue actualKv = actualKvsIterator.next(); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index fa96765a4278..fa9628b4c1ba 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -195,14 +195,7 @@ private LookupLevels createContainsLevels(Levels levels, MemorySize max comparator, keyType, new LookupLevels.ContainsValueProcessor(), - file -> - createReaderFactory() - .createRecordReader( - 0, - file.fileName(), - file.fileSize(), - file.level(), - file.externalPath()), + file -> createReaderFactory().createRecordReader(file), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 56c45cfdc442..b68a82935bd0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -272,14 +272,7 @@ private LookupLevels createLookupLevels(Levels levels, MemorySize maxD comparator, keyType, new LookupLevels.KeyValueProcessor(rowType), - file -> - createReaderFactory() - .createRecordReader( - 0, - file.fileName(), - file.fileSize(), - file.level(), - file.externalPath()), + file -> createReaderFactory().createRecordReader(file), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 47d12ce47cf4..e987e2ee9987 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -114,7 +114,7 @@ public void beforeEach() throws IOException { pathFactory = createNonPartFactory(path); comparator = Comparator.comparingInt(o -> o.getInt(0)); recreateMergeTree(1024 * 1024); - Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); + Path bucketDir = writerFactory.pathFactory(0).newPath().getParent(); LocalFileIO.create().mkdirs(bucketDir); } @@ -418,7 +418,7 @@ private void doTestWriteRead(int batchNumber, int perBatch) throws Exception { writer.close(); - Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); + Path bucketDir = writerFactory.pathFactory(0).newPath().getParent(); Set files = Arrays.stream(LocalFileIO.create().listStatus(bucketDir)) .map(FileStatus::getPath) @@ -475,7 +475,7 @@ private void doTestWriteReadWithChangelog( writer.close(); - Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); + Path bucketDir = writerFactory.pathFactory(0).newPath().getParent(); Set files = Arrays.stream(LocalFileIO.create().listStatus(bucketDir)) .map(FileStatus::getPath) @@ -592,7 +592,7 @@ private void mergeCompacted( assertThat(remove).isTrue(); // See MergeTreeWriter.updateCompactResult if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) { - compactWriterFactory.deleteFile(file, file.level()); + compactWriterFactory.deleteFile(file); } } compactedFiles.addAll(increment.compactIncrement().compactAfter()); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java index 6ca15cf1503d..c5cda2286dfb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java @@ -72,8 +72,9 @@ public void testCreateDataFilePathFactoryNoPartition() { FileStorePathFactory pathFactory = createNonPartFactory(new Path(tempDir.toString())); DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(new BinaryRow(0), 123); - assertThat(dataFilePathFactory.toPath("my-data-file-name")) - .isEqualTo(new Path(tempDir.toString() + "/bucket-123/my-data-file-name")); + assertThat(dataFilePathFactory.newPath("my-data-file-name").toString()) + .startsWith( + new Path(tempDir.toString() + "/bucket-123/my-data-file-name").toString()); } @Test @@ -116,9 +117,10 @@ private void assertPartition( writer.complete(); DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(partition, 123); - assertThat(dataFilePathFactory.toPath("my-data-file-name")) - .isEqualTo( - new Path(tempDir.toString() + expected + "/bucket-123/my-data-file-name")); + assertThat(dataFilePathFactory.newPath("my-data-file-name").toString()) + .startsWith( + new Path(tempDir.toString() + expected + "/bucket-123/my-data-file-name") + .toString()); } public static FileStorePathFactory createNonPartFactory(Path root) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java index d916958412ea..5c0ac75e167f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java @@ -21,17 +21,26 @@ /** The information of copy file. */ public class CloneFileInfo { + private final String sourceFilePath; private final String filePathExcludeTableRoot; private final String sourceIdentifier; private final String targetIdentifier; public CloneFileInfo( - String filePathExcludeTableRoot, String sourceIdentifier, String targetIdentifier) { + String sourceFilePath, + String filePathExcludeTableRoot, + String sourceIdentifier, + String targetIdentifier) { + this.sourceFilePath = sourceFilePath; this.filePathExcludeTableRoot = filePathExcludeTableRoot; this.sourceIdentifier = sourceIdentifier; this.targetIdentifier = targetIdentifier; } + public String getSourceFilePath() { + return sourceFilePath; + } + public String getFilePathExcludeTableRoot() { return filePathExcludeTableRoot; } @@ -47,7 +56,7 @@ public String getTargetIdentifier() { @Override public String toString() { return String.format( - "{ filePath: %s, sourceIdentifier: %s, targetIdentifier: %s }", - filePathExcludeTableRoot, sourceIdentifier, targetIdentifier); + "{ sourceFilePath: %s, filePathExcludeTableRoot: %s, sourceIdentifier: %s, targetIdentifier: %s }", + sourceFilePath, filePathExcludeTableRoot, sourceIdentifier, targetIdentifier); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java index 8bcaa2a2071f..e7002cce1eec 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java @@ -99,7 +99,7 @@ public void processElement(StreamRecord streamRecord) throws Exce }); String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot(); - Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot); + Path sourcePath = new Path(cloneFileInfo.getSourceFilePath()); Path targetPath = new Path(targetTableRootPath + filePathExcludeTableRoot); if (targetTableFileIO.exists(targetPath) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java index 67eecbc6f2ae..f58d3acafdb9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java @@ -123,7 +123,11 @@ private List toCloneFileInfos( for (Path file : files) { Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot); result.add( - new CloneFileInfo(relativePath.toString(), sourceIdentifier, targetIdentifier)); + new CloneFileInfo( + file.toUri().toString(), + relativePath.toString(), + sourceIdentifier, + targetIdentifier)); } return result; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java index b7e1e608789b..9de974d047f1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java @@ -108,7 +108,7 @@ private static List getUsedFilesInternal( pathFactory .createDataFilePathFactory( simpleFileEntry.partition(), simpleFileEntry.bucket()) - .toPath(simpleFileEntry.fileName(), simpleFileEntry.externalPath()); + .toPath(simpleFileEntry); dataFiles.add(dataFilePath); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index 39860e418c18..7f3f73028023 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -167,12 +167,12 @@ private List produceNewCommittables( table.fileIO() .rename( changelogTempPath, - dataFilePathFactory.toExtraFilePath( - baseResult.meta, + dataFilePathFactory.toAlignedPath( realName + "." + CompactedChangelogReadOnlyFormat.getIdentifier( - baseResult.meta.fileFormat()))); + baseResult.meta.fileFormat()), + baseResult.meta)); List newCommittables = new ArrayList<>(); @@ -194,9 +194,9 @@ private List produceNewCommittables( + CompactedChangelogReadOnlyFormat.getIdentifier( result.meta.fileFormat()); if (result.isCompactResult) { - compactChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); + compactChangelog.add(result.meta.rename(name)); } else { - newFilesChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); + newFilesChangelog.add(result.meta.rename(name)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java index d35cb09cb78a..99061d4b82be 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java @@ -199,14 +199,13 @@ public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFi try (FileIndexFormat.Reader indexReader = FileIndexFormat.createReader( fileIO.newInputStream( - dataFilePathFactory.toExtraFilePath( - dataFileMeta, indexFile)), + dataFilePathFactory.toAlignedPath(indexFile, dataFileMeta)), schemaInfo.fileSchema)) { maintainers = indexReader.readAll(); } newIndexPath = createNewFileIndexFilePath( - dataFilePathFactory.toExtraFilePath(dataFileMeta, indexFile)); + dataFilePathFactory.toAlignedPath(indexFile, dataFileMeta)); } else { maintainers = new HashMap<>(); newIndexPath = dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java index 242a7514168e..dc6e5523b06f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java @@ -102,7 +102,7 @@ public void testFileIndexAddIndex() throws Exception { table.store() .pathFactory() .createDataFilePathFactory(entry.partition(), entry.bucket()) - .toPath(file); + .toAlignedPath(file, entry.file()); try (FileIndexFormat.Reader reader = FileIndexFormat.createReader( table.fileIO().newInputStream(indexFilePath), table.rowType())) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java index 1abfe355a566..23102f2a3d40 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java @@ -199,7 +199,7 @@ public void testFileIndexProcedureDropIndex(boolean isNamedArgument) throws Exce table.store() .pathFactory() .createDataFilePathFactory(entry.partition(), entry.bucket()) - .toPath(file); + .toAlignedPath(file, entry.file()); try (FileIndexFormat.Reader reader = FileIndexFormat.createReader( table.fileIO().newInputStream(indexFilePath), table.rowType())) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java index 6238a9cbf3ea..aa33e4fe75a1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java @@ -160,8 +160,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { List fileMetas = ((CommitMessageImpl) commitMessage).compactIncrement().compactAfter(); for (DataFileMeta fileMeta : fileMetas) { - Assertions.assertThat( - localFileIO.exists(dataFilePathFactory.toPath(fileMeta.fileName()))) + Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta))) .isTrue(); } if (i++ > 2) { @@ -188,9 +187,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { List fileMetas = ((CommitMessageImpl) commitMessage).compactIncrement().compactAfter(); for (DataFileMeta fileMeta : fileMetas) { - Assertions.assertThat( - localFileIO.exists( - dataFilePathFactory.toPath(fileMeta.fileName()))) + Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta))) .isFalse(); } } catch (Exception e) { diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java index 0360def685b6..99e95cf40e5a 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java @@ -185,7 +185,8 @@ protected void foreachIndexReader(Consumer consumer) try (FileIndexFormat.Reader reader = FileIndexFormat.createReader( fileIO.newInputStream( - dataFilePathFactory.toPath(indexFiles.get(0))), + dataFilePathFactory.toAlignedPath( + indexFiles.get(0), dataFileMeta)), tableSchema.logicalRowType())) { Optional fileIndexReader = reader.readColumnIndex("a").stream().findFirst();