Skip to content

Sync fixtures

provide.testkit.threading.sync_fixtures

Thread synchronization test fixtures.

Fixtures for thread barriers, events, conditions, and other synchronization primitives.

Functions

thread_barrier

thread_barrier() -> (
    Callable[[int, float | None], threading.Barrier]
)

Create a barrier for thread synchronization.

Returns:

Type Description
Callable[[int, float | None], Barrier]

Function to create barriers for N threads.

Source code in provide/testkit/threading/sync_fixtures.py
@pytest.fixture
def thread_barrier() -> Callable[[int, float | None], threading.Barrier]:
    """
    Create a barrier for thread synchronization.

    Returns:
        Function to create barriers for N threads.
    """
    barriers: list[threading.Barrier] = []

    def _create_barrier(n_threads: int, timeout: float | None = None) -> threading.Barrier:
        """
        Create a barrier for synchronizing threads.

        Args:
            n_threads: Number of threads to synchronize
            timeout: Optional timeout for barrier

        Returns:
            Barrier instance
        """
        barrier = threading.Barrier(n_threads, timeout=timeout)
        barriers.append(barrier)
        return barrier

    yield _create_barrier

    # Cleanup: abort all barriers
    for barrier in barriers:
        with suppress(threading.BrokenBarrierError):
            barrier.abort()

thread_condition

thread_condition() -> (
    Callable[[threading.Lock | None], threading.Condition]
)

Create condition variables for thread coordination.

Returns:

Type Description
Callable[[Lock | None], Condition]

Function to create condition variables.

Source code in provide/testkit/threading/sync_fixtures.py
@pytest.fixture
def thread_condition() -> Callable[[threading.Lock | None], threading.Condition]:
    """
    Create condition variables for thread coordination.

    Returns:
        Function to create condition variables.
    """

    def _create_condition(lock: threading.Lock | None = None) -> threading.Condition:
        """
        Create a condition variable.

        Args:
            lock: Optional lock to use (creates new if None)

        Returns:
            Condition variable
        """
        return threading.Condition(lock)

    return _create_condition

thread_event

thread_event() -> Callable[[], threading.Event]

Create thread events for signaling.

Returns:

Type Description
Callable[[], Event]

Function to create thread events.

Source code in provide/testkit/threading/sync_fixtures.py
@pytest.fixture
def thread_event() -> Callable[[], threading.Event]:
    """
    Create thread events for signaling.

    Returns:
        Function to create thread events.
    """
    events: list[threading.Event] = []

    def _create_event() -> threading.Event:
        """Create a thread event."""
        event = threading.Event()
        events.append(event)
        return event

    yield _create_event

    # Cleanup: set all events to release waiting threads
    for event in events:
        event.set()