Create Spark DataFrame. Cannot infer schema for type

SparkSession.createDataFrame, which is used under the hood, requires an RDD or list of Row/tuple/list/dict, or a pandas.DataFrame, unless a schema with DataType is provided. Try converting each float to a tuple like this: myFloatRdd.map(lambda x: (x, )).toDF() or, even better: from pyspark.sql import Row row = Row("val") # Or some other column name myFloatRdd.map(row).toDF() To create a DataFrame … Read more

Add an empty column to Spark DataFrame

All you need here is to import StringType and use lit and cast: from pyspark.sql.types import StringType from pyspark.sql.functions import lit new_df = old_df.withColumn('new_column', lit(None).cast(StringType())) A full example: df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF() df.printSchema() # root # |-- foo: long (nullable = true) # |-- bar: string (nullable = true) new_df = df.withColumn('new_column', lit(None).cast(StringType())) new_df.printSchema() … Read more

How to pivot Spark DataFrame?

As mentioned by David Anderson, Spark has provided a pivot function since version 1.6. The general syntax is as follows: df .groupBy(grouping_columns) .pivot(pivot_column, [values]) .agg(aggregate_expressions) Usage examples using nycflights13 and csv format: Python: from pyspark.sql.functions import avg flights = (sqlContext .read .format("csv") .options(inferSchema="true", header="true") .load("flights.csv") .na.drop()) flights.registerTempTable("flights") sqlContext.cacheTable("flights") gexprs = ("origin", "dest", "carrier") aggexpr = avg("arr_delay") flights.count() ## … Read more

How to link PyCharm with PySpark?

With the PySpark package (Spark 2.2.0 and later): with SPARK-1267 merged, you should be able to simplify the process by pip-installing Spark in the environment you use for PyCharm development. Go to File -> Settings -> Project Interpreter, click the install button and search for PySpark, then click the install package button. Manually with user … Read more
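The same step can be done from a terminal instead of the PyCharm UI, assuming `pip` belongs to the interpreter PyCharm is configured to use:

```shell
# Install PySpark into the interpreter PyCharm uses;
# this is the PyPI package name, not a local Spark distribution.
pip install pyspark
```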

Cannot find col function in pyspark

It exists. It just isn't explicitly defined. Functions exported from pyspark.sql.functions are thin wrappers around JVM code and, with a few exceptions that require special treatment, are generated automatically using helper methods. If you check the source carefully, you'll find col listed among the other _functions. This dictionary is further iterated, and _create_function is used to … Read more

Updating a dataframe column in spark

While you cannot modify a column in place, you can operate on a column and return a new DataFrame reflecting that change. For that you'd first create a UserDefinedFunction implementing the operation to apply, and then selectively apply that function to the targeted column only. In Python: from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import StringType … Read more

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

PySpark does include a dropDuplicates() method, which was introduced in 1.4. https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html >>> from pyspark.sql import Row >>> df = sc.parallelize([ ... Row(name="Alice", age=5, height=80), ... Row(name="Alice", age=5, height=80), ... Row(name="Alice", age=10, height=80)]).toDF() >>> df.dropDuplicates().show() +---+------+-----+ |age|height| name| +---+------+-----+ | 5| 80|Alice| | 10| 80|Alice| +---+------+-----+ >>> df.dropDuplicates(['name', 'height']).show() +---+------+-----+ |age|height| name| … Read more

How do I set the driver's python version in spark?

Setting both PYSPARK_PYTHON=python3 and PYSPARK_DRIVER_PYTHON=python3 works for me. I did this using export in my .bashrc. In the end, these are the variables I create: export SPARK_HOME="$HOME/Downloads/spark-1.4.0-bin-hadoop2.4" export IPYTHON=1 export PYSPARK_PYTHON=/usr/bin/python3 export PYSPARK_DRIVER_PYTHON=ipython3 export PYSPARK_DRIVER_PYTHON_OPTS="notebook" I also followed this tutorial to make it work from within an IPython3 notebook: http://ramhiser.com/2015/02/01/configuring-ipython-notebook-support-for-pyspark/

Error!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)