Performance¶
- Daniel Tome
- Ganglia for metrics collection and monitoring
V2 APIs for External Data Sources improve upon the legacy V1 API with:
- advanced predicate and aggregate pushdown support
- external catalog support
- transactional writes
- support for vectorized and columnar reads
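Whether a predicate was actually pushed down to the source can be checked in the physical plan. A minimal sketch, assuming a SparkSession `spark` and a hypothetical Parquet dataset at `/data/events`:

```scala
import org.apache.spark.sql.functions.col

val df = spark.read.parquet("/data/events")
df.filter(col("eventType") === "click").explain()
// When the source accepts the predicate, the scan node of the physical
// plan lists it, e.g.  PushedFilters: [IsNotNull(eventType), EqualTo(eventType,click)]
```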
Cluster configuration¶
- Too large: 1 executor / node, many tasks
- Pro: increased sharing (e.g. broadcast variables) between all tasks of an executor
- Con: large memory per executor => long GC pauses
- Con: HDFS throughput degrades with too many concurrent threads per executor
- Too small: 1 executor / core
- Pro: increased parallelism
- Con: no sharing; broadcast variables are replicated to every executor
- Con: too little memory per executor
- Rules of thumb (ROT) for HDFS and YARN:
- keep to ~5 cores (threads) per executor for good HDFS throughput
- allocate 1 core per node for YARN daemons
- allocate 1 executor for the ApplicationMaster
- use 90% of node memory for executors, leave 10% for YARN overhead
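A worked example of these rules of thumb, assuming a hypothetical 4-node cluster with 16 cores and 64 GB RAM per node:

```shell
# Per node: 16 - 1 (YARN daemons) = 15 usable cores -> 15 / 5 = 3 executors
# Cluster:  4 nodes * 3 = 12 executors - 1 (ApplicationMaster) = 11
# Memory:   64 GB * 0.9 / 3 executors per node ≈ 19 GB per executor
spark-submit \
  --num-executors 11 \
  --executor-cores 5 \
  --executor-memory 19g \
  my_job.jar
```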
- `spark.dynamicAllocation.*` lets Spark manage the number of executors and use as many resources as are available
- To speed up GC, use the following JVM options:
- `-XX:+UseG1GC`: next-generation garbage collector
- `-XX:+AlwaysPreTouch`: touch and commit all heap pages at JVM startup rather than on first use
- `-XX:+UseLargePages`: large memory pages => less fragmentation, fewer GC cycles
- `-XX:+UseTLAB`: thread-local allocation buffers; allocate from each thread's private buffer instead of the global heap for quick allocation
- `-XX:+ResizeTLAB`: let the JVM resize TLABs
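These JVM options can be passed to executors through `spark.executor.extraJavaOptions`. A minimal sketch of a `spark-defaults.conf` entry (tune for your heap size and workload):

```properties
# spark-defaults.conf — illustrative only; validate flags against your JVM version
spark.executor.extraJavaOptions  -XX:+UseG1GC -XX:+AlwaysPreTouch -XX:+UseLargePages -XX:+UseTLAB -XX:+ResizeTLAB
```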
- ROT: prefer fewer, larger machines over many smaller machines to reduce network traffic between nodes
- GPU nodes don't have local disks (so disk caching is not available)
Spark Jobs¶
- Use `cache()` or `persist()` to speed up jobs when data is reused or a job restarts
- neither of these methods is an action, so caching happens at the next action statement
- Use `checkpoint()` to keep the query plan from growing too big
- Turn joins between a very large and a small DataFrame into a broadcast join: `largeDf.join(smallDf.hint("broadcast"), Seq("id"))`
- Types of joins:
- Sort-Merge: partitions must be co-located, otherwise a shuffle is required; each partition is sorted, then merged
- Broadcast: the smaller table is broadcast to all worker nodes
- ShuffledHashJoin: first shuffles data on the join columns to each partition; not good for low-cardinality join columns
- Shuffle-replicate-nested-loop (cartesian product)
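Spark 3.0+ lets you request any of these strategies explicitly with join hints. A sketch, with placeholder DataFrame names:

```scala
// Join-strategy hints (Spark 3.0+); largeDf / otherDf / "id" are placeholders.
largeDf.join(otherDf.hint("broadcast"), Seq("id"))            // force broadcast join
largeDf.join(otherDf.hint("merge"), Seq("id"))                // force sort-merge join
largeDf.join(otherDf.hint("shuffle_hash"), Seq("id"))         // force shuffled hash join
largeDf.join(otherDf.hint("shuffle_replicate_nl"), Seq("id")) // nested-loop (cartesian)
```

Hints are requests, not guarantees: Spark falls back to another strategy if the hinted one is not applicable.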
- Spark cannot split files compressed with non-splittable codecs (e.g. gzip) and creates only 1 partition. ROT: always repartition manually
- Single-line JSON files can be split into multiple partitions, but multi-line JSON files cannot be split and are read as a whole
- Data skew can cause one partition to hold far more data than the others, slowing joins and aggregations
- try adding a salt (a random number, or another column) to the key to make the distribution more uniform
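A hedged sketch of key salting for a skewed join (DataFrame and column names are hypothetical): the skewed side gets a random salt, and the small side is exploded so every `(id, salt)` pair exists.

```scala
import org.apache.spark.sql.functions._

val saltBuckets = 8

// Skewed side: spread each hot key over `saltBuckets` composite keys.
val saltedLarge = largeDf.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Small side: replicate every row once per salt value.
val saltedSmall = smallDf.withColumn(
  "salt", explode(array((0 until saltBuckets).map(lit): _*)))

// Join on the composite key; a former hot key is now split across 8 partitions.
val joined = saltedLarge.join(saltedSmall, Seq("id", "salt")).drop("salt")
```

The trade-off is that the small side grows by a factor of `saltBuckets`, so this only pays off when skew is severe.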
Spark UI¶
- Exchange node: a shuffle is taking place
- WholeStageCodegen: fuses multiple operators in a tree into a single Java function, eliminating virtual function calls
- AQE (Adaptive Query Execution): runs the optimizer at execution time (which causes multiple jobs for a single query). It can dynamically:
- change a sort-merge join to a broadcast join
- coalesce multiple small partitions
- handle skew
- detect and propagate empty relations
- not supported with streaming
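The AQE behaviors above are controlled by a handful of settings (Spark 3.x); a sketch of the main ones, with values illustrative only:

```properties
# Enable AQE and its two most useful sub-features (check defaults for your version)
spark.sql.adaptive.enabled                     true
spark.sql.adaptive.coalescePartitions.enabled  true
spark.sql.adaptive.skewJoin.enabled            true
```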
- The DataFrame/SQL tab won't show jobs that use the low-level RDD API
Datasets¶
- used for co-locating related data
- use `z-order by` when column cardinality is high, and the column is not appropriate for partitioning
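In Delta Lake on Databricks, z-ordering is applied via `OPTIMIZE`; a sketch with a hypothetical table and column:

```sql
-- Hypothetical table/column; ZORDER BY co-locates rows with similar values
-- in the same files, so scans on userId can skip unrelated files
OPTIMIZE events ZORDER BY (userId)
```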
ETL support¶
- Ability to handle dirty data
- Ignore corrupt files: `spark.sql.files.ignoreCorruptFiles = true`
- Corrupt records:
- `option("mode", "PERMISSIVE").option("columnNameOfCorruptRecord", "_bad_input")` => bad data stored in the named column
- `option("mode", "DROPMALFORMED")` => drop all bad records
- `option("mode", "FAILFAST")` => fail the job
- Databricks 3.0+ supports Exception Files
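A sketch of the `PERMISSIVE` mode in practice (path and schema fields are hypothetical); note that the corrupt-record column must appear in the supplied schema for the bad rows to surface:

```scala
import org.apache.spark.sql.functions.col

val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_bad_input")
  .schema("id INT, name STRING, _bad_input STRING")  // include the side column
  .json("/data/raw/events")

// Rows that failed to parse have the raw text in _bad_input, nulls elsewhere
df.where(col("_bad_input").isNotNull).show()
```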
- Multi-line CSV and JSON records (Spark 2.2+: `option("multiline", true)`)
- Transformations using higher-order functions in SQL for complex objects such as maps, arrays and structs: `EXISTS`, `TRANSFORM`, `FILTER`, `REDUCE`
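A sketch of these higher-order functions on an array column (table `tbl` and column `values` are hypothetical):

```sql
-- Each function takes the array and a lambda over its elements
SELECT TRANSFORM(values, x -> x * x) AS squared,   -- map each element
       FILTER(values, x -> x > 0)    AS positives, -- keep matching elements
       EXISTS(values, x -> x = 0)    AS has_zero   -- true if any element matches
FROM tbl
```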
- Unified write paths and interfaces (Spark 2.2+ supports Hive tables vs data source tables)
- SQL: `CREATE TABLE .... USING hive OPTIONS(fileFormat 'ORC')`
- Performance: Delta Lake > native tables > Hive tables