cats-effect¶
- `.pure` evaluates its argument eagerly; it does not suspend the computation the way `.delay` does
- `IO.apply` === `IO.delay`
- use `.flatTap(IO.println)` to print the result before returning it
- add `.as(ExitCode.Success)` to an `IO` to exit with success
- extension method: `.pure[IO]` lifts any value into `IO`
- `.void` turns any `IO[A]` into `IO[Unit]`
- `.memoize: IO[IO[A]]` returns a nested `IO`; the inner `IO` is not evaluated until it is `flatMap`ped, and then runs at most once with its result cached
  - This is the effectful equivalent of `lazy val`
- `.foreverM`: repeats the effect forever
- `.timeoutTo(dur: FiniteDuration, fallback: IO[A]): IO[A]`: runs for at most `dur`; returns the result if it completes in time, else runs the fallback
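A minimal sketch of a few of the combinators above, assuming cats-effect 3 and Scala 3. The `IOBasics` object and the `StringBuilder` log are hypothetical helpers, used only to make evaluation visible.

```scala
import cats.effect.IO
import cats.syntax.all.*
import scala.concurrent.duration.*

object IOBasics:
  // IO.delay (same as IO.apply) suspends the block: nothing runs until the IO is executed.
  def suspended(log: StringBuilder): IO[Int] =
    IO.delay { log.append("ran;"); 42 }

  // memoize: the outer IO allocates the cache, the inner IO runs the effect at most once.
  def memoized(log: StringBuilder): IO[(Int, Int)] =
    suspended(log).memoize.flatMap { cached =>
      for
        a <- cached
        b <- cached
      yield (a, b)
    }

  // timeoutTo: switch to the fallback when the effect exceeds the limit.
  val withFallback: IO[String] =
    IO.sleep(1.second).as("slow").timeoutTo(100.millis, IO.pure("fallback"))

  // pure lifts an already computed value; flatTap prints it; void discards it.
  val lifted: IO[Unit] =
    7.pure[IO].flatTap(n => IO.println(n)).void
```

Building `suspended(log)` leaves the log empty; running `memoized(log)` appends `ran;` only once even though the cached effect is sequenced twice.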
Fibers¶
- a fiber is the cats-effect concurrency primitive
- created using `.start` on an `IO`
- optionally specify:
  - `onCancel`: effect to run on cancellation: `io.onCancel(fin: IO[Unit]).start`
  - `timeout`: e.g. `io.timeout(500.millis).onCancel(...).start`
  - `timeoutTo`: specify an alternate action to run: `io.timeoutTo(500.millis, planB).start`
- `IO.never` is an `IO` that never completes
- `.foreverM`: loops infinitely, running the same task
- `.background`: creates a `Resource` that runs the task in a fiber; its `surround` method runs another effect while the fiber is alive and cancels the fiber afterwards
- resources acquired through `bracket` or `Resource` are released even if the fiber is cancelled
Fiber methods¶
- `join`: wait for completion
  - returns `Outcome`
- `joinWith(onCancel: F[A])` allows a fallback computation in case a fiber is cancelled
- `cancel`: cancel a running fiber
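The fiber lifecycle above could look roughly like this, assuming cats-effect 3. `FiberDemo` and `describe` are hypothetical names for illustration.

```scala
import cats.effect.{IO, Outcome}

object FiberDemo:
  // Turn the three possible outcomes of a fiber into a description.
  def describe(outcome: Outcome[IO, Throwable, Int]): IO[String] =
    outcome match
      case Outcome.Succeeded(fa) => fa.map(n => s"succeeded: $n")
      case Outcome.Errored(e)    => IO.pure(s"failed: ${e.getMessage}")
      case Outcome.Canceled()    => IO.pure("canceled")

  // start forks the computation; join waits for it and reports how it ended.
  val joined: IO[String] =
    for
      fiber   <- IO.pure(21).map(_ * 2).start
      outcome <- fiber.join
      text    <- describe(outcome)
    yield text

  // Cancel a fiber that would never finish; joinWith supplies the fallback value.
  val canceled: IO[String] =
    for
      fiber  <- IO.never[String].onCancel(IO.println("cleaning up")).start
      _      <- fiber.cancel
      result <- fiber.joinWith(IO.pure("fallback"))
    yield result
```

`cancel` only returns after the fiber's finalizers have run, so the `onCancel` action has completed before `joinWith` is evaluated.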
Parallelism and Concurrency¶
- `parMapN`, `parTraverse`, `parSequence`
- `IO.race(l: IO[A], r: IO[B]): IO[Either[A, B]]`: runs two IOs concurrently, returning the winner and cancelling the loser
- `.both(b: IO[B]): IO[(A, B)]`: runs both IOs concurrently and returns the values as a tuple
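A small sketch of these combinators, assuming cats-effect 3. `ParallelDemo` and `slow` are hypothetical helpers that simulate work with `IO.sleep`.

```scala
import cats.effect.IO
import cats.syntax.all.*
import scala.concurrent.duration.*

object ParallelDemo:
  // Simulated work: produce `value` after `delay`.
  def slow[A](value: A, delay: FiniteDuration): IO[A] =
    IO.sleep(delay).as(value)

  // Both effects run concurrently; the results are combined with the function.
  val combined: IO[Int] =
    (slow(1, 50.millis), slow(2, 50.millis)).parMapN(_ + _)

  // One effect per element, all run concurrently; input order is preserved.
  val traversed: IO[List[Int]] =
    List(1, 2, 3).parTraverse(n => slow(n * 10, 20.millis))

  // The faster side wins; the slower side is cancelled.
  val winner: IO[Either[String, String]] =
    IO.race(slow("fast", 10.millis), slow("slow", 1.second))

  // Both results are kept as a tuple.
  val pair: IO[(Int, String)] =
    slow(1, 10.millis).both(slow("one", 10.millis))
```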
Async¶
- `IO.async_`: passes a callback that the called function uses to convey its result, e.g. the result of a `Future`
- `IO.fromFuture`: a convenience method over `IO.async_`
- `evalOn`: runs an effect on a given `ExecutionContext`
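A sketch of wrapping a callback-style API, assuming cats-effect 3. `fetchWithCallback` is a hypothetical stand-in for a third-party asynchronous API, not part of any library.

```scala
import cats.effect.IO
import scala.concurrent.Future

object AsyncDemo:
  // Hypothetical callback-based API: reports its result by invoking `onDone`.
  def fetchWithCallback(id: Int)(onDone: Either[Throwable, String] => Unit): Unit =
    onDone(Right(s"user-$id"))

  // async_ hands us a callback; the IO completes when the callback is invoked.
  def fetch(id: Int): IO[String] =
    IO.async_[String] { cb => fetchWithCallback(id)(cb) }

  // fromFuture takes an IO[Future[A]], so the Future is only created when the IO runs.
  def viaFuture(id: Int): IO[String] =
    IO.fromFuture(IO(Future.successful(s"user-$id")))
```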
Resource Safety¶
- `.bracket(use)(release)` for a single resource
- `Resource` class: easier for acquiring/releasing multiple resources
```scala
val in = Resource.make[F, A](acquire: F[A])(close: A => F[Unit])
val out = Resource.fromAutoCloseable[F, B](acquire: F[B]) // convenience method for Java AutoCloseable
val inOut = in.flatMap(i => out.map(o => (i, o))) // compose multiple resources, released in reverse order
inOut.use(f: ((A, B)) => F[C]) // `use` receives the pair as a single tuple argument
```
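To make the acquire/release ordering concrete, here is a runnable sketch assuming cats-effect 3. The `named` resource is hypothetical: it only records its lifecycle in a `Ref` instead of opening a real file or connection.

```scala
import cats.effect.{IO, Ref, Resource}

object ResourceDemo:
  // Hypothetical resource that logs when it is acquired and released.
  def named(name: String, log: Ref[IO, List[String]]): Resource[IO, String] =
    Resource.make(log.update(_ :+ s"acquire $name").as(name))(_ =>
      log.update(_ :+ s"release $name")
    )

  // Compose two resources; they are released in reverse order of acquisition.
  def pair(log: Ref[IO, List[String]]): Resource[IO, (String, String)] =
    for
      in  <- named("in", log)
      out <- named("out", log)
    yield (in, out)

  val program: IO[List[String]] =
    for
      log <- Ref.of[IO, List[String]](Nil)
      _   <- pair(log).use { case (in, out) => log.update(_ :+ s"use $in and $out") }
      all <- log.get
    yield all
```

Running `program` should yield the log `acquire in`, `acquire out`, `use in and out`, `release out`, `release in`.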
Error handling¶
- `.handleError` and `.handleErrorWith` handle all errors, without and with effect respectively
- `.recover` and `.recoverWith` handle some errors, without and with effect respectively
- `.rethrow` and `.attempt` convert `Either[Throwable, A]` to an error and back respectively
- `.handleErrorWith(f: (E) => F[A])`
  - Tip: use `NonFatal` to avoid handling fatal errors (such as internal JVM errors)
```scala
import scala.util.control.NonFatal

result.handleErrorWith {
  case NonFatal(e) => Response("Internal Error").pure[IO]
}
```
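The combinators listed above, side by side in a small sketch (assuming cats-effect 3; `ErrorDemo` and the `failing` effect are illustrative only):

```scala
import cats.effect.IO
import cats.syntax.all.*

object ErrorDemo:
  val failing: IO[Int] = IO.raiseError(new IllegalStateException("boom"))

  // handleError: handles every error with a plain value.
  val handled: IO[Int] = failing.handleError(_ => -1)

  // recover: handles only the errors matched by the partial function.
  val recovered: IO[Int] = failing.recover { case _: IllegalStateException => 0 }

  // attempt: surfaces the error as a Left instead of failing the IO.
  val attempted: IO[Either[Throwable, Int]] = failing.attempt

  // rethrow: the inverse of attempt; a Left becomes a failed IO again.
  val roundTrip: IO[Int] = IO.pure(5).attempt.rethrow
```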
Clock¶
- `IO.realTime: IO[scala.concurrent.duration.FiniteDuration]`: wall-clock time at expression evaluation
- `IO.monotonic`: nanosecond precision, for measuring elapsed time
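A sketch of measuring elapsed time with `IO.monotonic`, assuming cats-effect 3. `measure` is a hypothetical helper; `IO` also ships a built-in `.timed` that does the same job.

```scala
import cats.effect.IO
import scala.concurrent.duration.*

object ClockDemo:
  // Read the monotonic clock before and after the effect and return the difference.
  def measure[A](io: IO[A]): IO[(FiniteDuration, A)] =
    for
      start  <- IO.monotonic
      result <- io
      stop   <- IO.monotonic
    yield (stop - start, result)

  val example: IO[(FiniteDuration, String)] =
    measure(IO.sleep(50.millis).as("done"))
```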
Ref¶
- Backed by Java's `AtomicReference`
- Cannot be empty, must always contain exactly one value
- methods: `.get: F[A]`, `.set(a: A): F[Unit]`, `.update(f: A => A): F[Unit]` and `.modify(f: A => (A, B)): F[B]`
- `Ref.of(a: A): F[Ref[F, A]]`: creating a `Ref` is itself an effectful operation
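A sketch of a shared counter, assuming cats-effect 3 (`RefDemo` is a hypothetical name). `update` is atomic, so concurrent increments are not lost; `modify` updates the value and returns something derived from the old one in a single step.

```scala
import cats.effect.{IO, Ref}
import cats.syntax.all.*

object RefDemo:
  val program: IO[(Int, Int)] =
    for
      counter  <- Ref.of[IO, Int](0)
      // 100 concurrent increments; each update is applied atomically.
      _        <- List.fill(100)(counter.update(_ + 1)).parSequence
      // Store n + 10 and return the previous value n.
      previous <- counter.modify(n => (n + 10, n))
      current  <- counter.get
    yield (previous, current)
```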
Deferred¶
- has semantics of promise
- can be empty
- `.get` semantically blocks (suspends the fiber, not the thread) while empty
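A sketch of one fiber waiting on a `Deferred` that another completes, assuming cats-effect 3 (`DeferredDemo` is a hypothetical name):

```scala
import cats.effect.{Deferred, IO}
import scala.concurrent.duration.*

object DeferredDemo:
  val program: IO[String] =
    for
      signal <- Deferred[IO, String]
      // The waiter suspends on get until the Deferred is completed.
      waiter <- signal.get.map(v => s"got $v").start
      // complete fills the Deferred once; later calls return false.
      _      <- IO.sleep(10.millis) *> signal.complete("ready")
      result <- waiter.joinWithNever
    yield result
```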
Queue¶
- FIFO
- `.take: F[A]`, `.tryTake: F[Option[A]]`: get the element at the front of the queue, with/without blocking
- `.offer(a: A): F[Unit]`, `.tryOffer(a: A): F[Boolean]`: push an element at the back of the queue, with/without blocking
- `Queue.unbounded[F, A]: F[Queue[F, A]]`: creates a queue that never gets full
- `Queue.bounded[F, A](capacity: Int): F[Queue[F, A]]`: queue with a bounded capacity
- queues with capacity 0 can be useful for synchronization: it is as if the producer passes each event directly to the consumer
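A producer/consumer sketch with a bounded queue, assuming cats-effect 3 (`QueueDemo` is a hypothetical name). The producer offers three items into a queue of capacity 2, so its third `offer` waits until the consumer has taken an element.

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all.*

object QueueDemo:
  val program: IO[(List[Int], Option[Int])] =
    for
      queue    <- Queue.bounded[IO, Int](2)
      // Producer runs in its own fiber and offers the items in order.
      producer <- List(1, 2, 3).traverse_(n => queue.offer(n)).start
      // Consumer takes three items; take waits while the queue is empty.
      taken    <- queue.take.replicateA(3)
      _        <- producer.join
      // The queue is now empty, so tryTake returns None without waiting.
      leftover <- queue.tryTake
    yield (taken, leftover)
```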
Parallel¶
- is a type class that provides natural transformations between a monad and its parallel applicative
- `parSequence`, `parTraverse`, `parMapN` are all defined for monads (such as `IO`) that implement `Parallel`
Console¶
- provides `println` as: `Console[F].println(user.show)`
- `import cats.effect.std.Console`
Thread pools¶
- provided by the `java.util.concurrent.Executors` factory methods `newFixedThreadPool`, `newCachedThreadPool` and `newWorkStealingPool`
- the factory methods return an `ExecutorService`, which provides APIs for submitting new work
- typical steps:
  - allocate the pool
  - `.submit` new work
  - `.shutdown()` prevents any new work from being accepted
  - `.awaitTermination(timeout, unit)` waits, up to the timeout, for all executing and queued work to complete
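The typical steps above, sketched against the plain JDK API from Scala. `PoolDemo` is a hypothetical name, and the pool size and timeout are arbitrary.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object PoolDemo:
  def run(): (Int, Boolean) =
    // 1. allocate the pool
    val pool = Executors.newFixedThreadPool(2)
    // 2. submit work; an explicit Callable avoids the Runnable/Callable overload ambiguity
    val future = pool.submit(new Callable[Int] { def call(): Int = 21 * 2 })
    // 3. stop accepting new work; already submitted work still runs
    pool.shutdown()
    // 4. wait (up to the timeout) for submitted work to finish
    val finished = pool.awaitTermination(5, TimeUnit.SECONDS)
    (future.get(), finished)
```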
ExecutionContext¶
- a thin wrapper over `ExecutorService`
- `ExecutionContext.fromExecutorService(es: ExecutorService): ExecutionContext`
- the default `ExecutionContext` uses a work-stealing thread pool
- `IOApp` provides two main pools, compute and blocking
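A sketch of shifting work between pools, assuming cats-effect 3. The `dedicated` pool and its thread name are hypothetical; the pool is wrapped in a `Resource` so it is shut down after use.

```scala
import cats.effect.{IO, Resource}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object EvalOnDemo:
  // Hypothetical single-thread pool with a recognisable thread name.
  val dedicated: Resource[IO, ExecutionContext] =
    Resource
      .make(IO(Executors.newSingleThreadExecutor(r => new Thread(r, "dedicated-pool"))))(es =>
        IO(es.shutdown())
      )
      .map(es => ExecutionContext.fromExecutorService(es))

  val threadNames: IO[(String, String)] =
    dedicated.use { ec =>
      for
        // evalOn runs the effect on the given ExecutionContext, then shifts back.
        custom   <- IO(Thread.currentThread().getName).evalOn(ec)
        // IO.blocking runs the thunk on the runtime's blocking pool.
        blocking <- IO.blocking(Thread.currentThread().getName)
      yield (custom, blocking)
    }
```

The first name should be `dedicated-pool`; the second depends on how the runtime names its blocking threads.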
Fixed thread pools¶
- fixed number of workers/threads
Cached thread pools¶
- uses a synchronous queue: when new work comes in and no idle thread is available, a new thread is created
- threads that stay idle (60 seconds by default) are removed
- the default maximum number of threads is high (effectively unbounded)
- ideal for blocking work
work-stealing thread pools¶
- in addition to the main submission queue, each worker has an internal queue which is filled from the main queue
- if a worker is done with its own tasks and no tasks are available in the main queue, it steals work from the tail of another worker's queue
- default size = number of cores
- ideal for computational work (blocked task wastes a thread)
Type classes¶
```scala
// Abridged signatures of the main cats-effect type classes
trait Sync[F[_]]:
  def delay[A](thunk: => A): F[A]
  def blocking[A](thunk: => A): F[A]

trait MonadCancel[F[_], E]:
  def bracket[A, B](acquire: F[A])(use: A => F[B])(release: A => F[Unit]): F[B]

trait GenConcurrent[F[_], E]: // type Concurrent[F[_]] = GenConcurrent[F, Throwable]
  def ref[A](a: A): F[Ref[F, A]]
  def deferred[A]: F[Deferred[F, A]]
  def memoize[A](fa: F[A]): F[F[A]]

trait GenSpawn[F[_], E]:
  def race[A, B](fa: F[A], fb: F[B]): F[Either[A, B]]

trait Async[F[_]]:
  def async_[A](cb: (Either[Throwable, A] => Unit) => Unit): F[A]
  def executionContext: F[ExecutionContext]
  def evalOn[A](fa: F[A], ec: ExecutionContext): F[A]

trait GenTemporal[F[_], E]: // type Temporal[F[_]] = GenTemporal[F, Throwable]
  def sleep(time: FiniteDuration): F[Unit]
  def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A]

trait Clock[F[_]]:
  def monotonic: F[FiniteDuration]
  def realTime: F[FiniteDuration]
```
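These type classes are typically used as constraints on an abstract `F[_]`. A minimal sketch, assuming cats-effect 3; `withDeadline` and the 100 ms limit are hypothetical:

```scala
import cats.effect.{IO, Temporal}
import cats.syntax.all.*
import scala.concurrent.duration.*

object TypeClassDemo:
  // Only timeoutTo is needed, so Temporal is the weakest constraint that suffices.
  def withDeadline[F[_]: Temporal, A](fa: F[A], fallback: A): F[A] =
    Temporal[F].timeoutTo(fa, 100.millis, fallback.pure[F])

  // IO satisfies Temporal, so the generic function can be used with it directly.
  val late: IO[String]   = withDeadline(IO.never[String], "late")
  val onTime: IO[String] = withDeadline(IO.pure("ok"), "late")
```

Asking for `Temporal` rather than `Async` keeps the function usable with any effect type that can sleep and time out.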
log4cats¶
- logger objects can either be created once and passed around, or the factory object can be passed around so that each class/package creates its own logger
| Feature | Passing a Logger | Creating New Loggers (via Factory) |
|---|---|---|
| Simplicity | High (Just pass one object) | Moderate (Needs a factory + init call) |
| Boilerplate | Minimal | Slightly more |
| Diagnostic Utility | Low (Logs look like they come from one place) | High (Logs show exact class/layer origin) |
| Best For | Small scripts, CLI tools | Large microservices, libraries |
```scala
import cats.Monad
import cats.effect.{IO, IOApp}
import cats.syntax.all.*
import org.typelevel.log4cats.*
import org.typelevel.log4cats.slf4j.Slf4jFactory

// --- 1. THE DATA LAYER ---
// This layer only needs a LoggerFactory to create its specific logger.
class UserRepository[F[_]: LoggerFactory: Monad]:
  private val logger = LoggerFactory[F].getLogger // Auto-names by class name

  def findUser(id: String): F[Unit] =
    logger.info(s"Fetching user $id from database...")

// --- 2. THE BUSINESS LAYER ---
// We inject the factory into the constructor.
class UserService[F[_]: LoggerFactory: Monad](repo: UserRepository[F]):
  private val logger = LoggerFactory[F].getLogger

  def processUser(id: String): F[Unit] =
    for
      _ <- logger.info(s"Beginning processing for user: $id")
      _ <- repo.findUser(id)
      _ <- logger.info("Processing complete.")
    yield ()

// --- 3. THE ENTRY POINT (THE EDGE) ---
object Main extends IOApp.Simple:
  def run: IO[Unit] =
    // We create the factory ONCE using Sync[IO]
    implicit val logging: LoggerFactory[IO] = Slf4jFactory.create[IO]
    // Initialize services by passing the factory through
    val repo = new UserRepository[IO]
    val service = new UserService[IO](repo)
    service.processUser("user-123")
```