Add an empty column to Spark DataFrame

All you need here is to import StringType and use lit together with cast:

    from pyspark.sql.types import StringType
    from pyspark.sql.functions import lit

    new_df = old_df.withColumn("new_column", lit(None).cast(StringType()))

A full example:

    from pyspark.sql import Row

    row = Row("foo", "bar")
    df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF()
    df.printSchema()
    # root
    #  |-- foo: long (nullable = true)
    #  |-- bar: string (nullable = true)

    new_df = df.withColumn("new_column", lit(None).cast(StringType()))
    new_df.printSchema()
    …

How to pivot Spark DataFrame?

As mentioned by David Anderson, Spark has provided a pivot function since version 1.6. The general syntax looks as follows:

    df
      .groupBy(grouping_columns)
      .pivot(pivot_column, [values])
      .agg(aggregate_expressions)

Usage examples using nycflights13 and the csv format:

Python:

    from pyspark.sql.functions import avg

    flights = (sqlContext
        .read
        .format("csv")
        .options(inferSchema="true", header="true")
        .load("flights.csv")
        .na.drop())

    flights.registerTempTable("flights")
    sqlContext.cacheTable("flights")

    gexprs = ("origin", "dest", "carrier")
    aggexpr = avg("arr_delay")

    flights.count()
    ## …

How does createOrReplaceTempView work in Spark?

createOrReplaceTempView creates (or replaces, if that view name already exists) a lazily evaluated "view" that you can then use like a Hive table in Spark SQL. It does not persist to memory unless you cache the dataset that underpins the view.

    scala> val s = Seq(1,2,3).toDF("num")
    s: org.apache.spark.sql.DataFrame = [num: int]

    scala> s.createOrReplaceTempView("nums")

    scala> spark.table("nums")
    …

Cannot find col function in pyspark

It exists. It just isn't explicitly defined. Functions exported from pyspark.sql.functions are thin wrappers around JVM code and, with a few exceptions that require special treatment, are generated automatically using helper methods. If you carefully check the source, you'll find col listed among the other entries of the _functions dictionary. This dictionary is then iterated over, and _create_function is used to …

Spark SQL: apply aggregate functions to a list of columns

There are multiple ways of applying aggregate functions to multiple columns.

The GroupedData class provides a number of methods for the most common functions, including count, max, min, mean and sum, which can be used directly as follows:

Python:

    df = sqlContext.createDataFrame(
        [(1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)],
        ("col1", "col2", …

Updating a dataframe column in spark

While you cannot modify a column as such, you can operate on a column and return a new DataFrame reflecting that change. To do that, first create a UserDefinedFunction implementing the operation to apply, and then apply that function selectively to the targeted column only.

In Python:

    from pyspark.sql.functions import UserDefinedFunction
    from pyspark.sql.types import StringType
    …

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

PySpark does include a dropDuplicates() method, which was introduced in 1.4: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

    >>> from pyspark.sql import Row
    >>> df = sc.parallelize([ \
    ...     Row(name="Alice", age=5, height=80), \
    ...     Row(name="Alice", age=5, height=80), \
    ...     Row(name="Alice", age=10, height=80)]).toDF()
    >>> df.dropDuplicates().show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    |  5|    80|Alice|
    | 10|    80|Alice|
    +---+------+-----+

    >>> df.dropDuplicates(['name', 'height']).show()
    +---+------+-----+
    |age|height| name|
    …
