cache vs checkpoint vs persist:
cache: not an action; lazy; lineage preserved; stores at the default storage level
checkpoint: not an action; eager by default (lazy allowed with eager = false); lineage truncated; stores in the checkpoint dir
persist: not an action; lazy; lineage preserved; storage set by the StorageLevel argument
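A minimal sketch of the three APIs, assuming a spark-shell style SparkSession named spark (the checkpoint path is illustrative):

```scala
import org.apache.spark.storage.StorageLevel

// checkpoint() needs a checkpoint directory configured up front
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")   // illustrative path

val a = spark.range(1000).toDF("id")
val b = spark.range(1000).toDF("id")

a.cache()                              // lazy; default storage level; lineage kept
b.persist(StorageLevel.DISK_ONLY)      // lazy; explicit storage level; lineage kept

val cp = a.checkpoint()                // eager by default; returns a DF with truncated lineage
// a.checkpoint(eager = false) would defer materialization to the first action
```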
randomSplit: splits a single DF into multiple DFs according to the given weights (optionally with a seed)
coalesce: reduces the number of partitions without a shuffle. If the requested number of partitions is greater than the current number, nothing happens
colRegex: selects columns whose names match the regex
create[OrReplace][Global]TempView: creates/replaces a (Global)TempView over the DF
unpivot: rotates columns into rows (the inverse of pivot, aka melt)
DataFrame
is a type alias for Dataset[Row]
uses SparkSession instead of SparkContext
Convert an RDD[T] to a DataFrame in Scala with rdd.toDF("id", "name"...)
if T is a case class, the column name list argument can be omitted
Converting RDD manually:
create schema using StructType(cols: Seq[StructField])
create an RDD containing Row(<attr1>, <attr2>, ...)
create a DataFrame as df = spark.createDataFrame(rdd: RDD[Row], schema: StructType)
Supports most SQL data types, plus {Array,Map,Struct}Type and nullability
Column reference in Scala: eg $"age" or df("age")
Uses Tungsten off-heap (no GC) storage, columnar layout
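The manual RDD-to-DataFrame steps above, as a runnable sketch (column names and data are illustrative; assumes a spark-shell style spark):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// 1. schema: StructType over a Seq of StructFields (with nullability)
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))

// 2. an RDD of Row(<attr1>, <attr2>, ...) matching the schema
val rowRdd = spark.sparkContext.parallelize(Seq(Row(1, "ada"), Row(2, "alan")))

// 3. build the DataFrame
val df = spark.createDataFrame(rowRdd, schema)
df.printSchema()
```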
API
IO
read.csv("/path/data.csv") -> DataFrame, or read.format("csv").load("/path/data.csv")
to infer the schema, use options, e.g. read.format("csv").option("header", True).option("inferSchema", True).option("samplingRatio", 0.001).load(...)
or, manually specify the schema as a StructType (an array of StructFields), e.g. read.format("csv").schema(csv_schema).load(...)
read.format("json").[.schema(myschema)][.option()...].load("/path/to/data.json") -> DataFrame
Associated schema can be inferred or specified
sql("select....") -> DataFrame
write.format("parquet").option("compression", "snappy").mode("overwrite").save("/path/to/file")
write.saveAsTable("table_name") (in Databricks, saves as delta table)
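A round-trip sketch of the read/write APIs above (writes to a temp dir rather than a real path; assumes a spark-shell style spark):

```scala
// write a small DataFrame as snappy-compressed parquet, then read it back
val tmp = java.nio.file.Files.createTempDirectory("spark-io").toString

spark.range(3).toDF("id").write
  .format("parquet")
  .option("compression", "snappy")
  .mode("overwrite")
  .save(s"$tmp/out")

val back = spark.read.format("parquet").load(s"$tmp/out")

// CSV reading follows the same pattern, with schema-inference options:
// spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/path/data.csv")
```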
createOrReplace[Global]TempView: Global creates the view in the global_temp schema, visible to other sessions in the same application. Temp views are dropped when the session closes; global temp views live until the Spark application terminates
SQL like: select, groupBy, join, agg, where, union, orderBy, limit
groupBy returns RelationalGroupedDataset to which agg can be applied
filter and where are aliases; both accept Column expressions or SQL expression strings
Data cleaning : drop, fill, replace
Actions: collect, count, show, take/head/first
withColumnRenamed
sort
explain: show execution plan
schema, printSchema: show schema
createOrReplaceTempView("my_view"): registers the DF as a temporary view that can be queried with SQL
col("column-name") -> Column
expr("col1 - 5") -> Column
from_json(<col>, <schema>): parses a column of JSON strings into a struct
struct(cols*): combines multiple columns into a single struct column
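The Column helpers above in one select (assumes spark-shell, i.e. spark and spark.implicits._; data is illustrative):

```scala
import spark.implicits._
import org.apache.spark.sql.functions.{col, expr, from_json, struct}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val df = Seq((10, """{"city":"Boston"}""")).toDF("col1", "js")

val out = df.select(
  col("col1"),                                   // plain column reference
  expr("col1 - 5").as("minus5"),                 // SQL expression -> Column
  from_json(col("js"),
    StructType(Seq(StructField("city", StringType)))).as("parsed"),  // JSON string -> struct
  struct(col("col1"), col("js")).as("packed"))   // pack columns into one struct column
out.show()
```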
SQL
read from files as select * from csv.`dbfs:/path/to/file.csv`
Dataset
allows both functional style of RDD and relational style of DataFrame
creating DS: spark.read.json("people.json").as[Person]
convert to DS: df.as[Person] or rdd.toDS (requires import spark.implicits._)
DF uses Column, whereas DS uses TypedColumn
Using Dataset APIs on DF requires type information
eg df.map(row => row(0).asInstanceOf[Int] + 1)
groupByKey returns KeyValueGroupedDataset, which supports
aggregations agg
mapGroups[U](f: (K, Iterator[V]) => U) and flatMapGroups
reduceGroups
BP Use Aggregator for better performance
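A small groupByKey/mapGroups sketch (case class and data are illustrative; assumes spark-shell):

```scala
import spark.implicits._

case class Person(name: String, city: String)

val ds = Seq(
  Person("ada", "Boston"),
  Person("alan", "NYC"),
  Person("grace", "Boston")).toDS()

// groupByKey -> KeyValueGroupedDataset; mapGroups applies an arbitrary function per key
val perCity = ds.groupByKey(_.city)
  .mapGroups((city, people) => (city, people.size))
```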
Actions: collect, count, first/head, take, foreach, reduce, show
Limitations: Catalyst can optimize relational expressions, but not opaque functional operations (lambdas)
eg: ds.filter($"city".as[String] === "Boston") can be optimized but ds.filter(_.city == "Boston") cannot be
in addition, functional expressions require deserializing the encoded rows into Scala objects
RDD
Using reduceByKey() over groupByKey() followed by reduce() is more efficient because Spark does a local (map-side) reduce before the shuffle
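For example (illustrative data; assumes spark-shell):

```scala
val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// reduceByKey combines values locally on each partition before shuffling
val sums = pairs.reduceByKey(_ + _)

// groupByKey ships every value across the network first, then reduces
val slower = pairs.groupByKey().mapValues(_.sum)
```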
Dependencies between RDDs are represented by DAGs.
Narrow : each parent partition is used by at most one child partition, does not require shuffle
filter, map, union, mapValues, flatMap, mapPartitions* or join when co-partitioned inputs
Wide : each parent partition may be used by multiple child partitions to derive data, requires a shuffle
most transformations other than the above, and join when the inputs are not co-partitioned
Use rdd.dependencies method to get dependency type
{OneToOne,Prune,Range}Dependency are narrow
ShuffleDependency is wide
Use rdd.toDebugString to show how rdd is derived (similar to explain plan)
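Inspecting dependencies, sketched (assumes spark-shell):

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency}

val nums     = spark.sparkContext.parallelize(1 to 10)
val mapped   = nums.map(_ * 2)                          // narrow dependency
val shuffled = mapped.map(x => (x % 2, x)).groupByKey() // wide dependency

println(mapped.dependencies)    // contains a OneToOneDependency (narrow)
println(shuffled.dependencies)  // contains a ShuffleDependency (wide)
println(shuffled.toDebugString) // full lineage, indented at shuffle boundaries
```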
Partitioning
A single partition always resides on one machine; a machine can host one or more partitions
Number of partitions defaults to total number of cores on all executor nodes
Partition customization only applies to Pair RDDs
goal is to reduce shuffle which is an expensive operation
Partitioning kinds: Hash and Range
Range partitioning applies to keys that have Ordering property
eg rdd.partitionBy(new RangePartitioner(8, rdd)).persist()
RangePartitioner samples the rdd to figure out the best ranges to use
BP: call persist() to prevent Spark from shuffling each time rdd is reused
many transformations hold on to and propagate a partitioner
some transformations use a specific partitioner: eg sortByKey uses range, groupByKey uses hash
map, flatMap cause an RDD to lose its partitioner
this is because they can change the keys, so the result may need repartitioning; Spark just drops the partitioner instead
BP use mapValues when possible
BP if a large Pair RDD is joined frequently on its key, partition it manually before the repeated joins
eg rdd.partitionBy(new HashPartitioner(100)).persist()
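The pre-partition-then-persist pattern, sketched (partition count and data are illustrative; assumes spark-shell):

```scala
import org.apache.spark.HashPartitioner

val events = spark.sparkContext.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))

// partition once up front and keep the result in memory
val byKey = events.partitionBy(new HashPartitioner(100)).persist()

// subsequent joins on the key reuse byKey's partitioning instead of re-shuffling it
val joined = byKey.join(byKey)
```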