Python asyncio: handling exceptions in gather() – documentation unclear?

Scatter-Gather pattern in parallelism

The gather function is primarily designed for the Scatter-Gather pattern. In this pattern, when you need to calculate something from all task-results (e.g. in an aggregation function), the gather function is handy. The switch return_exceptions controls whether a single failure should invalidate the aggregated result.
By default, any failure will be immediately propagated to the gather task.
This behavior is desired in aggregation tasks like sum(await gather(*tasks)) because the sum becomes invalid due to a single failure.
However, no task will be cancelled or altered in any way by gather.
It is merely one observable, waiting and gathering information quietly.

Your question 1:

I cannot even make sense with what the documentations says – if an exception is raised …, I would not even be able to get the returned results

Usually you can and need to define multiple observables
in a Scatter-Gather job.
After one gather fails, the main routine should continue with another gather or wait or as_completed or asyncio.sleep.
Remember that tasks are still running and the Future objects are still there. By design, Scatter-Gather tasks should be independent, and the gather operation should not have any side effects, so that you can continue to handle those awaitables independently, e.g., by querying their tasks[i].result() or explicitly killing them.

Question 2:

Am I missing anything, or is there a problem with the documentation?

In your test program, no other tasks (observables) are defined after catching the exception, so the main program simply exits, which gives you an illusion that all tasks are cancelled.
They are indeed cancelled when the event loop are closed by asyncio.run(main()), some milliseconds after catching the exception.
By adding another waiting task, either await asyncio.wait(tasks) or simply await asyncio.sleep(20) at the end of main(), those worker tasks will have their chance to complete. Thus the documentation is correct.

Your test program can be considered as a DAG of calculation tasks. The gather task is your root target passed to asyncio.run. Therefore, when the only mission fails, all sub-tasks are aborted.

updated in 2023


original answer

I’ve run your code and got the following output, as expected from documentation.

  Task C: Compute factorial(2)...
  Task A: Compute factorial(2)...
  Task B: Compute factorial(2)...
==>> Task C DONE: factorial(2) = 2
  Task A: Compute factorial(3)...
  Task B: Compute factorial(3)...
  Task A: Compute factorial(4)...
  Task B: Compute factorial(4)...
  Task B: raising Exception
Got an exception: Bad Task B
  Task A: Compute factorial(5)...
==>> Task A DONE: factorial(5) = 120

What’s going on

  1. Tasks A,B and C are submitted to the queue;
  2. All tasks are running while C finishes earliest.
  3. Task B raises and exception.
  4. The await asyncio.gater() returns immediately and print('Got an exception:', e) to the screen.
  5. Task A continues to run and print “==>> Task A DONE …”

What’s wrong with your test

As @deceze commented,
your program exited immediately after the exception was caught and main() returns. Thus, the tasks A and C are terminated because the entire process dies, not because of cancellation.

To fix it, add await asyncio.sleep(20) to the end of the main() function.

Leave a Comment

tech