Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support more configuration in TOS #18628

Merged
merged 10 commits into from
Jun 19, 2024
66 changes: 66 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -2065,6 +2065,64 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_TOS_RETRY_MAX =
intBuilder(Name.UNDERFS_TOS_RETRY_MAX)
.setAlias("alluxio.underfs.tos.retry.max")
.setDefaultValue(3)
.setDescription("The maximum number of TOS error retry.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_TOS_WRITE_TIMEOUT =
intBuilder(Name.UNDERFS_TOS_WRITE_TIMEOUT)
.setAlias("alluxio.underfs.tos.write.timeout.ms", "alluxio.underfs.tos.write.timeout")
.setDefaultValue(30000)
.setDescription("The timeout for a single write request to TOS.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_TOS_READ_TIMEOUT =
intBuilder(Name.UNDERFS_TOS_READ_TIMEOUT)
.setAlias("alluxio.underfs.tos.read.timeout.ms", "alluxio.underfs.tos.read.timeout")
.setDefaultValue(30000)
.setDescription("The timeout for a single read request to TOS.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_TOS_CONNECT_TIMEOUT =
intBuilder(Name.UNDERFS_TOS_CONNECT_TIMEOUT)
.setAlias("alluxio.underfs.tos.connect.timeout.ms", "alluxio.underfs.tos.connect.timeout")
.setDefaultValue(30000)
.setDescription("The timeout for a connection to TOS.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_TOS_CONNECT_TTL =
intBuilder(Name.UNDERFS_TOS_CONNECT_TTL)
.setDefaultValue(60000)
.setDescription("The expiration time of TOS connections in ms. -1 means the connection "
+ "will never expire.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE =
durationBuilder(Name.UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE)
.setDefaultValue("3day")
.setDescription("Streaming uploads may not have been completed/aborted correctly "
+ "and need periodical ufs cleanup. If ufs cleanup is enabled, "
+ "intermediate multipart uploads in all non-readonly TOS mount points "
+ "older than this age will be cleaned. This may impact other "
+ "ongoing upload operations, so a large clean age is encouraged.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_TOS_CONNECT_MAX =
intBuilder(Name.UNDERFS_TOS_CONNECT_MAX)
.setDefaultValue(1024)
.setDescription("The maximum number of TOS connections.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
//
// Mount table related properties
//
Expand Down Expand Up @@ -8086,6 +8144,14 @@ public static final class Name {
"alluxio.underfs.tos.streaming.upload.partition.size";
public static final String UNDERFS_TOS_STREAMING_UPLOAD_THREADS =
"alluxio.underfs.tos.streaming.upload.threads";
public static final String UNDERFS_TOS_RETRY_MAX = "alluxio.underfs.tos.retry.max";
public static final String UNDERFS_TOS_WRITE_TIMEOUT = "alluxio.underfs.tos.write.timeout";
public static final String UNDERFS_TOS_READ_TIMEOUT = "alluxio.underfs.tos.read.timeout";
public static final String UNDERFS_TOS_CONNECT_TIMEOUT = "alluxio.underfs.tos.connect.timeout";
public static final String UNDERFS_TOS_CONNECT_TTL = "alluxio.underfs.tos.connect.ttl";
public static final String UNDERFS_TOS_CONNECT_MAX = "alluxio.underfs.tos.connect.max";
public static final String UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE =
"alluxio.underfs.tos.intermediate.upload.clean.age";

//
// UFS access control related properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import alluxio.AlluxioURI;
import alluxio.Constants;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.retry.RetryPolicy;
import alluxio.underfs.ObjectUnderFileSystem;
Expand All @@ -27,11 +28,14 @@
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.volcengine.tos.TOSClientConfiguration;
import com.volcengine.tos.TOSV2;
import com.volcengine.tos.TOSV2ClientBuilder;
import com.volcengine.tos.TosClientException;
import com.volcengine.tos.TosException;
import com.volcengine.tos.TosServerException;
import com.volcengine.tos.auth.StaticCredentials;
import com.volcengine.tos.model.object.AbortMultipartUploadInput;
import com.volcengine.tos.model.object.CopyObjectV2Input;
import com.volcengine.tos.model.object.CopyObjectV2Output;
import com.volcengine.tos.model.object.DeleteMultiObjectsV2Input;
Expand All @@ -41,13 +45,17 @@
import com.volcengine.tos.model.object.Deleted;
import com.volcengine.tos.model.object.HeadObjectV2Input;
import com.volcengine.tos.model.object.HeadObjectV2Output;
import com.volcengine.tos.model.object.ListMultipartUploadsV2Input;
import com.volcengine.tos.model.object.ListMultipartUploadsV2Output;
import com.volcengine.tos.model.object.ListObjectsType2Input;
import com.volcengine.tos.model.object.ListObjectsType2Output;
import com.volcengine.tos.model.object.ListedCommonPrefix;
import com.volcengine.tos.model.object.ListedObjectV2;
import com.volcengine.tos.model.object.ListedUpload;
import com.volcengine.tos.model.object.ObjectMetaRequestOptions;
import com.volcengine.tos.model.object.ObjectTobeDeleted;
import com.volcengine.tos.model.object.PutObjectInput;
import com.volcengine.tos.transport.TransportConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -110,7 +118,13 @@ public static TOSUnderFileSystem createInstance(AlluxioURI uri, UnderFileSystemC
String secretKey = conf.getString(PropertyKey.TOS_SECRET_KEY);
String regionName = conf.getString(PropertyKey.TOS_REGION);
String endPoint = conf.getString(PropertyKey.TOS_ENDPOINT_KEY);
TOSV2 tos = new TOSV2ClientBuilder().build(regionName, endPoint, accessKey, secretKey);
TOSClientConfiguration configuration = TOSClientConfiguration.builder()
.transportConfig(initializeTOSClientConfig(conf))
.region(regionName)
.endpoint(endPoint)
.credentials(new StaticCredentials(accessKey, secretKey))
.build();
TOSV2 tos = new TOSV2ClientBuilder().build(configuration);
return new TOSUnderFileSystem(uri, tos, bucketName, conf);
}

Expand Down Expand Up @@ -152,6 +166,33 @@ public void setOwner(String path, String user, String group) {
public void setMode(String path, short mode) throws IOException {
}

@Override
public void cleanup() throws IOException {
long cleanAge = mUfsConf.getMs(PropertyKey.UNDERFS_TOS_INTERMEDIATE_UPLOAD_CLEAN_AGE);
Date cleanBefore = new Date(new Date().getTime() - cleanAge);
boolean isTruncated = true;
String keyMarker = null;
String uploadIdMarker = null;
int maxKeys = 10;
while (isTruncated) {
ListMultipartUploadsV2Input input = new ListMultipartUploadsV2Input().setBucket(mBucketName)
.setMaxUploads(maxKeys).setKeyMarker(keyMarker).setUploadIDMarker(uploadIdMarker);
ListMultipartUploadsV2Output output = mClient.listMultipartUploads(input);
if (output.getUploads() != null) {
for (int i = 0; i < output.getUploads().size(); ++i) {
ListedUpload upload = output.getUploads().get(i);
if (upload.getInitiated().before(cleanBefore)) {
mClient.abortMultipartUpload(new AbortMultipartUploadInput().setBucket(mBucketName)
.setKey(upload.getKey()).setUploadID(upload.getUploadID()));
}
}
}
isTruncated = output.isTruncated();
keyMarker = output.getNextKeyMarker();
uploadIdMarker = output.getNextUploadIdMarker();
}
}

@Override
protected boolean copyObject(String src, String dst) {
LOG.debug("Copying {} to {}", src, dst);
Expand Down Expand Up @@ -348,6 +389,29 @@ protected String getRootKey() {
return Constants.HEADER_TOS + mBucketName;
}

/**
* Creates an TOS {@code ClientConfiguration} using an Alluxio Configuration.
* @param alluxioConf the TOS Configuration
* @return the TOS {@link TransportConfig}
*/
public static TransportConfig initializeTOSClientConfig(AlluxioConfiguration alluxioConf) {
int readTimeoutMills = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_READ_TIMEOUT);
int writeTimeoutMills = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_WRITE_TIMEOUT);
int connectionTimeoutMills = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_CONNECT_TIMEOUT);
int maxConnections = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_CONNECT_MAX);
int idleConnectionTime = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_CONNECT_TTL);
int maxErrorRetry = alluxioConf.getInt(PropertyKey.UNDERFS_TOS_RETRY_MAX);
TransportConfig config = TransportConfig.builder()
.connectTimeoutMills(connectionTimeoutMills)
.maxConnections(maxConnections)
.maxRetryCount(maxErrorRetry)
.readTimeoutMills(readTimeoutMills)
.writeTimeoutMills(writeTimeoutMills)
.idleConnectionTimeMills(idleConnectionTime)
.build();
return config;
}

@Override
protected InputStream openObject(String key, OpenOptions options, RetryPolicy retryPolicy)
throws IOException {
Expand Down
Loading