Fix race-conditions in IdealStateGroupCommit #14214
Conversation
Codecov Report
Attention: Patch coverage is

@@             Coverage Diff              @@
##           master    #14214      +/-   ##
============================================
+ Coverage     61.75%   63.85%    +2.09%
- Complexity      207     1535     +1328
============================================
  Files          2436     2623      +187
  Lines        133233   144420    +11187
  Branches      20636    22099     +1463
============================================
+ Hits          82274    92215     +9941
- Misses        44911    45398      +487
- Partials       6048     6807      +759

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
lgtm
@@ -124,34 +124,27 @@ public IdealState commit(HelixManager helixManager, String resourceName,
        Entry first = queue._pending.poll();
        processed.add(first);
        String mergedResourceName = first._resourceName;
Not quite related to the fix, but given
Queue queue = getQueue(resourceName);
shouldn't the queue already contain only resourceName? Why do we pull mergedResourceName again?
Not quite --
private Queue getQueue(String resourceName) {
  return _queues[(resourceName.hashCode() & Integer.MAX_VALUE) % _queues.length];
}
So it's just likely that the queue only holds one resource, but not necessarily.
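To make that concrete, here is a small, self-contained illustration (hypothetical code, not from Pinot): because getQueue hashes the resource name into a fixed pool of queues, two different resources can land in the same queue, so an entry pulled off the queue is not guaranteed to belong to the resourceName passed into commit.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QueueBucketSketch {
  public static void main(String[] args) {
    int numQueues = 100;  // the pool size mentioned elsewhere in this PR
    Map<Integer, List<String>> buckets = new HashMap<>();
    for (int i = 0; i < 1000; i++) {
      String resourceName = "table_" + i + "_OFFLINE";
      int bucket = (resourceName.hashCode() & Integer.MAX_VALUE) % numQueues;
      buckets.computeIfAbsent(bucket, k -> new ArrayList<>()).add(resourceName);
    }
    // With 1000 names and 100 buckets, some bucket necessarily holds several resources,
    // so a single queue can contain entries for more than one resource at the same time.
    buckets.values().stream()
        .filter(names -> names.size() > 1)
        .findFirst()
        .ifPresent(names -> System.out.println("Resources sharing one queue: " + names));
  }
}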
          it.remove();
        }
        return updatedIdealState;
      },
      retryPolicy, noChangeOk);
Not quite related to the fix, but if the retry hits its max attempts and fails, would we lose track of the series of IdealState updates that were already removed from the queue?
If I understand the code + fix correctly:
- We could have multiple threads adding different segments to the idealstate.
- This case is fixed in line 142 by passing updatedIdealState to the updater, so that all updates are applied cumulatively.
- The first thread that enters this logic may end up updating all of the segments that other threads have added to the queue.
- The other threads are busy-waiting, and will find the queue empty when their turn comes (i.e. they set queue._running to themselves and get to examine the queue), and will return SUCCESS to the caller (irrespective of whether the updating thread has succeeded or not).
Is my understanding right? If so, we should add a status of some sort to each entry, and have only the thread that entered the entry remove it, so that all threads can return errors (a sketch of this idea follows below). Otherwise, the inbound call (a segment push) that caused the first thread to run will think that it could not push the segment, and a retry may succeed in pushing that segment. The other segment pushers will think that their push succeeded and may never retry.
@dinoocch or @jasperjiaguo can you validate this?
In fact, things may get more interesting if the retries start to get added as new entries ... (can that happen?)
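A rough sketch of the per-entry status idea (hypothetical names, not the actual Entry class in Pinot or Helix): the processing thread records the outcome on every entry it applied, and each waiting thread returns its own entry's outcome to its caller instead of unconditionally returning success.

import java.util.concurrent.CountDownLatch;

class EntryWithStatus {
  private final CountDownLatch _done = new CountDownLatch(1);
  private volatile boolean _success;

  // Called by the thread that actually applied (or failed to apply) this entry's update.
  void markDone(boolean success) {
    _success = success;
    _done.countDown();
  }

  // Called by the thread that submitted this entry, so it can propagate failure to its caller.
  boolean awaitResult() throws InterruptedException {
    _done.await();
    return _success;
  }
}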
@mcvsubbu do you mean in the case of an exception writing to ZK?
I'm not fully sure of the intent, but the current code has the following behavior:
- The executing thread will throw the exception.
- Any grouped executions would return null.
In particular, the code uses Entry._sent to indicate that an entry was already processed, and this is the condition for the while loop. So I believe the worst case is that we might either (a) run one extra iteration of the processing or (b) wait 10 ms.
We do lose the exception in all the other threads, though (I don't know whether that is an issue, or whether returning null is enough to imply an error).
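For reference, a self-contained approximation of the wait loop described above (an assumption about its shape, not a copy of the Pinot code): each caller enqueues an entry and spins until it is marked sent; whichever thread wins _running drains the queue and flips _sent on every entry it processed, so the waiters learn nothing about success or failure.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

public class GroupCommitLoopSketch {
  static class Entry {
    volatile boolean _sent;
  }

  private final Queue<Entry> _pending = new ConcurrentLinkedQueue<>();
  private final AtomicReference<Thread> _running = new AtomicReference<>();

  public void commit() throws InterruptedException {
    Entry entry = new Entry();
    _pending.add(entry);
    while (!entry._sent) {
      if (_running.compareAndSet(null, Thread.currentThread())) {
        try {
          Entry e;
          while ((e = _pending.poll()) != null) {
            // The real code applies e's update here; an exception is only seen by this
            // processing thread, while the other waiters just observe _sent flipping to true.
            e._sent = true;
          }
        } finally {
          _running.set(null);
        }
      } else {
        Thread.sleep(10);  // worst case for a waiter: one extra round or a ~10 ms wait
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new GroupCommitLoopSketch().commit();
  }
}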
Same question as @jasperjiaguo. If the retry eventually fails, the algorithm would have already removed the changes from the queue and we would lose them. Perhaps we need to remove entries from the queue only once we know that the update (or its retry) has been successful.
Thanks for debugging this! Since this piece of code is borrowed from Helix
I think the Helix code linked in the original PR perhaps has a different, related issue --
success = accessor.set(mergedKey, merged, options);
I guess this is most likely using
@Override
public boolean set(String path, T record, int options) {
  return set(path, record, -1, options);
}
There's also the class HelixGroupCommit. This seems to do everything correctly as far as I can tell. Since this class is much more recently modified, I'm assuming it is the one that is used in Helix? But I don't really know without investigating.
      }
      IdealState finalUpdatedIdealState = updatedIdealState;
      updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
      updateIdealState(helixManager, mergedResourceName, idealState -> {
Good catch!
Can you try to update the IdealStateGroupCommitTest.java test file and see if it passes?
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.IdealStateGroupCommit;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class IdealStateGroupCommitTest {
private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class);
private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance();
private static final String TABLE_NAME = "potato_OFFLINE";
private static final int NUM_PROCESSORS = 10;
private static final int NUM_UPDATES = 2400;
@BeforeClass
public void setUp()
throws Exception {
TEST_INSTANCE.setupSharedStateAndValidate();
IdealState idealState = new IdealState(TABLE_NAME);
idealState.setStateModelDefRef("OnlineOffline");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
idealState.setReplicas("1");
idealState.setNumPartitions(0);
TEST_INSTANCE.getHelixAdmin()
.addResource(TEST_INSTANCE.getHelixClusterName(), TABLE_NAME, idealState);
}
@AfterClass
public void tearDown() {
TEST_INSTANCE.cleanup();
}
@Test
public void testGroupCommit()
throws InterruptedException {
List<IdealStateGroupCommit> groupCommitList = new ArrayList<>();
for (int i = 0; i < NUM_PROCESSORS; i++) {
groupCommitList.add(new IdealStateGroupCommit());
}
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(400);
for (int i = 0; i < NUM_UPDATES; i++) {
Runnable runnable = new IdealStateUpdater(TEST_INSTANCE.getHelixManager(), groupCommitList.get(new Random().nextInt(NUM_PROCESSORS)), TABLE_NAME, i);
newFixedThreadPool.submit(runnable);
}
IdealState idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
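// Poll until all submitted updates are reflected in the ideal state (each update adds one partition).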
while (idealState.getNumPartitions() < NUM_UPDATES) {
Thread.sleep(500);
idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
System.out.println("idealState.getNumPartitions() = " + idealState.getNumPartitions());
}
Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES);
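// The group commit should batch concurrent updates, so the number of successful ZK writes
// is expected to be smaller than the number of submitted updates.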
ControllerMetrics controllerMetrics = ControllerMetrics.get();
long idealStateUpdateSuccessCount =
controllerMetrics.getMeteredTableValue(TABLE_NAME, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count();
Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES);
LOGGER.info("{} IdealState updates were successfully committed with {} ZK updates.", NUM_UPDATES,
idealStateUpdateSuccessCount);
}
}
class IdealStateUpdater implements Runnable {
private final HelixManager _helixManager;
private final IdealStateGroupCommit _commit;
private final String _tableName;
private final int _i;
public IdealStateUpdater(HelixManager helixManager, IdealStateGroupCommit commit, String tableName, int i) {
_helixManager = helixManager;
_commit = commit;
_tableName = tableName;
_i = i;
}
@Override
public void run() {
_commit.commit(_helixManager, _tableName, new Function<IdealState, IdealState>() {
@Override
public IdealState apply(IdealState idealState) {
idealState.setPartitionState("test_id" + _i, "test_id" + _i, "ONLINE");
return idealState;
}
}, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f), false);
HelixHelper.getTableIdealState(_helixManager, _tableName);
}
}
From this test, this is not a correct fix.
        processed.add(ent);
        updatedIdealState = ent._updater.apply(updatedIdealState);
        ent._updatedIdealState = updatedIdealState;
        it.remove();
If the first attempt fails, the retry will have no entries left to update.
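A minimal, self-contained illustration of this point (strings stand in for the ideal state; this is not Pinot code): because the updater drains the pending queue as a side effect, a retry that re-invokes the same updater finds nothing left to apply and effectively writes back an unmodified state.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.UnaryOperator;

public class RetryDrainSketch {
  public static void main(String[] args) {
    Deque<UnaryOperator<String>> pending = new ArrayDeque<>();
    pending.add(s -> s + "+segment1");
    pending.add(s -> s + "+segment2");

    // Updater that drains the queue while applying the pending updates.
    UnaryOperator<String> updater = state -> {
      String updated = state;
      while (!pending.isEmpty()) {
        updated = pending.poll().apply(updated);  // entries are removed even if the later write fails
      }
      return updated;
    };

    System.out.println("attempt 1: " + updater.apply("IS"));  // IS+segment1+segment2
    System.out.println("attempt 2: " + updater.apply("IS"));  // IS -- the queue is already drained
  }
}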
Oops, very true. Let me fix this tomorrow.
I made some minor changes based on your diff: #14237
Sure, we can use that PR instead. I left a couple of questions.
This attempts to fix two issues that I believe were introduced in #13976.
I think both can cause a table's idealstate to be updated into an unintended state.
We luckily found this issue very quickly when upgrading, thanks to consistent push failures.
My understanding is that the code intends to group many concurrent idealstate updates into fewer ZooKeeper writes.
Issue 1 - Missing updates
Previously, the code calculated the idealstate value and blindly set it in the update. In particular, if the idealstate is updated between the first and second reads, you may unintentionally clobber that concurrent update.
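A minimal, self-contained sketch of this race (an in-memory stand-in for ZK, not Pinot code): a blind set overwrites a concurrent update that landed after the stale read, while a read-modify-write that applies the updater to the freshly read state would preserve it.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

public class LostUpdateSketch {
  // Stand-in for the ideal state stored in ZK.
  private static final AtomicReference<Set<String>> STORE = new AtomicReference<>(new HashSet<>());

  // "Blind set": compute the new value from an earlier read, then overwrite the store.
  static void blindSet(Set<String> staleRead, String segment) {
    Set<String> computed = new HashSet<>(staleRead);
    computed.add(segment);
    STORE.set(computed);  // clobbers anything written since staleRead was taken
  }

  // Read-modify-write: apply the updater to the value that is current at write time.
  static void readModifyWrite(UnaryOperator<Set<String>> updater) {
    STORE.updateAndGet(current -> updater.apply(new HashSet<>(current)));
  }

  public static void main(String[] args) {
    Set<String> staleRead = new HashSet<>(STORE.get());      // thread B reads
    readModifyWrite(s -> { s.add("segmentA"); return s; });  // thread A adds segmentA
    blindSet(staleRead, "segmentB");                         // thread B overwrites: segmentA is lost
    System.out.println(STORE.get());                         // prints [segmentB]
  }
}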
Issue 2 - Typo in use of resourceName instead of mergedResourceName
We did not actually experience this, but in the original code I believe mergedResourceName should actually be used instead, on the off-chance that we picked up some outstanding update before processing ours. Luckily, I think the risk of running into it is practically reduced by having 100 queues.