
Streaming

  • uses readStream and writeStream instead of read and write
  • a DataStreamWriter is converted to a StreamingQuery with toTable or start
    • starts a background task
  • call .awaitTermination() on the StreamingQuery to keep the driver alive
  • a trigger can optionally be specified with writeStream:
    • default (micro-batch): omit the trigger spec; the next micro-batch starts as soon as the previous one finishes
    • interval: .trigger(processingTime='2 seconds')
    • one-shot: .trigger(availableNow=True); unlike a plain batch job it offers
      • fault tolerance, exactly-once guarantees, stateful operations (aggregation/dedup etc.), checkpoint/offset management
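A minimal end-to-end sketch of the points above (the table names and checkpoint path are illustrative, not from the source; this needs a running Spark session):

```python
# Sketch: readStream -> writeStream with a one-shot trigger.
# "events", "events_copy" and the checkpoint path are illustrative placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.readStream.table("events")  # streaming DataFrame

query = (
    df.writeStream
    .option("checkpointLocation", "/tmp/checkpoints/events_copy")  # offsets + state
    .trigger(availableNow=True)   # one-shot: drain all available data, then stop
    .toTable("events_copy")       # DataStreamWriter -> StreamingQuery
)

query.awaitTermination()          # block the driver until the query finishes
```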

outputMode

  • outputMode: determines how results are written to the target
    • append: new rows from each micro-batch are appended to the target
    • complete: the target is rewritten after merging the last result with the current micro-batch's results
      • used for aggregates; running aggregates are maintained in the checkpoint directory
      • although new results are computed incrementally, the target is completely rewritten
    • update: only new or updated result rows are written to the target
      • typically used with foreachBatch, e.g. running a MERGE statement via spark.sql()
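A hedged sketch of update-style writes with foreachBatch (the table and column names are illustrative assumptions, and `df` stands for an existing streaming DataFrame):

```python
# Sketch: upsert each micro-batch into a target table with a MERGE statement.
# "target", "updates", and the id/value columns are illustrative placeholders.
def upsert_batch(batch_df, batch_id):
    batch_df.createOrReplaceTempView("updates")
    batch_df.sparkSession.sql("""
        MERGE INTO target t
        USING updates u
        ON t.id = u.id
        WHEN MATCHED THEN UPDATE SET t.value = u.value
        WHEN NOT MATCHED THEN INSERT (id, value) VALUES (u.id, u.value)
    """)

query = (
    df.writeStream
    .foreachBatch(upsert_batch)       # run arbitrary batch logic per micro-batch
    .outputMode("update")
    .option("checkpointLocation", "/tmp/checkpoints/target")
    .start()
)
```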
  • micro-batch size can be limited with maxFilesPerTrigger and/or maxBytesPerTrigger (whichever limit is hit first applies)
  • sources: directory, delta table, kafka ...
    • directory sources can set the option cleanSource to delete or archive, so processed files are deleted or moved to sourceArchiveDir
  • targets: file/directory, delta table, kafka, foreach ...
  • Kafka options:
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("startingTimestamp", 1) # optional, epoch milliseconds; ignored if a checkpoint exists
        .option("subscribe", "topic1")
        .load()
    )
    
  • the checkpoint directory stores offsets and state information
    • state information lets Spark compute aggregates across all micro-batches when outputMode="complete"

State

  • certain transformations require state to be maintained: aggregation, windowing, joins
  • stateless transformations: filter, select, explode etc.
    • are not supported with outputMode="complete"
  • checkpoint directory is the default location for maintaining state
  • Spark supports RocksDB for storing state
    • spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
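A sketch of a stateful aggregation; the running counts live in the state store under the checkpoint directory (`df`, the column, and table names are illustrative):

```python
# Sketch: stateful streaming aggregation; per-key counts are maintained in
# the state store so each micro-batch only updates them incrementally.
# "df", "userId", and the checkpoint path/table are illustrative placeholders.
counts = df.groupBy("userId").count()

query = (
    counts.writeStream
    .outputMode("complete")           # full result rewritten after each micro-batch
    .option("checkpointLocation", "/tmp/checkpoints/user_counts")
    .toTable("user_counts")
)
```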

watermarking

  • a delay relative to the latest event time seen, applied to time-based micro-batches: df.withWatermark("eventTimestamp", "10 minutes")
  • events arriving late by up to the watermark delay are still processed; anything later is dropped
  • outputMode=append: each windowed result is written once, after the watermark passes the end of its window
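The acceptance rule can be illustrated in plain Python (a simplified model of the idea, not Spark's implementation):

```python
# Simplified model of watermarking: the watermark trails the maximum
# event time seen so far by a fixed delay; events older than the
# watermark are considered too late and dropped.
def accept_events(events, delay):
    """events: iterable of event-time integers; delay: watermark delay."""
    max_seen = None
    accepted = []
    for t in events:
        if max_seen is None or t > max_seen:
            max_seen = t
        watermark = max_seen - delay
        if t >= watermark:          # within the allowed lateness
            accepted.append(t)
    return accepted

# After the event at time 25, the watermark is 25 - 10 = 15,
# so the late event at time 12 is dropped but 18 is still accepted.
print(accept_events([10, 20, 25, 12, 18], delay=10))  # [10, 20, 25, 18]
```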