Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not create new Executor everytime createRunner #32272

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.joda.time.Duration;
Expand Down Expand Up @@ -126,7 +127,11 @@ public void initializeState(StateInitializationContext context) throws Exception
// this will implicitly be keyed like the StateInternalsFactory
TimerInternalsFactory<byte[]> timerInternalsFactory = key -> timerInternals;

executorService = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
if (this.executorService == null) {
this.executorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("flink-sdf-executor-%d").build());
}

((ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory);
((ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory);
Expand Down Expand Up @@ -191,10 +196,12 @@ public void close() throws Exception {
"The scheduled executor service did not properly terminate. Shutting "
+ "it down now.");
executorService.shutdownNow();
executorService = null;
}
} catch (InterruptedException e) {
LOG.debug("Could not properly await the termination of the scheduled executor service.", e);
executorService.shutdownNow();
executorService = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.KeyedWorkItem;
Expand Down Expand Up @@ -56,6 +58,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand Down Expand Up @@ -115,7 +118,10 @@ private static class ProcessFnExtractor implements UserParDoFnFactory.DoFnExtrac
private static class SplittableDoFnRunnerFactory<
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
implements DoFnRunnerFactory<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
private final AtomicReference<ScheduledExecutorService> ses = new AtomicReference<>();

@Override
@SuppressWarnings("nullness") // nullable atomic reference guaranteed nonnull when get
public DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> createRunner(
DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> fn,
PipelineOptions options,
Expand All @@ -131,6 +137,13 @@ public DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> crea
OutputManager outputManager,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
if (this.ses.get() == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remembered a cleaner way to do this:

private final Supplier sesSupplier =
Suppliers.memoize(
() -> Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setNameFormat("df-sdf-executor-%d").build())));

and then just ses.get() below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, good to learn, for now merging it before release cut

this.ses.compareAndSet(
null,
Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setNameFormat("df-sdf-executor-%d").build()));
}
ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> processFn =
(ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>) fn;
processFn.setStateInternalsFactory(key -> (StateInternals) stepContext.stateInternals());
Expand Down Expand Up @@ -162,7 +175,7 @@ public <T> void outputWindowedValue(
}
},
sideInputReader,
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
ses.get(),
// Commit at least once every 10 seconds or 10k records. This keeps the watermark
// advancing smoothly, and ensures that not too much work will have to be reprocessed
// in the event of a crash.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.KeyedWorkItem;
Expand Down Expand Up @@ -49,9 +50,11 @@
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand Down Expand Up @@ -81,6 +84,7 @@ public class SplittableParDoProcessKeyedElementsOp<
private transient SamzaTimerInternalsFactory<byte[]> timerInternalsFactory;
private transient DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> fnRunner;
private transient SamzaPipelineOptions pipelineOptions;
private transient @MonotonicNonNull ScheduledExecutorService ses = null;

public SplittableParDoProcessKeyedElementsOp(
TupleTag<OutputT> mainOutputTag,
Expand Down Expand Up @@ -137,6 +141,12 @@ public void open(
isBounded,
pipelineOptions);

if (this.ses == null) {
this.ses =
Executors.newSingleThreadScheduledExecutor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not yet using this below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed the mistake

new ThreadFactoryBuilder().setNameFormat("samza-sdf-executor-%d").build());
}

final KeyedInternals<byte[]> keyedInternals =
new KeyedInternals<>(stateInternalsFactory, timerInternalsFactory);

Expand Down Expand Up @@ -172,7 +182,7 @@ public <AdditionalOutputT> void outputWindowedValue(
}
},
NullSideInputReader.empty(),
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
ses,
10000,
Duration.standardSeconds(10),
() -> {
Expand Down
Loading