diff --git a/.travis.yml b/.travis.yml
index 614d248c..7a9cf688 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,9 +1,7 @@
 sudo: false
 language: scala
 scala:
-- 2.11.8
-jdk:
-- oraclejdk8
+- 2.12.8
 cache:
   directories:
   - '$HOME/.ivy2/cache'
diff --git a/build.sbt b/build.sbt
index cdc2ad3a..57e3f1b7 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,11 +1,13 @@
 resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/"
+val akkaVersion = "2.5.23"
+
 val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.14.0"
 val specs2 = "org.specs2" %% "specs2-core" % "4.3.2"
 val scalaTest = "org.scalatest" %% "scalatest" % "3.0.5"
 val mockito = "org.mockito" % "mockito-core" % "2.21.0"
-val akkaStreamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % "2.5.14"
+val akkaStreamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion
 val snakeYaml = "org.yaml" % "snakeyaml" % "1.21"
 val commonsIO = "commons-io" % "commons-io" % "2.6"
@@ -14,11 +16,11 @@ val bouncyCastle = "org.bouncycastle" % "bcpkix-jdk15on" % "1.60"
 // the client API request/response handing uses Akka Http
 val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.3"
-val akkaStream = "com.typesafe.akka" %% "akka-stream" % "2.5.14"
-val akka = "com.typesafe.akka" %% "akka-actor" % "2.5.14"
+val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
+val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion
 // Skuber uses akka logging, so the examples config uses the akka slf4j logger with logback backend
-val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.5.14"
+val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
 val logback = "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime
 // the Json formatters are based on Play Json
@@ -29,7 +31,7 @@ scalacOptions += "-target:jvm-1.8"
 scalacOptions in Test ++= Seq("-Yrangepos")
-version in ThisBuild := "2.2.0"
+version in ThisBuild := "2.3.0"
 sonatypeProfileName := "io.skuber"
diff --git a/client/src/main/scala/skuber/api/client/KubernetesClient.scala b/client/src/main/scala/skuber/api/client/KubernetesClient.scala
index 92d7708e..68cc78bb 100644
--- a/client/src/main/scala/skuber/api/client/KubernetesClient.scala
+++ b/client/src/main/scala/skuber/api/client/KubernetesClient.scala
@@ -1,11 +1,15 @@
 package skuber.api.client
 import akka.stream.scaladsl.{Sink, Source}
+import akka.http.scaladsl.Http
 import akka.util.ByteString
-import play.api.libs.json.{Writes,Format}
+import play.api.libs.json.{Format, Writes}
 import skuber.{DeleteOptions, HasStatusSubresource, LabelSelector, ListOptions, ListResource, ObjectResource, Pod, ResourceDefinition, Scale}
 import skuber.api.patch.Patch
+import skuber.api.watch.WatchSource
+import skuber.batch.Job
+import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent.{Future, Promise}
 /**
@@ -88,6 +92,15 @@ trait KubernetesClient {
    */
   def deleteWithOptions[O <: ObjectResource](name: String, options: DeleteOptions)(implicit rd: ResourceDefinition[O], lc: LoggingContext): Future[Unit]
+  /**
+   * Monitor a resource's existence until it is no longer available
+   * @param name the name of the resource whose existence is to be monitored
+   * @param monitorRepeatDelay the delay between repeated existence checks while the resource is still available by name
+   * @tparam O the specific object resource type e.g.
Pod, Deployment + * @return A future that will be set to success when Kubernetes confirm the resource is no longer available by name, otherwise failure + */ + def monitorResourceUntilUnavailable[O <: ObjectResource](name: String, monitorRepeatDelay: FiniteDuration)(implicit fmt: Format[O], rd: ResourceDefinition[O]): Future[Unit] + /** * Delete all resources of specified type in current namespace * @tparam L list resource type of resources to delete e.g. PodList, DeploymentList @@ -216,10 +229,12 @@ trait KubernetesClient { * Watch a specific object resource continuously. This returns a source that will continue to produce * events on any updates to the object even if the server times out, by transparently restarting the watch as needed. * @param obj the object resource to watch + * @param pool reuse a skuber pool for querying the server if any or create a new one * @tparam O the type of the resource e.g Pod - * @return A future containing an Akka streams Source of WatchEvents that will be emitted + * @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized + * value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any. */ - def watchContinuously[O <: ObjectResource](obj: O)(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] + def watchContinuously[O <: ObjectResource](obj: O, pool: Option[Pool[WatchSource.Start[O]]])(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] /** * Watch a specific object resource continuously. This returns a source that will continue to produce @@ -232,11 +247,13 @@ trait KubernetesClient { * applicable type (e.g. PodList, DeploymentList) and then supplies that to this method to receive any future updates. If no resource version is specified, * a single ADDED event will be produced for an already existing object followed by events for any future changes. * @param bufSize optional buffer size for received object updates, normally the default is more than enough + * @param pool reuse a skuber pool for querying the server if any or create a new one * @tparam O the type of the resource - * @return A future containing an Akka streams Source of WatchEvents that will be emitted + * @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized + * value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any. */ - def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] + def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] /** * Watch all object resources of a specified type continuously. This returns a source that will continue to produce @@ -248,24 +265,29 @@ trait KubernetesClient { * applicable type (e.g. PodList, DeploymentList) and then supplies that to this method to receive any future updates. 
If no resource version is specified, * a single ADDED event will be produced for an already existing object followed by events for any future changes. * @param bufSize optional buffer size for received object updates, normally the default is more than enough + * @param pool reuse a skuber pool for querying the server if any or create a new one * @tparam O the type pf the resource - * @return A future containing an Akka streams Source of WatchEvents that will be emitted + * @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized + * value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any. */ - def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] + def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] /** * Watch all object resources of a specified type continuously, passing the specified options to the API server with the watch request. * This returns a source that will continue to produce events even if the server times out, by transparently restarting the watch as needed. + * * @param options a set of list options to pass to the server. See https://godoc.org/k8s.io/apimachinery/pkg/apis/meta/v1#ListOptions * for the meaning of the options. Note that the `watch` flag in the options will be ignored / overridden by the client, which * ensures a watch is always requested on the server. * @param bufsize optional buffer size for received object updates, normally the default is more than enough + * @param pool reuse a skuber pool for querying the server if any or create a new one * @tparam O the resource type to watch - * @return A future containing an Akka streams Source of WatchEvents that will be emitted + * @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized + * value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any. 
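+   *
+   * Minimal usage sketch (illustrative only; assumes a client instance `k8s`, an implicit
+   * Akka Streams materializer, the standard skuber JSON formats in scope, and some
+   * `selector: LabelSelector`):
+   * {{{
+   *   val ((pool, hcp), done) =
+   *     k8s.watchWithOptions[Pod](ListOptions(labelSelector = Some(selector)))
+   *       .toMat(Sink.foreach(ev => println(ev._type)))(Keep.both)
+   *       .run()
+   *   // Later watches can pass Some(pool) to reuse the same connection;
+   *   // hcp.foreach(_.shutdown()) releases it once no more watches are needed.
+   * }}}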
*/ - def watchWithOptions[O <: ObjectResource](options: ListOptions, bufsize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] + def watchWithOptions[O <: ObjectResource](options: ListOptions, bufsize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] /** * Get the scale subresource of the named object resource @@ -350,6 +372,33 @@ trait KubernetesClient { tty: Boolean = false, maybeClose: Option[Promise[Unit]] = None)(implicit lc: LoggingContext): Future[Unit] + /** + * Execute a job, monitoring the progress of its pod until completion and monitor its deletion until complete + * @param job the Kubernetes job to execute + * @param labelSelector the label selector for monitoring the job's pod status + * @param podProgress the predicate for monitoring the pod status while satisfied before deleting the job + * @param podCompletion a callback invoked at the completion of the job's pod (successful or not), + * after which the job will be deleted if and only if the podCompletion result is true + * @param watchContinuouslyRequestTimeout the delay for continuously monitoring the pod progress + * @param deletionMonitorRepeatDelay the delay for continuously monitoring the job deletion + * @param pool a skuber pool to reuse, if any, or to create otherwise + * @param bufSize optional buffer size for received object updates, normally the default is more than enough + * @return A future consisting of a triple of the following: + * - the skuber pool suitable for subsequently executing other jobs on the same server + * - the akka host connection pool that can be shutdown when no further jobs need to be executed on the same server + * - the last pod status received when the pod progress predicate became unsatisfied + */ + def executeJobAndWaitUntilDeleted( + job: Job, + labelSelector: LabelSelector, + podProgress: WatchEvent[Pod] => Boolean, + podCompletion: WatchEvent[Pod] => Future[Boolean], + watchContinuouslyRequestTimeout: Duration, + deletionMonitorRepeatDelay: FiniteDuration, + pool: Option[Pool[WatchSource.Start[Pod]]], + bufSize: Int = 10000)(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: ResourceDefinition[Job], prd: ResourceDefinition[Pod]): + Future[(Pool[WatchSource.Start[Pod]], Option[Http.HostConnectionPool], WatchEvent[Pod])] + /** * Return list of API versions supported by the server * @param lc diff --git a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala index 3bdbd3a6..613bba54 100644 --- a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala +++ b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala @@ -7,12 +7,13 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.http.scaladsl.unmarshalling.Unmarshal import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext} +import akka.pattern.after import akka.stream.Materializer -import akka.stream.scaladsl.{Sink, Source} +import akka.stream.scaladsl.{Keep, Sink, Source} import akka.util.ByteString import com.typesafe.config.{Config, ConfigFactory} import javax.net.ssl.SSLContext -import play.api.libs.json.{Format, Writes, Reads} +import play.api.libs.json.{Format, Reads, 
Writes} import skuber._ import skuber.api.client.exec.PodExecImpl import skuber.api.client.{K8SException => _, _} @@ -22,6 +23,7 @@ import skuber.json.PlayJsonSupportForAkkaHttp._ import skuber.json.format.apiobj.statusReads import skuber.json.format.{apiVersionsFormat, deleteOptionsFmt, namespaceListFmt} import skuber.api.patch._ +import skuber.batch.Job import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future, Promise} @@ -403,6 +405,17 @@ class KubernetesClientImpl private[client] ( } yield () } + override def monitorResourceUntilUnavailable[O <: ObjectResource](name: String, monitorRepeatDelay: FiniteDuration)( + implicit fmt: Format[O], rd: ResourceDefinition[O]): Future[Unit] = + getOption[O](name).flatMap { + case None => + Future.successful(()) + case Some(_) => + after(monitorRepeatDelay, actorSystem.scheduler)( + monitorResourceUntilUnavailable[O](name, monitorRepeatDelay) + ) + } + override def deleteAll[L <: ListResource[_]]()( implicit fmt: Format[L], rd: ResourceDefinition[L], lc: LoggingContext): Future[L] = { @@ -481,30 +494,30 @@ class KubernetesClientImpl private[client] ( Watch.eventsOnKind[O](this, sinceResourceVersion, bufSize) } - override def watchContinuously[O <: ObjectResource](obj: O)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] = + override def watchContinuously[O <: ObjectResource](obj: O, pool: Option[Pool[WatchSource.Start[O]]])( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { - watchContinuously(obj.name) + watchContinuously(obj.name, pool = pool) } - override def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] = + override def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { val options=ListOptions(resourceVersion = sinceResourceVersion, timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds) ) - WatchSource(this, buildLongPollingPool(), Some(name), options, bufSize) + WatchSource(this, pool.getOrElse(buildLongPollingPool()), Some(name), options, bufSize) } - override def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] = + override def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { val options=ListOptions(resourceVersion = sinceResourceVersion, timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds)) - WatchSource(this, buildLongPollingPool(), None, options, bufSize) + WatchSource(this, pool.getOrElse(buildLongPollingPool()), None, options, bufSize) } - override def watchWithOptions[O <: skuber.ObjectResource](options: ListOptions, bufsize: Int = 10000)( - implicit 
fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] = + override def watchWithOptions[O <: skuber.ObjectResource](options: ListOptions, bufsize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { - WatchSource(this, buildLongPollingPool(), None, options, bufsize) + WatchSource(this, pool.getOrElse(buildLongPollingPool()), None, options, bufsize) } private def buildLongPollingPool[O <: ObjectResource]() = { @@ -622,6 +635,45 @@ class KubernetesClientImpl private[client] ( PodExecImpl.exec(this, podName, command, maybeContainerName, maybeStdin, maybeStdout, maybeStderr, tty, maybeClose) } + override def executeJobAndWaitUntilDeleted( + job: Job, + labelSelector: LabelSelector, + podProgress: WatchEvent[Pod] => Boolean, + podCompletion: WatchEvent[Pod] => Future[Boolean], + watchContinuouslyRequestTimeout: Duration, + deletionMonitorRepeatDelay: FiniteDuration, + pool: Option[Pool[WatchSource.Start[Pod]]], + bufSize: Int = 10000)(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: ResourceDefinition[Job], prd: ResourceDefinition[Pod]) + : Future[(Pool[WatchSource.Start[Pod]], Option[Http.HostConnectionPool], WatchEvent[Pod])] = + for { + j <- create(job) + (p, hcp, lastPodEvent) <- { + watchWithOptions[Pod]( + options = ListOptions( + labelSelector = Some(labelSelector), + timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds) + ), + bufsize = bufSize, + pool = pool + ) + .takeWhile(podProgress, inclusive = true) + .toMat(Sink.last)(Keep.both) + .run() match { + case ((pool: Pool[WatchSource.Start[Pod]], + hostConnectionPool: Option[Http.HostConnectionPool]), + f: Future[WatchEvent[Pod]]) => + f.map { ev => + (pool, hostConnectionPool, ev) + } + } + } + delete <- podCompletion(lastPodEvent) + _ <- if (delete) + deleteWithOptions[Job](name = j.metadata.name, options = DeleteOptions(propagationPolicy = Some(DeletePropagation.Foreground))) + .flatMap(_ => monitorResourceUntilUnavailable[Job](j.metadata.name, deletionMonitorRepeatDelay)) + else Future.successful(()) + } yield (p, hcp, lastPodEvent) + override def close: Unit = { isClosed = true diff --git a/client/src/main/scala/skuber/api/client/package.scala b/client/src/main/scala/skuber/api/client/package.scala index 9069ed9d..d6268971 100644 --- a/client/src/main/scala/skuber/api/client/package.scala +++ b/client/src/main/scala/skuber/api/client/package.scala @@ -3,8 +3,8 @@ package skuber.api import java.time.Instant import java.util.UUID -import akka.NotUsed import akka.actor.ActorSystem +import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.stream.Materializer import akka.stream.scaladsl.Flow @@ -23,7 +23,13 @@ import skuber.api.client.impl.KubernetesClientImpl */ package object client { - type Pool[T] = Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] + /** + * The materialized value is an optional host connection pool. + * For testing, allows mocking without creating a host connection pool. + * For development and production, provides access to the host connection pool created (if none was provided). + * @tparam T The type of elements flowing in and out. 
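+   *
+   * Illustrative sketch: a stubbed pool for unit tests (mirroring the helpers in WatchSourceSpec),
+   * where `respond` is an assumed request-to-response function and no real connection is opened,
+   * so the materialized value is None:
+   * {{{
+   *   def stubPool[T](respond: HttpRequest => HttpResponse): Pool[T] =
+   *     Flow[(HttpRequest, T)]
+   *       .map { case (req, t) => (Try(respond(req)), t) }
+   *       .mapMaterializedValue(_ => Option.empty[Http.HostConnectionPool])
+   * }}}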
+ */ + type Pool[T] = Flow[(HttpRequest, T), (Try[HttpResponse], T), Option[Http.HostConnectionPool]] final val sysProps = new SystemProperties diff --git a/client/src/main/scala/skuber/api/watch/LongPollingPool.scala b/client/src/main/scala/skuber/api/watch/LongPollingPool.scala index 13a1ec2b..c534db74 100644 --- a/client/src/main/scala/skuber/api/watch/LongPollingPool.scala +++ b/client/src/main/scala/skuber/api/watch/LongPollingPool.scala @@ -1,6 +1,5 @@ package skuber.api.watch -import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.http.scaladsl.{Http, HttpsConnectionContext} @@ -19,19 +18,19 @@ private[api] object LongPollingPool { Http().newHostConnectionPool[T]( host, port, buildHostConnectionPool(poolIdleTimeout, clientConnectionSettings, system) - ).mapMaterializedValue(_ => NotUsed) + ).mapMaterializedValue(Some(_)) case "https" => Http().newHostConnectionPoolHttps[T]( host, port, httpsConnectionContext.getOrElse(Http().defaultClientHttpsContext), buildHostConnectionPool(poolIdleTimeout, clientConnectionSettings, system) - ).mapMaterializedValue(_ => NotUsed) + ).mapMaterializedValue(Some(_)) case unsupported => throw new IllegalArgumentException(s"Schema $unsupported is not supported") } } - private def buildHostConnectionPool[T](poolIdleTimeout: Duration, clientConnectionSettings: ClientConnectionSettings, system: ActorSystem) = { + def buildHostConnectionPool[T](poolIdleTimeout: Duration, clientConnectionSettings: ClientConnectionSettings, system: ActorSystem) = { ConnectionPoolSettings(system) .withMaxConnections(1) // Limit number the of open connections to one .withPipeliningLimit(1) // Limit pipelining of requests to one diff --git a/client/src/main/scala/skuber/api/watch/WatchSource.scala b/client/src/main/scala/skuber/api/watch/WatchSource.scala index 6b26f4c5..d84b60a0 100644 --- a/client/src/main/scala/skuber/api/watch/WatchSource.scala +++ b/client/src/main/scala/skuber/api/watch/WatchSource.scala @@ -2,32 +2,35 @@ package skuber.api.watch import akka.NotUsed import akka.actor.ActorSystem +import akka.http.scaladsl.Http import akka.http.scaladsl.model._ -import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Source} +import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Merge, Source} import akka.stream.{Materializer, SourceShape} import play.api.libs.json.Format import skuber.api.client._ import skuber.api.client.impl.KubernetesClientImpl -import skuber.{K8SRequestContext, ObjectResource, ResourceDefinition, ListOptions} +import skuber.{ListOptions, ObjectResource, ResourceDefinition} import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ -import scala.util.{Failure, Success} - -private[api] object WatchSource { - sealed trait StreamElement[O <: ObjectResource] {} - case class End[O <: ObjectResource]() extends StreamElement[O] +import scala.util.{Failure, Success, Try} + +/** + * @author David O'Riordan + */ +object WatchSource { + private[api] sealed trait StreamElement[O <: ObjectResource] {} + private[api] case class End[O <: ObjectResource]() extends StreamElement[O] case class Start[O <: ObjectResource](resourceVersion: Option[String]) extends StreamElement[O] - case class Result[O <: ObjectResource](resourceVersion: String, value: WatchEvent[O]) extends StreamElement[O] + private[api] case class Result[O <: ObjectResource](resourceVersion: String, value: WatchEvent[O]) extends StreamElement[O] - sealed trait StreamState {} - case 
object Waiting extends StreamState - case object Processing extends StreamState - case object Finished extends StreamState + private[api] sealed trait StreamState {} + private[api] case object Waiting extends StreamState + private[api] case object Processing extends StreamState + private[api] case object Finished extends StreamState - case class StreamContext(currentResourceVersion: Option[String], state: StreamState) + private[api] case class StreamContext(currentResourceVersion: Option[String], state: StreamState) - def apply[O <: ObjectResource](client: KubernetesClientImpl, + private[api] def apply[O <: ObjectResource](client: KubernetesClientImpl, pool: Pool[Start[O]], name: Option[String], options: ListOptions, @@ -35,76 +38,78 @@ private[api] object WatchSource { fm: Materializer, format: Format[O], rd: ResourceDefinition[O], - lc: LoggingContext): Source[WatchEvent[O], NotUsed] = { - Source.fromGraph(GraphDSL.create() { implicit b => - import GraphDSL.Implicits._ - - implicit val dispatcher: ExecutionContext = sys.dispatcher - - def createWatchRequest(since: Option[String]) = - { - val nameFieldSelector=name.map(objName => s"metadata.name=$objName") - val watchOptions=options.copy( - resourceVersion = since, - watch = Some(true), - fieldSelector = nameFieldSelector.orElse(options.fieldSelector) - ) - client.buildRequest( - HttpMethods.GET, rd, None, query = Some(Uri.Query(watchOptions.asMap)) - ) + lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { + + implicit val dispatcher: ExecutionContext = sys.dispatcher + + val singleEnd = Source.single(End[O]()) + + def singleStart(s:StreamElement[O]) = Source.single(s) + + val httpFlow: Flow[(HttpRequest, Start[O]), StreamElement[O], Option[Http.HostConnectionPool]] = + Flow[(HttpRequest, Start[O])].map { request => // log request + client.logInfo(client.logConfig.logRequestBasic, s"about to send HTTP request: ${request._1.method.value} ${request._1.uri.toString}") + request + }.viaMat[(Try[HttpResponse], Start[O]), Option[Http.HostConnectionPool], Option[Http.HostConnectionPool]](pool)(Keep.right).flatMapConcat { + case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), se) => + client.logInfo(client.logConfig.logResponseBasic, s"received response with HTTP status 200") + singleStart(se).concat( + BytesToWatchEventSource[O](entity.dataBytes, bufSize).map { event => + Result[O](event._object.resourceVersion, event) + } + ).concat(singleEnd) + case (Success(HttpResponse(sc, _, entity, _)), _) => + client.logWarn(s"Error watching resource. 
Received a status of ${sc.intValue()}") + entity.discardBytes() + throw new K8SException(Status(message = Some("Error watching resource"), code = Some(sc.intValue()))) + case (Failure(f), _) => + client.logError("Error watching resource.", f) + throw new K8SException(Status(message = Some("Error watching resource"), details = Some(f.getMessage))) } - val singleEnd = Source.single(End[O]()) + val httpFlowMat: Flow[(HttpRequest, Start[O]), StreamElement[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = + httpFlow.mapMaterializedValue { pool -> _ } - def singleStart(s:StreamElement[O]) = Source.single(s) - - val initSource = Source.single( - (createWatchRequest(options.resourceVersion), Start[O](options.resourceVersion)) + def createWatchRequest(since: Option[String]) = + { + val nameFieldSelector=name.map(objName => s"metadata.name=$objName") + val watchOptions=options.copy( + resourceVersion = since, + watch = Some(true), + fieldSelector = nameFieldSelector.orElse(options.fieldSelector) ) - - val httpFlow: Flow[(HttpRequest, Start[O]), StreamElement[O], NotUsed] = - Flow[(HttpRequest, Start[O])].map { request => // log request - client.logInfo(client.logConfig.logRequestBasic, s"about to send HTTP request: ${request._1.method.value} ${request._1.uri.toString}") - request - }.via(pool).flatMapConcat { - case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), se) => - client.logInfo(client.logConfig.logResponseBasic, s"received response with HTTP status 200") - singleStart(se).concat( - BytesToWatchEventSource[O](entity.dataBytes, bufSize).map { event => - Result[O](event._object.resourceVersion, event) - } - ).concat(singleEnd) - case (Success(HttpResponse(sc, _, entity, _)), _) => - client.logWarn(s"Error watching resource. Received a status of ${sc.intValue()}") - entity.discardBytes() - throw new K8SException(Status(message = Some("Error watching resource"), code = Some(sc.intValue()))) - case (Failure(f), _) => - client.logError("Error watching resource.", f) - throw new K8SException(Status(message = Some("Error watching resource"), details = Some(f.getMessage))) + client.buildRequest( + HttpMethods.GET, rd, None, query = Some(Uri.Query(watchOptions.asMap)) + ) + } + + val initSource = Source.single( + (createWatchRequest(options.resourceVersion), Start[O](options.resourceVersion)) + ) + + val outboundFlow: Flow[StreamElement[O], WatchEvent[O], NotUsed] = + Flow[StreamElement[O]] + .filter(_.isInstanceOf[Result[O]]) + .map{ + case Result(_, event) => event + case _ => throw new K8SException(Status(message = Some("Error processing watch events."))) } - val outboundFlow: Flow[StreamElement[O], WatchEvent[O], NotUsed] = - Flow[StreamElement[O]] - .filter(_.isInstanceOf[Result[O]]) - .map{ - case Result(_, event) => event - case _ => throw new K8SException(Status(message = Some("Error processing watch events."))) - } - - - val feedbackFlow: Flow[StreamElement[O], (HttpRequest, Start[O]), NotUsed] = - Flow[StreamElement[O]].scan(StreamContext(None, Waiting)){(cxt, next) => - next match { - case Start(rv) => StreamContext(rv, Processing) - case Result(rv, _) => StreamContext(Some(rv), Processing) - case End() => cxt.copy(state = Finished) - } - }.filter(_.state == Finished).map { acc => - (createWatchRequest(acc.currentResourceVersion), Start[O](acc.currentResourceVersion)) + val feedbackFlow: Flow[StreamElement[O], (HttpRequest, Start[O]), NotUsed] = + Flow[StreamElement[O]].scan(StreamContext(None, Waiting)){(cxt, next) => + next match { + case Start(rv) => 
StreamContext(rv, Processing) + case Result(rv, _) => StreamContext(Some(rv), Processing) + case End() => cxt.copy(state = Finished) } + }.filter(_.state == Finished).map { acc => + (createWatchRequest(acc.currentResourceVersion), Start[O](acc.currentResourceVersion)) + } + + Source.fromGraph(GraphDSL.create(httpFlowMat) { implicit b => http => + import GraphDSL.Implicits._ val init = b.add(initSource) - val http = b.add(httpFlow) val merge = b.add(Merge[(HttpRequest, Start[O])](2)) val broadcast = b.add(Broadcast[StreamElement[O]](2, eagerCancel = true)) val outbound = b.add(outboundFlow) diff --git a/client/src/test/scala/skuber/api/WatchSourceSpec.scala b/client/src/test/scala/skuber/api/WatchSourceSpec.scala index 64e29eba..876034ff 100644 --- a/client/src/test/scala/skuber/api/WatchSourceSpec.scala +++ b/client/src/test/scala/skuber/api/WatchSourceSpec.scala @@ -4,6 +4,7 @@ import java.net.ConnectException import java.time.{ZoneId, ZonedDateTime} import akka.actor.ActorSystem +import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.stream.scaladsl.Framing.FramingException import akka.stream.scaladsl.{Flow, Keep, TcpIdleTimeoutException} @@ -517,13 +518,13 @@ class WatchSourceSpec extends Specification with MockitoSugar { def mockPool[O <: ObjectResource](requestResponses: Map[HttpRequest, HttpResponse]): Pool[Start[O]] = { Flow[(HttpRequest, Start[O])].map { x => (Try(requestResponses(x._1)), x._2) - } + }.mapMaterializedValue(_ => Option.empty[Http.HostConnectionPool]) } def mockPool[O <: ObjectResource](error: Throwable): Pool[Start[O]] = { Flow[(HttpRequest, Start[O])].map { x => (Try(throw error), x._2) - } + }.mapMaterializedValue(_ => Option.empty[Http.HostConnectionPool]) } def retrieveWatchJson(path: String): String = { diff --git a/docs/Examples.md b/docs/Examples.md index 25804e51..bd0f2181 100644 --- a/docs/Examples.md +++ b/docs/Examples.md @@ -72,7 +72,6 @@ val podFuture = k8s.create(pod) // handle future as you see fit ``` - ## Create deployment This example creates a nginx service (accessed via port 30001 on each Kubernetes cluster node) that is backed by a deployment of five nginx replicas. @@ -117,6 +116,270 @@ createOnK8s.onComplete { } ``` +## Execute a job and monitor its execution until completed (successfully or not) and monitor its deletion + +First, define a suitable progress predicate for monitoring the execution of a Kubernetes pod. +For example: + +```scala + def podProgress( + ev: WatchEvent[Pod] + ): Boolean = { + + def containerStatusProgress(acc: Boolean, x: Container.Status): Boolean = { + x.state.fold[Boolean](acc) { + case Container.Waiting(None) => acc + case Container.Waiting(Some(reason)) => + !(reason.startsWith("Err") || reason.endsWith("BackOff")) + case Container.Running(_) => acc + case _: Container.Terminated => false + } + } + + def podStatusProgress( + s: Pod.Status + ): Boolean = { + val ok1 = s.initContainerStatuses + .foldLeft[Boolean](true)(containerStatusProgress) + val ok2 = s.containerStatuses + .foldLeft[Boolean](ok1)(containerStatusProgress) + val ok3 = s.conditions.foldLeft[Boolean](ok2) { + case (acc, _: Pod.Condition) => + acc + } + ok3 + } + + ev._type != EventType.DELETED && + ev._type != EventType.ERROR && + ev._object.status.fold[Boolean](true)(podStatusProgress) + } +``` + +Next, define a suitable completion callback for handling a completed. 
+For example: + +```scala + def podCompletion(k8s: KubernetesClient)(lastPodEvent: WatchEvent[Pod])( + implicit ec: ExecutionContext, + mat: ActorMaterializer): Future[Unit] = { + + def printLogFlow(cntrName: String): Sink[ByteString, Future[Done]] = + Flow[ByteString] + .via( + Framing.delimiter(ByteString("\n"), + maximumFrameLength = 10000, + allowTruncation = true)) + .map(_.utf8String) + .toMat(Sink.foreach(text => println(s"[$cntrName logs] $text")))(Keep.right) + + def showContainerStateIfSuccessful(cs: Container.Status, + podName: String, + message: String): Future[Unit] = { + val terminatedSuccessfully = cs.state.foldLeft[Boolean](false) { + case (_, s: Container.Terminated) => + 0 == s.exitCode + case (flag, _) => + flag + } + + if (terminatedSuccessfully) + for { + logSource <- k8s.getPodLogSource( + name = podName, + queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) + _ <- logSource.runWith(printLogFlow(message)) + } yield () + else { + println(s"$message: no output because of unsuccessful execution") + Future.successful(()) + } + } + + lastPodEvent._object.status match { + case None => + Future.successful(()) + case Some(s) => + val podName = lastPodEvent._object.name + for { + _ <- s.initContainerStatuses + .foldLeft[Future[Unit]](Future.successful(())) { + case (_, cs) => + showContainerStateIfSuccessful( + cs, + podName, + s"init/$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + } + _ <- s.containerStatuses + .foldLeft[Future[Unit]](Future.successful(())) { + case (_, cs) => + showContainerStateIfSuccessful( + cs, + podName, + s"$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + } + } yield () + } + } +``` + +Next, define some suitable delays for monitoring: + +```scala +val watchContinuouslyRequestTimeout: Duration = ... +val deletionMonitorRepeatDelay: FiniteDuration = ... +``` + +There are different strategies to execute jobs. + +- Sequentially + + Define a list of jobs to execute. + This example generates a sequence of jobs, some that cannot be executed. + + ```scala + val jobs = Seq.tabulate[Job](n = 10) { n => + if (n % 3 == 0) { + // simulate a job failure + val piContainer = Container(name = "pi", + image = "nowhere/does-not-exist:latest", + command = List("/bin/bash"), + args = List("-c", "env")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } else { + val piContainer = Container( + name = "pi", + image = "perl", + command = + List("perl", "-Mbignum=bpi", "-wle", s"print bpi(${n * 10})")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } + } + ``` + + Execute the first job without a host connection pool + and reuse the pool obtained for executing subsequent jobs. + Finally, shutdown the connection pool. + + ```scala + val (firstJob, otherJobs) = (jobs.head, jobs.tail) + + val f: Future[Unit] = for { + + // First run: create a pool. + (pool, hcp, podEvent) <- k8s.executeJobAndWaitUntilDeleted( + firstJob, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + None) + + // Subsequent runs: reuse the same pool. 
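+    // Passing Some(pool) below reuses the skuber pool materialized by the first run, so every
+    // job shares the same single-connection host pool instead of opening a new one per watch.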
+ _ <- Source + .fromIterator(() => otherJobs.toIterator) + .mapAsync(parallelism = 1) { job: Job => + k8s.executeJobAndWaitUntilDeleted(job, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + Some(pool)) + } + .runForeach(_ => ()) + + // Shutdown the pool, if any. + _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ())) + + } yield () + ``` + + For a working example, see: [PiJobsSequential.scala](examples/job/PiJobsSequential.scala) + +- In parallel + + Define a list of job execution futures, + taking care of shutting down the pool after completion. + + ```scala + + def metadata(n: Int) = + ObjectMeta(name = s"pi-$n", + labels = Map("job-kind" -> s"piTest$n", "iteration" -> s"$n")) + def labelSelector(n: Int) = + LabelSelector(LabelSelector.IsEqualRequirement("job-kind", s"piTest$n")) + + val jobs = Seq.tabulate[Future[Unit]](n = 10) { n => + val jname = s"pi-$n" + val job: Job = if (n % 3 == 0) { + // simulate a job failure + val piContainer = Container(name = "pi", + image = "nowhere/does-not-exist:latest", + command = List("/bin/bash"), + args = List("-c", "env")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job(jname).withTemplate(piTemplateSpec) + } else { + val piContainer = Container( + name = "pi", + image = "perl", + command = + List("perl", "-Mbignum=bpi", "-wle", s"print bpi(${n * 10})")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job(jname).withTemplate(piTemplateSpec) + } + + for { + // Execute the job with a unique pool + (_, hcp, _) <- k8s.executeJobAndWaitUntilDeleted( + job, + labelSelector(n), + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + None) + + // Shutdown the pool, if any. + _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ())) + } yield () + + } + ``` + + Execute the job futures in parallel. 
+ + ```scala + val f: Future[Unit] = + Future.foldLeft[Unit, Unit](jobs)(())((_: Unit, _: Unit) => ()) + ``` + + For a working example, see: [PiJobsParallel.scala](examples/job/PiJobsParallel.scala) + ## Safely shutdown the client ```scala diff --git a/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala b/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala new file mode 100644 index 00000000..9c4ee6de --- /dev/null +++ b/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala @@ -0,0 +1,243 @@ +package skuber.examples.job + +import akka.Done +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import com.typesafe.config.{Config, ConfigFactory} +import skuber.api.client.{ + EventType, + KubernetesClient, + WatchEvent, + defaultK8sConfig +} +import skuber.batch.Job +import skuber.json.batch.format._ +import skuber.json.format._ +import skuber.{ + Container, + LabelSelector, + ObjectMeta, + Pod, + RestartPolicy, + k8sInit +} + +import scala.concurrent.duration._ +import scala.collection.immutable._ +import scala.concurrent.{Await, ExecutionContext, Future} + +/** + * Demonstrates two things: + * 1) executing jobs in parallel, each with an independent pool + * 2) watching continuously pod events until any container status or pod status indicates a non-progress condition. + * 3) making sure that the host connection pool used for watching is shutdown + */ +object PiJobsParallel { + + def podProgress( + ev: WatchEvent[Pod] + ): Boolean = { + + def containerStatusProgress(acc: Boolean, x: Container.Status): Boolean = { + x.state.fold[Boolean](acc) { + case Container.Waiting(None) => acc + case Container.Waiting(Some(reason)) => + !(reason.startsWith("Err") || reason.endsWith("BackOff")) + case Container.Running(_) => acc + case _: Container.Terminated => false + } + } + + def podStatusProgress( + s: Pod.Status + ): Boolean = { + val ok1 = s.initContainerStatuses + .foldLeft[Boolean](true)(containerStatusProgress) + val ok2 = s.containerStatuses + .foldLeft[Boolean](ok1)(containerStatusProgress) + val ok3 = s.conditions.foldLeft[Boolean](ok2) { + case (acc, _: Pod.Condition) => + acc + } + ok3 + } + + ev._type != EventType.DELETED && + ev._type != EventType.ERROR && + ev._object.status.fold[Boolean](true)(podStatusProgress) + } + + def durationFomConfig(config: Config)(configKey: String): Option[Duration] = + Some(Duration.fromNanos(config.getDuration(configKey).toNanos)) + + def getSkuberConfig[T](config: Config, + key: String, + fromConfig: String => Option[T], + default: T): T = { + val skuberConfigKey = s"skuber.$key" + if (config.getIsNull(skuberConfigKey)) { + default + } else { + fromConfig(skuberConfigKey) match { + case None => default + case Some(t) => t + } + } + } + + def podCompletion(k8s: KubernetesClient)(lastPodEvent: WatchEvent[Pod])( + implicit ec: ExecutionContext, + mat: ActorMaterializer): Future[Boolean] = { + + def printLogFlow(cntrName: String): Sink[ByteString, Future[Done]] = + Flow[ByteString] + .via( + Framing.delimiter(ByteString("\n"), + maximumFrameLength = 10000, + allowTruncation = true)) + .map(_.utf8String) + .toMat(Sink.foreach(text => println(s"[$cntrName logs] $text")))( + Keep.right) + + def showContainerStateIfSuccessful(cs: Container.Status, + podName: String, + message: String): Future[Boolean] = { + val terminatedSuccessfully = cs.state.foldLeft[Boolean](false) { + case (_, s: Container.Terminated) => + 0 == s.exitCode + case (flag, _) => + flag 
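+          // cs.state is an Option, so this fold simply reports whether the recorded state
+          // (if any) is a termination with exit code 0; any other state leaves the flag at false.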
+ } + + if (terminatedSuccessfully) + for { + logSource <- k8s.getPodLogSource( + name = podName, + queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) + _ <- logSource.runWith(printLogFlow(message)) + } yield true + else { + println(s"$message: no output because of unsuccessful execution") + Future.successful(false) + } + } + + lastPodEvent._object.status match { + case None => + Future.successful(false) + case Some(s) => + val podName = lastPodEvent._object.name + for { + delete1 <- s.initContainerStatuses + .foldLeft[Future[Boolean]](Future.successful(true)) { + case (flag, cs) => + Future.reduceLeft( + Seq( + flag, + showContainerStateIfSuccessful( + cs, + podName, + s"init/$podName (iteration=${lastPodEvent._object.metadata + .labels("iteration")})")))(_ || _) + } + delete2 <- s.containerStatuses + .foldLeft[Future[Boolean]](Future.successful(delete1)) { + case (flag, cs) => + Future.reduceLeft( + Seq(flag, + showContainerStateIfSuccessful( + cs, + podName, + s"$podName (iteration=${lastPodEvent._object.metadata + .labels("iteration")})")))(_ || _) + } + } yield delete2 + } + } + + def main( + args: Array[String] + ): Unit = { + + implicit val as: ActorSystem = ActorSystem("PiJobsSequential") + implicit val ec: ExecutionContext = as.dispatcher + implicit val mat: ActorMaterializer = ActorMaterializer() + val sconfig: skuber.api.Configuration = defaultK8sConfig + val aconfig: Config = ConfigFactory.load() + implicit val k8s: KubernetesClient = + k8sInit(config = sconfig, appConfig = aconfig) + + val watchContinuouslyRequestTimeout: Duration = getSkuberConfig( + aconfig, + "watch-continuously.request-timeout", + durationFomConfig(aconfig), + 30.seconds) + + val deletionMonitorRepeatDelay: FiniteDuration = 1.second + + def metadata(n: Int) = + ObjectMeta(name = s"pi-$n", + labels = Map("job-kind" -> s"piTest$n", "iteration" -> s"$n")) + def labelSelector(n: Int) = + LabelSelector(LabelSelector.IsEqualRequirement("job-kind", s"piTest$n")) + + val jobs = Seq.tabulate[Future[Unit]](n = 10) { n => + val jname = s"pi-$n" + val job: Job = if (n % 3 == 0) { + // simulate a job failure + val piContainer = Container(name = "pi", + image = "nowhere/does-not-exist:latest", + command = List("/bin/bash"), + args = List("-c", "env")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job(jname).withTemplate(piTemplateSpec) + } else { + val piContainer = Container( + name = "pi", + image = "perl", + command = + List("perl", "-Mbignum=bpi", "-wle", s"print bpi(${n * 10})")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job(jname).withTemplate(piTemplateSpec) + } + + for { + // Execute the job with a unique pool + (_, hcp, _) <- k8s.executeJobAndWaitUntilDeleted( + job, + labelSelector(n), + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + None) + + // Shutdown the pool, if any. + _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ())) + } yield () + + } + + val f: Future[Unit] = + Future.foldLeft[Unit, Unit](jobs)(())((_: Unit, _: Unit) => ()) + + // Wait until done and shutdown k8s & akka. 
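+    // Note: the jobs are already running in parallel at this point, since each element of `jobs`
+    // is an eagerly started Future; the foldLeft above merely combines them into one completion signal.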
+ Await.result(f.flatMap { _ => + k8s.close + as.terminate().map(_ => ()) + }, Duration.Inf) + + } +} diff --git a/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala b/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala new file mode 100644 index 00000000..d0df87cb --- /dev/null +++ b/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala @@ -0,0 +1,255 @@ +package skuber.examples.job + +import akka.Done +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import com.typesafe.config.{Config, ConfigFactory} +import skuber.api.client.{ + EventType, + KubernetesClient, + WatchEvent, + defaultK8sConfig +} +import skuber.batch.Job +import skuber.json.batch.format._ +import skuber.json.format._ +import skuber.{ + Container, + LabelSelector, + ObjectMeta, + Pod, + RestartPolicy, + k8sInit +} + +import scala.collection.immutable.Seq +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +/** + * Demonstrates three things: + * 1) executing jobs sequentially, creating a pool for the first job and reusing it afterwards + * 2) watching continuously pod events until any container status or pod status indicates a non-progress condition. + * 3) making sure that the host connection pool used for watching is shutdown + */ +object PiJobsSequential { + + def podProgress( + ev: WatchEvent[Pod] + ): Boolean = { + + def containerStatusProgress(acc: Boolean, x: Container.Status): Boolean = { + x.state.fold[Boolean](acc) { + case Container.Waiting(None) => acc + case Container.Waiting(Some(reason)) => + !(reason.startsWith("Err") || reason.endsWith("BackOff")) + case Container.Running(_) => acc + case _: Container.Terminated => false + } + } + + def podStatusProgress( + s: Pod.Status + ): Boolean = { + val ok1 = s.initContainerStatuses + .foldLeft[Boolean](true)(containerStatusProgress) + val ok2 = s.containerStatuses + .foldLeft[Boolean](ok1)(containerStatusProgress) + val ok3 = s.conditions.foldLeft[Boolean](ok2) { + case (acc, _: Pod.Condition) => + acc + } + ok3 + } + + ev._type != EventType.DELETED && + ev._type != EventType.ERROR && + ev._object.status.fold[Boolean](true)(podStatusProgress) + } + + def durationFomConfig(config: Config)(configKey: String): Option[Duration] = + Some(Duration.fromNanos(config.getDuration(configKey).toNanos)) + + def getSkuberConfig[T](config: Config, + key: String, + fromConfig: String => Option[T], + default: T): T = { + val skuberConfigKey = s"skuber.$key" + if (config.getIsNull(skuberConfigKey)) { + default + } else { + fromConfig(skuberConfigKey) match { + case None => default + case Some(t) => t + } + } + } + + def podCompletion(k8s: KubernetesClient)(lastPodEvent: WatchEvent[Pod])( + implicit ec: ExecutionContext, + mat: ActorMaterializer): Future[Boolean] = { + + def printLogFlow(cntrName: String): Sink[ByteString, Future[Done]] = + Flow[ByteString] + .via( + Framing.delimiter(ByteString("\n"), + maximumFrameLength = 10000, + allowTruncation = true)) + .map(_.utf8String) + .toMat(Sink.foreach(text => println(s"[$cntrName logs] $text")))( + Keep.right) + + def showContainerStateIfSuccessful(cs: Container.Status, + podName: String, + message: String): Future[Boolean] = { + val terminatedSuccessfully = cs.state.foldLeft[Boolean](false) { + case (_, s: Container.Terminated) => + 0 == s.exitCode + case (flag, _) => + flag + } + + if (terminatedSuccessfully) + for { + logSource <- k8s.getPodLogSource( + name = 
podName, + queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) + _ <- logSource.runWith(printLogFlow(message)) + } yield true + else { + println(s"$message: no output because of unsuccessful execution") + Future.successful(false) + } + } + + lastPodEvent._object.status match { + case None => + Future.successful(false) + case Some(s) => + val podName = lastPodEvent._object.name + for { + delete1 <- s.initContainerStatuses + .foldLeft[Future[Boolean]](Future.successful(true)) { + case (flag, cs) => + Future.reduceLeft( + Seq( + flag, + showContainerStateIfSuccessful( + cs, + podName, + s"init/$podName (iteration=${lastPodEvent._object.metadata + .labels("iteration")})")))(_ || _) + } + delete2 <- s.containerStatuses + .foldLeft[Future[Boolean]](Future.successful(delete1)) { + case (flag, cs) => + Future.reduceLeft( + Seq(flag, + showContainerStateIfSuccessful( + cs, + podName, + s"$podName (iteration=${lastPodEvent._object.metadata + .labels("iteration")})")))(_ || _) + } + } yield delete2 + } + } + + def main( + args: Array[String] + ): Unit = { + + implicit val as: ActorSystem = ActorSystem("PiJobsSequential") + implicit val ec: ExecutionContext = as.dispatcher + implicit val mat: ActorMaterializer = ActorMaterializer() + val sconfig: skuber.api.Configuration = defaultK8sConfig + val aconfig: Config = ConfigFactory.load() + implicit val k8s: KubernetesClient = + k8sInit(config = sconfig, appConfig = aconfig) + + val watchContinuouslyRequestTimeout: Duration = getSkuberConfig( + aconfig, + "watch-continuously.request-timeout", + durationFomConfig(aconfig), + 30.seconds) + + val deletionMonitorRepeatDelay: FiniteDuration = 1.second + + def metadata(n: Int) = + ObjectMeta(name = "pi", + labels = Map("job-kind" -> "piTest", "iteration" -> s"$n")) + val labelSelector = LabelSelector( + LabelSelector.IsEqualRequirement("job-kind", "piTest")) + + val jobs = Seq.tabulate[Job](n = 10) { n => + if (n % 3 == 0) { + // simulate a job failure + val piContainer = Container(name = "pi", + image = "nowhere/does-not-exist:latest", + command = List("/bin/bash"), + args = List("-c", "env")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } else { + val piContainer = Container( + name = "pi", + image = "perl", + command = + List("perl", "-Mbignum=bpi", "-wle", s"print bpi(${n * 10})")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } + } + val (firstJob, otherJobs) = (jobs.head, jobs.tail) + + val f: Future[Unit] = for { + + // First run: create a pool. + (pool, hcp, podEvent) <- k8s.executeJobAndWaitUntilDeleted( + firstJob, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + None) + + // Subsequent runs: reuse the same pool. + _ <- Source + .fromIterator(() => otherJobs.toIterator) + .mapAsync(parallelism = 1) { job: Job => + k8s.executeJobAndWaitUntilDeleted(job, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + Some(pool)) + } + .runForeach(_ => ()) + + // Shutdown the pool, if any. 
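+      // `hcp` is defined only when the first job run above created a new Akka host connection pool
+      // (because `None` was passed for the pool argument); shutting it down closes the underlying
+      // long-polling connection shared by all the watches.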
+      _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ()))
+
+    } yield ()
+
+    // Wait until done and shutdown k8s & akka.
+    Await.result(f.flatMap { _ =>
+      k8s.close
+      as.terminate().map(_ => ())
+    }, Duration.Inf)
+
+  }
+}
diff --git a/project/build.properties b/project/build.properties
index 7b6213bd..c0bab049 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.0.1
+sbt.version=1.2.8