Feature or enhancement
Proposal:
It seems like there are a lot of layers of overhead involved in asyncio.to_thread: obtaining a concurrent.futures.Future by submitting to the ThreadPoolExecutor, creating a corresponding asyncio.Future, and then synchronizing the two via callbacks, including translating exceptions from concurrent-speak to asyncio-speak, etc. This process also involves a lot of locking inside the concurrent.futures.Future, which is unnecessary in this case.
Why not simply have the ThreadPoolExecutor update the asyncio.Future directly? That would eliminate nearly all of these layers of pointless overhead by skipping the concurrent.futures.Future altogether.
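For comparison, the existing two-future path can be approximated with public APIs alone. This is a simplified sketch, not the CPython source: asyncio.to_thread copies the context and wraps the call in functools.partial, and loop.run_in_executor then submits it to the executor and chains the resulting concurrent.futures.Future to a new asyncio.Future with asyncio.wrap_future. The name to_thread_via_wrap_future is hypothetical, and an explicit executor is passed in instead of the loop's default one.

```python
import asyncio
import concurrent.futures
import contextvars
import functools


async def to_thread_via_wrap_future(executor, func, /, *args, **kwargs):
    """Hypothetical helper approximating the current asyncio.to_thread path."""
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    # asyncio.to_thread builds this partial because run_in_executor
    # does not accept keyword arguments.
    call = functools.partial(ctx.run, func, *args, **kwargs)
    # Layer 1: submit() creates a concurrent.futures.Future with its own lock.
    cf = executor.submit(call)
    # Layer 2: wrap_future() creates an asyncio.Future and chains the two
    # with done-callbacks that copy the outcome back to the loop thread.
    af = asyncio.wrap_future(cf, loop=loop)
    return await af


async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        total = await to_thread_via_wrap_future(pool, sum, [1, 2, 3], start=10)
        print(total)  # 16
        return total


if __name__ == "__main__":
    asyncio.run(main())
```

Every call therefore allocates two futures and crosses threads through callbacks on both of them.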
Here's a small proof of concept (including a couple of hacks that could be smoothed over) to demonstrate how easily this could be achieved. As a bonus, it even lets you drop the functools.partial, since ThreadPoolExecutor.submit accepts keyword arguments whereas loop.run_in_executor does not:
```python
from asyncio import get_running_loop, AbstractEventLoop, Future
from asyncio.futures import _set_result_unless_cancelled
from concurrent.futures import thread
from typing import Callable


async def to_thread_new_and_improved[**P, T](fn: Callable[P, T], /, *args: P.args, **kwargs: P.kwargs) -> T:
    loop = get_running_loop()
    executor = loop._default_executor
    if executor is None:
        # one-time hack to get the loop's default executor created
        loop.run_in_executor(None, int)
        executor = loop._default_executor
    # Now copy-and-paste (mostly) from ThreadPoolExecutor.submit:
    with executor._shutdown_lock, thread._global_shutdown_lock:
        if executor._broken:
            raise thread.BrokenThreadPool(executor._broken)
        if executor._shutdown or thread._shutdown:
            raise RuntimeError("cannot schedule new futures after shutdown")
        # The key change:
        f = loop.create_future()  # instead of f = concurrent.futures.Future()
        executor._work_queue.put(thread._WorkItem(FakeFuture(loop, f), fn, args, kwargs))
        executor._adjust_thread_count()
    del fn, args, kwargs, loop, executor  # don't know if all this is necessary but never hurts
    return await f


# _set_exception_unless_cancelled is not in asyncio.futures;
# written by analogy with _set_result_unless_cancelled:
def _set_exception_unless_cancelled(fut: Future, exception: BaseException) -> None:
    if not fut.cancelled():
        fut.set_exception(exception)


# where "FakeFuture" is defined as follows
# (simply delegating to an asyncio.Future while "pretending" to be a concurrent.futures.Future):
class FakeFuture[T]:
    def __init__(self, loop: AbstractEventLoop, fut: Future[T]):
        self._loop = loop
        self._fut = fut

    def set_running_or_notify_cancel(self) -> bool:
        # The worker skips the call if the loop is gone or the awaiter cancelled.
        return not (self._loop.is_closed() or self._fut.cancelled())

    def set_result(self, result: T) -> None:
        self._callback(_set_result_unless_cancelled, self._fut, result)

    def set_exception(self, exception: BaseException) -> None:
        self._callback(_set_exception_unless_cancelled, self._fut, exception)

    def cancel(self) -> None:
        self._callback(self._fut.cancel)

    def _callback(self, f, *args) -> None:
        # Hand the state change to the loop's thread.
        try:
            self._loop.call_soon_threadsafe(f, *args)
        except RuntimeError:
            # The loop is already closed; there is nobody left to notify.
            pass
```
Note: I did not handle contextvars here, but that would be a straightforward tweak. Also, _set_exception_unless_cancelled does not exist in asyncio.futures, but it is easy to write by analogy with _set_result_unless_cancelled.
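Both the contextvars tweak and the exception helper can be seen in a self-contained sketch that uses only public APIs. Instead of reusing ThreadPoolExecutor's private _WorkItem and _work_queue (CPython implementation details whose signatures may differ between versions), it assumes a hypothetical minimal pool, DirectThreadPool, whose workers settle the asyncio.Future through loop.call_soon_threadsafe with no concurrent.futures.Future in between. DirectThreadPool, run and shutdown are illustrative names only; this is a fixed-size pool without the real executor's idle-thread tracking, initializer, or shutdown locking.

```python
import asyncio
import contextvars
import queue
import threading


def _set_result_unless_cancelled(fut, result):
    if not fut.cancelled():
        fut.set_result(result)


def _set_exception_unless_cancelled(fut, exc):
    if not fut.cancelled():
        fut.set_exception(exc)


class DirectThreadPool:
    """Hypothetical fixed-size pool that settles asyncio.Futures directly."""

    def __init__(self, workers: int = 2) -> None:
        self._queue: queue.SimpleQueue = queue.SimpleQueue()
        self._threads = [
            threading.Thread(target=self._worker, daemon=True) for _ in range(workers)
        ]
        for t in self._threads:
            t.start()

    def _worker(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:  # shutdown sentinel
                return
            loop, fut, ctx, fn, args, kwargs = item
            # Same role as set_running_or_notify_cancel(): skip work nobody awaits.
            if loop.is_closed() or fut.cancelled():
                continue
            try:
                # Run in the caller's copied context so contextvars are visible.
                result = ctx.run(fn, *args, **kwargs)
            except BaseException as exc:
                setter, value = _set_exception_unless_cancelled, exc
            else:
                setter, value = _set_result_unless_cancelled, result
            try:
                # The only cross-thread hop: schedule the state change on the loop.
                loop.call_soon_threadsafe(setter, fut, value)
            except RuntimeError:
                pass  # the loop was closed in the meantime

    async def run(self, fn, /, *args, **kwargs):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self._queue.put((loop, fut, contextvars.copy_context(), fn, args, kwargs))
        return await fut

    def shutdown(self) -> None:
        for _ in self._threads:
            self._queue.put(None)
        for t in self._threads:
            t.join()


request_id = contextvars.ContextVar("request_id", default=None)


async def demo():
    pool = DirectThreadPool(workers=2)
    try:
        request_id.set("abc")
        seen = await pool.run(request_id.get)  # context is propagated
        total = await pool.run(sum, [1, 2, 3], start=10)  # kwargs, no partial
        try:
            await pool.run(int, "not a number")
        except ValueError:
            failed = True
        else:
            failed = False
        return seen, total, failed
    finally:
        pool.shutdown()
```

Running asyncio.run(demo()) should give ("abc", 16, True): the context variable set in the calling task is visible in the worker, keyword arguments pass straight through, and exceptions arrive on the awaiting side unchanged.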
Has this already been discussed elsewhere?
This is a minor feature, which does not need previous discussion elsewhere
Links to previous discussion of this feature:
No response