How to use asyncio with existing blocking library?

There are (sort of) two questions here:

  1. how can I run blocking code asynchronously within a coroutine
  2. how can I run multiple async tasks at the “same” time (as an aside: asyncio is single-threaded, so it is concurrent, but not truly parallel).

Concurrent tasks can be created using the high-level asyncio.create_task or the low-level asyncio.ensure_future. Starting with 3.11, they can also be created through asyncio task groups, as pioneered by the Trio library (the creator of Trio has an excellent blog post on the subject here).

To run synchronous code, you will need to run the blocking code in an executor. Example:

import concurrent.futures
import asyncio
import time

def blocking(delay):
    time.sleep(delay)
    print('Completed.')


async def non_blocking(executor):
    loop = asyncio.get_running_loop()
    # Run three of the blocking tasks concurrently. asyncio.wait will
    # automatically wrap these in Tasks. If you want explicit access
    # to the tasks themselves, use asyncio.ensure_future, or add a
    # "done, pending = asyncio.wait..." assignment
    await asyncio.wait(
        fs={
            # Returns after delay=12 seconds
            loop.run_in_executor(executor, blocking, 12),
            
            # Returns after delay=14 seconds
            loop.run_in_executor(executor, blocking, 14),
            
            # Returns after delay=16 seconds
            loop.run_in_executor(executor, blocking, 16)
        },
        return_when=asyncio.ALL_COMPLETED
    )

executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
asyncio.run(non_blocking(executor))

If you want to schedule these tasks using a for loop (as in your example), you have several different strategies, but the underlying approach is to schedule the tasks using the for loop (or list comprehension, etc), await them with asyncio.wait, and then retrieve the results. Example:

done, pending = await asyncio.wait(
    fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
    return_when=asyncio.ALL_COMPLETED
)

# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]

Leave a Comment

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)