Skip to content

Async fixtures

provide.testkit.process.async_fixtures

Async-specific test fixtures for process testing.

Provides fixtures for testing async operations, event loops, and async context management across the provide-io ecosystem.

Classes

AsyncPipeline

AsyncPipeline()

Pipeline helper for chaining async processing stages.

Source code in provide/testkit/process/async_fixtures.py
def __init__(self) -> None:
    """Create an empty pipeline with no stages and no recorded results."""
    # Every intermediate value produced while processing is logged here.
    self.results: list[Any] = []
    # Stage callables, kept in the order they were registered.
    self.stages: list[StageFunc] = []
Functions
add_stage
add_stage(func: StageFunc) -> AsyncPipeline

Register a pipeline stage.

Source code in provide/testkit/process/async_fixtures.py
def add_stage(self, func: StageFunc) -> AsyncPipeline:
    """Append ``func`` to the end of the stage list.

    Args:
        func: Callable to run as the next stage of the pipeline.

    Returns:
        This pipeline instance, so registrations can be chained.
    """
    self.stages += [func]
    return self
clear
clear() -> None

Reset stages and stored results.

Source code in provide/testkit/process/async_fixtures.py
def clear(self) -> None:
    """Empty both the stage list and the result log in place."""
    # Slice deletion empties the existing list objects rather than rebinding them.
    del self.results[:]
    del self.stages[:]
process async
process(data: Any) -> Any

Process a single item through all stages.

Source code in provide/testkit/process/async_fixtures.py
async def process(self, data: Any) -> Any:
    """Process a single item through all stages.

    Stages run in registration order. The first stage receives ``data`` and
    every later stage receives the previous stage's output. Each stage's
    output is also appended to ``self.results``.

    Args:
        data: Value handed to the first stage.

    Returns:
        Output of the last stage, or ``data`` unchanged when no stages are
        registered.
    """
    import inspect  # local import: only needed for the awaitable check below

    result = data
    for stage in self.stages:
        result = stage(result)
        # Decide whether to await from what the stage returned rather than
        # from the callable itself. This also covers lambdas, partials and
        # objects with an async ``__call__``, whose coroutine would otherwise
        # be stored and passed on un-awaited.
        if inspect.isawaitable(result):
            result = await result
        self.results.append(result)
    return result
process_batch async
process_batch(items: Sequence[Any]) -> list[Any]

Process a batch of items.

Source code in provide/testkit/process/async_fixtures.py
async def process_batch(self, items: Sequence[Any]) -> list[Any]:
    """Run every item through the pipeline concurrently.

    Args:
        items: Inputs; each one is passed to ``process`` on its own.

    Returns:
        Final outputs, in the same order as ``items``.
    """
    gathered = await asyncio.gather(*(self.process(item) for item in items))
    return list(gathered)

AsyncRateLimiter

AsyncRateLimiter(rate: int = 10, per: float = 1.0)

Co-operative async rate limiter.

Source code in provide/testkit/process/async_fixtures.py
def __init__(self, rate: int = 10, per: float = 1.0) -> None:
    """Set up a limiter allowing ``rate`` acquisitions every ``per`` seconds.

    Args:
        rate: Number of acquisitions permitted per period; must be positive.
        per: Length of the period in seconds; must be positive.

    Raises:
        ValueError: If ``rate`` or ``per`` is not greater than zero.
    """
    import time  # local import: fallback clock when no loop is running

    # acquire() divides by both values, so reject unusable ones up front
    # instead of failing later with ZeroDivisionError.
    if rate <= 0:
        raise ValueError(f"rate must be positive, got {rate!r}")
    if per <= 0:
        raise ValueError(f"per must be positive, got {per!r}")

    self.rate = rate
    self.per = per
    # Start with a full allowance so an initial burst of ``rate`` calls passes.
    self.allowance = float(rate)
    try:
        self.last_check = asyncio.get_running_loop().time()
    except RuntimeError:
        # No loop is running (e.g. constructed from synchronous code).
        # asyncio.get_event_loop() is deprecated/raises in that situation.
        # asyncio's built-in loops use time.monotonic() for loop.time(), so
        # the value stays comparable with the clock acquire() reads.
        # NOTE(review): confirm if a third-party event loop is in use.
        self.last_check = time.monotonic()
Functions
acquire async
acquire() -> None

Acquire permission to continue, respecting rate limits.

Source code in provide/testkit/process/async_fixtures.py
async def acquire(self) -> None:
    """Acquire permission to continue, respecting rate limits.

    The allowance is refilled in proportion to the time elapsed since the
    last call, capped at ``rate``. If at least one unit is available it is
    consumed immediately; otherwise the call sleeps until one unit would
    have accrued and then leaves the allowance at zero.
    """
    loop = asyncio.get_running_loop()
    now = loop.time()
    elapsed = now - self.last_check
    self.last_check = now

    # Refill for the elapsed time, never beyond the burst size ``rate``.
    refilled = self.allowance + elapsed * (self.rate / self.per)
    self.allowance = min(float(self.rate), refilled)

    if self.allowance >= 1.0:
        self.allowance -= 1.0
        return

    # Not enough allowance: wait exactly as long as the missing fraction takes.
    sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
    await asyncio.sleep(sleep_time)
    # The time just slept paid for this acquisition. Restart the clock here so
    # the next call does not credit the same interval a second time.
    self.last_check = loop.time()
    self.allowance = 0.0

AsyncTaskGroup

AsyncTaskGroup()

Track asyncio tasks and guarantee cleanup.

Source code in provide/testkit/process/async_fixtures.py
def __init__(self) -> None:
    """Start with no tracked tasks."""
    tracked: list[asyncio.Task[Any]] = []
    # Tasks created through the group are appended to this list.
    self.tasks = tracked
Functions
cancel_all async
cancel_all() -> None

Cancel all running tasks.

Source code in provide/testkit/process/async_fixtures.py
async def cancel_all(self) -> None:
    """Cancel every unfinished tracked task and wait for all of them to settle."""
    for unfinished in (task for task in self.tasks if not task.done()):
        unfinished.cancel()

    if not self.tasks:
        return

    # return_exceptions=True collects cancellations and task errors as values
    # so they do not propagate out of this call.
    await asyncio.gather(*self.tasks, return_exceptions=True)
create_task
create_task(
    coro: Coroutine[Any, Any, T],
) -> asyncio.Task[T]

Create and track a task.

Source code in provide/testkit/process/async_fixtures.py
def create_task(self, coro: Coroutine[Any, Any, T]) -> asyncio.Task[T]:
    """Schedule ``coro`` as a task and remember it in ``self.tasks``.

    Args:
        coro: Coroutine to wrap in a task.

    Returns:
        The newly created task.
    """
    scheduled = asyncio.create_task(coro)
    self.tasks += [scheduled]
    return scheduled
wait_all async
wait_all(timeout: float | None = None) -> list[Any]

Wait for all tracked tasks to finish.

Source code in provide/testkit/process/async_fixtures.py
async def wait_all(self, timeout: float | None = None) -> list[Any]:
    """Wait for all tracked tasks to finish.

    Args:
        timeout: Maximum number of seconds to wait; ``None`` waits
            indefinitely.

    Returns:
        One entry per task that finished within the timeout, in the order
        the tasks appear in ``self.tasks``. An entry is the task's return
        value, or the exception instance if the task raised or was
        cancelled. Tasks still pending at the timeout are cancelled and
        contribute no entry.
    """
    if not self.tasks:
        return []

    done, pending = await asyncio.wait(
        self.tasks,
        timeout=timeout,
        return_when=asyncio.ALL_COMPLETED,
    )

    for task in pending:
        task.cancel()

    results: list[Any] = []
    # ``done`` is a set with arbitrary iteration order, so walk the tracked
    # list instead to keep results aligned with the tasks. dict.fromkeys
    # drops duplicates while preserving order.
    for task in dict.fromkeys(self.tasks):
        if task not in done:
            continue
        try:
            results.append(task.result())
        except (Exception, asyncio.CancelledError) as exc:
            # CancelledError is a BaseException on current Python versions,
            # so it must be named explicitly or it escapes this method.
            results.append(exc)

    return results

Functions

async_condition_waiter

async_condition_waiter() -> (
    Callable[
        [Callable[[], bool], float, float], Awaitable[bool]
    ]
)

Helper for waiting on async conditions in tests.

Returns:

Type Description
Callable[[Callable[[], bool], float, float], Awaitable[bool]]

Function to wait for conditions with timeout.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_condition_waiter() -> Callable[[Callable[[], bool], float, float], Awaitable[bool]]:
    """
    Helper for waiting on async conditions in tests.

    Returns:
        Function to wait for conditions with timeout.
    """

    async def _wait_for(condition: Callable[[], bool], timeout: float = 5.0, interval: float = 0.1) -> bool:
        """
        Poll a condition until it becomes true or the timeout expires.

        The condition is always evaluated at least once, and once more when
        the deadline is reached, so a condition that turns true during the
        final sleep is still reported as met.

        Args:
            condition: Function that returns True when condition is met
            timeout: Maximum wait time in seconds
            interval: Pause between checks in seconds

        Returns:
            True if condition met, False if timeout
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout

        while True:
            if condition():
                return True
            remaining = deadline - loop.time()
            if remaining <= 0:
                return False
            # Never sleep past the deadline, and clamp at zero so a negative
            # interval still yields control instead of misbehaving.
            await asyncio.sleep(max(0.0, min(interval, remaining)))

    return _wait_for

async_context_manager async

async_context_manager() -> (
    Callable[[Any | None, Any | None], AsyncMock]
)

Factory for creating mock async context managers.

Returns:

Type Description
Callable[[Any | None, Any | None], AsyncMock]

Function that creates configured async context manager mocks.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
async def async_context_manager() -> Callable[[Any | None, Any | None], AsyncMock]:
    """
    Provide a factory that builds mock async context managers.

    Returns:
        Callable producing an AsyncMock whose ``__aenter__`` and
        ``__aexit__`` are preconfigured.
    """

    def _create_async_cm(enter_value: Any | None = None, exit_value: Any | None = None) -> AsyncMock:
        """
        Build one mock usable with ``async with``.

        Args:
            enter_value: Value the mocked ``__aenter__`` returns
            exit_value: Value the mocked ``__aexit__`` returns

        Returns:
            AsyncMock with both context-manager hooks replaced
        """
        # Build the two hooks first, then attach them to a fresh mock.
        on_enter = AsyncMock(return_value=enter_value)
        on_exit = AsyncMock(return_value=exit_value)

        manager = AsyncMock()
        manager.__aenter__ = on_enter
        manager.__aexit__ = on_exit
        return manager

    return _create_async_cm

async_gather_helper

async_gather_helper() -> (
    Callable[..., Awaitable[list[Any]]]
)

Helper for testing asyncio.gather operations.

Returns:

Type Description
Callable[..., Awaitable[list[Any]]]

Function to gather async results with error handling.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_gather_helper() -> Callable[..., Awaitable[list[Any]]]:
    """
    Provide a thin async wrapper around ``asyncio.gather`` for tests.

    Returns:
        Async callable that gathers the awaitables passed to it.
    """

    async def _gather(*coroutines: Awaitable[Any], return_exceptions: bool = False) -> list[Any]:
        """
        Await all given awaitables together.

        Args:
            *coroutines: Awaitables to run
            return_exceptions: Forwarded to ``asyncio.gather``; when true,
                exceptions are returned as results instead of being raised

        Returns:
            Results as produced by ``asyncio.gather``
        """
        outcome = await asyncio.gather(*coroutines, return_exceptions=return_exceptions)
        return outcome

    return _gather

async_iterator async

async_iterator() -> (
    Callable[[Sequence[T]], AsyncIterable[T]]
)

Factory for creating mock async iterators.

Returns:

Type Description
Callable[[Sequence[T]], AsyncIterable[T]]

Function that creates async iterator mocks with specified values.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
async def async_iterator() -> Callable[[Sequence[T]], AsyncIterable[T]]:
    """
    Provide a factory that wraps a sequence in an async iterable.

    Returns:
        Callable that turns a sequence of values into an ``_AsyncIterator``.
    """

    def _create_async_iter(values: Sequence[T]) -> AsyncIterable[T]:
        """
        Wrap ``values`` for use with ``async for``.

        Args:
            values: Sequence handed to ``_AsyncIterator``

        Returns:
            ``_AsyncIterator`` instance built from ``values``
        """
        wrapped = _AsyncIterator(values)
        return wrapped

    return _create_async_iter

async_lock async

async_lock() -> asyncio.Lock

Create an async lock for testing synchronization.

Returns:

Type Description
Lock

asyncio.Lock instance for testing.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
async def async_lock() -> asyncio.Lock:
    """
    Provide a new ``asyncio.Lock`` for synchronization tests.

    Returns:
        A freshly constructed lock.
    """
    lock = asyncio.Lock()
    return lock

async_pipeline

async_pipeline() -> AsyncPipeline

Create an async pipeline for testing data flow.

Returns:

Type Description
AsyncPipeline

AsyncPipeline instance for chaining async operations.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_pipeline() -> AsyncPipeline:
    """
    Provide a new, empty ``AsyncPipeline`` for data-flow tests.

    Returns:
        A freshly constructed pipeline.
    """
    pipeline = AsyncPipeline()
    return pipeline

async_queue

async_queue() -> asyncio.Queue[Any]

Create an async queue for testing producer/consumer patterns.

Returns:

Type Description
Queue[Any]

asyncio.Queue instance for testing.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_queue() -> asyncio.Queue[Any]:
    """
    Provide a new ``asyncio.Queue`` for producer/consumer tests.

    Returns:
        A queue constructed with default arguments.
    """
    queue: asyncio.Queue[Any] = asyncio.Queue()
    return queue

async_rate_limiter

async_rate_limiter() -> AsyncRateLimiter

Create an async rate limiter for testing.

Returns:

Type Description
AsyncRateLimiter

AsyncRateLimiter instance for controlling request rates.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_rate_limiter() -> AsyncRateLimiter:
    """
    Provide an ``AsyncRateLimiter`` built with its default arguments.

    Returns:
        A freshly constructed rate limiter.
    """
    limiter = AsyncRateLimiter()
    return limiter

async_task_group

async_task_group() -> AsyncTaskGroup

Manage a group of async tasks with cleanup.

Returns:

Type Description
AsyncTaskGroup

AsyncTaskGroup instance for managing tasks.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_task_group() -> AsyncTaskGroup:
    """
    Provide a new ``AsyncTaskGroup`` for tracking tasks in a test.

    The fixture itself performs no teardown, so the test is responsible
    for calling the group's cleanup methods.

    Returns:
        A freshly constructed task group.
    """
    group = AsyncTaskGroup()
    return group

async_timeout

async_timeout() -> (
    Callable[[Awaitable[T], float], Awaitable[T]]
)

Provide configurable timeout wrapper for async operations.

Returns:

Type Description
Callable[[Awaitable[T], float], Awaitable[T]]

A function that wraps async operations with a timeout.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def async_timeout() -> Callable[[Awaitable[T], float], Awaitable[T]]:
    """
    Provide a helper that bounds an awaitable with a timeout.

    Returns:
        Callable wrapping an awaitable in ``asyncio.wait_for``.
    """

    def _timeout_wrapper(coro: Awaitable[T], seconds: float = 5.0) -> Awaitable[T]:
        """
        Bound ``coro`` by ``seconds``.

        Args:
            coro: Awaitable to wrap
            seconds: Timeout in seconds

        Returns:
            Awaitable from ``asyncio.wait_for``; awaiting it yields the
            result of ``coro`` or raises asyncio.TimeoutError
        """
        bounded = asyncio.wait_for(coro, timeout=seconds)
        return bounded

    return _timeout_wrapper

clean_event_loop async

clean_event_loop() -> AsyncGenerator[None, None]

Ensure clean event loop for async tests.

Cancels all pending tasks after the test to prevent event loop issues.

Yields:

Type Description
AsyncGenerator[None, None]

None - fixture for test setup/teardown.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
async def clean_event_loop() -> AsyncGenerator[None, None]:
    """
    Ensure clean event loop for async tests.

    After the test, cancels every task still pending on the running loop,
    except the task executing this teardown, and waits for the
    cancellations to finish.

    Yields:
        None - fixture for test setup/teardown.
    """
    yield

    # Skip the task running this teardown: cancelling and then awaiting
    # ourselves would abort the cleanup with CancelledError.
    current = asyncio.current_task()
    leftovers = [task for task in asyncio.all_tasks() if task is not current]

    for task in leftovers:
        task.cancel()

    # Wait for all tasks to complete cancellation. Their errors are collected
    # rather than raised.
    if leftovers:
        await asyncio.gather(*leftovers, return_exceptions=True)

event_loop_policy

event_loop_policy() -> (
    Generator[asyncio.AbstractEventLoopPolicy, None, None]
)

Set event loop policy for tests to avoid conflicts.

Yields:

Type Description
AbstractEventLoopPolicy

New event loop policy for isolated testing.

Source code in provide/testkit/process/async_fixtures.py
@pytest.fixture
def event_loop_policy() -> Generator[asyncio.AbstractEventLoopPolicy, None, None]:
    """
    Install a fresh default event loop policy for the duration of a test.

    Yields:
        The newly installed policy. The policy that was active beforehand
        is reinstated once the fixture resumes after the test.
    """
    previous = asyncio.get_event_loop_policy()
    replacement = asyncio.DefaultEventLoopPolicy()
    asyncio.set_event_loop_policy(replacement)

    yield replacement

    # Put back whatever policy was active before this fixture ran.
    asyncio.set_event_loop_policy(previous)