Skip to content

Commit

Permalink
[HUDI-7431] Add replication and block size to StoragePathInfo to be b…
Browse files Browse the repository at this point in the history
…Full title: "[HUDI-7431] Add replication and block size to StoragePathInfo to be backwards compatible (#10717)"
  • Loading branch information
yihua committed May 14, 2024
1 parent 292b1d8 commit 91e176c
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
Expand Down Expand Up @@ -107,4 +110,48 @@ public static Path addSchemeIfLocalPath(String path) {
LOG.info("Resolving file " + path + "to be a remote file.");
return providedPath;
}

/**
 * Converts a {@link StoragePath} to its Hadoop counterpart.
 *
 * @param path {@link StoragePath} instance to convert.
 * @return the equivalent Hadoop {@link Path} instance.
 */
public static Path convertToHadoopPath(StoragePath path) {
  // Round-trip through the URI so scheme, authority, and escaping are preserved.
  URI uri = path.toUri();
  return new Path(uri);
}

/**
 * Converts a Hadoop {@link Path} to a {@link StoragePath}.
 *
 * @param path Hadoop {@link Path} instance to convert.
 * @return the equivalent {@link StoragePath} instance.
 */
public static StoragePath convertToStoragePath(Path path) {
  // Conversion via URI keeps scheme, authority, and escaping intact.
  URI uri = path.toUri();
  return new StoragePath(uri);
}

/**
 * Converts a Hadoop {@link FileStatus} to a {@link StoragePathInfo},
 * carrying over length, directory flag, replication, block size, and
 * modification time.
 *
 * @param fileStatus Hadoop {@link FileStatus} instance to convert.
 * @return the equivalent {@link StoragePathInfo} instance.
 */
public static StoragePathInfo convertToStoragePathInfo(FileStatus fileStatus) {
  StoragePath storagePath = convertToStoragePath(fileStatus.getPath());
  return new StoragePathInfo(
      storagePath,
      fileStatus.getLen(),
      fileStatus.isDirectory(),
      fileStatus.getReplication(),
      fileStatus.getBlockSize(),
      fileStatus.getModificationTime());
}

/**
 * Converts a {@link StoragePathInfo} back to a Hadoop {@link FileStatus},
 * carrying over length, directory flag, replication, block size, and
 * modification time.
 *
 * @param pathInfo {@link StoragePathInfo} instance to convert.
 * @return the equivalent {@link FileStatus} instance.
 */
public static FileStatus convertToHadoopFileStatus(StoragePathInfo pathInfo) {
  Path hadoopPath = convertToHadoopPath(pathInfo.getPath());
  return new FileStatus(
      pathInfo.getLength(),
      pathInfo.isDirectory(),
      pathInfo.getBlockReplication(),
      pathInfo.getBlockSize(),
      pathInfo.getModificationTime(),
      hadoopPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

package org.apache.hudi.storage.hadoop;

import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
Expand All @@ -39,6 +39,10 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopPath;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePathInfo;

/**
* Implementation of {@link HoodieStorage} using Hadoop's {@link FileSystem}
*/
Expand Down Expand Up @@ -92,7 +96,7 @@ public boolean createDirectory(StoragePath path) throws IOException {
@Override
public List<StoragePathInfo> listDirectEntries(StoragePath path) throws IOException {
  // List the directory via the wrapped FileSystem, then map each Hadoop
  // FileStatus back into the storage abstraction.
  Path hadoopPath = convertToHadoopPath(path);
  return Arrays.stream(fs.listStatus(hadoopPath))
      .map(HadoopFSUtils::convertToStoragePathInfo)
      .collect(Collectors.toList());
}

Expand All @@ -109,9 +113,9 @@ public List<StoragePathInfo> listFiles(StoragePath path) throws IOException {
@Override
public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList) throws IOException {
  // Convert every StoragePath to a Hadoop Path first, issue a single
  // batched listStatus call, then map the results back to storage types.
  Path[] hadoopPaths = pathList.stream()
      .map(HadoopFSUtils::convertToHadoopPath)
      .toArray(Path[]::new);
  return Arrays.stream(fs.listStatus(hadoopPaths))
      .map(HadoopFSUtils::convertToStoragePathInfo)
      .collect(Collectors.toList());
}

Expand All @@ -122,15 +126,15 @@ public List<StoragePathInfo> listDirectEntries(StoragePath path,
return Arrays.stream(fs.listStatus(
convertToHadoopPath(path), e ->
filter.accept(convertToStoragePath(e))))
.map(this::convertToStoragePathInfo)
.map(HadoopFSUtils::convertToStoragePathInfo)
.collect(Collectors.toList());
}

@Override
public List<StoragePathInfo> globEntries(StoragePath pathPattern)
    throws IOException {
  // Expand the glob through the wrapped FileSystem and convert each match.
  Path hadoopPattern = convertToHadoopPath(pathPattern);
  return Arrays.stream(fs.globStatus(hadoopPattern))
      .map(HadoopFSUtils::convertToStoragePathInfo)
      .collect(Collectors.toList());
}

Expand All @@ -139,7 +143,7 @@ public List<StoragePathInfo> globEntries(StoragePath pathPattern, StoragePathFil
throws IOException {
return Arrays.stream(fs.globStatus(convertToHadoopPath(pathPattern), path ->
filter.accept(convertToStoragePath(path))))
.map(this::convertToStoragePathInfo)
.map(HadoopFSUtils::convertToStoragePathInfo)
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -184,22 +188,6 @@ public boolean createNewFile(StoragePath path) throws IOException {
return fs.createNewFile(convertToHadoopPath(path));
}

private Path convertToHadoopPath(StoragePath loc) {
return new Path(loc.toUri());
}

private StoragePath convertToStoragePath(Path path) {
return new StoragePath(path.toUri());
}

private StoragePathInfo convertToStoragePathInfo(FileStatus fileStatus) {
return new StoragePathInfo(
convertToStoragePath(fileStatus.getPath()),
fileStatus.getLen(),
fileStatus.isDirectory(),
fileStatus.getModificationTime());
}

@Override
public void close() throws IOException {
fs.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.hadoop.fs;

import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopFileStatus;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopPath;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePathInfo;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Tests {@link HadoopFSUtils}
*/
/**
 * Tests {@link HadoopFSUtils} path and file-status conversion helpers.
 */
public class TestHadoopFSUtils {
  @ParameterizedTest
  @ValueSource(strings = {
      "/a/b/c",
      "s3://bucket/partition=1%2F2%2F3",
      "hdfs://x/y/z.file#bar"
  })
  public void testPathConversion(String pathString) {
    // Round trip starting from a Hadoop Path:
    // Path -> StoragePath -> Path must be lossless.
    Path hadoopPath = new Path(pathString);
    StoragePath asStoragePath = convertToStoragePath(hadoopPath);
    assertEquals(hadoopPath.toUri(), asStoragePath.toUri());
    assertEquals(hadoopPath, convertToHadoopPath(asStoragePath));

    // Round trip starting from a StoragePath:
    // StoragePath -> Path -> StoragePath must be lossless.
    StoragePath storagePath = new StoragePath(pathString);
    Path asHadoopPath = convertToHadoopPath(storagePath);
    assertEquals(storagePath.toUri(), asHadoopPath.toUri());
    assertEquals(storagePath, convertToStoragePath(asHadoopPath));
  }

  @ParameterizedTest
  @CsvSource({
      "/a/b/c,1000,false,1,1000000,1238493920",
      "/x/y/z,0,true,2,0,2002403203"
  })
  public void testFileStatusConversion(String path,
                                       long length,
                                       boolean isDirectory,
                                       short blockReplication,
                                       long blockSize,
                                       long modificationTime) {
    // Round trip starting from a Hadoop FileStatus:
    // FileStatus -> StoragePathInfo -> FileStatus keeps every field.
    FileStatus originalStatus = new FileStatus(
        length, isDirectory, blockReplication, blockSize, modificationTime, new Path(path));
    StoragePathInfo convertedInfo = convertToStoragePathInfo(originalStatus);
    assertStoragePathInfo(
        convertedInfo, path, length, isDirectory, blockReplication, blockSize, modificationTime);
    assertFileStatus(
        convertToHadoopFileStatus(convertedInfo),
        path, length, isDirectory, blockReplication, blockSize, modificationTime);

    // Round trip starting from a StoragePathInfo:
    // StoragePathInfo -> FileStatus -> StoragePathInfo keeps every field.
    StoragePathInfo originalInfo = new StoragePathInfo(
        new StoragePath(path), length, isDirectory, blockReplication, blockSize, modificationTime);
    FileStatus convertedStatus = convertToHadoopFileStatus(originalInfo);
    assertFileStatus(
        convertedStatus, path, length, isDirectory, blockReplication, blockSize, modificationTime);
    assertStoragePathInfo(
        convertToStoragePathInfo(convertedStatus),
        path, length, isDirectory, blockReplication, blockSize, modificationTime);
  }

  // Asserts every field of a Hadoop FileStatus against the expected values.
  private void assertFileStatus(FileStatus fileStatus,
                                String path,
                                long length,
                                boolean isDirectory,
                                short blockReplication,
                                long blockSize,
                                long modificationTime) {
    assertEquals(new Path(path), fileStatus.getPath());
    assertEquals(length, fileStatus.getLen());
    assertEquals(isDirectory, fileStatus.isDirectory());
    assertEquals(!isDirectory, fileStatus.isFile());
    assertEquals(blockReplication, fileStatus.getReplication());
    assertEquals(blockSize, fileStatus.getBlockSize());
    assertEquals(modificationTime, fileStatus.getModificationTime());
  }

  // Asserts every field of a StoragePathInfo against the expected values.
  private void assertStoragePathInfo(StoragePathInfo pathInfo,
                                     String path,
                                     long length,
                                     boolean isDirectory,
                                     short blockReplication,
                                     long blockSize,
                                     long modificationTime) {
    assertEquals(new StoragePath(path), pathInfo.getPath());
    assertEquals(length, pathInfo.getLength());
    assertEquals(isDirectory, pathInfo.isDirectory());
    assertEquals(!isDirectory, pathInfo.isFile());
    assertEquals(blockReplication, pathInfo.getBlockReplication());
    assertEquals(blockSize, pathInfo.getBlockSize());
    assertEquals(modificationTime, pathInfo.getModificationTime());
  }
}
24 changes: 24 additions & 0 deletions hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,21 @@ public class StoragePathInfo implements Serializable {
private final StoragePath path;
private final long length;
private final boolean isDirectory;
private final short blockReplication;
private final long blockSize;
private final long modificationTime;

/**
 * Instantiates a {@code StoragePathInfo} with the given file metadata.
 *
 * @param path             the path of the file or directory.
 * @param length           the length of the file in bytes.
 * @param isDirectory      whether the path refers to a directory.
 * @param blockReplication the block replication of the file.
 * @param blockSize        the block size of the file in bytes.
 * @param modificationTime the modification time of the file
 *                         (presumably epoch milliseconds, matching Hadoop's
 *                         FileStatus convention — confirm with callers).
 */
public StoragePathInfo(StoragePath path,
                       long length,
                       boolean isDirectory,
                       short blockReplication,
                       long blockSize,
                       long modificationTime) {
  this.path = path;
  this.length = length;
  this.isDirectory = isDirectory;
  this.blockReplication = blockReplication;
  this.blockSize = blockSize;
  this.modificationTime = modificationTime;
}

Expand Down Expand Up @@ -79,6 +85,22 @@ public boolean isDirectory() {
return isDirectory;
}

/**
 * @return the block replication of the file, if applicable to the
 *         underlying storage; otherwise whatever value this instance
 *         was constructed with.
 */
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public short getBlockReplication() {
  return blockReplication;
}

/**
 * @return the block size of the file in bytes, if applicable to the
 *         underlying storage; otherwise whatever value this instance
 *         was constructed with.
 */
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public long getBlockSize() {
  return blockSize;
}

/**
* @return the modification of a file.
*/
Expand Down Expand Up @@ -114,6 +136,8 @@ public String toString() {
+ "path=" + path
+ ", length=" + length
+ ", isDirectory=" + isDirectory
+ ", blockReplication=" + blockReplication
+ ", blockSize=" + blockSize
+ ", modificationTime=" + modificationTime
+ '}';
}
Expand Down
Loading

0 comments on commit 91e176c

Please sign in to comment.