Find maximum row per group in Spark DataFrame

Using a join (in case of ties, this returns more than one row per group):

    import pyspark.sql.functions as F
    from pyspark.sql.functions import count, col

    cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
    maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

    cnts.join(maxs,
      (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
    ).select(col("cnts.id_sa"), col("cnts.id_sb"))

Using window functions (this drops ties):

    from pyspark.sql.functions import row_number
    from pyspark.sql.window import …

Why does a job fail with “No space left on device”, but df says otherwise?

By default, Spark uses the /tmp directory to store intermediate data. If you do have space left on some other device, you can change this location by creating the file SPARK_HOME/conf/spark-defaults.conf and adding the line below, where SPARK_HOME is the root directory of your Spark installation:

    spark.local.dir SOME/DIR/WHERE/YOU/HAVE/SPACE

Unpacking a list to select multiple columns from a spark data frame

Use:

    df.select(cols.head, cols.tail: _*)

Let me know if it works 🙂

Explanation from @Ben: the key is the method signature of select:

    select(col: String, cols: String*)

The cols: String* parameter takes a variable number of arguments, and : _* unpacks a sequence so that its elements can be passed to this parameter. This is very similar to unpacking in Python with *args. …
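To make the Python comparison concrete, here is a small sketch. The function `select` below is a hypothetical plain-Python stand-in that mirrors the Scala signature: one required column followed by a variadic tail. No Spark is involved.

```python
def select(col, *cols):
    """Hypothetical stand-in mirroring Scala's select(col: String, cols: String*)."""
    # col is the required first column; cols collects any remaining arguments.
    return [col, *cols]


columns = ["id", "name", "age"]

# Python counterpart of cols.head, cols.tail: _*
selected = select(columns[0], *columns[1:])
print(selected)  # ['id', 'name', 'age']
```

In PySpark itself the head/tail split is unnecessary. DataFrame.select takes *cols and also accepts a list, so both df.select(*columns) and df.select(columns) work.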

How to import multiple csv files in a single load?

Use a wildcard, e.g. replace 2008 with *:

    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("../Downloads/*.csv") // <-- note the star (*)

Spark 2.0:

    // these lines are equivalent in Spark 2.0
    spark.read.format("csv").option("header", "true").load("../Downloads/*.csv")
    spark.read.option("header", "true").csv("../Downloads/*.csv")

Notes:
- Replace format("com.databricks.spark.csv") with format("csv") or the csv method. The com.databricks.spark.csv format has been integrated into Spark 2.0.
- Use spark, not sqlContext.

PySpark: withColumn() with two conditions and three outcomes

There are a few efficient ways to implement this. Let's start with the required imports:

    from pyspark.sql.functions import col, expr, when

You can use the Hive IF function inside expr:

    new_column_1 = expr(
        """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
    )

or when + otherwise:

    new_column_2 = when(
        col("fruit1").isNull() | col("fruit2").isNull(), …

How to melt Spark DataFrame?

Spark >= 3.4

In Spark 3.4 or later you can use the built-in melt method:

    (sdf
        .melt(
            ids=['A'], values=['B', 'C'],
            variableColumnName="variable",
            valueColumnName="value")
        .show())

    +---+--------+-----+
    |  A|variable|value|
    +---+--------+-----+
    |  a|       B|    1|
    |  a|       C|    2|
    |  b|       B|    3|
    |  b|       C|    4|
    |  c|       B|    5|
    |  c|       C|    6|
    +---+--------+-----+

This method is available …