How to shut down the loop and print the error if a coroutine raises an exception with asyncio?

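A graceful way is to use the event loop's error handling API: install a custom exception handler with loop.set_exception_handler() and call loop.stop() from inside it.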

https://docs.python.org/3/library/asyncio-eventloop.html#error-handling-api

Example:

import asyncio


async def run_division(a, b):
    await asyncio.sleep(2)
    return a / b


def custom_exception_handler(loop, context):
    # first, handle with default handler
    loop.default_exception_handler(context)

    exception = context.get('exception')
    if isinstance(exception, ZeroDivisionError):
        print(context)
        loop.stop()

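# Create the loop explicitly; calling asyncio.get_event_loop() when no
# loop is running is deprecated in recent Python versions
loop = asyncio.new_event_loop()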

# Set custom handler
loop.set_exception_handler(custom_exception_handler)
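# Do not keep a reference to the task: the handler is called when the
# failed task is garbage-collected with its exception never retrieved
loop.create_task(run_division(1, 0))
try:
    loop.run_forever()
finally:
    # Release the loop's resources once it has been stopped
    loop.close()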
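
The handler above is only called once the failed task has been garbage-collected without anyone retrieving its exception. If you would rather not depend on that timing, a different technique (not the error handling API) is to attach a done callback to the task and stop the loop from there. Below is a minimal sketch of that idea; run_until_task_settles and on_done are made-up names for illustration, and the delay is shortened so it finishes quickly.

```python
import asyncio


async def run_division(a, b):
    # Same coroutine as in the example, with a shorter delay
    await asyncio.sleep(0.01)
    return a / b


def run_until_task_settles(coro):
    """Hypothetical helper: run coro on a fresh loop, return (result, exception)."""
    loop = asyncio.new_event_loop()
    outcome = {"result": None, "exception": None}

    def on_done(task):
        # Called by the loop as soon as the task finishes
        if not task.cancelled():
            exc = task.exception()  # retrieving it marks the exception as handled
            if exc is not None:
                print("Task failed:", repr(exc))
                outcome["exception"] = exc
            else:
                outcome["result"] = task.result()
        loop.stop()

    try:
        task = loop.create_task(coro)
        task.add_done_callback(on_done)
        loop.run_forever()
    finally:
        loop.close()
    return outcome["result"], outcome["exception"]


if __name__ == "__main__":
    result, exc = run_until_task_settles(run_division(1, 0))
    print("result:", result, "exception:", repr(exc))
```

Since the callback retrieves the exception itself, the "Task exception was never retrieved" message should not be logged in this variant, and it is safe to keep a reference to the task.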
