Spark 2.0
- This issue has been resolved in Spark 2.0 with SPARK-14241.
- Another similar issue has been resolved in Spark 2.1 with SPARK-14393.
Spark 1.x
The problem you are experiencing is rather subtle, but it can be reduced to a simple fact: monotonically_increasing_id is an extremely ugly function. It is clearly not pure, and its value depends on something that is completely out of your control.
It doesn’t take any parameters, so from the optimizer’s perspective it doesn’t matter when it is called, and the call can be pushed after all other operations. Hence the behavior you see.
If you take a look at the code, you’ll find that this is explicitly marked by extending the MonotonicallyIncreasingID expression with Nondeterministic.
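To illustrate why the value is outside your control: the Spark documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below is plain Python, not Spark code, and the helper name expected_id is hypothetical. It only reproduces that layout to show that the ID a row receives depends on which partition it sits in and where.

```python
def expected_id(partition_id, record_number):
    # Documented layout: partition ID in the upper 31 bits,
    # record number within the partition in the lower 33 bits.
    return (partition_id << 33) + record_number


# Two partitions with three records each
ids = [expected_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

If a filter runs first and changes which rows occupy which positions, the same row ends up with a different ID.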
I don’t think there is any elegant solution, but one way you can handle this is to add an artificial dependency on the filtered value. For example, with a UDF like this:
from pyspark.sql.types import LongType
from pyspark.sql.functions import monotonically_increasing_id, udf

# Identity on v; the unused first argument ties the result to column P
bound = udf(lambda _, v: v, LongType())

(df
    .withColumn("rn", monotonically_increasing_id())
    # Due to nondeterministic behavior it has to be a separate step
    .withColumn("rn", bound("P", "rn"))
    .where("P"))
In general, it could be cleaner to add indices using zipWithIndex on an RDD and then convert it back to a DataFrame.
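A minimal sketch of that approach, assuming df is the DataFrame from the question with a boolean column P. The helper names append_index and with_row_index, and the default column name rn, are made up for illustration.

```python
def append_index(pair):
    """Flatten a (row, index) pair produced by zipWithIndex into one tuple."""
    row, idx = pair
    return tuple(row) + (idx,)


def with_row_index(df, name="rn"):
    """Return df with an extra 0-based consecutive index column `name`."""
    # Imported lazily so append_index stays usable without Spark installed.
    from pyspark.sql.types import LongType, StructField, StructType

    # Original columns plus a non-nullable long column for the index
    schema = StructType(df.schema.fields + [StructField(name, LongType(), False)])
    return df.rdd.zipWithIndex().map(append_index).toDF(schema)
```

Usage would look like with_row_index(df).where("P"). Because the indices are computed on the RDD and come back as ordinary data, a later filter should not be able to move ahead of the numbering. Note that zipWithIndex itself triggers a Spark job when the RDD has more than one partition.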
* The workaround shown above is no longer a valid solution (nor is it required) in Spark 2.x, where Python UDFs are subject to execution plan optimizations.