EDIT: as noleto mentions in his answer below, approx_count_distinct has been available since PySpark 2.1 and works over a window. Note that it returns an estimate rather than an exact count.
Original answer – exact distinct count (not an approximation)
We can use a combination of size and collect_set to mimic the functionality of countDistinct over a window:
from pyspark.sql import functions as F, Window
# Function to calculate number of seconds from number of days
days = lambda i: i * 86400
# Create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                            (13, "2017-03-15T12:27:18+00:00", "red"),
                            (25, "2017-03-18T11:27:18+00:00", "red")],
                           ["dollars", "timestampGMT", "color"])
# Convert string timestamp to timestamp type
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
# Create window by casting timestamp to long (number of seconds)
w = Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)
# Use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))
df.show()
This results in the distinct count of color over the previous week of records:
+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
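To see why the counts come out as 1, 2, 1, here is a plain-Python cross-check of the same frame logic with no Spark involved. It is only an illustrative sketch: the function name distinct_colors_last_week is hypothetical, and the timestamps are written as naive datetimes assumed to be in UTC.

```python
from datetime import datetime, timedelta

# Same three records as the Spark test data (timestamps assumed to be UTC)
records = [
    (17, datetime(2017, 3, 10, 15, 27, 18), "orange"),
    (13, datetime(2017, 3, 15, 12, 27, 18), "red"),
    (25, datetime(2017, 3, 18, 11, 27, 18), "red"),
]

def distinct_colors_last_week(rows, days=7):
    """For each row, count distinct colors among rows with timestamp in [ts - days, ts]."""
    span = timedelta(days=days)
    counts = []
    for _, ts, _ in rows:
        # Mirrors rangeBetween(-days(7), 0): both frame boundaries are inclusive
        in_frame = {color for _, other_ts, color in rows
                    if ts - span <= other_ts <= ts}
        counts.append(len(in_frame))
    return counts

counts = distinct_colors_last_week(records)
print(counts)  # [1, 2, 1]
```

The second row sees both the orange record from five days earlier and its own red record, while the third row's frame starts on 2017-03-11 and therefore contains only the two red records.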