Async-specific test fixtures for process testing.
Provides fixtures for testing async operations, event loops, and
async context management across the provide-io ecosystem.
Classes
AsyncPipeline
Pipeline helper for chaining async processing stages.
Source code in provide/testkit/process/async_fixtures.py
| def __init__(self) -> None:
self.stages: list[StageFunc] = []
self.results: list[Any] = []
|
Functions
add_stage
add_stage(func: StageFunc) -> AsyncPipeline
Register a pipeline stage.
Source code in provide/testkit/process/async_fixtures.py
| def add_stage(self, func: StageFunc) -> AsyncPipeline:
"""Register a pipeline stage."""
self.stages.append(func)
return self
|
clear
Reset stages and stored results.
Source code in provide/testkit/process/async_fixtures.py
| def clear(self) -> None:
"""Reset stages and stored results."""
self.stages.clear()
self.results.clear()
|
process
async
process(data: Any) -> Any
Process a single item through all stages.
Source code in provide/testkit/process/async_fixtures.py
| async def process(self, data: Any) -> Any:
"""Process a single item through all stages."""
result = data
for stage in self.stages:
if asyncio.iscoroutinefunction(stage):
result = await stage(result) # type: ignore[arg-type]
else:
result = stage(result)
self.results.append(result)
return result
|
process_batch
async
process_batch(items: Sequence[Any]) -> list[Any]
Process a batch of items.
Source code in provide/testkit/process/async_fixtures.py
| async def process_batch(self, items: Sequence[Any]) -> list[Any]:
"""Process a batch of items."""
tasks = [self.process(item) for item in items]
return list(await asyncio.gather(*tasks))
|
AsyncRateLimiter
AsyncRateLimiter(rate: int = 10, per: float = 1.0)
Co-operative async rate limiter.
Source code in provide/testkit/process/async_fixtures.py
| def __init__(self, rate: int = 10, per: float = 1.0) -> None:
self.rate = rate
self.per = per
self.allowance = float(rate)
self.last_check = asyncio.get_event_loop().time()
|
Functions
acquire
async
Acquire permission to continue, respecting rate limits.
Source code in provide/testkit/process/async_fixtures.py
| async def acquire(self) -> None:
"""Acquire permission to continue, respecting rate limits."""
current = asyncio.get_event_loop().time()
time_passed = current - self.last_check
self.last_check = current
self.allowance += time_passed * (self.rate / self.per)
if self.allowance > self.rate:
self.allowance = float(self.rate)
if self.allowance < 1.0:
sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
await asyncio.sleep(sleep_time)
self.allowance = 0.0
else:
self.allowance -= 1.0
|
AsyncTaskGroup
Track asyncio tasks and guarantee cleanup.
Source code in provide/testkit/process/async_fixtures.py
| def __init__(self) -> None:
self.tasks: list[asyncio.Task[Any]] = []
|
Functions
cancel_all
async
Cancel all running tasks.
Source code in provide/testkit/process/async_fixtures.py
| async def cancel_all(self) -> None:
"""Cancel all running tasks."""
for task in self.tasks:
if not task.done():
task.cancel()
if self.tasks:
await asyncio.gather(*self.tasks, return_exceptions=True)
|
create_task
create_task(
coro: Coroutine[Any, Any, T],
) -> asyncio.Task[T]
Create and track a task.
Source code in provide/testkit/process/async_fixtures.py
| def create_task(self, coro: Coroutine[Any, Any, T]) -> asyncio.Task[T]:
"""Create and track a task."""
task = asyncio.create_task(coro)
self.tasks.append(task)
return task
|
wait_all
async
wait_all(timeout: float | None = None) -> list[Any]
Wait for all tracked tasks to finish.
Source code in provide/testkit/process/async_fixtures.py
| async def wait_all(self, timeout: float | None = None) -> list[Any]:
"""Wait for all tracked tasks to finish."""
if not self.tasks:
return []
done, pending = await asyncio.wait(
self.tasks,
timeout=timeout,
return_when=asyncio.ALL_COMPLETED,
)
for task in pending:
task.cancel()
results: list[Any] = []
for task in done:
try:
results.append(task.result())
except Exception as exc:
results.append(exc)
return results
|
Functions
async_condition_waiter
async_condition_waiter() -> (
Callable[
[Callable[[], bool], float, float], Awaitable[bool]
]
)
Helper for waiting on async conditions in tests.
Returns:
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
def async_condition_waiter() -> Callable[[Callable[[], bool], float, float], Awaitable[bool]]:
"""
Helper for waiting on async conditions in tests.
Returns:
Function to wait for conditions with timeout.
"""
async def _wait_for(condition: Callable[[], bool], timeout: float = 5.0, interval: float = 0.1) -> bool:
"""
Wait for a condition to become true.
Args:
condition: Function that returns True when condition is met
timeout: Maximum wait time
interval: Check interval
Returns:
True if condition met, False if timeout
"""
start = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start < timeout:
if condition():
return True
await asyncio.sleep(interval)
return False
return _wait_for
|
async_context_manager
async
async_context_manager() -> (
Callable[[Any | None, Any | None], AsyncMock]
)
Factory for creating mock async context managers.
Returns:
| Type |
Description |
Callable[[Any | None, Any | None], AsyncMock]
|
Function that creates configured async context manager mocks.
|
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
async def async_context_manager() -> Callable[[Any | None, Any | None], AsyncMock]:
"""
Factory for creating mock async context managers.
Returns:
Function that creates configured async context manager mocks.
"""
def _create_async_cm(enter_value: Any | None = None, exit_value: Any | None = None) -> AsyncMock:
"""
Create a mock async context manager.
Args:
enter_value: Value to return from __aenter__
exit_value: Value to return from __aexit__
Returns:
AsyncMock configured as context manager
"""
mock_cm = AsyncMock()
mock_cm.__aenter__ = AsyncMock(return_value=enter_value)
mock_cm.__aexit__ = AsyncMock(return_value=exit_value)
return mock_cm
return _create_async_cm
|
async_gather_helper
async_gather_helper() -> (
Callable[..., Awaitable[list[Any]]]
)
Helper for testing asyncio.gather operations.
Returns:
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
def async_gather_helper() -> Callable[..., Awaitable[list[Any]]]:
"""
Helper for testing asyncio.gather operations.
Returns:
Function to gather async results with error handling.
"""
async def _gather(*coroutines: Awaitable[Any], return_exceptions: bool = False) -> list[Any]:
"""
Gather results from multiple coroutines.
Args:
*coroutines: Coroutines to gather
return_exceptions: Whether to return exceptions as results
Returns:
List of results from coroutines
"""
return await asyncio.gather(*coroutines, return_exceptions=return_exceptions)
return _gather
|
async_iterator
async
async_iterator() -> (
Callable[[Sequence[T]], AsyncIterable[T]]
)
Factory for creating mock async iterators.
Returns:
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
async def async_iterator() -> Callable[[Sequence[T]], AsyncIterable[T]]:
"""
Factory for creating mock async iterators.
Returns:
Function that creates async iterator mocks with specified values.
"""
def _create_async_iter(values: Sequence[T]) -> AsyncIterable[T]:
"""
Create a mock async iterator.
Args:
values: List of values to yield
Returns:
Async iterator that yields the specified values
"""
return _AsyncIterator(values)
return _create_async_iter
|
async_lock
async
async_lock() -> asyncio.Lock
Create an async lock for testing synchronization.
Returns:
| Type |
Description |
Lock
|
asyncio.Lock instance for testing.
|
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
async def async_lock() -> asyncio.Lock:
"""
Create an async lock for testing synchronization.
Returns:
asyncio.Lock instance for testing.
"""
return asyncio.Lock()
|
async_pipeline
async_pipeline() -> AsyncPipeline
Create an async pipeline for testing data flow.
Returns:
| Type |
Description |
AsyncPipeline
|
AsyncPipeline instance for chaining async operations.
|
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
def async_pipeline() -> AsyncPipeline:
"""
Create an async pipeline for testing data flow.
Returns:
AsyncPipeline instance for chaining async operations.
"""
return AsyncPipeline()
|
async_queue
async_queue() -> asyncio.Queue[Any]
Create an async queue for testing producer/consumer patterns.
Returns:
| Type |
Description |
Queue[Any]
|
asyncio.Queue instance for testing.
|
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
def async_queue() -> asyncio.Queue[Any]:
"""
Create an async queue for testing producer/consumer patterns.
Returns:
asyncio.Queue instance for testing.
"""
return asyncio.Queue()
|
async_rate_limiter
async_rate_limiter() -> AsyncRateLimiter
Create an async rate limiter for testing.
Returns:
| Type |
Description |
AsyncRateLimiter
|
AsyncRateLimiter instance for controlling request rates.
|
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
def async_rate_limiter() -> AsyncRateLimiter:
"""
Create an async rate limiter for testing.
Returns:
AsyncRateLimiter instance for controlling request rates.
"""
return AsyncRateLimiter()
|
async_task_group
async_task_group() -> AsyncTaskGroup
Manage a group of async tasks with cleanup.
Returns:
| Type |
Description |
AsyncTaskGroup
|
AsyncTaskGroup instance for managing tasks.
|
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
def async_task_group() -> AsyncTaskGroup:
"""
Manage a group of async tasks with cleanup.
Returns:
AsyncTaskGroup instance for managing tasks.
"""
return AsyncTaskGroup()
|
async_timeout
async_timeout() -> (
Callable[[Awaitable[T], float], Awaitable[T]]
)
Provide configurable timeout wrapper for async operations.
Returns:
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
def async_timeout() -> Callable[[Awaitable[T], float], Awaitable[T]]:
"""
Provide configurable timeout wrapper for async operations.
Returns:
A function that wraps async operations with a timeout.
"""
def _timeout_wrapper(coro: Awaitable[T], seconds: float = 5.0) -> Awaitable[T]:
"""
Wrap a coroutine with a timeout.
Args:
coro: Coroutine to wrap
seconds: Timeout in seconds
Returns:
Result of the coroutine or raises asyncio.TimeoutError
"""
return asyncio.wait_for(coro, timeout=seconds)
return _timeout_wrapper
|
clean_event_loop
async
clean_event_loop() -> AsyncGenerator[None, None]
Ensure clean event loop for async tests.
Cancels all pending tasks after the test to prevent event loop issues.
Yields:
| Type |
Description |
AsyncGenerator[None, None]
|
None - fixture for test setup/teardown.
|
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
async def clean_event_loop() -> AsyncGenerator[None, None]:
"""
Ensure clean event loop for async tests.
Cancels all pending tasks after the test to prevent event loop issues.
Yields:
None - fixture for test setup/teardown.
"""
yield
# Clean up any pending tasks
loop = asyncio.get_event_loop()
pending = asyncio.all_tasks(loop)
for task in pending:
if not task.done():
task.cancel()
# Wait for all tasks to complete cancellation
if pending:
await asyncio.gather(*pending, return_exceptions=True)
|
event_loop_policy
event_loop_policy() -> (
Generator[asyncio.AbstractEventLoopPolicy, None, None]
)
Set event loop policy for tests to avoid conflicts.
Returns:
| Type |
Description |
None
|
New event loop policy for isolated testing.
|
Source code in provide/testkit/process/async_fixtures.py
| @pytest.fixture
def event_loop_policy() -> Generator[asyncio.AbstractEventLoopPolicy, None, None]:
"""
Set event loop policy for tests to avoid conflicts.
Returns:
New event loop policy for isolated testing.
"""
policy = asyncio.get_event_loop_policy()
new_policy = asyncio.DefaultEventLoopPolicy()
asyncio.set_event_loop_policy(new_policy)
yield new_policy
# Restore original policy
asyncio.set_event_loop_policy(policy)
|