Merge remote-tracking branch 'origin/master' into embeddings_mltransform
AnandInguva committed Dec 11, 2023
2 parents f1bb42c + 5c7ec72 commit 8d0b47d
Showing 45 changed files with 1,206 additions and 306 deletions.
1 change: 1 addition & 0 deletions .github/workflows/beam_PreCommit_Java.yml
@@ -161,6 +161,7 @@ jobs:
       matrix:
         job_name: [beam_PreCommit_Java]
         job_phrase: [Run Java PreCommit]
+    timeout-minutes: 120
     if: |
       github.event_name == 'push' ||
       github.event_name == 'pull_request_target' ||
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -42,7 +40,6 @@
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -149,19 +146,11 @@ public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
     // Add all the source splits being actively read.
     beamSourceReaders.forEach(
         (splitId, readerAndOutput) -> {
-          Source.Reader<T> reader = readerAndOutput.reader;
-          if (reader instanceof BoundedSource.BoundedReader) {
-            // Sometimes users may decide to run a bounded source in streaming mode as "finite
-            // stream."
-            // For bounded source, the checkpoint granularity is the entire source split.
-            // So, in case of failure, all the data from this split will be consumed again.
-            splitsState.add(new FlinkSourceSplit<>(splitId, reader.getCurrentSource()));
-          } else if (reader instanceof UnboundedSource.UnboundedReader) {
-            // The checkpoint for unbounded sources is fine granular.
-            byte[] checkpointState =
-                getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
-            splitsState.add(
-                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), checkpointState));
+          try {
+            splitsState.add(getReaderCheckpoint(splitId, readerAndOutput));
+          } catch (IOException e) {
+            throw new IllegalStateException(
+                String.format("Failed to get checkpoint for split %d", splitId), e);
           }
         });
     return splitsState;
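
Note: this hunk replaces the base class's instanceof dispatch with an abstract getReaderCheckpoint hook that each subclass overrides. A minimal, hypothetical sketch of the resulting template-method shape (simplified names and state; not the actual Beam classes):

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

abstract class ReaderBaseSketch<R> {
  // Stand-in for beamSourceReaders: splitId -> active reader.
  protected final Map<Integer, R> activeReaders = new HashMap<>();

  // The new abstract hook: each subclass defines its own split checkpoint format.
  protected abstract byte[] checkpointSplit(int splitId, R reader) throws IOException;

  // The base class owns the snapshot loop, as snapshotState() does above.
  public final List<byte[]> snapshotState() {
    List<byte[]> splitsState = new ArrayList<>();
    activeReaders.forEach(
        (splitId, reader) -> {
          try {
            splitsState.add(checkpointSplit(splitId, reader));
          } catch (IOException e) {
            // Mirrors the diff: wrap and rethrow rather than dropping state silently.
            throw new IllegalStateException("Failed to get checkpoint for split " + splitId, e);
          }
        });
    return splitsState;
  }
}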
@@ -228,9 +217,17 @@ public void close() throws Exception {
    */
   protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
 
+  /** Create {@link FlinkSourceSplit} for given {@code splitId}. */
+  protected abstract FlinkSourceSplit<T> getReaderCheckpoint(
+      int splitId, ReaderAndOutput readerAndOutput) throws IOException;
+
+  /** Create {@link Source.Reader} for given {@link FlinkSourceSplit}. */
+  protected abstract Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
+      throws IOException;
+
   // ----------------- protected helper methods for subclasses --------------------
 
-  protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
+  protected final Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
     FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
     if (sourceSplit != null) {
       Source.Reader<T> reader = createReader(sourceSplit);
@@ -241,7 +238,7 @@ protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
     return Optional.empty();
   }
 
-  protected void finishSplit(int splitIndex) throws IOException {
+  protected final void finishSplit(int splitIndex) throws IOException {
     ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
     if (readerAndOutput != null) {
       LOG.info("Finished reading from split {}", readerAndOutput.splitId);
@@ -252,7 +249,7 @@ protected void finishSplit(int splitIndex) throws IOException {
     }
   }
 
-  protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+  protected final boolean checkIdleTimeoutAndMaybeStartCountdown() {
     if (idleTimeoutMs <= 0) {
       idleTimeoutFuture.complete(null);
     } else if (!idleTimeoutCountingDown) {
@@ -262,7 +259,7 @@ protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
     return idleTimeoutFuture.isDone();
   }
 
-  protected boolean noMoreSplits() {
+  protected final boolean noMoreSplits() {
     return noMoreSplits;
   }
 
@@ -308,49 +305,6 @@ protected Map<Integer, ReaderAndOutput> allReaders() {
   protected static void ignoreReturnValue(Object o) {
     // do nothing.
   }
-  // ------------------------------ private methods ------------------------------
-
-  @SuppressWarnings("unchecked")
-  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
-      byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<OutputT> reader) {
-    UnboundedSource<OutputT, CheckpointMarkT> source =
-        (UnboundedSource<OutputT, CheckpointMarkT>) reader.getCurrentSource();
-    CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark();
-    Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      coder.encode(checkpointMark, baos);
-      return baos.toByteArray();
-    } catch (IOException ioe) {
-      throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
-    }
-  }
-
-  private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
-      throws IOException {
-    Source<T> beamSource = sourceSplit.getBeamSplitSource();
-    if (beamSource instanceof BoundedSource) {
-      return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
-    } else if (beamSource instanceof UnboundedSource) {
-      return createUnboundedSourceReader(beamSource, sourceSplit.getSplitState());
-    } else {
-      throw new IllegalStateException("Unknown source type " + beamSource.getClass());
-    }
-  }
-
-  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
-      Source.Reader<T> createUnboundedSourceReader(
-          Source<T> beamSource, @Nullable byte[] splitState) throws IOException {
-    UnboundedSource<T, CheckpointMarkT> unboundedSource =
-        (UnboundedSource<T, CheckpointMarkT>) beamSource;
-    Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
-    if (splitState == null) {
-      return unboundedSource.createReader(pipelineOptions, null);
-    } else {
-      try (ByteArrayInputStream bais = new ByteArrayInputStream(splitState)) {
-        return unboundedSource.createReader(pipelineOptions, coder.decode(bais));
-      }
-    }
-  }
 
   // -------------------- protected helper class ---------------------
 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
@@ -121,7 +121,6 @@ public void addReader(int subtaskId) {
     List<FlinkSourceSplit<T>> splitsForSubtask = pendingSplits.remove(subtaskId);
     if (splitsForSubtask != null) {
       assignSplitsAndLog(splitsForSubtask, subtaskId);
-      pendingSplits.remove(subtaskId);
     } else {
       if (splitsInitialized) {
         LOG.info("There is no split for subtask {}. Signaling no more splits.", subtaskId);
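
For context on the one-line fix above: Map.remove(key) already deletes the mapping while returning its value, so the second remove(subtaskId) was a no-op. A standalone illustration:

import java.util.HashMap;
import java.util.Map;

// Why the second remove was redundant: Map.remove both returns the mapping
// and deletes it, so the key is already gone after the first call.
public class RemoveOnce {
  public static void main(String[] args) {
    Map<Integer, String> pendingSplits = new HashMap<>(Map.of(1, "split-1"));
    String splitsForSubtask = pendingSplits.remove(1); // returns "split-1" and removes the key
    System.out.println(splitsForSubtask);              // split-1
    System.out.println(pendingSplits.remove(1));       // null: nothing left to remove
  }
}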
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
@@ -18,18 +18,29 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.core.io.InputStatus;
@@ -50,6 +61,8 @@
  */
 public class FlinkBoundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<T>> {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class);
+  private static final VarLongCoder LONG_CODER = VarLongCoder.of();
+  private final Map<Integer, Long> consumedFromSplit = new HashMap<>();
   private @Nullable Source.Reader<T> currentReader;
   private int currentSplitId;
 
@@ -62,6 +75,40 @@ public FlinkBoundedSourceReader(
     currentSplitId = -1;
   }
 
+  @Override
+  protected FlinkSourceSplit<T> getReaderCheckpoint(int splitId, ReaderAndOutput readerAndOutput)
+      throws CoderException {
+    // Sometimes users may decide to run a bounded source in streaming mode as "finite
+    // stream."
+    // For bounded source, the checkpoint granularity is the entire source split.
+    // So, in case of failure, all the data from this split will be consumed again.
+    return new FlinkSourceSplit<>(
+        splitId, readerAndOutput.reader.getCurrentSource(), asBytes(consumedFromSplit(splitId)));
+  }
+
+  @Override
+  protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
+      throws IOException {
+    Source<T> beamSource = sourceSplit.getBeamSplitSource();
+    byte[] state = sourceSplit.getSplitState();
+    if (state != null) {
+      consumedFromSplit.put(Integer.parseInt(sourceSplit.splitId()), fromBytes(state));
+    }
+    return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
+  }
+
+  private byte[] asBytes(long l) throws CoderException {
+    return CoderUtils.encodeToByteArray(LONG_CODER, l);
+  }
+
+  private long fromBytes(byte[] b) throws CoderException {
+    return CoderUtils.decodeFromByteArray(LONG_CODER, b);
+  }
+
+  private long consumedFromSplit(int splitId) {
+    return consumedFromSplit.getOrDefault(splitId, 0L);
+  }
+
   @VisibleForTesting
   protected FlinkBoundedSourceReader(
       String stepName,
@@ -78,26 +125,28 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Exception {
     checkExceptionAndMaybeThrow();
     if (currentReader == null && !moveToNextNonEmptyReader()) {
       // Nothing to read for now.
-      if (noMoreSplits() && checkIdleTimeoutAndMaybeStartCountdown()) {
-        // All the source splits have been read and idle timeout has passed.
-        LOG.info(
-            "All splits have finished reading, and idle time {} ms has passed.", idleTimeoutMs);
-        return InputStatus.END_OF_INPUT;
-      } else {
-        // This reader either hasn't received NoMoreSplitsEvent yet or it is waiting for idle
-        // timeout.
-        return InputStatus.NOTHING_AVAILABLE;
-      }
+      if (noMoreSplits()) {
+        output.emitWatermark(Watermark.MAX_WATERMARK);
+        if (checkIdleTimeoutAndMaybeStartCountdown()) {
+          // All the source splits have been read and idle timeout has passed.
+          LOG.info(
+              "All splits have finished reading, and idle time {} ms has passed.", idleTimeoutMs);
+          return InputStatus.END_OF_INPUT;
+        }
+      }
+      // This reader either hasn't received NoMoreSplitsEvent yet or it is waiting for idle
+      // timeout.
+      return InputStatus.NOTHING_AVAILABLE;
     }
-    Source.Reader<T> tempCurrentReader = currentReader;
-    if (tempCurrentReader != null) {
-      T record = tempCurrentReader.getCurrent();
+    if (currentReader != null) {
+      // make null checks happy
+      final @Nonnull Source.Reader<T> splitReader = currentReader;
+      // store number of processed elements from this split
+      consumedFromSplit.compute(currentSplitId, (k, v) -> v == null ? 1 : v + 1);
+      T record = splitReader.getCurrent();
       WindowedValue<T> windowedValue =
           WindowedValue.of(
-              record,
-              tempCurrentReader.getCurrentTimestamp(),
-              GlobalWindow.INSTANCE,
-              PaneInfo.NO_FIRING);
+              record, splitReader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
       if (timestampExtractor == null) {
         output.collect(windowedValue);
       } else {
@@ -107,11 +156,12 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Exception {
       // If the advance() invocation throws exception here, the job will just fail over and read
       // everything again from
       // the beginning. So the failover granularity is the entire Flink job.
-      if (!invocationUtil.invokeAdvance(tempCurrentReader)) {
+      if (!invocationUtil.invokeAdvance(splitReader)) {
         finishSplit(currentSplitId);
+        consumedFromSplit.remove(currentSplitId);
+        LOG.debug("Finished reading from {}", currentSplitId);
         currentReader = null;
         currentSplitId = -1;
-        LOG.debug("Finished reading from {}", currentSplitId);
       }
       // Always return MORE_AVAILABLE here regardless of the availability of next record. If there
       // is no more
@@ -138,6 +188,12 @@ private boolean moveToNextNonEmptyReader() throws IOException {
     if (invocationUtil.invokeStart(rao.reader)) {
       currentSplitId = Integer.parseInt(rao.splitId);
       currentReader = rao.reader;
+      long toSkipAfterStart =
+          MoreObjects.firstNonNull(consumedFromSplit.remove(currentSplitId), 0L);
+      @Nonnull Source.Reader<T> reader = Preconditions.checkArgumentNotNull(currentReader);
+      while (toSkipAfterStart > 0 && reader.advance()) {
+        toSkipAfterStart--;
+      }
       return true;
     } else {
       finishSplit(Integer.parseInt(rao.splitId));
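
Taken together, the bounded-reader hunks implement checkpoint-by-count: getReaderCheckpoint encodes how many records a split has already emitted, and moveToNextNonEmptyReader skips that many records after a restore. A standalone toy illustration of the same scheme (a plain Iterator stands in for BoundedSource.BoundedReader; the coder calls match the asBytes/fromBytes helpers above):

import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.CoderUtils;

// Toy model of the bounded checkpoint scheme: snapshot the consumed-record
// count as a VarLong, then on restore re-open the split and skip that many.
public class BoundedResumeSketch {
  public static void main(String[] args) throws CoderException {
    List<String> split = List.of("a", "b", "c", "d");
    VarLongCoder coder = VarLongCoder.of();

    // snapshotState path: two records were consumed before the checkpoint.
    byte[] splitState = CoderUtils.encodeToByteArray(coder, 2L);

    // createReader path: decode the count, then replay-and-skip.
    long toSkipAfterStart = CoderUtils.decodeFromByteArray(coder, splitState);
    Iterator<String> reader = split.iterator(); // fresh reader over the split
    while (toSkipAfterStart > 0 && reader.hasNext()) { // mirrors the advance() loop
      reader.next();
      toSkipAfterStart--;
    }
    reader.forEachRemaining(System.out::println); // prints c, d
  }
}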
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -25,9 +27,12 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -179,6 +184,22 @@ protected CompletableFuture<Void> isAvailableForAliveReaders() {
     }
   }
 
+  @Override
+  protected FlinkSourceSplit<T> getReaderCheckpoint(int splitId, ReaderAndOutput readerAndOutput) {
+    // The checkpoint for unbounded sources is fine granular.
+    byte[] checkpointState =
+        getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<T>) readerAndOutput.reader);
+    return new FlinkSourceSplit<>(
+        splitId, readerAndOutput.reader.getCurrentSource(), checkpointState);
+  }
+
+  @Override
+  protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
+      throws IOException {
+    Source<T> beamSource = sourceSplit.getBeamSplitSource();
+    return createUnboundedSourceReader(beamSource, sourceSplit.getSplitState());
+  }
+
   // -------------- private helper methods ----------------
 
   private void emitRecord(
@@ -274,4 +295,34 @@ private void createPendingBytesGauge(SourceReaderContext context) {
           return pendingBytes;
         });
   }
+
+  @SuppressWarnings("unchecked")
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<T> reader) {
+    UnboundedSource<T, CheckpointMarkT> source =
+        (UnboundedSource<T, CheckpointMarkT>) reader.getCurrentSource();
+    CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark();
+    Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      coder.encode(checkpointMark, baos);
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
+    }
+  }
+
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      Source.Reader<T> createUnboundedSourceReader(
+          Source<T> beamSource, @Nullable byte[] splitState) throws IOException {
+    UnboundedSource<T, CheckpointMarkT> unboundedSource =
+        (UnboundedSource<T, CheckpointMarkT>) beamSource;
+    Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
+    if (splitState == null) {
+      return unboundedSource.createReader(pipelineOptions, null);
+    } else {
+      try (ByteArrayInputStream bais = new ByteArrayInputStream(splitState)) {
+        return unboundedSource.createReader(pipelineOptions, coder.decode(bais));
+      }
+    }
+  }
 }
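
The two helpers moved into this class are symmetric: getAndEncodeCheckpointMark serializes the reader's CheckpointMark with the source's own coder at snapshot time, and createUnboundedSourceReader decodes those bytes to resume. A minimal roundtrip sketch of that encode/decode symmetry, with StringUtf8Coder standing in for a real checkpoint-mark coder:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.StringUtf8Coder;

// Encode on snapshot, decode on restore -- the same stream-based pattern as
// getAndEncodeCheckpointMark / createUnboundedSourceReader above.
public class CheckpointMarkRoundtrip {
  public static void main(String[] args) throws IOException {
    StringUtf8Coder coder = StringUtf8Coder.of();
    byte[] state;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      coder.encode("offset:42", baos); // snapshot path
      state = baos.toByteArray();
    }
    try (ByteArrayInputStream bais = new ByteArrayInputStream(state)) {
      System.out.println(coder.decode(bais)); // restore path: offset:42
    }
  }
}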