Avoiding memory leaks with Scalaz 7 zipWithIndex/group enumeratees

This will come as little consolation for anyone who’s stuck with the older iteratee API, but I recently verified that an equivalent test passes against the scalaz-stream API. This is a newer stream processing API that is intended to replace iteratee.

For completeness, here’s the test code:

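import scalaz.concurrent.Task
import scalaz.stream._

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

// pair each array with its index, group the pairs into chunks of 4,
// then fold over the chunks to sum the array lengths
(streamArrs(1 << 25, 1 << 14).zipWithIndex
  pipe process1.chunk(4)
  pipe process1.fold(0L) { (c, vs) =>
    c + vs.map(_._1.length.toLong).sum
  }).runLast.run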

This should work with any value for the n parameter (provided you’re willing to wait long enough). I tested with 2^14 arrays of 2^25 Ints each (i.e., 128 MiB per array at 4 bytes per Int, for a total of about 2 TiB of memory allocated over time).
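
As a rough cross-check of the data volume, the sketch below computes the payload size from the arguments passed to streamArrs. It assumes 4 bytes per JVM Int and ignores array object headers. The value names are illustrative and not part of the original test.

```scala
// Back-of-the-envelope payload size for streamArrs(1 << 25, 1 << 14).
// Assumption: 4 bytes per Int; array object headers are ignored.
val intsPerArray: Long = 1L << 25 // the `sz` argument
val arrayCount: Long = 1L << 14   // the `n` argument
val bytesPerInt: Long = 4L

val bytesPerArray: Long = intsPerArray * bytesPerInt
val totalBytes: Long = bytesPerArray * arrayCount

val mib: Long = 1L << 20
val tib: Long = 1L << 40

println(s"per array: ${bytesPerArray / mib} MiB") // per array: 128 MiB
println(s"total: ${totalBytes / tib} TiB")        // total: 2 TiB
```

Either figure is far larger than any reasonable heap, so the test can only pass if the pipeline does not retain the arrays it has already processed.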
