Stream API and Queues: Subscribe to BlockingQueue stream-style

I’m guessing a bit at what you’re expecting, but I think I have a good hunch.

The stream of a queue, like iterating over a queue, represents the current contents of the queue. When the iterator or the stream reaches the tail of the queue, it doesn’t block awaiting further elements to be added. The iterator or the stream is exhausted at that point and the computation terminates.
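To make the snapshot behavior concrete, here is a small sketch. The class name QueueSnapshotDemo, the helper currentContents, and the sample strings are made up for illustration, and it assumes a LinkedBlockingQueue<String>; any other BlockingQueue implementation should behave the same way for this purpose.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

class QueueSnapshotDemo {
    // Collects whatever is in the queue right now.
    // Returns as soon as the tail is reached; it never blocks waiting for more.
    static List<String> currentContents(BlockingQueue<String> queue) {
        return queue.stream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("ax");
        queue.add("b");

        List<String> seen = currentContents(queue);
        queue.add("cx"); // added after the stream finished, so it is not in 'seen'

        System.out.println(seen);         // [ax, b]
        System.out.println(queue.size()); // 3, because streaming does not remove elements
    }
}
```

Note that queue.stream() only reads the elements; unlike take(), it leaves them in the queue.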

If you want a stream that consists of all current and future elements of the queue, you can do something like this:

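// Assumes queue is a BlockingQueue<String>.
Stream.generate(() -> {
        try {
            // Blocks until an element becomes available.
            return queue.take();
        } catch (InterruptedException ie) {
            // Placeholder element; the filter below drops it.
            return "Interrupted!";
        }
    })
    .filter(s -> s.endsWith("x"))
    .forEach(System.out::println);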

(Unfortunately the need to handle InterruptedException makes this quite messy.)

Note that there is no way to close a queue, and there is no way for Stream.generate to stop generating elements, so this is effectively an infinite stream. Short of the supplier throwing an exception, the only way to terminate it is with a short-circuiting stream operation such as findFirst or limit (or takeWhile on Java 9 and later).
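One common workaround, sketched here under some assumptions, is to agree on a sentinel element (a "poison pill") that producers enqueue when they are done, and to cut the stream off with a short-circuiting operation when it shows up. The names QueueStreamDemo, END, takeUnchecked and streamOf are hypothetical. The sketch uses takeWhile, which needs Java 9 or later, and it relies on the stream being sequential: Stream.generate yields an unordered stream, and the takeWhile documentation is loose about unordered streams, so treat this as an illustration rather than a guaranteed recipe.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class QueueStreamDemo {
    // Hypothetical sentinel; producers enqueue it to signal "no more elements".
    static final String END = "__END__";

    // Wraps the blocking take() so it can be used as a Supplier.
    static String takeUnchecked(BlockingQueue<String> queue) {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return END;                         // treat interruption as end of stream
        }
    }

    // Current and future elements, up to (not including) the sentinel.
    static Stream<String> streamOf(BlockingQueue<String> queue) {
        return Stream.generate(() -> takeUnchecked(queue))
                     .takeWhile(s -> !END.equals(s));
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (String s : List.of("ax", "b", "cx")) {
                queue.add(s);
            }
            queue.add(END); // tell the consumer to stop
        });
        producer.start();

        List<String> result = streamOf(queue)
                .filter(s -> s.endsWith("x"))
                .collect(Collectors.toList());
        producer.join();
        System.out.println(result); // [ax, cx]
    }
}
```

The helper also moves the InterruptedException handling out of the pipeline, which keeps the lambda passed to Stream.generate to a single line.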
