You can compile the project with sbt:
scala-effects> sbt
sbt> project supermarket
sbt> compile
Our aim is to code up a stream that will represent shoppers paying at two checkouts, one of which is for fast shoppers only. You can see the illustration in supermarket/supermarket.png
for a rough idea of the input and output streams.
Take a look at the App in the threading project. Check that you can compile and run it.
sbt
sbt> project threading
sbt> compile
Note that the supermarket project will fail to compile. Please only compile threading.
The app can run two different kinds of work, found in the Work object.
- How long does the writeToTheDatabase work take to run?
- How long does the calculateHash work take to run?
You will need to edit the following line of code:
Work.time(Work.writeToTheDatabase)
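Work.time presumably measures how long the wrapped task takes to run. For orientation, a minimal version of such a helper could be written with Cats Effect's timed (a sketch only; the project's implementation may differ):

import cats.effect.IO
import scala.concurrent.duration.FiniteDuration

// Run a task, measure the elapsed wall-clock time, print it and return the result.
def time[A](task: IO[A]): IO[A] =
  task.timed.flatMap { case (elapsed: FiniteDuration, result) =>
    IO.println(s"Task took ${elapsed.toMillis} ms").as(result)
  }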
Find out the number of available processors on your computer:
- Enter the SBT console
sbt
sbt> project threading
sbt> console
scala> Runtime.getRuntime().availableProcessors()
val res0: Int = 16 // This is the number of available processors
- How long does it take to run the app with this number of threads?
// In App.scala
override def runtime: unsafe.IORuntime = Setup.createBasicRuntime(Setup.bounded("global", 16))
- What about twice this number?
- What about half this number?
The evalOn function allows us to execute an IO on a different thread pool (an ExecutionContext is another name for a thread pool).
- Take a look at the new Work.factorial function. Time it and see how long it takes:
// In App.scala
def run: IO[Unit] = Work.time(factorial)
If it takes less than a second, increase the 2000000000L number within the function. What is printed as the thread name?
- Now execute it on scala.concurrent.ExecutionContext.global:
// In App.scala
def run: IO[Unit] = Work.time(factorial.evalOn(scala.concurrent.ExecutionContext.global))
What is printed as the thread name?
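As a standalone sketch of how evalOn scopes the switch of thread pool (the thread names you see will depend on your runtime; this is not the project's code):

import cats.effect.{IO, IOApp}

object EvalOnDemo extends IOApp.Simple {
  val printThread: IO[Unit] = IO(println(Thread.currentThread().getName))

  // Only the IO wrapped by evalOn runs on the given ExecutionContext;
  // the steps before and after it run on the runtime's default compute pool.
  def run: IO[Unit] =
    printThread >>
      printThread.evalOn(scala.concurrent.ExecutionContext.global) >>
      printThread
}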
- Take a look at writeToTheDatabase. It now queries Postgres.
- Run Docker with docker compose up -d. This should start a Postgres container.
- Run the application with sbt. Check that you can connect to Postgres. How long does the query take?
The app runs a single writeToTheDatabase task. It has:
- an unbounded blocking thread pool as part of the IORuntime
- a bounded compute pool
- an ec thread pool with a single thread that is passed to Hikari
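For reference, here is a standalone illustration of how work lands on the compute and blocking pools of an IORuntime (this is not the project's code, and the app's custom runtime may name its threads differently):

import cats.effect.{IO, IOApp}

object PoolsDemo extends IOApp.Simple {
  val whereAmI: IO[Unit] = IO(println(Thread.currentThread().getName))

  def run: IO[Unit] =
    for {
      _ <- whereAmI              // runs on the compute pool
      _ <- IO.blocking {         // shifted to the blocking pool
             Thread.sleep(500)   // a stand-in for a thread-blocking call such as JDBC
             println(Thread.currentThread().getName)
           }
      _ <- whereAmI              // back on the compute pool
    } yield ()
}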
- Predict which threads will be blocked when running the app.
- Run the app. In the session, we will profile this with VisualVM to check your results.
The snooze task sleeps a thread for 100 seconds.
Consider a factorial task followed by a snooze task:
Work.factorial >> Work.snooze
- If many of these tasks are run in parallel, predict how many factorials will be computed in the first 30 seconds.
Work.doLotsOf(Work.time(Work.factorial) >> Work.snooze)
- Run the app. In the session, we will profile this with VisualVM to check your results.
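To make the question concrete, here is one plausible shape for snooze and doLotsOf; both are assumptions rather than the project's definitions (in particular, snooze is assumed to block a real thread with Thread.sleep rather than suspending the fiber with IO.sleep):

import cats.effect.IO
import cats.syntax.all._

// Blocks whichever thread the task happens to be running on for 100 seconds.
val snooze: IO[Unit] = IO(Thread.sleep(100000L))

// Runs many copies of a task in parallel; the count here is arbitrary.
def doLotsOf(task: IO[Unit]): IO[Unit] =
  List.fill(100)(task).parSequence.void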
This exercise explores the thread pool used by Hikari.
The Hikari thread pool is configured with a single thread. Only three connections are allowed at once (the maximumPoolSize is 3), and there is a connection timeout of two seconds.
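A sketch of how such a transactor might be wired up with doobie-hikari, assuming the fromHikariConfig overload that takes the connect ExecutionContext; the connection details are placeholders and the project's actual setup may differ:

import cats.effect.{IO, Resource}
import com.zaxxer.hikari.HikariConfig
import doobie.hikari.HikariTransactor
import doobie.util.ExecutionContexts

val transactorResource: Resource[IO, HikariTransactor[IO]] =
  for {
    ec <- ExecutionContexts.fixedThreadPool[IO](1)   // the single-threaded pool handed to Hikari
    config <- Resource.eval(IO {
      val c = new HikariConfig()
      c.setDriverClassName("org.postgresql.Driver")  // placeholder connection details
      c.setJdbcUrl("jdbc:postgresql://localhost:5432/postgres")
      c.setUsername("postgres")
      c.setPassword("postgres")
      c.setMaximumPoolSize(3)                        // at most three connections
      c.setConnectionTimeout(2000L)                  // two-second connection timeout, in milliseconds
      c
    })
    xa <- HikariTransactor.fromHikariConfig[IO](config, ec)
  } yield xa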
- Consider:
Work.doLotsOf(Work.handleError(Work.writeToTheDatabase(transactor)))
What errors do you expect to be printed to the console and when?
- Consider configuring the thread pool with two threads:
val ecResource: Resource[IO, ExecutionContext] = ExecutionContexts.fixedThreadPool[IO](2)
What do you expect to be printed to the console and when?
The threading project now contains an HttpApp.
- Start the app with run.
- Query the app with ./work.sh 1. How many factorial tasks do you expect to run?
The HttpApp runs items of work.
- Start the app with sbt run.
- Query the app with ./work.sh 5. How many work items do you expect to run concurrently?
- Consider the route:
case GET -> Root / "work" => work >> IO.println("Wrote to the db") >> Ok("Wrote to the db\n")
Modify work to work.start. Query the app again with ./work.sh 5. How many work items do you expect to run concurrently?
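For reference, the modified route might look like the following (a sketch assuming the http4s DSL for IO is in scope; work stands in for the app's work task):

import cats.effect.IO
import org.http4s.HttpRoutes
import org.http4s.dsl.io._

def routes(work: IO[Unit]): HttpRoutes[IO] = HttpRoutes.of[IO] {
  case GET -> Root / "work" =>
    // .start forks the work onto its own fiber, so the response is sent
    // without waiting for the work to finish.
    work.start >> IO.println("Wrote to the db") >> Ok("Wrote to the db\n")
}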
The app now has two endpoints: sync-work and async-work.
1. Start the app with sbt run.
You can call the endpoints with the shell script, e.g. ./work.sh sync-work 4.
2. Consider the difference between the sync-work and async-work endpoints.
- How do they behave on failure? The fourth request made will fail due to a connection timeout.
- What status codes do they respond with?
- In both cases, how do they schedule work?
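One plausible shape for these endpoints, purely to frame the questions above (the project's actual routes, paths and status codes may differ):

import cats.effect.IO
import org.http4s.HttpRoutes
import org.http4s.dsl.io._

def routes(work: IO[Unit]): HttpRoutes[IO] = HttpRoutes.of[IO] {
  // The response waits for the work, so a failure surfaces in the request cycle.
  case GET -> Root / "sync-work" =>
    work >> Ok("done\n")

  // The work is forked onto its own fiber; the response is sent straight away
  // and any failure happens on that fiber rather than in the request cycle.
  case GET -> Root / "async-work" =>
    work.start >> Accepted("started\n")
}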
The app now has two endpoints under work.
- The POST endpoint starts an async task.
- The GET endpoint checks its status.
- Think about the code needed to properly implement these endpoints. Draw a rough diagram of the design in Excalidraw (or your preferred tool).
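As a starting point, the two endpoints might be shaped roughly like this. It is a hypothetical skeleton only: startTask, lookupStatus and TaskStatus are illustrative names, and the real design is left for the mobbing session:

import java.util.UUID
import cats.effect.IO
import org.http4s.HttpRoutes
import org.http4s.dsl.io._

sealed trait TaskStatus                                   // e.g. queued, running, completed, failed
def startTask(id: UUID): IO[Unit] = ???                   // enqueue the task and record it
def lookupStatus(id: UUID): IO[Option[TaskStatus]] = ???  // read the task's recorded state

val routes: HttpRoutes[IO] = HttpRoutes.of[IO] {
  case POST -> Root / "work" =>
    for {
      taskId <- IO(UUID.randomUUID())
      _      <- startTask(taskId)
      resp   <- Accepted(taskId.toString)   // respond before the task finishes
    } yield resp

  case GET -> Root / "work" / UUIDVar(taskId) =>
    lookupStatus(taskId).flatMap {
      case Some(status) => Ok(status.toString)
      case None         => NotFound()
    }
}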
We'll begin today's session by mobbing on a design.
The app has some stubbed code under the work endpoint.
for {
taskId <- Work.randomUUID
_ <- Work.queueTask(taskId)
_ <- Work.recordTask(taskId)
result <- Ok(taskId.toString)
} yield result
This queues a task (e.g. by sending it to a Kafka topic) and records it in some data store.
- What possible states can a task be in? You can consider "queued" and "running" to be states.
- What happens if queueTask succeeds, but recordTask fails?
- Can recordTask ever succeed if queueTask fails?
The messageQueue project consumes messages (from Kafka, for example), processes them and commits the offset.
Take a look at the processMessages function.
- Can it ever commit an offset for a task before the task has been processed?
- Can it ever process a task more than once? Consider the case of application failure and restarts.
Consider processMessages. It processes each message sequentially.
- Could we use parEvalMap to process these messages?
- What would the consequences be of using parEvalMapUnordered?
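To make the comparison concrete, here is a standalone sketch of a sequential consume-process-commit pipeline and a parEvalMap variant (Message, process and commitOffset are illustrative names; the project's processMessages may look different):

import cats.effect.IO
import fs2.Stream

final case class Message(offset: Long, payload: String)

def process(msg: Message): IO[Unit]      = IO.println(s"processing ${msg.payload}")
def commitOffset(offset: Long): IO[Unit] = IO.println(s"committing offset $offset")

// Sequential: each message is fully processed and committed before the next one starts.
def sequential(messages: Stream[IO, Message]): Stream[IO, Unit] =
  messages.evalMap(msg => process(msg) >> commitOffset(msg.offset))

// parEvalMap runs up to 8 of these effects concurrently but emits results in input order;
// parEvalMapUnordered would emit them in completion order instead.
def parallel(messages: Stream[IO, Message]): Stream[IO, Unit] =
  messages.parEvalMap(8)(msg => process(msg) >> commitOffset(msg.offset))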
In this session, we'll take a look at error handling. The code has been amended such that the message time is an Int.
- Should it be possible for the user to submit negative times? If so, would you expect an error?
- What possible errors can occur when querying the database? For each error, consider whether we should recover from it.
We'll explore error handling with the egg project.
- Run the FryEggApp:
sbt
sbt> project egg
sbt:egg> run
You should see an exception being thrown indicating "The yolk broke during frying".
- Read through the FryCook.fry function to get the gist of what it does.
- Take a look at the cookWithPower function.
  - What is the difference between throwing an exception and returning a value?
  - Is this a pure function? If not, how could we make it pure?
- Take a look at the cats API docs for ApplicativeError. In particular, look at the handleError and recover functions.
- The crack and cook functions capture errors in an IO: either function may fail. Consider how you can use the functions on ApplicativeError to perform the following tasks:
  - If the yolk is broken during cooking, return a scrambled egg instead
  - If the egg is rotten, crack another egg
  - If there are any errors, print "Sorry! Something went wrong."
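As a standalone reminder of the shapes of these combinators (unrelated to the egg project's types; adapt them to the tasks above):

import cats.effect.IO

case object Boom extends RuntimeException("boom")

val failing: IO[Int] = IO.raiseError(Boom)

// handleError recovers from any error with a pure value.
val withDefault: IO[Int] = failing.handleError(_ => 0)

// recover only handles the errors its partial function matches; others still fail.
val recovered: IO[Int] = failing.recover { case Boom => 42 }

// handleErrorWith recovers with another effect, e.g. printing before falling back.
val logged: IO[Int] = failing.handleErrorWith(err => IO.println(err.getMessage).as(-1))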
In this session, we'll take a look at error handling and scopes.
For reference, here is our current implementation of fry:
def fry(power: Ref[IO, Boolean], eggBox: Queue[IO, RawEgg]): IO[CookedEgg] = {
crack(eggBox).flatMap { egg =>
cook(power)(egg)
.recover { case YolkIsBroken => CookedEgg.Scrambled }
}.handleErrorWith(_ => fry(power, eggBox))
}
- Consider the following implementation of fry, paying attention to the position of the recover function. Is the implementation correct?
def fry(power: Ref[IO, Boolean], eggBox: Queue[IO, RawEgg]): IO[CookedEgg] = {
crack(eggBox).flatMap { egg =>
cook(power)(egg)
}.recover { case YolkIsBroken => CookedEgg.Scrambled }
.handleErrorWith(_ => fry(power, eggBox))
}
- What about the following implementation, paying attention to handleErrorWith?
def fry(power: Ref[IO, Boolean], eggBox: Queue[IO, RawEgg]): IO[CookedEgg] = {
crack(eggBox).flatMap { egg =>
cook(power)(egg)
.recover { case YolkIsBroken => CookedEgg.Scrambled }
.handleErrorWith(_ => fry(power, eggBox))
}
}
- What about the following implementation?
def fry(power: Ref[IO, Boolean], eggBox: Queue[IO, RawEgg]): IO[CookedEgg] = {
crack(eggBox).flatMap { egg =>
cook(power)(egg)
}
.handleErrorWith(_ => fry(power, eggBox))
.recover { case YolkIsBroken => CookedEgg.Scrambled }
}
In this session, we'll experiment with the order in which we handle errors.
For reference, here is our current implementation of fry:
def fry(power: Ref[IO, Boolean], eggBox: Queue[IO, RawEgg]): IO[CookedEgg] = {
crack(eggBox).flatMap { egg =>
cook(power)(egg) // Previous position of `recover` handler
}
.recover { case YolkIsBroken => CookedEgg.Scrambled } // Current position
.handleErrorWith(_ => fry(power, eggBox))
}
We saw that moving the recover handler did not change the behaviour.
- What about the following implementation? Are YolkIsBroken exceptions handled in the same way?
def fry(power: Ref[IO, Boolean], eggBox: Queue[IO, RawEgg]): IO[CookedEgg] = {
crack(eggBox).flatMap { egg =>
cook(power)(egg)
}
.handleErrorWith(_ => fry(power, eggBox))
.recover { case YolkIsBroken => CookedEgg.Scrambled }
}
- What about the following implementation, paying attention to the position of handleErrorWith? Are RottenEgg exceptions still handled in the same way?
def fry(power: Ref[IO, Boolean], eggBox: Queue[IO, RawEgg]): IO[CookedEgg] = {
crack(eggBox).flatMap { egg =>
cook(power)(egg)
.recover { case YolkIsBroken => CookedEgg.Scrambled }
.handleErrorWith(_ => fry(power, eggBox))
}
}
Take a look at the numbers project:
sbt
> project numbers
> compile
> test
- Run the code with sbt run
- Test the code with sbt test
- You'll see some tests in NumbersTest that are failing. How can you use the handleError functions to implement the correct behaviour?