Spark final task takes 100x longer than the first 199, how to improve

Spark >= 3.0

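Since 3.0, Spark provides built-in optimizations for handling skewed joins as part of Adaptive Query Execution. In released 3.x versions the property that controls them is spark.sql.adaptive.skewJoin.enabled, and it takes effect only when spark.sql.adaptive.enabled is also set to true.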

See SPARK-29544 for details.
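As a minimal sketch of switching this on from code, assuming an existing Spark 3.x SparkSession (the object and method names here are hypothetical, chosen only for illustration):

```scala
import org.apache.spark.sql.SparkSession

object SkewJoinConfig {
  // Hypothetical helper; `spark` is assumed to be an existing Spark 3.x session.
  def enableSkewJoinHandling(spark: SparkSession): Unit = {
    // Skew-join handling is part of Adaptive Query Execution, so AQE must be on.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    // Lets Spark split oversized shuffle partitions of a sort-merge join at runtime.
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
  }
}
```

The same two properties can also be passed with --conf at submit time. Whether a partition counts as skewed is governed by further spark.sql.adaptive.skewJoin.* thresholds, which may need tuning for your data.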

Spark < 3.0

You clearly have a problem with heavily right-skewed data. Let’s take a look at the statistics you’ve provided:

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
df2 = [mean=1.0, stddev=0.0, count=18408194]

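With a mean around 5 and a standard deviation over 2000, you get a long tail: most keys appear only a few times, while a small number of keys account for a very large number of rows.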

Since some keys are much more frequent than others, after repartitioning some executors will have much more work to do than the remaining ones.

Furthermore, your description suggests that the problem may lie with a single key, or a few keys, that hash to the same partition.

So, let’s first identify outliers (pseudocode):

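import sqlContext.implicits._  // enables the $"col" syntax
import org.apache.spark.sql.functions.broadcast  // used in the joins below

val mean = 4.989209978967438
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")
// Number of rows per userId; cached because it is filtered twice
val counts = df1.groupBy("userId").count.cache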

val frequent = counts
  .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))

and the rest:

val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))
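Before choosing a threshold, it can help to look at the heaviest keys directly. A minimal sketch, assuming a DataFrame with a userId column like df1 above (the object and method names are hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.desc

object SkewInspection {
  // Hypothetical helper: the n most frequent values of `key` in `df`,
  // together with their row counts, most frequent first.
  def topKeys(df: DataFrame, key: String, n: Int): DataFrame =
    df.groupBy(key).count().orderBy(desc("count")).limit(n)
}
```

For example, SkewInspection.topKeys(df1, "userId", 20).show() lists the 20 heaviest users, which makes it easier to judge whether they are legitimate or an artifact such as a default or null-like id.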

Is skew like this really to be expected? If not, try to identify the source of the issue upstream.

If it is expected, you can try:

  • broadcasting the smaller table:

    val df2 = sqlContext.sql("Select * from Table2")
    df2.join(broadcast(df1), Seq("userId"), "rightouter")
    
  • splitting, unifying (union) and broadcasting only the frequent keys:

    df2.join(broadcast(frequent), Seq("userId"), "rightouter")
      .union(df2.join(infrequent, Seq("userId"), "rightouter"))
    
  • salting userId with some random data

but you shouldn’t:

  • repartition all data and sort locally (although sorting locally alone shouldn’t be an issue)
  • perform standard hash joins on full data.
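The salting option mentioned above could look roughly like this. It is a sketch under assumptions, not a tuned implementation: the helper name, the salt column name and the bucket count are all hypothetical, and the skewed side is assumed to be the one whose rows must all be kept (df1 in the joins above).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, explode, lit, rand}

object SaltedJoin {
  // Hypothetical helper: left-joins `skewed` with `other` on `key`,
  // spreading each hot key over `buckets` sub-keys.
  def saltedLeftJoin(skewed: DataFrame, other: DataFrame,
                     key: String, buckets: Int): DataFrame = {
    // Every row on the skewed side gets a random salt in [0, buckets).
    val salted = skewed.withColumn("salt", (rand() * buckets).cast("int"))
    // Every row on the other side is replicated once per possible salt,
    // so each salted row still finds its match.
    val salts = array((0 until buckets).map(i => lit(i)): _*)
    val replicated = other.withColumn("salt", explode(salts))
    // Joining on (key, salt) splits one hot key across up to `buckets` partitions.
    salted.join(replicated, Seq(key, "salt"), "left").drop("salt")
  }
}
```

Usage would be something like SaltedJoin.saltedLeftJoin(df1, df2, "userId", 16). Note that the replicated side grows by a factor of buckets, so it may be cheaper to salt only the frequent keys and join the infrequent ones normally.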
