How to export a table dataframe in PySpark to csv?

If the DataFrame fits in driver memory and you want to save it to the local file system, you can convert the Spark DataFrame to a local pandas DataFrame using the toPandas method and then simply use to_csv:

    df.toPandas().to_csv('mycsv.csv')

Otherwise you can use spark-csv:

Spark 1.3:

    df.save('mycsv.csv', 'com.databricks.spark.csv')

Spark 1.4+:

    df.write.format('com.databricks.spark.csv').save('mycsv.csv')

In Spark 2.0+ you can use csv data …

Convert pyspark string to date format

Update (1/10/2018): For Spark 2.2+ the best way to do this is probably using the to_date or to_timestamp functions, which both support the format argument. From the docs:

    >>> from pyspark.sql.functions import to_timestamp
    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
    >>> df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt')).collect()
    [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]

Original Answer (for Spark < 2.2)

It …

Best way to get the max value in a Spark dataframe column

    >>> df1.show()
    +-----+--------------------+--------+----------+-----------+
    |floor|           timestamp|     uid|         x|          y|
    +-----+--------------------+--------+----------+-----------+
    |    1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
    |    1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
    |    1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
    |    1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|

    >>> row1 = df1.agg({"x": "max"}).collect()[0]
    >>> print(row1)
    Row(max(x)=110.33613)
    >>> print(row1["max(x)"])
    110.33613

The answer is almost the same as method 3, but it seems the asDict() call in method 3 can be removed.

Load CSV file with PySpark

Spark 2.0.0+

You can use the built-in csv data source directly:

    spark.read.csv(
        "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
    )

or

    (spark.read
        .schema(schema)
        .option("header", "true")
        .option("mode", "DROPMALFORMED")
        .csv("some_input_file.csv"))

without including any external dependencies.

Spark < 2.0.0:

Instead of manual parsing, which is far from trivial in the general case, I would recommend spark-csv:

Make sure that Spark …

Sort in descending order in PySpark

In PySpark 1.3 the sort method doesn't take an ascending parameter. You can use the desc method instead:

    from pyspark.sql.functions import col

    (group_by_dataframe
        .count()
        .filter("`count` >= 10")
        .sort(col("count").desc()))

or the desc function:

    from pyspark.sql.functions import desc

    (group_by_dataframe
        .count()
        .filter("`count` >= 10")
        .sort(desc("count")))

Both methods can be used with Spark >= 1.3 (including Spark 2.x).

Spark Dataframe distinguish columns with duplicated name

Let's start with some data:

    from pyspark.mllib.linalg import SparseVector
    from pyspark.sql import Row

    df1 = sqlContext.createDataFrame([
        Row(a=107831, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
        Row(a=125231, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
    ])

    df2 = sqlContext.createDataFrame([
        Row(a=107831, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0, …

How to define partitioning of DataFrame?

Spark >= 2.3.0

SPARK-22614 exposes range partitioning.

    val partitionedByRange = df.repartitionByRange(42, $"k")

    partitionedByRange.explain
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k ASC NULLS FIRST], 42
    // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
    //
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42 …

How to convert rdd object to dataframe in spark

This code works with Spark 2.x and Scala 2.11.

Import the necessary classes:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

Create a SparkSession object; here it's named spark:

    val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
    val sc = spark.sparkContext // Just used to create test RDDs

Let's create an RDD to turn into a DataFrame:

    val rdd = sc.parallelize( …

How to change a dataframe column from String type to Double type in PySpark?

There is no need for a UDF here. Column already provides a cast method that accepts a DataType instance:

    from pyspark.sql.types import DoubleType

    changedTypedf = joindf.withColumn("label", joindf["show"].cast(DoubleType()))

or a short string:

    changedTypedf = joindf.withColumn("label", joindf["show"].cast("double"))

where the canonical string names (other variations can be supported as well) correspond to the simpleString value. So for atomic types:

    from pyspark.sql import types …
