diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncingFileChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncingFileChannel.java new file mode 100644 index 0000000000..74c76ecac0 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncingFileChannel.java @@ -0,0 +1,51 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +/** A FileChannel decorator that will fsync after every {@link #write(ByteBuffer)} */ +final class SyncingFileChannel implements UnbufferedWritableByteChannel { + + private final FileChannel fc; + + SyncingFileChannel(FileChannel fc) { + this.fc = fc; + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + long written = fc.write(srcs, offset, length); + // metadata in this case are things like mtime, atime etc. Those are not important to our needs + // simply force the file contents to by synced. + fc.force(/*includeMetaData = */ false); + return written; + } + + @Override + public boolean isOpen() { + return fc.isOpen(); + } + + @Override + public void close() throws IOException { + fc.close(); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncingFileChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncingFileChannelTest.java new file mode 100644 index 0000000000..5fcd6429d8 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncingFileChannelTest.java @@ -0,0 +1,183 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.RewindableContentPropertyTest.byteBuffers; +import static com.google.cloud.storage.TestUtils.xxd; +import static com.google.common.truth.Truth.assertThat; +import static java.nio.file.Files.readAllBytes; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import net.jqwik.api.Arbitraries; +import net.jqwik.api.Arbitrary; +import net.jqwik.api.ForAll; +import net.jqwik.api.Property; +import net.jqwik.api.Provide; + +public final class ITSyncingFileChannelTest { + + /** + * Run a series of generated scenarios where each write is performed against a {@link + * SyncingFileChannel} after {@link SyncingFileChannel#write(ByteBuffer)} returns verify the full + * contents of the file match the expected cumulative value. + */ + @Property + void shouldHandleAnySizeWriteGt0(@ForAll("WriteScenario") WriteScenario writeScenario) + throws IOException { + // use try-with-resource to approximate @TearDown and cleanup the file + try (WriteScenario ws = writeScenario) { + Path path = ws.getPath(); + try (FileChannel fc = FileChannel.open(path, ws.getOpenOptions()); + SyncingFileChannel syncing = new SyncingFileChannel(fc)) { + assertThat(syncing.isOpen()).isTrue(); + ByteBuffer[] writes = ws.writes(); + for (int i = 0; i < writes.length; i++) { + ByteBuffer buf = writes[i]; + syncing.write(buf); + assertThat(xxd(readAllBytes(path))).isEqualTo(ws.expected(i)); + } + } + assertThat(xxd(readAllBytes(path))).isEqualTo(ws.all()); + } + } + + @Provide("WriteScenario") + static Arbitrary writeScenario() { + return Arbitraries.lazyOf( + () -> + Arbitraries.oneOf( + byteBuffers(1, 10), + byteBuffers(10, 100), + byteBuffers(100, 1_000), + byteBuffers(1_000, 10_000), + byteBuffers(10_000, 100_000), + byteBuffers(100_000, 1_000_000))) + .map( + buffers -> + Arrays.stream(buffers).filter(Buffer::hasRemaining).toArray(ByteBuffer[]::new)) + .filter( + buffers -> { + long totalAvailable = Arrays.stream(buffers).mapToLong(ByteBuffer::remaining).sum(); + return totalAvailable > 0; + }) + .map(WriteScenario::of); + } + + static final class WriteScenario implements AutoCloseable { + private static final Path TMP_DIR = Paths.get(System.getProperty("java.io.tmpdir")); + private static final Collector DEBUG_JOINER = + Collectors.joining(",\n\t", "[\n\t", "\n]"); + + private final Path path; + private final ByteBuffer[] writes; + private final ByteString[] expectedCumulativeContents; + private final EnumSet openOptions; + + private WriteScenario(Path path, ByteBuffer[] writes, ByteString[] expectedCumulativeContents) { + this.path = path; + this.writes = writes; + this.expectedCumulativeContents = expectedCumulativeContents; + this.openOptions = EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.WRITE); + } + + public Path getPath() { + return path; + } + + public EnumSet getOpenOptions() { + return openOptions; + } + + ByteBuffer[] writes() { + return Arrays.stream(writes).map(ByteBuffer::duplicate).toArray(ByteBuffer[]::new); + } + + String expected(int idx) { + Preconditions.checkArgument( + 0 <= idx && idx < expectedCumulativeContents.length, + "index out of bounds: (0 <= %s && %s < %s)", + idx, + idx, + expectedCumulativeContents.length); + return xxd(false, expectedCumulativeContents[idx].asReadOnlyByteBuffer()); + } + + String all() { + return xxd( + false, + expectedCumulativeContents[expectedCumulativeContents.length - 1].asReadOnlyByteBuffer()); + } + + @Override + public void close() throws IOException { + Files.deleteIfExists(path); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("\npath", path) + .add( + "\nwrites", + Arrays.stream(writes) + .map(b -> String.format("%s \n %s", b.toString(), xxd(false, b.duplicate()))) + .collect(DEBUG_JOINER)) + .add( + "\nexpectedCumulativeContents", + Arrays.stream(expectedCumulativeContents) + .map(ByteString::toString) + .collect(DEBUG_JOINER)) + .toString(); + } + + public static WriteScenario of(ByteBuffer[] byteBuffers) { + try { + Path path = Files.createTempFile(TMP_DIR, WriteScenario.class.getName() + "-", ".bin"); + + List byteStrings = new ArrayList<>(); + for (int i = 0; i < byteBuffers.length; i++) { + ByteString bs = ByteString.empty(); + for (int j = 0; j <= i; j++) { + ByteBuffer byteBuffer = byteBuffers[j].duplicate(); + bs = bs.concat(ByteStringStrategy.noCopy().apply(byteBuffer)); + } + byteStrings.add(bs); + } + + return new WriteScenario(path, byteBuffers, byteStrings.toArray(new ByteString[0])); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java index 48d29bc8c8..79453af559 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java @@ -143,7 +143,7 @@ private static Arbitrary bytes(int minFileSize, int maxFileSize) { } @NonNull - private static Arbitrary byteBuffers(int perBufferMinSize, int perBufferMaxSize) { + static Arbitrary byteBuffers(int perBufferMinSize, int perBufferMaxSize) { return byteBuffer(perBufferMinSize, perBufferMaxSize) .array(ByteBuffer[].class) .ofMinSize(1) @@ -155,7 +155,7 @@ private static Arbitrary byteBuffers(int perBufferMinSize, int per * limit */ @NonNull - private static Arbitrary byteBuffer(int minSize, int maxSize) { + static Arbitrary byteBuffer(int minSize, int maxSize) { return Arbitraries.integers() .between(minSize, maxSize) .withDistribution(RandomDistribution.uniform())