From eb55096d82e06d40310a196a3ca1a4864c2ad451 Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Tue, 28 Feb 2023 11:47:02 +0100 Subject: [PATCH] Make LocalOnlyLock reentrant It did not support reentrancy. This was a problem at least in following case: worker -> updateTask -> throw new TerminateWorkflowException -> terminateWorkflow The lock was acquired twice during above execution order. With no reentrancy, the second lock (wf terminateion) failed and sweeper had to wait until the first lock (task update) was released and only then workflow could be terminated. Signed-off-by: Maros Marsalek --- .../core/sync/local/LocalOnlyLock.java | 31 +++---- .../core/sync/local/LocalOnlyLockTest.java | 83 +++++++++++++++---- 2 files changed, 81 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/sync/local/LocalOnlyLock.java b/core/src/main/java/com/netflix/conductor/core/sync/local/LocalOnlyLock.java index ec5f6eec0c..17cbee4fec 100644 --- a/core/src/main/java/com/netflix/conductor/core/sync/local/LocalOnlyLock.java +++ b/core/src/main/java/com/netflix/conductor/core/sync/local/LocalOnlyLock.java @@ -16,9 +16,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,16 +34,10 @@ public class LocalOnlyLock implements Lock { private static final Logger LOGGER = LoggerFactory.getLogger(LocalOnlyLock.class); - private static final CacheLoader LOADER = - new CacheLoader() { - @Override - public Semaphore load(String key) { - return new Semaphore(1, true); - } - }; + private static final CacheLoader LOADER = key -> new ReentrantLock(true); private static final ConcurrentHashMap> SCHEDULEDFUTURES = new ConcurrentHashMap<>(); - private static final LoadingCache LOCKIDTOSEMAPHOREMAP = + private static final LoadingCache LOCKIDTOSEMAPHOREMAP = Caffeine.newBuilder().build(LOADER); private static final ThreadGroup THREAD_GROUP = new ThreadGroup("LocalOnlyLock-scheduler"); private static final ThreadFactory THREAD_FACTORY = @@ -54,14 +48,14 @@ public Semaphore load(String key) { @Override public void acquireLock(String lockId) { LOGGER.trace("Locking {}", lockId); - LOCKIDTOSEMAPHOREMAP.get(lockId).acquireUninterruptibly(); + LOCKIDTOSEMAPHOREMAP.get(lockId).lock(); } @Override public boolean acquireLock(String lockId, long timeToTry, TimeUnit unit) { try { LOGGER.trace("Locking {} with timeout {} {}", lockId, timeToTry, unit); - return LOCKIDTOSEMAPHOREMAP.get(lockId).tryAcquire(timeToTry, unit); + return LOCKIDTOSEMAPHOREMAP.get(lockId).tryLock(timeToTry, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); @@ -80,7 +74,7 @@ public boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUn if (acquireLock(lockId, timeToTry, unit)) { LOGGER.trace("Releasing {} automatically after {} {}", lockId, leaseTime, unit); SCHEDULEDFUTURES.put( - lockId, SCHEDULER.schedule(() -> releaseLock(lockId), leaseTime, unit)); + lockId, SCHEDULER.schedule(() -> deleteLock(lockId), leaseTime, unit)); return true; } return false; @@ -97,14 +91,13 @@ private void removeLeaseExpirationJob(String lockId) { @Override public void releaseLock(String lockId) { // Synchronized to prevent race condition between semaphore check and actual release - // The check is here to prevent semaphore getting above 1 - // e.g. in case when lease runs out but release is also called synchronized (LOCKIDTOSEMAPHOREMAP) { - if (LOCKIDTOSEMAPHOREMAP.get(lockId).availablePermits() == 0) { - LOGGER.trace("Releasing {}", lockId); - LOCKIDTOSEMAPHOREMAP.get(lockId).release(); - removeLeaseExpirationJob(lockId); + if (LOCKIDTOSEMAPHOREMAP.getIfPresent(lockId) == null) { + return; } + LOGGER.trace("Releasing {}", lockId); + LOCKIDTOSEMAPHOREMAP.get(lockId).unlock(); + removeLeaseExpirationJob(lockId); } } @@ -115,7 +108,7 @@ public void deleteLock(String lockId) { } @VisibleForTesting - LoadingCache cache() { + LoadingCache cache() { return LOCKIDTOSEMAPHOREMAP; } diff --git a/core/src/test/java/com/netflix/conductor/core/sync/local/LocalOnlyLockTest.java b/core/src/test/java/com/netflix/conductor/core/sync/local/LocalOnlyLockTest.java index 9ce38addc3..7fc95a54c3 100644 --- a/core/src/test/java/com/netflix/conductor/core/sync/local/LocalOnlyLockTest.java +++ b/core/src/test/java/com/netflix/conductor/core/sync/local/LocalOnlyLockTest.java @@ -12,8 +12,12 @@ */ package com.netflix.conductor.core.sync.local; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Ignore; import org.junit.Test; import org.springframework.boot.test.context.runner.ApplicationContextRunner; @@ -22,6 +26,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; @Ignore // Test always times out in CI environment @@ -30,38 +35,71 @@ public class LocalOnlyLockTest { // Lock can be global since it uses global cache internally private final LocalOnlyLock localOnlyLock = new LocalOnlyLock(); + @After + public void tearDown() { + // Clean caches between tests as they are shared globally + localOnlyLock.cache().invalidateAll(); + localOnlyLock.scheduledFutures().values().forEach(f -> f.cancel(false)); + localOnlyLock.scheduledFutures().clear(); + } + @Test public void testLockUnlock() { - localOnlyLock.acquireLock("a", 100, 1000, TimeUnit.MILLISECONDS); + final boolean a = localOnlyLock.acquireLock("a", 100, 10000, TimeUnit.MILLISECONDS); + assertTrue(a); assertEquals(localOnlyLock.cache().estimatedSize(), 1); - assertEquals(localOnlyLock.cache().get("a").availablePermits(), 0); + assertEquals(localOnlyLock.cache().get("a").isLocked(), true); assertEquals(localOnlyLock.scheduledFutures().size(), 1); localOnlyLock.releaseLock("a"); assertEquals(localOnlyLock.scheduledFutures().size(), 0); - assertEquals(localOnlyLock.cache().get("a").availablePermits(), 1); + assertEquals(localOnlyLock.cache().get("a").isLocked(), false); localOnlyLock.deleteLock("a"); assertEquals(localOnlyLock.cache().estimatedSize(), 0); } @Test(timeout = 10 * 10_000) - public void testLockTimeout() { - localOnlyLock.acquireLock("c", 100, 1000, TimeUnit.MILLISECONDS); + public void testLockTimeout() throws InterruptedException, ExecutionException { + final ExecutorService executor = Executors.newFixedThreadPool(1); + executor.submit( + () -> { + localOnlyLock.acquireLock("c", 100, 1000, TimeUnit.MILLISECONDS); + }) + .get(); assertTrue(localOnlyLock.acquireLock("d", 100, 1000, TimeUnit.MILLISECONDS)); assertFalse(localOnlyLock.acquireLock("c", 100, 1000, TimeUnit.MILLISECONDS)); assertEquals(localOnlyLock.scheduledFutures().size(), 2); - localOnlyLock.releaseLock("c"); + executor.submit( + () -> { + localOnlyLock.releaseLock("c"); + }) + .get(); localOnlyLock.releaseLock("d"); assertEquals(localOnlyLock.scheduledFutures().size(), 0); } @Test(timeout = 10 * 10_000) - public void testLockLeaseTime() { - for (int i = 0; i < 10; i++) { - localOnlyLock.acquireLock("a", 1000, 100, TimeUnit.MILLISECONDS); + public void testReleaseFromAnotherThread() throws InterruptedException, ExecutionException { + final ExecutorService executor = Executors.newFixedThreadPool(1); + executor.submit( + () -> { + localOnlyLock.acquireLock("c", 100, 10000, TimeUnit.MILLISECONDS); + }) + .get(); + try { + localOnlyLock.releaseLock("c"); + } catch (IllegalMonitorStateException e) { + // expected + localOnlyLock.deleteLock("c"); + return; + } finally { + executor.submit( + () -> { + localOnlyLock.releaseLock("c"); + }) + .get(); } - localOnlyLock.acquireLock("a"); - assertEquals(0, localOnlyLock.cache().get("a").availablePermits()); - localOnlyLock.releaseLock("a"); + + fail(); } @Test(timeout = 10 * 10_000) @@ -73,7 +111,7 @@ public void testLockLeaseWithRelease() throws Exception { Thread.sleep(2000); localOnlyLock.acquireLock("b"); - assertEquals(0, localOnlyLock.cache().get("b").availablePermits()); + assertEquals(true, localOnlyLock.cache().get("b").isLocked()); localOnlyLock.releaseLock("b"); } @@ -81,7 +119,24 @@ public void testLockLeaseWithRelease() throws Exception { public void testRelease() { localOnlyLock.releaseLock("x54as4d2;23'4"); localOnlyLock.releaseLock("x54as4d2;23'4"); - assertEquals(1, localOnlyLock.cache().get("x54as4d2;23'4").availablePermits()); + assertEquals(false, localOnlyLock.cache().get("x54as4d2;23'4").isLocked()); + } + + @Test(timeout = 10 * 10_000) + public void testLockLeaseTime() throws InterruptedException { + for (int i = 0; i < 10; i++) { + final Thread thread = + new Thread( + () -> { + localOnlyLock.acquireLock("a", 1000, 100, TimeUnit.MILLISECONDS); + }); + thread.start(); + thread.join(); + } + localOnlyLock.acquireLock("a"); + assertTrue(localOnlyLock.cache().get("a").isLocked()); + localOnlyLock.releaseLock("a"); + localOnlyLock.deleteLock("a"); } @Test