Skip to content

Commit

Permalink
add CLIMain
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-butcher committed Oct 23, 2024
1 parent 987bddd commit 218f69e
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,3 +1,58 @@
package weco.pipeline.batcher object CLIMain {
package weco.pipeline.batcher
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.IOResult
import org.apache.pekko.stream.scaladsl.{
Flow,
Framing,
Sink,
Source,
StreamConverters
}
import org.apache.pekko.util.ByteString

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object CLIMain extends App {
implicit val actorSystem: ActorSystem =
ActorSystem("main-actor-system")
implicit val ec: ExecutionContext =
actorSystem.dispatcher

println("hello")
val stdinSource: Source[ByteString, Future[IOResult]] =
StreamConverters.fromInputStream(() => System.in)

val lineDelimiter: Flow[ByteString, ByteString, NotUsed] =
Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true
)
val toStringFlow: Flow[ByteString, String, NotUsed] =
Flow[ByteString].map(_.utf8String)

val pathsProcessorFlow: Flow[Seq[String], Future[Seq[Long]], NotUsed] =
Flow[Seq[String]].map {
paths: Seq[String] =>
PathsProcessor(
40, // TODO: 40 is the number in the config used by Main, do this properly later
paths.toList,
STDIODownstream
)
}

stdinSource
.via(lineDelimiter)
.via(toStringFlow)
// this number is pretty arbitrary, but grouping of some kind is needed in order to
// provide a list to the next step, rather than individual paths
.grouped(10000)
.via(pathsProcessorFlow)
.runWith(Sink.seq)
actorSystem.terminate()
private object STDIODownstream extends Downstream {
override def notify(batch: Batch): Try[Unit] = Try(println(batch))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
A/B/C/D
A/B
E/F/G
F/G/H
Z
Y/Z

0 comments on commit 218f69e

Please sign in to comment.