pyspark: isin vs join

Considering import pyspark.sql.functions as psf, there are two types of broadcasting: sc.broadcast() to copy Python objects to every node for more efficient use of psf.isin, and psf.broadcast inside a join to copy your PySpark DataFrame to every node when the DataFrame is small: df1.join(psf.broadcast(df2)). It is usually used for cartesian products (CROSS JOIN in Pig). … Read more
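A minimal sketch contrasting the two kinds of broadcasting, assuming a local SparkSession and made-up column names:

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

df1 = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "letter"])
df2 = spark.createDataFrame([(1,), (3,)], ["id"])

# Variant 1: broadcast a plain Python collection, then filter with isin
wanted_ids = sc.broadcast([1, 3])
filtered = df1.filter(psf.col("id").isin(wanted_ids.value))

# Variant 2: broadcast the small DataFrame itself inside a join
joined = df1.join(psf.broadcast(df2), on="id")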

Apache Spark dealing with case statements

These are a few ways to write If-Else / When-Then-Else / When-Otherwise expressions in PySpark. Sample dataframe:

df = spark.createDataFrame([(1,1),(2,2),(3,3)], ['id','value'])
df.show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    1|
#|  2|    2|
#|  3|    3|
#+---+-----+

#Desired Output:
#+---+-----+----------+
#| id|value|value_desc|
#+---+-----+----------+
#|  1|    1|       one|
#|  2|    2|       two|
#|  3|    3|     other|
#+---+-----+----------+

Option#1: withColumn() … Read more
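A minimal sketch of the withColumn()/when()/otherwise() variant referenced above, assuming the sample df already exists:

from pyspark.sql import functions as F

df_desc = df.withColumn(
    "value_desc",
    F.when(F.col("value") == 1, "one")
     .when(F.col("value") == 2, "two")
     .otherwise("other"),
)
df_desc.show()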

GroupBy and concat array columns pyspark

You need a flattening UDF; starting from your own df:

spark.version
# u'2.2.0'

from functools import reduce  # needed on Python 3; reduce was a builtin in Python 2
from pyspark.sql import functions as F
import pyspark.sql.types as T

def fudf(val):
    return reduce(lambda x, y: x + y, val)

flattenUdf = F.udf(fudf, T.ArrayType(T.IntegerType()))

df2 = df.groupBy("store").agg(F.collect_list("values"))
df2.show(truncate=False)
# +-----+----------------------------------------------+
# |store|collect_list(values)                          |
# +-----+----------------------------------------------+
# |1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
… Read more
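On Spark 2.4+ a UDF-free alternative (my own addition, not part of the original answer) is to combine collect_list with the built-in flatten function. A minimal sketch assuming a df with a "store" column and an array-of-int "values" column:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, [1, 2, 3]), (1, [4, 5, 6]), (2, [7, 8])],
    ["store", "values"],
)

# flatten(collect_list(...)) concatenates the grouped arrays without a UDF
df2 = df.groupBy("store").agg(F.flatten(F.collect_list("values")).alias("values"))
df2.show(truncate=False)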

What’s the most efficient way to filter a DataFrame

My code (following the description of your first method) runs normally in Spark 1.4.0-SNAPSHOT in these two configurations: a test run from IntelliJ IDEA, and a Spark Standalone cluster with 8 nodes (1 master, 7 workers). Please check if any differences exist:

val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", … Read more
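A rough PySpark translation of the same check; the second column name and the SparkSession setup are my own assumptions, since the excerpt cuts off:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

bc = spark.sparkContext.broadcast(["login3", "login4"])

x = [("login1", 192), ("login2", 146), ("login3", 72)]
xdf = spark.createDataFrame(x, ["name", "cnt"])  # "cnt" is a placeholder column name

# keep only rows whose name appears in the broadcast list
xdf.filter(F.col("name").isin(bc.value)).show()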

Defining a UDF that accepts an Array of objects in a Spark DataFrame?

What you’re looking for is Seq[o.a.s.sql.Row]:

import org.apache.spark.sql.Row

val my_size = udf { subjects: Seq[Row] => subjects.size }

Explanation: the current representation of ArrayType is, as you already know, WrappedArray, so Array won’t work and it is better to stay on the safe side. According to the official specification, the local (external) type for StructType is … Read more
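For comparison, the analogous PySpark pattern (my own illustration, not from the original answer): elements of an array-of-struct column arrive in a Python UDF as a list of Row objects, so a plain len() gives the size. The column names and schema below are hypothetical:

from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()

# hypothetical DataFrame with an array-of-struct column named "subjects"
df = spark.createDataFrame(
    [(1, [("math", 90), ("physics", 80)])],
    "id INT, subjects ARRAY<STRUCT<name: STRING, score: INT>>",
)

my_size = F.udf(lambda subjects: len(subjects), T.IntegerType())
df.select(my_size("subjects").alias("n_subjects")).show()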

How to convert list of dictionaries into Pyspark DataFrame

In the past, you were able to simply pass a dictionary to spark.createDataFrame(), but this is now deprecated:

mylist = [
    {"type_activity_id": 1, "type_activity_name": "xxx"},
    {"type_activity_id": 2, "type_activity_name": "yyy"},
    {"type_activity_id": 3, "type_activity_name": "zzz"},
]
df = spark.createDataFrame(mylist)
#UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
#  warnings.warn("inferring schema from dict is deprecated,"

As this warning message says, you should use pyspark.sql.Row instead. … Read more
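A minimal sketch of the Row-based alternative the warning points to, assuming an active SparkSession:

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()

mylist = [
    {"type_activity_id": 1, "type_activity_name": "xxx"},
    {"type_activity_id": 2, "type_activity_name": "yyy"},
    {"type_activity_id": 3, "type_activity_name": "zzz"},
]

# convert each dict to a Row before building the DataFrame
df = spark.createDataFrame([Row(**d) for d in mylist])
df.show()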

Unpivot in Spark SQL / PySpark

You can use the built-in stack function, for example in Scala:

scala> val df = Seq(("G",Some(4),2,None),("H",None,4,Some(5))).toDF("A","X","Y","Z")
df: org.apache.spark.sql.DataFrame = [A: string, X: int ... 2 more fields]

scala> df.show
+---+----+---+----+
|  A|   X|  Y|   Z|
+---+----+---+----+
|  G|   4|  2|null|
|  H|null|  4|   5|
+---+----+---+----+

scala> df.select($"A", expr("stack(3, 'X', X, 'Y', Y, 'Z', … Read more
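The same unpivot can be written in PySpark with selectExpr; a minimal sketch mirroring the Scala data above, with an explicit schema of my own to keep the None values typed:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("G", 4, 2, None), ("H", None, 4, 5)],
    "A string, X int, Y int, Z int",
)

# stack(n, label1, col1, ...) turns the wide columns into (key, value) rows
unpivoted = df.selectExpr("A", "stack(3, 'X', X, 'Y', Y, 'Z', Z) as (key, value)")
unpivoted.show()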