cats-effect

  • .pure does not suspend its argument (it is evaluated eagerly), whereas .delay suspends it until the IO runs
  • IO.apply === IO.delay
  • use .flatTap(IO.println) to print the results before returning them
  • add .as(ExitCode.Success) to an IO to exit with success
  • extension method: .pure[IO] lifts any value into IO
  • .void turns any IO[A] to IO[Unit]
  • .memoize: IO[IO[A]] returns a nested IO that is not evaluated yet, but can be flatMapped to get the value if needed; the inner IO is evaluated at most once
    • This is an effect-full equivalent of lazy val
  • .foreverM: repeats the effect forever
  • .timeoutTo(dur: FiniteDuration, fallback: IO[A]): IO[A]: runs for at most dur; returns the result if it completes in time, else runs the fallback
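
A minimal sketch of the combinators above, assuming cats-effect 3. The object and value names are made up for illustration; the counter exists only to show that a memoized effect runs once.

```scala
import cats.effect.{IO, IOApp, Ref}
import scala.concurrent.duration._

object BasicsExample extends IOApp.Simple {
  // memoize yields IO[IO[Int]]; evaluating the inner IO twice runs the effect once.
  val memoized: IO[(Int, Int, Int)] =
    for {
      counter <- Ref.of[IO, Int](0)
      cached  <- counter.update(_ + 1).as(42).memoize
      a       <- cached
      b       <- cached
      runs    <- counter.get // number of times the underlying effect actually ran
    } yield (a, b, runs)

  // The slow IO loses to the 100 ms limit, so the fallback is used instead.
  val withFallback: IO[String] =
    IO.sleep(5.seconds).as("slow").timeoutTo(100.millis, IO.pure("fallback"))

  // flatTap prints the result and still returns it; void discards it.
  val run: IO[Unit] =
    memoized.flatTap(IO.println).void >> withFallback.flatTap(IO.println).void
}
```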

Fibers

  • a fiber is the cats-effect concurrency primitive (a lightweight, logical thread)
  • created by calling .start on an IO
  • optionally specify, before .start:
    • onCancel: finalizer to run on cancellation: io.onCancel(fin: IO[Unit]).start
    • timeout: e.g. io.timeout(500.millis).onCancel(...).start
    • timeoutTo: specify an alternate action to run on timeout: io.timeoutTo(500.millis, planB).start
  • IO.never is an IO that never completes
  • .foreverM: loops infinitely, running the same effect
  • .background: creates a Resource that runs the IO in a fiber and cancels it on release; its surround method runs another effect while the background fiber is active
  • resources acquired with bracket or Resource are released even if the fiber is cancelled

Fiber methods

  • join: wait for completion
    • returns IO[Outcome[IO, Throwable, A]], which is one of Succeeded, Errored or Canceled
  • joinWith(onCancel: F[A]) allows a fallback computation in case a fiber is cancelled
  • cancel: cancel a running fiber
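
The fiber lifecycle can be sketched as follows, assuming cats-effect 3; the durations and messages are arbitrary. Note that the onCancel finalizer only runs if the fiber had already started when it was cancelled.

```scala
import cats.effect.{IO, IOApp, Outcome}
import scala.concurrent.duration._

object FiberExample extends IOApp.Simple {
  // Start a fiber, wait for it with join, and inspect the Outcome.
  val joined: IO[String] =
    for {
      fiber   <- IO.sleep(50.millis).as(21 * 2).start
      outcome <- fiber.join
      text <- outcome match {
        case Outcome.Succeeded(fa) => fa.map(a => s"succeeded with $a")
        case Outcome.Errored(e)    => IO.pure(s"failed with ${e.getMessage}")
        case Outcome.Canceled()    => IO.pure("canceled")
      }
    } yield text

  // Cancel a fiber that would never finish; joinWith supplies the fallback.
  val cancelled: IO[String] =
    for {
      fiber  <- IO.never[String].onCancel(IO.println("finalizer ran")).start
      _      <- fiber.cancel
      result <- fiber.joinWith(IO.pure("fallback after cancel"))
    } yield result

  val run: IO[Unit] =
    joined.flatMap(IO.println) >> cancelled.flatMap(IO.println)
}
```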

Parallelism and Concurrency

  • parMapN, parTraverse, parSequence
  • IO.race(l: IO[A], r: IO[B]): IO[Either[A, B]]: runs two IOs concurrently, returning the winner's result and cancelling the loser
  • .both(b: IO[B]): IO[(A, B)]: runs both IOs concurrently and returns their results as a tuple
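
A short sketch of these combinators, assuming cats-effect 3 with the cats syntax import. The task helper is hypothetical and only simulates work with a sleep.

```scala
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import scala.concurrent.duration._

object ParallelExample extends IOApp.Simple {
  // Hypothetical task: sleeps for the given duration, then returns its name.
  def task(name: String, d: FiniteDuration): IO[String] = IO.sleep(d).as(name)

  // parMapN runs both tasks concurrently and combines their results.
  val combined: IO[String] =
    (task("a", 50.millis), task("b", 50.millis)).parMapN(_ + _)

  // parTraverse runs one effect per element concurrently, keeping list order.
  val traversed: IO[List[Int]] = List(1, 2, 3).parTraverse(n => IO.pure(n * 2))

  // race returns the winner (Left here) and cancels the slower IO.
  val raced: IO[Either[String, String]] =
    IO.race(task("fast", 10.millis), task("slow", 2.seconds))

  // both waits for the two results and pairs them.
  val paired: IO[(String, Int)] = task("left", 10.millis).both(IO.pure(1))

  val run: IO[Unit] =
    (combined, traversed, raced, paired).tupled.flatMap(IO.println)
}
```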

Async

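  • IO.async_: wraps a callback-based API; it passes in a callback (Either[Throwable, A] => Unit) that the called function invokes to convey the result, e.g. of a Future
  • IO.fromFuture: a convenience method over IO.async_; it takes an IO[Future[A]] so that the Future is not started eagerly
  • evalOn(ec: ExecutionContext): runs the IO on the given ExecutionContext, then shifts back to the previous one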
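
A minimal sketch of the three methods above, assuming cats-effect 3. legacyFetch is a hypothetical callback-style function standing in for a real library.

```scala
import cats.effect.{IO, IOApp}
import scala.concurrent.{ExecutionContext, Future}

object AsyncExample extends IOApp.Simple {
  // Hypothetical callback-based API: reports its result from another thread.
  def legacyFetch(id: Int)(onDone: Either[Throwable, String] => Unit): Unit =
    new Thread(() => onDone(Right(s"item-$id"))).start()

  // async_ hands us the callback; the IO completes when the callback is invoked.
  val viaCallback: IO[String] = IO.async_[String](cb => legacyFetch(7)(cb))

  // fromFuture takes IO[Future[A]], so the Future is created only when the IO runs.
  val viaFuture: IO[String] =
    IO.executionContext.flatMap { ec =>
      IO.fromFuture(IO(Future("from-future")(ec)))
    }

  // evalOn runs the effect on the given ExecutionContext.
  val onGlobal: IO[String] = viaCallback.evalOn(ExecutionContext.global)

  val run: IO[Unit] =
    viaCallback.flatMap(IO.println) >> viaFuture.flatMap(IO.println)
}
```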

Resource Safety

  • acquire.bracket(use)(release) for a single resource
  • Resource: makes acquiring and releasing multiple resources easier
    val in = Resource.make[F, A](acquire: F[A])(close: A => F[Unit])
    val out = Resource.fromAutoCloseable[F, B](acquire: F[B])  // convenience method for Java AutoCloseable
    val inOut = in.flatMap(i => out.map(o => (i, o)))  // compose multiple resources, released in reverse order
    
    inOut.use(f: ((A, B)) => F[C])  // f receives both resources as a single tuple
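
To make the release order visible, here is a small sketch assuming cats-effect 3. The logged helper is hypothetical: it records acquire and release events in a Ref instead of opening real files.

```scala
import cats.effect.{IO, IOApp, Ref, Resource}

object ResourceExample extends IOApp.Simple {
  // Hypothetical resource that records its lifecycle in a shared log.
  def logged(name: String, log: Ref[IO, Vector[String]]): Resource[IO, String] =
    Resource.make(log.update(_ :+ s"acquire $name").as(name))(_ =>
      log.update(_ :+ s"release $name"))

  val events: IO[Vector[String]] =
    for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      both = logged("in", log).flatMap(i => logged("out", log).map(o => (i, o)))
      _   <- both.use { case (i, o) => log.update(_ :+ s"use $i+$o") }
      all <- log.get
    } yield all

  // Prints: acquire in, acquire out, use in+out, release out, release in
  val run: IO[Unit] = events.flatMap(es => IO.println(es.mkString(", ")))
}
```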
    

Error handling

  • .handleError and .handleErrorWith handle all errors, with a pure value and with an effect respectively
  • .recover and .recoverWith handle only the errors matched by a partial function, with a pure value and with an effect respectively
  • .attempt turns an IO[A] into an IO[Either[Throwable, A]]; .rethrow does the reverse, raising a Left as an error
  • .handleErrorWith(f: (E) => F[A])
    • Tip: use NonFatal to avoid handling fatal errors (such as internal JVM errors)
      import scala.util.control.NonFatal
      result.handleErrorWith {
          case NonFatal(_) => Response("Internal Error").pure[IO]  // handle ordinary failures
          case fatal       => IO.raiseError(fatal)                 // re-raise anything else
      }
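
The error-handling combinators can be compared side by side. This is a sketch assuming cats-effect 3 with the cats syntax import; the failing effect and fallback values are arbitrary.

```scala
import cats.effect.{IO, IOApp}
import cats.syntax.all._

object ErrorHandlingExample extends IOApp.Simple {
  val failing: IO[Int] = IO.raiseError(new IllegalStateException("boom"))

  // handleError: every error is replaced by a pure value.
  val handled: IO[Int] = failing.handleError(_ => -1)

  // handleErrorWith: every error is replaced by another effect.
  val handledWith: IO[Int] =
    failing.handleErrorWith(e => IO.println(s"caught: ${e.getMessage}").as(-2))

  // recover: only errors matched by the partial function are handled.
  val recovered: IO[Int] = failing.recover { case _: IllegalStateException => 0 }

  // attempt exposes the error as a Left; rethrow raises it again.
  val attempted: IO[Either[Throwable, Int]] = failing.attempt
  val roundTrip: IO[Int] = failing.attempt.rethrow

  val run: IO[Unit] =
    (handled, handledWith, recovered).tupled.flatMap(IO.println)
}
```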
      

Clock

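  • IO.realTime: IO[scala.concurrent.duration.FiniteDuration]: wall-clock time (since the Unix epoch) at the moment the effect is evaluated
  • IO.monotonic: IO[FiniteDuration]: monotonic clock with nanosecond precision, for measuring elapsed time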
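
A sketch of measuring elapsed time with the monotonic clock, assuming cats-effect 3. The measure helper is made up for illustration; IO also ships a built-in .timed with the same purpose.

```scala
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._

object ClockExample extends IOApp.Simple {
  // Hypothetical helper: pairs a result with how long it took to compute.
  def measure[A](fa: IO[A]): IO[(FiniteDuration, A)] =
    for {
      start <- IO.monotonic
      a     <- fa
      end   <- IO.monotonic
    } yield (end - start, a)

  val run: IO[Unit] =
    for {
      now    <- IO.realTime // wall-clock time since the epoch
      _      <- IO.println(s"epoch millis: ${now.toMillis}")
      result <- measure(IO.sleep(50.millis).as("done"))
      _      <- IO.println(s"${result._2} after ${result._1.toMillis} ms")
    } yield ()
}
```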

Ref

  • Backed by Java's AtomicReference
  • Cannot be empty, must always contain exactly one value
  • methods: .get: F[A], .set(a: A): F[Unit], .update(f: A => A): F[Unit] and .modify(f: A => (A, B)): F[B]
  • Ref.of(a: A): F[Ref[F, A]]: creating a Ref is itself an effectful operation
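
A minimal sketch of a shared counter, assuming cats-effect 3. It illustrates that update is atomic under concurrent access and that modify returns a value derived from the old state.

```scala
import cats.effect.{IO, IOApp, Ref}
import cats.syntax.all._

object RefExample extends IOApp.Simple {
  val counted: IO[(Int, Int)] =
    for {
      ref      <- Ref.of[IO, Int](0)
      // 100 concurrent increments; update is atomic, so none are lost.
      _        <- (1 to 100).toList.parTraverse(_ => ref.update(_ + 1))
      // modify sets the new state and returns the previous value.
      previous <- ref.modify(n => (n + 1, n))
      current  <- ref.get
    } yield (previous, current)

  val run: IO[Unit] = counted.flatMap(IO.println)
}
```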

Deferred

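  • has the semantics of a promise: it is completed at most once, with .complete(a: A): F[Boolean]
  • can be empty
  • .get semantically blocks while empty (the fiber is suspended, not the thread)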
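
A small hand-off sketch, assuming cats-effect 3. One fiber waits on get while the main fiber completes the Deferred; the second complete call shows that only the first one takes effect.

```scala
import cats.effect.{Deferred, IO, IOApp}
import scala.concurrent.duration._

object DeferredExample extends IOApp.Simple {
  val handoff: IO[(Int, Boolean)] =
    for {
      signal <- Deferred[IO, Int]
      waiter <- signal.get.start       // suspends until a value arrives
      _      <- IO.sleep(20.millis) >> signal.complete(42)
      second <- signal.complete(0)     // false: already completed
      value  <- waiter.joinWithNever
    } yield (value, second)

  val run: IO[Unit] = handoff.flatMap(IO.println)
}
```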

Queue

  • FIFO
  • .take: F[A], .tryTake: F[Option[A]]: get the element at the front of the queue, with/without waiting for one
  • .offer(a: A): F[Unit], .tryOffer(a: A): F[Boolean]: push an element to the back of the queue, with/without waiting for space
  • Queue.unbounded[F, A]: F[Queue[F, A]]: create a queue that never gets full
  • Queue.bounded[F, A](capacity: Int): F[Queue[F, A]]: queue with a bounded capacity
    • a queue with capacity 0 can be useful for synchronization: it is as if the producer passes each element directly to a consumer
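
A producer/consumer sketch, assuming cats-effect 3. The capacity of 2 is arbitrary and chosen so that the producer has to wait for the consumer at least once.

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue
import cats.syntax.all._

object QueueExample extends IOApp.Simple {
  val transferred: IO[(List[Int], Option[Int], Boolean)] =
    for {
      queue    <- Queue.bounded[IO, Int](2)
      // The third offer waits until the consumer has made room.
      producer <- List(1, 2, 3).traverse_(queue.offer).start
      items    <- queue.take.replicateA(3) // elements arrive in FIFO order
      _        <- producer.join
      empty    <- queue.tryTake            // None: nothing left, returns immediately
      accepted <- queue.tryOffer(4)        // true: there is free capacity
    } yield (items, empty, accepted)

  val run: IO[Unit] = transferred.flatMap(IO.println)
}
```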

Parallel

  • is a type class that provides natural transformations between a monad and a related applicative with parallel semantics
  • parSequence, parTraverse, parMapN are all defined for monads (such as IO) that implement Parallel

Console

  • provides println as: Console[F].println(user.show)
  • import cats.effect.std.Console
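
A sketch of Console used as a constraint, assuming cats-effect 3. User and its Show instance are hypothetical, added only so that user.show has something to render.

```scala
import cats.Show
import cats.effect.{IO, IOApp}
import cats.effect.std.Console
import cats.syntax.all._

object ConsoleExample extends IOApp.Simple {
  // Hypothetical domain type with a custom Show instance.
  final case class User(name: String, age: Int)
  implicit val showUser: Show[User] = Show.show(u => s"${u.name} (${u.age})")

  // Only the Console capability is required from F.
  def greet[F[_]: Console](user: User): F[Unit] =
    Console[F].println(user.show)

  val run: IO[Unit] = greet[IO](User("Ada", 36))
}
```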

Thread pools

  • provided by the java.util.concurrent.Executors static factory methods newFixedThreadPool(n), newCachedThreadPool() and newWorkStealingPool()
  • the factory methods return an ExecutorService, which provides APIs for submitting new work
  • typical steps:
    1. allocate pool
    2. .submit new work
    3. .shutdown() prevents any new work from being accepted
    4. .awaitTermination(timeout, unit) waits, up to the timeout, for all executing and queued work to complete
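
The typical steps can be sketched with plain java.util.concurrent from Scala. The pool size, the summed range and the 5-second timeout are arbitrary choices for illustration.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object ThreadPoolExample {
  def sumOnPool(): Int = {
    val pool = Executors.newFixedThreadPool(2) // 1. allocate the pool
    try {
      // 2. submit work; the returned Future gives access to the result
      val future = pool.submit(new Callable[Int] {
        def call(): Int = (1 to 10).sum
      })
      future.get() // blocks the calling thread until the task is done
    } finally {
      pool.shutdown()                            // 3. stop accepting new work
      pool.awaitTermination(5, TimeUnit.SECONDS) // 4. wait for queued work
    }
  }

  def main(args: Array[String]): Unit = println(sumOnPool())
}
```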

ExecutionContext

  • a thin wrapper over ExecutorService
  • ExecutionContext.fromExecutorService(es: ExecutorService): ExecutionContext
  • the default ExecutionContext (ExecutionContext.global) uses a work-stealing thread pool (a ForkJoinPool)
  • IOApp provides two main pools, compute and blocking
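
A sketch of wrapping an ExecutorService and shifting an effect onto it, assuming cats-effect 3. The thread name "custom-pool" is made up so that the shift is observable; wrapping the pool in a Resource is one possible way to make sure it is shut down.

```scala
import cats.effect.{IO, IOApp, Resource}
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.ExecutionContext

object ExecutionContextExample extends IOApp.Simple {
  // Names the single worker thread so we can see where the effect ran.
  private val factory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = new Thread(r, "custom-pool")
  }

  // The pool is created on acquire and shut down on release.
  val customEc: Resource[IO, ExecutionContext] =
    Resource
      .make(IO(Executors.newSingleThreadExecutor(factory)))(es => IO(es.shutdown()))
      .map(es => ExecutionContext.fromExecutorService(es))

  // evalOn runs the effect on the custom pool instead of the compute pool.
  val threadName: IO[String] =
    customEc.use(ec => IO(Thread.currentThread().getName).evalOn(ec))

  val run: IO[Unit] = threadName.flatMap(IO.println)
}
```

For blocking calls, IO.blocking is usually the simpler option, since it already shifts the work onto the blocking pool mentioned above.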

Fixed thread pools

  • fixed number of workers/threads

Cached thread pools

  • uses a synchronous queue: when new work comes in and no idle thread is available, a new thread is created
  • threads that stay idle (default 60 seconds) are removed
  • the default maximum number of threads is very high (Integer.MAX_VALUE, effectively unbounded)
  • ideal for blocking work

Work-stealing thread pools

  • in addition to the main submission queue, each worker has its own internal queue, which is filled from the main queue
  • if a worker finishes its own tasks and no tasks are available in the main queue, it steals work from the end of another worker's queue
  • default size = number of cores
  • ideal for computational work (a blocked task wastes a thread)

Type classes

trait Sync[F[_]]:
    def delay[A](thunk: => A): F[A]
    def blocking[A](thunk: => A): F[A]

trait MonadCancel[F[_], E]:
    def bracket[A,B](acquire: F[A])(use: A => F[B])(release: A => F[Unit]): F[B]

trait GenConcurrent[F[_], E]:  // type Concurrent[F[_]] = GenConcurrent[F, Throwable]
    def ref[A](a: A): F[Ref[F, A]]
    def deferred[A]: F[Deferred[F, A]]
    def memoize[A](fa: F[A]): F[F[A]]

trait GenSpawn[F[_], E]:
    def race[A,B](fa: F[A], fb: F[B]): F[Either[A,B]]

trait Async[F[_]]:
    def async_[A](cb: (Either[Throwable, A] => Unit) => Unit): F[A]
    def executionContext: F[ExecutionContext]
    def evalOn[A](fa: F[A], ec: ExecutionContext): F[A]

trait GenTemporal[F[_], E]:  // Temporal[F] = GenTemporal[F, Throwable]
    def sleep(time: FiniteDuration): F[Unit]
    def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A]

trait Clock[F[_]]:
    def monotonic: F[FiniteDuration]
    def realTime: F[FiniteDuration]
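
The point of these type classes is that a function can ask for only the capability it needs. A sketch assuming cats-effect 3; readEnv and withDeadline are hypothetical helpers written for this illustration.

```scala
import cats.effect.{IO, IOApp, Sync, Temporal}
import scala.concurrent.duration._

object TypeClassExample extends IOApp.Simple {
  // Needs only Sync: suspends a side-effecting read.
  def readEnv[F[_]: Sync](key: String): F[Option[String]] =
    Sync[F].delay(sys.env.get(key))

  // Needs only Temporal: falls back to a pure value after the time limit.
  def withDeadline[F[_]: Temporal, A](fa: F[A], limit: FiniteDuration, fallback: A): F[A] =
    Temporal[F].timeoutTo(fa, limit, Temporal[F].pure(fallback))

  // IO has instances for all of the type classes above.
  val run: IO[Unit] =
    for {
      home <- readEnv[IO]("HOME")
      _    <- IO.println(s"HOME = $home")
      late <- withDeadline[IO, String](IO.never, 50.millis, "late")
      _    <- IO.println(late)
    } yield ()
}
```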