Skip to content

Commit

Permalink
fix: close pending zero-copy responses when Storage#close is called (#…
Browse files Browse the repository at this point in the history
…2696)

Update gRPC based Storage instances to close out in progress reads when zero-copy is used and Storage#close() is called.
  • Loading branch information
BenWhitehead authored Sep 10, 2024
1 parent d91ad84 commit 1855308
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import com.google.storage.v2.StorageSettings;
import com.google.storage.v2.stub.GrpcStorageCallableFactory;
import com.google.storage.v2.stub.GrpcStorageStub;
import com.google.storage.v2.stub.StorageStub;
import com.google.storage.v2.stub.StorageStubSettings;
import io.grpc.ClientInterceptor;
import io.grpc.Detachable;
Expand All @@ -89,9 +88,9 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -908,9 +907,27 @@ private Object readResolve() {

private static final class InternalStorageClient extends StorageClient {

private InternalStorageClient(StorageStub stub) {
private InternalStorageClient(InternalZeroCopyGrpcStorageStub stub) {
super(stub);
}

@Override
public void shutdownNow() {
try {
// GrpcStorageStub#close() is final and we can't override it
// instead hook in here to close out the zero-copy marshaller
getStub().getObjectMediaResponseMarshaller.close();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
super.shutdownNow();
}
}

@Override
public InternalZeroCopyGrpcStorageStub getStub() {
return (InternalZeroCopyGrpcStorageStub) super.getStub();
}
}

private static final class InternalZeroCopyGrpcStorageStub extends GrpcStorageStub
Expand Down Expand Up @@ -1071,30 +1088,21 @@ public void close() throws IOException {
* them all as suppressed exceptions on the first occurrence.
*/
@VisibleForTesting
static void closeAllStreams(Collection<InputStream> inputStreams) throws IOException {
IOException ioException =
inputStreams.stream()
.map(
stream -> {
try {
stream.close();
return null;
} catch (IOException e) {
return e;
}
})
.filter(Objects::nonNull)
.reduce(
null,
(l, r) -> {
if (l != null) {
l.addSuppressed(r);
return l;
} else {
return r;
}
},
(l, r) -> l);
static void closeAllStreams(Iterable<InputStream> inputStreams) throws IOException {
Iterator<InputStream> iterator = inputStreams.iterator();
IOException ioException = null;
while (iterator.hasNext()) {
InputStream next = iterator.next();
try {
next.close();
} catch (IOException e) {
if (ioException == null) {
ioException = e;
} else {
ioException.addSuppressed(e);
}
}
}

if (ioException != null) {
throw ioException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
package com.google.cloud.storage;

import com.google.storage.v2.ReadObjectResponse;
import java.io.Closeable;
import java.io.IOException;

interface ResponseContentLifecycleManager {
interface ResponseContentLifecycleManager extends Closeable {
ResponseContentLifecycleHandle get(ReadObjectResponse response);

@Override
default void close() throws IOException {}

static ResponseContentLifecycleManager noop() {
return response ->
new ResponseContentLifecycleHandle(
Expand Down

0 comments on commit 1855308

Please sign in to comment.