Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not create new Executor everytime createRunner #32272

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.joda.time.Duration;
Expand Down Expand Up @@ -126,7 +127,11 @@ public void initializeState(StateInitializationContext context) throws Exception
// this will implicitly be keyed like the StateInternalsFactory
TimerInternalsFactory<byte[]> timerInternalsFactory = key -> timerInternals;

executorService = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
if (this.executorService == null) {
this.executorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("flink-sdf-executor-%d").build());
}

((ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory);
((ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory);
Expand Down Expand Up @@ -191,10 +196,12 @@ public void close() throws Exception {
"The scheduled executor service did not properly terminate. Shutting "
+ "it down now.");
executorService.shutdownNow();
executorService = null;
}
} catch (InterruptedException e) {
LOG.debug("Could not properly await the termination of the scheduled executor service.", e);
executorService.shutdownNow();
executorService = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.KeyedWorkItem;
Expand Down Expand Up @@ -56,6 +57,8 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand Down Expand Up @@ -115,6 +118,8 @@ private static class ProcessFnExtractor implements UserParDoFnFactory.DoFnExtrac
private static class SplittableDoFnRunnerFactory<
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
implements DoFnRunnerFactory<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
private transient @MonotonicNonNull ScheduledExecutorService ses = null;

@Override
public DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> createRunner(
DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> fn,
Expand All @@ -131,6 +136,11 @@ public DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> crea
OutputManager outputManager,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
if (this.ses == null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added debug log in both branch and run a sample job, saw

INFO 2024-08-21T18:47:53.569Z creating new sdf executor
INFO 2024-08-21T18:47:55.698Z Reuse old sdf executor
INFO 2024-08-21T18:47:56.812Z Reuse old sdf executor
INFO 2024-08-21T18:47:57.919Z Reuse old sdf executor
INFO 2024-08-21T18:47:59.030Z Reuse old sdf executor
INFO 2024-08-21T18:48:00.138Z Reuse old sdf executor
INFO 2024-08-21T18:48:00.699Z Reuse old sdf executor
INFO 2024-08-21T18:48:01.871Z Reuse old sdf executor
INFO 2024-08-21T18:48:03.044Z Reuse old sdf executor
INFO 2024-08-21T18:48:04.121Z Reuse old sdf executor
INFO 2024-08-21T18:48:04.715Z Reuse old sdf executor
INFO 2024-08-21T18:48:05.821Z Reuse old sdf executor
...

indicating this change is effective

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this factory possibly called concurrently? might need some synchronization if so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure but now guard with AtomicReference

this.ses =
Executors.newSingleThreadScheduledExecutor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want a single threaded one here, because I believe the factory vends many different dofnrunner which will want some parallel threads for splits.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to use Executors.newScheduledThreadPool

new ThreadFactoryBuilder().setNameFormat("df-sdf-executor-%d").build());
}
ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> processFn =
(ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>) fn;
processFn.setStateInternalsFactory(key -> (StateInternals) stepContext.stateInternals());
Expand Down Expand Up @@ -162,7 +172,7 @@ public <T> void outputWindowedValue(
}
},
sideInputReader,
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
ses,
// Commit at least once every 10 seconds or 10k records. This keeps the watermark
// advancing smoothly, and ensures that not too much work will have to be reprocessed
// in the event of a crash.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.KeyedWorkItem;
Expand Down Expand Up @@ -49,9 +50,11 @@
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand Down Expand Up @@ -81,6 +84,7 @@ public class SplittableParDoProcessKeyedElementsOp<
private transient SamzaTimerInternalsFactory<byte[]> timerInternalsFactory;
private transient DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> fnRunner;
private transient SamzaPipelineOptions pipelineOptions;
private transient @MonotonicNonNull ScheduledExecutorService ses = null;

public SplittableParDoProcessKeyedElementsOp(
TupleTag<OutputT> mainOutputTag,
Expand Down Expand Up @@ -137,6 +141,12 @@ public void open(
isBounded,
pipelineOptions);

if (this.ses == null) {
this.ses =
Executors.newSingleThreadScheduledExecutor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not yet using this below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed the mistake

new ThreadFactoryBuilder().setNameFormat("samza-sdf-executor-%d").build());
}

final KeyedInternals<byte[]> keyedInternals =
new KeyedInternals<>(stateInternalsFactory, timerInternalsFactory);

Expand Down
Loading