Streaming¶
- uses `readStream`/`writeStream` instead of `read`/`write`
- a `DataStreamWriter` is converted to a `StreamingQuery` using `toTable` or `start` - starts a background task
- a `StreamingQuery` requires `.awaitTermination()`
- a trigger can optionally be specified on `writeStream`:
    - default (micro-batch): omit any trigger spec; the next micro-batch runs immediately after the previous one finishes
    - interval: `.trigger(processingTime='2 seconds')`
    - one-shot: `.trigger(availableNow=True)`
- unlike batch, it offers fault tolerance, an exactly-once guarantee, stateful operations (agg/dedup etc.), and checkpoint/offset management
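The pieces above fit together roughly as follows; a minimal sketch, where the schema, input/output paths, and checkpoint location are illustrative assumptions:

```python
# Minimal Structured Streaming job: readStream -> writeStream -> StreamingQuery.
# Paths, schema, and formats are illustrative assumptions, not from the notes.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, LongType

spark = SparkSession.builder.appName("streaming-sketch").getOrCreate()

schema = StructType().add("id", StringType()).add("value", LongType())

# readStream instead of read
events = spark.readStream.schema(schema).json("/data/in")

# writeStream + start() converts the DataStreamWriter into a StreamingQuery
# that runs as a background task
query = (
    events.writeStream
    .format("delta")
    .option("checkpointLocation", "/chk/events")
    .trigger(availableNow=True)   # one-shot: drain available data, then stop
    .start("/data/out")
)

query.awaitTermination()          # block the driver until the query stops
```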
outputMode¶
- `outputMode`: determines how the target is written
    - `append`: results of the micro-batch are appended to the target
    - `complete`: the target is rewritten after merging the last result with the results of the current micro-batch
        - used for aggregates; running aggregates are maintained in the checkpoint directory
        - although new results are computed incrementally, the target is completely rewritten
    - `update`: only the new or updated results are used to update the target
        - used with `foreachBatch` and a `MERGE` SQL statement via a `spark.sql()` call
- micro-batch size can be limited by `maxFilesPerTrigger` or `maxBytesPerTrigger` or both (whichever is hit first applies)
- sources: directory, delta table, kafka ...
    - directory sources can specify the option `cleanSource` as either `delete` or `archive`, so processed files are deleted or moved (to `sourceArchiveDir`)
- targets: file/directory, delta table, kafka, foreach ...
- Kafka options: e.g. `kafka.bootstrap.servers`, `subscribe`, `startingOffsets`
- the checkpoint directory is used for storing offset and state information
    - state information enables Spark to calculate aggregates across all micro-batches when `outputMode="complete"`
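The `update` + `foreachBatch` + `MERGE` pattern mentioned above can be sketched like this, assuming a Delta target table named `target` keyed on `id`; all names and paths are hypothetical:

```python
# Upsert each micro-batch into a Delta table via MERGE inside foreachBatch.
# Table name, join key, and paths are illustrative assumptions.
def upsert_batch(batch_df, batch_id):
    # expose the micro-batch as a temp view so MERGE can reference it
    batch_df.createOrReplaceTempView("updates")
    batch_df.sparkSession.sql("""
        MERGE INTO target t
        USING updates u
        ON t.id = u.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

query = (
    spark.readStream
    .option("maxFilesPerTrigger", 100)      # cap the micro-batch size
    .format("delta")
    .load("/data/source")
    .writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", "/chk/upsert")
    .outputMode("update")
    .start()
)
```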
State¶
- certain transformations require state to be maintained: aggregation, windowing, joins
- stateless transformations: `filter`, `select`, `explode` etc.
    - these are not supported in `complete` outputMode
- checkpoint directory is the default location for maintaining state
- Spark supports RocksDB for storing state: `spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")`
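A stateful aggregation keeps its running state in the checkpoint directory; a sketch of running counts per key written with `complete` mode, where the source path, key column, and table name are assumptions:

```python
# Running counts per key; the running aggregate state lives in the
# checkpoint directory. Paths and names are illustrative assumptions.
counts = (
    spark.readStream
    .format("delta")
    .load("/data/events")
    .groupBy("key")       # stateful: requires state across micro-batches
    .count()
)

query = (
    counts.writeStream
    .outputMode("complete")                  # rewrite the full result each batch
    .option("checkpointLocation", "/chk/counts")
    .toTable("event_counts")
)
```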
watermarking¶
- for time-based micro-batches, a watermark is a delay measured from the latest event time received: `df.withWatermark("eventTimestamp", "10 minutes")`
- late-arriving events can still be processed up to the watermark delay
    - example with `outputMode="append"`: a window's records are written once, after the watermark delay has passed
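A watermarked windowed aggregation in `append` mode can be sketched as follows; the source path, window size, and table name are assumptions, while the watermark call matches the snippet above:

```python
from pyspark.sql import functions as F

# Windowed counts with a 10-minute watermark: each window's result is
# emitted once, after the watermark passes the window end (append mode).
# Source path, window width, and table name are illustrative assumptions.
windowed = (
    spark.readStream
    .format("delta")
    .load("/data/events")
    .withWatermark("eventTimestamp", "10 minutes")  # tolerate 10 min lateness
    .groupBy(F.window("eventTimestamp", "5 minutes"))
    .count()
)

query = (
    windowed.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/chk/windowed")
    .toTable("windowed_counts")
)
```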