Integrating Spark Structured Streaming with the Confluent Schema Registry

It took me a couple of months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization, so you must deserialize the data manually. In Spark, create the Confluent REST service object to get the schema. Convert the schema string in the response object into an Avro schema … Read more
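The excerpt is cut off before it shows any code, so the following is only a minimal sketch of the first manual step, not the author's implementation. It assumes the Kafka values use Confluent's documented wire format (one magic byte equal to 0, a 4-byte big-endian schema ID, then the Avro payload). The names `ConfluentWire`, `ConfluentFrame`, and `splitConfluentFrame` are hypothetical and do not belong to any library.

```scala
import java.nio.ByteBuffer

object ConfluentWire {
  // Hypothetical holder for the two parts of a Confluent-framed message.
  final case class ConfluentFrame(schemaId: Int, payload: Array[Byte])

  // Assumption: bytes follow the Confluent wire format
  // [magic byte 0][4-byte big-endian schema ID][Avro binary payload].
  def splitConfluentFrame(bytes: Array[Byte]): ConfluentFrame = {
    require(bytes.length >= 5, "message too short to carry a Confluent header")
    val buffer = ByteBuffer.wrap(bytes) // ByteBuffer is big-endian by default
    val magic = buffer.get()
    require(magic == 0, s"unexpected magic byte: $magic")
    val schemaId = buffer.getInt()
    val payload = new Array[Byte](buffer.remaining())
    buffer.get(payload)
    ConfluentFrame(schemaId, payload)
  }
}
```

The schema ID is what you would pass to the registry client to look up the writer schema, and the remaining payload is what an Avro decoder would read. Both of those steps are omitted here because the excerpt does not show how the author did them.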

spark createOrReplaceTempView vs createGlobalTempView

The answer to your questions basically comes down to understanding the difference between a Spark application and a Spark session. A Spark application can be used for a single batch job, for an interactive session with multiple jobs, or as a long-lived server continually satisfying requests. A Spark job can consist of more than just a single map and reduce. A … Read more
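The application versus session distinction can be illustrated with a small sketch. It assumes Spark 2.1 or later running in local mode. The object name `TempViewScopes` and the view names are made up for the example, and `createOrReplaceGlobalTempView` is used instead of `createGlobalTempView` only so the code can be rerun without an "already exists" error.

```scala
import org.apache.spark.sql.SparkSession

object TempViewScopes {
  // Returns (is the session-scoped view visible from a second session?,
  //          row count read from the global view in that second session).
  def visibility(spark: SparkSession): (Boolean, Long) = {
    import spark.implicits._
    val df = Seq(("a", 1)).toDF("key", "value")

    // Scoped to the SparkSession that created it.
    df.createOrReplaceTempView("local_view")
    // Scoped to the Spark application; lives in the global_temp database.
    df.createOrReplaceGlobalTempView("shared_view")

    // A second session inside the same application.
    val other = spark.newSession()
    val localVisible = other.catalog.tableExists("local_view")
    val globalRows = other.sql("SELECT * FROM global_temp.shared_view").count()
    (localVisible, globalRows)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("temp-view-scopes")
      .getOrCreate()
    println(visibility(spark))
    spark.stop()
  }
}
```

The second session cannot see `local_view`, but it can query `shared_view` through the `global_temp` prefix, because both sessions belong to the same application.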

How to express a column which name contains spaces in Spark SQL?

Backticks seem to work just fine:

    scala> val df = sc.parallelize(Seq(("a", 1))).toDF("foo bar", "x")
    df: org.apache.spark.sql.DataFrame = [foo bar: string, x: int]
    scala> df.registerTempTable("df")
    scala> sqlContext.sql("""SELECT `foo bar` FROM df""").show
    foo bar
    a

The same works with the DataFrame API:

    scala> df.select($"foo bar").show
    foo bar
    a

So it looks like it is supported, although I doubt it is … Read more

Does spark.sql.autoBroadcastJoinThreshold work for joins using Dataset’s join operator?

First of all, spark.sql.autoBroadcastJoinThreshold and the broadcast hint are separate mechanisms. Even if autoBroadcastJoinThreshold is disabled, setting a broadcast hint will take precedence. With default settings:

    spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
    String = 10485760
    val df1 = spark.range(100)
    val df2 = spark.range(100)

Spark will use autoBroadcastJoinThreshold and automatically broadcast data:

    df1.join(df2, Seq("id")).explain
    == Physical Plan ==
    *Project [id#0L]
    +- *BroadcastHashJoin [id#0L], … Read more

Would Spark unpersist the RDD itself when it realizes it won’t be used anymore?

Yes, Apache Spark will unpersist the RDD when the RDD object is garbage collected. In RDD.persist you can see:

    sc.cleaner.foreach(_.registerRDDForCleanup(this))

This puts a WeakReference to the RDD in a ReferenceQueue, leading to ContextCleaner.doCleanupRDD when the RDD is garbage collected. And there:

    sc.unpersistRDD(rddId, blocking)

For more context see ContextCleaner in general and the commit that added … Read more

Avoid performance impact of a single partition mode in Spark window functions

In practice, the performance impact will be almost the same as if you had omitted the partitionBy clause altogether. All records will be shuffled to a single partition, sorted locally, and iterated sequentially one by one. The only difference is in the total number of partitions created. Let's illustrate that with an example using a simple dataset … Read more
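The author's own example is cut off, so the sketch below is a stand-in rather than a reconstruction. It assumes the question was about partitioning by a constant (here `lit(0)`) to get around the missing `partitionBy`, and it assumes Spark 2.x or later in local mode. The object name `SinglePartitionWindow` is hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

object SinglePartitionWindow {
  // Computes row numbers with no partitionBy and with a constant partition key.
  def rowNumbers(spark: SparkSession): (Seq[Int], Seq[Int]) = {
    val df = spark.range(5).toDF("id")

    // No partitionBy: Spark moves every row into a single partition.
    val noPartition = Window.orderBy("id")
    // Constant key (assumed workaround): every row hashes to the same partition.
    val constantPartition = Window.partitionBy(lit(0)).orderBy("id")

    def numbered(w: org.apache.spark.sql.expressions.WindowSpec): Seq[Int] =
      df.withColumn("rn", row_number().over(w))
        .orderBy("id")
        .collect()
        .map(_.getInt(1))
        .toSeq

    (numbered(noPartition), numbered(constantPartition))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("single-partition-window")
      .getOrCreate()
    println(rowNumbers(spark))
    spark.stop()
  }
}
```

Both window specifications yield the same row numbers, and in both cases a single task ends up processing all of the rows. Comparing the two physical plans with `explain` should show the differing exchange step that accounts for the partition count mentioned above, though the exact plan text depends on the Spark version and on whether adaptive query execution is enabled.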
