fs2
Pure Streams
- create finite streams:
apply, empty, emit, emits, range
- create infinite:
iterate, unfold, constant
Stream[F, A]#chunks => Stream[F, Chunk[A]]: converts a stream of elements into a stream of its underlying chunks
Stream[F, Chunk[A]].flatMap(Stream.chunk) => Stream[F, A]: flattens the chunked stream back into a stream of elements
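A minimal pure-stream sketch of this round trip (assuming fs2 3.x):

```scala
import fs2.{Chunk, Pure, Stream}

// A pure stream built from two chunks of three elements each.
val s: Stream[Pure, Int] =
  Stream.chunk(Chunk(1, 2, 3)) ++ Stream.chunk(Chunk(4, 5, 6))

// chunks exposes the underlying chunk structure.
val asChunks: List[List[Int]] = s.chunks.map(_.toList).toList
// List(List(1, 2, 3), List(4, 5, 6))

// flatMap(Stream.chunk) flattens back to the element-level view.
val flattened: List[Int] = s.chunks.flatMap(Stream.chunk).toList
// List(1, 2, 3, 4, 5, 6)
```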
- building a stream
emit(_: A), emits(_: Seq[A]) produce Stream[Pure, A]
eval(_: F[A]) => Stream[F, A]
- infinite stream:
Stream.constant(42), Stream.awakeEvery[IO](1.second), Stream.iterate(1.0)(_ * 2)
- combinators:
map, filter, flatMap
++: appends second stream to the first stream
drain: discard all elements
find(A => Boolean): emits the first element that matches
head, take(n), interruptAfter(10.minutes), interruptWhen(<condition>)
zip, st1.zipWith(st2)(fn), zipLeft/zipRight: like zip, but keep only the elements of the left/right stream
compile: a projection of a stream that allows it to be converted to other types; e.g. stream.compile.toList
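A sketch of a few of these on pure streams (no effect involved, so .toList works directly; fs2 3.x assumed):

```scala
import fs2.Stream

val a = Stream(1, 2, 3)
val b = Stream(10, 20, 30, 40)

val appended = (a ++ b).toList            // List(1, 2, 3, 10, 20, 30, 40)
val zipped   = a.zip(b).toList            // stops at the shorter stream
val summed   = a.zipWith(b)(_ + _).toList // List(11, 22, 33)
val lefts    = a.zipLeft(b).toList        // zips, but keeps only the left elements
val powers   = Stream.iterate(1.0)(_ * 2).take(4).toList
val found    = Stream.range(0, 10).find(_ > 3).toList // first match only
```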
- retries:
Stream.retry(action, delay = 1.second, nextDelay = _ * 2, maxAttempts = 3)
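A sketch of retry against a flaky action; the Ref-based attempt counter is illustrative, not from the source (fs2 3.x, cats-effect 3 assumed):

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

val program: IO[String] = for {
  counter <- Ref.of[IO, Int](0)
  // Hypothetical action that fails on the first two attempts.
  action   = counter.updateAndGet(_ + 1).flatMap { n =>
               if (n < 3) IO.raiseError(new RuntimeException(s"attempt $n failed"))
               else IO.pure(s"succeeded on attempt $n")
             }
  result  <- Stream
               .retry(action, delay = 10.millis, nextDelay = _ * 2, maxAttempts = 5)
               .compile
               .lastOrError
} yield result
```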
Effectful Streams
- create:
eval, covary, iterateEval, unfoldEval
covary widens the effect type to a supertype, e.g. lifting a pure stream into IO (converting between unrelated effects requires translate)
exec(_: F[Unit]): like eval but doesn't emit any element
- must be compiled
- can be compiled to a value
.compile.toList or just for the effect .compile.drain
- combinators:
evalMap, evalTap, evalFilter, ++
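A sketch combining these effectful combinators (fs2 3.x, cats-effect IO assumed):

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

val s: Stream[IO, Int] = Stream
  .iterateEval(1)(n => IO.pure(n * 2))  // 1, 2, 4, 8, 16, ...
  .take(5)
  .evalTap(n => IO.println(s"saw $n"))  // run an effect, keep the element
  .evalFilter(n => IO.pure(n > 2))      // effectful predicate
  .evalMap(n => IO.pure(n + 1))         // effectful transformation

// Effectful streams must be compiled before they can run:
val result: List[Int] = s.compile.toList.unsafeRunSync()
// List(5, 9, 17)
```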
- transformation:
map(A => B), flatMap(A => Stream[F, B]), evalMap(A => F[B])
- Error handling:
raiseError, handleErrorWith
- Resource handling:
bracket, resource, fromAutoCloseable
import java.io.{BufferedReader, File, FileReader}

val file: Stream[IO, BufferedReader] = Stream.bracket {
  IO {
    new BufferedReader(new FileReader(new File("./build.sbt")))
  }
}(f => IO(f.close()))
file.flatMap { reader =>
Stream
.eval(IO(Option(reader.readLine())))
.repeat
.unNoneTerminate
}.map(_.length) // emits the length of each line read from the file
Timer combinators
timeout(1.second): fails with a TimeoutException if the stream doesn't complete within 1 second
interruptAfter(1.second): stops the stream from emitting any more data after 1 second
delayBy(2.seconds): delays emitting the first element by 2 seconds
metered: delays every emit to a fixed rate; the period is measured from the previous emit, so it isn't extended by the time processing an element takes
debounce(1.second): sampling; emits only the most recent element at each interval
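A sketch of metered, interruptAfter and delayBy together; the exact element count depends on scheduling, so treat the output as approximate (fs2 3.x assumed):

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

// metered: at most one element per 50 ms, however fast the source is.
val metered: List[Int] = Stream
  .iterate(0)(_ + 1)
  .covary[IO]
  .metered(50.millis)
  .interruptAfter(275.millis) // stop emitting after the deadline
  .compile
  .toList
  .unsafeRunSync()
// roughly 5 elements: 0, 1, 2, 3, 4

// delayBy: hold back the first element by 100 ms.
val delayed: List[Int] =
  Stream(1, 2, 3).covary[IO].delayBy(100.millis).compile.toList.unsafeRunSync()
```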
Resource Management
import java.io.{BufferedReader, FileReader}

val acq = IO.blocking(new BufferedReader(new FileReader("input.csv")))
val rel = (r: BufferedReader) => IO.blocking(r.close())
val res = Resource.make(acq)(rel)
val read = (r: BufferedReader) => Stream.repeatEval(IO.blocking(r.readLine())).takeWhile(_ != null)
Stream.bracket(acq)(rel).flatMap(read)
Stream.resource(res).flatMap(read)
Stream.fromAutoCloseable(acq).flatMap(read)
Error Handling
Stream implements MonadError
raiseError[F] fails the stream; the rest of the stream is never evaluated, but side effects from earlier elements have already run
handleErrorWith: on failure the stream stops, the remaining elements are discarded, and the supplied fallback stream takes over
attempt: catches the error and emits it as a final Left; elements emitted before the failure arrive as Right
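A small sketch of handleErrorWith vs attempt (fs2 3.x assumed):

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

val failing: Stream[IO, Int] =
  Stream(1, 2) ++ Stream.raiseError[IO](new RuntimeException("boom")) ++ Stream(3)

// handleErrorWith: the failed stream stops; the fallback stream takes over.
val recovered = failing.handleErrorWith(_ => Stream(-1)).compile.toList.unsafeRunSync()
// List(1, 2, -1); the trailing Stream(3) is never reached

// attempt: earlier elements arrive as Right, the error as a final Left.
val attempted = failing.attempt.compile.toList.unsafeRunSync()
// Right(1), Right(2), Left(java.lang.RuntimeException: boom)
```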
Pipeline
Chunk
- internal buffer implementation for performance, fast concat, fast indexing, avoid copying, list-like interface
- constructors:
apply, singleton, array, empty
- combinators:
map, flatMap, filter, take
- methods:
size: total number of elements in the chunk
copyToArray: copies the chunk's elements into an existing array
compact: physically combine all contained chunks into a single chunk
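A sketch of these Chunk operations (fs2 3.x assumed):

```scala
import fs2.Chunk

val c = Chunk(1, 2, 3) ++ Chunk(4, 5) // O(1) concat: no copying yet
val n = c.size                        // 5
val x = c(3)                          // fast indexing: 4

val compacted = c.compact             // copy into a single backing array

val arr = new Array[Int](5)
c.copyToArray(arr)                    // arr now holds 1, 2, 3, 4, 5

val mapped = c.map(_ * 10).toList     // List(10, 20, 30, 40, 50)
```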
Pull
- represents a stream transformation process, possibly stateful
- has 3 type parameters
Pull[F, A, R], F: effect, A element type, R is result
Pull is a monad over its result type
- constructors:
output1: accepts a single element, with Unit as result type
pure[A]: emits nothing, A is result type
output: accepts a Chunk, Unit is result type
done: indicates end of data
Stream.pull: converts a Stream to ToPull (an intermediate type before finally converting to Pull)
- methods:
stream: converts Pull to a Stream
scanChunksOpt: like fold but at Chunk level and can return None if remainder of the Stream needs to be discarded
- converting streams:
s.pull returns a ToPull, an intermediate type exposing pull-based operations
ToPull methods:
echo: copies elements of a stream to a new Pull, returns Unit
take(5): copies 5 elements of a stream to a new Pull; its result is Option[Stream[F, A]], the stream of the remaining elements
drop(5): same as take(5) except it drops the elements
uncons: result is Option[(Chunk[A], Stream[F, A])]: the first chunk of the stream plus the rest of the stream; None when the stream is empty
- useful in building a Pipe (stream transformations)
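A sketch of uncons in action: a hypothetical takeN that re-implements take at the Pull level, then converts back with .stream (fs2 3.x assumed):

```scala
import fs2.{Pull, Stream}

// Output chunks until n elements have been emitted, then stop.
def takeN[F[_], A](s: Stream[F, A], n: Long): Pull[F, A, Unit] =
  s.pull.uncons.flatMap {
    case Some((chunk, rest)) =>
      if (chunk.size <= n) Pull.output(chunk) >> takeN(rest, n - chunk.size)
      else Pull.output(chunk.take(n.toInt)) // enough elements: emit a prefix and stop
    case None => Pull.done                  // end of data
  }

// .stream converts the Pull back into a Stream.
val firstFive = takeN(Stream.range(0, 100), 5).stream.toList
// List(0, 1, 2, 3, 4)
```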
Pipe
- stream transformation function of type Stream[F, A] => Stream[F, B]
- typical recipe:
- create as a function that takes a stream
- create a pull
- use uncons to get the first chunk and the remainder (a stream)
- process chunk
- recursively process the remainder
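The recipe above, applied to a hypothetical runningSum pipe (the name and the accumulator threading are illustrative; fs2 3.x assumed):

```scala
import fs2.{Pipe, Pull, Stream}

def runningSum[F[_]]: Pipe[F, Int, Int] = in => {
  def go(s: Stream[F, Int], acc: Int): Pull[F, Int, Unit] =
    s.pull.uncons.flatMap {
      case Some((chunk, rest)) =>
        // Process the chunk: per-element running sums, carrying the state forward.
        val (next, sums) = chunk.mapAccumulate(acc)((a, i) => (a + i, a + i))
        Pull.output(sums) >> go(rest, next) // recursively process the remainder
      case None => Pull.done
    }
  go(in, 0).stream
}

val sums = Stream(1, 2, 3, 4).through(runningSum).toList
// List(1, 3, 6, 10)
```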
Parallel Processing
st1.merge(st2): runs both streams in parallel and merges results as they become available from either stream
- the merged stream fails when either stream fails
mergeHaltL / mergeHaltR / mergeHaltBoth: variations that halt when the left, right, or either stream is exhausted
st1.concurrently(st2): runs st2 in the background; only st1's elements are emitted, and st2 is cancelled when st1 terminates
- use case: primary stream could do the processing and secondary stream could report progress
- use case: implement independent consumers and producers using cats.effect.std.Queue
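A sketch of the progress-reporting use case with concurrently (fs2 3.x assumed):

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

// Primary stream does the work; the background stream reports progress and is
// cancelled automatically when the primary terminates.
val work     = Stream.range(0, 5).covary[IO].metered(50.millis)
val progress = Stream.awakeEvery[IO](120.millis).evalMap(t => IO.println(s"still working after $t"))

val results = work.concurrently(progress).compile.toList.unsafeRunSync()
// List(0, 1, 2, 3, 4); progress lines appear on stdout in between
```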
Stream(st1, st2, ...).parJoinUnbounded: a generalized version of merge
.parJoin(n): runs at most n inner streams at a time (the degree of parallelism)
- use case: a queue with multiple consumers and producers using cats.effect.std.Queue
def producer(id: Int, queue: Queue[IO, Int]): Stream[IO, Nothing] =
  Stream.repeatEval(queue.offer(id)).drain
def consumer(id: Int, queue: Queue[IO, Int]): Stream[IO, Nothing] =
  Stream.repeatEval(queue.take).printlns
Stream.eval(Queue.unbounded[IO, Int]).flatMap { queue =>
  val ps = Stream.range(0, 4).map(id => producer(id, queue)) // 4 producers
  val cs = Stream.range(0, 6).map(id => consumer(id, queue)) // 6 consumers
  (ps ++ cs).parJoinUnbounded // run all concurrently
}.interruptAfter(5.seconds).compile.drain
parZip: like zip, but pulls from both streams concurrently (so evaluation order is non-deterministic), pairing elements once both sides are ready
parEvalMap: like evalMap but runs in parallel, still keeps the order of individual elements processed
Stream.fixedRate: produces a unit value at every fixed interval; generally used with zipRight to throttle another stream
- can implement metered with fixedRate and zipRight
Stream.fixedDelay: like fixedRate except it sleeps a constant delay between two emits, regardless of processing time; similar to spaced
awakeDelay and awakeEvery: like fixedDelay and fixedRate except they emit the elapsed time since the stream started (a FiniteDuration) instead of Unit
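A sketch of throttling via fixedRate plus zipping, which is essentially what metered does (fs2 3.x assumed):

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

// Throttle a fast source to one element per 50 ms by zipping against a ticker.
val ticks     = Stream.fixedRate[IO](50.millis) // Stream[IO, Unit]
val throttled = Stream.iterate(0)(_ + 1).covary[IO].zipLeft(ticks)

val out = throttled.take(3).compile.toList.unsafeRunSync()
// List(0, 1, 2), emitted roughly 50 ms apart

// awakeEvery emits the elapsed time instead of Unit:
val stamps: Stream[IO, FiniteDuration] = Stream.awakeEvery[IO](50.millis)
```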
Communication
- communicate between concurrent streams
SignallingRef
- signal from one process to another
.set: set a value
.discrete: Stream[IO, T]: emits the signal's current value and every subsequent update
interruptWhen(signal): for Signal[IO, Boolean]; halts the worker stream once the signal becomes true
- example:
type Temperature = Double
val program = Stream.eval(SignallingRef[IO, Temperature](20.0)).flatMap { alarm =>
  val sensor = Stream
    .repeatEval(IO(Random.between(-40.0, 40.0)))
    .evalMap(t => if (t >= 25) alarm.set(t) else IO.unit)
    .metered(300.millis)
    .drain
  val cooler = alarm.discrete.evalMap(t => IO.println(f"$t%.1f is too high")).drain
  cooler.merge(sensor)
}
program.interruptAfter(3.seconds).compile.drain
Channel
- for more than 1 value, more than 1 producer
Channel.unbounded for unlimited capacity and Channel.bounded(n) to hold n items
.send: for producers, sends a value to the channel
.stream: for consumers, the stream of values sent to the channel
- example:
type Temperature = Double
val program = Stream.eval(Channel.unbounded[IO, Temperature]).flatMap { alarm =>
  val sensor = Stream
    .repeatEval(IO(Random.between(-40.0, 40.0)))
    .evalMap(t => if (t >= 25) alarm.send(t).void else IO.unit)
    .metered(300.millis)
    .drain
  val cooler = alarm.stream.evalMap(t => IO.println(f"$t%.1f is too high")).drain
  cooler.merge(sensor)
}
program.interruptAfter(3.seconds).compile.drain
Topic
- for more than 1 value, 1+ producers, 1+ independent consumers
.publish for producers
.subscribe(n) for consumers; n is the size of the buffer holding unread items; each consumer reads all items
- creates back-pressure, pausing the producer instead of overrunning slow consumers
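A sketch of one producer and two independent subscribers; the sleep before publishing is a simplification to let both subscriptions register first (fs2 3.x assumed):

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import fs2.concurrent.Topic
import scala.concurrent.duration._

// Each subscriber independently sees every published item.
val program = Stream.eval(Topic[IO, Int]).flatMap { topic =>
  val producer =
    (Stream.sleep_[IO](200.millis) ++ Stream.range(0, 3).covary[IO]).through(topic.publish)
  val sub1 = topic.subscribe(maxQueued = 10).take(3).map(i => s"sub1: $i")
  val sub2 = topic.subscribe(maxQueued = 10).take(3).map(i => s"sub2: $i")
  sub1.merge(sub2).concurrently(producer)
}

val out = program.compile.toList.unsafeRunSync()
// six elements: each subscriber received 0, 1, 2
```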
Queue
- 1+ values, 1+ producers, 1+ shared consumers
Queue.unbounded for unlimited capacity and Queue.bounded(n) for n capacity
- Producers:
.offer
- Consumers:
Stream.fromQueueUnterminated(queue): for infinite queues
Stream.fromQueueNoneTerminated(queue): consumes until a None is dequeued; producers must offer each value as Some(a) and signal termination with None
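A sketch of the None-terminated variant (cats-effect 3 Queue, fs2 3.x assumed):

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import fs2.Stream

// Producers offer Some(value) and a final None to signal end-of-stream.
val program: IO[List[Int]] = for {
  q  <- Queue.unbounded[IO, Option[Int]]
  _  <- List(1, 2, 3).traverse_(i => q.offer(Some(i)))
  _  <- q.offer(None) // terminates the consumer stream
  xs <- Stream.fromQueueNoneTerminated(q).compile.toList
} yield xs
// program yields List(1, 2, 3)
```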