FastAPI runs api-calls in serial instead of parallel fashion

As per FastAPI’s documentation:

When you declare a path operation function with normal def instead
of async def, it is run in an external threadpool that is then
awaited, instead of being called directly (as it would block the
server).

Also, as described here:

If you are using a third party library that communicates with
something (a database, an API, the file system, etc.) and doesn’t have
support for using await, (this is currently the case for most
database libraries), then declare your path operation functions as
normally, with just def.

If your application (somehow) doesn’t have to communicate with
anything else and wait for it to respond, use async def.

If you just don’t know, use normal def.

Note: You can mix def and async def in your path operation functions as much as you need and define each one using the best
option for you. FastAPI will do the right thing with them.

Anyway, in any of the cases above, FastAPI will still work
asynchronously and be extremely fast.

But by following the steps above, it will be able to do some
performance optimizations.

Thus, def endpoints (in the context of asynchronous programming, a function defined with just def is called a synchronous function) in FastAPI run in a separate thread from an external threadpool that is then awaited, and hence, FastAPI will still work asynchronously. In other words, the server will process requests to such endpoints concurrently. By contrast, async def endpoints run directly in the event loop, which runs in the main (single) thread. The server will also process requests to such endpoints concurrently/asynchronously, as long as there is an await call to non-blocking I/O-bound operations inside such async def endpoints/routes, such as waiting for (1) data from the client to be sent through the network, (2) contents of a file in the disk to be read, (3) a database operation to finish, etc. (have a look here). If, however, an endpoint defined with async def does not await any coroutine inside it, it never gives up time for other tasks in the event loop to run (e.g., requests to the same or other endpoints, background tasks, etc.), so each request to such an endpoint will have to be completely finished (i.e., exit the endpoint) before control returns to the event loop and other tasks are allowed to run. In other words, in such cases, the server will process requests sequentially.

Note that the same concept also applies to functions defined with normal def that are used as StreamingResponse’s generators (see StreamingResponse class implementation) or Background Tasks (see BackgroundTask class implementation and this answer), meaning that FastAPI, behind the scenes, will also run such functions in a separate thread from the same external threadpool. The default number of worker threads of that external threadpool is 40 and can be adjusted as required; please have a look at this answer on how to do that. Hence, after reading this answer to the end, you should be able to decide whether you should define a FastAPI endpoint or StreamingResponse’s generator or background task function with def or async def.
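To make the effect of a capped threadpool concrete, here is a minimal stdlib-only sketch. It is an analogy and not FastAPI's actual pool: `blocking_handler` and `MAX_WORKERS` are hypothetical names, and the limit is set to 2 instead of 40 so the cap is easy to observe. Six blocking "requests" are submitted, yet no more than `MAX_WORKERS` of them ever run at the same time; the rest wait for a free thread.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 2  # hypothetical stand-in for the threadpool limit discussed above

active = 0  # handlers currently running
peak = 0    # highest number of handlers seen running at once
lock = threading.Lock()

def blocking_handler(i):
    """Hypothetical sync handler that records how many copies run concurrently."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulated blocking I/O
    with lock:
        active -= 1
    return i

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    # Six "requests" compete for two worker threads.
    results = list(pool.map(blocking_handler, range(6)))

print(f"results: {results}, peak concurrency: {peak}")
```

If many def endpoints spend a long time blocked, requests beyond the limit queue up in the same way, which is why the limit may need adjusting.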

The keyword await (which only works within an async def function) passes function control back to the event loop. In other words, it suspends the execution of the surrounding coroutine (i.e., a coroutine object is the result of calling an async def function), and tells the event loop to let some other task run, until that awaited task is completed. Note that just because you may define a custom function with async def and then await it inside your async def endpoint, it doesn’t mean that your code will work asynchronously, if that custom function contains, for example, calls to time.sleep(), CPU-bound tasks, non-async I/O libraries, or any other blocking call that is incompatible with asynchronous Python code. In FastAPI, for example, when using the async methods of UploadFile, such as await file.read() and await file.write(), FastAPI/Starlette, behind the scenes, actually calls the corresponding synchronous File methods in a separate thread from the external threadpool described earlier (using the async run_in_threadpool() function) and awaits it; otherwise, such methods/operations would block the event loop—you could find out more by looking at the implementation of the UploadFile class.
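The pitfall described above can be observed with a small stdlib-only sketch (no FastAPI involved; all function names are hypothetical). A heartbeat task tries to tick every 10 ms while a helper is awaited. When the helper is an async def that merely wraps time.sleep(), the heartbeat never gets to run; when the same blocking call is handed to a thread via loop.run_in_executor(), the heartbeat keeps ticking.

```python
import asyncio
import time

async def fake_async_helper():
    # Declared with async def, but nothing inside is awaited: this blocks the loop.
    time.sleep(0.2)

async def offloaded_helper():
    # The same blocking call, run in the default thread pool and awaited.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 0.2)

async def ticks_during(helper):
    """Count how often a 10 ms heartbeat runs while helper() is being awaited."""
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat start before calling the helper
    await helper()
    task.cancel()
    return ticks

async def main():
    blocked = await ticks_during(fake_async_helper)
    offloaded = await ticks_during(offloaded_helper)
    return blocked, offloaded

blocked, offloaded = asyncio.run(main())
print(f"ticks while blocked: {blocked}, ticks while offloaded: {offloaded}")
```

The first count is zero because awaiting a coroutine that never suspends does not hand control back to the event loop.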

Note that async does not mean parallel, but concurrent. As mentioned earlier, asynchronous code with async and await is often summarised as using coroutines. Coroutines are collaborative (or cooperatively multitasked), meaning that “at any given time, a program with coroutines is running only one of its coroutines, and this running coroutine suspends its execution only when it explicitly requests to be suspended” (see here and here for more info on coroutines). As described in this article:

Specifically, whenever execution of a currently-running coroutine
reaches an await expression, the coroutine may be suspended, and
another previously-suspended coroutine may resume execution if what it
was suspended on has since returned a value. Suspension can also
happen when an async for block requests the next value from an
asynchronous iterator or when an async with block is entered or
exited, as these operations use await under the hood.
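As a rough illustration of cooperative suspension, the sketch below (stdlib only; `worker` and `run` are hypothetical names) runs two coroutines with asyncio.gather(). With an explicit suspension point in each loop iteration the two interleave; without one, the first coroutine runs to completion before the second starts.

```python
import asyncio

async def worker(name, steps, log, suspend):
    for i in range(steps):
        log.append(f"{name}{i}")
        if suspend:
            await asyncio.sleep(0)  # explicit suspension point: lets the other task run

async def run(suspend):
    log = []
    await asyncio.gather(
        worker("A", 3, log, suspend),
        worker("B", 3, log, suspend),
    )
    return log

interleaved = asyncio.run(run(suspend=True))
sequential = asyncio.run(run(suspend=False))

print(interleaved)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
print(sequential)   # ['A0', 'A1', 'A2', 'B0', 'B1', 'B2']
```

Only one coroutine runs at any moment in both cases; the await is what gives the other one its turn.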

If, however, a blocking I/O-bound or CPU-bound operation was directly executed/called inside an async def function/endpoint, it would then block the event loop, and hence, the main thread (the event loop runs in the main thread of a process/worker). Hence, a blocking operation such as time.sleep() in an async def endpoint would block the entire server (as in the code example provided in your question). Thus, if your endpoint is not going to make any async calls, you could declare it with normal def instead, in which case, FastAPI would run it in a separate thread from the external threadpool and await it, as explained earlier (more solutions are given in the following sections). Example:

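import time

from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/ping")
def ping(request: Request):
    #print(request.client)
    print("Hello")
    time.sleep(5)  # blocking call, but it runs in a worker thread, not in the event loop
    print("bye")
    return "pong"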

Otherwise, if the functions that you had to execute inside the endpoint are async functions that you had to await, you should define your endpoint with async def. To demonstrate this, the example below uses the asyncio.sleep() function (from the asyncio library), which provides a non-blocking sleep operation. The await asyncio.sleep() method will suspend the execution of the surrounding coroutine (until the sleep operation is completed), thus allowing other tasks in the event loop to run. Similar examples are given here and here as well.

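import asyncio

from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/ping")
async def ping(request: Request):
    #print(request.client)
    print("Hello")
    await asyncio.sleep(5)  # non-blocking: other tasks can run in the meantime
    print("bye")
    return "pong"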

Both the endpoints above will print out the specified messages to the screen in the same order as mentioned in your question—if two requests arrived at (around) the same time—that is:

Hello
Hello
bye
bye
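For contrast, the following stdlib-only simulation (it does not use FastAPI; the handler names are hypothetical) sends two simultaneous "requests" to three kinds of handler and records the message order. The threaded sync handler and the awaiting async handler both produce the order shown above, while an async handler that calls time.sleep() directly produces the sequential order from the question.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def sync_handler(log):
    # Counterpart of a def endpoint: blocking sleep, executed in a worker thread.
    log.append("Hello")
    time.sleep(0.1)
    log.append("bye")

async def awaiting_handler(log):
    # Counterpart of an async def endpoint that awaits a non-blocking sleep.
    log.append("Hello")
    await asyncio.sleep(0.1)
    log.append("bye")

async def blocking_async_handler(log):
    # async def endpoint with a blocking call: the event loop is stuck here.
    log.append("Hello")
    time.sleep(0.1)
    log.append("bye")

def two_threaded_requests():
    log = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        for _ in range(2):
            pool.submit(sync_handler, log)
    return log  # the with-block waits for both handlers to finish

async def two_requests(handler):
    log = []
    await asyncio.gather(handler(log), handler(log))
    return log

threaded = two_threaded_requests()
awaited = asyncio.run(two_requests(awaiting_handler))
blocked = asyncio.run(two_requests(blocking_async_handler))

print(threaded)  # ['Hello', 'Hello', 'bye', 'bye']
print(awaited)   # ['Hello', 'Hello', 'bye', 'bye']
print(blocked)   # ['Hello', 'bye', 'Hello', 'bye']
```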

Important Note

When using a browser to call the same endpoint for the second (third, and so on) time, please remember to do that from a tab that is isolated from the browser’s main session; otherwise, succeeding requests (i.e., coming after the first one) might be blocked by the browser (on client side), as the browser might be waiting for a response to the previous request from the server, before sending the next request. This is a common behaviour for Chrome browser at least, due to waiting to see the result of a request and check if the result can be cached, before requesting the same resource again.

You could confirm that by using print(request.client) inside the endpoint, where you would see the hostname and port number being the same for all incoming requests—in case the requests were initiated from tabs opened in the same browser window/session; otherwise, the port number would normally be different for every request—and hence, those requests would be processed sequentially by the server, because of the browser/client sending them sequentially in the first place. To overcome this, you could either:

  1. Reload the same tab (as is running), or

  2. Open a new tab in an Incognito Window, or

  3. Use a different browser/client to send the request, or

  4. Use the httpx library to make asynchronous HTTP requests, along with the awaitable asyncio.gather(), which allows executing multiple asynchronous operations concurrently and then returns a list of results in the same order the awaitables (tasks) were passed to that function (have a look at this answer for more details).

    Example:

    import httpx
    import asyncio
    
    URLS = ['http://127.0.0.1:8000/ping'] * 2
    
    async def send(url, client):
        return await client.get(url, timeout=10)
    
    async def main():
        async with httpx.AsyncClient() as client:
            tasks = [send(url, client) for url in URLS]
            responses = await asyncio.gather(*tasks)
            print(*[r.json() for r in responses], sep='\n')
    
    asyncio.run(main())
    

    In case you had to call different endpoints that may take different time to process a request, and you would like to print the response out on client side as soon as it is returned from the server—instead of waiting for asyncio.gather() to gather the results of all tasks and print them out in the same order the tasks were passed to the send() function—you could replace the send() function of the example above with the one shown below:

    async def send(url, client):
        res = await client.get(url, timeout=10)
        print(res.json())
        return res
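
Another way to handle responses in the order they arrive is asyncio.as_completed(), which yields awaitables as they finish. The sketch below is only an illustration under stated assumptions: `fake_request` is a hypothetical stand-in that sleeps instead of calling client.get(), so it runs without a server.

```python
import asyncio

async def fake_request(name, delay):
    """Hypothetical stand-in for client.get(); sleeps instead of doing network I/O."""
    await asyncio.sleep(delay)
    return name

async def main():
    jobs = [
        fake_request("slow", 0.3),
        fake_request("fast", 0.1),
        fake_request("medium", 0.2),
    ]
    order = []
    # as_completed() hands back results in completion order, not submission order.
    for finished in asyncio.as_completed(jobs):
        name = await finished
        print(name)
        order.append(name)
    return order

completion_order = asyncio.run(main())
# prints: fast, medium, slow (one per line)
```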
    

Async/await and Blocking I/O-bound or CPU-bound Operations

If you are required to define a FastAPI endpoint (or a StreamingResponse’s generator, or a background task function) with async def (as you might need to await some coroutines inside it), but also have some synchronous blocking I/O-bound or CPU-bound operation (computationally intensive task) that would block the event loop (essentially, the entire server) and wouldn’t let other requests go through, for example:

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = await file.read()
        res = cpu_bound_task(contents)  # this would block the event loop
    finally:
        await file.close()
    print("bye")
    return "pong"

then:

  1. You should check whether you could change your endpoint’s definition to normal def instead of async def. For example, if the only method in your endpoint that has to be awaited is the one reading the file contents (as you mentioned in the comments section below), you could instead declare the type of the endpoint’s parameter as bytes (i.e., file: bytes = File()) and thus, FastAPI would read the file for you and you would receive the contents as bytes. Hence, there would be no need to use await file.read(). Please note that the above approach should work for small files, as the entire file contents would be stored in memory (see the documentation on File Parameters); hence, if your system does not have enough RAM available to accommodate the accumulated data (if, for example, you have 8GB of RAM, you can’t load a 50GB file), your application may end up crashing. Alternatively, you could call the .read() method of the SpooledTemporaryFile directly (which can be accessed through the .file attribute of the UploadFile object), so that again you don’t have to await the .read() method; and since you can now declare your endpoint with normal def, each request will run in a separate thread (an example is given below). For more details on how to upload a File, as well as how Starlette/FastAPI uses SpooledTemporaryFile behind the scenes, please have a look at this answer and this answer.

    @app.post("/ping")
    def ping(file: UploadFile = File(...)):
        print("Hello")
        try:
            contents = file.file.read()
            res = cpu_bound_task(contents)
        finally:
            file.file.close()
        print("bye")
        return "pong"
    
  2. Use FastAPI’s (Starlette’s) run_in_threadpool() function from the concurrency module—as @tiangolo suggested here—which “will run the function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked” (see here). As described by @tiangolo here, “run_in_threadpool is an awaitable function; the first parameter is a normal function, the following parameters are passed to that function directly. It supports both sequence arguments and keyword arguments”.

    from fastapi.concurrency import run_in_threadpool
    
    res = await run_in_threadpool(cpu_bound_task, contents)
    
  3. Alternatively, use asyncio’s loop.run_in_executor(), after obtaining the running event loop using asyncio.get_running_loop(), to run the task, which, in this case, you can await to complete and return the result(s) before moving on to the next line of code. When None is passed as the executor argument, the default executor is used, which is a ThreadPoolExecutor:

    import asyncio
    
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, cpu_bound_task, contents)
    

    or, if you would like to pass keyword arguments instead, you could use a lambda expression (e.g., lambda: cpu_bound_task(some_arg=contents)), or, preferably, functools.partial(), which is specifically recommended in the documentation for loop.run_in_executor():

    import asyncio
    from functools import partial
    
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
    

    In Python 3.9+, you could also use asyncio.to_thread() to asynchronously run a synchronous function in a separate thread, which, essentially, uses await loop.run_in_executor(None, func_call) under the hood, as can be seen in the implementation of asyncio.to_thread(). The to_thread() function takes a blocking function to execute, as well as any arguments (*args and/or **kwargs) to the function, and then returns a coroutine that can be awaited. Example:

    import asyncio
    
    res = await asyncio.to_thread(cpu_bound_task, contents)
    

    Note that as explained in this answer, passing None to the executor argument does not create a new ThreadPoolExecutor every time you call await loop.run_in_executor(None, ...), but instead re-uses the default executor with the default number of worker threads (i.e., min(32, os.cpu_count() + 4)). Thus, depending on the requirements of your application, that number might be quite low. In that case, you should rather use a custom ThreadPoolExecutor. For instance:

    import asyncio
    import concurrent.futures
    
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, cpu_bound_task, contents)
    

    I would strongly recommend having a look at the linked answer above to learn about the difference between using run_in_threadpool() and run_in_executor(), as well as how to create a re-usable custom ThreadPoolExecutor at the application startup, and adjust the number of maximum worker threads as needed.

  4. ThreadPoolExecutor will successfully prevent the event loop from being blocked, but won’t give you the performance improvement you would expect from running code in parallel; especially, when one needs to perform CPU-bound tasks, such as the ones described here (e.g., audio or image processing, machine learning, and so on). It is thus preferable to run CPU-bound tasks in a separate process—using ProcessPoolExecutor, as shown below—which, again, you can integrate with asyncio, in order to await it to finish its work and return the result(s). As described here, it is important to protect the entry point of the program to avoid recursive spawning of subprocesses, etc. Basically, your code must be under if __name__ == '__main__'.

    import asyncio
    import concurrent.futures
    
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, cpu_bound_task, contents)
    

    Again, I’d suggest having a look at the linked answer earlier on how to create a re-usable ProcessPoolExecutor at the application startup. You might find this answer helpful as well.

  5. Use more workers to take advantage of multi-core CPUs, in order to run multiple processes in parallel and be able to serve more requests. For example, uvicorn main:app --workers 4 (if you are using Gunicorn as a process manager with Uvicorn workers, please have a look at this answer). When using 1 worker, only one process is run. When using multiple workers, this will spawn multiple processes (all single-threaded). Each process has a separate Global Interpreter Lock (GIL), as well as its own event loop, which runs in the main thread of each process and executes all tasks in its thread. That means there is only one thread that can take a lock on the interpreter of each process; unless, of course, you employ additional threads, either outside or inside the event loop, e.g., when using a ThreadPoolExecutor with loop.run_in_executor, or defining endpoints/background tasks/StreamingResponse’s generators with normal def instead of async def, as well as when calling UploadFile’s methods (see the first two paragraphs of this answer for more details).

    Note: Each worker “has its own things, variables and memory”. This means that global variables/objects, etc., won’t be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (Caches), as described here and here. Additionally, note that “if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory”.

  6. If you need to perform heavy background computation and you don’t necessarily need it to be run by the same process (for example, you don’t need to share memory, variables, etc), you might benefit from using other bigger tools like Celery, as described in FastAPI’s documentation.
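
Returning to options 3 and 4 above, one way to avoid creating a new executor on every call is to create it once and share it. The sketch below is a stdlib-only illustration and not FastAPI-specific: `blocking_task`, `handler`, and the "app-pool" prefix are hypothetical, and in a real application the executor would be created at startup and shut down at shutdown instead of inside main(). The same pattern should carry over to a ProcessPoolExecutor for CPU-bound work, provided the entry point is protected with if __name__ == '__main__' as noted in option 4.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

def blocking_task(n):
    """Hypothetical blocking function; also reports which thread ran it."""
    return sum(range(n)), threading.current_thread().name

async def handler(pool, n):
    # Every call reuses the shared pool instead of building a new one.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, blocking_task, n)

async def main():
    # Created once, with an explicit worker limit chosen for the workload.
    pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="app-pool")
    try:
        return await asyncio.gather(*(handler(pool, n) for n in (10, 100, 1000)))
    finally:
        pool.shutdown(wait=True)  # released once, when the "application" stops

results = asyncio.run(main())
for total, thread_name in results:
    print(total, thread_name)
```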
