From 33238dcecd15c390632d0443d394185084d688c6 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 29 May 2024 15:24:24 -0500 Subject: [PATCH] Cache size of files fetched from S3 (#5545) --- .../deephaven/extensions/s3/S3ChannelContext.java | 15 +++++++++++---- .../extensions/s3/S3SeekableChannelProvider.java | 14 ++++++-------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index 78420954697..15bbed956c9 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -32,6 +32,7 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka static final long UNINITIALIZED_SIZE = -1; private static final long UNINITIALIZED_NUM_FRAGMENTS = -1; + final S3SeekableChannelProvider provider; final S3AsyncClient client; final S3Instructions instructions; @@ -63,8 +64,12 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka */ private long numFragments; - S3ChannelContext(@NotNull final S3AsyncClient client, @NotNull final S3Instructions instructions, + S3ChannelContext( + @NotNull final S3SeekableChannelProvider provider, + @NotNull final S3AsyncClient client, + @NotNull final S3Instructions instructions, @NotNull final S3RequestCache sharedCache) { + this.provider = Objects.requireNonNull(provider); this.client = Objects.requireNonNull(client); this.instructions = Objects.requireNonNull(instructions); this.localCache = new S3Request.AcquiredRequest[instructions.maxCacheSize()]; @@ -88,7 +93,7 @@ void setURI(@NotNull final S3Uri uri) { this.uri = uri; } - void verifyOrSetSize(long size) { + void verifyOrSetSize(final long size) { if (this.size == UNINITIALIZED_SIZE) { setSize(size); } else if (this.size != size) { @@ -255,10 +260,12 @@ private void ensureSize() throws IOException { } catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) { throw handleS3Exception(e, String.format("fetching HEAD for file %s, %s", uri, ctxStr()), instructions); } - setSize(headObjectResponse.contentLength()); + final long fileSize = headObjectResponse.contentLength(); + setSize(fileSize); + provider.updateFileSizeCache(uri.uri(), fileSize); } - private void setSize(long size) { + private void setSize(final long size) { this.size = size; // ceil(size / fragmentSize) this.numFragments = (size + instructions.fragmentSize() - 1) / instructions.fragmentSize(); diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java index 1b52ca3ac54..b87401e42bb 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java @@ -96,12 +96,12 @@ public InputStream getInputStream(final SeekableByteChannel channel) { @Override public SeekableChannelContext makeContext() { - return new S3ChannelContext(s3AsyncClient, s3Instructions, sharedCache); + return new S3ChannelContext(this, s3AsyncClient, s3Instructions, sharedCache); } @Override public SeekableChannelContext makeSingleUseContext() { - return new S3ChannelContext(s3AsyncClient, s3Instructions.singleUse(), sharedCache); + return new S3ChannelContext(this, s3AsyncClient, s3Instructions.singleUse(), sharedCache); } @Override @@ -208,7 +208,7 @@ private void fetchNextBatch() throws IOException { + s3Object.key() + " and bucket " + bucketName + " inside directory " + directory, e); } - updateFileSizeCache(getFileSizeCache(), uri, s3Object.size()); + updateFileSizeCache(uri, s3Object.size()); return uri; }).iterator(); // The following token is null when the last batch is fetched. @@ -235,12 +235,10 @@ private Map getFileSizeCache() { } /** - * Update the given file size cache with the given URI and size. + * Cache the file size for the given URI. */ - private static void updateFileSizeCache( - @NotNull final Map fileSizeCache, - @NotNull final URI uri, - final long size) { + void updateFileSizeCache(@NotNull final URI uri, final long size) { + final Map fileSizeCache = getFileSizeCache(); fileSizeCache.compute(uri, (key, existingInfo) -> { if (existingInfo == null) { return new FileSizeInfo(uri, size);