You use a one-argument Stream.reduce(accumulator)
version that has the following signature:
Optional<T> reduce(BinaryOperator<T> accumulator);
The BinaryOperator<T> accumulator
can only accept elements of type T
, but you have:
<? extends Consumer<? super T>>
I propose you to use a three-argument version of the Stream.reduce(...)
method instead:
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator
BinaryOperator<U> combiner);
The BiFunction<U, ? super T, U> accumulator
can accept parameters of two different types, has a less restrictive bound and is more suitable for your situation. A possible solution could be:
<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
return consumers.filter(Objects::nonNull)
.reduce(t -> {}, Consumer::andThen, Consumer::andThen);
}
The third argument BinaryOperator<U> combiner
is called only in the parallel streams, but anyway it would be wise to provide a correct implementation of it.
In addition, for a better understanding one could represent the above code as follows:
<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
Consumer<T> identity = t -> {};
BiFunction<Consumer<T>, Consumer<? super T>, Consumer<T>> acc = Consumer::andThen;
BinaryOperator<Consumer<T>> combiner = Consumer::andThen;
return consumers.filter(Objects::nonNull)
.reduce(identity, acc, combiner);
}
Now you can write:
Stream<? extends Consumer<? super Foo>> consumers = Stream.of();
combine(consumers);