Java 8 Generics: Reducing a Stream of Consumers to a single Consumer

You are using the one-argument version of Stream.reduce(accumulator), which has the following signature:

Optional<T> reduce(BinaryOperator<T> accumulator);

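Here T is the stream's element type, so the BinaryOperator<T> accumulator must take two values of exactly that type and return a value of that type. Your stream's elements, however, are typed with a wildcard: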

<? extends Consumer<? super T>>
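For contrast, here is a minimal sketch of a case where the one-argument version does fit: the stream's element type is exactly Consumer<T>, with no wildcards. The class and method names (OneArgReduceDemo, combineExact, run) are made up for this illustration.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;

class OneArgReduceDemo {

    // The element type is exactly Consumer<T>, so Consumer::andThen
    // fits BinaryOperator<Consumer<T>> and the one-argument reduce works.
    static <T> Optional<Consumer<T>> combineExact(Stream<Consumer<T>> consumers) {
        return consumers.reduce(Consumer::andThen);
    }

    // Hypothetical helper: chains two consumers and records what they did.
    static String run() {
        StringBuilder out = new StringBuilder();
        Consumer<String> first = s -> out.append("first:").append(s).append(' ');
        Consumer<String> second = s -> out.append("second:").append(s);
        combineExact(Stream.of(first, second)).ifPresent(c -> c.accept("x"));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the result is an Optional, because there is no identity value to fall back on for an empty stream.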

I suggest using the three-argument version of the Stream.reduce(...) method instead:

<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator,
             BinaryOperator<U> combiner);

The BiFunction<U, ? super T, U> accumulator can accept parameters of two different types, has a less restrictive bound and is more suitable for your situation. A possible solution could be:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers.filter(Objects::nonNull)
                    .reduce(t -> {}, Consumer::andThen, Consumer::andThen);
}
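A small runnable sketch of how this could be exercised. The class name CombineDemo, the run helper, and the two sample consumers are assumptions added for illustration; combine itself is the method from above, made static.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Stream;

class CombineDemo {

    // Same reduction as in the answer, made static so main can call it.
    static <T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
        return consumers.filter(Objects::nonNull)
                        .reduce(t -> {}, Consumer::andThen, Consumer::andThen);
    }

    // Hypothetical helper: combines consumers of different parameter types
    // into one Consumer<String>, runs it once, and returns what was logged.
    static List<String> run(String input) {
        List<String> log = new ArrayList<>();
        Consumer<Object> logObject = o -> log.add("object:" + o);
        Consumer<CharSequence> logLength = cs -> log.add("length:" + cs.length());

        // The null element is dropped by filter(Objects::nonNull).
        Stream<Consumer<? super String>> consumers = Stream.of(logObject, null, logLength);
        Consumer<String> combined = combine(consumers);
        combined.accept(input);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("abc")); // prints [object:abc, length:3]
    }
}
```

The identity t -> {} also means an empty stream yields a consumer that does nothing, rather than an empty Optional.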

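The third argument, BinaryOperator<U> combiner, is only invoked for parallel streams, where it merges the partial results. It is still wise to provide a correct implementation of it, because the method has no control over whether the caller passes a parallel stream.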
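To see the combiner at work, here is a rough sketch that counts its invocations and checks that a parallel stream produces a consumer with the same behaviour as a sequential one. The counter, the class name ParallelCombineDemo and the run helper are assumptions for illustration only. On the OpenJDK builds I am aware of, the sequential run reports zero combiner calls, but treat that as observed behaviour rather than a documented guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ParallelCombineDemo {

    // Illustrative counter of combiner invocations; not part of the original solution.
    static final AtomicInteger COMBINER_CALLS = new AtomicInteger();

    static <T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
        // Same as Consumer::andThen, but counts how often it is called.
        BinaryOperator<Consumer<T>> combiner = (left, right) -> {
            COMBINER_CALLS.incrementAndGet();
            return left.andThen(right);
        };
        return consumers.filter(Objects::nonNull)
                        .reduce(t -> {}, Consumer::andThen, combiner);
    }

    // Builds n consumers that each record their index, combines them
    // (sequentially or in parallel), then runs the combined consumer once.
    static List<Integer> run(int n, boolean parallel) {
        List<Integer> log = new ArrayList<>();
        List<Consumer<Object>> consumers = IntStream.range(0, n)
                .<Consumer<Object>>mapToObj(i -> o -> log.add(i))
                .collect(Collectors.toList());
        Stream<Consumer<Object>> stream =
                parallel ? consumers.parallelStream() : consumers.stream();
        Consumer<String> combined = combine(stream);
        // Only the composition runs in parallel; accept() runs on this thread.
        combined.accept("x");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(16, false) + " combiner calls: " + COMBINER_CALLS.getAndSet(0));
        System.out.println(run(16, true) + " combiner calls: " + COMBINER_CALLS.getAndSet(0));
    }
}
```

Because andThen is associative and t -> {} behaves as an identity for it, both runs invoke the consumers in encounter order, however the parallel run happens to split the work.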

To make the inferred types easier to follow, the same code can be written with each argument declared explicitly:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {

    Consumer<T> identity = t -> {};
    BiFunction<Consumer<T>, Consumer<? super T>, Consumer<T>> acc = Consumer::andThen;
    BinaryOperator<Consumer<T>> combiner = Consumer::andThen;

    return consumers.filter(Objects::nonNull)
                    .reduce(identity, acc, combiner);
}

Now you can write:

Stream<? extends Consumer<? super Foo>> consumers = Stream.of();
combine(consumers);
