Spark code organization and best practices [closed]

I think you can subscribe to the Apache Spark and Databricks channels on YouTube to listen and learn more, especially from the experiences and lessons of others.

Channels: Apache Spark, Databricks, Spark Technology Center

Here are some recommended videos:

- SparkUI Visualization (slides: SparkUI Visualization)
- Spark in Production: Lessons from 100+ Production Users (slides: Spark in Production: Lessons from 100+ …)

Rename more than one column using withColumnRenamed

It is not possible to use a single withColumnRenamed call.

You can use the DataFrame.toDF method:

    data.toDF('x3', 'x4')

or

    new_names = ['x3', 'x4']
    data.toDF(*new_names)

It is also possible to rename with a simple select:

    from pyspark.sql.functions import col

    mapping = dict(zip(['x1', 'x2'], ['x3', 'x4']))
    data.select([col(c).alias(mapping.get(c, c)) for c in data.columns])

Similarly in Scala you can: Rename all …

Median / quantiles within PySpark groupBy

I guess you don't need it anymore, but I will leave it here for future generations (i.e. me next week when I forget).

    from pyspark.sql import Window
    import pyspark.sql.functions as F

    grp_window = Window.partitionBy('grp')
    magic_percentile = F.expr('percentile_approx(val, 0.5)')

    df.withColumn('med_val', magic_percentile.over(grp_window))

Or, to address exactly your question, this also works:

    df.groupBy('grp').agg(magic_percentile.alias('med_val'))

And as a bonus, you can …

How to loop through each row of dataFrame in pyspark

You simply cannot. DataFrames, like other distributed data structures, are not iterable and can be accessed only through dedicated higher-order functions and/or SQL methods.

You can of course collect:

    for row in df.rdd.collect():
        do_something(row)

or convert with toLocalIterator:

    for row in df.rdd.toLocalIterator():
        do_something(row)

and iterate locally as shown above, but it beats …

Spark – SELECT WHERE or filtering?

According to the Spark documentation, "where() is an alias for filter()":

    filter(condition)
    Filters rows using the given condition. where() is an alias for filter().
    Parameters: condition – a Column of types.BooleanType or a string of SQL expression.

    >>> df.filter(df.age > 3).collect()
    [Row(age=5, name=u'Bob')]
    >>> df.where(df.age == 2).collect()
    [Row(age=2, name=u'Alice')]
    >>> df.filter("age > 3").collect()
    [Row(age=5, name=u'Bob')]
    >>> …

What is the relationship between workers, worker instances, and executors?

To extend the other great answers, I would like to describe this with a few images. In Spark Standalone mode, there are a master node and worker nodes. Both the master and the workers can be represented in one place for standalone mode (each worker can have multiple executors if CPU and memory are available). If you are curious about how Spark …

What is the difference between spark checkpoint and persist to a disk

There are a few important differences, but the fundamental one is what happens with lineage. Persist / cache keeps lineage intact while checkpoint breaks lineage. Let's consider the following examples:

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)

cache / persist:

    val indCache = rdd.mapValues(_ > 4)
    indCache.persist(StorageLevel.DISK_ONLY)
    indCache.toDebugString
    // …

Which cluster type should I choose for Spark?

Spark Standalone Manager: A simple cluster manager included with Spark that makes it easy to set up a cluster. By default, each application uses all the available nodes in the cluster.

A few benefits of YARN over Standalone & Mesos: YARN allows you to dynamically share and centrally configure the same pool of cluster …

How does createOrReplaceTempView work in Spark?

createOrReplaceTempView creates (or replaces, if that view name already exists) a lazily evaluated "view" that you can then use like a Hive table in Spark SQL. It does not persist to memory unless you cache the dataset that underpins the view.

    scala> val s = Seq(1,2,3).toDF("num")
    s: org.apache.spark.sql.DataFrame = [num: int]

    scala> s.createOrReplaceTempView("nums")

    scala> spark.table("nums") …
