Apache Spark: Splitting Pair RDD into multiple RDDs by key to save values

I think this problem is similar to Write to multiple outputs by key Spark – one Spark job Please refer the answer there. import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String … Read more

Apache spark dealing with case statements

These are few ways to write If-Else / When-Then-Else / When-Otherwise expression in pyspark. Sample dataframe df = spark.createDataFrame([(1,1),(2,2),(3,3)],[‘id’,’value’]) df.show() #+—+—–+ #| id|value| #+—+—–+ #| 1| 1| #| 2| 2| #| 3| 3| #+—+—–+ #Desired Output: #+—+—–+———-+ #| id|value|value_desc| #+—+—–+———-+ #| 1| 1| one| #| 2| 2| two| #| 3| 3| other| #+—+—–+———-+ Option#1: withColumn() … Read more

How to extract an element from an array in PySpark

Create sample data: from pyspark.sql import Row x = [Row(col1=”xx”, col2=”yy”, col3=”zz”, col4=[123,234])] rdd = sc.parallelize([Row(col1=”xx”, col2=”yy”, col3=”zz”, col4=[123,234])]) df = spark.createDataFrame(rdd) df.show() #+—-+—-+—-+———-+ #|col1|col2|col3| col4| #+—-+—-+—-+———-+ #| xx| yy| zz|[123, 234]| #+—-+—-+—-+———-+ Use getItem to extract element from the array column as this, in your actual case replace col4 with collect_set(TIMESTAMP): df = df.withColumn(“col5”, … Read more

Would Spark unpersist the RDD itself when it realizes it won’t be used anymore?

Yes, Apache Spark will unpersist the RDD when the RDD object is garbage collected. In RDD.persist you can see: sc.cleaner.foreach(_.registerRDDForCleanup(this)) This puts a WeakReference to the RDD in a ReferenceQueue leading to ContextCleaner.doCleanupRDD when the RDD is garbage collected. And there: sc.unpersistRDD(rddId, blocking) For more context see ContextCleaner in general and the commit that added … Read more

How to sort an RDD in Scala Spark?

If you only need the top 10, use rdd.top(10). It avoids sorting, so it is faster. rdd.top makes one parallel pass through the data, collecting the top N in each partition in a heap, then merges the heaps. It is an O(rdd.count) operation. Sorting would be O(rdd.count log rdd.count), and incur a lot of data … Read more

Pyspark: repartition vs partitionBy

repartition() is used for specifying the number of partitions considering the number of cores and the amount of data you have. partitionBy() is used for making shuffling functions more efficient, such as reduceByKey(), join(), cogroup() etc.. It is only beneficial in cases where a RDD is used for multiple times, so it is usually followed … Read more

Concatenating datasets of different RDDs in Apache spark using scala

I think you are looking for RDD.union val rddPart1 = ??? val rddPart2 = ??? val rddAll = rddPart1.union(rddPart2) Example (on Spark-shell) val rdd1 = sc.parallelize(Seq((1, “Aug”, 30),(1, “Sep”, 31),(2, “Aug”, 15),(2, “Sep”, 10))) val rdd2 = sc.parallelize(Seq((1, “Oct”, 10),(1, “Nov”, 12),(2, “Oct”, 5),(2, “Nov”, 15))) rdd1.union(rdd2).collect res0: Array[(Int, String, Int)] = Array((1,Aug,30), (1,Sep,31), (2,Aug,15), … Read more

How to extract an element from a array in pyspark

Create sample data: from pyspark.sql import Row x = [Row(col1=”xx”, col2=”yy”, col3=”zz”, col4=[123,234])] rdd = sc.parallelize([Row(col1=”xx”, col2=”yy”, col3=”zz”, col4=[123,234])]) df = spark.createDataFrame(rdd) df.show() #+—-+—-+—-+———-+ #|col1|col2|col3| col4| #+—-+—-+—-+———-+ #| xx| yy| zz|[123, 234]| #+—-+—-+—-+———-+ Use getItem to extract element from the array column as this, in your actual case replace col4 with collect_set(TIMESTAMP): df = df.withColumn(“col5”, … Read more

How spark read a large file (petabyte) when file can not be fit in spark’s main memory

First of all, Spark only starts reading in the data when an action (like count, collect or write) is called. Once an action is called, Spark loads in data in partitions – the number of concurrently loaded partitions depend on the number of cores you have available. So in Spark you can think of 1 … Read more