Spark 2.0 Dataset vs DataFrame

Difference between df.select(“foo”) and df.select($”foo”) is signature. The former one takes at least one String, the later one zero or more Columns. There is no practical difference beyond that. myDataSet.map(foo.someVal) type checks, but as any Dataset operation uses RDD of objects, and compared to DataFrame operations, there is a significant overhead. Let’s take a look … Read more

spark createOrReplaceTempView vs createGlobalTempView

The Answer to your questions is basically understanding the difference of a Spark Application and a Spark Session. Spark application can be used: for a single batch job an interactive session with multiple jobs a long-lived server continually satisfying requests A Spark job can consist of more than just a single map and reduce. A … Read more

Perform a typed join in Scala with Spark Datasets

Observation Spark SQL can optimize join only if join condition is based on the equality operator. This means we can consider equijoins and non-equijoins separately. Equijoin Equijoin can be implemented in a type safe manner by mapping both Datasets to (key, value) tuples, performing join based on keys, and reshaping the result: import org.apache.spark.sql.Encoder import … Read more

Encoder for Row Type Spark Datasets

The answer is to use a RowEncoder and the schema of the dataset using StructType. Below is a working example of a flatmap operation with Datasets: StructType structType = new StructType(); structType = structType.add(“id1”, DataTypes.LongType, false); structType = structType.add(“id2”, DataTypes.LongType, false); ExpressionEncoder<Row> encoder = RowEncoder.apply(structType); Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() { @Override public Iterator<Row> … Read more

DataFrame / Dataset groupBy behaviour/optimization

Yes, it is “smart enough“. groupBy performed on a DataFrame is not the same operation as groupBy performed on a plain RDD. In a scenario you’ve described there is no need to move raw data at all. Let’s create a small example to illustrate that: val df = sc.parallelize(Seq( (“a”, “foo”, 1), (“a”, “foo”, 3), … Read more

Difference between SparkContext, JavaSparkContext, SQLContext, and SparkSession?

sparkContext is a Scala implementation entry point and JavaSparkContext is a java wrapper of sparkContext. SQLContext is entry point of SparkSQL which can be received from sparkContext.Prior to 2.x.x, RDD ,DataFrame and Data-set were three different data abstractions.Since Spark 2.x.x, All three data abstractions are unified and SparkSession is the unified entry point of Spark. … Read more

Encoder error while trying to map dataframe row to updated row

There is nothing unexpected here. You’re trying to use code which has been written with Spark 1.x and is no longer supported in Spark 2.0: in 1.x DataFrame.map is ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T] in 2.x Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T] To be honest it didn’t make much sense in 1.x either. Independent … Read more

Difference between DataFrame, Dataset, and RDD in Spark

First thing is DataFrame was evolved from SchemaRDD. Yes.. conversion between Dataframe and RDD is absolutely possible. Below are some sample code snippets. df.rdd is RDD[Row] Below are some of options to create dataframe. 1) yourrddOffrow.toDF converts to DataFrame. 2) Using createDataFrame of sql context val df = spark.createDataFrame(rddOfRow, schema) where schema can be from … Read more

Why is “Unable to find encoder for type stored in a Dataset” when creating a dataset of custom case class?

Spark Datasets require Encoders for data type which is about to be stored. For common types (atomics, product types) there is a number of predefined encoders available but you have to import these first from SparkSession.implicits to make it work: val sparkSession: SparkSession = ??? import sparkSession.implicits._ val dataset = sparkSession.createDataset(dataList) Alternatively you can provide … Read more

How to store custom objects in Dataset?

Update This answer is still valid and informative, although things are now better since 2.2/2.3, which adds built-in encoder support for Set, Seq, Map, Date, Timestamp, and BigDecimal. If you stick to making types with only case classes and the usual Scala types, you should be fine with just the implicit in SQLImplicits. Unfortunately, virtually … Read more