Skip to content

Execution fixtures

provide.testkit.threading.execution_fixtures

Thread execution and testing helper fixtures.

Advanced fixtures for concurrent execution, synchronization testing, deadlock detection, and exception handling in threaded code.

Classes

ConcurrentExecutor

ConcurrentExecutor()

Run callables concurrently while capturing results and exceptions.

Source code in provide/testkit/threading/execution_fixtures.py
def __init__(self) -> None:
    """Create an executor whose result and exception logs start empty."""
    # Exceptions caught while collecting futures are appended here.
    self.exceptions: list[Exception] = []
    # Successful return values are appended here by ``run_concurrent``.
    self.results: list[Any] = []
Functions
run_concurrent
run_concurrent(
    func: Callable[..., Any],
    args_list: Sequence[Sequence[Any] | Any],
    max_workers: int = 4,
) -> list[Any]

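Run func concurrently, once per entry in args_list. An entry that is a non-string sequence is unpacked into positional arguments; any other entry is passed as a single argument. Results are returned in submission order, with None in place of any call that raised.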

Source code in provide/testkit/threading/execution_fixtures.py
def run_concurrent(
    self,
    func: Callable[..., Any],
    args_list: Sequence[Sequence[Any] | Any],
    max_workers: int = 4,
) -> list[Any]:
    """Run ``func`` concurrently with provided argument sets."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for args in args_list:
            if isinstance(args, Sequence) and not isinstance(args, (str, bytes)):
                future = executor.submit(func, *args)
            else:
                future = executor.submit(func, args)
            futures.append(future)

        results: list[Any] = []
        for future in futures:
            try:
                result = future.result(timeout=10)
                results.append(result)
                self.results.append(result)
            except Exception as exc:
                self.exceptions.append(exc)
                results.append(None)

        return results
run_parallel
run_parallel(
    funcs: Sequence[Callable[[], Any]], timeout: float = 10
) -> list[Any]

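Run multiple zero-argument callables in parallel, one worker thread per callable. Results are returned in the order of funcs; a callable that raises contributes None and its exception is recorded in exceptions.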

Source code in provide/testkit/threading/execution_fixtures.py
def run_parallel(self, funcs: Sequence[Callable[[], Any]], timeout: float = 10) -> list[Any]:
    """Run multiple zero-argument callables in parallel.

    Args:
        funcs: Callables to invoke; each gets its own worker thread.
        timeout: Seconds to wait for each individual result before the
            call is recorded as failed.

    Returns:
        Results in the order of ``funcs``. A callable that raised (or whose
        result did not arrive within ``timeout``) contributes ``None`` and
        its exception is appended to ``self.exceptions``. An empty ``funcs``
        yields an empty list.
    """
    # ThreadPoolExecutor rejects max_workers=0 with ValueError, so an empty
    # batch must be handled before the pool is created.
    if not funcs:
        return []

    with ThreadPoolExecutor(max_workers=len(funcs)) as executor:
        futures = [executor.submit(func) for func in funcs]
        results: list[Any] = []

        for future in futures:
            try:
                result = future.result(timeout=timeout)
            except Exception as exc:
                self.exceptions.append(exc)
                # Keep positions aligned with ``funcs``.
                results.append(None)
            else:
                results.append(result)

        return results

DeadlockDetector

DeadlockDetector()

Track lock ownership to flag potential deadlocks.

Source code in provide/testkit/threading/execution_fixtures.py
def __init__(self) -> None:
    """Create a detector with an empty lock-tracking table."""
    # Lock object stored alongside the table; presumably it guards
    # ``locks_held`` — confirm against the methods that use it.
    self.lock = threading.Lock()
    # Maps an int key to a set of names (presumably thread id -> names of
    # locks held; confirm against the methods that populate it).
    self.locks_held: dict[int, set[str]] = {}

ThreadExceptionHandler

ThreadExceptionHandler()

Capture exceptions raised from worker threads.

Source code in provide/testkit/threading/execution_fixtures.py
def __init__(self) -> None:
    """Create a handler with no captured exceptions."""
    # Lock object stored alongside the list; presumably it guards
    # ``exceptions`` — confirm against the methods that use it.
    self.lock = threading.Lock()
    # One dict per captured exception; the dict layout is defined by the
    # code that appends to this list (not shown here).
    self.exceptions: list[dict[str, Any]] = []

ThreadSynchronizer

ThreadSynchronizer()

Coordinate threads using named checkpoints.

Source code in provide/testkit/threading/execution_fixtures.py
def __init__(self) -> None:
    """Create a synchronizer with no checkpoints recorded."""
    # Checkpoint name -> list of (int, float) pairs (presumably thread id
    # and timestamp; confirm against the code that records checkpoints).
    empty_checkpoints: dict[str, list[tuple[int, float]]] = {}
    self.checkpoints = empty_checkpoints

Functions

concurrent_executor

concurrent_executor() -> ConcurrentExecutor

Helper for executing functions concurrently in tests.

Source code in provide/testkit/threading/execution_fixtures.py
@pytest.fixture
def concurrent_executor() -> ConcurrentExecutor:
    """Provide a fresh ``ConcurrentExecutor`` for running callables concurrently in a test."""
    executor = ConcurrentExecutor()
    return executor

deadlock_detector

deadlock_detector() -> DeadlockDetector

Helper for detecting potential deadlocks in tests.

Source code in provide/testkit/threading/execution_fixtures.py
@pytest.fixture
def deadlock_detector() -> DeadlockDetector:
    """Provide a fresh ``DeadlockDetector`` for spotting potential deadlocks in a test."""
    detector = DeadlockDetector()
    return detector

thread_exception_handler

thread_exception_handler() -> ThreadExceptionHandler

Capture exceptions from threads for testing.

Source code in provide/testkit/threading/execution_fixtures.py
@pytest.fixture
def thread_exception_handler() -> ThreadExceptionHandler:
    """Provide a fresh ``ThreadExceptionHandler`` for capturing thread exceptions in a test."""
    handler = ThreadExceptionHandler()
    return handler

thread_synchronizer

thread_synchronizer() -> ThreadSynchronizer

Helper for synchronizing test threads.

Source code in provide/testkit/threading/execution_fixtures.py
@pytest.fixture
def thread_synchronizer() -> ThreadSynchronizer:
    """Provide a fresh ``ThreadSynchronizer`` for coordinating test threads."""
    synchronizer = ThreadSynchronizer()
    return synchronizer