How to cancel Java 8 completable future?

When you call CompletableFuture#cancel, you only stop the downstream part of the chain. Upstream part, i. e. something that will eventually call complete(...) or completeExceptionally(...), doesn’t get any signal that the result is no more needed.

What are those ‘upstream’ and ‘downstream’ things?

Let’s consider the following code:

CompletableFuture
        .supplyAsync(() -> "hello")               //1
        .thenApply(s -> s + " world!")            //2
        .thenAccept(s -> System.out.println(s));  //3

Here, the data flows from top to bottom – from being created by supplier, through being modified by function, to being consumed by println. The part above particular step is called upstream, and the part below is downstream. E. g. steps 1 and 2 are upstream for step 3.

Here’s what happens behind the scenes. This is not precise, rather it’s a convenient mind model of what’s going on.

  1. Supplier (step 1) is being executed (inside the JVM’s common ForkJoinPool).
  2. The result of the supplier is then being passed by complete(...) to the next CompletableFuture downstream.
  3. Upon receiving the result, that CompletableFuture invokes next step – a function (step 2) which takes in previous step result and returns something that will be passed further, to the downstream CompletableFuture‘s complete(...).
  4. Upon receiving the step 2 result, step 3 CompletableFuture invokes the consumer, System.out.println(s). After consumer is finished, the downstream CompletableFuture will receive it’s value, (Void) null

As we can see, each CompletableFuture in this chain has to know who are there downstream waiting for the value to be passed to their’s complete(...) (or completeExceptionally(...)). But the CompletableFuture don’t have to know anything about it’s upstream (or upstreams – there might be several).

Thus, calling cancel() upon step 3 doesn’t abort steps 1 and 2, because there’s no link from step 3 to step 2.

It is supposed that if you’re using CompletableFuture then your steps are small enough so that there’s no harm if a couple of extra steps will get executed.

If you want cancellation to be propagated upstream, you have two options:

  • Implement this yourself – create a dedicated CompletableFuture (name it like cancelled) which is checked after every step (something like step.applyToEither(cancelled, Function.identity()))
  • Use reactive stack like RxJava 2, ProjectReactor/Flux or Akka Streams

Leave a Comment

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)