Implement async caching for paged block store
dbw9580 committed Sep 22, 2022
1 parent a4a85ee commit 6bbc180
Showing 8 changed files with 441 additions and 129 deletions.
@@ -55,6 +55,7 @@
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.file.FileSystemMasterClient;
import alluxio.worker.grpc.GrpcExecutors;
import alluxio.worker.page.PagedBlockStore;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
@@ -376,6 +377,11 @@ public void asyncCache(AsyncCacheRequest request) {

@Override
public void cache(CacheRequest request) throws AlluxioException, IOException {
// todo(bowen): paged block store handles caching from UFS automatically and on the fly,
// so an explicit cache request would cause an unnecessary extra read of the block
if (mBlockStore instanceof PagedBlockStore) {
return;
}
mCacheManager.submitRequest(request);
}
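
With a paged block store, data fetched from the UFS is cached page by page while the read is being served (see the PagedBlockReader changes below), so honoring an explicit cache request would read the block from the UFS a second time; the guard above therefore drops the request. A standalone sketch of that cache-on-read behavior, using stand-in types rather than Alluxio classes:

import java.util.HashMap;
import java.util.Map;

// Stand-in sketch of cache-on-read (not Alluxio code): reads populate the page
// cache as a side effect, so an explicit cache request can be a no-op.
public class CacheOnReadSketch {
  private static final Map<Long, byte[]> PAGE_CACHE = new HashMap<>();

  static byte[] read(long pageId) {
    // caching happens here, on the read path
    return PAGE_CACHE.computeIfAbsent(pageId, CacheOnReadSketch::loadFromUfs);
  }

  static void cache(long pageId) {
    // intentionally empty: a second UFS read would be redundant
  }

  static byte[] loadFromUfs(long pageId) {
    System.out.println("UFS read for page " + pageId); // the expensive path
    return new byte[64 * 1024];
  }

  public static void main(String[] args) {
    cache(1L); // no-op, does not touch the UFS
    read(1L);  // first read loads from UFS and caches the page
    read(1L);  // served from the cache, no second UFS read
  }
}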

@@ -64,8 +64,8 @@ public PagedBlockStoreDir getDir() {

@Override
public String getPath() {
// todo(bowen): return path of the directory for block?
throw new UnsupportedOperationException("getPath is not supported for a paged block as the "
+ "block does not have a single file representation");
// todo(bowen): paged block does not have a single file representation, this is most likely
// a directory; in the case of a memory page store, there is no path at all
return mDir.getRootPath().toString();
}
}
@@ -17,19 +17,12 @@
import alluxio.conf.PropertyKey;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.IdUtils;
import alluxio.worker.block.UfsInputStreamCache;
import alluxio.worker.block.io.BlockReader;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Optional;
@@ -45,10 +38,8 @@ public class PagedBlockReader extends BlockReader {
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
private final long mPageSize;
private final CacheManager mCacheManager;
private final UfsManager mUfsManager;
private final UfsInputStreamCache mUfsInStreamCache;
private final Optional<PagedUfsBlockReader> mUfsBlockReader;
private final PagedBlockMeta mBlockMeta;
private final Optional<UfsBlockReadOptions> mUfsBlockOptions;
private boolean mClosed = false;
private boolean mReadFromLocalCache = false;
private boolean mReadFromUfs = false;
@@ -57,24 +48,19 @@ public class PagedBlockReader extends BlockReader {
/**
* Constructor for PagedBlockReader.
* @param cacheManager paging cache manager
* @param ufsManager under file storage manager
* @param ufsInStreamCache a cache for the in streams from ufs
* @param conf alluxio configurations
* @param blockMeta block meta
* @param offset initial offset within the block to begin the read from
* @param ufsBlockReadOptions options to open a ufs block
* @param ufsBlockReader ufs block reader
*/
public PagedBlockReader(CacheManager cacheManager,
UfsManager ufsManager, UfsInputStreamCache ufsInStreamCache, AlluxioConfiguration conf,
PagedBlockMeta blockMeta, long offset, Optional<UfsBlockReadOptions> ufsBlockReadOptions) {
public PagedBlockReader(CacheManager cacheManager, AlluxioConfiguration conf,
PagedBlockMeta blockMeta, long offset, Optional<PagedUfsBlockReader> ufsBlockReader) {
Preconditions.checkArgument(offset >= 0 && offset <= blockMeta.getBlockSize(),
"Attempt to read block %d which is %d bytes long at invalid byte offset %d",
blockMeta.getBlockId(), blockMeta.getBlockSize(), offset);
mCacheManager = cacheManager;
mUfsManager = ufsManager;
mUfsInStreamCache = ufsInStreamCache;
mUfsBlockReader = ufsBlockReader;
mBlockMeta = blockMeta;
mUfsBlockOptions = ufsBlockReadOptions;
mPageSize = conf.getBytes(PropertyKey.USER_CLIENT_CACHE_PAGE_SIZE);
mPosition = offset;
}
@@ -105,59 +91,35 @@ public ByteBuffer read(long offset, long length) throws IOException {
MetricsSystem.meter(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE.getName()).mark(bytesRead);
mReadFromLocalCache = true;
} else {
if (!mUfsBlockOptions.isPresent()) {
throw new IOException(String.format("Block %d does not have UFS read options, "
+ "therefore cannot be read from UFS", mBlockMeta.getBlockId()));
if (!mUfsBlockReader.isPresent()) {
throw new IOException(String.format("Block %d cannot be read from UFS as UFS reader is "
+ "missing", mBlockMeta.getBlockId()));
}
byte[] page = readPageFromUFS(mUfsBlockOptions.get(), pos);
if (page.length > 0) {
PagedUfsBlockReader ufsBlockReader = mUfsBlockReader.get();
long pageStart = pos - (pos % mPageSize);
int pageSize = (int) Math.min(mPageSize, mBlockMeta.getBlockSize() - pageStart);
byte[] page = new byte[(int) mPageSize];
int pageBytesRead = ufsBlockReader.readPageAt(ByteBuffer.wrap(page), pageIndex);
if (pageBytesRead > 0) {
System.arraycopy(page, currentPageOffset, buf, (int) bytesRead, bytesLeftInPage);
bytesRead += bytesLeftInPage;
MetricsSystem.meter(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL.getName())
.mark(bytesLeftInPage);
mReadFromUfs = true;
mCacheManager.put(pageId, page);
if (ufsBlockReader.getUfsReadOptions().isCacheIntoAlluxio()) {
byte[] pageToCache = page;
if (pageBytesRead < mPageSize) {
pageToCache = new byte[(int) pageBytesRead];
System.arraycopy(page, 0, pageToCache, 0, pageBytesRead);
}
mCacheManager.put(pageId, pageToCache);
}
}
}
}
return ByteBuffer.wrap(buf);
}
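
The rewritten UFS fallback above reads a whole page through PagedUfsBlockReader.readPageAt and copies only the requested slice into the output buffer, trimming the page to the bytes actually read before caching it. A standalone sketch of the page-offset arithmetic involved; the formulas for pageIndex, currentPageOffset and bytesLeftInPage are assumptions about context lines not shown in this hunk, and the concrete sizes are only examples:

// Standalone sketch of the page-aligned arithmetic used above (not Alluxio code).
public class PageOffsetSketch {
  public static void main(String[] args) {
    long pageSize = 1024 * 1024;                   // e.g. USER_CLIENT_CACHE_PAGE_SIZE
    long blockSize = 3 * pageSize + 512 * 1024;    // a block of 3.5 pages
    long pos = 2 * pageSize + 300;                 // read position inside the block
    long length = 2000;                            // bytes requested

    long pageIndex = pos / pageSize;               // which page holds pos
    long pageStart = pos - (pos % pageSize);       // page-aligned start, as in the hunk
    int pageSizeInBlock = (int) Math.min(pageSize, blockSize - pageStart); // last page may be short
    int currentPageOffset = (int) (pos % pageSize);                        // offset inside the page
    int bytesLeftInPage = (int) Math.min(pageSizeInBlock - currentPageOffset, length);

    System.out.printf("pageIndex=%d pageStart=%d pageSize=%d offsetInPage=%d copy=%d%n",
        pageIndex, pageStart, pageSizeInBlock, currentPageOffset, bytesLeftInPage);
  }
}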

private byte[] readPageFromUFS(UfsBlockReadOptions options, long pos) throws IOException {
long pageStart = pos - (pos % mPageSize);
InputStream ufsInputStream = seekUfsInputStream(options, options.getOffsetInFile() + pageStart);
int pageSize = (int) Math.min(mPageSize, mBlockMeta.getBlockSize() - pageStart);
byte[] page = new byte[pageSize];
int totalBytesRead = 0;
try {
while (totalBytesRead < pageSize) {
int bytesRead = ufsInputStream.read(page, totalBytesRead, pageSize - totalBytesRead);
if (bytesRead <= 0) {
break;
}
totalBytesRead += bytesRead;
}
} finally {
mUfsInStreamCache.release(ufsInputStream);
}
return page;
}

private InputStream seekUfsInputStream(UfsBlockReadOptions options, long posInFile)
throws IOException {
UfsManager.UfsClient ufsClient = mUfsManager.get(options.getMountId());
try (CloseableResource<UnderFileSystem> ufsResource =
ufsClient.acquireUfsResource()) {
return mUfsInStreamCache.acquire(
ufsResource.get(),
options.getUfsPath(),
IdUtils.fileIdFromBlockId(mBlockMeta.getBlockId()),
OpenOptions.defaults()
.setOffset(posInFile)
.setPositionShort(true));
}
}

@Override
public long getLength() {
return mBlockMeta.getBlockSize();
@@ -189,7 +151,7 @@ public boolean isClosed() {

@Override
public String getLocation() {
throw new UnsupportedOperationException();
return mBlockMeta.getPath();
}

@Override
@@ -151,40 +151,46 @@ public void commitBlock(long sessionId, long blockId, boolean pinOnCreate) {
sessionId, blockId, pinOnCreate);
long lockId = mLockManager.lockBlock(sessionId, blockId, BlockLockType.WRITE);

PagedBlockMeta blockMeta = mPageMetaStore.getBlock(blockId)
.orElseThrow(() -> new BlockDoesNotExistRuntimeException(blockId));
PagedBlockStoreDir pageStoreDir = blockMeta.getDir();
// todo(bowen): need to pin this block until commit is complete
// unconditionally pin this block until committing is done
boolean isPreviouslyUnpinned = pageStoreDir.getEvictor().addPinnedBlock(blockId);
try {
pageStoreDir.commit(String.valueOf(blockId));
mPageMetaStore.commit(blockId);
} catch (IOException e) {
throw AlluxioRuntimeException.from(e);
} finally {
// committing failed, restore to the previous pinning state
if (isPreviouslyUnpinned) {
pageStoreDir.getEvictor().removePinnedBlock(blockId);
try (LockResource lock = new LockResource(mPageMetaStore.getLock().writeLock())) {
PagedTempBlockMeta blockMeta = mPageMetaStore.getTempBlock(blockId)
.orElseThrow(() -> new BlockDoesNotExistRuntimeException(blockId));
PagedBlockStoreDir pageStoreDir = blockMeta.getDir();
// unconditionally pin this block until committing is done
boolean isPreviouslyUnpinned = pageStoreDir.getEvictor().addPinnedBlock(blockId);
try {
pageStoreDir.commit(String.valueOf(blockId));
final PagedBlockMeta committed = mPageMetaStore.commit(blockId);
commitBlockToMaster(committed);
} catch (IOException e) {
throw AlluxioRuntimeException.from(e);
} finally {
if (!pinOnCreate && isPreviouslyUnpinned) {
// unpin this block if it should not be pinned on creation, e.g. a MUST_CACHE file
pageStoreDir.getEvictor().removePinnedBlock(blockId);
}
mLockManager.unlockBlock(lockId);
}
mLockManager.unlockBlock(lockId);
}
}
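
commitBlock now resolves the temp block under the metastore write lock, pins the block for the duration of the commit, and unpins it in the finally block only when it is not meant to stay pinned after creation. A standalone sketch of that pin-while-committing shape, with the evictor modeled as a plain set of pinned ids (not Alluxio code):

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Stand-in sketch of "pin while committing, unpin unless pinOnCreate".
public class PinDuringCommitSketch {
  static final Set<Long> pinned = new HashSet<>();
  static final ReentrantReadWriteLock metaLock = new ReentrantReadWriteLock();

  static void commitBlock(long blockId, boolean pinOnCreate) {
    metaLock.writeLock().lock();
    try {
      // pin unconditionally so the block cannot be evicted mid-commit
      boolean wasUnpinned = pinned.add(blockId);
      try {
        // ... commit in the store dir and metastore, then report to the master ...
      } finally {
        if (!pinOnCreate && wasUnpinned) {
          // restore the pre-commit pinning state unless the caller wants it pinned
          pinned.remove(blockId);
        }
      }
    } finally {
      metaLock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    commitBlock(42L, false);
    System.out.println("pinned after commit: " + pinned);              // []
    commitBlock(7L, true);
    System.out.println("pinned after pinOnCreate commit: " + pinned);  // [7]
  }
}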

/**
* Commits a block to master. The block must have been committed in metastore and storage dir.
* Caller must have acquired at least READ lock on the metastore and the block.
* @param blockMeta the block to commit
*/
private void commitBlockToMaster(PagedBlockMeta blockMeta) {
final long blockId = blockMeta.getBlockId();
BlockMasterClient bmc = mBlockMasterClientPool.acquire();
try {
bmc.commitBlock(mWorkerId.get(), mPageMetaStore.getStoreMeta().getUsedBytes(), DEFAULT_TIER,
DEFAULT_MEDIUM, blockId, pageStoreDir.getBlockCachedBytes(blockId));
DEFAULT_MEDIUM, blockId, blockMeta.getDir().getBlockCachedBytes(blockId));
} catch (IOException e) {
throw new AlluxioRuntimeException(Status.UNAVAILABLE,
ExceptionMessage.FAILED_COMMIT_BLOCK_TO_MASTER.getMessage(blockId), e, ErrorType.Internal,
false);
} finally {
mBlockMasterClientPool.release(bmc);
}
if (!pinOnCreate) {
// unpin this block if it should not be pinned on creation, e.g. a MUST_CACHE file
pageStoreDir.getEvictor().removePinnedBlock(blockId);
}
BlockStoreLocation blockLocation =
new BlockStoreLocation(DEFAULT_TIER, getDirIndexOfBlock(blockId));
for (BlockStoreEventListener listener : mBlockStoreEventListeners) {
@@ -214,15 +220,7 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset,
if (blockMeta.isPresent()) {
final BlockPageEvictor evictor = blockMeta.get().getDir().getEvictor();
evictor.addPinnedBlock(blockId);
Optional<UfsBlockReadOptions> readOptions;
if (mPageMetaStore.hasFullBlock(blockId)) {
readOptions = Optional.empty();
} else {
readOptions = Optional.of(UfsBlockReadOptions.fromProto(options));
}
final PagedBlockReader pagedBlockReader = new PagedBlockReader(mCacheManager,
mUfsManager, mUfsInStreamCache, mConf, blockMeta.get(), offset, readOptions);
return new DelegatingBlockReader(pagedBlockReader, () -> {
return new DelegatingBlockReader(getBlockReader(blockMeta.get(), offset, options), () -> {
evictor.removePinnedBlock(blockId);
unpinBlock(lockId);
});
@@ -233,32 +231,72 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset,
try (LockResource lock = new LockResource(mPageMetaStore.getLock().writeLock())) {
// in case someone else has added this block while we wait for the lock,
// just use the block meta; otherwise create a new one and add to the metastore
PagedBlockMeta blockMeta = mPageMetaStore
.getBlock(blockId)
.orElseGet(() -> {
long blockSize = options.getBlockSize();
PagedBlockStoreDir dir =
(PagedBlockStoreDir) mPageMetaStore.allocate(String.valueOf(blockId), 0);
PagedBlockMeta newBlockMeta = new PagedBlockMeta(blockId, blockSize, dir);
mPageMetaStore.addBlock(newBlockMeta);
dir.getEvictor().addPinnedBlock(blockId);
return newBlockMeta;
});

final PagedBlockReader pagedBlockReader = new PagedBlockReader(mCacheManager,
mUfsManager, mUfsInStreamCache, mConf, blockMeta, offset, Optional.of(readOptions));
return new DelegatingBlockReader(pagedBlockReader, () -> {
blockMeta.getDir().getEvictor().removePinnedBlock(blockId);
Optional<PagedBlockMeta> blockMeta = mPageMetaStore.getBlock(blockId);
if (blockMeta.isPresent()) {
blockMeta.get().getDir().getEvictor().addPinnedBlock(blockId);
return new DelegatingBlockReader(getBlockReader(blockMeta.get(), offset, options), () -> {
blockMeta.get().getDir().getEvictor().removePinnedBlock(blockId);
unpinBlock(lockId);
});
}
PagedBlockStoreDir dir =
(PagedBlockStoreDir) mPageMetaStore.allocate(String.valueOf(blockId),
options.getBlockSize());
PagedBlockMeta newBlockMeta = new PagedBlockMeta(blockId, options.getBlockSize(), dir);
if (!readOptions.isCacheIntoAlluxio()) {
// block does not need to be cached in Alluxio, no need to add and commit it
unpinBlock(lockId);
});
return new PagedUfsBlockReader(
mUfsManager, mUfsInStreamCache, mConf, newBlockMeta, offset, readOptions);
}
mPageMetaStore.addBlock(newBlockMeta);
dir.getEvictor().addPinnedBlock(blockId);
}
// todo(bowen): can we downgrade a write lock to a read lock without unlocking it?
try (LockResource lock = new LockResource(mPageMetaStore.getLock().readLock())) {
Optional<PagedBlockMeta> blockMeta = mPageMetaStore.getBlock(blockId);
if (blockMeta.isPresent()) {
return new DelegatingBlockReader(getBlockReader(blockMeta.get(), offset, options), () -> {
commitBlockToMaster(blockMeta.get());
blockMeta.get().getDir().getEvictor().removePinnedBlock(blockId);
unpinBlock(lockId);
});
}
throw new IllegalStateException(String.format(
"Block %d removed while being read, this is due to a race condition", blockId));
}
}
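
The second half of createBlockReader gives up the metastore write lock and re-acquires the read lock, which is why it must tolerate the block having been removed in between. If the metastore lock is backed by java.util.concurrent.locks.ReentrantReadWriteLock (an assumption, not something this hunk shows), the todo above has a standard answer: take the read lock before releasing the write lock. A minimal sketch:

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of write-to-read lock downgrading with ReentrantReadWriteLock.
// Whether the metastore lock is actually a ReentrantReadWriteLock is an assumption.
public class LockDowngradeSketch {
  public static void main(String[] args) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    lock.writeLock().lock();
    try {
      // ... mutate metadata while holding the write lock ...
      lock.readLock().lock(); // acquire the read lock before giving up the write lock
    } finally {
      lock.writeLock().unlock(); // downgrade: now only the read lock is held
    }
    try {
      // ... read metadata; no writer could have slipped in between ...
      System.out.println("read lock held: " + lock.getReadHoldCount());
    } finally {
      lock.readLock().unlock();
    }
  }
}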

private BlockReader getBlockReader(PagedBlockMeta blockMeta, long offset,
Protocol.OpenUfsBlockOptions options) {
final long blockId = blockMeta.getBlockId();
final Optional<PagedUfsBlockReader> ufsBlockReader;
if (mPageMetaStore.hasFullBlock(blockId)) {
ufsBlockReader = Optional.empty();
} else {
UfsBlockReadOptions readOptions = UfsBlockReadOptions.fromProto(options);
ufsBlockReader = Optional.of(new PagedUfsBlockReader(
mUfsManager, mUfsInStreamCache, mConf, blockMeta, offset, readOptions));
}
return new PagedBlockReader(mCacheManager, mConf, blockMeta, offset, ufsBlockReader);
}

@Override
public BlockReader createUfsBlockReader(long sessionId, long blockId, long offset,
boolean positionShort,
Protocol.OpenUfsBlockOptions options) throws IOException {
return null;
PagedBlockMeta blockMeta = mPageMetaStore
.getBlock(blockId)
.orElseGet(() -> {
long blockSize = options.getBlockSize();
PagedBlockStoreDir dir =
(PagedBlockStoreDir) mPageMetaStore.allocate(String.valueOf(blockId), 0);
// do not add the block to metastore
return new PagedBlockMeta(blockId, blockSize, dir);
});
UfsBlockReadOptions readOptions = UfsBlockReadOptions.fromProto(options);
return new PagedUfsBlockReader(mUfsManager, mUfsInStreamCache, mConf, blockMeta,
offset, readOptions);
}

@Override
@@ -280,7 +318,7 @@ public void abortBlock(long sessionId, long blockId) {
@Override
public void requestSpace(long sessionId, long blockId, long additionalBytes) {
// TODO(bowen): implement actual space allocation and replace placeholder values
boolean blockEvicted = true;
boolean blockEvicted = false;
if (blockEvicted) {
long evictedBlockId = 0;
BlockStoreLocation evictedBlockLocation = new BlockStoreLocation(DEFAULT_TIER, 1);
@@ -291,7 +329,6 @@ public void requestSpace(long sessionId, long blockId, long additionalBytes) {
}
}
}
throw new UnsupportedOperationException();
}

@Override
@@ -383,12 +420,12 @@ public Optional<TempBlockMeta> getTempBlockMeta(long blockId) {

@Override
public boolean hasBlockMeta(long blockId) {
throw new UnsupportedOperationException();
return mPageMetaStore.getBlock(blockId).isPresent();
}

@Override
public boolean hasTempBlockMeta(long blockId) {
throw new UnsupportedOperationException();
return mPageMetaStore.getTempBlock(blockId).isPresent();
}

@Override