Pulling data from Neo4j using PySpark

I am not sure I fully understand the problem, but as I read it:

  • You have no problems connecting to Neo4j or handling any of the Neo4j-specific elements.
  • You are not able to manage multiple partitions properly: there is no persistence and no means of ensuring that communication across partitions is correct.

If my understanding is correct:

  • I suggest looking at pandas UDFs in Spark (pyspark.sql.functions.pandas_udf).
    • These have nothing to do with Neo4j, but I have found them to be a great way to handle ad hoc ETL work where a particular summarization or calculation can be done on a single partition.
    • The API is surprisingly easy to use, except that having to define the output schema of the UDF up front can be a pain.
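As a rough illustration of the "summarization on a single partition" idea above, here is a minimal sketch that uses DataFrame.mapInPandas, a pandas function API closely related to pandas UDFs (it is not the decorator itself). The column name `value` and the function names `summarize_partition` and `summarize_with_spark` are hypothetical and chosen only for this example; the input is assumed to be a Spark DataFrame with a numeric `value` column.

```python
from typing import Iterator

import pandas as pd

# The output schema has to be declared up front (hypothetical column names).
OUTPUT_SCHEMA = "n_rows long, total double"


def summarize_partition(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Reduce all batches of one partition to a single summary row."""
    n_rows = 0
    total = 0.0
    for pdf in batches:  # each batch arrives as an ordinary pandas DataFrame
        n_rows += len(pdf)
        total += float(pdf["value"].sum())
    yield pd.DataFrame({"n_rows": [n_rows], "total": [total]})


def summarize_with_spark(df):
    """Apply the summary per partition.

    Assumes `df` is a Spark DataFrame with a numeric `value` column.
    The result has one row per partition, matching OUTPUT_SCHEMA.
    """
    return df.mapInPandas(summarize_partition, schema=OUTPUT_SCHEMA)
```

Because `summarize_partition` only touches pandas objects, it can be tried on a plain Python iterator of pandas DataFrames before it is handed to Spark. The per-partition rows returned by `summarize_with_spark` would still need a final aggregation if a single global summary is wanted.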

From the documentation:

import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()
# root
# |-- long_col: long (nullable = true)
# |-- string_col: string (nullable = true)
# |-- struct_col: struct (nullable = true)
# |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# root
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# |    |-- col1: string (nullable = true)
# |    |-- col2: long (nullable = true)

To be clear, you do not have to create those pd.Series and pd.DataFrame objects yourself. When you decorate your function with @pandas_udf and call it on Spark columns, Spark converts the column data into pandas objects (in batches, via Apache Arrow) and runs your function on them within each partition.
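One practical consequence is that the body of such a function can be tried out on ordinary pandas objects before Spark is involved at all. The sketch below assumes the same logic as the documentation example; the names `add_length` and `as_spark_udf` are hypothetical, and the pyspark import is kept inside the wrapper so the pandas part runs on its own.

```python
import pandas as pd


def add_length(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    """Same logic as the documentation example, written as a plain function."""
    out = s3.copy()  # avoid mutating the caller's DataFrame
    out["col2"] = s1 + s2.str.len()
    return out


# Local check with hand-built pandas objects, no Spark required.
result = add_length(
    pd.Series([1]),
    pd.Series(["a string"]),
    pd.DataFrame({"col1": ["a nested string"]}),
)
print(result)  # col2 holds 1 + len("a string")


def as_spark_udf():
    """Wrap the plain function as a pandas UDF (requires pyspark)."""
    from pyspark.sql.functions import pandas_udf

    return pandas_udf(add_length, returnType="col1 string, col2 long")
```

With a Spark session available, `as_spark_udf()` should be usable in the same way as `func` in the example above, for instance inside `df.select(...)`.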

I do not know the technical details beyond this, other than to say it has worked for any weird UDF I have ever needed to try (if the schema can be known a priori!).
