Scatter-Gather pattern in parallelism
The gather function is primarily designed for the Scatter-Gather pattern. In this pattern, the gather function is handy when you need to calculate something from all task results (e.g. in an aggregation function). The switch return_exceptions controls whether a single failure should invalidate the aggregated result. With return_exceptions=True, exceptions are returned as items in the result list instead of being raised.
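Here is a minimal sketch of the return_exceptions switch in an aggregation. The worker() coroutine and the rigged failure in worker 2 are hypothetical and not taken from the question. With the default, the sum is abandoned. With return_exceptions=True, the failure comes back as a list item and can be filtered out.

```python
import asyncio
from typing import Optional


async def worker(value: int, fail: bool = False) -> int:
    """Hypothetical worker: returns its value, or raises if told to fail."""
    await asyncio.sleep(0.01 * value)
    if fail:
        raise ValueError(f"worker {value} failed")
    return value


async def aggregate(return_exceptions: bool) -> Optional[int]:
    # Scatter: start three independent workers; worker 2 is rigged to fail.
    tasks = [asyncio.create_task(worker(v, fail=(v == 2))) for v in (1, 2, 3)]
    try:
        # Gather: results come back in submission order.
        results = await asyncio.gather(*tasks, return_exceptions=return_exceptions)
    except ValueError as exc:
        # Default mode: the first failure invalidates the whole aggregate.
        print("aggregate invalid:", exc)
        await asyncio.wait(tasks)  # worker 3 is still running; let it finish
        return None
    # return_exceptions=True: failures are returned as list items, not raised.
    return sum(r for r in results if not isinstance(r, BaseException))


if __name__ == "__main__":
    print(asyncio.run(aggregate(return_exceptions=False)))  # None
    print(asyncio.run(aggregate(return_exceptions=True)))   # 4
```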
By default (return_exceptions=False), the first failure is immediately propagated to the task that awaits gather. This behavior is desired in aggregation tasks like sum(await gather(*tasks)), because a single failure makes the sum invalid. However, gather does not cancel or alter any of the tasks in any way. It is merely one observable, waiting and gathering information quietly.
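You can check that gather leaves the other tasks alone. The small sketch below uses two hypothetical coroutines, slow() and failing(). Right after gather raises, the sibling task is neither cancelled nor done, and you can still await it for its result.

```python
import asyncio
from typing import Tuple


async def slow() -> str:
    await asyncio.sleep(0.05)
    return "slow result"


async def failing() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")


async def main() -> Tuple[bool, bool, str]:
    slow_task = asyncio.create_task(slow())
    fail_task = asyncio.create_task(failing())
    try:
        await asyncio.gather(slow_task, fail_task)
    except RuntimeError as exc:
        print("gather raised:", exc)
    # gather only observed the failure; the sibling task was not touched.
    cancelled, done = slow_task.cancelled(), slow_task.done()
    # It is still an ordinary running task, so we can keep waiting on it.
    result = await slow_task
    return cancelled, done, result


if __name__ == "__main__":
    print(asyncio.run(main()))  # (False, False, 'slow result')
```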
Your question 1:
I cannot even make sense with what the documentations says – if an exception is raised …, I would not even be able to get the returned results
Usually you can, and need to, define multiple observables in a Scatter-Gather job. After one gather fails, the main routine should continue with another gather, wait, as_completed, or asyncio.sleep.
Remember that the tasks are still running and their Future objects are still there. By design, Scatter-Gather tasks should be independent, and the gather operation should have no side effects. This lets you continue to handle those awaitables individually. For example, you can query tasks[i].result() once a task is done, or explicitly cancel it with tasks[i].cancel().
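The sketch below shows how to continue after a failed gather. It assumes three hypothetical job() tasks: one fails, one finishes, and one is very slow. The second observable is asyncio.wait with a timeout, chosen here only for illustration. Afterwards, each task is handled on its own via cancelled(), exception(), result() and cancel().

```python
import asyncio
from typing import List, Tuple


async def job(delay: float, fail: bool = False) -> float:
    """Hypothetical job: sleeps for `delay` seconds, then returns or raises."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"job({delay}) failed")
    return delay


async def main() -> List[Tuple[int, str]]:
    tasks = [
        asyncio.create_task(job(0.01, fail=True)),  # fails early
        asyncio.create_task(job(0.03)),             # finishes normally
        asyncio.create_task(job(10.0)),             # far too slow
    ]
    try:
        await asyncio.gather(*tasks)
    except RuntimeError as exc:
        print("first observable failed:", exc)

    # Second observable: wait a bounded time for whatever is still running.
    done, pending = await asyncio.wait(tasks, timeout=0.5)

    # Tasks that overstay the deadline are cancelled explicitly.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Query each task independently; check cancelled() before exception().
    summary = []
    for i, task in enumerate(tasks):
        if task.cancelled():
            summary.append((i, "cancelled"))
        elif task.exception() is not None:
            summary.append((i, f"failed: {task.exception()}"))
        else:
            summary.append((i, f"result: {task.result()}"))
    return summary


if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```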
Question 2:
Am I missing anything, or is there a problem with the documentation?
In your test program, no other observables are awaited after the exception is caught. So main() simply returns, which gives you the illusion that gather cancelled all the tasks. The tasks are indeed cancelled, but by asyncio.run(main()) as it shuts down the event loop, a few milliseconds after the exception is caught. You can add another waiting step at the end of main(), either await asyncio.wait(tasks) or simply await asyncio.sleep(20). Then the worker tasks get their chance to complete. Thus the documentation is correct.
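You can observe the shutdown behavior directly. In this sketch, worker(), boom() and the log list are hypothetical. The worker records whether it finished or received CancelledError. Without an extra wait, asyncio.run() cancels it after main() returns. With await asyncio.wait(tasks), it completes.

```python
import asyncio
from typing import List


async def worker(name: str, delay: float, log: List[str]) -> None:
    try:
        await asyncio.sleep(delay)
        log.append(f"{name} finished")
    except asyncio.CancelledError:
        # Reached when asyncio.run() cancels leftover tasks during shutdown.
        log.append(f"{name} cancelled at shutdown")
        raise


async def boom() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("Bad Task B")


async def main(log: List[str], wait_for_workers: bool) -> None:
    tasks = [asyncio.create_task(worker("A", 0.05, log)), asyncio.create_task(boom())]
    try:
        await asyncio.gather(*tasks)
    except RuntimeError as exc:
        log.append(f"caught {exc}")
    if wait_for_workers:
        # Another observable keeps main() alive until every task is done.
        await asyncio.wait(tasks)
    # When main() returns, asyncio.run() cancels whatever is still pending.


if __name__ == "__main__":
    for wait_for_workers in (False, True):
        events: List[str] = []
        asyncio.run(main(events, wait_for_workers))
        print(wait_for_workers, events)
```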
Your test program can be viewed as a DAG of calculation tasks. The gather call is effectively the root target of main(), the coroutine passed to asyncio.run. Therefore, when this only mission fails, main() returns and all sub-tasks are aborted.
— updated in 2023
original answer
I’ve run your code and got the following output, as expected from the documentation.
Task C: Compute factorial(2)...
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
==>> Task C DONE: factorial(2) = 2
Task A: Compute factorial(3)...
Task B: Compute factorial(3)...
Task A: Compute factorial(4)...
Task B: Compute factorial(4)...
Task B: raising Exception
Got an exception: Bad Task B
Task A: Compute factorial(5)...
==>> Task A DONE: factorial(5) = 120
What’s going on
- Tasks A, B and C are scheduled on the event loop.
- All tasks run concurrently, and C finishes first.
- Task B raises an exception.
- await asyncio.gather() raises B's exception immediately, and main() runs print('Got an exception:', e).
- Task A continues to run and prints “==>> Task A DONE …”.
What’s wrong with your test
As @deceze commented, your program exits immediately after the exception is caught and main() returns. Any task that is still running at that point (here, task A) is stopped because asyncio.run() cancels every pending task while shutting down the event loop. gather itself did not cancel it.
To fix it, add await asyncio.sleep(20) to the end of the main() function.
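The question's full program is not shown here, so the following is a hypothetical reconstruction. It is modeled on the factorial example from the asyncio documentation. Task B is rigged to fail at factorial(4), and each step sleeps 0.01 seconds instead of a longer delay. It applies the fix with await asyncio.wait(tasks). Like asyncio.sleep(20), this keeps main() alive, but it returns as soon as every task is done.

```python
import asyncio
from typing import Optional


async def factorial(name: str, number: int, fail_at: Optional[int] = None) -> int:
    """Hypothetical test task; raises at step `fail_at` if one is given."""
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(0.01)
        if i == fail_at:
            print(f"Task {name}: raising Exception")
            raise Exception(f"Bad Task {name}")
        f *= i
    print(f"==>> Task {name} DONE: factorial({number}) = {f}")
    return f


async def main() -> list:
    tasks = [
        asyncio.create_task(factorial("A", 5)),
        asyncio.create_task(factorial("B", 5, fail_at=4)),
        asyncio.create_task(factorial("C", 2)),
    ]
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        print("Got an exception:", e)
    # The fix: keep main() alive so task A can finish instead of being
    # cancelled by asyncio.run() during shutdown.
    await asyncio.wait(tasks)
    return tasks


if __name__ == "__main__":
    a, b, c = asyncio.run(main())
    print(a.result(), c.result(), b.exception())
```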