Streaming
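- uses readStream, writeStream instead of read, write
- DataStreamWriter is converted to a StreamingQuery by calling toTable or start
- the query then runs as a background task
- call .awaitTermination() on the StreamingQuery when the driver must block until the query stops (e.g. in a standalone script); otherwise the process can exit while the stream is still running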
- Trigger can optionally be specified with writeStream:
- default: omit any trigger spec; each micro-batch starts as soon as the previous one finishes (this is still micro-batch mode, not the experimental .trigger(continuous=...) mode)
- interval: .trigger(processingTime='2 seconds')
- one-shot: .trigger(availableNow=True) processes everything available at start and then stops; unlike a plain batch job it offers
- fault-tolerance, exactly-once guarantee, stateful operations (agg/dedup etc), checkpoint/offset management
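The trigger choices above can be sketched as follows. This is a minimal sketch, assuming a local PySpark install of version 3.3 or later (availableNow was added in 3.3); the built-in rate source and console sink stand in for a real source and target, and the names trigger_kwargs, run_demo and the checkpoint path are made up for illustration.

```python
def trigger_kwargs(mode):
    """Map a trigger style from the notes to DataStreamWriter.trigger() kwargs."""
    if mode == "default":
        return {}                                # no .trigger() call at all
    if mode == "interval":
        return {"processingTime": "2 seconds"}
    if mode == "one-shot":
        return {"availableNow": True}
    raise ValueError(f"unknown trigger mode: {mode}")


def run_demo(mode="interval", checkpoint_dir="/tmp/trigger_demo_chk"):
    # Imported lazily so trigger_kwargs() stays usable without Spark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").appName("trigger-demo").getOrCreate()
    df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

    writer = (
        df.writeStream
        .format("console")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir)
    )
    kwargs = trigger_kwargs(mode)
    if kwargs:
        writer = writer.trigger(**kwargs)

    query = writer.start()        # returns a StreamingQuery running in the background
    query.awaitTermination(10)    # block the driver for up to 10 seconds
    query.stop()
    spark.stop()
```

Calling run_demo("interval") would print a console batch roughly every two seconds; run_demo("default") leaves the trigger unset so batches run back to back.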
outputMode
outputMode: determines how the target is written
append: results of micro-batch are appended to the target
complete: target is rewritten after merging last result with results of current micro-batch
- used for aggregates, running aggregates are maintained in the checkpoint directory
- although new results are computed incrementally, target is completely rewritten
update: only the new or updated results are used to update the target
- typically used with foreachBatch, running a MERGE statement through spark.sql() to upsert each micro-batch
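The foreachBatch plus MERGE pattern might look like the sketch below. It assumes a Delta Lake target (the UPDATE SET * / INSERT * shorthand is Delta syntax) and PySpark 3.3 or later for DataFrame.sparkSession; the table name target_table, the key column id, the view name updates and all function names are hypothetical.

```python
def build_merge_sql(target, updates_view, key):
    """Return a Delta Lake MERGE statement that upserts updates_view into target."""
    return (
        f"MERGE INTO {target} AS t "
        f"USING {updates_view} AS s "
        f"ON t.{key} = s.{key} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


def upsert_batch(batch_df, batch_id):
    """foreachBatch callback: merge one micro-batch into the target table."""
    batch_df.createOrReplaceTempView("updates")
    # Run the SQL on the session attached to the micro-batch so that the
    # temp view registered above is visible to the MERGE statement.
    batch_df.sparkSession.sql(build_merge_sql("target_table", "updates", "id"))


def start_upsert_stream(stream_df, checkpoint_dir):
    """Attach the callback to a streaming DataFrame and start the query."""
    return (
        stream_df.writeStream
        .foreachBatch(upsert_batch)
        .outputMode("update")
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

Keeping the SQL in its own function makes the statement easy to inspect before wiring it into a stream.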
- micro-batch size can be limited by maxFilesPerTrigger or maxBytesPerTrigger or both (whichever limit is hit first applies)
- sources: directory, delta table, kafka ...
- directory sources can set option cleanSource to delete or archive, so that processed files are either deleted or moved to sourceArchiveDir
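A sketch of how these file-source options could be combined. In the Spark file-source documentation the cleanup option is spelled cleanSource, and sourceArchiveDir is needed when it is set to archive. The helper names, the default of 100 files and the JSON format are assumptions for illustration; maxBytesPerTrigger is left out because the plain file source documents only maxFilesPerTrigger.

```python
def file_source_options(archive_dir=None, max_files=100):
    """Build reader options for a streaming directory source."""
    opts = {"maxFilesPerTrigger": str(max_files)}   # cap files per micro-batch
    if archive_dir is None:
        opts["cleanSource"] = "delete"              # remove processed files
    else:
        opts["cleanSource"] = "archive"             # move processed files ...
        opts["sourceArchiveDir"] = archive_dir      # ... into this directory
    return opts


def read_json_dir(spark, path, schema, archive_dir=None):
    """Open a directory of JSON files as a streaming DataFrame."""
    return (
        spark.readStream
        .format("json")
        .schema(schema)          # streaming file sources need an explicit schema
        .options(**file_source_options(archive_dir))
        .load(path)
    )
```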
- targets: file/directory, delta table, kafka, foreach ...
- Kafka options:
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    # optional; epoch timestamp in milliseconds, only used when the query
    # starts without an existing checkpoint
    .option("startingTimestamp", 1)
    .option("subscribe", "topic1")
    .load()
)
- checkpoint directory is used for storing offsets and state information
- state information enables Spark to calculate aggregates across all micro-batches when outputMode="complete"
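To make the role of state concrete, here is a plain-Python simulation (not Spark code) of a running count per key across micro-batches. The Counter stands in for the state kept under the checkpoint directory, and the function name and sample data are made up for illustration.

```python
from collections import Counter


def run_micro_batches(batches):
    """Simulate a running count per key across micro-batches.

    Returns, for each batch, what 'complete' and 'update' mode would emit.
    """
    state = Counter()            # stands in for the checkpointed state
    emitted = []
    for batch in batches:
        delta = Counter(batch)   # aggregate only the new rows (incremental work)
        state.update(delta)      # fold them into the running aggregate
        complete = dict(state)                     # every key, every time
        update = {k: state[k] for k in delta}      # only keys touched by this batch
        emitted.append({"complete": complete, "update": update})
    return emitted


result = run_micro_batches([["a", "b", "a"], ["b", "c"]])
print(result[1]["complete"])   # {'a': 2, 'b': 2, 'c': 1}
print(result[1]["update"])     # {'b': 2, 'c': 1}
```

The second batch only contains b and c, yet complete mode still emits a because its count lives in the state.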
State
- certain transformations require state to be maintained: aggregation, windowing, joins
- stateless transformations: filter, select, explode etc
- on their own these are not supported in complete outputMode, which requires an aggregation
- checkpoint directory is the default location for maintaining state
- Spark supports RocksDB for storing state
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
watermarking
- for event-time processing, the watermark is a threshold that trails the latest event time received by a fixed delay, e.g.
df.withWatermark("eventTimestamp", "10 minutes")
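- late-arriving events are still processed if they arrive within the watermark delay; older events may be dropped. Example:
outputMode=append: each result is written once, after the watermark delay has passed and the result can no longer change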
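The watermark arithmetic can be illustrated with a simplified plain-Python model (not Spark's implementation, which applies the rule to windows and state rather than to single rows). Event times are integers in minutes, batches are assumed to arrive in order, and the function name and sample values are invented for the example.

```python
def apply_watermark(batches, delay):
    """Simulate watermark tracking over micro-batches of event times.

    The watermark used for a batch is (max event time seen in earlier
    batches) - delay; events older than it are treated as too late.
    """
    max_seen = None
    accepted, dropped = [], []
    for batch in batches:
        watermark = None if max_seen is None else max_seen - delay
        for t in batch:
            if watermark is not None and t < watermark:
                dropped.append(t)      # arrived later than the allowed delay
            else:
                accepted.append(t)
        if batch:
            batch_max = max(batch)
            max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
    return accepted, dropped


# A 10 minute delay, as in withWatermark("eventTimestamp", "10 minutes").
accepted, dropped = apply_watermark([[100, 105], [97, 120], [109, 112]], delay=10)
print(accepted)  # [100, 105, 97, 120, 112]
print(dropped)   # [109]
```

Event 97 is late but still within 10 minutes of the latest time seen so far (105), so it is kept; event 109 arrives after 120 has been seen and falls behind the watermark of 110.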