I had a similar problem debugging the useage of aiofile. I then found a solution using nest_asyncio. For example if one has the following async example script:
import asyncio
from aiofile import async_open
import nest_asyncio
async def main():
async with async_open("/tmp/hello.txt", 'w+') as afp:
await afp.write("Hello ")
await afp.write("world")
afp.seek(0)
breakpoint()
print(await afp.read())
if __name__=="__main__":
loop = asyncio.get_event_loop()
nest_asyncio.apply(loop)
loop.run_until_complete(main())
One can then do:
-> print(await afp.read())
(Pdb) loop = asyncio.get_event_loop()
(Pdb) loop.run_until_complete(afp.read())
'Hello world'
(Pdb)
Admittedly it is a bit more tedious then await asyncpg.fetch("select * from foo;")
or await afp.read()
but it gets the job done. Hopefully a more elegant solution will come up in the future.