Skip to content

Process Testing

provide.testkit.process

Process and async testing fixtures for the provide-io ecosystem.

Standard fixtures for testing async code, subprocess operations, and event loop management across any project that depends on provide.foundation.

Functions

async_condition_waiter

async_condition_waiter() -> (
    Callable[
        [Callable[[], bool], float, float], Awaitable[bool]
    ]
)

Helper for waiting on async conditions in tests.

Returns:

Type Description
Callable[[Callable[[], bool], float, float], Awaitable[bool]]

Function to wait for conditions with timeout.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_condition_waiter() -> Callable[[Callable[[], bool], float, float], Awaitable[bool]]:
    """
    Helper for waiting on async conditions in tests.

    Returns:
        Coroutine function ``(condition, timeout=5.0, interval=0.1)`` that
        polls ``condition`` and resolves to True once it holds, or False if
        the timeout elapses first.
    """

    async def _wait_for(condition: Callable[[], bool], timeout: float = 5.0, interval: float = 0.1) -> bool:
        """
        Wait for a condition to become true.

        The condition is always evaluated at least once, and it is evaluated
        again after the final sleep, so a condition that turns true right at
        the deadline is still reported as met.

        Args:
            condition: Function that returns True when condition is met
            timeout: Maximum wait time in seconds
            interval: Delay in seconds between checks

        Returns:
            True if condition met, False if timeout
        """
        # get_running_loop() is the preferred way to reach the loop from
        # inside a coroutine; its clock is the same one asyncio.sleep() uses.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout

        while True:
            if condition():
                return True
            # Check the deadline only after the condition so that a zero
            # timeout still performs one evaluation.
            if loop.time() >= deadline:
                return False
            await asyncio.sleep(interval)

    return _wait_for

async_context_manager async

async_context_manager() -> (
    Callable[[Any | None, Any | None], AsyncMock]
)

Factory for creating mock async context managers.

Returns:

Type Description
Callable[[Any | None, Any | None], AsyncMock]

Function that creates configured async context manager mocks.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
async def async_context_manager() -> Callable[[Any | None, Any | None], AsyncMock]:
    """
    Provide a factory that builds mock async context managers.

    Returns:
        Callable producing an AsyncMock whose ``__aenter__`` and ``__aexit__``
        resolve to caller-chosen values.
    """

    def _make_context_manager(enter_value: Any | None = None, exit_value: Any | None = None) -> AsyncMock:
        """
        Build one mock async context manager.

        Args:
            enter_value: Result produced by awaiting ``__aenter__``
            exit_value: Result produced by awaiting ``__aexit__``

        Returns:
            AsyncMock with both async context manager hooks configured
        """
        on_enter = AsyncMock(return_value=enter_value)
        on_exit = AsyncMock(return_value=exit_value)

        context = AsyncMock()
        context.__aenter__ = on_enter
        context.__aexit__ = on_exit
        return context

    return _make_context_manager

async_gather_helper

async_gather_helper() -> (
    Callable[..., Awaitable[list[Any]]]
)

Helper for testing asyncio.gather operations.

Returns:

Type Description
Callable[..., Awaitable[list[Any]]]

Function to gather async results with error handling.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_gather_helper() -> Callable[..., Awaitable[list[Any]]]:
    """
    Provide a thin wrapper around ``asyncio.gather`` for tests.

    Returns:
        Coroutine function that awaits several awaitables together and
        returns their results.
    """

    async def _gather(*coroutines: Awaitable[Any], return_exceptions: bool = False) -> list[Any]:
        """
        Await all given awaitables via ``asyncio.gather``.

        Args:
            *coroutines: Awaitables to run together
            return_exceptions: Passed through to ``asyncio.gather``; when True,
                exceptions are returned as results instead of being raised

        Returns:
            Results as returned by ``asyncio.gather``
        """
        results = await asyncio.gather(*coroutines, return_exceptions=return_exceptions)
        return results

    return _gather

async_iterator async

async_iterator() -> (
    Callable[[Sequence[T]], AsyncIterable[T]]
)

Factory for creating mock async iterators.

Returns:

Type Description
Callable[[Sequence[T]], AsyncIterable[T]]

Function that creates async iterator mocks with specified values.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
async def async_iterator() -> Callable[[Sequence[T]], AsyncIterable[T]]:
    """
    Provide a factory that wraps a sequence in an async iterable.

    Returns:
        Callable that turns a sequence of values into an async iterable.
    """

    def _make_async_iter(values: Sequence[T]) -> AsyncIterable[T]:
        """
        Wrap ``values`` in an ``_AsyncIterator``.

        Args:
            values: Sequence of values handed to ``_AsyncIterator``

        Returns:
            The ``_AsyncIterator`` instance built from ``values``
        """
        wrapped = _AsyncIterator(values)
        return wrapped

    return _make_async_iter

async_lock async

async_lock() -> asyncio.Lock

Create an async lock for testing synchronization.

Returns:

Type Description
Lock

asyncio.Lock instance for testing.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
async def async_lock() -> asyncio.Lock:
    """
    Provide a fresh ``asyncio.Lock`` for synchronization tests.

    Returns:
        A newly constructed asyncio.Lock.
    """
    lock = asyncio.Lock()
    return lock

async_mock_server

async_mock_server() -> AsyncMockServer

Create a mock async server for testing.

Returns:

Type Description
AsyncMockServer

Mock server with async methods.

Source code in provide/testkit/process/subprocess_fixtures.py
@pytest.fixture
def async_mock_server() -> AsyncMockServer:
    """
    Provide a new ``AsyncMockServer`` instance for a test.

    Returns:
        A freshly constructed AsyncMockServer.
    """
    server = AsyncMockServer()
    return server

async_pipeline

async_pipeline() -> AsyncPipeline

Create an async pipeline for testing data flow.

Returns:

Type Description
AsyncPipeline

AsyncPipeline instance for chaining async operations.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_pipeline() -> AsyncPipeline:
    """
    Provide a new ``AsyncPipeline`` instance for data-flow tests.

    Returns:
        A freshly constructed AsyncPipeline.
    """
    pipeline = AsyncPipeline()
    return pipeline

async_queue

async_queue() -> asyncio.Queue[Any]

Create an async queue for testing producer/consumer patterns.

Returns:

Type Description
Queue[Any]

asyncio.Queue instance for testing.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_queue() -> asyncio.Queue[Any]:
    """
    Provide a fresh ``asyncio.Queue`` for producer/consumer tests.

    Returns:
        A newly constructed asyncio.Queue created with default arguments.
    """
    queue: asyncio.Queue[Any] = asyncio.Queue()
    return queue

async_rate_limiter

async_rate_limiter() -> AsyncRateLimiter

Create an async rate limiter for testing.

Returns:

Type Description
AsyncRateLimiter

AsyncRateLimiter instance for controlling request rates.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_rate_limiter() -> AsyncRateLimiter:
    """
    Provide a new ``AsyncRateLimiter`` instance for a test.

    Returns:
        A freshly constructed AsyncRateLimiter created with default arguments.
    """
    limiter = AsyncRateLimiter()
    return limiter

async_stream_reader async

async_stream_reader() -> AsyncMock

Mock async stream reader for subprocess stdout/stderr.

Returns:

Type Description
AsyncMock

AsyncMock configured as a stream reader.

Source code in provide/testkit/process/subprocess_fixtures.py
@pytest.fixture
async def async_stream_reader() -> AsyncMock:
    """
    Mock async stream reader for subprocess stdout/stderr.

    The mock is scripted as follows:
    - ``await readline()`` yields two newline-terminated lines and then an
      empty bytes value; further calls raise StopAsyncIteration.
    - ``await read()`` always yields ``b"full content"``.
    - ``at_eof()`` answers False, False, then True.

    Returns:
        AsyncMock configured as a stream reader.
    """
    reader = AsyncMock()

    # Script the lines with an iterable side_effect. A plain callable that is
    # not a coroutine function (such as an async generator's bound __anext__)
    # is not awaited by AsyncMock, so ``await reader.readline()`` would hand
    # back an un-awaited awaitable instead of bytes.
    reader.readline = AsyncMock(side_effect=[b"line1\n", b"line2\n", b""])
    reader.read = AsyncMock(return_value=b"full content")
    reader.at_eof = Mock(side_effect=[False, False, True])

    return reader

async_subprocess

async_subprocess() -> (
    Callable[[int, bytes, bytes, int], AsyncMock]
)

Create mock async subprocess for testing.

Returns:

Type Description
Callable[[int, bytes, bytes, int], AsyncMock]

Function that creates mock subprocess with configurable behavior.

Source code in provide/testkit/process/subprocess_fixtures.py
@pytest.fixture
def async_subprocess() -> Callable[[int, bytes, bytes, int], AsyncMock]:
    """
    Provide a factory for mock async subprocesses.

    Returns:
        Callable that builds an AsyncMock with the configured return code,
        output streams and process ID.
    """

    def _output_stream(payload: bytes) -> AsyncMock:
        """Build a mock output stream that serves ``payload`` once, then EOF."""
        stream = AsyncMock()
        stream.read = AsyncMock(return_value=payload)
        stream.readline = AsyncMock(side_effect=[payload, b""])
        stream.at_eof = Mock(side_effect=[False, True])
        return stream

    def _create_subprocess(
        returncode: int = 0,
        stdout: bytes = b"",
        stderr: bytes = b"",
        pid: int = 12345,
    ) -> AsyncMock:
        """
        Build one mock async subprocess.

        Args:
            returncode: Value for ``returncode`` and the result of ``wait()``
            stdout: Bytes served by the stdout stream and ``communicate()``
            stderr: Bytes served by the stderr stream and ``communicate()``
            pid: Value for the ``pid`` attribute

        Returns:
            AsyncMock configured as subprocess
        """
        proc = AsyncMock()

        # Plain attributes
        proc.pid = pid
        proc.returncode = returncode

        # Awaitable operations
        proc.wait = AsyncMock(return_value=returncode)
        proc.communicate = AsyncMock(return_value=(stdout, stderr))

        # Signalling methods are synchronous, hence plain Mock
        proc.terminate = Mock()
        proc.kill = Mock()
        proc.send_signal = Mock()

        # Output streams behave as async stream readers
        proc.stdout = _output_stream(stdout)
        proc.stderr = _output_stream(stderr)

        # Input stream: awaitable write/drain, synchronous close
        stdin = AsyncMock()
        stdin.write = AsyncMock()
        stdin.drain = AsyncMock()
        stdin.close = Mock()
        proc.stdin = stdin

        return proc

    return _create_subprocess

async_task_group

async_task_group() -> AsyncTaskGroup

Manage a group of async tasks with cleanup.

Returns:

Type Description
AsyncTaskGroup

AsyncTaskGroup instance for managing tasks.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_task_group() -> AsyncTaskGroup:
    """
    Provide a new ``AsyncTaskGroup`` instance for a test.

    Returns:
        A freshly constructed AsyncTaskGroup.
    """
    group = AsyncTaskGroup()
    return group

async_test_client

async_test_client() -> AsyncTestClient

Create an async HTTP test client.

Returns:

Type Description
AsyncTestClient

Mock async HTTP client for testing.

Source code in provide/testkit/process/subprocess_fixtures.py
@pytest.fixture
def async_test_client() -> AsyncTestClient:
    """
    Provide a new ``AsyncTestClient`` instance for a test.

    Returns:
        A freshly constructed AsyncTestClient.
    """
    client = AsyncTestClient()
    return client

async_timeout

async_timeout() -> (
    Callable[[Awaitable[T], float], Awaitable[T]]
)

Provide configurable timeout wrapper for async operations.

Returns:

Type Description
Callable[[Awaitable[T], float], Awaitable[T]]

A function that wraps async operations with a timeout.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_timeout() -> Callable[[Awaitable[T], float], Awaitable[T]]:
    """
    Provide a helper that applies a timeout to an awaitable.

    Returns:
        Callable that wraps an awaitable with ``asyncio.wait_for``.
    """

    def _timeout_wrapper(coro: Awaitable[T], seconds: float = 5.0) -> Awaitable[T]:
        """
        Apply a timeout to ``coro`` via ``asyncio.wait_for``.

        Args:
            coro: Awaitable to guard
            seconds: Timeout in seconds

        Returns:
            Awaitable that resolves to the result of ``coro`` or raises
            asyncio.TimeoutError when the timeout expires
        """
        guarded = asyncio.wait_for(coro, seconds)
        return guarded

    return _timeout_wrapper

clean_event_loop async

clean_event_loop() -> AsyncGenerator[None, None]

Ensure clean event loop for async tests.

Cancels all pending tasks after the test to prevent event loop issues.

Yields:

Type Description
AsyncGenerator[None, None]

None - fixture for test setup/teardown.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
async def clean_event_loop() -> AsyncGenerator[None, None]:
    """
    Ensure clean event loop for async tests.

    After the test, cancels every task still pending on the running loop,
    except the task executing this teardown, and waits for the cancellations
    to finish.

    Yields:
        None - fixture for test setup/teardown.
    """
    yield

    # The teardown may itself be running inside a task. Cancelling and then
    # gathering that task would make the teardown cancel (and wait on) itself,
    # so it is excluded explicitly.
    current = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks() if task is not current and not task.done()]

    for task in pending:
        task.cancel()

    # Wait for all tasks to complete cancellation; return_exceptions=True keeps
    # the resulting CancelledError instances from propagating out of teardown.
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)

disable_setproctitle

disable_setproctitle() -> Generator[None, None, None]

Disables setproctitle during tests to prevent pytest-xdist performance issues.

The setproctitle module causes severe performance degradation when running tests with pytest-xdist parallelization: immediate slowdown on test start, progressive performance degradation over time, and high CPU usage due to frequent system calls.

Mocks setproctitle as a no-op during regular test runs while preserving functionality for mutation testing tools like mutmut that use it for displaying progress information.

Autouse and session-scoped - applies automatically to all tests in a session.

Yields:

Name Type Description
None None

Context manager for test execution with setproctitle disabled

Note

Only disables setproctitle when NOT running under mutmut. Mutmut detection uses sys.argv to check for "mutmut" in arguments.

Source code in provide/testkit/process/system_fixtures.py
@pytest.fixture(scope="session", autouse=True)
def disable_setproctitle() -> Generator[None, None, None]:
    """Replace the setproctitle module with a no-op stub for the test session.

    The setproctitle module causes severe performance degradation when running
    tests with pytest-xdist parallelization:
    - Immediate slowdown on test start
    - Progressive performance degradation over time
    - High CPU usage due to frequent system calls

    A MagicMock stands in for the module in ``sys.modules`` while tests run;
    the previous entry (or its absence) is restored afterwards. Mutation
    testing tools like mutmut use setproctitle for displaying progress
    information, so the real module is left untouched for them.

    Autouse and session-scoped - applies automatically to all tests in a session.

    Yields:
        None: Control returns to the test session with the stub installed
        (or with nothing changed when running under mutmut)

    Note:
        mutmut is detected by looking for "mutmut" in any ``sys.argv`` entry.
    """
    under_mutmut = any("mutmut" in arg for arg in sys.argv)
    if under_mutmut:
        # mutmut needs the real setproctitle to show which mutations are
        # being tested, so leave everything as it is.
        yield
        return

    # Remember whatever is registered right now so it can be put back later
    previous_module = sys.modules.get("setproctitle")

    # Stub exposing the commonly used setproctitle functions as no-ops
    stub = MagicMock()
    stub.setproctitle = MagicMock(return_value=None)
    stub.getproctitle = MagicMock(return_value="python")
    stub.setthreadtitle = MagicMock(return_value=None)
    stub.getthreadtitle = MagicMock(return_value="")

    # Register the stub so later imports of setproctitle resolve to it
    sys.modules["setproctitle"] = stub

    try:
        yield
    finally:
        if previous_module is None:
            # Nothing was registered before: drop whatever is there now
            sys.modules.pop("setproctitle", None)
        else:
            sys.modules["setproctitle"] = previous_module

event_loop_policy

event_loop_policy() -> (
    Generator[asyncio.AbstractEventLoopPolicy, None, None]
)

Set event loop policy for tests to avoid conflicts.

Yields:

Type Description
AbstractEventLoopPolicy

New event loop policy for isolated testing.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def event_loop_policy() -> Generator[asyncio.AbstractEventLoopPolicy, None, None]:
    """
    Install a fresh default event loop policy for the duration of a test.

    Yields:
        The newly installed DefaultEventLoopPolicy. The policy that was
        active beforehand is reinstated during teardown.
    """
    previous_policy = asyncio.get_event_loop_policy()

    fresh_policy = asyncio.DefaultEventLoopPolicy()
    asyncio.set_event_loop_policy(fresh_policy)

    yield fresh_policy

    # Teardown: put the earlier policy back
    asyncio.set_event_loop_policy(previous_policy)

mock_async_process

mock_async_process() -> AsyncMock

Mock async subprocess for testing.

Returns:

Type Description
AsyncMock

AsyncMock configured as a subprocess with common attributes.

Source code in provide/testkit/process/subprocess_fixtures.py
@pytest.fixture
def mock_async_process() -> AsyncMock:
    """
    Provide a preconfigured mock of an async subprocess.

    The mock reports pid 12345 and return code 0, ``communicate()`` resolves
    to ``(b"output", b"")`` and ``wait()`` resolves to 0.

    Returns:
        AsyncMock configured as a subprocess with common attributes.
    """
    proc = AsyncMock()

    # Plain attributes
    proc.pid = 12345
    proc.returncode = 0

    # Standard streams
    proc.stdin = AsyncMock()
    proc.stdout = AsyncMock()
    proc.stderr = AsyncMock()

    # Awaitable operations
    proc.wait = AsyncMock(return_value=0)
    proc.communicate = AsyncMock(return_value=(b"output", b""))

    # Synchronous signalling methods
    proc.terminate = Mock()
    proc.kill = Mock()

    return proc