
API

API               cache      checkpoint                  persist
Is an action?     No         No                          No
Lineage kept?     Yes        No (truncated)              Yes
Storage location  default    checkpoint-dir              configurable StorageLevel
Lazy?             Yes        No (eager by default for    Yes
                             DataFrames; eager=false
                             allowed)
  • randomSplit: splits a single DF into multiple DFs according to the given weights (normalized if they don't sum to 1); an optional seed makes the split reproducible
  • coalesce: reduces the number of partitions without a shuffle. If the requested number is greater than the current number of partitions, the DF is left unchanged
  • colRegex: selects columns with name matching the regex
  • create[OrReplace][Global]TempView: creates or replaces a (global) temp view
  • unpivot: rotates groups of columns into rows (the inverse of pivot; also exposed as melt)

DataFrame

  • is a type alias for Dataset[Row]
  • uses SparkSession instead of SparkContext
  • Convert RDD[T] to a DataFrame in Scala with toDF, eg rdd.toDF("id", "name", ...)
    • if T is case class, the column name list argument can be omitted
  • Converting RDD manually:
    • create schema using StructType(cols: Seq[StructField])
    • create an RDD containing Row(<attr1>, <attr2>, ...)
    • create a DataFrame as df = spark.createDataFrame(rdd: RDD[Row], schema: StructType)
  • Supports most SQL data types, plus {Array,Map,Struct}Type and nullability
  • Column reference in Scala: eg $"age" or df("age")
  • Uses Tungsten off-heap (no GC) storage, column-oriented

API

  • IO
    • read.csv("/path/data.csv") -> DataFrame, or read.format("csv").load("/path/data.csv")
    • to infer the schema, use options, e.g. read.format("csv").option("header", True).option("inferSchema", True).option("samplingRatio", 0.001).load(...)
    • or, manually specify schema using StructType as array of StructField, e.g. read.format("csv").schema(csv_schema).load(...)
    • read.format("json").[.schema(myschema)][.option()...].load("/path/to/data.json") -> DataFrame
      • Associated schema can be inferred or specified
    • sql("select....") -> DataFrame
    • write.format("parquet").option("compression", "snappy").mode("overwrite").save("/path/to/file")
    • write.saveAsTable("table_name") (in Databricks, saves as delta table)
    • createOrReplace[Global]TempView: Global creates the view in the global_temp schema, visible to other sessions in the same application. A regular temp view is dropped when its session ends; a global temp view lives until the application ends
  • SQL like: select, groupBy, join, agg, where, union, orderBy, limit
    • groupBy returns RelationalGroupedDataset to which agg can be applied
    • where is an alias for filter; both accept Column expressions as well as SQL expression strings
  • Data cleaning: drop, fill, replace
  • Actions: collect, count, show, take/head/first
  • withColumnRenamed
  • sort
  • explain: show execution plan
  • schema, printSchema: show schema
  • createOrReplaceTempView("my_view"): registers the DF as a temporary view so it can be queried with SQL
  • col("column-name") -> Column
  • expr("col1 - 5") -> Column
  • from_json(<col>, <schema>): parses a column of JSON strings into a struct
  • struct(cols...): combines columns into a single struct column

SQL

  • read from files as select * from csv.`dbfs:/path/to/file.csv`

Dataset

  • allows both functional style of RDD and relational style of DataFrame
  • creating DS: spark.read.json("people.json").as[Person]
    • convert: df.as[Person] for a DataFrame; rdd.toDS or seq.toDS via spark.implicits._
  • DF uses Column, whereas DS uses TypedColumn
  • Using Dataset APIs on DF requires type information
    • eg df.map(row => row(0).asInstanceOf[Int] + 1)
  • groupByKey returns KeyValueGroupedDataset which supports
    • aggregations agg
    • mapGroups[U](f: (K, Iterator[V]) => U) and flatMapGroups
    • reduceGroups
  • BP Use Aggregator for better performance
  • Actions: collect, count, first/head, take, foreach, reduce, show
  • Limitations: Catalyst can optimize relational expressions but not opaque functional operations (lambdas)
    • eg: ds.filter($"city".as[String] === "Boston") can be optimized but ds.filter(_.city == "Boston") cannot
    • in addition, functional expressions require rows to be deserialized into Scala objects

RDD

  • Using reduceByKey() over groupByKey() followed by reduce() is more efficient because Spark combines values locally (map-side) before the shuffle
  • Dependencies between RDDs are represented by DAGs.
    • Narrow: each parent partition is used by at most one child partition, does not require shuffle
      • filter, map, union, mapValues, flatMap, mapPartitions* or join when co-partitioned inputs
    • Wide: each parent partition may be used by multiple child partitions to derive data, requires a shuffle
      • most other than above and join when inputs are not co-partitioned
  • Use rdd.dependencies method to get dependency type
    • {OneToOne,Prune,Range}Dependency are narrow
    • ShuffleDependency is wide
  • Use rdd.toDebugString to show how rdd is derived (similar to explain plan)

Partitioning

  • A single partition always resides on one machine; a machine may hold many partitions
  • Number of partitions defaults to total number of cores on all executor nodes
  • Partition customization only applies to Pair RDDs
    • goal is to reduce shuffle which is an expensive operation
  • Partitioning kinds: Hash and Range
  • Range partitioning applies to keys that have Ordering property
    • eg rdd.partitionBy(new RangePartitioner(8, rdd)).persist()
    • RangePartitioner samples the rdd to pick good range boundaries
    • BP: call persist() so Spark does not re-shuffle each time the rdd is reused
  • many transformations hold on to and propagate a partitioner
  • some transformations use a specific partitioner: eg sortByKey uses range, groupByKey uses hash
  • map and flatMap cause an RDD to lose its partitioner
    • this is because they can change the keys themselves, so the existing partitioning may no longer be valid; rather than repartition, Spark conservatively drops the partitioner
    • BP use mapValues when possible
  • BP: if a large Pair RDD is joined frequently on its key, partition it manually before the repeated join operations
    • eg. rdd.partitionBy(new HashPartitioner(100)).persist()