
Spark

Troubleshooting

  • Container killed by YARN for exceeding memory limits (e.g. 10.4 GB max)
    • Increase the number of Spark partitions, so each task processes less data
    • Decrease the number of executors per node, allocating more of the node's memory to each executor
    • Instead of persisting RDDs in memory only, use memory+disk (or disk only)
    • Check for and remedy any data skew
  • AWS: Executors are removed even while jobs are running
    • The auto-scaling policy had its ContainerPendingRatio trigger set to 0, which scaled the cluster down whenever no applications were waiting for allocation, even though jobs were still running.
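
    The remedies above typically become submit-time settings. A minimal sketch, assuming a spark-submit deployment; every value below is an illustrative placeholder, not a recommendation:

    ```shell
    # Illustrative spark-submit settings mapping to the remedies above.
    # All numbers are placeholders -- tune for your own cluster and data.
    spark-submit \
      --num-executors 10 \
      --executor-memory 8g \
      --executor-cores 4 \
      --conf spark.sql.shuffle.partitions=400 \
      --conf spark.executor.memoryOverhead=1024 \
      my_job.jar
    ```

    The persistence change is made in code rather than on the command line, e.g. rdd.persist(StorageLevel.MEMORY_AND_DISK) instead of rdd.cache().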

vs. Pandas

  • Add a column with a static value
    df_teams['sport'] = 'football'
    
    import org.apache.spark.sql.functions.lit
    val newTeams = teams.withColumn("sport", lit("football"))
    
  • Add a column with a derived value
    df_teams['premier_league'] = df_teams['league'].apply(lambda x: x == 'Premier League')
    
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions.{col, udf}
    val isPremUDF: UserDefinedFunction = udf[Boolean, String](_ == "Premier League")
    val teamsWithLeague: DataFrame = teams.withColumn("premier_league", isPremUDF(col("league")))
    
    Note: adding a column returns a DataFrame, which can be converted back to a Dataset with df.as[NewType]
  • Drop a column
    df_teams.drop("player_name", axis=1)
    
    teams.drop("player_name")
    
  • Filter rows
    df_teams = df_teams[df_teams['goals_this_season'] > 50]
    
    val filteredTeams = teams.filter(col("goals_this_season") > 50)
    
  • Aggregation
    df_teams.groupby(['league']).count()
    
    teams.groupBy("league").count()
    teams.agg(Map("matches_played" -> "avg", "goals_this_season" -> "count")) // multiple aggregates
    
  • Union
    pd.concat([df_teams, df_another_teams], ignore_index=True)
    
    teams.unionByName(anotherTeams)
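
The Scala aggregation example above computes multiple aggregates at once; the pandas side only shows count. The pandas equivalent is a dict (or named aggregation) passed to .agg — a sketch, with column names and toy data assumed from the examples above:

```python
import pandas as pd

# Toy data matching the columns used in the examples above.
df_teams = pd.DataFrame({
    "league": ["Premier League", "Premier League", "La Liga"],
    "matches_played": [38, 38, 36],
    "goals_this_season": [80, 55, 70],
})

# Multiple aggregates in one pass, mirroring the Scala Map(...) form.
summary = df_teams.agg({"matches_played": "mean", "goals_this_season": "count"})

# Per-group aggregates with named output columns.
per_league = df_teams.groupby("league").agg(
    avg_matches=("matches_played", "mean"),
    team_count=("goals_this_season", "count"),
)
```

Named aggregation (the keyword form) also controls the output column names, which the Map-based Scala form does not.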
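
One caveat on the union row: Spark's plain union matches columns by position, which is why unionByName is used above; pandas concat already aligns columns by name. A small sketch with invented data:

```python
import pandas as pd

teams = pd.DataFrame({"name": ["Arsenal"], "league": ["Premier League"]})
# Same columns in a different order: concat still aligns them by name.
more_teams = pd.DataFrame({"league": ["La Liga"], "name": ["Sevilla"]})

combined = pd.concat([teams, more_teams], ignore_index=True)
```

With Spark, the positional union of these two frames would silently put league values in the name column; unionByName avoids that.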