Spark

Troubleshooting

  • Container killed by YARN for exceeding memory limit (Max 10.4GB). Possible fixes:
      • Increase the number of Spark partitions, so that each task processes less data.
      • Decrease the number of executors, so that each executor gets a larger share of the node's memory.
      • Instead of persisting RDDs in memory only, use memory plus disk (or disk only).
      • Check for and remedy any data skew.
  • AWS: Executors are removed even when jobs are running
      • Cause: the auto-scaling policy had its ContainerPendingRatio trigger set to 0, so the cluster started scaling down whenever no applications were waiting for allocation, even though jobs were still running.
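
To see why running fewer executors per node helps with the memory limit above, here is a rough sketch of the arithmetic. It assumes the commonly documented Spark default for executor memory overhead (10% of executor memory, with a 384 MB floor) and ignores YARN rounding requests up to its allocation increment. The function names and the 64 GB figure are hypothetical, chosen only for illustration.

```python
def container_mb(executor_memory_mb, overhead_fraction=0.10, min_overhead_mb=384):
    """Approximate YARN container request for one executor: heap plus overhead.

    Assumption: overhead = max(384 MB, 10% of executor memory).
    """
    overhead = max(min_overhead_mb, int(executor_memory_mb * overhead_fraction))
    return executor_memory_mb + overhead


def max_executor_memory_mb(node_memory_mb, executors_per_node,
                           overhead_fraction=0.10, min_overhead_mb=384):
    """Largest executor memory (MB) so that executors_per_node containers fit on one node.

    node_memory_mb is the memory YARN may hand out on a single node.
    """
    budget = node_memory_mb // executors_per_node
    # Solve heap + heap * fraction <= budget for heap.
    heap = int(budget / (1 + overhead_fraction))
    # For small heaps the fixed floor dominates the overhead instead.
    if heap * overhead_fraction < min_overhead_mb:
        heap = budget - min_overhead_mb
    return heap


# Hypothetical node with 64 GB available to YARN: compare 4 vs 2 executors per node.
for n in (4, 2):
    heap = max_executor_memory_mb(65536, n)
    print(n, "executors/node ->", heap, "MB heap,", container_mb(heap), "MB container")
```

Halving the executor count roughly doubles the memory each executor can request, at the cost of less parallelism per node.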

v/s Pandas

  • Add a column with a static value
# pandas
df_teams['sport'] = 'football'
// Spark (Scala)
import org.apache.spark.sql.functions.lit
val newTeams = teams.withColumn("sport", lit("football"))
  • Add a column with a derived value
# pandas
df_teams['premier_league'] = df_teams['league'].apply(lambda x: x == 'Premier League')
// Spark (Scala)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
val isPremUDF: UserDefinedFunction = udf[Boolean, String](_ == "Premier League")
val teamsWithLeague: DataFrame = teams.withColumn("premier_league", isPremUDF(col("league")))

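Note: adding a column returns a DataFrame, which can be converted to a typed Dataset with df.as[NewType] (this needs an implicit Encoder in scope, e.g. via import spark.implicits._).

  • Drop a column
# pandas (drop returns a new DataFrame, so reassign the result)
df_teams = df_teams.drop("player_name", axis=1)
// Spark (Scala)
val teamsWithoutPlayer = teams.drop("player_name")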
  • Filter rows
# pandas
df_teams = df_teams[df_teams['goals_this_season'] > 50]
// Spark (Scala)
val filteredTeams = teams.filter(col("goals_this_season") > 50)
  • Aggregation
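# pandas (count() counts non-null values per column; use .size() for row counts)
df_teams.groupby(['league']).count()
// Spark (Scala): number of rows per league
teams.groupBy("league").count()
// Multiple aggregates over the whole DataFrame (no grouping)
teams.agg(Map("matches_played" -> "avg", "goals_this_season" -> "count"))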
  • Union
# pandas
pd.concat([df_teams, df_another_teams], ignore_index=True)
// Spark (Scala)
teams.unionByName(anotherTeams)
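
pd.concat matches columns by name rather than by position, which is why unionByName (and not the positional union) is the closer Spark counterpart. A small pandas sketch, using illustrative rows and hypothetical frame names, where the second frame lists its columns in a different order:

```python
import pandas as pd

# Illustrative sample rows; only the column names matter here.
df_teams = pd.DataFrame({
    "team": ["Arsenal", "Barcelona"],
    "league": ["Premier League", "La Liga"],
})

# Same columns, but declared in a different order.
df_another_teams = pd.DataFrame({
    "league": ["Serie A"],
    "team": ["Juventus"],
})

# Columns are aligned by name; ignore_index=True renumbers the rows 0..n-1.
all_teams = pd.concat([df_teams, df_another_teams], ignore_index=True)
print(all_teams)
```

The combined frame keeps the column order of the first frame, and the appended row lands in the correct columns despite the different ordering.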