API Reference

This documentation is automatically generated from the source code docstrings, ensuring it always matches the actual implementation.

Core Classes

SharedJson

Thread-safe shared JSON data across pytest-xdist workers.

This class provides atomic operations on a JSON file, ensuring data consistency when multiple workers access the same data concurrently.

All data must be JSON-serializable (dict, list, str, int, float, bool, None). For timestamps, use time.time() instead of datetime objects.
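The constraint above can be checked with the standard library alone. The following is a minimal sketch; `is_json_serializable`, `ok_record`, and `bad_record` are illustrative names that are not part of this package.

```python
import json
import time
from datetime import datetime, timezone


def is_json_serializable(value) -> bool:
    """Return True if json.dumps accepts the value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


# A float timestamp from time.time() serializes cleanly.
ok_record = {"started_at": time.time(), "errors": [], "active": True}

# A datetime object does not: json.dumps raises TypeError.
bad_record = {"started_at": datetime.now(timezone.utc)}

print(is_json_serializable(ok_record))   # True
print(is_json_serializable(bad_record))  # False
```

A stored float timestamp can be turned back into a datetime on read with `datetime.fromtimestamp(value, tz=timezone.utc)`.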

Attributes:

Name Type Description
data_file

Path to the JSON data file

lock_file

Path to the lock file for synchronization

timeout

Timeout in seconds for acquiring locks (-1 = wait forever)

name property

name: str

Get the name derived from the data file path.

Returns:

Name Type Description
str str

The stem (filename without extension) of the data file, with the pytest_shared_ prefix removed if present
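As an illustration of the rule described above, the derivation can be sketched with `pathlib`. The helper `derive_name` and the example paths are hypothetical; the actual property may be implemented differently.

```python
from pathlib import Path

PREFIX = "pytest_shared_"


def derive_name(data_file: Path) -> str:
    """Return the file stem, dropping the pytest_shared_ prefix if present."""
    stem = data_file.stem  # filename without its extension
    if stem.startswith(PREFIX):
        return stem[len(PREFIX):]
    return stem


print(derive_name(Path("/tmp/pytest_shared_api_rate_tracker.json")))  # api_rate_tracker
print(derive_name(Path("/tmp/state.json")))                           # state
```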

__init__

__init__(data_file: Path, lock_file: Path, timeout: float = -1)

Initialize the SharedJson instance.

Parameters:

Name Type Description Default
data_file Path

Path where JSON data will be stored

required
lock_file Path

Path for the lock file

required
timeout float

Timeout in seconds for lock acquisition (-1 = wait forever)

-1

Raises:

Type Description
Timeout

If lock cannot be acquired within timeout period

locked_dict

locked_dict() -> Generator[Dict[str, Any], None, None]

Context manager for atomic read-modify-write operations.

Yields a dict that can be modified in-place. All changes are written back to the file atomically when the context exits.

Yields:

Type Description
Dict[str, Any]

The current data from the JSON file (modifiable)

Raises:

Type Description
Timeout

If lock cannot be acquired within timeout period

Example

with shared.locked_dict() as data:
    data['count'] = data.get('count', 0) + 1
    data.setdefault('errors', []).append(error)

Note

The dict is a regular Python dict, so all dict operations work (get, setdefault, update, etc.). However, only JSON-serializable values can be stored (no datetime, custom objects, etc.).
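To make the read-modify-write cycle concrete, here is a simplified sketch of how such a context manager could be structured. It is not the library's implementation: `LockedJsonSketch` is a hypothetical class, and an in-process `threading.Lock` stands in for the cross-process lock file that real xdist workers would need.

```python
import json
import os
import tempfile
import threading
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Iterator


class LockedJsonSketch:
    """Illustrative stand-in; not the library's SharedJson."""

    def __init__(self, data_file: Path) -> None:
        self.data_file = data_file
        # Assumption: an in-process lock replaces the lock file used across workers.
        self._lock = threading.Lock()

    @contextmanager
    def locked_dict(self) -> Iterator[Dict[str, Any]]:
        with self._lock:
            # Read: load the current contents, or start from an empty dict.
            if self.data_file.exists():
                data = json.loads(self.data_file.read_text())
            else:
                data = {}
            # Modify: the caller mutates the dict in place.
            yield data
            # Write: reached only if the body raised no exception.
            # Dump to a temporary file, then swap it in with os.replace.
            tmp_file = self.data_file.with_suffix(".tmp")
            tmp_file.write_text(json.dumps(data))
            os.replace(tmp_file, self.data_file)


with tempfile.TemporaryDirectory() as tmp_dir:
    shared = LockedJsonSketch(Path(tmp_dir) / "state.json")
    for _ in range(3):
        with shared.locked_dict() as data:
            data["count"] = data.get("count", 0) + 1
    final = json.loads((Path(tmp_dir) / "state.json").read_text())

print(final)  # {'count': 3}
```

Holding the lock for the whole read, modify, and write sequence is what prevents lost updates: no other holder can read a stale value between the load and the write-back.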

read

read() -> Dict[str, Any]

Read the current data atomically (read-only snapshot).

Returns:

Name Type Description
dict Dict[str, Any]

A copy of the current data from the JSON file

Raises:

Type Description
Timeout

If lock cannot be acquired within timeout period

Example

data = shared.read()
count = data.get('count', 0)

update

update(updates: Dict[str, Any]) -> None

Update specific keys atomically.

Parameters:

Name Type Description Default
updates Dict[str, Any]

Dictionary of key-value pairs to update

required

Raises:

Type Description
Timeout

If lock cannot be acquired within timeout period

Example

shared.update({'count': 5, 'status': 'active'})

RateLimitTimeout

Bases: Exception

Raised when rate limiter timeout is exceeded.

Rate

Represents a rate limit with convenient factory methods for different time units.

Examples:

>>> rate = Rate.per_second(10)  # 10 calls per second
>>> rate = Rate.per_minute(600)  # 600 calls per minute
>>> rate = Rate.per_hour(3600)  # 3600 calls per hour
>>> rate = Rate.per_day(86400)  # 86400 calls per day

calls_per_hour property

calls_per_hour: int

__init__

__init__(calls_per_hour: int)

per_second classmethod

per_second(calls: Union[int, float]) -> Rate

per_minute classmethod

per_minute(calls: Union[int, float]) -> Rate

per_hour classmethod

per_hour(calls: int) -> Rate

per_day classmethod

per_day(calls: Union[int, float]) -> Rate
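Because the constructor takes `calls_per_hour`, each factory method presumably normalizes its argument to an hourly figure. The arithmetic can be sketched as follows; the helper names and the truncation to `int` are assumptions, not the library's documented behavior.

```python
def per_second_to_hourly(calls: float) -> int:
    """3600 seconds per hour."""
    return int(calls * 3600)


def per_minute_to_hourly(calls: float) -> int:
    """60 minutes per hour."""
    return int(calls * 60)


def per_day_to_hourly(calls: float) -> int:
    """24 hours per day."""
    return int(calls / 24)


print(per_second_to_hourly(10))   # 36000
print(per_minute_to_hourly(600))  # 36000
print(per_day_to_hourly(86400))   # 3600
```

Under these assumptions, `Rate.per_second(10)` and `Rate.per_minute(600)` describe the same target of 36000 calls per hour.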

TokenBucketPacer

A token bucket pacer that generates load at a controlled rate.

This class uses the token bucket algorithm to pace operations at a target rate, allowing for controlled bursts of activity. It is designed to be used with pytest-xdist to coordinate call pacing across multiple worker processes.

The pacer can be used as a callable context manager:

Example

with pacer() as ctx:
    print(f"Using pacer {ctx.id} with rate {ctx.hourly_rate}/hr")
    perform_action()
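For readers unfamiliar with the token bucket algorithm mentioned above, the following single-process sketch shows the core idea: tokens refill continuously at the target rate up to a fixed capacity, and each call spends one token. `TokenBucketSketch` and its methods are hypothetical names; the real pacer additionally shares its state across workers, which this sketch does not attempt.

```python
import time
from typing import Callable


class TokenBucketSketch:
    """Minimal token bucket; illustrative only."""

    def __init__(self, hourly_rate: int, burst_capacity: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.refill_per_second = hourly_rate / 3600.0
        self.capacity = float(burst_capacity)
        self.tokens = float(burst_capacity)  # assumption: the bucket starts full
        self.clock = clock
        self.last_refill = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed, capped at the bucket capacity.
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.last_refill = now

    def try_acquire(self) -> bool:
        """Spend one token if available; return whether the call may proceed."""
        self._refill()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

    def seconds_until_token(self) -> float:
        """How long a caller would have to wait for the next token."""
        self._refill()
        if self.tokens >= 1.0:
            return 0.0
        return (1.0 - self.tokens) / self.refill_per_second


# A fake clock keeps the example deterministic.
fake_now = [0.0]
bucket = TokenBucketSketch(hourly_rate=3600, burst_capacity=2,
                           clock=lambda: fake_now[0])

results = [bucket.try_acquire() for _ in range(3)]  # burst of 2, then empty
wait = bucket.seconds_until_token()                 # 3600/hr is 1 token per second
fake_now[0] += 1.0
after_wait = bucket.try_acquire()

print(results, wait, after_wait)  # [True, True, False] 1.0 True
```

The capacity bounds how large a burst can be, while the refill rate bounds the long-run average.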

id property

id: str

Get the identifier from the shared state name.

hourly_rate property

hourly_rate: int

Get the current hourly rate.

__init__

__init__(shared_state: SharedJson, hourly_rate: Union[Rate, Callable[[], Rate]], max_drift: float = 0.1, on_drift_callback: Optional[Callable[[DriftEvent], None]] = None, num_calls_between_checks: int = 10, seconds_before_first_check: float = 60.0, burst_capacity: Optional[int] = None, max_calls: int = -1, on_max_calls_callback: Optional[Callable[[MaxCallsEvent], None]] = None, on_periodic_check_callback: Optional[Callable[[PeriodicCheckEvent], None]] = None, rate_windows: Optional[List[int]] = None)

Initialize a token bucket pacer.

Parameters:

Name Type Description Default
shared_state SharedJson

SharedJson instance for state management across workers

required
hourly_rate Union[Rate, Callable[[], Rate]]

Target rate specification. Can be:
  • Rate: target rate object (e.g., Rate.per_second(10))
  • Callable: function returning Rate

required
max_drift float

Maximum allowed drift from the target rate (as a fraction)

0.1
on_drift_callback Optional[Callable[[DriftEvent], None]]

Callback function to execute when drift exceeds max_drift. Function signature: (event: DriftEvent) -> None

None
num_calls_between_checks int

Number of calls between periodic checks (default: 10). Used for both drift checking and periodic metrics callbacks.

10
seconds_before_first_check float

Minimum elapsed time (seconds) before rate checking begins (default: 60.0 seconds)

60.0
burst_capacity Optional[int]

Maximum number of tokens that can be stored in the bucket (defaults to 10% of hourly rate or 1, whichever is larger)

None
max_calls int

Maximum number of calls allowed (-1 for unlimited)

-1
on_max_calls_callback Optional[Callable[[MaxCallsEvent], None]]

Callback function to execute when max_calls is reached. Function signature: (event: MaxCallsEvent) -> None

None
on_periodic_check_callback Optional[Callable[[PeriodicCheckEvent], None]]

Callback function for periodic metrics checks. Function signature: (event: PeriodicCheckEvent) -> None. Provides metrics for custom analysis (bottleneck detection, monitoring, etc.).

None
rate_windows Optional[List[int]]

Time windows in seconds for rate calculation (default: [60, 300, 900])

None
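The documented default for burst_capacity (10% of the hourly rate or 1, whichever is larger) can be written out as a small helper. This is a sketch: `resolve_burst_capacity` is a hypothetical name, and rounding down via integer division is an assumption.

```python
from typing import Optional


def resolve_burst_capacity(hourly_rate: int,
                           burst_capacity: Optional[int] = None) -> int:
    """Return the explicit capacity, or 10% of the hourly rate with a floor of 1."""
    if burst_capacity is not None:
        return burst_capacity
    # Assumption: the 10% figure is rounded down.
    return max(1, hourly_rate // 10)


print(resolve_burst_capacity(36000))      # 3600
print(resolve_burst_capacity(5))          # 1
print(resolve_burst_capacity(36000, 50))  # 50
```

The floor of 1 matters for very low rates: without it, a pacer configured for fewer than 10 calls per hour would have a bucket that can never hold a token.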

rate_limited_context

rate_limited_context(timeout: Optional[float] = None) -> Generator[RateLimitContext, Any, None]

Context manager that rate-limits the enclosed code using the token bucket algorithm.

Parameters:

Name Type Description Default
timeout Optional[float]

Maximum time in seconds to wait for a token (None for no timeout)

None

Example

with rate_limiter.rate_limited_context() as ctx:
    print(f"Using rate limiter {ctx.id} with rate {ctx.hourly_rate}/hr")
    print(f"Current call count: {ctx.call_count}")
    print(f"First call at: {ctx.start_time}")
    print(f"Waited {ctx.seconds_waited:.2f} seconds")
    perform_action()

with rate_limiter.rate_limited_context(timeout=5.0) as ctx:
    print("Will time out after 5 seconds")
    perform_action()

__call__

__call__(timeout: Optional[float] = None)

Make the rate limiter callable as a context manager.

Equivalent to calling rate_limited_context()
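The equivalence can be pictured with a small stand-in class whose `__call__` simply delegates to a context-manager method. `CallablePacerSketch` is hypothetical and performs no rate limiting; it only records the timeout it was entered with.

```python
from contextlib import contextmanager
from typing import Iterator, List, Optional


class CallablePacerSketch:
    """Illustrates the delegation pattern only; no pacing is performed."""

    def __init__(self) -> None:
        self.entered_with: List[Optional[float]] = []

    @contextmanager
    def rate_limited_context(self, timeout: Optional[float] = None) -> Iterator[str]:
        self.entered_with.append(timeout)
        yield "ctx"

    def __call__(self, timeout: Optional[float] = None):
        # Calling the instance returns the same context manager.
        return self.rate_limited_context(timeout=timeout)


pacer = CallablePacerSketch()

with pacer() as ctx_a:  # short form
    pass

with pacer.rate_limited_context(timeout=5.0) as ctx_b:  # explicit form
    pass

print(pacer.entered_with)  # [None, 5.0]
```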

RateLimitContext

RateLimitContext dataclass

Context object yielded by rate_limited_context that provides access to rate limiter metrics.

Properties

  • id: Rate limiter identifier
  • hourly_rate: Configured rate limit in calls per hour
  • call_count: Total number of calls made
  • exceptions: Total number of exceptions encountered
  • start_time: Unix timestamp of when the first call was made
  • seconds_waited: Number of seconds waited before entering the context

id property

id: str

hourly_rate property

hourly_rate: int

call_count property

call_count: int

exceptions property

exceptions: int

start_time property

start_time: float

Timestamp of when the first call was made (Unix timestamp).

seconds_waited class-attribute instance-attribute

seconds_waited: float = 0.0

Event Classes

PacerEvent

PacerEvent dataclass

Base class for all pacer events.

Provides common context available to all callback events.

Attributes:

Name Type Description
limiter_id str

Unique identifier for the pacer

limiter TokenBucketPacer

Reference to the TokenBucketPacer instance

state_snapshot Dict[str, Any]

Snapshot of shared state at the time of the event

limiter_id instance-attribute

limiter_id: str

limiter instance-attribute

limiter: TokenBucketPacer

state_snapshot instance-attribute

state_snapshot: Dict[str, Any]

call_count property

call_count: int

Total number of calls made.

exceptions property

exceptions: int

Total number of exceptions encountered.

start_time property

start_time: float

Unix timestamp of when the first call was made.

elapsed_time property

elapsed_time: float

Time elapsed since the first call (in seconds).

DriftEvent

DriftEvent dataclass

Bases: PacerEvent

Event fired when rate drift exceeds the configured threshold.

Attributes:

Name Type Description
current_rate float

Actual rate in calls per hour

target_rate float

Target rate in calls per hour

drift float

Drift as a fraction of target rate (0.1 = 10% drift)

max_drift float

Maximum allowed drift (from configuration)

current_rate instance-attribute

current_rate: float

target_rate instance-attribute

target_rate: float

drift instance-attribute

drift: float

max_drift instance-attribute

max_drift: float
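The sketch below shows how a drift value of this kind could be computed and how a callback might consume the documented attributes. It makes two assumptions: drift is taken as the absolute relative difference between the current and target rates (the library may use a signed value), and a `SimpleNamespace` stands in for a real DriftEvent. `relative_drift`, `on_drift`, and `messages` are illustrative names.

```python
from types import SimpleNamespace
from typing import List

messages: List[str] = []


def relative_drift(current_rate: float, target_rate: float) -> float:
    """Drift as a fraction of the target rate (assumed to be absolute)."""
    return abs(current_rate - target_rate) / target_rate


def on_drift(event) -> None:
    """Example callback: record a one-line summary of the drift event."""
    messages.append(
        f"{event.limiter_id}: {event.current_rate:.0f}/hr vs target "
        f"{event.target_rate:.0f}/hr (drift {event.drift:.1%}, "
        f"allowed {event.max_drift:.1%})"
    )


drift = relative_drift(current_rate=3150.0, target_rate=3600.0)  # 450 / 3600

# Stand-in carrying the attributes documented for DriftEvent.
fake_event = SimpleNamespace(
    limiter_id="api_pacer",
    current_rate=3150.0,
    target_rate=3600.0,
    drift=drift,
    max_drift=0.1,
)

if fake_event.drift > fake_event.max_drift:
    on_drift(fake_event)

print(messages[0])
```

A function with the same shape as `on_drift` is what would be passed as on_drift_callback when constructing the pacer.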

MaxCallsEvent

MaxCallsEvent dataclass

Bases: PacerEvent

Event fired when the max_calls limit is reached.

Attributes:

Name Type Description
max_calls int

Maximum number of calls allowed (from configuration)

max_calls instance-attribute

max_calls: int

PeriodicCheckEvent

PeriodicCheckEvent dataclass

Bases: PacerEvent

Event fired during periodic checks with current metrics.

Provides comprehensive distribution-based statistics for load test analysis and bottleneck detection. Uses TDigest for accurate percentile calculations without storing all samples.

Attributes:

Name Type Description
worker_count int

Number of workers detected from environment

duration_digest Optional[TDigest]

TDigest of call durations (None if insufficient data)

wait_digest Optional[TDigest]

TDigest of wait times (None if insufficient data)

windowed_rates Dict[int, float]

Rates for configured time windows in seconds (e.g., {60: 3500, 300: 3450})

sample_count int

Total samples in digests

target_rate float

Target rate in calls/hour (from configuration)

current_rate float

Current actual rate in calls/hour

drift Optional[float]

Drift as a fraction of target rate (None if insufficient data)

worker_count instance-attribute

worker_count: int

duration_digest instance-attribute

duration_digest: Optional[TDigest]

wait_digest instance-attribute

wait_digest: Optional[TDigest]

windowed_rates instance-attribute

windowed_rates: Dict[int, float]

sample_count instance-attribute

sample_count: int

target_rate instance-attribute

target_rate: float

current_rate instance-attribute

current_rate: float

drift instance-attribute

drift: Optional[float]

duration_p50 property

duration_p50: Optional[float]

Median call duration in seconds.

duration_p90 property

duration_p90: Optional[float]

90th percentile call duration in seconds.

duration_p99 property

duration_p99: Optional[float]

99th percentile call duration in seconds.

wait_p50 property

wait_p50: Optional[float]

Median wait time in seconds.

wait_p90 property

wait_p90: Optional[float]

90th percentile wait time in seconds.

wait_p99 property

wait_p99: Optional[float]

99th percentile wait time in seconds.

wait_ratio property

wait_ratio: Optional[float]

Ratio of median wait time to median call duration.

This metric reveals whether your pacer (traffic generator) is constraining throughput or if the System Under Test (SUT) is the limiting factor.

Interpretation:

  • Low ratio (< 0.1): SUT-bound operation
      • Tests spend at least 10x more time executing than waiting for the pacer
      • The pacer is not constraining throughput; SUT capacity may be the bottleneck

  • High ratio (> 1.0): Pacer-bound operation
      • Tests spend more time waiting for the pacer than executing
      • SUT is fast relative to the pacing rate
      • SUT can handle higher rates; increase the pace if more load is needed

Use this metric to tune your load testing: high ratio means you can push harder; low ratio means you've found the SUT's capacity limit.
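The thresholds above translate directly into a small classifier that could be called from a periodic-check callback. This is a sketch: `compute_wait_ratio` and `classify_wait_ratio` are hypothetical helpers, and the "mixed" label for ratios between 0.1 and 1.0 is an assumption, since the text only describes the two extremes.

```python
from typing import Optional


def compute_wait_ratio(wait_p50: Optional[float],
                       duration_p50: Optional[float]) -> Optional[float]:
    """Median wait time divided by median call duration, or None without data."""
    if wait_p50 is None or duration_p50 is None or duration_p50 <= 0:
        return None
    return wait_p50 / duration_p50


def classify_wait_ratio(ratio: Optional[float]) -> str:
    """Map a wait ratio onto the interpretation bands described above."""
    if ratio is None:
        return "insufficient data"
    if ratio < 0.1:
        return "SUT-bound"    # little waiting: the SUT limits throughput
    if ratio > 1.0:
        return "pacer-bound"  # mostly waiting: the pacer limits throughput
    return "mixed"            # assumption: no band is documented for this range


print(classify_wait_ratio(compute_wait_ratio(0.02, 0.5)))  # SUT-bound
print(classify_wait_ratio(compute_wait_ratio(2.0, 0.5)))   # pacer-bound
print(classify_wait_ratio(compute_wait_ratio(None, 0.5)))  # insufficient data
```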

Fixture Factories

make_shared_json

make_shared_json(request: FixtureRequest, tmp_path_factory: TempPathFactory, worker_id: str) -> Generator[Callable[..., SharedJson], None, None]

Factory for creating shared JSON fixtures across pytest-xdist workers.

This is a session-scoped fixture that returns a factory function for creating SharedJson instances with proper worker coordination.

Parameters:

Name Type Description Default
request FixtureRequest

The pytest fixture request object

required
tmp_path_factory TempPathFactory

Pytest's temporary path factory

required
worker_id str

The xdist worker ID (e.g., 'gw0', 'gw1', or 'master')

required

Returns:

Type Description
Callable[..., SharedJson]

Factory function that creates SharedJson instances

Example

@pytest.fixture(scope="session")
def api_rate_tracker(make_shared_json):
    def init_data():
        return {
            'count': 0,
            'limit': 100,
            'errors': []
        }

    def report(shared):
        data = shared.read()
        print(f"Total API calls: {data['count']}")

    return make_shared_json(
        name="api_rate_tracker",
        on_first_worker=init_data,
        on_last_worker=report
    )

def test_api_call(api_rate_tracker):
    with api_rate_tracker.locked_dict() as data:
        data['count'] = data.get('count', 0) + 1

make_pacer

make_pacer(make_shared_json)

Factory for creating pacer fixtures across pytest-xdist workers.

This fixture provides a way to create TokenBucketPacer instances that share state across workers using SharedJson.

Example

@pytest.fixture(scope="session")
def pacer(make_pacer):
    from pytest_xdist_rate_limit import Rate

    return make_pacer(
        name="pacer",
        hourly_rate=Rate.per_second(10)
    )

def test_api_call(pacer):
    with pacer() as ctx:
        # Entering the context will wait if required to respect the rate
        pass

make_rate_limiter (Deprecated)

make_rate_limiter

make_rate_limiter(make_pacer)

Deprecated: Use make_pacer instead.

Factory for creating pacer fixtures across pytest-xdist workers. This fixture is deprecated and will be removed in a future version. Please use make_pacer instead.
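A common way to keep a deprecated entry point working is to have it emit a DeprecationWarning and forward to its replacement. The sketch below shows that general pattern with plain functions; both function names are hypothetical, and it is not a description of how this package implements the deprecated fixture.

```python
import warnings


def make_pacer_sketch(name: str) -> str:
    """Stand-in for the replacement factory."""
    return f"pacer:{name}"


def make_rate_limiter_sketch(name: str) -> str:
    """Deprecated alias: warn, then delegate to the replacement."""
    warnings.warn(
        "make_rate_limiter is deprecated; use make_pacer instead.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller
    )
    return make_pacer_sketch(name)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = make_rate_limiter_sketch("api")

print(result)                     # pacer:api
print(caught[0].category.__name__)  # DeprecationWarning
```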

Deprecated Aliases

The following names are deprecated and maintained for backward compatibility: