Skip to content

Commit

Permalink
Support STS for OSS UFS and refactor Low Level Output Stream
Browse files Browse the repository at this point in the history
This is a PR to cherry-pick committed PR #16481 and #16122 into target branch dora.
			pr-link: #17320
			change-id: cid-2056ac34c59ae3eb9678be69c8c24d55c72603e3
  • Loading branch information
JiamingMai authored Apr 25, 2023
1 parent c73c423 commit 23a6fd6
Show file tree
Hide file tree
Showing 17 changed files with 1,892 additions and 440 deletions.
141 changes: 141 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,12 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OBJECT_STORE_STREAMING_UPLOAD_PART_TIMEOUT =
durationBuilder(Name.UNDERFS_OBJECT_STORE_STREAMING_UPLOAD_PART_TIMEOUT)
.setDescription("Timeout for uploading part when using streaming uploads.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OBJECT_STORE_BREADCRUMBS_ENABLED =
booleanBuilder(Name.UNDERFS_OBJECT_STORE_BREADCRUMBS_ENABLED)
.setDefaultValue(true)
Expand Down Expand Up @@ -1314,6 +1320,46 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_ECS_RAM_ROLE =
stringBuilder(Name.UNDERFS_OSS_ECS_RAM_ROLE)
.setAlias("alluxio.underfs.oss.ecs.ram.role")
.setDescription("The RAM role of current owner of ECS.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_RETRY_MAX =
intBuilder(Name.UNDERFS_OSS_RETRY_MAX)
.setAlias("alluxio.underfs.oss.retry.max")
.setDefaultValue(3)
.setDescription("The maximum number of OSS error retry.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_STS_ECS_METADATA_SERVICE_ENDPOINT =
stringBuilder(Name.UNDERFS_OSS_STS_ECS_METADATA_SERVICE_ENDPOINT)
.setAlias("alluxio.underfs.oss.sts.ecs.metadata.service.endpoint")
.setDefaultValue("http://100.100.100.200/latest/meta-data/ram/security-credentials/")
.setDescription("The ECS metadata service endpoint for Aliyun STS")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_STS_ENABLED =
booleanBuilder(Name.UNDERFS_OSS_STS_ENABLED)
.setAlias("alluxio.underfs.oss.sts.enabled")
.setDefaultValue(false)
.setDescription("Whether to enable oss STS(Security Token Service).")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_STS_TOKEN_REFRESH_INTERVAL_MS =
durationBuilder(Name.UNDERFS_OSS_STS_TOKEN_REFRESH_INTERVAL_MS)
.setAlias("alluxio.underfs.oss.sts.token.refresh.interval.ms")
.setDefaultValue("30m")
.setDescription("Time before an OSS Security Token is considered expired "
+ "and will be automatically renewed")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_S3_ADMIN_THREADS_MAX =
intBuilder(Name.UNDERFS_S3_ADMIN_THREADS_MAX)
.setDefaultValue(20)
Expand Down Expand Up @@ -1699,6 +1745,41 @@ public String toString() {
.setScope(Scope.SERVER)
.setDisplayType(DisplayType.CREDENTIALS)
.build();
public static final PropertyKey UNDERFS_OSS_INTERMEDIATE_UPLOAD_CLEAN_AGE =
durationBuilder(Name.UNDERFS_OSS_INTERMEDIATE_UPLOAD_CLEAN_AGE)
.setDefaultValue("3day")
.setDescription("Streaming uploads may not have been completed/aborted correctly "
+ "and need periodical ufs cleanup. If ufs cleanup is enabled, "
+ "intermediate multipart uploads in all non-readonly OSS mount points "
+ "older than this age will be cleaned. This may impact other "
+ "ongoing upload operations, so a large clean age is encouraged.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_STREAMING_UPLOAD_ENABLED =
booleanBuilder(Name.UNDERFS_OSS_STREAMING_UPLOAD_ENABLED)
.setDefaultValue(false)
.setDescription("(Experimental) If true, using streaming upload to write to OSS.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_STREAMING_UPLOAD_PARTITION_SIZE =
dataSizeBuilder(Name.UNDERFS_OSS_STREAMING_UPLOAD_PARTITION_SIZE)
.setDefaultValue("64MB")
.setDescription("Maximum allowable size of a single buffer file when using "
+ "OSS streaming upload. When the buffer file reaches the partition size, "
+ "it will be uploaded and the upcoming data will write to other buffer files."
+ "If the partition size is too small, OSS upload speed might be affected. ")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_STREAMING_UPLOAD_THREADS =
intBuilder(Name.UNDERFS_OSS_STREAMING_UPLOAD_THREADS)
.setDefaultValue(20)
.setDescription("the number of threads to use for streaming upload data to OSS.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey S3A_ACCESS_KEY = stringBuilder(Name.S3A_ACCESS_KEY)
.setAlias(Name.AWS_ACCESS_KEY)
.setDescription("The access key of S3 bucket.")
Expand Down Expand Up @@ -1850,6 +1931,41 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OBS_INTERMEDIATE_UPLOAD_CLEAN_AGE =
durationBuilder(Name.UNDERFS_OBS_INTERMEDIATE_UPLOAD_CLEAN_AGE)
.setDefaultValue("3day")
.setDescription("Streaming uploads may not have been completed/aborted correctly "
+ "and need periodical ufs cleanup. If ufs cleanup is enabled, "
+ "intermediate multipart uploads in all non-readonly OBS mount points "
+ "older than this age will be cleaned. This may impact other "
+ "ongoing upload operations, so a large clean age is encouraged.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OBS_STREAMING_UPLOAD_ENABLED =
booleanBuilder(Name.UNDERFS_OBS_STREAMING_UPLOAD_ENABLED)
.setDefaultValue(false)
.setDescription("(Experimental) If true, using streaming upload to write to OBS.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OBS_STREAMING_UPLOAD_PARTITION_SIZE =
dataSizeBuilder(Name.UNDERFS_OBS_STREAMING_UPLOAD_PARTITION_SIZE)
.setDefaultValue("64MB")
.setDescription("Maximum allowable size of a single buffer file when using "
+ "S3A streaming upload. When the buffer file reaches the partition size, "
+ "it will be uploaded and the upcoming data will write to other buffer files."
+ "If the partition size is too small, OBS upload speed might be affected. ")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OBS_STREAMING_UPLOAD_THREADS =
intBuilder(Name.UNDERFS_OBS_STREAMING_UPLOAD_THREADS)
.setDefaultValue(20)
.setDescription("the number of threads to use for streaming upload data to OBS.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
//
// Mount table related properties
//
Expand Down Expand Up @@ -7526,6 +7642,8 @@ public static final class Name {
public static final String UNDERFS_WEB_PARENT_NAMES = "alluxio.underfs.web.parent.names";
public static final String UNDERFS_WEB_TITLES = "alluxio.underfs.web.titles";
public static final String UNDERFS_VERSION = "alluxio.underfs.version";
public static final String UNDERFS_OBJECT_STORE_STREAMING_UPLOAD_PART_TIMEOUT =
"alluxio.underfs.object.store.streaming.upload.part.timeout";
public static final String UNDERFS_OBJECT_STORE_BREADCRUMBS_ENABLED =
"alluxio.underfs.object.store.breadcrumbs.enabled";
public static final String UNDERFS_OBJECT_STORE_SERVICE_THREADS =
Expand All @@ -7541,6 +7659,21 @@ public static final class Name {
"alluxio.underfs.oss.connection.timeout";
public static final String UNDERFS_OSS_CONNECT_TTL = "alluxio.underfs.oss.connection.ttl";
public static final String UNDERFS_OSS_SOCKET_TIMEOUT = "alluxio.underfs.oss.socket.timeout";
public static final String UNDERFS_OSS_ECS_RAM_ROLE = "alluxio.underfs.oss.ecs.ram.role";
public static final String UNDERFS_OSS_RETRY_MAX = "alluxio.underfs.oss.retry.max";
public static final String UNDERFS_OSS_STS_ECS_METADATA_SERVICE_ENDPOINT =
"alluxio.underfs.oss.sts.ecs.metadata.service.endpoint";
public static final String UNDERFS_OSS_STS_ENABLED = "alluxio.underfs.oss.sts.enabled";
public static final String UNDERFS_OSS_STS_TOKEN_REFRESH_INTERVAL_MS =
"alluxio.underfs.oss.sts.token.refresh.interval.ms";
public static final String UNDERFS_OSS_INTERMEDIATE_UPLOAD_CLEAN_AGE =
"alluxio.underfs.oss.intermediate.upload.clean.age";
public static final String UNDERFS_OSS_STREAMING_UPLOAD_ENABLED =
"alluxio.underfs.oss.streaming.upload.enabled";
public static final String UNDERFS_OSS_STREAMING_UPLOAD_PARTITION_SIZE =
"alluxio.underfs.oss.streaming.upload.partition.size";
public static final String UNDERFS_OSS_STREAMING_UPLOAD_THREADS =
"alluxio.underfs.oss.streaming.upload.threads";
public static final String UNDERFS_S3_BULK_DELETE_ENABLED =
"alluxio.underfs.s3.bulk.delete.enabled";
public static final String UNDERFS_S3_DEFAULT_MODE = "alluxio.underfs.s3.default.mode";
Expand Down Expand Up @@ -7612,6 +7745,14 @@ public static final class Name {
"alluxio.underfs.cephfs.mount.point";
public static final String UNDERFS_CEPHFS_LOCALIZE_READS =
"alluxio.underfs.cephfs.localize.reads";
public static final String UNDERFS_OBS_INTERMEDIATE_UPLOAD_CLEAN_AGE =
"alluxio.underfs.obs.intermediate.upload.clean.age";
public static final String UNDERFS_OBS_STREAMING_UPLOAD_ENABLED =
"alluxio.underfs.obs.streaming.upload.enabled";
public static final String UNDERFS_OBS_STREAMING_UPLOAD_PARTITION_SIZE =
"alluxio.underfs.obs.streaming.upload.partition.size";
public static final String UNDERFS_OBS_STREAMING_UPLOAD_THREADS =
"alluxio.underfs.obs.streaming.upload.threads";

//
// UFS access control related properties
Expand Down
Loading

0 comments on commit 23a6fd6

Please sign in to comment.