Skip to content

Index

provide.testkit.threading

Threading testing utilities for the provide-io ecosystem.

Fixtures and utilities for testing multi-threaded code, thread synchronization, and concurrent operations across any project that depends on provide.foundation.

Functions

concurrent_executor

concurrent_executor() -> ConcurrentExecutor

Helper for executing functions concurrently in tests.

Source code in provide/testkit/threading/execution_fixtures.py
@pytest.fixture
def concurrent_executor() -> ConcurrentExecutor:
    """Provide a newly constructed ``ConcurrentExecutor`` to the requesting test."""
    executor = ConcurrentExecutor()
    return executor

deadlock_detector

deadlock_detector() -> DeadlockDetector

Helper for detecting potential deadlocks in tests.

Source code in provide/testkit/threading/execution_fixtures.py
@pytest.fixture
def deadlock_detector() -> DeadlockDetector:
    """Provide a newly constructed ``DeadlockDetector`` to the requesting test."""
    detector = DeadlockDetector()
    return detector

mock_thread

mock_thread() -> Mock

Create a mock thread for testing without actual threading.

Returns:

Type Description
Mock

Mock thread object.

Source code in provide/testkit/threading/basic_fixtures.py
@pytest.fixture
def mock_thread() -> Mock:
    """
    Build a stand-in for ``threading.Thread`` that never starts a real thread.

    The mock is spec'd against ``threading.Thread``, reports itself as not
    alive, and has ``start``, ``join`` and ``run`` replaced by plain mocks so
    tests can assert on how they were called.

    Returns:
        Preconfigured mock thread object.
    """
    fake_thread = Mock(spec=threading.Thread)

    # Identity-style attributes a test may inspect.
    fake_thread.name = "MockThread"
    fake_thread.ident = 12345
    fake_thread.daemon = False

    # The fake thread is reported as not running.
    fake_thread.is_alive.return_value = False

    # Lifecycle methods become independent mocks.
    for method_name in ("start", "join", "run"):
        setattr(fake_thread, method_name, Mock())

    return fake_thread

test_thread

test_thread() -> Callable[
    [
        Callable[..., Any],
        tuple[Any, ...],
        dict[str, Any] | None,
        bool,
    ],
    threading.Thread,
]

Create a test thread with automatic cleanup.

Returns:

Type Description
Callable[[Callable[..., Any], tuple[Any, ...], dict[str, Any] | None, bool], Thread]

Function to create and manage test threads.

Source code in provide/testkit/threading/basic_fixtures.py
@pytest.fixture
def test_thread() -> Callable[
    [Callable[..., Any], tuple[Any, ...], dict[str, Any] | None, bool],
    threading.Thread,
]:
    """
    Provide a factory that starts threads and joins them on teardown.

    Every thread created through the factory is remembered. Once the test
    finishes, each thread that is still alive is joined with a one-second
    timeout; a thread that outlives that timeout is left running.

    Yields:
        Function to create and manage test threads.
    """
    started: list[threading.Thread] = []

    def _create_thread(
        target: Callable[..., Any],
        args: tuple[Any, ...] = (),
        kwargs: dict[str, Any] | None = None,
        daemon: bool = True,
    ) -> threading.Thread:
        """
        Start a new thread running ``target`` and track it for cleanup.

        Args:
            target: Function to run in thread
            args: Positional arguments for target
            kwargs: Keyword arguments for target (empty dict when omitted)
            daemon: Whether thread should be daemon

        Returns:
            Started thread instance
        """
        worker = threading.Thread(
            target=target,
            args=args,
            kwargs=kwargs or {},
            daemon=daemon,
        )
        # Register before starting so the thread is tracked for teardown.
        started.append(worker)
        worker.start()
        return worker

    yield _create_thread

    # Teardown: give each still-running thread up to one second to finish.
    for worker in started:
        if not worker.is_alive():
            continue
        worker.join(timeout=1.0)

thread_barrier

thread_barrier() -> (
    Callable[[int, float | None], threading.Barrier]
)

Create a barrier for thread synchronization.

Returns:

Type Description
Callable[[int, float | None], Barrier]

Function to create barriers for N threads.

Source code in provide/testkit/threading/sync_fixtures.py
@pytest.fixture
def thread_barrier() -> Callable[[int, float | None], threading.Barrier]:
    """
    Provide a factory for ``threading.Barrier`` objects.

    Each barrier handed out is remembered and aborted once the test finishes.

    Yields:
        Function to create barriers for N threads.
    """
    created: list[threading.Barrier] = []

    def _create_barrier(n_threads: int, timeout: float | None = None) -> threading.Barrier:
        """
        Build a barrier and track it for cleanup.

        Args:
            n_threads: Number of threads to synchronize
            timeout: Optional timeout for barrier

        Returns:
            Barrier instance
        """
        new_barrier = threading.Barrier(n_threads, timeout=timeout)
        created.append(new_barrier)
        return new_barrier

    yield _create_barrier

    # Teardown: abort every barrier, ignoring BrokenBarrierError.
    for pending in created:
        with suppress(threading.BrokenBarrierError):
            pending.abort()

thread_condition

thread_condition() -> (
    Callable[[threading.Lock | None], threading.Condition]
)

Create condition variables for thread coordination.

Returns:

Type Description
Callable[[Lock | None], Condition]

Function to create condition variables.

Source code in provide/testkit/threading/sync_fixtures.py
@pytest.fixture
def thread_condition() -> Callable[[threading.Lock | None], threading.Condition]:
    """
    Provide a factory for ``threading.Condition`` objects.

    Returns:
        Function to create condition variables.
    """

    def _create_condition(lock: threading.Lock | None = None) -> threading.Condition:
        """
        Build a condition variable.

        Args:
            lock: Optional lock passed straight through to
                ``threading.Condition``; ``None`` lets it pick its own.

        Returns:
            Condition variable
        """
        condition = threading.Condition(lock)
        return condition

    return _create_condition

thread_event

thread_event() -> Callable[[], threading.Event]

Create thread events for signaling.

Returns:

Type Description
Callable[[], Event]

Function to create thread events.

Source code in provide/testkit/threading/sync_fixtures.py
@pytest.fixture
def thread_event() -> Callable[[], threading.Event]:
    """
    Provide a factory for ``threading.Event`` objects.

    Each event handed out is remembered and set once the test finishes, so
    threads still waiting on it are released.

    Yields:
        Function to create thread events.
    """
    created: list[threading.Event] = []

    def _create_event() -> threading.Event:
        """Build a new event and track it for cleanup."""
        new_event = threading.Event()
        created.append(new_event)
        return new_event

    yield _create_event

    # Teardown: set every event so no waiter stays blocked on it.
    for pending in created:
        pending.set()

thread_exception_handler

thread_exception_handler() -> ThreadExceptionHandler

Capture exceptions from threads for testing.

Source code in provide/testkit/threading/execution_fixtures.py
@pytest.fixture
def thread_exception_handler() -> ThreadExceptionHandler:
    """Provide a newly constructed ``ThreadExceptionHandler`` to the requesting test."""
    handler = ThreadExceptionHandler()
    return handler

thread_local_storage

thread_local_storage() -> threading.local

Create thread-local storage for testing.

Returns:

Type Description
local

Thread-local storage object.

Source code in provide/testkit/threading/basic_fixtures.py
@pytest.fixture
def thread_local_storage() -> threading.local:
    """
    Provide a fresh ``threading.local`` instance for the requesting test.

    Returns:
        Thread-local storage object.
    """
    storage = threading.local()
    return storage

thread_pool

thread_pool() -> ThreadPoolExecutor

Create a thread pool executor for testing.

Returns:

Type Description
ThreadPoolExecutor

ThreadPoolExecutor instance with automatic cleanup.

Source code in provide/testkit/threading/basic_fixtures.py
@pytest.fixture
def thread_pool() -> ThreadPoolExecutor:
    """
    Provide a four-worker ``ThreadPoolExecutor`` that is shut down on teardown.

    Yields:
        ThreadPoolExecutor instance with automatic cleanup.
    """
    pool = ThreadPoolExecutor(max_workers=4)

    yield pool

    # Teardown: cancel work that has not started, then wait for the rest.
    pool.shutdown(wait=True, cancel_futures=True)

thread_safe_counter

thread_safe_counter() -> ThreadSafeCounter

Create a thread-safe counter.

Returns:

Type Description
ThreadSafeCounter

Thread-safe counter implementation.

Source code in provide/testkit/threading/data_fixtures.py
@pytest.fixture
def thread_safe_counter() -> ThreadSafeCounter:
    """
    Provide a newly constructed ``ThreadSafeCounter`` to the requesting test.

    Returns:
        Thread-safe counter implementation.
    """
    counter = ThreadSafeCounter()
    return counter

thread_safe_list

thread_safe_list() -> ThreadSafeList

Create a thread-safe list for collecting results.

Returns:

Type Description
ThreadSafeList

Thread-safe list implementation.

Source code in provide/testkit/threading/data_fixtures.py
@pytest.fixture
def thread_safe_list() -> ThreadSafeList:
    """
    Provide a newly constructed ``ThreadSafeList`` for collecting results.

    Returns:
        Thread-safe list implementation.
    """
    collector = ThreadSafeList()
    return collector

thread_synchronizer

thread_synchronizer() -> ThreadSynchronizer

Helper for synchronizing test threads.

Source code in provide/testkit/threading/execution_fixtures.py
@pytest.fixture
def thread_synchronizer() -> ThreadSynchronizer:
    """Provide a newly constructed ``ThreadSynchronizer`` to the requesting test."""
    synchronizer = ThreadSynchronizer()
    return synchronizer