Skip to content

Commit

Permalink
Support STS for OSS ufs through RAMRole
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Support STS for OSS ufs

### Why are the changes needed?

1. Plaintext AccessKey/AccessSecret is not safe and not Recommended for
Aliyun
2. STS(Security Token Service) for OSS is more safe. For details, see
https://help.aliyun.com/document_detail/32016.html

### Does this PR introduce any user facing changes?

addition property keys
1. UNDERFS_OSS_STS_ENABLED
2. UNDERFS_OSS_RETRY_MAX
3. UNDERFS_OSS_ECS_RAM_ROLE

Alluxio#16510

pr-link: Alluxio#16481
change-id: cid-7d486e84f82e5ab5211238b1f180b4a8fd8e5742
  • Loading branch information
StephenRi authored and jja725 committed Jan 27, 2023
1 parent 9bd6bc9 commit 84823ce
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 19 deletions.
47 changes: 47 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,46 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_ECS_RAM_ROLE =
stringBuilder(Name.UNDERFS_OSS_ECS_RAM_ROLE)
.setAlias("alluxio.underfs.oss.ecs.ram.role")
.setDescription("The RAM role of current owner of ECS.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_RETRY_MAX =
intBuilder(Name.UNDERFS_OSS_RETRY_MAX)
.setAlias("alluxio.underfs.oss.retry.max")
.setDefaultValue(3)
.setDescription("The maximum number of OSS error retry.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_STS_ECS_METADATA_SERVICE_ENDPOINT =
stringBuilder(Name.UNDERFS_OSS_STS_ECS_METADATA_SERVICE_ENDPOINT)
.setAlias("alluxio.underfs.oss.sts.ecs.metadata.service.endpoint")
.setDefaultValue("http://100.100.100.200/latest/meta-data/ram/security-credentials/")
.setDescription("The ECS metadata service endpoint for Aliyun STS")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_STS_ENABLED =
booleanBuilder(Name.UNDERFS_OSS_STS_ENABLED)
.setAlias("alluxio.underfs.oss.sts.enabled")
.setDefaultValue(false)
.setDescription("Whether to enable oss STS(Security Token Service).")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OSS_STS_TOKEN_REFRESH_INTERVAL_MS =
durationBuilder(Name.UNDERFS_OSS_STS_TOKEN_REFRESH_INTERVAL_MS)
.setAlias("alluxio.underfs.oss.sts.token.refresh.interval.ms")
.setDefaultValue("30m")
.setDescription("Time before an OSS Security Token is considered expired "
+ "and will be automatically renewed")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_S3_ADMIN_THREADS_MAX =
intBuilder(Name.UNDERFS_S3_ADMIN_THREADS_MAX)
.setDefaultValue(20)
Expand Down Expand Up @@ -7387,6 +7427,13 @@ public static final class Name {
"alluxio.underfs.oss.connection.timeout";
public static final String UNDERFS_OSS_CONNECT_TTL = "alluxio.underfs.oss.connection.ttl";
public static final String UNDERFS_OSS_SOCKET_TIMEOUT = "alluxio.underfs.oss.socket.timeout";
public static final String UNDERFS_OSS_ECS_RAM_ROLE = "alluxio.underfs.oss.ecs.ram.role";
public static final String UNDERFS_OSS_RETRY_MAX = "alluxio.underfs.oss.retry.max";
public static final String UNDERFS_OSS_STS_ECS_METADATA_SERVICE_ENDPOINT =
"alluxio.underfs.oss.sts.ecs.metadata.service.endpoint";
public static final String UNDERFS_OSS_STS_ENABLED = "alluxio.underfs.oss.sts.enabled";
public static final String UNDERFS_OSS_STS_TOKEN_REFRESH_INTERVAL_MS =
"alluxio.underfs.oss.sts.token.refresh.interval.ms";
public static final String UNDERFS_S3_BULK_DELETE_ENABLED =
"alluxio.underfs.s3.bulk.delete.enabled";
public static final String UNDERFS_S3_DEFAULT_MODE = "alluxio.underfs.s3.default.mode";
Expand Down
5 changes: 5 additions & 0 deletions underfs/oss/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.OutputStream;
import java.util.Date;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -59,6 +60,8 @@ public class OSSUnderFileSystem extends ObjectUnderFileSystem {
/** Bucket name of user's configured Alluxio bucket. */
private final String mBucketName;

private StsOssClientProvider mClientProvider;

/**
* Constructs a new instance of {@link OSSUnderFileSystem}.
*
Expand All @@ -69,20 +72,7 @@ public class OSSUnderFileSystem extends ObjectUnderFileSystem {
public static OSSUnderFileSystem createInstance(AlluxioURI uri, UnderFileSystemConfiguration conf)
throws Exception {
String bucketName = UnderFileSystemUtils.getBucketName(uri);
Preconditions.checkArgument(conf.isSet(PropertyKey.OSS_ACCESS_KEY),
"Property %s is required to connect to OSS", PropertyKey.OSS_ACCESS_KEY);
Preconditions.checkArgument(conf.isSet(PropertyKey.OSS_SECRET_KEY),
"Property %s is required to connect to OSS", PropertyKey.OSS_SECRET_KEY);
Preconditions.checkArgument(conf.isSet(PropertyKey.OSS_ENDPOINT_KEY),
"Property %s is required to connect to OSS", PropertyKey.OSS_ENDPOINT_KEY);
String accessId = conf.getString(PropertyKey.OSS_ACCESS_KEY);
String accessKey = conf.getString(PropertyKey.OSS_SECRET_KEY);
String endPoint = conf.getString(PropertyKey.OSS_ENDPOINT_KEY);

ClientBuilderConfiguration ossClientConf = initializeOSSClientConfig(conf);
OSS ossClient = new OSSClientBuilder().build(endPoint, accessId, accessKey, ossClientConf);

return new OSSUnderFileSystem(uri, ossClient, bucketName, conf);
return new OSSUnderFileSystem(uri, null, bucketName, conf);
}

/**
Expand All @@ -93,10 +83,36 @@ public static OSSUnderFileSystem createInstance(AlluxioURI uri, UnderFileSystemC
* @param bucketName bucket name of user's configured Alluxio bucket
* @param conf configuration for this UFS
*/
protected OSSUnderFileSystem(AlluxioURI uri, OSS ossClient, String bucketName,
UnderFileSystemConfiguration conf) {
protected OSSUnderFileSystem(AlluxioURI uri, @Nullable OSS ossClient, String bucketName,
UnderFileSystemConfiguration conf) {
super(uri, conf);
mClient = ossClient;

if (conf.getBoolean(PropertyKey.UNDERFS_OSS_STS_ENABLED)) {
try {
mClientProvider = new StsOssClientProvider(conf);
mClientProvider.init();
mClient = mClientProvider.getOSSClient();
} catch (IOException e) {
LOG.error("init sts client provider failed!", e);
throw new ServiceException(e);
}
} else if (null != ossClient) {
mClient = ossClient;
} else {
Preconditions.checkArgument(conf.isSet(PropertyKey.OSS_ACCESS_KEY),
"Property %s is required to connect to OSS", PropertyKey.OSS_ACCESS_KEY);
Preconditions.checkArgument(conf.isSet(PropertyKey.OSS_SECRET_KEY),
"Property %s is required to connect to OSS", PropertyKey.OSS_SECRET_KEY);
Preconditions.checkArgument(conf.isSet(PropertyKey.OSS_ENDPOINT_KEY),
"Property %s is required to connect to OSS", PropertyKey.OSS_ENDPOINT_KEY);
String accessId = conf.getString(PropertyKey.OSS_ACCESS_KEY);
String accessKey = conf.getString(PropertyKey.OSS_SECRET_KEY);
String endPoint = conf.getString(PropertyKey.OSS_ENDPOINT_KEY);

ClientBuilderConfiguration ossClientConf = initializeOSSClientConfig(conf);
mClient = new OSSClientBuilder().build(endPoint, accessId, accessKey, ossClientConf);
}

mBucketName = bucketName;
}

Expand Down Expand Up @@ -268,17 +284,18 @@ protected String getRootKey() {

/**
* Creates an OSS {@code ClientConfiguration} using an Alluxio Configuration.
*
* @param alluxioConf the OSS Configuration
* @return the OSS {@link ClientBuilderConfiguration}
*/
private static ClientBuilderConfiguration initializeOSSClientConfig(
public static ClientBuilderConfiguration initializeOSSClientConfig(
AlluxioConfiguration alluxioConf) {
ClientBuilderConfiguration ossClientConf = new ClientBuilderConfiguration();
ossClientConf
.setConnectionTimeout((int) alluxioConf.getMs(PropertyKey.UNDERFS_OSS_CONNECT_TIMEOUT));
ossClientConf.setSocketTimeout((int) alluxioConf.getMs(PropertyKey.UNDERFS_OSS_SOCKET_TIMEOUT));
ossClientConf.setConnectionTTL(alluxioConf.getMs(PropertyKey.UNDERFS_OSS_CONNECT_TTL));
ossClientConf.setMaxConnections(alluxioConf.getInt(PropertyKey.UNDERFS_OSS_CONNECT_MAX));
ossClientConf.setMaxErrorRetry(alluxioConf.getInt(PropertyKey.UNDERFS_OSS_RETRY_MAX));
return ossClientConf;
}

Expand All @@ -292,4 +309,10 @@ protected InputStream openObject(String key, OpenOptions options, RetryPolicy re
throw new IOException(e.getMessage());
}
}

@Override
public void close() throws IOException {
super.close();
mClientProvider.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public boolean supportsPath(String path) {
* @return true if both access, secret and endpoint keys are present, false otherwise
*/
private boolean checkOSSCredentials(UnderFileSystemConfiguration conf) {
if (conf.getBoolean(PropertyKey.UNDERFS_OSS_STS_ENABLED)) {
return conf.isSet(PropertyKey.UNDERFS_OSS_ECS_RAM_ROLE);
}

return conf.isSet(PropertyKey.OSS_ACCESS_KEY)
&& conf.isSet(PropertyKey.OSS_SECRET_KEY)
&& conf.isSet(PropertyKey.OSS_ENDPOINT_KEY);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.underfs.oss;

import alluxio.conf.PropertyKey;
import alluxio.retry.ExponentialBackoffRetry;
import alluxio.retry.RetryPolicy;
import alluxio.underfs.UnderFileSystemConfiguration;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.network.HttpUtils;

import com.aliyun.oss.ClientBuilderConfiguration;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.DefaultCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* STS client provider for Aliyun OSS.
*/
public class StsOssClientProvider implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(StsOssClientProvider.class);

private static final int ECS_META_GET_TIMEOUT = 10000;
private static final int BASE_SLEEP_TIME_MS = 1000;
private static final int MAX_SLEEP_MS = 3000;
private static final int MAX_RETRIES = 5;
private static final String ACCESS_KEY_ID = "AccessKeyId";
private static final String ACCESS_KEY_SECRET = "AccessKeySecret";
private static final String SECURITY_TOKEN = "SecurityToken";
private static final String EXPIRATION = "Expiration";

private volatile OSS mOssClient = null;
private long mStsTokenExpiration = 0;
private final String mEcsMetadataServiceUrl;
private final long mTokenTimeoutMs;
private final UnderFileSystemConfiguration mOssConf;
private final ScheduledExecutorService mRefreshOssClientScheduledThread;
private OSSClientBuilder mOssClientBuilder = new OSSClientBuilder();

/**
* Constructs a new instance of {@link StsOssClientProvider}.
* @param ossConfiguration {@link UnderFileSystemConfiguration} for OSS
*/
public StsOssClientProvider(UnderFileSystemConfiguration ossConfiguration) {
mOssConf = ossConfiguration;
mEcsMetadataServiceUrl = ossConfiguration.getString(
PropertyKey.UNDERFS_OSS_STS_ECS_METADATA_SERVICE_ENDPOINT);
mTokenTimeoutMs = ossConfiguration.getMs(PropertyKey.UNDERFS_OSS_STS_TOKEN_REFRESH_INTERVAL_MS);

mRefreshOssClientScheduledThread = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryUtils.build("refresh_oss_client-%d", false));
mRefreshOssClientScheduledThread.scheduleAtFixedRate(() -> {
try {
createOrRefreshOssStsClient(mOssConf);
} catch (Exception e) {
//retry it
LOG.warn("exception when refreshing OSS client access token", e);
}
}, 0, 60000, TimeUnit.MILLISECONDS);
}

/**
* Init {@link StsOssClientProvider}.
* @throws IOException if failed to init OSS Client
*/
public void init() throws IOException {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(
BASE_SLEEP_TIME_MS, MAX_SLEEP_MS, MAX_RETRIES);
IOException lastException = null;
while (retryPolicy.attempt()) {
try {
createOrRefreshOssStsClient(mOssConf);
lastException = null;
break;
} catch (IOException e) {
LOG.warn("init oss client failed! has retried {} times", retryPolicy.getAttemptCount(), e);
lastException = e;
}
}
if (lastException != null) {
LOG.error("init oss client failed.", lastException);
throw lastException;
}
}

/**
* Create Or Refresh the STS OSS client.
* @param ossConfiguration OSS {@link UnderFileSystemConfiguration}
* @throws IOException if failed to create or refresh OSS client
*/
protected void createOrRefreshOssStsClient(UnderFileSystemConfiguration ossConfiguration)
throws IOException {
ClientBuilderConfiguration ossClientConf =
OSSUnderFileSystem.initializeOSSClientConfig(ossConfiguration);
doCreateOrRefreshStsOssClient(ossConfiguration, ossClientConf);
}

boolean tokenWillExpiredAfter(long after) {
return mStsTokenExpiration - System.currentTimeMillis() <= after;
}

private void doCreateOrRefreshStsOssClient(
UnderFileSystemConfiguration ossConfiguration,
ClientBuilderConfiguration clientConfiguration) throws IOException {
if (tokenWillExpiredAfter(mTokenTimeoutMs)) {
String ecsRamRole = ossConfiguration.getString(PropertyKey.UNDERFS_OSS_ECS_RAM_ROLE);
String fullECSMetaDataServiceUrl = mEcsMetadataServiceUrl + ecsRamRole;
String jsonStringResponse = HttpUtils.get(fullECSMetaDataServiceUrl, ECS_META_GET_TIMEOUT);

JsonObject jsonObject = new Gson().fromJson(jsonStringResponse, JsonObject.class);
String accessKeyId = jsonObject.get(ACCESS_KEY_ID).getAsString();
String accessKeySecret = jsonObject.get(ACCESS_KEY_SECRET).getAsString();
String securityToken = jsonObject.get(SECURITY_TOKEN).getAsString();
mStsTokenExpiration =
convertStringToDate(jsonObject.get(EXPIRATION).getAsString()).getTime();

if (null == mOssClient) {
mOssClient = mOssClientBuilder.build(
ossConfiguration.getString(PropertyKey.OSS_ENDPOINT_KEY),
accessKeyId, accessKeySecret, securityToken,
clientConfiguration);
} else {
mOssClient.switchCredentials((new DefaultCredentials(
accessKeyId, accessKeySecret, securityToken)));
}
LOG.debug("oss sts client create success, expiration = {}", mStsTokenExpiration);
}
}

/**
* Returns the STS OSS client.
* @return oss client
*/
public OSS getOSSClient() {
return mOssClient;
}

private Date convertStringToDate(String dateString) throws IOException {
TimeZone zeroTimeZone = TimeZone.getTimeZone("ETC/GMT-0");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
sdf.setTimeZone(zeroTimeZone);
Date date = null;
try {
date = sdf.parse(dateString);
} catch (ParseException e) {
throw new IOException(String.format("failed to parse date: %s", dateString), e);
}
return date;
}

@Override
public void close() throws IOException {
if (null != mRefreshOssClientScheduledThread) {
mRefreshOssClientScheduledThread.shutdown();
}
if (null != mOssClient) {
mOssClient.shutdown();
mOssClient = null;
}
}

@VisibleForTesting
protected void setOssClientBuilder(OSSClientBuilder ossClientBuilder) {
mOssClientBuilder = ossClientBuilder;
}
}
Loading

0 comments on commit 84823ce

Please sign in to comment.