Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a series of problems in the s3 file system #5072

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -114,7 +111,7 @@ public FsPath get(String dest) throws IOException {
@Override
public InputStream read(FsPath dest) throws IOException {
try {
return s3Client.getObject(bucket, dest.getPath()).getObjectContent();
return s3Client.getObject(bucket, buildPrefix(dest.getPath(), false)).getObjectContent();
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + dest.getPath());
}
Expand All @@ -123,8 +120,9 @@ public InputStream read(FsPath dest) throws IOException {
@Override
public OutputStream write(FsPath dest, boolean overwrite) throws IOException {
try (InputStream inputStream = read(dest);
OutputStream outputStream = new S3OutputStream(s3Client, bucket, dest.getPath())) {
if (overwrite) {
OutputStream outputStream =
new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), false))) {
if (!overwrite) {
IOUtils.copy(inputStream, outputStream);
}
return outputStream;
Expand Down Expand Up @@ -164,20 +162,37 @@ public List<FsPath> list(FsPath path) throws IOException {

@Override
public FsPathListWithError listPathWithError(FsPath path) throws IOException {
return listPathWithError(path, true);
}

public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile)
throws IOException {
List<FsPath> rtn = new ArrayList<>();
try {
if (!StringUtils.isEmpty(path.getPath())) {
ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(bucket, path.getPath());
List<S3ObjectSummary> s3ObjectSummaries = listObjectsV2Result.getObjectSummaries();
ListObjectsV2Request listObjectsV2Request =
new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(buildPrefix(path.getPath()))
.withDelimiter("/");
ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
List<String> commonPrefixes = dirResult.getCommonPrefixes();
if (s3ObjectSummaries != null) {
List<FsPath> rtn = new ArrayList();
String message = "";
for (S3ObjectSummary summary : s3ObjectSummaries) {
if (isDir(summary, path.getPath()) || isInitFile(summary)) continue;
if (isInitFile(summary) && ignoreInitFile) continue;
FsPath newPath = new FsPath(buildPath(summary.getKey()));
rtn.add(fillStorageFile(newPath, summary));
}
return new FsPathListWithError(rtn, message);
}
if (commonPrefixes != null) {
for (String dir : commonPrefixes) {
FsPath newPath = new FsPath(buildPath(dir));
newPath.setIsdir(true);
rtn.add(newPath);
}
}
return new FsPathListWithError(rtn, "");
}
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + path.getPath());
Expand All @@ -189,8 +204,25 @@ public FsPathListWithError listPathWithError(FsPath path) throws IOException {
@Override
public boolean exists(FsPath dest) throws IOException {
try {
int size = s3Client.listObjectsV2(bucket, dest.getPath()).getObjectSummaries().size();
return size > 0;
if (new File(dest.getPath()).getName().contains(".")) {
return existsFile(dest);
}
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request
.withBucketName(bucket)
.withPrefix(buildPrefix(dest.getPath()))
.withDelimiter("/");
return s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size()
+ s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size()
> 0;
} catch (AmazonS3Exception e) {
return false;
}
}

public boolean existsFile(FsPath dest) {
try {
return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), false));
} catch (AmazonS3Exception e) {
return false;
}
Expand All @@ -199,7 +231,14 @@ public boolean exists(FsPath dest) throws IOException {
@Override
public boolean delete(FsPath dest) throws IOException {
try {
s3Client.deleteObject(bucket, dest.getPath());
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(), false));
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
String[] keyList =
result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
DeleteObjectsRequest deleteObjectsRequest =
new DeleteObjectsRequest("test").withKeys(keyList);
s3Client.deleteObjects(deleteObjectsRequest);
return true;
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + dest.getPath());
Expand All @@ -209,8 +248,25 @@ public boolean delete(FsPath dest) throws IOException {
@Override
public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
try {
s3Client.copyObject(bucket, oldDest.getPath(), bucket, newDest.getPath());
s3Client.deleteObject(bucket, oldDest.getPath());
String newOriginPath = buildPrefix(oldDest.getPath(), false);
String newDestPath = buildPrefix(newDest.getPath(), false);
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath);
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
List<String> keyList =
result.getObjectSummaries().stream()
.map(S3ObjectSummary::getKey)
.collect(Collectors.toList());
List<String> newKeyList =
keyList.stream()
.map(key -> key.replaceFirst(newOriginPath, newDestPath))
.collect(Collectors.toList());
for (int i = 0; i < keyList.size(); i++) {
String key = keyList.get(i);
String newKey = newKeyList.get(i);
s3Client.copyObject(bucket, key, bucket, newKey);
s3Client.deleteObject(bucket, key);
}
return true;
} catch (AmazonS3Exception e) {
s3Client.deleteObject(bucket, newDest.getPath());
Expand All @@ -225,7 +281,24 @@ public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
@Override
public boolean copy(String origin, String dest) throws IOException {
try {
s3Client.copyObject(bucket, origin, bucket, dest);
String newOrigin = buildPrefix(origin, false);
String newDest = buildPrefix(dest, false);
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin);
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
List<String> keyList =
result.getObjectSummaries().stream()
.map(S3ObjectSummary::getKey)
.collect(Collectors.toList());
List<String> newKeyList =
keyList.stream()
.map(key -> key.replaceFirst(newOrigin, newDest))
.collect(Collectors.toList());
for (int i = 0; i < keyList.size(); i++) {
String key = keyList.get(i);
String newKey = newKeyList.get(i);
s3Client.copyObject(bucket, key, bucket, newKey);
}
return true;
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + origin + " or " + dest);
Expand Down Expand Up @@ -261,7 +334,10 @@ public boolean mkdirs(FsPath dest) throws IOException {

private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary s3ObjectSummary) {
fsPath.setModification_time(s3ObjectSummary.getLastModified().getTime());
fsPath.setOwner(s3ObjectSummary.getOwner().getDisplayName());
Owner owner = s3ObjectSummary.getOwner();
if (owner != null) {
fsPath.setOwner(owner.getDisplayName());
}
try {
fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath()));
} catch (Throwable e) {
Expand Down Expand Up @@ -344,6 +420,22 @@ public String buildPath(String path) {
}
return StorageUtils.S3_SCHEMA + "/" + path;
}

public String buildPrefix(String path, boolean addTail) {
String res = path;
if (path == null || "".equals(path)) return "";
if (path.startsWith("/")) {
res = path.replaceFirst("/", "");
}
if (!path.endsWith("/") && addTail) {
res = res + "/";
}
return res;
}

public String buildPrefix(String path) {
return buildPrefix(path, true);
}
}

class S3OutputStream extends ByteArrayOutputStream {
Expand Down
Loading