Skip to content

Commit

Permalink
refactor : Scheduler (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahoo-Wang authored Apr 12, 2023
1 parent 7995a03 commit cd7995b
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 142 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#

group=me.ahoo.simba
version=0.6.5
version=0.6.6
description=Simba aims to provide easy-to-use and flexible distributed lock services and supports multiple storage implementations: relational databases, Redis, and Zookeeper.
website=https://github.com/Ahoo-Wang/Simba
issues=https://github.com/Ahoo-Wang/Simba/issues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit
*/
abstract class AbstractScheduler(
val mutex: String,
private val config: ScheduleConfig,
contendServiceFactory: MutexContendServiceFactory
) {
companion object {
Expand All @@ -42,6 +41,7 @@ abstract class AbstractScheduler(
contendService = contendServiceFactory.createMutexContendService(WorkContender(mutex))
}

protected abstract val config: ScheduleConfig
protected abstract val worker: String
protected abstract fun work()
fun start() {
Expand All @@ -68,19 +68,19 @@ abstract class AbstractScheduler(

override fun onAcquired(mutexState: MutexState) {
super.onAcquired(mutexState)
if (workFuture == null || workFuture!!.isCancelled || workFuture!!.isDone) {
if (workFuture == null || workFuture!!.isDone) {
val initialDelay = config.initialDelay.toMillis()
val period = config.period.toMillis()
workFuture = if (ScheduleConfig.Strategy.FIXED_RATE == config.strategy) {
scheduledThreadPoolExecutor.scheduleAtFixedRate(
{ safeWork() },
this::safeWork,
initialDelay,
period,
TimeUnit.MILLISECONDS
)
} else {
scheduledThreadPoolExecutor.scheduleWithFixedDelay(
{ safeWork() },
this::safeWork,
initialDelay,
period,
TimeUnit.MILLISECONDS
Expand All @@ -94,6 +94,7 @@ abstract class AbstractScheduler(
workFuture?.cancel(true)
}

@Suppress("TooGenericExceptionCaught")
private fun safeWork() {
try {
work()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Service;

Expand All @@ -34,16 +35,16 @@
@Service
@Slf4j
public class ExampleScheduler extends AbstractScheduler implements SmartLifecycle {

public ExampleScheduler(MutexContendServiceFactory contendServiceFactory) {
super("example-scheduler", ScheduleConfig.delay(Duration.ofSeconds(0), Duration.ofSeconds(10)), contendServiceFactory);
super("example-scheduler", contendServiceFactory);
}

@Override
protected String getWorker() {
return "ExampleScheduler";
}

@SneakyThrows
@Override
protected void work() {
Expand All @@ -55,9 +56,15 @@ protected void work() {
log.info("do some work end!");
}
}

@Override
public boolean isRunning() {
return getRunning();
}

@NotNull
@Override
protected ScheduleConfig getConfig() {
return ScheduleConfig.delay(Duration.ofSeconds(0), Duration.ofSeconds(10));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package me.ahoo.simba.jdbc

import com.zaxxer.hikari.HikariDataSource
import me.ahoo.simba.core.MutexContendService
import me.ahoo.simba.core.MutexContendServiceFactory
import me.ahoo.simba.core.MutexContender
import me.ahoo.simba.test.MutexContendServiceSpec
import org.junit.jupiter.api.BeforeAll
Expand All @@ -27,16 +28,15 @@ import java.time.Duration
internal class JdbcMutexContendServiceTest : MutexContendServiceSpec() {

private lateinit var jdbcMutexOwnerRepository: JdbcMutexOwnerRepository
private lateinit var contendServiceFactory: JdbcMutexContendServiceFactory

override lateinit var mutexContendServiceFactory: MutexContendServiceFactory
@BeforeAll
fun setup() {
val hikariDataSource = HikariDataSource()
hikariDataSource.jdbcUrl = "jdbc:mysql://localhost:3306/simba_db"
hikariDataSource.username = "root"
hikariDataSource.password = "root"
jdbcMutexOwnerRepository = JdbcMutexOwnerRepository(hikariDataSource)
contendServiceFactory = JdbcMutexContendServiceFactory(
mutexContendServiceFactory = JdbcMutexContendServiceFactory(
mutexOwnerRepository = jdbcMutexOwnerRepository,
initialDelay = Duration.ofSeconds(2),
ttl = Duration.ofSeconds(2),
Expand All @@ -47,9 +47,7 @@ internal class JdbcMutexContendServiceTest : MutexContendServiceSpec() {
jdbcMutexOwnerRepository.tryInitMutex(RESTART_MUTEX)
jdbcMutexOwnerRepository.tryInitMutex(GUARD_MUTEX)
jdbcMutexOwnerRepository.tryInitMutex(MULTI_CONTEND_MUTEX)
jdbcMutexOwnerRepository.tryInitMutex(SCHEDULE_MUTEX)
}

override fun createMutexContendService(contender: MutexContender): MutexContendService {
return contendServiceFactory.createMutexContendService(contender)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ class JdbcSchedulerTest {
fun start() {
val countDownLatch = CountDownLatch(0)
val scheduler = object : AbstractScheduler(
"start", ScheduleConfig.delay(
Duration.ofSeconds(1),
Duration.ofSeconds(1)
), contendServiceFactory
"start", contendServiceFactory
) {
override val config: ScheduleConfig
get() = ScheduleConfig.delay(
Duration.ofSeconds(1),
Duration.ofSeconds(1)
)
override val worker: String
get() = "JdbcSchedulerTest"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
*/
package me.ahoo.simba.spring.redis

import me.ahoo.simba.core.MutexContendService
import me.ahoo.simba.core.MutexContender
import me.ahoo.simba.core.MutexContendServiceFactory
import me.ahoo.simba.test.MutexContendServiceSpec
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
Expand All @@ -35,7 +34,7 @@ import java.util.concurrent.ForkJoinPool
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class SpringRedisMutexContendServiceTest : MutexContendServiceSpec() {
lateinit var lettuceConnectionFactory: LettuceConnectionFactory
lateinit var contendServiceFactory: SpringRedisMutexContendServiceFactory
override lateinit var mutexContendServiceFactory: MutexContendServiceFactory
lateinit var listenerContainer: RedisMessageListenerContainer

@BeforeAll
Expand All @@ -48,7 +47,7 @@ internal class SpringRedisMutexContendServiceTest : MutexContendServiceSpec() {
listenerContainer.setConnectionFactory(lettuceConnectionFactory)
listenerContainer.afterPropertiesSet()
listenerContainer.start()
contendServiceFactory = SpringRedisMutexContendServiceFactory(
mutexContendServiceFactory = SpringRedisMutexContendServiceFactory(
ttl = Duration.ofSeconds(2),
transition = Duration.ofSeconds(1),
redisTemplate = stringRedisTemplate,
Expand All @@ -60,15 +59,12 @@ internal class SpringRedisMutexContendServiceTest : MutexContendServiceSpec() {

@AfterAll
fun destroy() {
if (lettuceConnectionFactory != null) {
if (this::lettuceConnectionFactory.isInitialized) {
lettuceConnectionFactory.destroy()
}
if (listenerContainer != null) {
if (this::listenerContainer.isInitialized) {
listenerContainer.stop()
}
}

override fun createMutexContendService(contender: MutexContender): MutexContendService {
return contendServiceFactory.createMutexContendService(contender)
}
}

This file was deleted.

Loading

0 comments on commit cd7995b

Please sign in to comment.