-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
alerts-server: Add ShutdownableTaskRunner
Using this class we can create task runners that work with the ShutdownHandler to perform graceful shutdowns.
- Loading branch information
Showing
1 changed file
with
67 additions
and
0 deletions.
There are no files selected for viewing
67 changes: 67 additions & 0 deletions
67
alerts-server/app/com/alexitc/coinalerts/tasks/ShutdownableTaskRunner.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package com.alexitc.coinalerts.tasks | ||
|
||
import akka.actor.ActorSystem | ||
import com.alexitc.coinalerts.core.ShutdownHandler | ||
import org.slf4j.LoggerFactory | ||
|
||
import scala.concurrent.ExecutionContext.Implicits.global | ||
import scala.concurrent.Future | ||
import scala.concurrent.duration.FiniteDuration | ||
import scala.util.control.NonFatal | ||
|
||
trait ShutdownableTaskRunner { | ||
|
||
private val logger = LoggerFactory.getLogger(this.getClass) | ||
|
||
protected def shutdownHandler: ShutdownHandler | ||
|
||
protected def actorSystem: ActorSystem | ||
|
||
protected def initialDelay: FiniteDuration | ||
|
||
protected def interval: FiniteDuration | ||
|
||
protected def run(): Future[Unit] | ||
|
||
@volatile | ||
private var running = false | ||
|
||
protected def register() = { | ||
logger.info("Registering task") | ||
|
||
shutdownHandler.addShutdownHook(() => Future { | ||
|
||
if (running) { | ||
logger.info("Waiting for task to complete") | ||
} | ||
|
||
// if we are shutting down, do we care no making a mess with threads? I hope we don't | ||
scala.concurrent.blocking { | ||
while (running) { | ||
Thread.sleep(1000) | ||
} | ||
} | ||
}) | ||
|
||
val _ = actorSystem.scheduler.schedule( | ||
initialDelay = initialDelay, | ||
interval = interval) { execute() } | ||
} | ||
|
||
private def execute() = { | ||
if (!shutdownHandler.isShuttingDown) { | ||
running = true | ||
|
||
logger.info("Running task") | ||
|
||
run() | ||
.recover { | ||
case NonFatal(ex) => | ||
logger.error("Failed to run task", ex) | ||
} | ||
.foreach { _ => | ||
running = false | ||
} | ||
} | ||
} | ||
} |