Personally I would go with Python UDF and wouldn’t bother with anything else:
- `Vectors` are not native SQL types, so there will be performance overhead one way or another. In particular, this process requires two steps: data is first converted from the external type to a row, and then from the row to the internal representation using the generic `RowEncoder`.
- Any downstream ML `Pipeline` will be much more expensive than a simple conversion. Moreover, it requires a process which is the opposite of the one described above.
But if you really want other options here you are:
- Scala UDF with Python wrapper:

  Install sbt following the instructions on the project site.

  Create a Scala package with the following structure:

      .
      ├── build.sbt
      └── udfs.scala

  Edit `build.sbt` (adjust to reflect your Scala and Spark versions):

      scalaVersion := "2.11.8"

      libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-sql" % "2.4.4",
        "org.apache.spark" %% "spark-mllib" % "2.4.4"
      )

  Edit `udfs.scala`:

      package com.example.spark.udfs

      import org.apache.spark.sql.functions.udf
      import org.apache.spark.ml.linalg.DenseVector

      object udfs {
        // Wrap a Seq[Double] in a DenseVector
        val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
      }

  Package:

      sbt package

  and include (or the equivalent, depending on the Scala version):

      $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar

  as an argument for `--driver-class-path` when starting the shell / submitting the application.

  In PySpark define a wrapper:

      from pyspark.sql.column import _to_java_column, _to_seq, Column
      from pyspark import SparkContext

      def as_vector(col):
          sc = SparkContext.getOrCreate()
          # Look up the Scala UDF through the JVM gateway and apply it to the column
          f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
          return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

  Test:

      with_vec = df.withColumn("vector", as_vector("temperatures"))
      with_vec.show()

      +--------+------------------+----------------+
      |    city|      temperatures|          vector|
      +--------+------------------+----------------+
      | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
      |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
      +--------+------------------+----------------+

      with_vec.printSchema()

      root
       |-- city: string (nullable = true)
       |-- temperatures: array (nullable = true)
       |    |-- element: double (containsNull = true)
       |-- vector: vector (nullable = true)

- Dump data to a JSON format reflecting the `DenseVector` schema and read it back:

      from pyspark.sql.functions import to_json, from_json, col, struct, lit
      from pyspark.sql.types import StructType, StructField
      from pyspark.ml.linalg import VectorUDT

      json_vec = to_json(struct(struct(
          lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
          col("temperatures").alias("values")
      ).alias("v")))

      schema = StructType([StructField("v", VectorUDT())])

      with_parsed_vector = df.withColumn(
          "parsed_vector", from_json(json_vec, schema).getItem("v")
      )

      with_parsed_vector.show()

      +--------+------------------+----------------+
      |    city|      temperatures|   parsed_vector|
      +--------+------------------+----------------+
      | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
      |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
      +--------+------------------+----------------+

      with_parsed_vector.printSchema()

      root
       |-- city: string (nullable = true)
       |-- temperatures: array (nullable = true)
       |    |-- element: double (containsNull = true)
       |-- parsed_vector: vector (nullable = true)
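To make the intermediate representation concrete, the plain-Python sketch below builds the JSON string that the `to_json` expression above should produce for a single row, using only the standard `json` module. The helper name `dense_vector_json` is hypothetical, and the compact separators are an assumption about how Spark formats its output; the point is only the shape, a wrapper field `v` holding `type` and `values`.

```python
import json


def dense_vector_json(values):
    """Build the JSON document for one row: {"v": {"type": 1, "values": [...]}}."""
    # type 1 marks a dense vector; a sparse vector (type 0) would need extra fields
    payload = {"v": {"type": 1, "values": [float(x) for x in values]}}
    return json.dumps(payload, separators=(",", ":"))


print(dense_vector_json([-1.0, -2.0, -3.0]))
# → {"v":{"type":1,"values":[-1.0,-2.0,-3.0]}}
```

Parsing a string of this shape with the `VectorUDT` schema is what turns the array back into a vector column in the example above.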