Skip to content

Commit

Permalink
Cache size of files fetched from S3 (deephaven#5545)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored May 29, 2024
1 parent 5c08eb8 commit 33238dc
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka
// Sentinel held in the size field until a real size is known; verifyOrSetSize compares against it
// to decide whether to set the size or verify it.
static final long UNINITIALIZED_SIZE = -1;
// NOTE(review): presumably the matching "not yet computed" sentinel for numFragments; its use is not
// visible in this excerpt — confirm.
private static final long UNINITIALIZED_NUM_FRAGMENTS = -1;

// Provider passed in at construction; ensureSize reports the fetched file size back to it via
// updateFileSizeCache.
final S3SeekableChannelProvider provider;
// S3 async client passed in at construction.
final S3AsyncClient client;
// Instructions passed in at construction; read here for maxCacheSize() and fragmentSize().
final S3Instructions instructions;

Expand Down Expand Up @@ -63,8 +64,12 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka
*/
private long numFragments;

S3ChannelContext(@NotNull final S3AsyncClient client, @NotNull final S3Instructions instructions,
S3ChannelContext(
@NotNull final S3SeekableChannelProvider provider,
@NotNull final S3AsyncClient client,
@NotNull final S3Instructions instructions,
@NotNull final S3RequestCache sharedCache) {
this.provider = Objects.requireNonNull(provider);
this.client = Objects.requireNonNull(client);
this.instructions = Objects.requireNonNull(instructions);
this.localCache = new S3Request.AcquiredRequest[instructions.maxCacheSize()];
Expand All @@ -88,7 +93,7 @@ void setURI(@NotNull final S3Uri uri) {
this.uri = uri;
}

void verifyOrSetSize(long size) {
void verifyOrSetSize(final long size) {
if (this.size == UNINITIALIZED_SIZE) {
setSize(size);
} else if (this.size != size) {
Expand Down Expand Up @@ -255,10 +260,12 @@ private void ensureSize() throws IOException {
} catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) {
throw handleS3Exception(e, String.format("fetching HEAD for file %s, %s", uri, ctxStr()), instructions);
}
setSize(headObjectResponse.contentLength());
final long fileSize = headObjectResponse.contentLength();
setSize(fileSize);
provider.updateFileSizeCache(uri.uri(), fileSize);
}

private void setSize(long size) {
private void setSize(final long size) {
this.size = size;
// ceil(size / fragmentSize)
this.numFragments = (size + instructions.fragmentSize() - 1) / instructions.fragmentSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ public InputStream getInputStream(final SeekableByteChannel channel) {

/**
 * Creates a new {@link S3ChannelContext} built from this provider's S3 client, instructions, and shared
 * request cache.
 *
 * @return a newly constructed channel context
 */
@Override
public SeekableChannelContext makeContext() {
// NOTE(review): the two return statements below look like the before/after lines of a diff hunk shown
// without +/- markers; only the second passes a provider (this) as the first constructor argument — confirm.
return new S3ChannelContext(s3AsyncClient, s3Instructions, sharedCache);
return new S3ChannelContext(this, s3AsyncClient, s3Instructions, sharedCache);
}

/**
 * Creates a new {@link S3ChannelContext} like {@code makeContext()}, except that the instructions passed to
 * the context are the result of {@code s3Instructions.singleUse()}.
 *
 * @return a newly constructed channel context
 */
@Override
public SeekableChannelContext makeSingleUseContext() {
// NOTE(review): the two return statements below look like the before/after lines of a diff hunk shown
// without +/- markers; only the second passes a provider (this) as the first constructor argument — confirm.
return new S3ChannelContext(s3AsyncClient, s3Instructions.singleUse(), sharedCache);
return new S3ChannelContext(this, s3AsyncClient, s3Instructions.singleUse(), sharedCache);
}

@Override
Expand Down Expand Up @@ -208,7 +208,7 @@ private void fetchNextBatch() throws IOException {
+ s3Object.key() + " and bucket " + bucketName + " inside directory "
+ directory, e);
}
updateFileSizeCache(getFileSizeCache(), uri, s3Object.size());
updateFileSizeCache(uri, s3Object.size());
return uri;
}).iterator();
// The following token is null when the last batch is fetched.
Expand All @@ -235,12 +235,10 @@ private Map<URI, FileSizeInfo> getFileSizeCache() {
}

/**
* Update the given file size cache with the given URI and size.
* Cache the file size for the given URI.
*/
private static void updateFileSizeCache(
@NotNull final Map<URI, FileSizeInfo> fileSizeCache,
@NotNull final URI uri,
final long size) {
void updateFileSizeCache(@NotNull final URI uri, final long size) {
final Map<URI, FileSizeInfo> fileSizeCache = getFileSizeCache();
fileSizeCache.compute(uri, (key, existingInfo) -> {
if (existingInfo == null) {
return new FileSizeInfo(uri, size);
Expand Down

0 comments on commit 33238dc

Please sign in to comment.