# API Reference
This documentation is automatically generated from the source code docstrings, ensuring it always matches the actual implementation.
## Core Classes

### SharedJson

Thread-safe shared JSON data across pytest-xdist workers.

This class provides atomic operations on a JSON file, ensuring data consistency when multiple workers access the same data concurrently.
All data must be JSON-serializable (dict, list, str, int, float, bool, None). For timestamps, use time.time() instead of datetime objects.
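The serializability constraint can be checked with the standard library alone. A quick illustration (using only `json` and `time`, independent of this package):

```python
import json
import time
from datetime import datetime

# JSON round-trip works for the allowed types; a Unix timestamp is just a float
record = {"count": 3, "started_at": time.time(), "errors": [], "active": True}
restored = json.loads(json.dumps(record))

# A datetime object, by contrast, is rejected by the JSON encoder
try:
    json.dumps({"started_at": datetime.now()})
except TypeError:
    pass  # not JSON-serializable: store time.time() instead
```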
Attributes:

| Name | Type | Description |
|---|---|---|
| `data_file` | `Path` | Path to the JSON data file |
| `lock_file` | `Path` | Path to the lock file for synchronization |
| `timeout` | `float` | Timeout in seconds for acquiring locks (`-1` = wait forever) |
#### name (property)

Get the name derived from the data file path.

Returns:

| Name | Type | Description |
|---|---|---|
| `str` | `str` | The stem (filename without extension) of the data file, with the `pytest_shared_` prefix removed if present |
#### __init__

Initialize the SharedJson instance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data_file` | `Path` | Path where JSON data will be stored | *required* |
| `lock_file` | `Path` | Path for the lock file | *required* |
| `timeout` | `float` | Timeout in seconds for lock acquisition (`-1` = wait forever) | `-1` |

Raises:

| Type | Description |
|---|---|
| `Timeout` | If the lock cannot be acquired within the timeout period |
#### locked_dict

Context manager for atomic read-modify-write operations.

Yields a dict that can be modified in place. All changes are written back to the file atomically when the context exits.

Yields:

| Type | Description |
|---|---|
| `Dict[str, Any]` | The current data from the JSON file (modifiable) |

Raises:

| Type | Description |
|---|---|
| `Timeout` | If the lock cannot be acquired within the timeout period |

Example:

```python
with shared.locked_dict() as data:
    data['count'] = data.get('count', 0) + 1
    data.setdefault('errors', []).append(error)
```

Note: The dict is a regular Python dict, so all dict operations work (`get`, `setdefault`, `update`, etc.). However, only JSON-serializable values can be stored (no `datetime` objects, custom classes, etc.).
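Conceptually, `locked_dict` combines a lock with a read-modify-write cycle. The simplified single-process sketch below (hypothetical, not the library's code; it omits the cross-worker lock that the real implementation acquires via the lock file) shows the read, in-place mutation, and atomic write-back:

```python
import json
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path

@contextmanager
def locked_dict_sketch(data_file: Path):
    """Simplified read-modify-write cycle (no inter-process lock shown)."""
    # Read the current state, defaulting to an empty dict for a fresh file
    data = json.loads(data_file.read_text()) if data_file.exists() else {}
    yield data
    # Write back atomically: write a temp file, then rename it over the original
    fd, tmp = tempfile.mkstemp(dir=data_file.parent)
    with os.fdopen(fd, "w") as f:
        json.dump(data, f)
    os.replace(tmp, data_file)

# Usage mirrors SharedJson.locked_dict
path = Path(tempfile.mkdtemp()) / "state.json"
with locked_dict_sketch(path) as data:
    data["count"] = data.get("count", 0) + 1
```

The rename step matters: readers always see either the old or the new file, never a partially written one.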
#### read

Read the current data atomically (read-only snapshot).

Returns:

| Name | Type | Description |
|---|---|---|
| `dict` | `Dict[str, Any]` | A copy of the current data from the JSON file |

Raises:

| Type | Description |
|---|---|
| `Timeout` | If the lock cannot be acquired within the timeout period |

Example:

```python
data = shared.read()
count = data.get('count', 0)
```
#### update

Update specific keys atomically.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `updates` | `Dict[str, Any]` | Dictionary of key-value pairs to update | *required* |

Raises:

| Type | Description |
|---|---|
| `Timeout` | If the lock cannot be acquired within the timeout period |

Example:

```python
shared.update({'count': 5, 'status': 'active'})
```
### RateLimitTimeout

Bases: `Exception`

Raised when the rate limiter timeout is exceeded.
### Rate

Represents a rate limit, with convenient factory methods for different time units.

Examples:

```python
>>> rate = Rate.per_second(10)   # 10 calls per second
>>> rate = Rate.per_minute(600)  # 600 calls per minute
>>> rate = Rate.per_hour(3600)   # 3600 calls per hour
>>> rate = Rate.per_day(86400)   # 86400 calls per day
```
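Assuming the factories normalise to a common calls-per-hour figure (an assumption consistent with the `hourly_rate` naming used elsewhere in this API), the arithmetic below shows how the four examples compare: the first two describe the same rate, as do the last two.

```python
# Each factory call above, normalised to calls per hour
per_second = 10 * 3600      # Rate.per_second(10)  -> 36000 calls/hour
per_minute = 600 * 60       # Rate.per_minute(600) -> 36000 calls/hour
per_hour = 3600             # Rate.per_hour(3600)  ->  3600 calls/hour
per_day = 86400 // 24       # Rate.per_day(86400)  ->  3600 calls/hour
```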
### TokenBucketPacer

A token bucket pacer that generates load at a controlled rate.

This class uses the token bucket algorithm to pace operations at a target rate, allowing for controlled bursts of activity. It is designed to be used with pytest-xdist to coordinate call pacing across multiple worker processes.
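The token bucket algorithm itself fits in a few lines: tokens accrue at the target rate up to a burst capacity, and each operation consumes one token. A conceptual sketch (not the library's implementation, which additionally coordinates state across workers):

```python
class TokenBucketSketch:
    """Conceptual token bucket: refill at `rate_per_sec`, cap at `capacity`."""

    def __init__(self, rate_per_sec: float, capacity: float):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity   # bucket starts full, allowing an initial burst
        self.last = 0.0

    def try_acquire(self, now: float) -> bool:
        # Accrue tokens for the elapsed time, capped at the burst capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1     # one token per operation
            return True
        return False

bucket = TokenBucketSketch(rate_per_sec=2.0, capacity=2)
# Burst capacity allows two immediate calls, then the bucket is empty
results = [bucket.try_acquire(0.0), bucket.try_acquire(0.0), bucket.try_acquire(0.0)]
```

A real pacer would sleep until a token is available rather than returning `False`.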
The pacer can be used as a callable context manager:

Example:

```python
with pacer() as ctx:
    print(f"Using pacer {ctx.id} with rate {ctx.hourly_rate}/hr")
    perform_action()
```
#### __init__

```python
__init__(
    shared_state: SharedJson,
    hourly_rate: Union[Rate, Callable[[], Rate]],
    max_drift: float = 0.1,
    on_drift_callback: Optional[Callable[[DriftEvent], None]] = None,
    num_calls_between_checks: int = 10,
    seconds_before_first_check: float = 60.0,
    burst_capacity: Optional[int] = None,
    max_calls: int = -1,
    on_max_calls_callback: Optional[Callable[[MaxCallsEvent], None]] = None,
    on_periodic_check_callback: Optional[Callable[[PeriodicCheckEvent], None]] = None,
    rate_windows: Optional[List[int]] = None,
)
```
Initialize a token bucket pacer.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `shared_state` | `SharedJson` | SharedJson instance for state management across workers | *required* |
| `hourly_rate` | `Union[Rate, Callable[[], Rate]]` | Target rate specification: a `Rate` object (e.g., `Rate.per_second(10)`) or a callable returning a `Rate` | *required* |
| `max_drift` | `float` | Maximum allowed drift from the target rate, as a fraction | `0.1` |
| `on_drift_callback` | `Optional[Callable[[DriftEvent], None]]` | Callback executed when drift exceeds `max_drift`. Signature: `(event: DriftEvent) -> None` | `None` |
| `num_calls_between_checks` | `int` | Number of calls between periodic checks; used for both drift checking and periodic metrics callbacks | `10` |
| `seconds_before_first_check` | `float` | Minimum elapsed time in seconds before rate checking begins | `60.0` |
| `burst_capacity` | `Optional[int]` | Maximum number of tokens that can be stored in the bucket (defaults to 10% of the hourly rate or 1, whichever is larger) | `None` |
| `max_calls` | `int` | Maximum number of calls allowed (`-1` for unlimited) | `-1` |
| `on_max_calls_callback` | `Optional[Callable[[MaxCallsEvent], None]]` | Callback executed when `max_calls` is reached. Signature: `(event: MaxCallsEvent) -> None` | `None` |
| `on_periodic_check_callback` | `Optional[Callable[[PeriodicCheckEvent], None]]` | Callback for periodic metrics checks; provides metrics for custom analysis (bottleneck detection, monitoring, etc.). Signature: `(event: PeriodicCheckEvent) -> None` | `None` |
| `rate_windows` | `Optional[List[int]]` | Time windows in seconds for rate calculation (default: `[60, 300, 900]`) | `None` |
#### rate_limited_context

Context manager that rate-limits the enclosed code using the token bucket algorithm.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `Optional[float]` | Maximum time in seconds to wait for a token (`None` for no timeout) | `None` |

Example:

```python
with rate_limiter.rate_limited_context() as ctx:
    print(f"Using rate limiter {ctx.id} with rate {ctx.hourly_rate}/hr")
    print(f"Current call count: {ctx.call_count}")
    print(f"First call at: {ctx.start_time}")
    print(f"Waited {ctx.seconds_waited:.2f} seconds")
    perform_action()

with rate_limiter.rate_limited_context(timeout=5.0) as ctx:
    print("Will timeout after 5 seconds")
    perform_action()
```
### RateLimitContext

*dataclass*

Context object yielded by `rate_limited_context` that provides access to rate limiter metrics.

Properties:

- `id`: Rate limiter identifier
- `hourly_rate`: Configured rate limit in calls per hour
- `call_count`: Total number of calls made
- `exceptions`: Total number of exceptions encountered
- `start_time`: Unix timestamp of when the first call was made
- `seconds_waited`: Number of seconds waited before entering the context
## Event Classes

### PacerEvent

*dataclass*

Base class for all pacer events.

Provides common context available to all callback events.

Attributes:

| Name | Type | Description |
|---|---|---|
| `limiter_id` | `str` | Unique identifier for the pacer |
| `limiter` | `TokenBucketPacer` | Reference to the TokenBucketPacer instance |
| `state_snapshot` | `Dict[str, Any]` | Snapshot of shared state at the time of the event |
### DriftEvent

*dataclass*

Bases: `PacerEvent`

Event fired when rate drift exceeds the configured threshold.

Attributes:

| Name | Type | Description |
|---|---|---|
| `current_rate` | `float` | Actual rate in calls per hour |
| `target_rate` | `float` | Target rate in calls per hour |
| `drift` | `float` | Drift as a fraction of the target rate (0.1 = 10% drift) |
| `max_drift` | `float` | Maximum allowed drift (from configuration) |
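Given the attributes above, the drift figure can be reproduced as the relative deviation of the actual rate from the target. A sketch of that stated definition (the use of the absolute value is an assumption; the library may report signed drift):

```python
def drift_fraction(current_rate: float, target_rate: float) -> float:
    """Drift as a fraction of the target rate (0.1 == 10% off target)."""
    return abs(current_rate - target_rate) / target_rate

# A pacer targeting 36000 calls/hour that measures 39600 calls/hour has 10% drift,
# which would fire on_drift_callback under the default max_drift of 0.1
d = drift_fraction(current_rate=39600.0, target_rate=36000.0)
```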
### MaxCallsEvent

*dataclass*

Bases: `PacerEvent`

Event fired when the `max_calls` limit is reached.

Attributes:

| Name | Type | Description |
|---|---|---|
| `max_calls` | `int` | Maximum number of calls allowed (from configuration) |
### PeriodicCheckEvent

*dataclass*

Bases: `PacerEvent`

Event fired during periodic checks with current metrics.

Provides comprehensive distribution-based statistics for load test analysis and bottleneck detection. Uses TDigest for accurate percentile calculations without storing all samples.

Attributes:

| Name | Type | Description |
|---|---|---|
| `worker_count` | `int` | Number of workers detected from the environment |
| `duration_digest` | `Optional[TDigest]` | TDigest of call durations (`None` if insufficient data) |
| `wait_digest` | `Optional[TDigest]` | TDigest of wait times (`None` if insufficient data) |
| `windowed_rates` | `Dict[int, float]` | Rates for the configured time windows in seconds (e.g., `{60: 3500, 300: 3450}`) |
| `sample_count` | `int` | Total samples in the digests |
| `target_rate` | `float` | Target rate in calls/hour (from configuration) |
| `current_rate` | `float` | Current actual rate in calls/hour |
| `drift` | `Optional[float]` | Drift as a fraction of the target rate (`None` if insufficient data) |
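The `windowed_rates` figures can be understood as counts of recent calls scaled to calls per hour over each window. A hypothetical reconstruction from raw call timestamps (the library computes these from shared state, not from a plain list):

```python
from typing import Dict, List

def windowed_rates(call_times: List[float], now: float, windows: List[int]) -> Dict[int, float]:
    """For each window length (seconds), extrapolate recent calls to calls/hour."""
    rates = {}
    for w in windows:
        calls_in_window = sum(1 for t in call_times if now - t <= w)
        rates[w] = calls_in_window * 3600 / w
    return rates

# One call per second for a minute: 3600 calls/hour over the 60 s window,
# but only 720 calls/hour when averaged over the 300 s window
times = [float(i) for i in range(60)]
rates = windowed_rates(times, now=59.0, windows=[60, 300])
```

Comparing short and long windows this way is what makes the attribute useful: a short-window rate far above the long-window rate indicates a recent burst.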
#### wait_ratio (property)

Ratio of median wait time to median call duration.

This metric reveals whether your pacer (traffic generator) is constraining throughput or whether the System Under Test (SUT) is the limiting factor.

Interpretation:

- Low ratio (< 0.1): SUT-bound operation
    - Tests spend 10x more time executing than waiting for the pacer
    - The pacer is not constraining throughput; SUT capacity may be a bottleneck
- High ratio (> 1.0): Pacer-bound operation
    - Tests spend more time waiting for the pacer than executing
    - The SUT is fast relative to the pacing rate
    - The SUT can handle higher rates; increase the pace if more load is needed

Use this metric to tune your load testing: a high ratio means you can push harder; a low ratio means you have found the SUT's capacity limit.
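The ratio itself is straightforward to compute from the two distributions. A sketch using plain medians, per the property's description (the real property reads medians from the TDigests rather than raw sample lists):

```python
from statistics import median
from typing import Sequence

def wait_ratio(wait_times: Sequence[float], call_durations: Sequence[float]) -> float:
    """Median wait over median call duration; > 1.0 means pacer-bound."""
    return median(wait_times) / median(call_durations)

# Waits dwarfed by execution time: the SUT, not the pacer, limits throughput
sut_bound = wait_ratio(wait_times=[0.01, 0.02, 0.01], call_durations=[0.5, 0.6, 0.4])

# Waits longer than execution: the pacer is holding tests back
pacer_bound = wait_ratio(wait_times=[2.0, 2.5, 1.5], call_durations=[0.5, 0.6, 0.4])
```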
## Fixture Factories

### make_shared_json

```python
make_shared_json(request: FixtureRequest, tmp_path_factory: TempPathFactory, worker_id: str) -> Generator[Callable[..., SharedJson], None, None]
```

Factory for creating shared JSON fixtures across pytest-xdist workers.

This is a session-scoped fixture that returns a factory function for creating SharedJson instances with proper worker coordination.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `FixtureRequest` | The pytest fixture request object | *required* |
| `tmp_path_factory` | `TempPathFactory` | Pytest's temporary path factory | *required* |
| `worker_id` | `str` | The xdist worker ID (e.g., `'gw0'`, `'gw1'`, or `'master'`) | *required* |

Returns:

| Type | Description |
|---|---|
| `Callable[..., SharedJson]` | Factory function that creates SharedJson instances |
Example:

```python
@pytest.fixture(scope="session")
def api_rate_tracker(make_shared_json):
    def init_data():
        return {
            'count': 0,
            'limit': 100,
            'errors': []
        }

    def report(shared):
        data = shared.read()
        print(f"Total API calls: {data['count']}")

    return make_shared_json(
        name="api_rate_tracker",
        on_first_worker=init_data,
        on_last_worker=report
    )


def test_api_call(api_rate_tracker):
    with api_rate_tracker.locked_dict() as data:
        data['count'] = data.get('count', 0) + 1
```
### make_pacer

Factory for creating pacer fixtures across pytest-xdist workers.

This fixture provides a way to create TokenBucketPacer instances that share state across workers using SharedJson.

Example:

```python
@pytest.fixture(scope="session")
def pacer(make_pacer):
    from pytest_xdist_rate_limit import Rate

    return make_pacer(
        name="pacer",
        hourly_rate=Rate.per_second(10)
    )


def test_api_call(pacer):
    with pacer() as ctx:
        # Entering the context will wait if required to respect the rate
        pass
```
### make_rate_limiter (Deprecated)

**Deprecated:** Use `make_pacer` instead.

Factory for creating pacer fixtures across pytest-xdist workers. This fixture is deprecated and will be removed in a future version. Please use `make_pacer` instead.

## Deprecated Aliases

The following names are deprecated and maintained for backward compatibility:

- `RateLimit` - use `Rate` instead
- `TokenBucketRateLimiter` - use `TokenBucketPacer` instead
- `make_rate_limiter` - use `make_pacer` instead