PySpark: modify column values when another column value satisfies a condition

You can use when and otherwise like this:

    from pyspark.sql.functions import col, when

    df\
        .withColumn('Id_New', when(df.Rank <= 5, df.Id).otherwise('other'))\
        .drop(df.Id)\
        .select(col('Id_New').alias('Id'), col('Rank'))\
        .show()

This gives the output:

    +-----+----+
    |   Id|Rank|
    +-----+----+
    |    a|   5|
    |other|   7|
    |other|   8|
    |    d|   1|
    +-----+----+

Calculating duration by subtracting two datetime columns in string format

As of Spark 1.5 you can use unix_timestamp:

    from pyspark.sql import functions as F

    timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
    timeDiff = (F.unix_timestamp('EndDateTime', format=timeFmt)
                - F.unix_timestamp('StartDateTime', format=timeFmt))
    df = df.withColumn("Duration", timeDiff)

Note the Java-style time format.

    >>> df.show()
    +---+--------------------+--------------------+--------+
    | ID|         EndDateTime|       StartDateTime|Duration|
    +---+--------------------+--------------------+--------+
    |X01|2014-02-13T12:36:...|2014-02-13T12:31:...|     258|
    |X02|2014-02-13T12:35:...|2014-02-13T12:32:...|     204|
    |X03|2014-02-13T12:36:...|2014-02-13T12:32:...|     228|
    |XO4|2014-02-13T12:37:...|2014-02-13T12:32:...|     269|
    |XO5|2014-02-13T12:36:...|2014-02-13T12:33:...|     202|
    +---+--------------------+--------------------+--------+
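To sanity-check a few rows outside Spark, the same subtraction can be sketched with Python's standard library. This is only an illustration and not part of the original answer: the helper name `duration_seconds` and the sample timestamps are hypothetical, and `%Y-%m-%dT%H:%M:%S.%f` is used as the Python counterpart of the Java-style pattern. The sketch assumes that unix_timestamp yields whole seconds, so it drops the fractional part of each timestamp before subtracting.

```python
from datetime import datetime

# Python counterpart of the Java-style pattern "yyyy-MM-dd'T'HH:mm:ss.SSS"
PY_TIME_FMT = "%Y-%m-%dT%H:%M:%S.%f"


def duration_seconds(start: str, end: str) -> int:
    """Whole-second difference between two timestamp strings (hypothetical helper)."""
    # Drop the fractional part first, on the assumption that each
    # timestamp is reduced to whole seconds before the subtraction.
    start_dt = datetime.strptime(start, PY_TIME_FMT).replace(microsecond=0)
    end_dt = datetime.strptime(end, PY_TIME_FMT).replace(microsecond=0)
    return int((end_dt - start_dt).total_seconds())


# Made-up sample values, not taken from the DataFrame shown earlier
print(duration_seconds("2014-02-13T12:00:00.900", "2014-02-13T12:03:24.100"))
```

Because the milliseconds are discarded before subtracting, the result for the sample values is 204 rather than the 203.2 a fractional difference would give.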

Running scheduled Spark job

You can use a crontab, but as you start having Spark jobs that depend on other Spark jobs, I would recommend Pinball for coordination: https://github.com/pinterest/pinball

To get a simple crontab working, I would create a wrapper script such as:

    #!/bin/bash
    cd /locm/spark_jobs
    export SPARK_HOME=/usr/hdp/2.2.0.0-2041/spark
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export HADOOP_USER_NAME=hdfs
    export HADOOP_GROUP=hdfs
    #export SPARK_CLASSPATH=$SPARK_CLASSPATH:/locm/spark_jobs/configs/*
    CLASS=$1
    MASTER=$2
    …

What do the blue blocks in spark stage DAG visualisation UI mean?

Each blue box is a step of the Apache Spark job. You are asking about WholeStageCodegen, which is described as follows:

Whole-Stage Code Generation (aka WholeStageCodegen or WholeStageCodegenExec) fuses multiple operators (as a subtree of plans that support codegen) together into a single Java function that is aimed at improving execution performance. It collapses a query into …

What is the difference between sort and orderBy functions in Spark

orderBy is just an alias for the sort function. From the Spark documentation:

    /**
     * Returns a new Dataset sorted by the given expressions.
     * This is an alias of the `sort` function.
     *
     * @group typedrel
     * @since 2.0.0
     */
    @scala.annotation.varargs
    def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols : _*)

What are the differences between saveAsTable and insertInto in different SaveMode(s)?

DISCLAIMER: I’ve been exploring insertInto for some time, and although I’m far from an expert in this area, I’m sharing the findings for the greater good.

Does insertInto always expect the table to exist? Yes (per the table name and the database). Moreover, not all tables can be inserted into, i.e. a (permanent) table, a temporary …

How do I replace a string value with a NULL in PySpark?

You can combine the when clause with a NULL literal and type casting as follows:

    from pyspark.sql.functions import when, lit, col

    df = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["x", "y"])

    def replace(column, value):
        return when(column != value, column).otherwise(lit(None))

    df.withColumn("y", replace(col("y"), "bar")).show()
    ## +---+----+
    ## |  x|   y|
    ## +---+----+
    ## |  1| foo|
    ## |  2|null|
    ## +---+----+

It …

How does Spark read a large file (petabyte) when the file cannot fit in Spark’s main memory?

First of all, Spark only starts reading in the data when an action (like count, collect or write) is called. Once an action is called, Spark loads the data in partitions; the number of concurrently loaded partitions depends on the number of cores you have available. So in Spark you can think of 1 …
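The idea of reading nothing until an action runs, and then holding only a piece of the file at a time, can be sketched in plain Python. This is a single-process analogy under stated assumptions, not how Spark is implemented: the names `read_chunks` and `count_lines` and the chunk size are hypothetical, and Spark would process several partitions in parallel across cores rather than one chunk at a time.

```python
from typing import Iterator


def read_chunks(path: str, chunk_size: int) -> Iterator[bytes]:
    """Yield the file piece by piece; nothing is read until iteration starts."""
    with open(path, "rb") as handle:
        while True:
            chunk = handle.read(chunk_size)
            if not chunk:
                break
            yield chunk


def count_lines(path: str, chunk_size: int = 1024) -> int:
    """Plays the role of an action: consumes the chunks, keeps only a running total."""
    return sum(chunk.count(b"\n") for chunk in read_chunks(path, chunk_size))
```

Calling `read_chunks` on its own only builds a generator, much like defining transformations; the file is opened and read once `count_lines` iterates over it, and at no point is more than one chunk held in memory.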
