cats-effect
.pure doesn't suspend computation (its argument is evaluated eagerly), unlike .delay
IO.apply === IO.delay
- use
.flatTap(IO.println) to print the results before returning them
- add
.as(ExitCode.Success) to an IO to exit with success
- extension method:
.pure[IO] to lift any type to IO
.void turns any IO[A] to IO[Unit]
.memoize: IO[IO[A]] returns a nested IO that is not evaluated, but can be flatMapped to get the value when needed; the inner effect runs at most once
- This is an effect-full equivalent of
lazy val
.foreverM: repeats the effect forever, never terminating
.timeoutTo(dur: FiniteDuration, fallback: IO[A]): IO[A], runs for max of dur, returns results if completes else the fallback
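A minimal sketch of the combinators above (Scala 3, cats-effect 3 assumed):

```scala
import cats.effect.IO
import cats.syntax.all.*
import scala.concurrent.duration.*

// .pure is eager; .delay suspends the side effect until the IO is run
val eager: IO[Int]     = IO.pure(42)
val suspended: IO[Int] = IO.delay { println("running"); 42 }

// the .pure[IO] extension lifts any value into IO
val lifted: IO[Int] = 42.pure[IO]

// .flatTap runs an effect on the value, then returns the original value
val tapped: IO[Int] = eager.flatTap(n => IO.println(s"got $n"))

// .timeoutTo returns the result if it completes in time, else the fallback
val slow: IO[String] = IO.sleep(1.second).as("done")
val fast: IO[String] = slow.timeoutTo(50.millis, IO.pure("fallback"))
```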
Fibers
- a fiber is Cats Effect's lightweight concurrency primitive
- created using
IO.start
- optionally specify,
onCancel: finalizer to run on cancellation: action.onCancel(fin: IO[Unit]).start
timeout: e.g. action.timeout(500.millis).onCancel(...).start
timeoutTo: specify an alternate action to run: action.timeoutTo(500.millis, planB).start
IO.never: an IO that never completes; starting it yields a fiber that never terminates
IO.foreverM: loops infinitely running the same task
.background: wraps an IO in a Resource that starts it on a fiber and cancels the fiber when the Resource is released; surround runs another action while the fiber runs
- fibers release all resources even if they are cancelled
Fiber methods
join: wait for completion
joinWith(onCancel: F[A]) allows a fallback computation in case a fiber is cancelled
cancel: cancel a running fiber
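A sketch of the fiber lifecycle: start, cancel, and joinWith as a cancellation fallback:

```scala
import cats.effect.IO
import scala.concurrent.duration.*

// start a fiber, cancel it, and recover via joinWith's fallback
val cancelled: IO[String] =
  for
    fiber <- IO.sleep(1.second).as("finished")
               .onCancel(IO.println("cancelled"))
               .start
    _     <- fiber.cancel
    out   <- fiber.joinWith(IO.pure("fallback")) // runs because the fiber was cancelled
  yield out

// join a fiber that completes normally
val joined: IO[Int] =
  for
    fiber <- IO.sleep(10.millis).as(21).start
    n     <- fiber.joinWithNever // falls back to IO.never if cancelled
  yield n * 2
```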
Parallelism and Concurrency
parMapN, parTraverse, parSequence
IO.race(l: IO[A], r: IO[B]): IO[Either[A, B]], runs two IOs in parallel, returning the winner and cancelling the loser
.both(b: IO[B]): IO[(A, B)]: run both IOs concurrently, and returns values as tuple
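A sketch of the parallel combinators (sleep durations are arbitrary):

```scala
import cats.effect.IO
import cats.syntax.all.*
import scala.concurrent.duration.*

val a = IO.sleep(50.millis).as(1)
val b = IO.sleep(50.millis).as(2)

// parMapN runs both in parallel and combines the results
val sum: IO[Int] = (a, b).parMapN(_ + _)

// race returns the winner (Left or Right) and cancels the loser
val winner: IO[Either[Int, String]] =
  IO.race(IO.sleep(10.millis).as(1), IO.sleep(1.second).as("slow"))

// both waits for both results and returns them as a tuple
val pair: IO[(Int, Int)] = a.both(b)
```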
Async
IO.async_: hands a callback to the wrapped API; the API invokes the callback to deliver its result asynchronously
IO.fromFuture: provides a convenience method over IO.async_
evalOn: runs an IO on the given ExecutionContext, shifting back to the default afterwards
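A sketch of the async shapes; the immediate-callback example is contrived just to show the signature:

```scala
import cats.effect.IO
import scala.concurrent.{ExecutionContext, Future}

// IO.async_ gives us a callback to complete with Either[Throwable, A]
val answer: IO[Int] = IO.async_ { cb => cb(Right(42)) }

// IO.fromFuture expects the Future construction itself suspended in IO,
// so the Future isn't started until the IO runs
def viaFuture(n: Int)(using ExecutionContext): IO[Int] =
  IO.fromFuture(IO(Future(n * 2)))
```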
Resource Safety
.bracket(use)(release) for single resource
Resource class: easier for handling acquiring/releasing multiple resources
val in = Resource.make[F, A](acquire: F[A])(close: A => F[Unit])
val out = Resource.fromAutoCloseable[F, B](acquire: F[B]) // convenience method for Java AutoCloseable
val inOut = in.flatMap(i => out.map(o => (i, o))) // compose multiple resources, released in reverse order
inOut.use(f: (A, B) => F[Unit])
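A runnable sketch of make/compose/use; `tracked` is an illustrative toy resource that logs its own lifecycle into a Ref:

```scala
import cats.effect.{IO, Ref, Resource}
import cats.syntax.all.*

// a toy resource that records acquire/release order in a Ref
def tracked(name: String, log: Ref[IO, List[String]]): Resource[IO, String] =
  Resource.make(log.update(s"open $name" :: _).as(name))(
    n => log.update(s"close $n" :: _))

// composed resources are released in reverse order of acquisition
val program: IO[List[String]] =
  for
    log    <- Ref.of[IO, List[String]](Nil)
    _      <- (tracked("a", log), tracked("b", log)).tupled.use(_ => IO.unit)
    events <- log.get
  yield events.reverse
```

Running `program` yields the open/close events in acquisition order: a opens first, b opens second, and they close in reverse.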
Error handling
.handleError and .handleErrorWith handles all errors, without and with effect respectively
.recover and .recoverWith handles some errors, without and with effect respectively
.attempt surfaces errors as IO[Either[Throwable, A]]; .rethrow converts back, re-raising any Left
.handleErrorWith(f: (E) => F[A])
- Tip: use
NonFatal to avoid handling fatal errors (such as internal JVM errors)
import scala.util.control.NonFatal
result.handleErrorWith {
case NonFatal(e) => Response("Internal Error").pure[IO]
}
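A sketch of the recovery combinators side by side:

```scala
import cats.effect.IO

val boom: IO[Int] = IO.raiseError(new RuntimeException("boom"))

// .attempt surfaces the error as a Left; .rethrow would re-raise it
val caught: IO[Either[Throwable, Int]] = boom.attempt

// .handleError recovers from all errors with a pure value;
// .recover handles only the errors its partial function matches
val recovered: IO[Int] = boom.handleError(_ => -1)
val partial: IO[Int]   = boom.recover { case _: RuntimeException => 0 }
```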
Clock
IO.realTime: IO[scala.concurrent.duration.FiniteDuration], wall-clock time at evaluation
IO.monotonic: nanosecond precision, for measuring elapsed time
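A sketch of measuring elapsed time with the monotonic clock (cats-effect also ships a built-in .timed with the same shape):

```scala
import cats.effect.IO
import scala.concurrent.duration.*

// bracket an effect with two monotonic readings to measure it
def timed[A](io: IO[A]): IO[(FiniteDuration, A)] =
  for
    start <- IO.monotonic
    a     <- io
    stop  <- IO.monotonic
  yield (stop - start, a)
```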
Ref
- Backed by Java's
AtomicReference
- Cannot be empty, must always contain exactly one value
- methods:
.get: F[A], .set(a: A): F[Unit], .update(f: A => A): F[Unit] and .modify(f: A => (A, B)): F[B]
Ref.of(a: A): F[Ref[F, A]]: creating a Ref itself is an effect-full operation
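A sketch of the Ref methods; note .modify returns the value computed from the *previous* state:

```scala
import cats.effect.{IO, Ref}

val counter: IO[(Int, Int)] =
  for
    ref <- Ref.of[IO, Int](0)
    _   <- ref.update(_ + 1)
    _   <- ref.update(_ + 1)
    old <- ref.modify(n => (n * 10, n)) // store n * 10, return previous n
    cur <- ref.get
  yield (old, cur)
```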
Deferred
- has semantics of promise
- can be empty
.get will block if empty
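A sketch of the promise semantics: one fiber semantically blocks on .get until another completes the Deferred:

```scala
import cats.effect.{Deferred, IO}
import scala.concurrent.duration.*

val handoff: IO[Int] =
  for
    d     <- Deferred[IO, Int]
    fiber <- d.get.start                       // .get blocks until completed
    _     <- IO.sleep(10.millis) *> d.complete(42)
    n     <- fiber.joinWithNever
  yield n
```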
Queue
- FIFO,
.take: F[A], .tryTake: F[Option[A]]: get element at front of the queue, with/without blocking
.offer(a: A): F[Unit], .tryOffer(a: A): F[Boolean]: push element at back of the queue, with/without blocking
Queue.unbounded[F, A]: F[Queue[F, A]]: create a queue that never gets full
Queue.bounded[F, A](Int): F[Queue[F, A]]: queue with a bounded capacity
- queues with capacity 0 can be useful for synchronization: it's as if the producer hands the event directly to the consumer
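A sketch of offer/take over a bounded queue:

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all.*

// offer three elements, then take them back in FIFO order
val pipeline: IO[List[Int]] =
  for
    q       <- Queue.bounded[IO, Int](4)
    _       <- List(1, 2, 3).traverse(q.offer)
    drained <- List.fill(3)(q.take).sequence
  yield drained
```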
Parallel
- is a type class that provides natural transformations between a monad and a related applicative, enabling parallel composition
parSequence, parTraverse, parMapN are all available for monads (such as IO) that have a Parallel instance
Console
- provides
println as: Console[F].println(user.show)
import cats.effect.std.Console
Thread pools
- provided by
java.util.concurrent.Executors factory methods: newFixedThreadPool(n), newCachedThreadPool(), newWorkStealingPool()
- factory methods return
ExecutorService which provide APIs for submitting new work
- typical steps:
- allocate pool
.submit new work
.shutdown() prevents any new work from being accepted
.awaitTermination() wait for all executing and waiting work to be completed
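The steps above as a sketch (plain Java executors from Scala):

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// allocate a pool, submit work, then shut it down
val pool = Executors.newFixedThreadPool(2)
val task: Callable[Int] = () => 21 * 2
val future = pool.submit(task)
val result = future.get()                  // blocks until the task completes
pool.shutdown()                            // stop accepting new work
pool.awaitTermination(1, TimeUnit.SECONDS) // wait for in-flight work
```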
ExecutionContext
- a thin wrapper over
ExecutorService
ExecutionContext.fromExecutorService(es: ExecutorService): ExecutionContext
- default
ExecutionContext uses work-stealing thread pool
IOApp provides two main pools, compute and blocking
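A sketch of shifting work onto another pool; assumes cats-effect 3's IO (the pool name is illustrative):

```scala
import cats.effect.IO
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// a dedicated pool wrapped as an ExecutionContext
val blockingEc =
  ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

// evalOn runs the effect on the given pool, shifting back afterwards
val onPool: IO[String] =
  IO(Thread.currentThread().getName).evalOn(blockingEc)

// for blocking calls, IO.blocking uses IOApp's built-in blocking pool
val viaBlocking: IO[String] = IO.blocking(Thread.currentThread().getName)
```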
Fixed thread pools
- fixed number of workers/threads
Cached thread pools
- backed by a synchronous queue; when new work arrives and no idle thread is available, a new thread is created
- threads idle longer than a timeout (default 60 seconds) are removed
- the default maximum size is high (effectively unbounded, thousands of threads)
- ideal for blocking work
Work-stealing thread pools
- in addition to the main submission queue, each worker has an internal queue which is filled from the main queue
- if a worker exhausts its own queue and no tasks are available in the main queue, it steals work from the tail of other workers' queues
- default size = number of cores
- ideal for computational work (a blocked task wastes a thread)
Type classes
trait Sync[F[_]]:
def delay[A](thunk: => A): F[A]
def blocking[A](thunk: => A): F[A]
trait MonadCancel[F[_], E]:
def bracket[A, B](acquire: F[A])(use: A => F[B])(release: A => F[Unit]): F[B]
trait GenConcurrent[F[_], E]: // type Concurrent[F[_]] = GenConcurrent[F, Throwable]
def ref[A](a: A): F[Ref[F, A]]
def deferred[A]: F[Deferred[F, A]]
def memoize[A](fa: F[A]): F[F[A]]
trait GenSpawn[F[_], E]:
def race[A,B](fa: F[A], fb: F[B]): F[Either[A,B]]
trait Async[F[_]]:
def async_[A](cb: (Either[Throwable, A] => Unit) => Unit): F[A]
def executionContext: F[ExecutionContext]
def evalOn[A](fa: F[A], ec: ExecutionContext): F[A]
trait GenTemporal[F[_], E]: // Temporal[F] = GenTemporal[F, Throwable]
def sleep(time: FiniteDuration): F[Unit]
def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A]
trait Clock[F[_]]:
def monotonic: F[FiniteDuration]
def realTime: F[FiniteDuration]