Skip to content

Commit

Permalink
chore: attempt to drain gRPC ReadObject stream iterator (#2700)
Browse files Browse the repository at this point in the history
Related #2696
  • Loading branch information
BenWhitehead authored Sep 10, 2024
1 parent 1855308 commit 3432345
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public boolean hasNext() {
if (!result.isDone()) {
result.setException(StorageException.coalesce(e));
}
reset();
throw e;
}
}
Expand All @@ -194,15 +195,31 @@ public ReadObjectResponse next() {
if (!result.isDone()) {
result.setException(StorageException.coalesce(e));
}
reset();
throw e;
}
}

@Override
public void close() {
if (serverStream != null) {
// todo: do we need to "drain" anything?
serverStream.cancel();
if (responseIterator != null) {
IOException ioException = null;
while (responseIterator.hasNext()) {
try {
ReadObjectResponse next = responseIterator.next();
ResponseContentLifecycleHandle handle = rclm.get(next);
handle.close();
} catch (IOException e) {
if (ioException == null) {
ioException = e;
} else if (ioException != e) {
ioException.addSuppressed(e);
}
}
}
}
}
}

Expand All @@ -223,5 +240,11 @@ private Iterator<ReadObjectResponse> ensureResponseIteratorOpen() {
}
}
}

private void reset() {
serverStream = null;
responseIterator = null;
streamInitialized = false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ static void closeAllStreams(Iterable<InputStream> inputStreams) throws IOExcepti
} catch (IOException e) {
if (ioException == null) {
ioException = e;
} else {
} else if (ioException != e) {
ioException.addSuppressed(e);
}
}
Expand Down

0 comments on commit 3432345

Please sign in to comment.