Refactor for prepare/commit API
cdouglas committed Dec 15, 2024
1 parent 67a540b commit 0399001
Showing 21 changed files with 206 additions and 113 deletions.
15 changes: 10 additions & 5 deletions api/src/main/java/org/apache/iceberg/io/AtomicOutputFile.java
@@ -22,18 +22,23 @@
 import java.io.InputStream;
 import java.util.function.Supplier;
 
-public interface AtomicOutputFile extends OutputFile {
-  /** Checksum expected by this object store, bridging (in Java8) Checksum and Digest APIs. */
-  FileChecksum checksum();
+public interface AtomicOutputFile<T> extends OutputFile {
+
+  /**
+   * Generate a token to replace the InputFile with the specified content.
+   *
+   * @param source Invoked to obtain an InputStream for the future output.
+   */
+  T prepare(Supplier<InputStream> source) throws IOException;
 
   /**
    * Atomically replace the contents of the target AtomicOutputFile using the contents of the stream
    * provided, only if the content checksum matches.
    *
-   * @param checksum Checksum provided to the underlying store for validation
+   * @param token Checksum provided to the underlying store for validation
    * @param source Function invoked to obtain an InputStream to copy to the destination
    * @return an {@link InputFile} with metadata identifying the file written, could be used in a
    *     subsequent call to {@link SupportsAtomicOperations#newOutputFile(InputFile)}
    */
-  InputFile writeAtomic(FileChecksum checksum, Supplier<InputStream> source) throws IOException;
+  InputFile writeAtomic(T token, Supplier<InputStream> source) throws IOException;
 }
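Taken together, the change turns the old one-shot flow (a caller-updated checksum() plus writeAtomic(FileChecksum, ...)) into a two-phase prepare/commit protocol, with the token type left generic so each store can carry whatever state its conditional write needs. A minimal sketch of the calling pattern, mirroring the updated tests further down; the wiring here is illustrative, not part of this commit:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.iceberg.io.AtomicOutputFile;
import org.apache.iceberg.io.CAS;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SupportsAtomicOperations;

class AtomicReplaceSketch {
  /** Replaces {@code current} only if it has not changed since it was read. */
  static InputFile replace(SupportsAtomicOperations<CAS> io, InputFile current, byte[] content)
      throws IOException {
    AtomicOutputFile<CAS> out = io.newOutputFile(current); // pins the version/etag of `current`
    // Phase 1: stream the content once to derive the store-specific token.
    CAS token = out.prepare(() -> new ByteArrayInputStream(content));
    // Phase 2: conditional write; the store checks the token against what actually arrives.
    return out.writeAtomic(token, () -> new ByteArrayInputStream(content));
  }
}
```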
4 changes: 2 additions & 2 deletions api/src/main/java/org/apache/iceberg/io/FileChecksum.java
@@ -38,7 +38,7 @@ default void update(byte[] bytes) {
 
   void update(byte[] bytes, int off, int len);
 
-  byte[] asBytes();
+  byte[] contentChecksumBytes();
 
-  String toHeaderString();
+  String contentHeaderString();
 }
api/src/main/java/org/apache/iceberg/io/SupportsAtomicOperations.java
@@ -19,14 +19,14 @@
 package org.apache.iceberg.io;
 
 // TODO using InputFile is imprecise. It should be resolved.
-public interface SupportsAtomicOperations extends FileIO {
+public interface SupportsAtomicOperations<T> extends FileIO {
   /**
    * Create a new atomic output file that will replace the given input file.
    *
    * @param replace an input file to replace
    * @return a new atomic output file
    */
-  AtomicOutputFile newOutputFile(InputFile replace);
+  AtomicOutputFile<T> newOutputFile(InputFile replace);
 
   class CASException extends RuntimeException {
     public CASException(String message, Exception cause) {
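The CAS type imported throughout the rest of this commit is not itself in the diff. Judging from its call sites below (contentLength(), contentChecksumBytes(), contentHeaderString() on the token passed to writeAtomic), it is presumably a small interface along these lines; the exact shape is an assumption:

```java
package org.apache.iceberg.io;

// Assumed shape, inferred from call sites in S3OutputFile and ADLSOutputFile below;
// this interface is not part of the diff. It is the token produced by prepare() and
// consumed by writeAtomic(): enough state for a compare-and-swap style write.
public interface CAS {
  long contentLength();

  byte[] contentChecksumBytes();

  String contentHeaderString();
}
```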
22 changes: 12 additions & 10 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3Checksum.java
@@ -21,37 +21,39 @@
 import java.util.Base64;
 import java.util.zip.Checksum;
 import org.apache.commons.codec.digest.PureJavaCrc32C;
+import org.apache.iceberg.io.CAS;
 import org.apache.iceberg.io.FileChecksum;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 
-public class S3Checksum implements FileChecksum {
+// mild abuse of types to accommodate existing work
+public class S3Checksum implements FileChecksum, CAS {
 
   private long length = 0L;
   private final Checksum crc32c = new PureJavaCrc32C();
 
   @Override
-  public long contentLength() {
-    return length;
-  }
-
-  @Override
   public void update(byte[] bytes, int off, int len) {
     crc32c.update(bytes, off, len);
     length += len;
   }
 
   @Override
-  public byte[] asBytes() {
+  public long contentLength() {
+    return length;
+  }
+
+  @Override
+  public byte[] contentChecksumBytes() {
     return Ints.toByteArray((int) crc32c.getValue());
   }
 
   @Override
-  public String toHeaderString() {
-    return Base64.getEncoder().encodeToString(asBytes());
+  public String contentHeaderString() {
+    return Base64.getEncoder().encodeToString(contentChecksumBytes());
   }
 
   @Override
   public String toString() {
-    return toHeaderString();
+    return contentHeaderString();
   }
 }
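The renamed accessors line up with what the S3 request below needs: contentHeaderString() produces the base64-encoded big-endian CRC32C for the x-amz-checksum-crc32c header, and contentLength() the value for Content-Length. A small usage sketch:

```java
import java.nio.charset.StandardCharsets;
import org.apache.iceberg.aws.s3.S3Checksum;

class S3ChecksumSketch {
  public static void main(String[] args) {
    byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
    S3Checksum chk = new S3Checksum();
    chk.update(data); // FileChecksum's default method forwards to update(data, 0, data.length)
    System.out.println(chk.contentLength()); // 5
    System.out.println(chk.contentHeaderString()); // base64 of the 4-byte CRC32C, as S3 expects
  }
}
```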
3 changes: 2 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -33,6 +33,7 @@
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.io.AtomicOutputFile;
 import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.CAS;
 import org.apache.iceberg.io.CredentialSupplier;
 import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileInfo;
@@ -75,7 +76,7 @@
  * schemes s3a, s3n, https are also treated as s3 file paths. Using this FileIO with other schemes
  * will result in {@link org.apache.iceberg.exceptions.ValidationException}.
  */
-public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsAtomicOperations {
+public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsAtomicOperations<CAS> {
   private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
   private static final String DEFAULT_METRICS_IMPL =
       "org.apache.iceberg.hadoop.HadoopMetricsContext";
29 changes: 18 additions & 11 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
@@ -22,24 +22,26 @@
 import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.util.function.Supplier;
+import org.apache.commons.io.output.NullOutputStream;
 import org.apache.iceberg.encryption.NativeFileCryptoParameters;
 import org.apache.iceberg.encryption.NativelyEncryptedFile;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.io.AtomicOutputFile;
-import org.apache.iceberg.io.FileChecksum;
+import org.apache.iceberg.io.CAS;
+import org.apache.iceberg.io.FileChecksumOutputStream;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.io.PositionOutputStream;
 import org.apache.iceberg.io.SupportsAtomicOperations;
 import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 
 public class S3OutputFile extends BaseS3File
-    implements OutputFile, NativelyEncryptedFile, AtomicOutputFile {
+    implements NativelyEncryptedFile, AtomicOutputFile<CAS> {
   private NativeFileCryptoParameters nativeEncryptionParameters;
   private final String etag;
 
@@ -110,32 +112,37 @@ public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoPar
   }
 
   @Override
-  public FileChecksum checksum() {
+  public CAS prepare(Supplier<InputStream> source) throws IOException {
     // S3OutputStream forces multipart upload w/ attendant MD5 and work pool... meh, do it manually
     // in writeAtomic
     // Catalog + metadata writes are likely smaller than multipart would justify, anyway
-    return new S3Checksum();
+    final S3Checksum checksum = new S3Checksum();
+    try (InputStream in = source.get();
+        FileChecksumOutputStream chk =
+            new FileChecksumOutputStream(new NullOutputStream(), checksum)) {
+      ByteStreams.copy(in, chk);
+    }
+    return checksum;
   }
 
   @Override
-  public InputFile writeAtomic(FileChecksum checksum, Supplier<InputStream> source)
-      throws IOException {
+  public InputFile writeAtomic(CAS token, Supplier<InputStream> source) throws IOException {
     try (InputStream src = source.get()) {
       final S3URI location = uri();
       PutObjectRequest req =
          PutObjectRequest.builder()
              .bucket(location.bucket())
              .key(location.key())
-              .checksumCRC32C(checksum.toHeaderString())
-              .contentLength(checksum.contentLength())
+              .checksumCRC32C(token.contentHeaderString())
+              .contentLength(token.contentLength())
              .ifMatch(etag)
              .build();
-      RequestBody content = RequestBody.fromInputStream(src, checksum().contentLength());
+      RequestBody content = RequestBody.fromInputStream(src, token.contentLength());
      PutObjectResponse response = client().putObject(req, content);
      return new S3InputFile(
          client(),
          location,
-          checksum.contentLength(),
+          token.contentLength(), // TODO should be identical, but get from response?
          s3FileIOProperties(),
          metrics(),
          response.eTag());
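prepare() pays for the token with an extra full pass over the content before the upload pass in writeAtomic(). FileChecksumOutputStream is referenced here but not included in the diff; consistent with its use above, it is presumably a thin wrapper that feeds every written byte to a FileChecksum, roughly:

```java
import java.io.IOException;
import java.io.OutputStream;
import org.apache.iceberg.io.FileChecksum;

// Assumed implementation, inferred from usage; the real class is not in this diff.
public class FileChecksumOutputStream extends OutputStream {
  private final OutputStream delegate;
  private final FileChecksum checksum;

  public FileChecksumOutputStream(OutputStream delegate, FileChecksum checksum) {
    this.delegate = delegate;
    this.checksum = checksum;
  }

  @Override
  public void write(int b) throws IOException {
    delegate.write(b);
    checksum.update(new byte[] {(byte) b});
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    delegate.write(b, off, len);
    checksum.update(b, off, len);
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}
```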
@@ -32,7 +32,7 @@
 import org.apache.iceberg.aws.AwsClientFactories;
 import org.apache.iceberg.aws.AwsClientFactory;
 import org.apache.iceberg.io.AtomicOutputFile;
-import org.apache.iceberg.io.FileChecksum;
+import org.apache.iceberg.io.CAS;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.SupportsAtomicOperations;
 import org.apache.iceberg.relocated.com.google.common.io.CharStreams;
@@ -160,21 +160,19 @@ public void testFileIOOverwrite() throws IOException, S3Exception {
       assertThat(CharStreams.toString(new InputStreamReader(i, StandardCharsets.UTF_8)))
           .isEqualTo("shaved my kiwis");
     }
-    final AtomicOutputFile outf = fileIO.newOutputFile(inf);
-    final FileChecksum chk = outf.checksum();
+    final AtomicOutputFile<CAS> outf = fileIO.newOutputFile(inf);
     final byte[] replContent = "shaved my hamster".getBytes(StandardCharsets.UTF_8);
-    chk.update(replContent);
+    final CAS chk = outf.prepare(() -> new ByteArrayInputStream(replContent));
 
     InputFile replf = outf.writeAtomic(chk, () -> new ByteArrayInputStream(replContent));
     try (InputStream i = replf.newStream()) {
       assertThat(CharStreams.toString(new InputStreamReader(i, StandardCharsets.UTF_8)))
           .isEqualTo("shaved my hamster");
     }
 
-    final AtomicOutputFile outfFail = fileIO.newOutputFile(inf);
-    final FileChecksum chkFail = outfFail.checksum();
+    final AtomicOutputFile<CAS> outfFail = fileIO.newOutputFile(inf);
     final byte[] failContent = "shaved your mom".getBytes(StandardCharsets.UTF_8);
-    chkFail.update(failContent);
+    final CAS chkFail = outfFail.prepare(() -> new ByteArrayInputStream(failContent));
 
     assertThatThrownBy(
             () -> outfFail.writeAtomic(chkFail, () -> new ByteArrayInputStream(failContent)))
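The failing half of the test exercises the compare-and-swap contract: the second writer holds a stale etag, so its conditional put must be rejected (the asserted exception type is cut off in this view). In a catalog commit path, the natural caller-side pattern is a reread-and-retry loop; a sketch, assuming a lost race surfaces as SupportsAtomicOperations.CASException:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.iceberg.io.AtomicOutputFile;
import org.apache.iceberg.io.CAS;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SupportsAtomicOperations;

class CommitRetrySketch {
  static InputFile commit(SupportsAtomicOperations<CAS> io, String location, byte[] next)
      throws IOException {
    InputFile current = io.newInputFile(location);
    while (true) {
      AtomicOutputFile<CAS> out = io.newOutputFile(current);
      CAS token = out.prepare(() -> new ByteArrayInputStream(next));
      try {
        return out.writeAtomic(token, () -> new ByteArrayInputStream(next));
      } catch (SupportsAtomicOperations.CASException e) {
        // Lost the race: reread and retry. A real caller would also rebase
        // `next` on the state it just reread.
        current = io.newInputFile(location);
      }
    }
  }
}
```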
azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSChecksum.java
@@ -21,9 +21,10 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Base64;
+import org.apache.iceberg.io.CAS;
 import org.apache.iceberg.io.FileChecksum;
 
-public class ADLSChecksum implements FileChecksum {
+public class ADLSChecksum implements FileChecksum, CAS {
   private long length = 0L;
   private final MessageDigest checksum = getMD5DigestInstance();
 
@@ -47,17 +48,17 @@ public void update(byte[] bytes, int off, int len) {
   }
 
   @Override
-  public byte[] asBytes() {
+  public byte[] contentChecksumBytes() {
     return checksum.digest();
   }
 
   @Override
-  public String toHeaderString() {
-    return Base64.getEncoder().encodeToString(asBytes());
+  public String contentHeaderString() {
+    return Base64.getEncoder().encodeToString(contentChecksumBytes());
   }
 
   @Override
   public String toString() {
-    return toHeaderString();
+    return contentHeaderString();
   }
 }
azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSOutputFile.java
@@ -28,17 +28,21 @@
 import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
 import java.util.function.Supplier;
 import org.apache.iceberg.azure.AzureProperties;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.io.AtomicOutputFile;
-import org.apache.iceberg.io.FileChecksum;
+import org.apache.iceberg.io.CAS;
+import org.apache.iceberg.io.FileChecksumOutputStream;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.PositionOutputStream;
 import org.apache.iceberg.io.SupportsAtomicOperations;
 import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
 
-class ADLSOutputFile extends BaseADLSFile implements AtomicOutputFile {
+class ADLSOutputFile extends BaseADLSFile implements AtomicOutputFile<CAS> {
 
   private Long length;
   private final DataLakeRequestConditions conditions;
@@ -89,13 +93,33 @@ public InputFile toInputFile() {
   }
 
   @Override
-  public FileChecksum checksum() {
-    return new ADLSChecksum();
+  public CAS prepare(Supplier<InputStream> source) {
+    final ADLSChecksum checksum = new ADLSChecksum();
+    final byte[] buffer = new byte[8192];
+    try (InputStream in = source.get();
+        FileChecksumOutputStream chk =
+            new FileChecksumOutputStream(
+                new OutputStream() {
+                  @Override
+                  public void write(int b) {
+                    // TODO: no NullOutputStream?
+                  }
+
+                  @Override
+                  public void write(byte[] b, int off, int len) {
+                    // do nothing
+                  }
+                },
+                checksum)) {
+      ByteStreams.copy(in, chk);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    return checksum;
   }
 
   @Override
-  public ADLSInputFile writeAtomic(FileChecksum checksum, Supplier<InputStream> source)
-      throws IOException {
+  public ADLSInputFile writeAtomic(CAS checksum, Supplier<InputStream> source) throws IOException {
     // Annoyingly, the checksum is not validated server-side, but stored as metadata. The
     // partial-write problem is less of an issue using an InputStream, so the length validation can
     // suffice
@@ -108,7 +132,7 @@ public ADLSInputFile writeAtomic(FileChecksum checksum, Supplier<InputStream> so
             .setRequestConditions(conditions)
             .setHeaders(
                 new PathHttpHeaders()
-                    .setContentMd5(checksum.asBytes())
+                    .setContentMd5(checksum.contentChecksumBytes())
                     .setContentType("binary")),
             null, // no timeout
             Context.NONE);
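The inline anonymous OutputStream answers its own TODO: the aws module already uses commons-io's NullOutputStream for exactly this in S3OutputFile.prepare(). If the azure module can take the same dependency (an assumption), the body of prepare() collapses to the S3 shape; on Java 11+, java.io.OutputStream.nullOutputStream() would serve as well:

```java
// Hypothetical simplification of the try block in ADLSOutputFile.prepare(), assuming
// org.apache.commons.io.output.NullOutputStream is available to the azure module;
// `source` and `checksum` are the locals from the surrounding method.
try (InputStream in = source.get();
    FileChecksumOutputStream chk =
        new FileChecksumOutputStream(new NullOutputStream(), checksum)) {
  ByteStreams.copy(in, chk);
} catch (IOException e) {
  throw new UncheckedIOException(e);
}
```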
@@ -60,6 +60,7 @@
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.azure.AzureProperties;
 import org.apache.iceberg.io.AtomicOutputFile;
+import org.apache.iceberg.io.CAS;
 import org.apache.iceberg.io.FileChecksum;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileInfo;
@@ -280,11 +281,10 @@ public void testAtomicPartialWrite() throws IOException {
     assertThat(in.exists()).isTrue();
 
     // overwrite fails, checksum does not match
-    final AtomicOutputFile overwrite = io.newOutputFile(in);
+    final AtomicOutputFile<CAS> overwrite = io.newOutputFile(in);
     final byte[] overbytes = new byte[1024 * 1024];
     random.nextBytes(overbytes);
-    final FileChecksum chk = overwrite.checksum();
-    chk.update(overbytes, 0, 1024 * 1024);
+    final CAS chk = overwrite.prepare(() -> new ByteArrayInputStream(overbytes));
     // precondition not met (bad checksum)
     UnexpectedLengthException chkFailure =
         Assertions.assertThrows(
@@ -337,7 +337,7 @@ public void scratchADLS() {
         client.uploadWithResponse(
             new FileParallelUploadOptions(new ByteArrayInputStream(overbytes))
                 .setRequestConditions(cond2)
-                .setHeaders(new PathHttpHeaders().setContentMd5(chk.asBytes())),
+                .setHeaders(new PathHttpHeaders().setContentMd5(chk.contentChecksumBytes())),
             null, // no timeout
             Context.NONE);
     PathInfo info2 = resp.getValue();
core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -138,7 +138,7 @@ public static void internalWrite(
   public static void internalWrite(TableMetadata metadata, OutputStream out, boolean close) {
     OutputStreamWriter writer = null;
     try {
-      writer = new OutputStreamWriter(out);
+      writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
       JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
       generator.useDefaultPrettyPrinter();
       toJson(metadata, generator);
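A drive-by fix riding along with the refactor: without an explicit charset, OutputStreamWriter encodes with the JVM's platform default, so metadata JSON could be written with different bytes on differently configured hosts. Pinning UTF-8 makes the output deterministic:

```java
// Before: encoding depends on the JVM default (file.encoding), e.g. windows-1252.
writer = new OutputStreamWriter(out);
// After: identical bytes on every platform.
writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
```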