How to select the first row of each group?

Window functions: Something like this should do the trick:

    import org.apache.spark.sql.functions.{row_number, max, broadcast}
    import org.apache.spark.sql.expressions.Window

    val df = sc.parallelize(Seq(
      (0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3),
      (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3),
      (2, "cat56", 39.6), (2, "cat40", 29.7), (2, "cat187", 27.9), (2, "cat68", 9.8),
      (3, "cat8", 35.6))).toDF("Hour", "Category", "TotalValue")

    val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)

    val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")

    dfTop.show
    // +----+--------+----------+
    // |Hour|Category|TotalValue|
    // +----+--------+----------+
    // |   0| …
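To make the intent of the window query above concrete, here is a minimal sketch of the same "top row per group" logic on plain Scala collections, with no Spark involved. The `Record` case class and the `FirstRowPerGroup` object are hypothetical names introduced only for this illustration; the data is the sample from the excerpt.

```scala
// Plain Scala collections only; this illustrates the semantics, not the Spark API.
final case class Record(hour: Int, category: String, totalValue: Double)

object FirstRowPerGroup {
  // Same sample rows as the DataFrame in the excerpt.
  val rows: Seq[Record] = Seq(
    Record(0, "cat26", 30.9), Record(0, "cat13", 22.1), Record(0, "cat95", 19.6), Record(0, "cat105", 1.3),
    Record(1, "cat67", 28.5), Record(1, "cat4", 26.8), Record(1, "cat13", 12.6), Record(1, "cat23", 5.3),
    Record(2, "cat56", 39.6), Record(2, "cat40", 29.7), Record(2, "cat187", 27.9), Record(2, "cat68", 9.8),
    Record(3, "cat8", 35.6)
  )

  // Group by hour, keep the record with the largest totalValue in each group,
  // then return the winners ordered by hour.
  def topPerHour(input: Seq[Record]): Seq[Record] =
    input.groupBy(_.hour).values.map(_.maxBy(_.totalValue)).toSeq.sortBy(_.hour)

  def main(args: Array[String]): Unit =
    topPerHour(rows).foreach(println)
}
```

On ties, `maxBy` keeps the first maximal element it encounters, whereas `row_number` over a window with equal sort keys may pick any of the tied rows, so the two are only equivalent when the ordering column is unique within each group.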

How to add a constant column in a Spark DataFrame?

Spark 2.2+: Spark 2.2 introduces typedLit to support Seq, Map, and Tuples (SPARK-19254), and the following calls should be supported (Scala):

    import org.apache.spark.sql.functions.typedLit

    df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
    df.withColumn("some_struct", typedLit(("foo", 1, 0.3)))
    df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ (lit), 1.4+ (array, struct), 2.0+ (map): The second argument for DataFrame.withColumn should be a Column so …

Add JAR files to a Spark job – spark-submit

ClassPath: The classpath is affected depending on what you provide. There are a couple of ways to set something on the classpath: spark.driver.extraClassPath, or its alias --driver-class-path, to set extra classpaths on the node running the driver; and spark.executor.extraClassPath to set extra classpaths on the Worker nodes. If you want a certain JAR to be effected …

How to stop INFO messages displaying on spark console?

Edit your conf/log4j.properties file and change the following line:

    log4j.rootCategory=INFO, console

to

    log4j.rootCategory=ERROR, console

Another approach would be to start spark-shell and type in the following:

    import org.apache.log4j.Logger
    import org.apache.log4j.Level

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

You won’t see any logs after that. Other options for Level include: all, debug, error, fatal, info, off, trace, trace_int, warn. Details …

Apache Spark: The number of cores vs. the number of executors

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set …
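The kind of arithmetic this worked example leads into can be sketched as a small helper. This is only an illustration under stated assumptions: the excerpt is cut off before it gives the NodeManager settings, so the usable cores and memory per node and the cores per executor used below are hypothetical inputs, not values taken from the text. `ExecutorSizing`, `Plan`, and `plan` are names invented for this sketch, and memory overhead and any container reserved for the application master are deliberately ignored.

```scala
object ExecutorSizing {
  // Result of a naive sizing pass; all fields are whole numbers.
  final case class Plan(executorsPerNode: Int, totalExecutors: Int, memoryPerExecutorGb: Int)

  // Divide each node's usable cores among executors of a fixed width,
  // then split the node's usable memory evenly across those executors.
  def plan(nodes: Int,
           usableCoresPerNode: Int,
           usableMemoryGbPerNode: Int,
           coresPerExecutor: Int): Plan = {
    require(nodes > 0, "need at least one node")
    require(coresPerExecutor > 0 && coresPerExecutor <= usableCoresPerNode,
      "coresPerExecutor must be between 1 and usableCoresPerNode")
    val perNode = usableCoresPerNode / coresPerExecutor
    Plan(perNode, perNode * nodes, usableMemoryGbPerNode / perNode)
  }

  def main(args: Array[String]): Unit = {
    // ASSUMPTION: 15 usable cores and 63 GB usable memory per node (one core and
    // 1 GB held back from the 16 cores / 64 GB), and 5 cores per executor.
    println(plan(nodes = 6, usableCoresPerNode = 15, usableMemoryGbPerNode = 63, coresPerExecutor = 5))
  }
}
```

With those assumed inputs the helper yields 3 executors per node, 18 in total, and 21 GB per executor before any overhead is subtracted; different reservations or executor widths change the result accordingly.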

Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

RDDs extend the Serializable interface, so this is not what’s causing your task to fail. This doesn’t mean, however, that you can serialise an RDD with Spark and avoid NotSerializableException. Spark is a distributed computing engine and its main abstraction is a resilient distributed dataset (RDD), which can be viewed as a distributed collection. …
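The underlying failure mode can be reproduced without Spark. The sketch below, assuming Scala 2.12 or later (where function literals are serializable), uses plain Java serialization, which is also what Spark's closure serializer relies on, to show that a function value capturing an instance of a non-Serializable class cannot be serialized, while one capturing only a primitive can. `Multiplier` and `ClosureSerializationDemo` are hypothetical names made up for this illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper class that deliberately does NOT extend Serializable.
class Multiplier(val factor: Int) {
  def times(x: Int): Int = x * factor
}

object ClosureSerializationDemo {
  // Try to Java-serialize a value; report whether it succeeded.
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // The function value captures `m`, so serializing it must serialize the Multiplier too.
  def capturesInstance: Int => Int = {
    val m = new Multiplier(3)
    (x: Int) => m.times(x)
  }

  // The function value depends only on a local Int, so nothing non-serializable is captured.
  def capturesPrimitive: Int => Int = {
    val factor = 3
    (x: Int) => x * factor
  }

  def main(args: Array[String]): Unit = {
    println(s"captures instance:  ${isJavaSerializable(capturesInstance)}")
    println(s"captures primitive: ${isJavaSerializable(capturesPrimitive)}")
  }
}
```

In a Spark job the same thing happens when a function passed to something like `map` references a field or method of an enclosing class instance: the whole instance is pulled into the closure. Copying the needed value into a local `val` first, or making the enclosing class Serializable, are the usual ways around it.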
