BMM Restart Improvements: Bug fixes and logging improvements #926
@@ -268,8 +271,7 @@ public void start() {
// Initializing executor services
_scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("CoordinatorScheduledExecutor-%d").build());
// TODO Assess whether having a single threaded executor for token claim tasks is sufficient or it will be exhausted
_tokenClaimExecutor = Executors.newSingleThreadExecutor(
_tokenClaimExecutor = Executors.newFixedThreadPool(TOKEN_CLAIM_THREAD_POOL_SIZE,
A single-threaded token claim executor gets backed up in the following cases:
- On a stream stop, the leader schedules two tasks on this executor: (1) a task that polls ZooKeeper to check whether the stop has propagated and completed across the cluster, and (2) a task that claims the token assigned to the leader itself.
- Concurrent requests to stop different streams.
A single-threaded executor won't work here. For now I increased the thread pool size to 8 threads, which lets the cluster process up to 8 concurrent requests without being backed up (token claim tasks are short-lived, unlike the poll tasks, so they won't be the bottleneck). I'll monitor how this performs in EI and adjust accordingly. A config property can be added later if we end up tuning this.
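To illustrate the backlog described above, here is a minimal, self-contained sketch. It is not the Coordinator code: the class name, the method name, and the latch-based stand-ins for the poll and claim tasks are all hypothetical. It only shows that a short task queued behind a long-running one cannot start on a single-threaded executor, while a fixed pool runs it right away.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names are illustrative and not taken from the PR.
final class TokenClaimPoolSketch {

    // Returns true if a short "claim" task completes while a long-running
    // "poll" task is still occupying a thread of the given executor.
    static boolean claimRunsWhilePollBlocks(ExecutorService executor) {
        CountDownLatch releasePoll = new CountDownLatch(1);
        CountDownLatch claimDone = new CountDownLatch(1);
        try {
            // Stand-in for the poll task: holds its thread until released.
            executor.execute(() -> {
                try {
                    releasePoll.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Stand-in for the short-lived token claim task.
            executor.execute(claimDone::countDown);
            // Wait briefly to see whether the claim task got a thread.
            return claimDone.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Unblock the poll stand-in and let the worker threads exit.
            releasePoll.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("single thread: "
            + claimRunsWhilePollBlocks(Executors.newSingleThreadExecutor()));
        System.out.println("pool of 8:     "
            + claimRunsWhilePollBlocks(Executors.newFixedThreadPool(8)));
    }
}
```

With the single-threaded executor the claim stand-in stays queued until the poll stand-in is released, so the method returns false; with the pool of 8 it returns true. The 500 ms wait is an arbitrary value chosen for the sketch.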
great catch!
@@ -869,15 +890,18 @@ boolean connectorTasksHaveStopped(String connectorName, Set<String> stoppingTask
@VisibleForTesting
static List<Datastream> inferStoppingDatastreamsFromAssignment(List<DatastreamTask> newAssignment,
List<DatastreamTask> removedTasks) {
Map<String, List<Datastream>> taskPrefixToDatastream = removedTasks.stream().
collect(Collectors.toMap(DatastreamTask::getTaskPrefix, DatastreamTask::getDatastreams));
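For streams with more than one task assigned to a host, this was causing the task claim thread to fail silently with a duplicate-key exception (Collectors.toMap without a merge function throws IllegalStateException when two elements map to the same key).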
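The failure mode can be reproduced outside the Coordinator with a small sketch. The Task class below is a hypothetical, simplified stand-in for DatastreamTask, and the merge-function variant is only one possible way to tolerate repeated prefixes, not necessarily the fix applied in this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch; names are illustrative and not taken from the PR.
final class DuplicateTaskPrefixSketch {

    // Simplified stand-in for DatastreamTask (assumption, not the real class).
    static final class Task {
        final String taskPrefix;
        final String datastream;

        Task(String taskPrefix, String datastream) {
            this.taskPrefix = taskPrefix;
            this.datastream = datastream;
        }
    }

    // Two-argument toMap: throws IllegalStateException if two tasks share a prefix.
    static Map<String, String> strictIndex(List<Task> tasks) {
        return tasks.stream()
            .collect(Collectors.toMap(t -> t.taskPrefix, t -> t.datastream));
    }

    // Three-argument toMap: the merge function keeps the first value seen.
    static Map<String, String> tolerantIndex(List<Task> tasks) {
        return tasks.stream()
            .collect(Collectors.toMap(t -> t.taskPrefix, t -> t.datastream,
                (first, second) -> first));
    }

    public static void main(String[] args) {
        // Two tasks of the same stream on one host share a task prefix.
        List<Task> removed = Arrays.asList(
            new Task("stream1", "datastreamA"),
            new Task("stream1", "datastreamA"));
        try {
            strictIndex(removed);
        } catch (IllegalStateException e) {
            System.out.println("strict: " + e.getMessage());
        }
        System.out.println("tolerant: " + tolerantIndex(removed));
    }
}
```

When such an exception is thrown inside a task handed to ExecutorService.submit, it is captured in the returned Future rather than logged, which is consistent with the thread appearing to die silently unless the Future is inspected.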
collect(Collectors.toList());
List<String> newAssignmentTaskNames = newAssignment.stream().map(DatastreamTask::getDatastreamTaskName).
collect(Collectors.toList());
_log.debug("Claiming assignment tokens. Old assignment: {}", oldAssignmentTaskNames);
nit: wouldn't you rather have these logs emitted after (or inside) the point where a thread is scheduled to claim the assignment tokens?
I just wanted these printed before scheduling the task for claiming tokens. These logs helped me figure out the bug with the dying token claim thread. I'm also planning to clean these up once the feature has been tested more and rolled out in production.
datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
This pull request fixes bugs and improves logging in the Assignment Tokens feature (#919 #921 #922 #924).