Explain the aggregate functionality in Spark (with Python and Scala)

I wasn’t fully convinced from the accepted answer, and JohnKnight’s answer helped, so here’s my point of view:

First, let’s explain aggregate() in my own words:

Prototype:

aggregate(zeroValue, seqOp, combOp)

Description:

aggregate() lets you take an RDD and generate a single value that is of a different type than what was stored in the original RDD.

Parameters:

  1. zeroValue: The initialization value, for your result, in the desired
    format.
  2. seqOp: The operation you want to apply to RDD records. Runs once for
    every record in a partition.
  3. combOp: Defines how the resulted objects (one for every partition),
    gets combined.

Example:

Compute the sum of a list and the length of that list. Return the result in a pair of (sum, length).

In a Spark shell, I first created a list with 4 elements, with 2 partitions:

listRDD = sc.parallelize([1,2,3,4], 2)

then I defined my seqOp:

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

and my combOp:

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

and then I aggregated:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

As you can see, I gave descriptive names to my variables, but let me explain it further:

The first partition has the sublist [1, 2]. We will apply the seqOp to each element of that list and this will produce a local result, a pair of (sum, length), that will reflect the result locally, only in that first partition.

So, let’s start: local_result gets initialized to the zeroValue parameter we provided the aggregate() with, i.e. (0, 0) and list_element is the first element of the list, i.e. 1. As a result this is what happens:

0 + 1 = 1
0 + 1 = 1

Now, the local result is (1, 1), that means, that so far, for the 1st partition, after processing only the first element, the sum is 1 and the length 1. Notice, that local_result gets updated from (0, 0), to (1, 1).

1 + 2 = 3
1 + 1 = 2

and now the local result is (3, 2), which will be the final result from the 1st partition, since they are no other elements in the sublist of the 1st partition.

Doing the same for 2nd partition, we get (7, 2).

Now we apply the combOp to each local result, so that we can form, the final, global result, like this: (3,2) + (7,2) = (10, 4)


Example described in ‘figure’:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

Inspired by this great example.


So now if the zeroValue is not (0, 0), but (1, 0), one would expect to get (8 + 4, 2 + 2) = (12, 4), which doesn’t explain what you experience. Even if we alter the number of partitions of my example, I won’t be able to get that again.

The key here is JohnKnight’s answer, which state that the zeroValue is not only analogous to the number of partitions, but may be applied more times than you expect.

Leave a Comment

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)