Spark SQL: apply aggregate functions to a list of columns

There are multiple ways of applying aggregate functions to multiple columns. The GroupedData class provides a number of methods for the most common functions, including count, max, min, mean and sum, which can be used directly as follows:

Python:

    df = sqlContext.createDataFrame(
        [(1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)],
        ("col1", "col2", … Read more

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

PySpark does include a dropDuplicates() method, which was introduced in 1.4.
https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

    >>> from pyspark.sql import Row
    >>> df = sc.parallelize([ \
    ...     Row(name="Alice", age=5, height=80), \
    ...     Row(name="Alice", age=5, height=80), \
    ...     Row(name="Alice", age=10, height=80)]).toDF()
    >>> df.dropDuplicates().show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    |  5|    80|Alice|
    | 10|    80|Alice|
    +---+------+-----+
    >>> df.dropDuplicates(['name', 'height']).show()
    +---+------+-----+
    |age|height| name|
    … Read more

Spark difference between reduceByKey vs. groupByKey vs. aggregateByKey vs. combineByKey

groupByKey:

Syntax:

    sparkContext.textFile("hdfs://")
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .groupByKey()
      .map { case (word, counts) => (word, counts.sum) }

groupByKey can cause out-of-disk problems, because every record is sent over the network and collected on the reduce workers.

reduceByKey:

Syntax:

    sparkContext.textFile("hdfs://")
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((x, y) => x + y)

Data are combined at each partition, with only … Read more
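To make the shuffle difference concrete, here is a minimal sketch in plain Python (no Spark involved) that simulates both strategies on two partitions of (word, 1) pairs. The partition contents and the function names are hypothetical, chosen only for illustration; the sketch counts how many records would have to cross the network in each case.

```python
from collections import defaultdict

# Hypothetical input: two partitions of (word, 1) pairs, as produced by the map step.
partitions = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("b", 1), ("a", 1), ("b", 1)],
]


def group_by_key_style(parts):
    """Shuffle every record unchanged, then sum on the reduce side."""
    shuffled = [pair for part in parts for pair in part]  # all records are shuffled
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    counts = {key: sum(values) for key, values in grouped.items()}
    return counts, len(shuffled)


def reduce_by_key_style(parts):
    """Combine inside each partition first, then shuffle only the partial sums."""
    shuffled = []
    for part in parts:
        partial = defaultdict(int)
        for key, value in part:
            partial[key] += value
        shuffled.extend(partial.items())  # at most one record per key per partition
    counts = defaultdict(int)
    for key, value in shuffled:
        counts[key] += value
    return dict(counts), len(shuffled)


group_counts, group_shuffled = group_by_key_style(partitions)
reduce_counts, reduce_shuffled = reduce_by_key_style(partitions)

print(group_counts, group_shuffled)
print(reduce_counts, reduce_shuffled)
```

Both strategies produce the same word counts; the only difference in this simulation is the number of records shuffled, which is what makes the map-side combine cheaper on large inputs.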

How to find count of Null and Nan values for each column in a PySpark dataframe efficiently?

You can use the method shown here and replace isNull with isnan:

    from pyspark.sql.functions import isnan, when, count, col

    df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
    +-------+----------+---+
    |session|timestamp1|id2|
    +-------+----------+---+
    |      0|         0|  3|
    +-------+----------+---+

or

    df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
    +-------+----------+---+
    |session|timestamp1|id2|
    +-------+----------+---+
    |      0|         0|  5|
    +-------+----------+---+
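The two results differ because NaN is an actual floating-point value, while null means the value is missing. As a rough illustration of that distinction, the following plain-Python sketch (not Spark code) applies the same two counts to a list of dicts. The rows, the column names and the is_nan helper are made up for the example.

```python
import math

# Hypothetical rows; None stands in for a SQL null, float("nan") for NaN.
rows = [
    {"session": "s1", "id2": 1.0},
    {"session": "s2", "id2": float("nan")},
    {"session": None, "id2": None},
]
columns = ["session", "id2"]


def is_nan(value):
    """True only for a float NaN; None and strings are not NaN."""
    return isinstance(value, float) and math.isnan(value)


# Count NaN values only (the analogue of the first query).
nan_counts = {c: sum(is_nan(r[c]) for r in rows) for c in columns}

# Count NaN or missing values (the analogue of the second query).
nan_or_null_counts = {
    c: sum(is_nan(r[c]) or r[c] is None for r in rows) for c in columns
}

print(nan_counts)
print(nan_or_null_counts)
```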

Overwrite specific partitions in spark dataframe write method

Finally! This is now a feature in Spark 2.3.0: SPARK-20236

To use it, you need to set the spark.sql.sources.partitionOverwriteMode setting to dynamic, the dataset needs to be partitioned, and the write mode must be overwrite. Example:

    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    data.write.mode("overwrite").insertInto("partitioned_table")

I recommend doing a repartition based on your partition column before writing, so you won’t end up with 400 … Read more
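The effect of the setting can be sketched in plain Python, with a dict standing in for the partitioned table. This is only a simulation of the semantics, not Spark code: the overwrite function, the table layout and the date values are hypothetical, and the static branch assumes an insert without an explicit partition spec, in which case all existing partitions are replaced.

```python
def overwrite(table, new_rows, mode):
    """Simulate an overwrite-mode insert into a partitioned table.

    table:    dict mapping partition value -> list of rows
    new_rows: list of (partition value, row) pairs
    mode:     "static" or "dynamic"
    """
    incoming = {}
    for part, row in new_rows:
        incoming.setdefault(part, []).append(row)

    if mode == "static":
        # Assumption: no partition spec given, so every existing partition is dropped.
        return incoming
    if mode == "dynamic":
        # Only partitions that receive new rows are replaced; the rest are kept.
        result = {part: list(rows) for part, rows in table.items()}
        result.update(incoming)
        return result
    raise ValueError("mode must be 'static' or 'dynamic'")


existing = {"2018-01-01": ["old_a"], "2018-01-02": ["old_b"]}
new_data = [("2018-01-02", "new_b"), ("2018-01-03", "new_c")]

static_result = overwrite(existing, new_data, "static")
dynamic_result = overwrite(existing, new_data, "dynamic")

print(static_result)
print(dynamic_result)
```

In the dynamic case the 2018-01-01 partition survives untouched, which is the behavior the setting is meant to provide.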

pyspark dataframe filter or include based on list

The error says that "df.score in l" cannot be evaluated: df.score gives you a Column, and "in" is not defined for that type. Use "isin" instead. The code should look like this:

    # define a dataframe
    rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
    df = sqlContext.createDataFrame(rdd, ["id", "score"])
    # … Read more

What are the benefits of Apache Beam over Spark/Flink for batch processing?

There are a few things that Beam adds over many of the existing engines. Unifying batch and streaming: many systems can handle both batch and streaming, but they often do so via separate APIs. In Beam, batch and streaming are just two points on a spectrum of latency, completeness, and cost. There’s no learning/rewriting cliff … Read more
