Thread-safe data structure test fixtures.
Fixtures for thread-safe lists, counters, and other data structures for testing.
Classes
ThreadSafeCounter
ThreadSafeCounter(initial: int = 0)
Thread-safe integer counter.
Source code in provide/testkit/threading/data_fixtures.py
| def __init__(self, initial: int = 0) -> None:
self._value = initial
self._lock = threading.Lock()
|
Attributes
Functions
decrement
decrement(amount: int = 1) -> int
Thread-safe decrement.
Source code in provide/testkit/threading/data_fixtures.py
| def decrement(self, amount: int = 1) -> int:
"""Thread-safe decrement."""
with self._lock:
self._value -= amount
return self._value
|
increment
increment(amount: int = 1) -> int
Thread-safe increment.
Source code in provide/testkit/threading/data_fixtures.py
| def increment(self, amount: int = 1) -> int:
"""Thread-safe increment."""
with self._lock:
self._value += amount
return self._value
|
reset
reset(value: int = 0) -> None
Reset counter.
Source code in provide/testkit/threading/data_fixtures.py
| def reset(self, value: int = 0) -> None:
"""Reset counter."""
with self._lock:
self._value = value
|
ThreadSafeList
Simple thread-safe list wrapper.
Source code in provide/testkit/threading/data_fixtures.py
| def __init__(self) -> None:
self._list: list[Any] = []
self._lock = threading.Lock()
|
Functions
append
append(item: Any) -> None
Thread-safe append.
Source code in provide/testkit/threading/data_fixtures.py
| def append(self, item: Any) -> None:
"""Thread-safe append."""
with self._lock:
self._list.append(item)
|
clear
Clear the list.
Source code in provide/testkit/threading/data_fixtures.py
| def clear(self) -> None:
"""Clear the list."""
with self._lock:
self._list.clear()
|
extend
extend(items: Iterable[Any]) -> None
Thread-safe extend.
Source code in provide/testkit/threading/data_fixtures.py
| def extend(self, items: Iterable[Any]) -> None:
"""Thread-safe extend."""
with self._lock:
self._list.extend(items)
|
get_all
Get copy of all items.
Source code in provide/testkit/threading/data_fixtures.py
| def get_all(self) -> list[Any]:
"""Get copy of all items."""
with self._lock:
return self._list.copy()
|
Functions
thread_safe_counter
thread_safe_counter() -> ThreadSafeCounter
Create a thread-safe counter.
Returns:
Source code in provide/testkit/threading/data_fixtures.py
| @pytest.fixture
def thread_safe_counter() -> ThreadSafeCounter:
"""
Create a thread-safe counter.
Returns:
Thread-safe counter implementation.
"""
return ThreadSafeCounter()
|
thread_safe_list
thread_safe_list() -> ThreadSafeList
Create a thread-safe list for collecting results.
Returns:
Source code in provide/testkit/threading/data_fixtures.py
| @pytest.fixture
def thread_safe_list() -> ThreadSafeList:
"""
Create a thread-safe list for collecting results.
Returns:
Thread-safe list implementation.
"""
return ThreadSafeList()
|