Thread execution and testing helper fixtures.
Advanced fixtures for concurrent execution, synchronization testing, deadlock detection,
and exception handling in threaded code.
Classes
ConcurrentExecutor
Run callables concurrently while capturing results and exceptions.
Source code in provide/testkit/threading/execution_fixtures.py
| def __init__(self) -> None:
self.results: list[Any] = []
self.exceptions: list[Exception] = []
|
Functions
run_concurrent
run_concurrent(
func: Callable[..., Any],
args_list: Sequence[Sequence[Any] | Any],
max_workers: int = 4,
) -> list[Any]
Run func concurrently with provided argument sets.
Source code in provide/testkit/threading/execution_fixtures.py
| def run_concurrent(
self,
func: Callable[..., Any],
args_list: Sequence[Sequence[Any] | Any],
max_workers: int = 4,
) -> list[Any]:
"""Run ``func`` concurrently with provided argument sets."""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for args in args_list:
if isinstance(args, Sequence) and not isinstance(args, (str, bytes)):
future = executor.submit(func, *args)
else:
future = executor.submit(func, args)
futures.append(future)
results: list[Any] = []
for future in futures:
try:
result = future.result(timeout=10)
results.append(result)
self.results.append(result)
except Exception as exc:
self.exceptions.append(exc)
results.append(None)
return results
|
run_parallel
run_parallel(
funcs: Sequence[Callable[[], Any]], timeout: float = 10
) -> list[Any]
Run multiple zero-argument callables in parallel.
Source code in provide/testkit/threading/execution_fixtures.py
| def run_parallel(self, funcs: Sequence[Callable[[], Any]], timeout: float = 10) -> list[Any]:
"""Run multiple zero-argument callables in parallel."""
with ThreadPoolExecutor(max_workers=len(funcs)) as executor:
futures = [executor.submit(func) for func in funcs]
results: list[Any] = []
for future in futures:
try:
result = future.result(timeout=timeout)
results.append(result)
except Exception as exc:
self.exceptions.append(exc)
results.append(None)
return results
|
DeadlockDetector
Track lock ownership to flag potential deadlocks.
Source code in provide/testkit/threading/execution_fixtures.py
| def __init__(self) -> None:
self.locks_held: dict[int, set[str]] = {}
self.lock = threading.Lock()
|
ThreadExceptionHandler
Capture exceptions raised from worker threads.
Source code in provide/testkit/threading/execution_fixtures.py
| def __init__(self) -> None:
self.exceptions: list[dict[str, Any]] = []
self.lock = threading.Lock()
|
ThreadSynchronizer
Coordinate threads using named checkpoints.
Source code in provide/testkit/threading/execution_fixtures.py
| def __init__(self) -> None:
self.checkpoints: dict[str, list[tuple[int, float]]] = {}
|
Functions
concurrent_executor
concurrent_executor() -> ConcurrentExecutor
Helper for executing functions concurrently in tests.
Source code in provide/testkit/threading/execution_fixtures.py
| @pytest.fixture
def concurrent_executor() -> ConcurrentExecutor:
"""Helper for executing functions concurrently in tests."""
return ConcurrentExecutor()
|
deadlock_detector
deadlock_detector() -> DeadlockDetector
Helper for detecting potential deadlocks in tests.
Source code in provide/testkit/threading/execution_fixtures.py
| @pytest.fixture
def deadlock_detector() -> DeadlockDetector:
"""Helper for detecting potential deadlocks in tests."""
return DeadlockDetector()
|
thread_exception_handler
thread_exception_handler() -> ThreadExceptionHandler
Capture exceptions from threads for testing.
Source code in provide/testkit/threading/execution_fixtures.py
| @pytest.fixture
def thread_exception_handler() -> ThreadExceptionHandler:
"""Capture exceptions from threads for testing."""
return ThreadExceptionHandler()
|
thread_synchronizer
thread_synchronizer() -> ThreadSynchronizer
Helper for synchronizing test threads.
Source code in provide/testkit/threading/execution_fixtures.py
| @pytest.fixture
def thread_synchronizer() -> ThreadSynchronizer:
"""Helper for synchronizing test threads."""
return ThreadSynchronizer()
|