I am not sure I fully understand the problem. But as I read it:
- You have no problems connecting to Neo4J or handling any of the Neo4J-specific elements.
- You are not able to properly manage multiple partitions; there is no persistence or mechanism to ensure that work across partitions is coordinated correctly.
If my understanding is correct:
- I suggest looking at (pandas) Spark UDFs.
- They have nothing to do with Neo4J, but I have found them to be a great way to handle ad hoc ETL work where the summarization or calculation can be done within a single partition (see also the grouped sketch after the documentation example below).
- It is a surprisingly easy-to-use API, except that having to define the UDF's output schema a priori can be a pain.
From the documentation:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()
# root
# |-- long_col: long (nullable = true)
# |-- string_col: string (nullable = true)
# |-- struct_col: struct (nullable = true)
# |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# |    |-- col1: string (nullable = true)
# |    |-- col2: long (nullable = true)
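
For the per-partition / per-group summarization case I mentioned above, the grouped-map flavour may be an even closer fit: groupBy(...).applyInPandas(...) hands your function one complete pandas DataFrame per group. A minimal sketch; the column names and the spark session are assumptions of mine, just as in the documentation snippet:

import pandas as pd

def summarize(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is an ordinary pandas DataFrame holding every row for one key
    return pd.DataFrame({"key": [pdf["key"].iloc[0]],
                         "mean_value": [pdf["value"].mean()]})

df2 = spark.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("b", 5.0)],
    "key string, value double")

# The output schema again has to be declared up front.
df2.groupBy("key").applyInPandas(summarize, schema="key string, mean_value double").show()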
To be clear, those pd.Series and pd.DataFrame arguments are not pandas objects that you have to create yourself. By decorating your function with @pandas_udf, you pass in Spark columns, and Spark hands them to your function as pandas objects, partition by partition.
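
As a toy illustration of that point (the column name and the spark session here are assumptions of mine), the function body below is pure pandas, yet it is invoked with whatever Spark column you pass in:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def to_celsius(temp_f: pd.Series) -> pd.Series:
    # temp_f arrives as a pandas Series covering one batch of the Spark column
    return (temp_f - 32) * 5.0 / 9.0

spark.createDataFrame([(32.0,), (212.0,)], "temp_f double") \
     .select(to_celsius("temp_f").alias("temp_c")).show()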
I do not know the technical details beyond this, other than to say it has worked for any weird UDF I have ever needed to try (if the schema can be known a priori!).
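
One small thing that eases the schema pain: pandas_udf also accepts a StructType in place of the DDL string, which can be easier to build programmatically. A sketch reusing the names from the documentation example above:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Equivalent to the "col1 string, col2 long" DDL string used earlier.
schema = StructType([
    StructField("col1", StringType()),
    StructField("col2", LongType()),
])

@pandas_udf(schema)
def func2(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3["col2"] = s1 + s2.str.len()
    return s3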