fs2

Pure Streams

  • create finite streams: apply, empty, emit, emits, range
  • create infinite: iterate, unfold, constant
  • Stream[F, A]#chunks => Stream[F, Chunk[A]]: converts a stream of elements into a stream of its underlying chunks
  • Stream[F, Chunk[A]].flatMap(c => Stream.chunk(c)) => Stream[F, A]: flattens a chunked stream back into a stream of elements
  • building a stream
  • emit(_: A), emits(_: Seq[A]) produce a pure stream, Stream[Pure, A]
  • eval(_: F[A]) => Stream[F, A]
  • infinite stream: Stream.constant(42), Stream.awakeEvery[IO](1.second), Stream.iterate(1.0)(_ * 2)
  • combinators: map, filter, flatMap
  • ++: appends second stream to the first stream
  • drain: discard all elements
  • find(O => Boolean): emits first element that matches
  • head, take(n), interruptAfter(10.minutes), interruptWhen(<condition>)
  • zip, st1.zipWith(st2)(fn), zipLeft/Right: like zip, but keep only left/right stream
  • compile: a projection of a stream that allows it to be converted to other types; e.g. stream.compile.toList
  • retries: Stream.retry(action, delay = 1.second, nextDelay = _ * 2, maxAttempts = 3)
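
The pure constructors and combinators above can be tried without any effect type. The following is a minimal sketch, assuming fs2 3.x on the classpath; the value names are made up for illustration.

```scala
import fs2.{Chunk, Pure, Stream}

// Finite pure streams convert directly to collections, no effect needed.
val evens: List[Int] = Stream.range(0, 10).filter(_ % 2 == 0).toList

// iterate is infinite, so bound it with take before collecting.
val powers: List[Int] = Stream.iterate(1)(_ * 2).take(5).toList

// chunks exposes the chunk structure; flatMap(Stream.chunk) flattens it back.
val chunked: Stream[Pure, Chunk[Int]] = (Stream(1, 2) ++ Stream(3)).chunks
val flattened: List[Int] = chunked.flatMap(c => Stream.chunk(c)).toList

// zip stops as soon as the shorter side is exhausted.
val zipped: List[(Int, String)] = Stream(1, 2, 3).zip(Stream("a", "b")).toList
```

Note that an infinite pure stream such as Stream.constant(42) never terminates when collected, so it needs take, find, or a similar bounding combinator first.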

Effectful Streams

  • create: eval, covary, iterateEval, unfoldEval
  • covary widens the effect type: it lifts a pure stream into any effect F, and more generally a Stream[F, A] into a Stream[F2, A] where F2 is a supertype of F
  • exec, like eval but doesn't produce any element
  • fromIterator is effectful because an Iterator maintains a state
    • wrapping an Iterator in Stream.unfold seemingly produces a pure stream, but it is unsafe because Iterator#next mutates hidden state
  • must be compiled
  • can be compiled to a value .compile.toList or just for the effect .compile.drain
  • combinators: evalMap, evalTap, evalFilter, ++
  • transformation: map(A => B), flatMap(A => Stream[F, B]), evalMap()
  • Error handling: raiseError, handleErrorWith
  • Resource handling: bracket, resource, fromAutoCloseable
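import java.io.{BufferedReader, File, FileReader}
import cats.effect.IO
import fs2.Stream

// acquire the reader; the release action runs when the stream terminates, even on error
val file: Stream[IO, BufferedReader] = Stream.bracket {
 IO {
   new BufferedReader(new FileReader(new File("./build.sbt")))
 }
}(f => IO(f.close()))

file.flatMap { reader =>
 Stream
   .eval(IO(Option(reader.readLine()))) // readLine returns null at end of file, so Option gives None
   .repeat
   .unNoneTerminate
}.map(_.length) // emits the length of each line read from the file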
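
The effectful constructors and combinators listed in this section can be sketched as follows. This assumes fs2 3.x with cats-effect 3 IO; the value names are illustrative only.

```scala
import cats.effect.IO
import fs2.Stream

// eval lifts one effect into a single-element stream; evalMap runs an effect per element.
val doubled: IO[List[Int]] =
  Stream.eval(IO(21)).evalMap(n => IO(n * 2)).compile.toList

// evalTap runs an effect for each element and passes the element through unchanged.
val logged: IO[List[Int]] =
  Stream(1, 2, 3).covary[IO].evalTap(n => IO.println(s"saw $n")).compile.toList

// exec runs an effect without emitting anything; compile.drain keeps only the effects.
val effectOnly: IO[Unit] =
  (Stream.exec(IO.println("start")) ++ Stream(1, 2).covary[IO]).compile.drain

// unfoldEval builds a stream from a state and an effectful step function.
val countdown: IO[List[Int]] =
  Stream.unfoldEval(3)(n => IO(if (n > 0) Some((n, n - 1)) else None)).compile.toList
```

Nothing runs until the resulting IO values are executed, for example from an IOApp.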

Timer combinators

  • timeout(1.second): fails the stream with a TimeoutException if it hasn't terminated within 1 second
  • interruptAfter(1.second): stops the stream from emitting any more data after 1 second, without raising an error
  • delayBy(2.seconds): delays emitting the first element by 2 seconds
  • metered(1.second): throttles to one element per interval. The interval is measured from the previous tick, i.e. it isn't extended by the time taken to process an element
  • debounce(1.second): emits at most one element per interval, keeping only the most recent one and dropping the rest
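
A small sketch of the timer combinators, assuming fs2 3.x and cats-effect 3; the durations and value names are arbitrary choices for illustration.

```scala
import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration._

// metered: about one element every 100 ms; take(3) bounds the infinite source.
val paced: IO[List[Int]] =
  Stream.iterate(1)(_ + 1).covary[IO].metered(100.millis).take(3).compile.toList

// delayBy: waits before the first element, then emits normally.
val delayed: IO[List[Int]] =
  Stream(1, 2).covary[IO].delayBy(200.millis).compile.toList

// timeout: a stream that never finishes fails with a TimeoutException.
val timedOut: IO[Either[Throwable, Unit]] =
  Stream.never[IO].timeout(100.millis).compile.drain.attempt

// interruptAfter: ends the stream quietly instead of failing.
val interrupted: IO[Unit] =
  Stream.never[IO].interruptAfter(100.millis).compile.drain
```

The contrast between the last two values is the main point: timeout surfaces as an error, while interruptAfter completes normally.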

Resource Management

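import java.io.{BufferedReader, FileReader}
import cats.effect.{IO, Resource}
import fs2.Stream

val acq = IO.blocking(new BufferedReader(new FileReader("input.csv")))
val rel = (r: BufferedReader) => IO.blocking(r.close())
val res = Resource.make(acq)(rel)

// readLine returns null at end of file, which ends the stream
val read = (r: BufferedReader) => Stream.repeatEval(IO.blocking(r.readLine())).takeWhile(_ != null)

// three equivalent ways to tie the reader's lifetime to the stream
Stream.bracket(acq)(rel).flatMap(read)
Stream.resource(res).flatMap(read)
Stream.fromAutoCloseable(acq).flatMap(read)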
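
To see that the release action also runs when the stream fails, the sketch below records events in a Ref instead of touching a real file. It assumes fs2 3.x and cats-effect 3; the helper names (demo, record) are hypothetical.

```scala
import cats.effect.{IO, Ref}
import fs2.Stream

// Records the order of acquire / use / release around a failing stream.
val demo: IO[List[String]] =
  Ref.of[IO, List[String]](Nil).flatMap { log =>
    def record(msg: String): IO[Unit] = log.update(_ :+ msg)

    val stream =
      Stream.bracket(record("acquire"))(_ => record("release")).flatMap { _ =>
        Stream.eval(record("use")) ++ Stream.raiseError[IO](new RuntimeException("boom"))
      }

    // attempt swallows the failure so the log can be inspected afterwards
    stream.compile.drain.attempt *> log.get
  }
```

Running demo should yield the events in the order acquire, use, release, even though the stream ends with an error.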

Error Handling

  • Stream implements MonadError
  • raiseError[F] fails the stream; the rest of the stream is ignored, but any side-effects from earlier elements have already been evaluated
  • handleErrorWith(h): on failure the original stream stops and its remaining elements are discarded; the stream then continues with the stream returned by h
  • attempt catches the error and emits it as a Left as the last element; any previous elements are emitted as Right
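
The difference between handleErrorWith and attempt can be sketched like this, assuming fs2 3.x and cats-effect 3; the stream and value names are illustrative.

```scala
import cats.effect.IO
import fs2.Stream

// Fails after two elements; the trailing 3 is never reached.
val failing: Stream[IO, Int] =
  Stream(1, 2).covary[IO] ++
    Stream.raiseError[IO](new RuntimeException("boom")) ++
    Stream(3).covary[IO]

// handleErrorWith: elements before the failure, then the fallback stream.
val recovered: IO[List[Int]] =
  failing.handleErrorWith(_ => Stream.emit(-1)).compile.toList

// attempt: elements become Right, the failure becomes a final Left.
val attempted: IO[List[Either[String, Int]]] =
  failing.attempt.map(_.left.map(_.getMessage)).compile.toList
```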

Pipeline

Chunk

  • internal buffer implementation for performance, fast concat, fast indexing, avoid copying, list-like interface
  • constructors: apply, singleton, array, empty
  • combinators: map, flatMap, filter, take
  • methods:
  • size: total number of elements in the chunk
  • copyToArray: copies the chunk's elements into an existing array, overwriting its contents from a given start index
  • compact: physically combine all contained chunks into a single chunk

Pull

  • represent process for stream transformations, including stateful
  • has 3 type parameters Pull[F, A, R], F: effect, A element type, R is result
  • Pull is a monad over its result type
  • constructors:
  • output1: emits a single element, with Unit as result type
  • pure[A]: emits nothing, A is result type
  • output: emits a Chunk, Unit is result type
  • done: indicates end of data
  • s.pull (Stream#pull): converts a Stream to ToPull (an intermediate type before finally converting to Pull)
  • methods:
  • stream: converts Pull to a Stream
  • scanChunksOpt: like fold but at Chunk level and can return None if remainder of the Stream needs to be discarded
  • convert a stream s to ToPull with s.pull
  • ToPull methods:
  • echo: copies elements of a stream to a new Pull, returns Unit
  • take(5): copies 5 elements of a stream to a new Pull and returns Option[Stream[F, A]], which is a stream of rest of the elements
  • drop(5): same as take(5) except it drops the elements
  • uncons: returns the first chunk from the stream and the rest of the stream as the result Option[(Chunk[A], Stream[F, A])], or None if the stream is exhausted
    • useful in building Pipe (Stream transformations)
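
The Pull constructors and ToPull methods above can be sketched as follows, assuming fs2 3.x. The helper afterTwo is a hypothetical name introduced only for this example.

```scala
import fs2.{Chunk, Pull, Pure, Stream}

// Build a stream from pulls: output1 emits one element, output emits a chunk.
val built: Stream[Pure, Int] =
  (Pull.output1(1) >> Pull.output(Chunk(2, 3))).stream

// take(2) emits the first two elements; void discards the "rest of the stream" result.
val firstTwo: Stream[Pure, Int] =
  Stream(1, 2, 3, 4).pull.take(2).void.stream

// drop(2) emits nothing and returns the remainder, which echo then emits.
def afterTwo[F[_]](s: Stream[F, Int]): Pull[F, Int, Unit] =
  s.pull.drop(2).flatMap {
    case Some(rest) => rest.pull.echo
    case None       => Pull.done
  }

val lastTwo: Stream[Pure, Int] = afterTwo(Stream(1, 2, 3, 4)).stream
```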

Pipe

  • stream transformation function: of type Stream[F, A] => Stream[F, B]
  • typical recipe:
  • create as a function that takes a stream
  • create a pull
  • use uncons to get the first chunk and remainder (a stream)
  • process chunk
  • recursively process the remainder
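
The recipe above can be sketched as a stateful Pipe that emits a running total. This assumes fs2 3.x; runningSum is a hypothetical name, not something provided by the library.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

// A Pipe that replaces each element with the sum of all elements seen so far.
def runningSum[F[_]]: Pipe[F, Int, Int] = {
  def go(s: Stream[F, Int], acc: Int): Pull[F, Int, Unit] =
    s.pull.uncons.flatMap {
      case Some((chunk, rest)) =>
        // process the whole chunk at once, threading the total through it
        val (total, sums) = chunk.mapAccumulate(acc) { (sum, n) =>
          val next = sum + n
          (next, next)
        }
        // emit the processed chunk, then recurse on the remainder
        Pull.output(sums) >> go(rest, total)
      case None =>
        Pull.done
    }
  in => go(in, 0).stream
}

val totals: List[Int] = Stream(1, 2, 3, 4).through(runningSum[Pure]).toList
```

Working on whole chunks rather than single elements keeps the per-element overhead low, which is the main reason uncons is preferred over uncons1 in pipes like this.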

Parallel Processing

  • st1.merge(st2): runs both streams in parallel and merges results as they become available from either stream
  • final stream fails when either stream fails
  • mergeHaltL, mergeHaltR and mergeHaltBoth variations stop the merged stream when the left, the right, or either stream is exhausted, respectively (plain merge stops only when both are exhausted)
  • st1.concurrently(st2) runs st2 in the background while st1 is the primary stream: only elements of st1 are emitted, st2 is run purely for its effects and is interrupted when st1 terminates; a failure in st2 fails the whole stream
  • use case: primary stream could do the processing and secondary stream could report progress
  • use case: implement independent consumer and producer using cats Queue
  • Stream(st1, st2, ...).parJoinUnbounded is generalized version of merge
  • .parJoin(n) runs max n streams at a time, degree of parallelism
  • parEvalMap(n)(f): like evalMap, but evaluates up to n effects concurrently
  • use case: a queue with multiple consumers and producers using cats.effect.std.Queue
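import cats.effect.IO
import cats.effect.std.Queue
import fs2.Stream
import scala.concurrent.duration._

// each producer keeps offering its own id to the shared queue
def producer(id: Int, queue: Queue[IO, Int]): Stream[IO, Nothing] =
 Stream.repeatEval(queue.offer(id)).drain
// each consumer keeps taking values and prints what it received
def consumer(id: Int, queue: Queue[IO, Int]): Stream[IO, Nothing] =
 Stream.repeatEval(queue.take).map(n => s"consumer $id took $n").printlns

Stream.eval(Queue.unbounded[IO, Int]).flatMap { queue =>
 val ps = Stream.range(0, 4).map(id => producer(id, queue))  // 4 producers
 val cs = Stream.range(0, 6).map(id => consumer(id, queue))  // 6 consumers
 (ps ++ cs).parJoinUnbounded  // run all concurrently
}.interruptAfter(5.seconds).compile.drain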
  • parZip: pairs elements in order like zip, but pulls from both streams concurrently instead of pulling from the left stream and then from the right
  • parEvalMap: like evalMap but runs effects in parallel, while still emitting results in the order of the input elements; parEvalMapUnordered emits results as they complete
  • Stream.fixedRate: produces a unit value at every fixed interval; generally used with zipRight to throttle another stream
  • can implement metered with fixedRate and zipRight
  • Stream.fixedDelay: like fixedRate, except it always waits the full delay between two emits regardless of processing time; similar to spaced
  • awakeDelay and awakeEvery: like fixedDelay and fixedRate, except they emit the elapsed time since the stream started (a FiniteDuration with nanosecond precision) instead of unit
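
The behaviour of merge, concurrently, and parEvalMap can be compared side by side. The following is a sketch assuming fs2 3.x and cats-effect 3; the sleeps are arbitrary and only there to make the effects overlap.

```scala
import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration._

// merge: both sides run concurrently; the interleaving depends on timing.
val merged: IO[List[Int]] =
  Stream(1, 2).covary[IO].merge(Stream(3, 4).covary[IO]).compile.toList

// concurrently: the background stream runs for its effects only,
// and is interrupted once the primary stream finishes.
val primaryOnly: IO[List[Int]] =
  Stream(1, 2, 3).covary[IO]
    .concurrently(Stream.awakeEvery[IO](10.millis).evalMap(_ => IO.unit))
    .compile.toList

// parEvalMap: up to 3 effects overlap, yet output order follows input order.
val ordered: IO[List[Int]] =
  Stream(3, 1, 2).covary[IO]
    .parEvalMap(3)(n => IO.sleep((n * 50).millis).as(n))
    .compile.toList
```

In ordered, the element 1 finishes its effect first, but it is still emitted after 3 because parEvalMap preserves input order.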

Communication

  • communicate between concurrent streams

SignallingRef

  • signal from one process to another
  • .set: set a value
  • .discrete: Stream[IO, T]: emits the current value, then each new value as it is set
  • interruptWhen(signal): when signal is Boolean; like interrupt, except the worker stream checks for a signal after every emit instead of terminating abruptly
  • example:
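import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.duration._
type Temperature = Double
val program = Stream.eval(SignallingRef[IO, Temperature](20.0)).flatMap { alarm =>
 val sensor = Stream.repeatEval(IO(scala.util.Random.between(-40.0, 40.0)))
   .evalMap(t => if (t >= 25) alarm.set(t) else IO.unit)
   .metered(300.millis).drain
 // note: discrete emits the current value first, so the initial 20.0 is printed too
 val cooler = alarm.discrete.evalMap(t => IO.println(f"$t%.1f is too high")).drain
 cooler.merge(sensor)
}
program.interruptAfter(3.seconds).compile.drain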
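
For interruptWhen with a Boolean signal, a minimal sketch might look like the following. It assumes fs2 3.x and cats-effect 3; the names stop, worker and switch are illustrative, and the number of elements collected depends on timing.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.duration._

val interrupted: IO[List[Int]] =
  Stream.eval(SignallingRef[IO, Boolean](false)).flatMap { stop =>
    // the worker counts upwards until the signal turns true
    val worker = Stream.iterate(0)(_ + 1).covary[IO].metered(50.millis).interruptWhen(stop)
    // the switch flips the signal after a short while
    val switch = Stream.sleep[IO](220.millis) ++ Stream.eval(stop.set(true))
    worker.concurrently(switch)
  }.compile.toList
```

The result is a prefix of 0, 1, 2, ... whose length is decided by when the signal flips relative to the metered ticks.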

Channel

  • for more than 1 value, more than 1 producer
  • Channel.unbounded for unlimited capacity and Channel.bounded(n) to hold n items
  • .send for producers, send values to the channel
  • .stream for consumers: a stream of the values sent to the channel
  • example:
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.Channel
import scala.concurrent.duration._
type Temperature = Double
// a Channel has no initial value, unlike SignallingRef
val program = Stream.eval(Channel.unbounded[IO, Temperature]).flatMap { alarm =>
 val sensor = Stream.repeatEval(IO(scala.util.Random.between(-40.0, 40.0)))
   .evalMap(t => if (t >= 25) alarm.send(t).void else IO.unit)
   .metered(300.millis).drain
 val cooler = alarm.stream.evalMap(t => IO.println(f"$t%.1f is too high")).drain
 cooler.merge(sensor)
}
program.interruptAfter(3.seconds).compile.drain
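
A channel can also be closed by the producer, which ends the consumer's stream once the buffered values are drained. The sketch below assumes fs2 3.x and cats-effect 3; the capacity of 2 is arbitrary.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.Channel

val received: IO[List[Int]] =
  Stream.eval(Channel.bounded[IO, Int](2)).flatMap { ch =>
    // send three values, then close the channel to signal completion
    val producer =
      Stream(1, 2, 3).covary[IO].evalMap(ch.send) ++ Stream.exec(ch.close.void)
    // the consumer stream terminates after the channel is closed and drained
    ch.stream.concurrently(producer)
  }.compile.toList
```

With a bounded channel, send waits while the buffer is full, so a slow consumer slows the producer down rather than letting values pile up.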

Topic

  • for more than 1 value, 1+ producers, 1+ independent consumers
  • .publish for producers
  • .subscribe(n) for consumers; n is the size of the per-subscriber buffer for unread items, and each consumer receives every published item
    • creates back-pressure to stop producer instead of overrunning slow consumers
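
A sketch of one publisher with two independent subscribers, assuming fs2 3.x and cats-effect 3. It uses subscribeAwait so that both subscriptions are registered before publishing starts; the buffer size of 10 is arbitrary.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.Topic

val fanOut: IO[(List[Int], List[Int])] =
  Topic[IO, Int].flatMap { topic =>
    // register both subscribers up front so that neither misses early items
    val subscriptions = for {
      a <- topic.subscribeAwait(10)
      b <- topic.subscribeAwait(10)
    } yield (a, b)

    subscriptions.use { case (a, b) =>
      val publish = Stream(1, 2, 3).covary[IO].through(topic.publish).compile.drain
      val readBoth = a.take(3).compile.toList.both(b.take(3).compile.toList)
      // run the publisher alongside both readers and keep the readers' results
      publish &> readBoth
    }
  }
```

Each subscriber gets its own copy of every item, which is the difference from a Queue, where consumers compete for items.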

Queue

  • 1+ values, 1+ producers, 1+ shared consumers
  • Queue.unbounded for unlimited capacity and Queue.bounded(n) for n capacity
  • Producers: .offer
  • Consumers:
    • Stream.fromQueueUnterminated(queue) for queues that never terminate
    • Stream.fromQueueNoneTerminated(queue): emits items until the queue is terminated. Producers must offer each value as Some(value), and None to terminate
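
The None-terminated variant can be sketched as follows, assuming fs2 3.x and cats-effect 3 (cats.effect.std.Queue); the value names are illustrative.

```scala
import cats.effect.IO
import cats.effect.std.Queue
import fs2.Stream

val drained: IO[List[Int]] =
  Queue.unbounded[IO, Option[Int]].flatMap { queue =>
    // offer each value wrapped in Some, then None to mark the end
    val producer =
      Stream(1, 2, 3).covary[IO].evalMap(n => queue.offer(Some(n))) ++
        Stream.eval(queue.offer(None))
    // the consumer stream ends when it dequeues the None
    Stream.fromQueueNoneTerminated(queue).concurrently(producer).compile.toList
  }
```

With fromQueueUnterminated the same consumer would never finish on its own and would need take, interruptWhen, or a similar combinator to stop.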