
Optimize asyncio.to_thread to avoid contextvars.copy_context() overhead for empty contexts #136157

Open
@heliang666s

Description

Feature or enhancement

Proposal:

asyncio.to_thread is a critical utility for running blocking operations without blocking the event loop. The current implementation always wraps the target function with functools.partial(contextvars.copy_context().run, ...). This ensures context variables are propagated, but it incurs a measurable performance penalty even when the context is empty.

In high-frequency use cases, such as web servers or data processing pipelines, the context is often empty. The overhead from copy_context() and, more substantially, from the ctx.run() wrapper, becomes a bottleneck. The cost is not just in the context management itself but also in the extra function call layer (partial) that is passed to the executor.
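The extra call layers can be observed in isolation. The sketch below (my own illustration, not part of the issue) times a no-op function called directly versus through the `functools.partial(ctx.run, ...)` wrapper that `to_thread` builds today:

```python
import contextvars
import functools
import timeit

def noop():
    pass

ctx = contextvars.copy_context()
wrapped = functools.partial(ctx.run, noop)  # the wrapper to_thread builds today

n = 100_000
direct = timeit.timeit(noop, number=n)
via_wrapper = timeit.timeit(wrapped, number=n)

print(f"direct call:       {direct:.4f}s")
print(f"partial + ctx.run: {via_wrapper:.4f}s")
```

Absolute numbers vary by machine; the point is the per-call delta between the two timings, which is pure wrapper overhead.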

This proposal suggests a direct optimization: check whether the copied context is empty. If it is, call loop.run_in_executor with the original function directly (note that run_in_executor only forwards positional arguments, so keyword arguments still require a functools.partial wrapper). If the context is not empty, use ctx.run as the callable for the executor. This bypasses both the context-running and the functools.partial overhead in the common case of an empty context.

This change is fully backward-compatible and offers a measurable performance improvement by streamlining the execution path for the most frequent use case.
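The emptiness check relies on `contextvars.Context` supporting `len()` and truth testing. A minimal demonstration (the `request_id` variable here is hypothetical, chosen for illustration):

```python
import contextvars

# In a fresh interpreter nothing has been set, so the copied context is
# typically empty and the fast path would apply.
ctx = contextvars.copy_context()
print(len(ctx))

# Setting any ContextVar makes the next copy non-empty, so the ctx.run
# path would be taken instead.
request_id = contextvars.ContextVar("request_id")  # hypothetical variable
request_id.set("abc123")
ctx2 = contextvars.copy_context()
print(not ctx2)          # False: ctx2 now contains request_id
print(ctx2[request_id])  # abc123
```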

Here is the proposed implementation for Lib/asyncio/threads.py:

# Lib/asyncio/threads.py
import contextvars
import functools

from . import events

async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propagated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()

    # loop.run_in_executor() only forwards positional arguments, so keyword
    # arguments still require a functools.partial wrapper.
    if kwargs:
        func = functools.partial(func, **kwargs)

    # Optimization: if the context is empty, avoid the overhead of
    # `ctx.run()` and pass the function and its positional arguments
    # directly to the executor.
    if not ctx:
        return await loop.run_in_executor(None, func, *args)

    # Otherwise, use `ctx.run` as the callable for the executor to
    # ensure context propagation.
    return await loop.run_in_executor(None, ctx.run, func, *args)
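A quick sanity check (a self-contained sketch, not taken from the issue; the `request_id` variable and helper names are mine) confirms that the non-empty path still propagates context variables into the executor thread:

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id")  # hypothetical variable

async def optimized_to_thread(func, /, *args):
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    if not ctx:
        # Empty context: hand the function straight to the executor.
        return await loop.run_in_executor(None, func, *args)
    # Non-empty context: ctx.run propagates the variables into the thread.
    return await loop.run_in_executor(None, ctx.run, func, *args)

def read_var():
    # Runs in the executor thread; must still see the task's context.
    return request_id.get("unset")

async def main():
    request_id.set("req-42")
    return await optimized_to_thread(read_var)

result = asyncio.run(main())
print(result)  # req-42: the non-empty path preserved the variable
```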

Benchmark

To validate the performance gain, a benchmark was conducted that isolates the to_thread call with a no-op function, giving an upper bound on the achievable improvement. The test was run with 500,000 operations.

Benchmark Results:
The results show a clear and consistent performance gain.

  • Original Implementation Best Time: 21.0525s (23,750 ops/sec)
  • Optimized Implementation Best Time: 20.0661s (24,918 ops/sec)
  • Theoretical Max Performance Improvement: 4.69%

This benchmark demonstrates that the optimization provides a tangible speedup by reducing overhead in the most common execution path.

Benchmark Code:

import asyncio
import time
import contextvars
import functools

# Original `to_thread` implementation for comparison
async def original_to_thread(func, /, *args, **kwargs):
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)

# Optimized `to_thread` implementation
async def optimized_to_thread(func, /, *args, **kwargs):
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    # run_in_executor() only forwards positional arguments, so keyword
    # arguments still need functools.partial.
    if kwargs:
        func = functools.partial(func, **kwargs)
    if not ctx:
        return await loop.run_in_executor(None, func, *args)
    return await loop.run_in_executor(
        None, ctx.run, func, *args)

def blocking_noop():
    """A blocking function that does nothing, for measuring overhead."""
    pass

async def run_benchmark(to_thread_func, n_calls):
    """Runs a benchmark for a given to_thread implementation."""
    tasks = [to_thread_func(blocking_noop) for _ in range(n_calls)]
    start_time = time.monotonic()
    await asyncio.gather(*tasks)
    end_time = time.monotonic()
    return end_time - start_time

async def main():
    n_calls = 500_000
    print(f"--- Running benchmark for {n_calls} calls ---")

    # Benchmark original implementation
    original_duration = await run_benchmark(original_to_thread, n_calls)
    print(f"Original to_thread took: {original_duration:.4f} seconds")

    # Benchmark optimized implementation
    optimized_duration = await run_benchmark(optimized_to_thread, n_calls)
    print(f"Optimized to_thread took: {optimized_duration:.4f} seconds")

    # Calculate and display performance improvement
    improvement = (original_duration - optimized_duration) / original_duration * 100
    if improvement > 0:
        print(f"\nPerformance improvement: {improvement:.2f}%")
        print(f"Speedup: {original_duration / optimized_duration:.2f}x")
    else:
        print("\nNo significant performance improvement observed.")

if __name__ == "__main__":
    asyncio.run(main())

Has this already been discussed elsewhere?

This is a minor optimization that does not require prior discussion elsewhere.

Links to previous discussion of this feature:

No response
