Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Make LocalOnlyLock reentrant #3513

Merged
merged 1 commit into from
Mar 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,16 +34,10 @@ public class LocalOnlyLock implements Lock {

private static final Logger LOGGER = LoggerFactory.getLogger(LocalOnlyLock.class);

private static final CacheLoader<String, Semaphore> LOADER =
new CacheLoader<String, Semaphore>() {
@Override
public Semaphore load(String key) {
return new Semaphore(1, true);
}
};
private static final CacheLoader<String, ReentrantLock> LOADER = key -> new ReentrantLock(true);
private static final ConcurrentHashMap<String, ScheduledFuture<?>> SCHEDULEDFUTURES =
new ConcurrentHashMap<>();
private static final LoadingCache<String, Semaphore> LOCKIDTOSEMAPHOREMAP =
private static final LoadingCache<String, ReentrantLock> LOCKIDTOSEMAPHOREMAP =
Caffeine.newBuilder().build(LOADER);
private static final ThreadGroup THREAD_GROUP = new ThreadGroup("LocalOnlyLock-scheduler");
private static final ThreadFactory THREAD_FACTORY =
Expand All @@ -54,14 +48,14 @@ public Semaphore load(String key) {
@Override
public void acquireLock(String lockId) {
LOGGER.trace("Locking {}", lockId);
LOCKIDTOSEMAPHOREMAP.get(lockId).acquireUninterruptibly();
LOCKIDTOSEMAPHOREMAP.get(lockId).lock();
}

@Override
public boolean acquireLock(String lockId, long timeToTry, TimeUnit unit) {
try {
LOGGER.trace("Locking {} with timeout {} {}", lockId, timeToTry, unit);
return LOCKIDTOSEMAPHOREMAP.get(lockId).tryAcquire(timeToTry, unit);
return LOCKIDTOSEMAPHOREMAP.get(lockId).tryLock(timeToTry, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Expand All @@ -80,7 +74,7 @@ public boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUn
if (acquireLock(lockId, timeToTry, unit)) {
LOGGER.trace("Releasing {} automatically after {} {}", lockId, leaseTime, unit);
SCHEDULEDFUTURES.put(
lockId, SCHEDULER.schedule(() -> releaseLock(lockId), leaseTime, unit));
lockId, SCHEDULER.schedule(() -> deleteLock(lockId), leaseTime, unit));
return true;
}
return false;
Expand All @@ -97,14 +91,13 @@ private void removeLeaseExpirationJob(String lockId) {
@Override
public void releaseLock(String lockId) {
// Synchronized to prevent race condition between semaphore check and actual release
// The check is here to prevent semaphore getting above 1
// e.g. in case when lease runs out but release is also called
synchronized (LOCKIDTOSEMAPHOREMAP) {
if (LOCKIDTOSEMAPHOREMAP.get(lockId).availablePermits() == 0) {
LOGGER.trace("Releasing {}", lockId);
LOCKIDTOSEMAPHOREMAP.get(lockId).release();
removeLeaseExpirationJob(lockId);
if (LOCKIDTOSEMAPHOREMAP.getIfPresent(lockId) == null) {
return;
}
LOGGER.trace("Releasing {}", lockId);
LOCKIDTOSEMAPHOREMAP.get(lockId).unlock();
removeLeaseExpirationJob(lockId);
}
}

Expand All @@ -115,7 +108,7 @@ public void deleteLock(String lockId) {
}

@VisibleForTesting
LoadingCache<String, Semaphore> cache() {
LoadingCache<String, ReentrantLock> cache() {
return LOCKIDTOSEMAPHOREMAP;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
*/
package com.netflix.conductor.core.sync.local;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
Expand All @@ -22,6 +26,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@Ignore
// Test always times out in CI environment
Expand All @@ -30,38 +35,71 @@ public class LocalOnlyLockTest {
// Lock can be global since it uses global cache internally
private final LocalOnlyLock localOnlyLock = new LocalOnlyLock();

@After
public void tearDown() {
// Clean caches between tests as they are shared globally
localOnlyLock.cache().invalidateAll();
localOnlyLock.scheduledFutures().values().forEach(f -> f.cancel(false));
localOnlyLock.scheduledFutures().clear();
}

@Test
public void testLockUnlock() {
localOnlyLock.acquireLock("a", 100, 1000, TimeUnit.MILLISECONDS);
final boolean a = localOnlyLock.acquireLock("a", 100, 10000, TimeUnit.MILLISECONDS);
assertTrue(a);
assertEquals(localOnlyLock.cache().estimatedSize(), 1);
assertEquals(localOnlyLock.cache().get("a").availablePermits(), 0);
assertEquals(localOnlyLock.cache().get("a").isLocked(), true);
assertEquals(localOnlyLock.scheduledFutures().size(), 1);
localOnlyLock.releaseLock("a");
assertEquals(localOnlyLock.scheduledFutures().size(), 0);
assertEquals(localOnlyLock.cache().get("a").availablePermits(), 1);
assertEquals(localOnlyLock.cache().get("a").isLocked(), false);
localOnlyLock.deleteLock("a");
assertEquals(localOnlyLock.cache().estimatedSize(), 0);
}

@Test(timeout = 10 * 10_000)
public void testLockTimeout() {
localOnlyLock.acquireLock("c", 100, 1000, TimeUnit.MILLISECONDS);
public void testLockTimeout() throws InterruptedException, ExecutionException {
final ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(
() -> {
localOnlyLock.acquireLock("c", 100, 1000, TimeUnit.MILLISECONDS);
})
.get();
assertTrue(localOnlyLock.acquireLock("d", 100, 1000, TimeUnit.MILLISECONDS));
assertFalse(localOnlyLock.acquireLock("c", 100, 1000, TimeUnit.MILLISECONDS));
assertEquals(localOnlyLock.scheduledFutures().size(), 2);
localOnlyLock.releaseLock("c");
executor.submit(
() -> {
localOnlyLock.releaseLock("c");
})
.get();
localOnlyLock.releaseLock("d");
assertEquals(localOnlyLock.scheduledFutures().size(), 0);
}

@Test(timeout = 10 * 10_000)
public void testLockLeaseTime() {
for (int i = 0; i < 10; i++) {
localOnlyLock.acquireLock("a", 1000, 100, TimeUnit.MILLISECONDS);
public void testReleaseFromAnotherThread() throws InterruptedException, ExecutionException {
final ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(
() -> {
localOnlyLock.acquireLock("c", 100, 10000, TimeUnit.MILLISECONDS);
})
.get();
try {
localOnlyLock.releaseLock("c");
} catch (IllegalMonitorStateException e) {
// expected
localOnlyLock.deleteLock("c");
return;
} finally {
executor.submit(
() -> {
localOnlyLock.releaseLock("c");
})
.get();
}
localOnlyLock.acquireLock("a");
assertEquals(0, localOnlyLock.cache().get("a").availablePermits());
localOnlyLock.releaseLock("a");

fail();
}

@Test(timeout = 10 * 10_000)
Expand All @@ -73,15 +111,32 @@ public void testLockLeaseWithRelease() throws Exception {
Thread.sleep(2000);

localOnlyLock.acquireLock("b");
assertEquals(0, localOnlyLock.cache().get("b").availablePermits());
assertEquals(true, localOnlyLock.cache().get("b").isLocked());
localOnlyLock.releaseLock("b");
}

@Test
public void testRelease() {
localOnlyLock.releaseLock("x54as4d2;23'4");
localOnlyLock.releaseLock("x54as4d2;23'4");
assertEquals(1, localOnlyLock.cache().get("x54as4d2;23'4").availablePermits());
assertEquals(false, localOnlyLock.cache().get("x54as4d2;23'4").isLocked());
}

@Test(timeout = 10 * 10_000)
public void testLockLeaseTime() throws InterruptedException {
for (int i = 0; i < 10; i++) {
final Thread thread =
new Thread(
() -> {
localOnlyLock.acquireLock("a", 1000, 100, TimeUnit.MILLISECONDS);
});
thread.start();
thread.join();
}
localOnlyLock.acquireLock("a");
assertTrue(localOnlyLock.cache().get("a").isLocked());
localOnlyLock.releaseLock("a");
localOnlyLock.deleteLock("a");
}

@Test
Expand Down