GroupBy and concat array columns pyspark

You need a flattening UDF; starting from your own df:

spark.version  # u'2.2.0'

from functools import reduce  # needed on Python 3; reduce is a builtin on Python 2
from pyspark.sql import functions as F
import pyspark.sql.types as T

def fudf(val):
    return reduce(lambda x, y: x + y, val)

flattenUdf = F.udf(fudf, T.ArrayType(T.IntegerType()))

df2 = df.groupBy("store").agg(F.collect_list("values"))
df2.show(truncate=False)
# +-----+----------------------------------------------+
# |store|collect_list(values)                          |
# +-----+----------------------------------------------+
# |1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
… Read more
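If you are on Spark 2.4 or later, the same result can be had without a Python UDF by combining collect_list with the built-in flatten function; a minimal sketch, reusing the store/values columns from the answer above:

from pyspark.sql import functions as F

# Spark 2.4+ only: flatten() concatenates the nested arrays produced by
# collect_list, avoiding the Python UDF entirely.
df2 = (
    df.groupBy("store")
      .agg(F.flatten(F.collect_list("values")).alias("values"))
)
df2.show(truncate=False)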

Reading parquet files from multiple directories in Pyspark

A little late, but I found this while I was searching and it may help someone else… You might also try unpacking the argument list to spark.read.parquet():

paths = ['foo', 'bar']
df = spark.read.parquet(*paths)

This is convenient if you want to pass a few blobs into the path argument:

basePath = 's3://bucket/'
paths = ['s3://bucket/partition_value1=*/partition_value2=2017-04-*',
         's3://bucket/partition_value1=*/partition_value2=2017-05-*']
df = spark.read.option("basePath", basePath).parquet(*paths)

This is cool because you don't … Read more
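As a variation on the same idea, here is a sketch that builds the path list programmatically before unpacking it (the bucket layout and the "month" partition column are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

base = "s3://bucket/events"                      # assumption: your own base path
months = ["2017-04", "2017-05", "2017-06"]
paths = [f"{base}/month={m}" for m in months]

# Unpack the list into separate positional arguments, as in the answer above;
# basePath keeps the partition column "month" in the resulting DataFrame.
df = spark.read.option("basePath", base).parquet(*paths)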

Applying a Window function to calculate differences in pySpark

You can bring in the previous day's value by using the lag function, and then add another column that computes the actual day-to-day return from the two columns, but you will have to tell Spark how to partition your data and/or order it for lag to work, something like this:

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions … Read more
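Since the snippet is cut off, here is a minimal sketch of the idea, assuming a DataFrame df with hypothetical columns symbol, date, and price:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One partition per symbol, ordered by date, so lag() looks exactly one row back.
w = Window.partitionBy("symbol").orderBy("date")

df_with_returns = (
    df.withColumn("prev_price", F.lag("price", 1).over(w))
      .withColumn("daily_return",
                  (F.col("price") - F.col("prev_price")) / F.col("prev_price"))
)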

How to get the lists’ length in one column in dataframe spark?

Pyspark has a built-in function called size that achieves exactly what you want: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.size. To add it as a column, you can simply call it in your select statement:

from pyspark.sql.functions import size
countdf = df.select('*', size('products').alias('product_cnt'))

Filtering works exactly as @titiro89 described. Furthermore, you can use the size function in the filter. This will allow … Read more
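The truncated part presumably shows size used inside a filter; a short sketch of that usage, keeping the products column name from the snippet above:

from pyspark.sql.functions import size, col

# Keep only rows whose "products" array holds more than two elements.
df.filter(size(col("products")) > 2).show()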

In pyspark, how do you add/concat a string to a column?

from pyspark.sql.functions import concat, col, lit

df.select(concat(col("firstname"), lit(" "), col("lastname"))).show(5)

+------------------------------+
|concat(firstname, , lastname) |
+------------------------------+
|              Emanuel Panton  |
|            Eloisa Cayouette  |
|                 Cathi Prins  |
|           Mitchel Mozdzierz  |
|             Angla Hartzheim  |
+------------------------------+
only showing top 5 rows

http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#module-pyspark.sql.functions
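A closely related built-in, not shown in the answer above, is concat_ws, which takes the separator once and gives a tidier result column; a quick sketch with the same firstname/lastname columns:

from pyspark.sql.functions import concat_ws, col

# concat_ws("<sep>", cols...) inserts the separator between values and, unlike
# concat, does not turn the whole result NULL when one of the columns is NULL.
df.select(concat_ws(" ", col("firstname"), col("lastname")).alias("full_name")).show(5)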

How to extract model hyper-parameters from spark.ml in PySpark?

Ran into this problem as well. I found out that you need to call the Java property, for some reason I don't know why. So just do this:

from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="mae")
lr = LinearRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [500]) \
    .addGrid(lr.regParam, [0]) \
    … Read more
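For context, a rough sketch of the extraction step the answer is describing, assuming the estimator and grid above and a hypothetical fitted CrossValidatorModel called cvModel:

# cvModel is assumed to be the result of CrossValidator(...).fit(train_df)
bestModel = cvModel.bestModel

# Going through the underlying Java object, as the answer suggests:
print(bestModel._java_obj.getMaxIter())
print(bestModel._java_obj.getRegParam())

# On recent PySpark versions the fitted Python model also carries the params,
# so extractParamMap() may be enough on its own.
print(bestModel.extractParamMap())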

Pyspark dataframe LIKE operator

You can use the where and col functions to do the same. where is used to filter the data based on a condition (here, whether a column is like '%string%'). col('col_name') refers to the column, and like is the operator:

df.where(col('col1').like("%string%")).show()
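A self-contained sketch of both spellings (df and col1 stand in for your own DataFrame and column):

from pyspark.sql.functions import col

# Column-expression form, as in the answer:
df.where(col("col1").like("%string%")).show()

# Equivalent SQL-expression form (filter and where are aliases):
df.filter("col1 LIKE '%string%'").show()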

Manually create a pyspark dataframe

Simple dataframe creation:

df = spark.createDataFrame(
    [
        (1, "foo"),  # create your data here, be consistent in the types.
        (2, "bar"),
    ],
    ["id", "label"]  # add your column names here
)

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- label: string (nullable = true)

df.show()
+---+-----+
| id|label|
+---+-----+
|  1|  foo|
|  2| … Read more
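If you want to pin the types explicitly rather than letting Spark infer them, a sketch using a StructType schema (column names kept from the example above, spark being the existing SparkSession):

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Explicit schema: no type inference, and nullability is stated up front.
schema = StructType([
    StructField("id", LongType(), nullable=True),
    StructField("label", StringType(), nullable=True),
])

df = spark.createDataFrame([(1, "foo"), (2, "bar")], schema=schema)
df.printSchema()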