Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add direct access option to avoid caching certain paths #18326

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

import alluxio.AlluxioURI;
import alluxio.Constants;
import alluxio.client.ReadType;
import alluxio.client.WriteType;
import alluxio.client.block.BlockStoreClient;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.file.FileSystemContextReinitializer.ReinitBlockerResource;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.ConfigurationBuilder;
import alluxio.conf.OverlayConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.exception.DirectoryNotEmptyException;
Expand Down Expand Up @@ -69,6 +73,7 @@
import alluxio.security.authorization.AclEntry;
import alluxio.uri.Authority;
import alluxio.util.FileSystemOptionsUtils;
import alluxio.util.io.PathUtils;
import alluxio.wire.BlockLocation;
import alluxio.wire.BlockLocationInfo;
import alluxio.wire.FileBlockInfo;
Expand Down Expand Up @@ -99,11 +104,17 @@
*/
@ThreadSafe
public class BaseFileSystem implements FileSystem {
private static final AlluxioConfiguration DIRECT_ACCESS_CONF = new ConfigurationBuilder()
.setProperty(PropertyKey.USER_FILE_METADATA_SYNC_INTERVAL, "0")
.setProperty(PropertyKey.USER_FILE_READ_TYPE_DEFAULT, ReadType.NO_CACHE)
.setProperty(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.THROUGH).build();
private static final Logger LOG = LoggerFactory.getLogger(BaseFileSystem.class);

/** Used to manage closeable resources. */
private final Closer mCloser = Closer.create();
protected final FileSystemContext mFsContext;
protected final BlockStoreClient mBlockStore;
protected List<String> mPathList;

protected volatile boolean mClosed = false;

Expand Down Expand Up @@ -149,39 +160,64 @@ public void checkAccess(AlluxioURI path, CheckAccessPOptions options)
checkUri(path);
rpc(client -> {
CheckAccessPOptions mergedOptions = FileSystemOptionsUtils
.checkAccessDefaults(mFsContext.getPathConf(path))
.checkAccessDefaults(getDirectAccessConf(path))
.toBuilder().mergeFrom(options).build();
client.checkAccess(path, mergedOptions);
LOG.debug("Checked access {}, options: {}", path.getPath(), mergedOptions);
return null;
});
}

private boolean checkDirectAccess(AlluxioURI uri) {
if (!getConf().isSet(PropertyKey.USER_FILE_DIRECT_ACCESS)) {
return false;
}
if (mPathList == null) {
mPathList = getConf().getList(PropertyKey.USER_FILE_DIRECT_ACCESS);
}
return mPathList.stream().anyMatch(x -> {
try {
return PathUtils.hasPrefix(uri.getPath(), x);
} catch (InvalidPathException e) {
return false;
}
});
}

private AlluxioConfiguration getDirectAccessConf(AlluxioURI uri) {
AlluxioConfiguration inner = mFsContext.getPathConf(uri);
if (checkDirectAccess(uri)) {
return new OverlayConfiguration(DIRECT_ACCESS_CONF, inner);
} else {
return inner;
}
}

@Override
public void createDirectory(AlluxioURI path, CreateDirectoryPOptions options)
throws FileAlreadyExistsException, InvalidPathException, IOException, AlluxioException {
checkUri(path);
rpc(client -> {
CreateDirectoryPOptions mergedOptions = FileSystemOptionsUtils.createDirectoryDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
client.createDirectory(path, mergedOptions);
LOG.debug("Created directory {}, options: {}", path.getPath(), mergedOptions);
return null;
});
}

@Override
public FileOutStream createFile(AlluxioURI path, CreateFilePOptions options)
public FileOutStream createFile(AlluxioURI path, final CreateFilePOptions options)
throws FileAlreadyExistsException, InvalidPathException, IOException, AlluxioException {
checkUri(path);
return rpc(client -> {
CreateFilePOptions mergedOptions = FileSystemOptionsUtils.createFileDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
URIStatus status = client.createFile(path, mergedOptions);
LOG.debug("Created file {}, options: {}", path.getPath(), mergedOptions);
OutStreamOptions outStreamOptions =
new OutStreamOptions(mergedOptions, mFsContext,
mFsContext.getPathConf(path));
getDirectAccessConf(path));
outStreamOptions.setUfsPath(status.getUfsPath());
outStreamOptions.setMountId(status.getMountId());
outStreamOptions.setAcl(status.getAcl());
Expand All @@ -200,7 +236,7 @@ public void delete(AlluxioURI path, DeletePOptions options)
checkUri(path);
rpc(client -> {
DeletePOptions mergedOptions = FileSystemOptionsUtils.deleteDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
client.delete(path, mergedOptions);
LOG.debug("Deleted {}, options: {}", path.getPath(), mergedOptions);
return null;
Expand All @@ -213,7 +249,7 @@ public boolean exists(AlluxioURI path, final ExistsPOptions options)
checkUri(path);
return rpc(client -> {
ExistsPOptions mergedOptions = FileSystemOptionsUtils.existsDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
return client.exists(path, mergedOptions);
});
}
Expand All @@ -223,7 +259,7 @@ public void free(AlluxioURI path, final FreePOptions options)
throws FileDoesNotExistException, IOException, AlluxioException {
checkUri(path);
rpc(client -> {
FreePOptions mergedOptions = FileSystemOptionsUtils.freeDefaults(mFsContext.getPathConf(path))
FreePOptions mergedOptions = FileSystemOptionsUtils.freeDefaults(getDirectAccessConf(path))
.toBuilder().mergeFrom(options).build();
client.free(path, mergedOptions);
LOG.debug("Freed {}, options: {}", path.getPath(), mergedOptions);
Expand Down Expand Up @@ -279,7 +315,7 @@ public URIStatus getStatus(AlluxioURI path, final GetStatusPOptions options)
checkUri(path);
URIStatus status = rpc(client -> {
GetStatusPOptions mergedOptions = FileSystemOptionsUtils.getStatusDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
return client.getStatus(path, mergedOptions);
});
if (!status.isCompleted()) {
Expand All @@ -295,7 +331,7 @@ public List<URIStatus> listStatus(AlluxioURI path, final ListStatusPOptions opti
return rpc(client -> {
// TODO(calvin): Fix the exception handling in the master
ListStatusPOptions mergedOptions = FileSystemOptionsUtils.listStatusDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
return client.listStatus(path, mergedOptions);
});
}
Expand All @@ -308,7 +344,7 @@ public void iterateStatus(AlluxioURI path, final ListStatusPOptions options,
rpc(client -> {
// TODO(calvin): Fix the exception handling in the master
ListStatusPOptions mergedOptions = FileSystemOptionsUtils.listStatusDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
client.iterateStatus(path, mergedOptions, action);
return null;
});
Expand All @@ -321,7 +357,7 @@ public ListStatusPartialResult listStatusPartial(
checkUri(path);
return rpc(client -> {
ListStatusPartialPOptions mergedOptions = FileSystemOptionsUtils.listStatusPartialDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
return client.listStatusPartial(path, mergedOptions);
});
}
Expand All @@ -332,7 +368,7 @@ public void loadMetadata(AlluxioURI path, final ListStatusPOptions options)
checkUri(path);
rpc(client -> {
ListStatusPOptions mergedOptions = FileSystemOptionsUtils.listStatusDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options)
getDirectAccessConf(path)).toBuilder().mergeFrom(options)
.setLoadMetadataType(LoadMetadataPType.ALWAYS).setLoadMetadataOnly(true).build();
client.listStatus(path, mergedOptions);
return null;
Expand Down Expand Up @@ -384,7 +420,7 @@ public void persist(final AlluxioURI path, final ScheduleAsyncPersistencePOption
rpc(client -> {
ScheduleAsyncPersistencePOptions mergedOptions =
FileSystemOptionsUtils
.scheduleAsyncPersistDefaults(mFsContext.getPathConf(path)).toBuilder()
.scheduleAsyncPersistDefaults(getDirectAccessConf(path)).toBuilder()
.mergeFrom(options).build();
client.scheduleAsyncPersist(path, mergedOptions);
LOG.debug("Scheduled persist for {}, options: {}", path.getPath(), mergedOptions);
Expand All @@ -397,12 +433,12 @@ public FileInStream openFile(AlluxioURI path, OpenFilePOptions options)
throws FileDoesNotExistException, OpenDirectoryException, FileIncompleteException,
IOException, AlluxioException {
checkUri(path);
AlluxioConfiguration conf = mFsContext.getPathConf(path);
URIStatus status = getStatus(path,
FileSystemOptionsUtils.getStatusDefaults(conf).toBuilder()
.setAccessMode(Bits.READ)
.setUpdateTimestamps(options.getUpdateLastAccessTime())
.build());
AlluxioConfiguration conf = getDirectAccessConf(path);
GetStatusPOptions opt = FileSystemOptionsUtils.getStatusDefaults(conf)
.toBuilder()
.setAccessMode(Bits.READ)
.setUpdateTimestamps(options.getUpdateLastAccessTime()).build();
URIStatus status = getStatus(path, opt);
return openFile(status, options);
}

Expand All @@ -417,7 +453,7 @@ public FileInStream openFile(URIStatus status, OpenFilePOptions options)
if (!status.isCompleted()) {
throw new FileIncompleteException(path);
}
AlluxioConfiguration conf = mFsContext.getPathConf(path);
AlluxioConfiguration conf = getDirectAccessConf(path);
OpenFilePOptions mergedOptions = FileSystemOptionsUtils.openFileDefaults(conf)
.toBuilder().mergeFrom(options).build();
InStreamOptions inStreamOptions = new InStreamOptions(status, mergedOptions, conf, mFsContext);
Expand Down Expand Up @@ -455,7 +491,7 @@ public void setAcl(AlluxioURI path, SetAclAction action, List<AclEntry> entries,
checkUri(path);
rpc(client -> {
SetAclPOptions mergedOptions = FileSystemOptionsUtils.setAclDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
client.setAcl(path, action, entries, mergedOptions);
LOG.debug("Set ACL for {}, entries: {} options: {}", path.getPath(), entries,
mergedOptions);
Expand All @@ -468,7 +504,7 @@ public void setAttribute(AlluxioURI path, SetAttributePOptions options)
throws FileDoesNotExistException, IOException, AlluxioException {
checkUri(path);
SetAttributePOptions mergedOptions =
FileSystemOptionsUtils.setAttributeClientDefaults(mFsContext.getPathConf(path))
FileSystemOptionsUtils.setAttributeClientDefaults(getDirectAccessConf(path))
.toBuilder().mergeFrom(options).build();
rpc(client -> {
client.setAttribute(path, mergedOptions);
Expand Down Expand Up @@ -512,7 +548,7 @@ public void unmount(AlluxioURI path, UnmountPOptions options)
checkUri(path);
rpc(client -> {
UnmountPOptions mergedOptions = FileSystemOptionsUtils.unmountDefaults(
mFsContext.getPathConf(path)).toBuilder().mergeFrom(options).build();
getDirectAccessConf(path)).toBuilder().mergeFrom(options).build();
client.unmount(path);
LOG.debug("Unmounted {}, options: {}", path.getPath(), mergedOptions);
return null;
Expand Down
Loading
Loading