
Performance

  • Daniel Tome
  • Ganglia for metrics collection and monitoring

  • V2 APIs for External Data Sources improve upon the legacy V1 API with:

    • advanced predicate and aggregate pushdown support
    • external catalog support
    • transactional writes
    • support for vectorized and columnar reads

Cluster configuration

  • Executors too large: 1 executor / node, running many tasks
    • Pro: increased sharing (e.g. variables) between all tasks of an executor
    • Con: large memory / executor => long GC times
    • Con: HDFS performance degrades with too many threads
  • Executors too small: 1 executor / core
    • Pro: increased parallelism
    • Con: no sharing; broadcast variables are duplicated across more executors
    • Con: too little memory / executor
  • ROT (rule of thumb) for HDFS and YARN
    • keep ~5 concurrent tasks (threads) per executor; HDFS throughput degrades beyond that
    • allocate 1 core per node for YARN daemons
    • allocate 1 executor for the ApplicationMaster
    • use 90% of node memory for executors, leave 10% for YARN overhead
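As a worked example of these rules of thumb (node size is hypothetical): on 16-core / 64 GB worker nodes, reserving 1 core for YARN leaves 15 cores, i.e. 3 executors × 5 cores each; 90% of 64 GB split across 3 executors is roughly 19 GB per executor, part of which goes to off-heap overhead. A sketch of the resulting submit flags:

```conf
# hypothetical sizing for 16-core / 64 GB worker nodes
spark-submit \
  --executor-cores 5 \
  --executor-memory 17g \
  --conf spark.executor.memoryOverhead=2g \
  ...
```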
  • spark.dynamicAllocation.* lets Spark manage the number of executors and use as many resources as are available
  • To speed up GC, use the following JVM options (passed as -XX: flags):
    • -XX:+UseG1GC: next-gen garbage collector
    • -XX:+AlwaysPreTouch: touch and commit all heap pages at JVM startup rather than lazily on first use
    • -XX:+UseLargePages: large memory pages => less fragmentation, fewer GC cycles
    • -XX:+UseTLAB: thread-local allocation buffers; allocate from a thread's private buffer instead of the global heap for quick allocation
    • -XX:+ResizeTLAB: let the JVM resize TLABs dynamically
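These flags can be passed to the executor JVMs through Spark configuration; a sketch (the exact flag set is workload-dependent and should be benchmarked):

```conf
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+AlwaysPreTouch -XX:+UseLargePages -XX:+UseTLAB -XX:+ResizeTLAB
```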
  • ROT: prefer fewer, larger machines over many smaller machines to reduce network traffic between nodes
  • some GPU node types have no local disks, so disk-based caching is unavailable there

Spark Jobs

  • Use cache() or persist() to speed up jobs when a DataFrame is reused or a job restarts
    • neither method is an action, so caching actually happens on the next action
  • Use checkpoint() to keep the query plan (lineage) from growing too big.
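A minimal sketch of the lazy caching behavior (DataFrame and path are hypothetical):

```scala
val df = spark.read.parquet("/data/events") // hypothetical input
df.cache()    // lazy: only marks the plan for caching
df.count()    // first action materializes the cache
df.filter($"id" > 10).count() // subsequent actions read from the cache
```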
  • Turn joins involving a very large and a small DataFrames to broadcast join: largeDf.join(smallDf.hint("broadcast"), Seq("id"))
  • Types of joins
    • Sort-Merge: partitions must be co-located, otherwise, a shuffle will be required. Sort each partition and then merge
    • Broadcast: Smaller table is broadcast across all worker nodes
    • ShuffledHashJoin: first shuffle data on join columns to each partition. Not good for low cardinality join columns
    • Shuffle-and-replicate nested loop (cartesian product): used when there is no equi-join condition
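To verify which strategy the planner picked, explain() shows the physical plan (DataFrames here are hypothetical); since Spark 3.0 the other strategies can also be requested via join hints:

```scala
val joined = largeDf.join(smallDf.hint("broadcast"), Seq("id"))
joined.explain() // physical plan should contain BroadcastHashJoin

// other join hints: "merge" (sort-merge), "shuffle_hash", "shuffle_replicate_nl"
largeDf.join(otherDf.hint("merge"), Seq("id")).explain()
```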
  • Spark cannot split files compressed with a non-splittable codec (e.g. gzip) and creates only 1 partition. ROT: always repartition manually
    • single-line JSON files can be split into multiple parts, but a multi-line JSON file cannot be split and is read as a whole
  • Data skew can cause 1 partition to have more data and can affect join/aggregation
    • try adding a salt (a random number, or another column) to increase uniformity of distribution
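A sketch of salted aggregation (column names and salt count are hypothetical): pre-aggregate per (key, salt) so a hot key is spread over several partitions, then aggregate again without the salt:

```scala
import org.apache.spark.sql.functions._

val numSalts = 16
val salted = skewedDf.withColumn("salt", (rand() * numSalts).cast("int"))

// first pass spreads each hot key across up to 16 partitions
val partial = salted.groupBy("key", "salt").agg(sum("amount").as("partialSum"))
// second pass combines the partial results per key
val result = partial.groupBy("key").agg(sum("partialSum").as("total"))
```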

Spark UI

  • Exchange operator: a shuffle is taking place
  • WholeStageCodegen: fuses multiple operators in a tree into a single Java function, eliminating virtual function calls
  • AQE (Adaptive Query Execution): re-runs the optimizer at execution time (causing multiple jobs for a single query). It can dynamically:
    • change a sort-merge join to a broadcast join
    • coalesce small shuffle partitions
    • handle skew by splitting oversized partitions
    • detect and propagate empty relations
  • AQE is not supported for streaming queries
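AQE and its sub-features are controlled by configuration (enabled by default since Spark 3.2):

```conf
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
```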
  • DataFrame/SQL tab won't show jobs that work with RDD (low-level API)

Datasets

  • used for co-locating related data
    • use ZORDER BY when column cardinality is high and the column is not suitable for partitioning
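With Delta Lake this is the OPTIMIZE ... ZORDER BY command (table and column names are hypothetical):

```sql
-- co-locate rows with similar userId values in the same data files
OPTIMIZE events ZORDER BY (userId)
```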

ETL support

  • Ability to handle dirty data
    • Ignore corrupt files: spark.sql.files.ignoreCorruptFiles = true
    • corrupt records:
      • option("mode", "PERMISSIVE").option("columnNameOfCorruptRecord", "_bad_input") => bad data stored in named column
      • option("mode", "DROPMALFORMED") => drop all bad records
        • Databricks 3.0+ supports Exception Files
      • option("mode", "FAILFAST") => fail the job
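A sketch of PERMISSIVE mode (schema, column names, and path are hypothetical); note the corrupt-record column must be declared in the schema:

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("_bad_input", StringType) // receives the raw text of malformed rows

val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_bad_input")
  .csv("/data/input.csv")

val badRows = df.filter($"_bad_input".isNotNull) // inspect or quarantine
```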
  • Multi-line CSV and JSON records (Spark 2.2+: option("multiLine", true))
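A minimal read sketch (path is hypothetical):

```scala
val df = spark.read
  .option("multiLine", true) // one JSON document may span several lines
  .json("/data/nested.json")
```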
  • Transformation using higher-order functions in SQL for complex types such as maps, arrays, and structs
    • EXISTS, TRANSFORM, FILTER, REDUCE
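A sketch in Spark SQL (table and array column are hypothetical); note the built-in function corresponding to REDUCE is named aggregate:

```sql
SELECT
  transform(scores, s -> s * 2)             AS doubled,
  filter(scores, s -> s > 50)               AS passing,
  aggregate(scores, 0, (acc, s) -> acc + s) AS total,
  exists(scores, s -> s > 90)               AS hasTopScore
FROM students
```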
  • Unified write paths and interfaces (Spark 2.2+ supports Hive tables vs. data source tables)
    • SQL: CREATE TABLE .... USING hive OPTIONS(fileFormat 'ORC')
  • Performance: Delta Lake > native tables > Hive tables