Skip to content

Commit

Permalink
fix UT
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Aug 24, 2023
1 parent 3741e9b commit bfb63e6
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ public abstract class TopicManager implements Cleanable {

protected ClientContext context;
protected QueryConsumeConfig queryConsumeConfig;
protected final ForkJoinPool pool;

public TopicManager(ClientContext context, QueryConsumeConfig queryConsumeConfig) {
this.context = context;
this.queryConsumeConfig = queryConsumeConfig;
pool = new ForkJoinPool(context.config.getThreadPoolSize());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -65,6 +66,8 @@ public class InlongTopicManager extends TopicManager {
private final Map<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
private final Map<String, TubeConsumerCreator> tubeFactories = new ConcurrentHashMap<>();

protected final ForkJoinPool pool;

private volatile boolean stopAssign = false;

private Collection<InLongTopic> assignedTopics;
Expand All @@ -73,6 +76,7 @@ public InlongTopicManager(ClientContext context, QueryConsumeConfig queryConsume
super(context, queryConsumeConfig);
executor.scheduleWithFixedDelay(this::updateMetaData, 0L,
context.getConfig().getUpdateMetaDataIntervalSec(), TimeUnit.SECONDS);
pool = new ForkJoinPool(context.getConfig().getThreadPoolSize());
}

@Override
Expand Down

0 comments on commit bfb63e6

Please sign in to comment.