Skip to content

Commit

Permalink
fix: fix JSON read handling when socket broken resulting in partial b…
Browse files Browse the repository at this point in the history
…ytes copied (#2303)

Add integration test with testbench to force failure during read

Update retry conformance tests to assert full byte content when read through reader

Fixes #2301
  • Loading branch information
BenWhitehead authored Nov 14, 2023
1 parent c069aca commit d4bfcf0
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.util.List;
Expand All @@ -68,6 +69,7 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann
private long position;
private ScatteringByteChannel sbc;
private boolean open;
private boolean returnEOF;

// returned X-Goog-Generation header value
private Long xGoogGeneration;
Expand All @@ -84,28 +86,37 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann
this.options = options;
this.resultRetryAlgorithm = resultRetryAlgorithm;
this.open = true;
this.returnEOF = false;
this.position = apiaryReadRequest.getByteRangeSpec().beginOffset();
}

@Override
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
if (returnEOF) {
close();
return -1;
} else if (!open) {
throw new ClosedChannelException();
}
long totalRead = 0;
do {
if (sbc == null) {
sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
}

long totalRemaining = Buffers.totalRemaining(dsts, offset, length);
try {
// According to the contract of Retrying#run it's possible for sbc to be null even after
// invocation. However, the function we provide is guaranteed to return non-null or throw
// an exception. So we suppress the warning from intellij here.
//noinspection ConstantConditions
long read = sbc.read(dsts, offset, length);
if (read == -1) {
open = false;
returnEOF = true;
} else {
position += read;
totalRead += read;
}
return read;
return totalRead;
} catch (Exception t) {
if (resultRetryAlgorithm.shouldRetry(t, null)) {
// if our retry algorithm COULD allow a retry, continue the loop and allow trying to
Expand All @@ -121,6 +132,13 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
} else {
throw new IOException(StorageException.coalesce(t));
}
} finally {
long totalRemainingAfter = Buffers.totalRemaining(dsts, offset, length);
long delta = totalRemaining - totalRemainingAfter;
if (delta > 0) {
position += delta;
totalRead += delta;
}
}
} while (true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -159,4 +160,9 @@ static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException {
}
return total;
}

static long totalRemaining(ByteBuffer[] buffers, int offset, int length) {
ByteBuffer[] sub = Arrays.copyOfRange(buffers, offset, length);
return Arrays.stream(sub).mapToLong(Buffer::remaining).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.storage.conformance.retry;

import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.defaultSetup;
import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.notificationSetup;
import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.pubsubTopicSetup;
Expand Down Expand Up @@ -1277,9 +1278,12 @@ private static void get(ArrayList<RpcMethodMapping> a) {
try {
ReadChannel reader =
ctx.getStorage().reader(ctx.getState().getBlob().getBlobId());
WritableByteChannel write =
Channels.newChannel(ByteStreams.nullOutputStream());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
WritableByteChannel write = Channels.newChannel(baos);
ByteStreams.copy(reader, write);

assertThat(xxd(baos.toByteArray()))
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
} catch (IOException e) {
if (e.getCause() instanceof BaseServiceException) {
throw e.getCause();
Expand All @@ -1299,9 +1303,12 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.reader(
ctx.getState().getBlob().getBlobId().getBucket(),
ctx.getState().getBlob().getBlobId().getName());
WritableByteChannel write =
Channels.newChannel(ByteStreams.nullOutputStream());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
WritableByteChannel write = Channels.newChannel(baos);
ByteStreams.copy(reader, write);

assertThat(xxd(baos.toByteArray()))
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
} catch (IOException e) {
if (e.getCause() instanceof BaseServiceException) {
throw e.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ public void restartingAStreamForGzipContentIsAtTheCorrectOffset() throws Excepti
.setContentEncoding("gzip")
.build();
Blob gen1 = storage.create(info, gzipped.getBytes(), BlobTargetOption.doesNotExist());
String uri = gen1.getBlobId().toGsUtilUri();
System.out.println("uri = " + uri);

JsonObject instructions = new JsonObject();
JsonArray value = new JsonArray();
Expand Down Expand Up @@ -200,4 +198,57 @@ public void restartingAStreamForGzipContentIsAtTheCorrectOffset() throws Excepti
.isEqualTo(ImmutableList.of(String.format("bytes=%d-", 256 * 1024))));
}
}

@Test
public void resumeFromCorrectOffsetWhenPartialReadSuccess() throws Exception {
StorageOptions baseOptions = storage.getOptions();
Random rand = new Random(918273645);

ChecksummedTestContent uncompressed;
{
// must use random strategy, base64 characters compress too well. 512KiB uncompressed becomes
// ~1600 bytes which is smaller than our 'return-broken-stream-after-256K' rule
byte[] bytes = DataGenerator.rand(rand).genBytes(_512KiB);
// byte[] bytes = DataGenerator.base64Characters().genBytes(_512KiB);
uncompressed = ChecksummedTestContent.of(bytes);
}
BlobId id = BlobId.of(bucket.getName(), generator.randomObjectName());
BlobInfo info = BlobInfo.newBuilder(id).build();
Blob gen1 = storage.create(info, uncompressed.getBytes(), BlobTargetOption.doesNotExist());

JsonObject instructions = new JsonObject();
JsonArray value = new JsonArray();
value.add("return-broken-stream-after-256K");
instructions.add("storage.objects.get", value);
RetryTestResource retryTestResource = new RetryTestResource(instructions);
RetryTestResource retryTest = testBench.createRetryTest(retryTestResource);

ImmutableMap<String, String> headers = ImmutableMap.of("x-retry-test-id", retryTest.id);

RequestAuditing requestAuditing = new RequestAuditing();
StorageOptions testStorageOptions =
baseOptions
.toBuilder()
.setTransportOptions(requestAuditing)
.setHeaderProvider(FixedHeaderProvider.create(headers))
.build();

String expected = xxd(uncompressed.getBytes());

try (Storage testStorage = testStorageOptions.getService();
ReadChannel r = testStorage.reader(gen1.getBlobId());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
WritableByteChannel w = Channels.newChannel(baos)) {
long copy = ByteStreams.copy(r, w);
String actual = xxd(baos.toByteArray());
ImmutableList<HttpRequest> requests = requestAuditing.getRequests();
assertAll(
() -> assertThat(copy).isEqualTo(uncompressed.getBytes().length),
() -> assertThat(actual).isEqualTo(expected),
() -> assertThat(requests.get(0).getHeaders().get("range")).isNull(),
() ->
assertThat(requests.get(1).getHeaders().get("range"))
.isEqualTo(ImmutableList.of(String.format("bytes=%d-", 256 * 1024))));
}
}
}

0 comments on commit d4bfcf0

Please sign in to comment.