Well, let’s make your dataset marginally more interesting:
val rdd = sc.parallelize(for {
  x <- 1 to 3
  y <- 1 to 2
} yield (x, None), 8)
We have six elements:
rdd.count
Long = 6
no partitioner:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
and eight partitions:
rdd.partitions.length
Int = 8
Now let’s define a small helper to count the number of elements per partition:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
  rdd.mapPartitions(iter => Iterator(iter.length))
}
Since we don’t have a partitioner, our dataset is distributed uniformly between partitions (Default Partitioning Scheme in Spark):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
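
The counts above follow from how a local collection is cut into contiguous index ranges. Here is a minimal plain-Scala sketch of that slicing, assuming partition i receives the elements with indices from i * n / p (inclusive) to (i + 1) * n / p (exclusive), using integer division. SliceSketch and sliceSizes are hypothetical names used only for illustration; they are not part of the Spark API.

```scala
object SliceSketch {
  // Sizes of the contiguous slices obtained when `length` elements are split
  // into `numSlices` index ranges [i * length / numSlices, (i + 1) * length / numSlices).
  // This is an assumed model of the slicing, not a call into Spark.
  def sliceSizes(length: Int, numSlices: Int): Vector[Int] =
    (0 until numSlices).map { i =>
      val start = ((i.toLong * length) / numSlices).toInt
      val end = (((i + 1).toLong * length) / numSlices).toInt
      end - start
    }.toVector

  def main(args: Array[String]): Unit =
    println(sliceSizes(6, 8)) // Vector(0, 1, 1, 1, 0, 1, 1, 1)
}
```

With six elements and eight slices, two ranges are necessarily empty, which is why two partitions report a count of 0.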

Now let’s repartition our dataset:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Since the parameter passed to HashPartitioner defines the number of partitions, we expect one partition:
rddOneP.partitions.length
Int = 1
Since we have only one partition, it contains all elements:
countByPartition(rddOneP).collect
Array[Int] = Array(6)

Note that the order of values after the shuffle is non-deterministic.
In the same way, if we use HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
we’ll get 2 partitions:
rddTwoP.partitions.length
Int = 2
Since rdd is partitioned by key, data won’t be distributed uniformly anymore:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Because we have three keys and only two different values of hashCode mod numPartitions, there is nothing unexpected here:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Just to confirm the above:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

Finally with HashPartitioner(7) we get seven partitions, three non-empty with 2 elements each:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddSevenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
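
The three results above can be reproduced without a cluster. This is a minimal sketch, assuming each record goes to the partition given by the key's hashCode modulo the number of partitions, with a non-negative remainder (computed here with java.lang.Math.floorMod rather than Spark's own helper). HashPartitionSketch and countsPerPartition are hypothetical names used only for illustration.

```scala
object HashPartitionSketch {
  // Number of records landing in each partition when every record is routed
  // by the non-negative remainder of its key's hashCode.
  def countsPerPartition[K](keys: Seq[K], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach { k =>
      counts(Math.floorMod(k.hashCode, numPartitions)) += 1
    }
    counts.toVector
  }

  // Same keys as the RDD above: each of 1, 2 and 3 appears twice.
  val keys: Seq[Int] = for { x <- 1 to 3; y <- 1 to 2 } yield x

  def main(args: Array[String]): Unit = {
    println(countsPerPartition(keys, 1)) // Vector(6)
    println(countsPerPartition(keys, 2)) // Vector(2, 4)
    println(countsPerPartition(keys, 7)) // Vector(0, 2, 2, 2, 0, 0, 0)
  }
}
```

Keys 1 and 3 collide when there are two partitions, and four of the seven partitions stay empty, so the number of partitions alone says little about how evenly the data is spread.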

Summary and Notes
-
HashPartitioner takes a single argument which defines the number of partitions.
-
Values are assigned to partitions using the hash of keys. The hash function may differ depending on the language (Scala RDD may use hashCode, Datasets use MurmurHash 3, PySpark uses portable_hash). In a simple case like this, where the key is a small integer, you can assume that hash is an identity (i = hash(i)). The Scala API uses nonNegativeMod to determine the partition based on the computed hash.
-
If the distribution of keys is not uniform, you can end up in situations where part of your cluster is idle.
-
Keys have to be hashable. You can check my answer for A list as a key for PySpark’s reduceByKey to read about PySpark-specific issues. Another possible problem is highlighted by the HashPartitioner documentation:
Java arrays have hashCodes that are based on the arrays’ identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.
-
In Python 3 you have to make sure that hashing is consistent. See What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?
-
Hash partitioner is neither injective nor surjective. Multiple keys can be assigned to a single partition and some partitions can remain empty.
-
Please note that currently hash-based methods don’t work in Scala when combined with REPL-defined case classes (Case class equality in Apache Spark).
-
HashPartitioner (or any other Partitioner) shuffles the data. Unless the partitioning is reused between multiple operations, it doesn’t reduce the amount of data to be shuffled.
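
Two of the notes above can be checked in plain Scala, without Spark: why a non-negative modulus is needed, and why arrays make poor keys. This is a minimal sketch; PartitionerNotesSketch is a hypothetical name, and nonNegativeMod is reimplemented locally on the assumption that it shifts a negative remainder into the range [0, mod). Converting an array to a Seq is shown as one possible workaround, not as the only option.

```scala
object PartitionerNotesSketch {
  // Remainder shifted into [0, mod), so it can be used as a partition index.
  // Local reimplementation for illustration, not a call into Spark.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  def main(args: Array[String]): Unit = {
    // The % operator keeps the sign of the dividend, so a negative hash
    // would give a negative, unusable partition index.
    println(-3 % 7)                // -3
    println(nonNegativeMod(-3, 7)) // 4

    // Arrays compare by identity, while Seq compares and hashes by contents.
    val a = Array(1, 2)
    val b = Array(1, 2)
    println(a.sameElements(b))                    // true
    println(a.equals(b))                          // false
    println(a.toSeq.hashCode == b.toSeq.hashCode) // true
  }
}
```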