Is there an elegant way to process a stream in chunks?

Elegance is in the eye of the beholder. If you don’t mind using a stateful function in groupingBy (which is only safe on a sequential stream), you can do this:

AtomicInteger counter = new AtomicInteger();

stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize))
    .values()
    .forEach(database::flushChunk);

This doesn’t win any performance or memory usage points over your original solution because it will still materialize the entire stream before doing anything.
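For what it's worth, here is a self-contained sketch of the same idea. The class name GroupingChunks and the method chunk are made up for illustration, and the TreeMap is my own addition: a plain groupingBy returns a HashMap, which makes no promise about the order in which values() hands back the chunks. It assumes a sequential stream, since the counter trick depends on elements arriving in encounter order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, not part of any library.
final class GroupingChunks {
    private GroupingChunks() {}

    // Groups elements into lists of at most chunkSize, keyed by chunk index.
    // The whole stream is collected into memory before anything is returned.
    static <T> List<List<T>> chunk(Stream<T> stream, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        AtomicInteger counter = new AtomicInteger();
        // TreeMap keeps the chunk indices sorted, so values() is in chunk order.
        Map<Integer, List<T>> groups = stream.sequential()
            .collect(Collectors.groupingBy(
                x -> counter.getAndIncrement() / chunkSize,
                TreeMap::new,
                Collectors.toList()));
        return new ArrayList<>(groups.values());
    }
}
```

You would then call something like GroupingChunks.chunk(stream, 1000).forEach(database::flushChunk), with the same memory caveat as above.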

If you want to avoid materializing the list, the Stream API will not help you. You will have to get the stream’s iterator or spliterator and do something like this:

Spliterator<Integer> split = stream.spliterator();
int chunkSize = 1000;

while (true) {
    // Pull up to chunkSize elements; tryAdvance returns false once the stream is exhausted.
    List<Integer> chunk = new ArrayList<>(chunkSize);
    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
    if (chunk.isEmpty()) break;
    database.flushChunk(chunk);
}
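If you need this in more than one place, the loop can be wrapped in a small generic helper. This is only a sketch: ChunkedStreams, forEachChunk and the handler parameter are names I made up, and it assumes a sequential stream that you are happy to consume in the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical helper class, not part of any library.
final class ChunkedStreams {
    private ChunkedStreams() {}

    // Pulls at most chunkSize elements at a time from the stream and passes
    // each non-empty chunk to the handler before reading the next one.
    static <T> void forEachChunk(Stream<T> stream, int chunkSize,
                                 Consumer<? super List<T>> handler) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        Spliterator<T> split = stream.spliterator();
        while (true) {
            List<T> chunk = new ArrayList<>(chunkSize);
            // tryAdvance returns false once the source has no more elements.
            while (chunk.size() < chunkSize && split.tryAdvance(chunk::add)) {
                // the element was already added by chunk::add
            }
            if (chunk.isEmpty()) {
                break;
            }
            handler.accept(chunk);
        }
    }
}
```

The call site then shrinks to ChunkedStreams.forEachChunk(stream, 1000, database::flushChunk), and each chunk is handed over before the next one is read.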
