asyncio.to_thread could be a lot more efficient #136084

Open
@HansBrende

Description

Feature or enhancement

Proposal:

It seems like there are a lot of layers of overhead involved in asyncio.to_thread: submitting to the ThreadPoolExecutor to get a concurrent.futures.Future, creating a corresponding asyncio.Future, and then synchronizing the two via callbacks, including translating exceptions from concurrent.futures conventions to asyncio conventions. On top of that, the concurrent.futures.Future does a lot of internal locking, which is unnecessary in this case.
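For reference, here is a rough sketch of those layers written out by hand. It is not the stdlib source: asyncio.to_thread uses the loop's default executor via loop.run_in_executor(), whereas this sketch takes the executor as an explicit argument, and the name to_thread_spelled_out is made up for illustration. The calls it uses (contextvars.copy_context, ThreadPoolExecutor.submit, asyncio.wrap_future) are the public counterparts of what run_in_executor does internally.

```python
import asyncio
import contextvars
import functools
from concurrent.futures import ThreadPoolExecutor


async def to_thread_spelled_out(executor, func, /, *args, **kwargs):
    """Approximately what asyncio.to_thread does, one layer at a time.

    Hypothetical helper: the executor is passed in explicitly instead of
    using the event loop's default executor.
    """
    loop = asyncio.get_running_loop()
    # Capture the caller's contextvars so func sees them in the worker thread.
    ctx = contextvars.copy_context()
    call = functools.partial(ctx.run, func, *args, **kwargs)
    # Layer 1: submit() creates a concurrent.futures.Future whose state
    # changes are guarded by an internal threading.Condition (a lock).
    cf_future = executor.submit(call)
    # Layer 2: wrap_future() creates a separate asyncio.Future and chains the
    # two with done-callbacks; the outcome is copied back to the loop thread
    # via loop.call_soon_threadsafe(), and cancelling the asyncio.Future
    # cancels the concurrent one.
    aio_future = asyncio.wrap_future(cf_future, loop=loop)
    return await aio_future


async def main():
    with ThreadPoolExecutor(max_workers=2) as pool:
        return await to_thread_spelled_out(pool, sum, [1, 2, 3], start=10)


print(asyncio.run(main()))  # 16
```

Every call therefore allocates two futures and two sets of callbacks, which is the overhead the proposal aims to remove.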

Why not simply have the ThreadPoolExecutor update the asyncio.Future directly? Skipping the concurrent.futures.Future altogether would eliminate nearly all of these layers of overhead.
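The core idea can be illustrated with public APIs only. The sketch below is a simplification, not the proposal itself: it starts one plain threading.Thread per call instead of using a pool, ignores contextvars, and the names run_in_thread_direct and _settle_unless_cancelled are hypothetical. The worker completes a single asyncio.Future by scheduling the state change on the loop thread with loop.call_soon_threadsafe(), since asyncio.Future is not thread-safe.

```python
import asyncio
import threading


def _settle_unless_cancelled(fut, result, exc):
    # Runs on the event loop thread; the awaiting task may have been cancelled.
    if fut.cancelled():
        return
    if exc is not None:
        fut.set_exception(exc)
    else:
        fut.set_result(result)


async def run_in_thread_direct(fn, /, *args, **kwargs):
    """Run fn in a new thread and complete an asyncio.Future directly.

    Hypothetical sketch: one thread per call (no pool), no contextvars.
    """
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # the only future involved

    def worker():
        if fut.cancelled():  # best-effort early exit if already cancelled
            return
        try:
            result = fn(*args, **kwargs)
        except BaseException as exc:
            outcome = (None, exc)
        else:
            outcome = (result, None)
        try:
            # Hop to the loop thread before touching the asyncio.Future.
            loop.call_soon_threadsafe(_settle_unless_cancelled, fut, *outcome)
        except RuntimeError:
            pass  # the loop was closed; nobody is left to notify

    threading.Thread(target=worker, daemon=True).start()
    return await fut


async def main():
    ok = await run_in_thread_direct(pow, 2, 10)
    try:
        await run_in_thread_direct(int, "not a number")
    except ValueError as e:
        err = type(e).__name__
    return ok, err


print(asyncio.run(main()))  # (1024, 'ValueError')
```

Exceptions raised by fn reach the awaiting coroutine unchanged, with no translation step between two future types.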

Here's a small proof of concept (including a couple of hacks that could be smoothed over) demonstrating how easily this could be achieved. As a bonus, it even lets you drop the functools.partial, since ThreadPoolExecutor.submit accepts keyword arguments:

from asyncio import get_running_loop, AbstractEventLoop, Future
from asyncio.futures import _set_result_unless_cancelled
from concurrent.futures import thread
from typing import Callable

async def to_thread_new_and_improved[**P, T](fn: Callable[P, T], /, *args: P.args, **kwargs: P.kwargs) -> T:
    loop = get_running_loop()
    executor = loop._default_executor
    if executor is None:
        # One-time hack to make the loop create its default executor;
        # the returned future is discarded and int() just runs once.
        loop.run_in_executor(None, int)
        executor = loop._default_executor

    # Now copy-and-paste (mostly) from ThreadPoolExecutor.submit.
    # Relies on private attributes; names as of CPython 3.12/3.13 and may differ elsewhere.
    with executor._shutdown_lock, thread._global_shutdown_lock:
        if executor._broken:
            raise thread.BrokenThreadPool(executor._broken)
        if executor._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if thread._shutdown:
            raise RuntimeError('cannot schedule new futures after interpreter shutdown')

        # The key change:
        f = loop.create_future()    # instead of f = concurrent.futures.Future()

        executor._work_queue.put(thread._WorkItem(FakeFuture(loop, f), fn, args, kwargs))
        executor._adjust_thread_count()

    del fn, args, kwargs, loop, executor  # not sure all of this is necessary, but it doesn't hurt
    return await f


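# where "FakeFuture" is defined as follows
# (simply delegating to an asyncio.Future while "pretending" to be a concurrent.futures.Future):
def _set_exception_unless_cancelled(fut: Future, exc: BaseException) -> None:
    # Counterpart of asyncio.futures._set_result_unless_cancelled (not in the stdlib).
    if fut.cancelled():
        return
    fut.set_exception(exc)


class FakeFuture[T]:
    def __init__(self, loop: AbstractEventLoop, fut: Future[T]):
        self._loop = loop
        self._fut = fut

    def set_running_or_notify_cancel(self) -> bool:
        # Called by _WorkItem.run() in the worker thread; returning False skips fn.
        return not (self._loop.is_closed() or self._fut.cancelled())

    def set_result(self, result: T) -> None:
        self._callback(_set_result_unless_cancelled, self._fut, result)

    def set_exception(self, exception: BaseException) -> None:
        self._callback(_set_exception_unless_cancelled, self._fut, exception)

    def cancel(self) -> None:
        # Used by executor.shutdown(cancel_futures=True) for still-queued items.
        self._callback(self._fut.cancel)

    def _callback(self, f, *args) -> None:
        # asyncio.Future is not thread-safe, so apply the change on the loop thread.
        try:
            self._loop.call_soon_threadsafe(f, *args)
        except RuntimeError:
            pass  # the loop was closed; nobody is left to notify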

Note: I did not handle contextvars here, but that would be a straightforward tweak. Also, asyncio.futures has no _set_exception_unless_cancelled; it is easy to write by analogy with _set_result_unless_cancelled.
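One way the contextvars tweak might look, still without functools.partial: copy the caller's context and submit ctx.run as the callable, with (fn, *args) as its positional arguments, since Context.run forwards both positional and keyword arguments. In the proof of concept that would presumably become thread._WorkItem(FakeFuture(loop, f), ctx.run, (fn, *args), kwargs), but that line is an untested assumption. The sketch below demonstrates the binding with a public ThreadPoolExecutor; bind_current_context, describe, and the request_id variable are hypothetical names for illustration.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical context variable, only for this demonstration.
request_id = contextvars.ContextVar("request_id", default=None)


def bind_current_context(fn, args, kwargs):
    """Return (callable, args, kwargs) that run fn inside a copy of the
    caller's current context, without functools.partial."""
    ctx = contextvars.copy_context()
    # Context.run(callable, *args, **kwargs) forwards keyword arguments too.
    return ctx.run, (fn, *args), kwargs


def describe(prefix, *, suffix=""):
    # Reads the context variable from whichever context it runs in.
    return f"{prefix}{request_id.get()}{suffix}"


request_id.set("abc-123")
call_fn, call_args, call_kwargs = bind_current_context(describe, ("id=",), {"suffix": "!"})
with ThreadPoolExecutor(max_workers=1) as pool:
    result = pool.submit(call_fn, *call_args, **call_kwargs).result()
print(result)  # id=abc-123!
```

A fresh copy is taken per call, which matters because Context.run raises RuntimeError if the same context object is already entered.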

Has this already been discussed elsewhere?

This is a minor feature, which does not need previous discussion elsewhere

Links to previous discussion of this feature:

No response

Metadata

Assignees: No one assigned

Labels: performance (Performance or resource usage), stdlib (Python modules in the Lib dir), topic-asyncio, type-feature (A feature request or enhancement)

Projects: Status: Todo

Milestone: No milestone

Relationships: None yet

Development: No branches or pull requests
