Skip to content

Commit

Permalink
refine some code for OSS STS
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenRi committed Nov 17, 2022
1 parent 873637b commit 5725a87
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.OutputStream;
import java.util.Date;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -59,7 +60,6 @@ public class OSSUnderFileSystem extends ObjectUnderFileSystem {
/** Bucket name of user's configured Alluxio bucket. */
private final String mBucketName;

private boolean mStsEnabled;
private StsOssClientProvider mClientProvider;

/**
Expand All @@ -83,12 +83,11 @@ public static OSSUnderFileSystem createInstance(AlluxioURI uri, UnderFileSystemC
* @param bucketName bucket name of user's configured Alluxio bucket
* @param conf configuration for this UFS
*/
protected OSSUnderFileSystem(AlluxioURI uri, OSS ossClient, String bucketName,
UnderFileSystemConfiguration conf) {
protected OSSUnderFileSystem(AlluxioURI uri, @Nullable OSS ossClient, String bucketName,
UnderFileSystemConfiguration conf) {
super(uri, conf);

mStsEnabled = conf.getBoolean(PropertyKey.UNDERFS_OSS_STS_ENABLED);
if (mStsEnabled) {
if (conf.getBoolean(PropertyKey.UNDERFS_OSS_STS_ENABLED)) {
try {
mClientProvider = new StsOssClientProvider(conf);
mClientProvider.init();
Expand Down Expand Up @@ -288,14 +287,15 @@ protected String getRootKey() {
*
* @return the OSS {@link ClientBuilderConfiguration}
*/
private static ClientBuilderConfiguration initializeOSSClientConfig(
public static ClientBuilderConfiguration initializeOSSClientConfig(
AlluxioConfiguration alluxioConf) {
ClientBuilderConfiguration ossClientConf = new ClientBuilderConfiguration();
ossClientConf
.setConnectionTimeout((int) alluxioConf.getMs(PropertyKey.UNDERFS_OSS_CONNECT_TIMEOUT));
ossClientConf.setSocketTimeout((int) alluxioConf.getMs(PropertyKey.UNDERFS_OSS_SOCKET_TIMEOUT));
ossClientConf.setConnectionTTL(alluxioConf.getMs(PropertyKey.UNDERFS_OSS_CONNECT_TTL));
ossClientConf.setMaxConnections(alluxioConf.getInt(PropertyKey.UNDERFS_OSS_CONNECT_MAX));
ossClientConf.setMaxErrorRetry(alluxioConf.getInt(PropertyKey.UNDERFS_OSS_RETRY_MAX));
return ossClientConf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,38 @@ public class StsOssClientProvider implements Closeable {
private static final int BASE_SLEEP_TIME_MS = 1000;
private static final int MAX_SLEEP_MS = 3000;
private static final int MAX_RETRIES = 5;

private volatile OSS mOssClient = null;
private Date mStsTokenExpiration = null;
private final String mEcsMetadataServiceUrl;
private final long mTokenTimeoutMs;

private static final String ACCESS_KEY_ID = "AccessKeyId";
private static final String ACCESS_KEY_SECRET = "AccessKeySecret";
private static final String SECURITY_TOKEN = "SecurityToken";
private static final String EXPIRATION = "Expiration";

private volatile OSS mOssClient = null;
private long mStsTokenExpiration = 0;
private final String mEcsMetadataServiceUrl;
private final long mTokenTimeoutMs;
private final UnderFileSystemConfiguration mOssConf;
private ScheduledExecutorService mRefreshOssClientScheduledThread;
private final ScheduledExecutorService mRefreshOssClientScheduledThread;

/**
* Constructs a new instance of {@link StsOssClientProvider}.
* @param ossConfiguration {@link UnderFileSystemConfiguration} for OSS
* @throws IOException if failed to init OSS STS client
*/
public StsOssClientProvider(UnderFileSystemConfiguration ossConfiguration) throws IOException {
public StsOssClientProvider(UnderFileSystemConfiguration ossConfiguration) {
mOssConf = ossConfiguration;
mEcsMetadataServiceUrl = ossConfiguration.getString(
PropertyKey.UNDERFS_OSS_STS_ECS_METADATA_SERVICE_ENDPOINT);
mTokenTimeoutMs = ossConfiguration.getMs(PropertyKey.UNDERFS_OSS_STS_TOKEN_REFRESH_INTERVAL_MS);

mRefreshOssClientScheduledThread = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryUtils.build("refresh_oss_client-%d", false));
mRefreshOssClientScheduledThread.scheduleAtFixedRate(() -> {
try {
createOrRefreshOssStsClient(mOssConf);
} catch (Exception e) {
//retry it
LOG.warn("throw exception when clear meta data cache", e);
}
}, 0, 60000, TimeUnit.MILLISECONDS);
}

/**
Expand All @@ -83,7 +91,7 @@ public void init() throws IOException {
IOException lastException = null;
while (retryPolicy.attempt()) {
try {
initializeOssClient(mOssConf);
createOrRefreshOssStsClient(mOssConf);
lastException = null;
break;
} catch (IOException e) {
Expand All @@ -94,60 +102,22 @@ public void init() throws IOException {
if (lastException != null) {
throw lastException;
}
mRefreshOssClientScheduledThread = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryUtils.build("refresh_oss_client-%d", false));
mRefreshOssClientScheduledThread.scheduleAtFixedRate(() -> {
try {
if (null != mOssConf) {
refreshOssStsClient(mOssConf);
}
} catch (Exception e) {
//retry it
LOG.warn("throw exception when clear meta data cache", e);
}
}, 0, 60000, TimeUnit.MILLISECONDS);
}

protected void refreshOssStsClient(UnderFileSystemConfiguration ossConfiguration)
throws IOException {
ClientBuilderConfiguration ossClientConf = getClientBuilderConfiguration(ossConfiguration);
createOrRefreshStsOssClient(ossConfiguration, ossClientConf);
}

private ClientBuilderConfiguration getClientBuilderConfiguration(
UnderFileSystemConfiguration ossConfiguration) {
ClientBuilderConfiguration ossClientConf = new ClientBuilderConfiguration();
ossClientConf.setMaxConnections(ossConfiguration.getInt(PropertyKey.UNDERFS_OSS_CONNECT_MAX));
ossClientConf.setMaxErrorRetry(ossConfiguration.getInt(PropertyKey.UNDERFS_OSS_RETRY_MAX));
ossClientConf.setConnectionTimeout(
(int) ossConfiguration.getMs(PropertyKey.UNDERFS_OSS_CONNECT_TIMEOUT));
ossClientConf.setSocketTimeout(
(int) ossConfiguration.getMs(PropertyKey.UNDERFS_OSS_SOCKET_TIMEOUT));
ossClientConf.setSupportCname(false);
ossClientConf.setCrcCheckEnabled(true);
return ossClientConf;
}

/**
* Init the STS OSS client.
* Create Or Refresh the STS OSS client.
* @param ossConfiguration OSS {@link UnderFileSystemConfiguration}
* @throws IOException if failed to init OSS client
* @throws IOException if failed to create or refresh OSS client
*/
private void initializeOssClient(UnderFileSystemConfiguration ossConfiguration)
protected void createOrRefreshOssStsClient(UnderFileSystemConfiguration ossConfiguration)
throws IOException {
if (null != mOssClient) {
return;
}

ClientBuilderConfiguration clientConf = getClientBuilderConfiguration(ossConfiguration);
createOrRefreshStsOssClient(ossConfiguration, clientConf);

LOG.info("init ossClient success : {}", mOssClient.toString());
ClientBuilderConfiguration ossClientConf =
OSSUnderFileSystem.initializeOSSClientConfig(ossConfiguration);
createOrRefreshStsOssClient(ossConfiguration, ossClientConf);
}

boolean tokenWillExpiredAfter(long after) {
return null == mStsTokenExpiration
|| mStsTokenExpiration.getTime() - System.currentTimeMillis() <= after;
return mStsTokenExpiration - System.currentTimeMillis() <= after;
}

private void createOrRefreshStsOssClient(
Expand All @@ -163,7 +133,8 @@ private void createOrRefreshStsOssClient(
String accessKeyId = jsonObject.get(ACCESS_KEY_ID).getAsString();
String accessKeySecret = jsonObject.get(ACCESS_KEY_SECRET).getAsString();
String securityToken = jsonObject.get(SECURITY_TOKEN).getAsString();
mStsTokenExpiration = convertStringToDate(jsonObject.get(EXPIRATION).getAsString());
mStsTokenExpiration =
convertStringToDate(jsonObject.get(EXPIRATION).getAsString()).getTime();

if (null == mOssClient) {
mOssClient = new OSSClientBuilder().build(
Expand All @@ -174,8 +145,7 @@ private void createOrRefreshStsOssClient(
mOssClient.switchCredentials((new DefaultCredentials(
accessKeyId, accessKeySecret, securityToken)));
}
LOG.info("oss sts client create success {} {} {}",
mOssClient, securityToken, mStsTokenExpiration);
LOG.info("oss sts client create success, expiration = {}", mStsTokenExpiration);
} catch (IOException e) {
LOG.error("create stsOssClient exception", e);
throw new IOException("create stsOssClient exception", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,23 @@ public void testInitAndRefresh() throws Exception {
PowerMockito.whenNew(OSSClient.class).withAnyArguments().thenReturn(ossClient);
PowerMockito.mockStatic(HttpUtils.class);
when(HttpUtils.get(mEcsMetadataService, 10000)).thenReturn(MOCK_ECS_META_RESPONSE);
StsOssClientProvider clientProvider = new StsOssClientProvider(ossConfiguration);

// refresh
String responseBodyString = "{\n"
+ " 'AccessKeyId' : 'STS.mockAK',\n"
+ " 'AccessKeySecret' : 'mockSK',\n"
+ " 'Expiration' : '" + expiration + "',\n"
+ " 'SecurityToken' : 'mockSecurityToken',\n"
+ " 'LastUpdated' : '" + lastUpdated + "',\n"
+ " 'Code' : 'Success'\n"
+ "}";
PowerMockito.mockStatic(HttpUtils.class);
when(HttpUtils.get(mEcsMetadataService, 10000)).thenReturn(responseBodyString);
assertTrue(clientProvider.tokenWillExpiredAfter(0));
clientProvider.refreshOssStsClient(ossConfiguration);
assertFalse(clientProvider.tokenWillExpiredAfter(0));
try(StsOssClientProvider clientProvider = new StsOssClientProvider(ossConfiguration)) {
clientProvider.init();
// refresh
String responseBodyString = "{\n"
+ " 'AccessKeyId' : 'STS.mockAK',\n"
+ " 'AccessKeySecret' : 'mockSK',\n"
+ " 'Expiration' : '" + expiration + "',\n"
+ " 'SecurityToken' : 'mockSecurityToken',\n"
+ " 'LastUpdated' : '" + lastUpdated + "',\n"
+ " 'Code' : 'Success'\n"
+ "}";
PowerMockito.mockStatic(HttpUtils.class);
when(HttpUtils.get(mEcsMetadataService, 10000)).thenReturn(responseBodyString);
assertTrue(clientProvider.tokenWillExpiredAfter(0));
clientProvider.createOrRefreshOssStsClient(ossConfiguration);
assertFalse(clientProvider.tokenWillExpiredAfter(0));
}
}

private Date toUtcDateString(long dateInMills) throws ParseException {
Expand Down

0 comments on commit 5725a87

Please sign in to comment.