
fs2

Pure Streams

  • create finite streams: apply, empty, emit, emits, range
  • create infinite: iterate, unfold, constant
  • Stream[F, A]#chunks => Stream[F, Chunk[A]]: converts a stream of elements into a stream of chunks
  • Stream[F, Chunk[A]].flatMap(c => Stream.chunk(c)) => Stream[F, A]: flattens the chunked version back into a stream of elements
  • building a stream
    • emit(_: A), emits(_: Seq[A]) produce Stream[Nothing, A]
    • eval(_: F[A]) => Stream[F, A]
    • infinite stream: Stream.constant(42), Stream.awakeEvery[IO](1.second), Stream.iterate(1.0)(_ * 2)
  • combinators: map, filter, flatMap
    • ++: appends second stream to the first stream
    • drain: discard all elements
    • find(A => Boolean): emits the first element that matches, then stops
    • head, take(n), interruptAfter(10.minutes), interruptWhen(<condition>)
    • zip, st1.zipWith(st2)(fn), zipLeft/zipRight: like zip, but keep only the left/right stream's elements
    • compile: a projection of a stream that allows it to be converted to other types; e.g. stream.compile.toList
  • retries: Stream.retry(action, delay = 1.second, nextDelay = _ * 2, maxAttempts = 3)
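A quick sketch of the constructors and combinators above (assuming fs2 3.x; pure streams can be materialized with .toList directly, no compile step needed):

```scala
import fs2.Stream

// Finite constructors plus ++, then infinite constructors tamed with take/find.
val finite = Stream(1, 2, 3) ++ Stream.range(4, 7)       // apply, range, ++
val doubled = Stream.iterate(1)(_ * 2).take(5).toList    // List(1, 2, 4, 8, 16)
val firstEven = Stream.emits(Seq(1, 3, 4, 6)).find(_ % 2 == 0).toList // List(4)
```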

Effectful Streams

  • create: eval, covary, iterateEval, unfoldEval
    • covary lifts a pure stream (Stream[Pure, A]) into an effectful one; more generally, it widens the effect type to any supertype
    • exec: like eval, but discards the result and emits no element
  • must be compiled
    • can be compiled to a value .compile.toList or just for the effect .compile.drain
  • combinators: evalMap, evalTap, evalFilter, ++
    • transformation: map(A => B), flatMap(A => Stream[F, B]), evalMap(A => F[B])
  • Error handling: raiseError, handleErrorWith
  • Resource handling: bracket, resource, fromAutoCloseable
    val file: Stream[IO, BufferedReader] = Stream.bracket {
        IO {
            new BufferedReader(new FileReader(new File("./build.sbt")))
        }
    }(f => IO(f.close()))
    
    file.flatMap { reader =>
        Stream
            .eval(IO(Option(reader.readLine())))
            .repeat
            .unNoneTerminate
    }.map(_.length) // emits the length of each line read from the file
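A minimal sketch of the eval/evalMap/compile flow described above (assuming fs2 3.x on cats-effect 3):

```scala
import cats.effect.IO
import fs2.Stream

// Lift a pure stream into IO, thread each element through effects,
// then compile the whole stream down to a single IO value.
val program: IO[List[Int]] =
  Stream(1, 2, 3)
    .covary[IO]                          // Stream[Pure, Int] => Stream[IO, Int]
    .evalTap(n => IO.println(s"saw $n")) // run an effect, keep the element
    .evalMap(n => IO.pure(n * 10))       // run an effect, replace the element
    .compile
    .toList
```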
    

Timer combinators

  • timeout(1.second): throws an exception if the stream isn't consumed in 1 second
  • interruptAfter(1.second): stops stream from emitting any more data after 1 second
  • delayBy(2.seconds): delays emitting the first element by 2 seconds
  • metered: delays every emit; the delay is measured from the previous emit, so it isn't affected by how long processing each element takes
  • debounce(1.second): sampling, emit only at intervals
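A small sketch combining the timer combinators above (fs2 3.x; the exact element count depends on timing, so treat it as illustrative):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

// Roughly ten elements: one every 100ms, cut off after 1 second.
val ticks: Stream[IO, Int] =
  Stream.iterate(0)(_ + 1)
    .covary[IO]
    .metered(100.millis)      // space out emissions
    .interruptAfter(1.second) // then stop the stream entirely
```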

Resource Management

val acq = IO.blocking(new BufferedReader(new FileReader("input.csv")))
val rel = (r: BufferedReader) => IO.blocking(r.close())
val res = Resource.make(acq)(rel)

val read = (r: BufferedReader) => Stream.repeatEval(IO.blocking(r.readLine())).takeWhile(_ != null)

Stream.bracket(acq)(rel).flatMap(read)
Stream.resource(res).flatMap(read)
Stream.fromAutoCloseable(acq).flatMap(read)

Error Handling

  • Stream implements MonadError
  • raiseError[F] fails the stream; the rest of the stream is ignored, but side effects from earlier elements have already run
  • handleErrorWith(e => Stream[F, A]) recovers by switching to a fallback stream at the point of failure; elements after the error in the original stream stay discarded
  • attempt catches the error and emits it as a Left; any elements emitted before the error are wrapped in Right
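The three bullets above in one sketch (fs2 3.x):

```scala
import cats.effect.IO
import fs2.Stream

val boom = new RuntimeException("boom")

// attempt: earlier elements become Rights, the error one Left, then the
// stream ends -- the trailing Stream(3) is never reached.
val attempted: IO[List[Either[Throwable, Int]]] =
  (Stream(1, 2) ++ Stream.raiseError[IO](boom) ++ Stream(3))
    .attempt
    .compile
    .toList

// handleErrorWith: switch to a fallback stream at the failure point.
val recovered: IO[List[Int]] =
  (Stream(1, 2) ++ Stream.raiseError[IO](boom))
    .handleErrorWith(_ => Stream(-1))
    .compile
    .toList
```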

Pipeline

Chunk

  • internal buffer implementation for performance, fast concat, fast indexing, avoid copying, list-like interface
  • constructors: apply, singleton, array, empty
  • combinators: map, flatMap, filter, take
  • methods:
    • size: total number of elements in the chunk
    • copyToArray: copies the chunk's elements into an existing array
    • compact: physically combine all contained chunks into a single chunk
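A sketch of the Chunk operations above (fs2 3.x):

```scala
import fs2.{Chunk, Stream}

val c = Chunk.array(Array(1, 2, 3)) ++ Chunk(4, 5) // fast concat, no copying yet
val n = c.size                                     // counted across both parts
val compacted = c.compact                          // one contiguous backing array

val arr = new Array[Int](5)
c.copyToArray(arr)                                 // fill an existing array

val s = Stream.chunk(c).map(_ * 2)                 // chunks round-trip with streams
```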

Pull

  • represent process for stream transformations, including stateful
  • has 3 type parameters Pull[F, A, R], F: effect, A element type, R is result
  • Pull is a monad over its result type
  • constructors:
    • output1: accepts a single element, with Unit as result type
    • pure[A]: emits nothing, A is result type
    • output: accepts a Chunk, Unit is result type
    • done: indicates end of data
    • Stream.pull: converts a Stream to ToPull (an intermediate type before finally converting to Pull)
  • methods:
    • stream: converts Pull to a Stream
    • scanChunksOpt: like fold but at Chunk level and can return None if remainder of the Stream needs to be discarded
  • convert a stream to ToPull with s.pull; ToPull methods:
    • echo: copies elements of a stream to a new Pull, returns Unit
    • take(5): copies 5 elements of a stream to a new Pull and returns Option[Stream[F, A]], which is a stream of rest of the elements
    • drop(5): same as take(5) except it drops the elements
    • uncons: a Pull whose result is Option[(Chunk[A], Stream[F, A])]: the first chunk of the stream plus the rest of the stream (None once exhausted)
      • useful in building Pipe (Stream transformations)
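Since Pull is a monad over its result type, outputs sequence in a for-comprehension; a minimal sketch (fs2 3.x):

```scala
import fs2.{Chunk, Pull, Stream}

// Build a tiny pure stream by hand; each step's result type is Unit.
val p: Pull[fs2.Pure, Int, Unit] =
  for {
    _ <- Pull.output1(1)           // emit a single element
    _ <- Pull.output(Chunk(2, 3))  // emit a whole chunk
  } yield ()                       // result type Unit, so .stream is available

val s: Stream[fs2.Pure, Int] = p.stream // convert the Pull back to a Stream
```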

Pipe

  • stream transformation function: of type Stream[F, A] => Stream[F, B]
  • typical recipe:
    • create as a function that takes a stream
    • create a pull
    • use uncons to get the first chunk and remainder (a stream)
    • process chunk
    • recursively process the remainder
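Following that recipe, a hand-rolled take-like Pipe (a sketch only; fs2 already provides take):

```scala
import fs2.{Pipe, Pull, Stream}

// uncons the next chunk, emit as much of it as still needed, recurse on the rest
def takeN[F[_], A](n: Long): Pipe[F, A, A] = {
  def go(s: Stream[F, A], n: Long): Pull[F, A, Unit] =
    s.pull.uncons.flatMap {
      case Some((chunk, rest)) =>
        if (chunk.size <= n) Pull.output(chunk) >> go(rest, n - chunk.size)
        else Pull.output(chunk.take(n.toInt)) // enough elements: emit and stop
      case None => Pull.done                  // input exhausted
    }
  in => go(in, n).stream
}
```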

Parallel Processing

  • st1.merge(st2): runs both streams in parallel and merges results as they become available from either stream
    • final stream fails when either stream fails
    • mergeHaltL, mergeHaltR halt when the left/right stream is exhausted; mergeHaltBoth halts as soon as either one is
  • st1.concurrently(st2): runs st2 in the background purely for its effects; only st1's elements appear in the output, and st2 is interrupted when st1 terminates
    • use case: primary stream could do the processing and secondary stream could report progress
    • use case: implement independent consumer and producer using cats Queue
  • Stream(st1, st2, ...).parJoinUnbounded is generalized version of merge
    • .parJoin(n) runs max n streams at a time, degree of parallelism
  • use case: a queue with multiple consumers and producers using cats.effect.Queue
    def producer(id: Int, queue: Queue[IO, Int]): Stream[IO, Nothing] =
        Stream.repeatEval(queue.offer(id)).drain
    def consumer(id: Int, queue: Queue[IO, Int]): Stream[IO, Nothing] =
        Stream.repeatEval(queue.take).printlns
    
    Stream.eval(Queue.unbounded[IO, Int]).flatMap { queue =>
        val ps = Stream.range(0, 4).map(id => producer(id, queue))  // 4 producers
        val cs = Stream.range(0, 6).map(id => consumer(id, queue))  // 6 consumers
        (ps ++ cs).parJoinUnbounded  // run all concurrently
    }.interruptAfter(5.seconds).compile.drain
    
  • parZip: unlike zip is non-deterministic, i.e. it'll pull from whichever stream is ready and then wait for the other
  • parEvalMap: like evalMap but runs in parallel, still keeps the order of individual elements processed
  • Stream.fixedRate: produces a unit value at every fixed interval; generally used with zipRight to throttle another stream
    • can implement metered with fixedRate and zipRight
  • Stream.fixedDelay: like fixedRate except the delay between two emits is constant; similar to spaced
  • awakeDelay and awakeEvery: like fixedDelay and fixedRate, except they emit the elapsed time since the stream started (a FiniteDuration) instead of Unit
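A sketch contrasting merge, concurrently, and fixedRate throttling (fs2 3.x; the timings are illustrative):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

val fast = Stream.iterate(0)(_ + 1).covary[IO].metered(100.millis)
val slow = Stream.iterate(100)(_ + 1).covary[IO].metered(250.millis)

// merge: elements of both streams, in whatever order they arrive
val merged = fast.merge(slow).interruptAfter(1.second)

// concurrently: output comes only from `fast`; `slow` runs in the
// background for its effects and is cancelled when `fast` ends
val primary = fast.take(5).concurrently(slow)

// throttling: one element per 200ms tick, via fixedRate + zipRight
val throttled = Stream.fixedRate[IO](200.millis)
  .zipRight(Stream.iterate(0)(_ + 1))
  .interruptAfter(1.second)
```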

Communication

  • communicate between concurrent streams

SignallingRef

  • signal from one process to another
    • .set: set a value
    • .discrete: Stream[IO, T]: emits the current value, then a new value each time the signal changes
    • interruptWhen(signal): when signal is Boolean; like interrupt, except the worker stream checks for a signal after every emit instead of terminating abruptly
  • example:
    type Temperature = Double
    val program = Stream.eval(SignallingRef[IO, Temperature](20.0)).flatMap { alarm =>
        val sensor = Stream
            .repeatEval(IO(Random.between(-40.0, 40.0)))
            .evalMap(t => if (t >= 25) alarm.set(t) else IO.unit)
            .metered(300.millis)
            .drain
        val cooler = alarm.discrete.evalMap(t => IO.println(f"$t%.1f is too high")).drain
        cooler.merge(sensor)
    }
    program.interruptAfter(3.seconds).compile.drain
    

Channel

  • for more than 1 value, more than 1 producer
  • Channel.unbounded for unlimited capacity and Channel.bounded(n) to hold n items
    • .send for producers, send values to the channel
    • .stream for consumers,
  • example:
    type Temperature = Double
    val program = Stream.eval(Channel.unbounded[IO, Temperature]).flatMap { alarm =>
        val sensor = Stream
            .repeatEval(IO(Random.between(-40.0, 40.0)))
            .evalMap(t => if (t >= 25) alarm.send(t).void else IO.unit)
            .metered(300.millis)
            .drain
        val cooler = alarm.stream.evalMap(t => IO.println(f"$t%.1f is too high")).drain
        cooler.merge(sensor)
    }
    program.interruptAfter(3.seconds).compile.drain
    

Topic

  • for more than 1 value, 1+ producers, 1+ independent consumers
    • .publish for producers
    • .subscribe(n) for consumers; n bounds each subscriber's buffer of unread items, and every subscriber receives all items
      • creates back-pressure to stop producer instead of overrunning slow consumers
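A sketch of Topic fan-out (fs2 3.x; note a subscriber only sees items published after it subscribes, so real code usually ensures subscription happens first):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.Topic

val run: IO[Unit] = Stream.eval(Topic[IO, Int]).flatMap { topic =>
  val publisher = Stream.range(0, 10).covary[IO]
    .metered(50.millis)
    .through(topic.publish)                 // producers publish
  def subscriber(id: Int) =
    topic.subscribe(maxQueued = 8)          // each subscriber gets every item
      .evalMap(n => IO.println(s"sub $id got $n"))
  Stream(subscriber(1), subscriber(2)).covary[IO]
    .parJoinUnbounded
    .concurrently(publisher)
}.interruptAfter(1.second).compile.drain
```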

Queue

  • 1+ values, 1+ producers, 1+ shared consumers
  • Queue.unbounded for unlimited capacity and Queue.bounded(n) for n capacity
    • Producers: .offer
    • Consumers:
      • Stream.fromQueueUnterminated(queue) for infinite queues
      • Stream.fromQueueNoneTerminated(queue): the queue carries Option values; producers offer each item as Some and a final None to terminate the stream
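A sketch of the none-terminated variant (fs2 3.x with cats.effect.std.Queue):

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.Stream

// Producers offer Some(value); a final None marks the end of the stream.
val program: IO[List[Int]] =
  for {
    q   <- Queue.unbounded[IO, Option[Int]]
    _   <- List(1, 2, 3).traverse_(n => q.offer(Some(n)))
    _   <- q.offer(None) // terminate the consumer side
    out <- Stream.fromQueueNoneTerminated(q).compile.toList
  } yield out
```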