How to find Spark RDD/DataFrame size?

If you are simply looking to count the number of rows in the RDD, do: val distFile = sc.textFile(file) println(distFile.count) If you are interested in the size in bytes, you can use the SizeEstimator: import org.apache.spark.util.SizeEstimator println(SizeEstimator.estimate(distFile)) See https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/SizeEstimator.html
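A minimal runnable sketch of both approaches from the answer, assuming an existing SparkContext named `sc` and a placeholder input path:

```scala
import org.apache.spark.util.SizeEstimator

// Assumes an existing SparkContext `sc`; the input path is a placeholder.
val distFile = sc.textFile("hdfs:///path/to/file.txt")

// Row (line) count of the RDD.
println(distFile.count())

// Estimated size in bytes of the object passed in. Note that this walks the
// driver-side object graph, not the full distributed dataset on disk.
println(SizeEstimator.estimate(distFile))
```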

Spark read file from S3 using sc.textFile("s3n://…")

Confirmed that this is related to the Spark build against Hadoop 2.6.0. I just installed Spark 1.4.0 "Pre-built for Hadoop 2.4 and later" (instead of the Hadoop 2.6 build), and the code now works OK. sc.textFile("s3n://bucketname/Filename") now raises a different error: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password … Read more
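As a hedged sketch of one common way to satisfy that IllegalArgumentException, the classic Hadoop s3n credential keys can be set on the SparkContext's Hadoop configuration before reading; the bucket and object names below are placeholders and credentials are assumed to live in environment variables:

```scala
// Supply s3n credentials via the classic Hadoop configuration keys.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

// Placeholder bucket and object name.
val lines = sc.textFile("s3n://bucketname/Filename")
println(lines.count())
```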

Spark specify multiple column conditions for dataframe join

There is a Spark column/expression API join for such a case: Leaddetails.join( Utm_Master, Leaddetails("LeadSource") <=> Utm_Master("LeadSource") && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source") && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium") && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"), "left" ) The <=> operator in the example means "equality test that is safe for null values". The main difference from the simple equality test (===) is that the … Read more
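The same join laid out as a sketch, assuming two DataFrames named Leaddetails and Utm_Master that both contain these four columns:

```scala
// Null-safe (<=>), multi-column join with a left join type.
val joined = Leaddetails.join(
  Utm_Master,
  Leaddetails("LeadSource")   <=> Utm_Master("LeadSource") &&
  Leaddetails("Utm_Source")   <=> Utm_Master("Utm_Source") &&
  Leaddetails("Utm_Medium")   <=> Utm_Master("Utm_Medium") &&
  Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
  "left"
)
joined.show()
```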

Explain the aggregate functionality in Spark (with Python and Scala)

I wasn't fully convinced by the accepted answer, and JohnKnight's answer helped, so here's my point of view. First, let's explain aggregate() in my own words: Prototype: aggregate(zeroValue, seqOp, combOp) Description: aggregate() lets you take an RDD and generate a single value that is of a different type than what was stored in the original … Read more
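A small Scala sketch of that prototype, computing a (sum, count) pair from an RDD[Int] in one pass; it assumes an existing SparkContext named `sc`:

```scala
// aggregate() produces a result of a different type than the RDD elements:
// Ints are folded into a (sum, count) pair, then the per-partition pairs merge.
val nums = sc.parallelize(1 to 100)

val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: fold one element into the accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2)  // combOp: merge two partition accumulators
)

println(s"sum=$sum count=$count mean=${sum.toDouble / count}")
```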

‘PipelinedRDD’ object has no attribute ‘toDF’ in PySpark

The toDF method is a monkey patch executed inside the SparkSession constructor (the SQLContext constructor in Spark 1.x), so to be able to use it you have to create a SparkSession (or SQLContext) first: # SQLContext or HiveContext in Spark 1.x from pyspark.sql import SparkSession from pyspark import SparkContext sc = SparkContext() rdd = sc.parallelize([("a", 1)]) hasattr(rdd, "toDF") ## … Read more

DataFrame equality in Apache Spark

Scala (see below for PySpark) The spark-fast-tests library has two methods for making DataFrame comparisons (I'm the creator of the library). The assertSmallDataFrameEquality method collects DataFrames on the driver node and makes the comparison: def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = { if (!actualDF.schema.equals(expectedDF.schema)) { throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF)) } if (!actualDF.collect().sameElements(expectedDF.collect())) { throw new … Read more
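For reference, a simplified hand-rolled sketch of the same "small DataFrame" comparison idea (not the library's actual code): compare schemas first, then collect both sides to the driver and compare rows, which is only suitable for DataFrames small enough to fit on the driver.

```scala
import org.apache.spark.sql.DataFrame

// Schema check first, then a driver-side row comparison.
def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  require(
    actualDF.schema == expectedDF.schema,
    s"Schema mismatch:\n${actualDF.schema}\nvs\n${expectedDF.schema}"
  )
  require(
    actualDF.collect().sameElements(expectedDF.collect()),
    "DataFrame contents differ"
  )
}
```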

reduceByKey: How does it work internally?

Let's break it down into discrete methods and types. That usually exposes the intricacies for new devs: pairs.reduceByKey((a, b) => a + b) becomes pairs.reduceByKey((a: Int, b: Int) => a + b) and renaming the variables makes it a little more explicit: pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue) So, we can now see … Read more
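A runnable sketch of that walk-through, assuming an existing SparkContext named `sc` and a tiny key/value RDD:

```scala
// Values with the same key are combined pairwise, first within each
// partition and then across partitions.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 3), ("b", 2)))

val counts = pairs.reduceByKey(
  (accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue
)

counts.collect().foreach(println)  // prints (a,4) and (b,3), in either order
```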

Error!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)