In project reactor or akka streams, what is the conceptual difference between sink and subscriber?

I see that Oleh Dokuka, from Project Reactor (missing disclaimer there), posted an answer already, however much of its assumptions about Akka Streams and Reactive Streams are incorrect, so allow me to clarify below. Disclaimer: I participated in Reactive Streams since it’s early days, and authored most of its Technology Compatibility Kit. I also maintain … Read more

The difference between onErrorResume and doOnError

onErrorResume: Gives a fallback stream when some exception occurs happens in the upstream. doOnError: Side-effect operator. Suppose you want to log what error happens in the upstream. Example: Mono.just(request) .flatMap(this::makeHTTPGet) .doOnError(err -> { log.error(“Some error occurred while making the POST call”,err) }) .onErrorResume(err -> Mono.just(getFallbackResponse())); You see, doOnError is a side-effect operator. It’s like inserting … Read more

Spring WebClient: How to stream large byte[] to file?

With recent stable Spring WebFlux (5.2.4.RELEASE as of writing): final WebClient client = WebClient.create(“https://example.com”); final Flux<DataBuffer> dataBufferFlux = client.get() .accept(MediaType.TEXT_HTML) .retrieve() .bodyToFlux(DataBuffer.class); // the magic happens here final Path path = FileSystems.getDefault().getPath(“target/example.html”); DataBufferUtils .write(dataBufferFlux, path, CREATE_NEW) .block(); // only block here if the rest of your code is synchronous For me the non-obvious part was … Read more

Comparison of Java reactive frameworks [closed]

I’m working on RxJava and I did some evaluations on Akka-Streams and Reactor recently. As far as I can tell with all the libraries, they converge to a single concept called Reactive-Streams so you can go back and forth between the implementations. I believe RxJava is the most generic of all, has zero dependencies on … Read more

publishOn vs subscribeOn in Project Reactor 3

It took me sometime to understand it, maybe because publishOn is usually explained before subscribeOn, here’s a hopefully more simple layman explanation. subscribeOn means running the initial source emission e.g subscribe(), onSubscribe() and request() on a specified scheduler worker (other thread), and also the same for any subsequent operations like for example onNext/onError/onComplete, map etc … Read more

Mono.Defer() vs Mono.create() vs Mono.just()?

Mono.just(value) is the most primitive – once you have a value you can wrap it into a Mono and subscribers down the line will get it. Mono.defer(monoSupplier) lets you provide the whole expression that supplies the resulting Mono instance. The evaluation of this expression is deferred until somebody subscribes. Inside of this expression you can … Read more

The bean could not be injected as a ‘Type’ because it is a JDK dynamic proxy that implements: reactor.fn.Consumer

While the other answer will solve this issue, I think it will be more appropriate for me to explain why applying proxyTargetClass = true will fix this. First of all, Spring, as a framework, utilizes proxying in order to supply the bean with some extended functionality, such as declarative transactions via @Transactional, or caching by … Read more

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)