Spark final task takes 100x longer than the first 199, how to improve

Spark >= 3.0: Since 3.0, Spark provides built-in optimizations for handling skewed joins, which can be enabled with the spark.sql.adaptive.optimizeSkewedJoin.enabled property. See SPARK-29544 for details. Spark < 3.0: You clearly have a problem with a huge right data skew. Let's take a look at the statistics you've provided: df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] df2 = … Read more
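For Spark 3.x, a minimal Scala sketch of turning this on looks as follows; the session setup and the toy df1/df2/key names are illustrative, and released 3.x builds expose the setting as spark.sql.adaptive.skewJoin.enabled (together with spark.sql.adaptive.enabled), so use whichever name your version documents.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("skew-join-demo").getOrCreate()
import spark.implicits._

// Enable adaptive query execution so skewed join partitions can be split at runtime.
// Config names as documented for released Spark 3.x; the answer above cites the name
// used while SPARK-29544 was in development.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

// Toy stand-ins for the two skewed DataFrames from the question.
val df1 = Seq((1, "a"), (1, "b"), (2, "c")).toDF("key", "v1")
val df2 = Seq((1, "x"), (2, "y")).toDF("key", "v2")
df1.join(df2, Seq("key")).show()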

What’s the most efficient way to filter a DataFrame

My code (following the description of your first method) runs normally in Spark 1.4.0-SNAPSHOT on these two configurations: IntelliJ IDEA's test runner, and a Spark Standalone cluster with 8 nodes (1 master, 7 workers). Please check if any differences exist. val bc = sc.broadcast(Array[String]("login3", "login4")) val x = Array(("login1", 192), ("login2", 146), ("login3", 72)) val xdf = sqlContext.createDataFrame(x).toDF("name", … Read more
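Below is a self-contained Scala sketch of the broadcast-based filter the excerpt is cut off in the middle of; the cnt column name, the sample rows, and the isin-based predicate are assumptions for illustration, not the answer's exact continuation.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("broadcast-filter-demo").getOrCreate()
import spark.implicits._

// Ship the small lookup list to the executors as a broadcast variable.
val bc = spark.sparkContext.broadcast(Array("login3", "login4"))

val xdf = Seq(("login1", 192), ("login2", 146), ("login3", 72)).toDF("name", "cnt")

// Keep only the rows whose name is not in the broadcast list.
val filtered = xdf.filter(!$"name".isin(bc.value: _*))
filtered.show()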

Spark : Read file only if the path exists

You can filter out the irrelevant files as in @Psidom's answer. In Spark, the best way to do so is to use the internal Spark Hadoop configuration. Given that the Spark session variable is called "spark", you can do: import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) def testDirExist(path: String): Boolean = { val p … Read more
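A self-contained Scala sketch that finishes the truncated testDirExist helper and uses it to read only existing paths; the body shown here and the example paths are plausible assumptions rather than the answer's exact code.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("path-exists-demo").getOrCreate()

val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// True if the path exists on the cluster's default filesystem (HDFS, S3A, ...).
def testDirExist(path: String): Boolean = {
  val p = new Path(path)
  hadoopfs.exists(p)
}

// Illustrative paths; read only the ones that are actually there.
val candidatePaths = Seq("/data/day=2020-01-01", "/data/day=2020-01-02")
val existing = candidatePaths.filter(testDirExist)
val df = if (existing.nonEmpty) spark.read.parquet(existing: _*) else spark.emptyDataFrame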

Defining a UDF that accepts an Array of objects in a Spark DataFrame?

What you're looking for is Seq[o.a.s.sql.Row]: import org.apache.spark.sql.Row val my_size = udf { subjects: Seq[Row] => subjects.size } Explanation: The current representation of ArrayType is, as you already know, WrappedArray, so Array won't work and it is better to stay on the safe side. According to the official specification, the local (external) type for StructType is … Read more
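A runnable Scala sketch of that UDF applied to an array-of-structs column; the sample data and the id/subjects column names are invented for illustration, while the Seq[Row] signature is the point the answer makes.

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("array-of-struct-udf-demo").getOrCreate()
import spark.implicits._

// Each row carries an array of (subject, grade) structs (invented sample data);
// a Seq of tuples becomes an array<struct<...>> column.
val df = Seq(
  (1, Seq(("math", 90), ("physics", 80))),
  (2, Seq(("history", 75)))
).toDF("id", "subjects")

// An ArrayType of StructType arrives in a Scala UDF as Seq[Row], not Array[...].
val my_size = udf { subjects: Seq[Row] => subjects.size }

df.select($"id", my_size($"subjects").as("n_subjects")).show()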

Spark save(write) parquet only one file

Use coalesce before the write operation: dataFrame.coalesce(1).write.format("parquet").mode("append").save("temp.parquet") EDIT-1: Upon a closer look, the docs do warn about coalesce: "However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1)." Therefore as … Read more
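A short Scala sketch of that single-file write with the warning spelled out in comments; the DataFrame contents and the output path are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("single-file-parquet-demo").getOrCreate()
import spark.implicits._

val dataFrame = Seq((1, "a"), (2, "b")).toDF("id", "value")   // placeholder data

// coalesce(1) collapses the output to one partition and therefore one parquet
// part-file; per the docs warning quoted above, the final stage then runs as a
// single task on a single node, so upstream parallelism can suffer.
dataFrame
  .coalesce(1)
  .write
  .format("parquet")
  .mode("append")
  .save("temp.parquet")   // placeholder path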

Unpivot in Spark SQL / PySpark

You can use the built-in stack function, for example in Scala: scala> val df = Seq(("G",Some(4),2,None),("H",None,4,Some(5))).toDF("A","X","Y","Z") df: org.apache.spark.sql.DataFrame = [A: string, X: int … 2 more fields] scala> df.show +---+----+---+----+ | A| X| Y| Z| +---+----+---+----+ | G| 4| 2|null| | H|null| 4| 5| +---+----+---+----+ scala> df.select($"A", expr("stack(3, 'X', X, 'Y', Y, 'Z', … Read more
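For reference, a complete Scala sketch of the stack-based unpivot that the excerpt truncates; the output column names B and C are illustrative choices.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().appName("unpivot-demo").getOrCreate()
import spark.implicits._

val df = Seq(
  ("G", Option(4), 2, Option.empty[Int]),
  ("H", Option.empty[Int], 4, Option(5))
).toDF("A", "X", "Y", "Z")

// stack(n, key1, val1, key2, val2, ...) turns the three wide columns X, Y, Z
// into key/value rows named (B, C), i.e. the usual unpivot/melt shape.
val unpivoted = df.select($"A", expr("stack(3, 'X', X, 'Y', Y, 'Z', Z) as (B, C)"))
unpivoted.show()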

PySpark - converting JSON string to DataFrame

You can do the following: newJson = '{"Name":"something","Url":"https://stackoverflow.com","Author":"jangcy","BlogEntries":100,"Caller":"jangcy"}' df = spark.read.json(sc.parallelize([newJson])) df.show(truncate=False) which should give +------+-----------+------+---------+-------------------------+ |Author|BlogEntries|Caller|Name |Url | +------+-----------+------+---------+-------------------------+ |jangcy|100 |jangcy|something|https://stackoverflow.com| +------+-----------+------+---------+-------------------------+