Skip to content

Commit

Permalink
Merge pull request #15510 from [BEAM-12883] Add coder for ReadableFil…
Browse files Browse the repository at this point in the history
…eCoder that supports MetadataCoderV2

* add coder readable files

* add coder readable files

* :sdks:java:core:spotlessApply

* [BEAM-12883] add ability tp set custom MetadataCoder for ReadableFileCoder

* [BEAM-12883] add ability tp set custom MetadataCoder for ReadableFileCoder

* [BEAM-12883] add ability t0 set custom MetadataCoder using StructuredCoder

* [BEAM-12883] remove asterisk-based import

Co-authored-by: brachipa <brachipa@moonactive.com>
  • Loading branch information
brachi-wernick and brachipa authored Oct 6, 2021
1 parent abdd396 commit 2028e6c
Showing 1 changed file with 47 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,66 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.AtomicCoder;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MetadataCoder;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/** A {@link Coder} for {@link FileIO.ReadableFile}. */
public class ReadableFileCoder extends AtomicCoder<FileIO.ReadableFile> {
private static final ReadableFileCoder INSTANCE = new ReadableFileCoder();
/** A {@link Coder} for {@link org.apache.beam.sdk.io.FileIO.ReadableFile}. */
public class ReadableFileCoder extends StructuredCoder<FileIO.ReadableFile> {

private final Coder<Metadata> metadataCoder;

public static ReadableFileCoder of(Coder<Metadata> metadataCoder) {
return new ReadableFileCoder(metadataCoder);
}

/** Returns the instance of {@link ReadableFileCoder}. */
public static ReadableFileCoder of() {
return INSTANCE;
return new ReadableFileCoder(MetadataCoder.of());
}

public Coder<Metadata> getMetadataCoder() {
return metadataCoder;
}

private ReadableFileCoder(Coder<Metadata> metadataCoder) {
this.metadataCoder = metadataCoder;
}

@Override
public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException {
MetadataCoder.of().encode(value.getMetadata(), os);
VarIntCoder.of().encode(value.getCompression().ordinal(), os);
public void encode(
FileIO.ReadableFile value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
throws CoderException, IOException {
getMetadataCoder().encode(value.getMetadata(), outStream);
VarIntCoder.of().encode(value.getCompression().ordinal(), outStream);
}

@Override
public FileIO.ReadableFile decode(InputStream is) throws IOException {
MatchResult.Metadata metadata = MetadataCoder.of().decode(is);
Compression compression = Compression.values()[VarIntCoder.of().decode(is)];
public FileIO.ReadableFile decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
throws CoderException, IOException {
MatchResult.Metadata metadata = getMetadataCoder().decode(inStream);
Compression compression = Compression.values()[VarIntCoder.of().decode(inStream)];
return new FileIO.ReadableFile(metadata, compression);
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<? extends Coder<?>> getCoderArguments() {
return Collections.singletonList(metadataCoder);
}

@Override
public void verifyDeterministic() throws NonDeterministicException {
// ignore the default Metadata coder for backward compatible
if (!getMetadataCoder().equals(MetadataCoder.of())) {
verifyDeterministic(this, "Metadata coder must be deterministic", getMetadataCoder());
}
}
}

0 comments on commit 2028e6c

Please sign in to comment.