LimitPal
Your friendly Python resilient execution toolkit
A fast, modular resilient execution toolkit for Python with sync and async support. In-memory, zero dependencies, thread-safe.
Features
- Resilience: combine retry, circuit breaker and rate-limiters in one executor
- Composite limiters (combine multiple limiters for burst control)
- Token Bucket and Leaky Bucket algorithms
- Sync and async APIs for all the functionality
- MockClock for deterministic tests
- No external dependencies, Python ≥ 3.10
Installation
Or with uv:
Quick example
AsyncResilientExecutor/ResilientExecutor (async/sync ready)
Combine Limiting + Retry + CircuitBreaker + BurstControl strategies in one executor
""" Async example """
from limitpal import AsyncResilientExecutor, AsyncTokenBucket, CircuitBreaker, RetryPolicy
# Async rate limiting for burst control.
limiter = AsyncTokenBucket(capacity=5, refill_rate=10)
# Async retries with the same policy.
retry = RetryPolicy(max_attempts=3, base_delay=0.2, backoff=2.0)
# Same breaker semantics in async workflows.
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)
# Async executor wraps limiter + retry + breaker.
executor = AsyncResilientExecutor(
limiter=limiter,
retry_policy=retry,
circuit_breaker=breaker,
)
# Your real-world async call.
async def call_api() -> str:
return await request_external_service()
# Run with async protection.
result = await executor.run("user:123", call_api)
allow / acquire (same API in sync/async)
allow() is non-blocking: it answers “can I proceed right now?”.
acquire() waits for quota (or until timeout) and then proceeds.
Async versions have the same contract — only await differs.
from limitpal import TokenBucket
limiter = TokenBucket(capacity=2, refill_rate=1)
if limiter.allow("user:123"):
process_request()
else:
return "Rate limited"
limiter.acquire("user:123", timeout=2.0) # wait until a token is available
process_request()
from limitpal import AsyncTokenBucket
limiter = AsyncTokenBucket(capacity=2, refill_rate=1)
if await limiter.allow("user:123"):
await process_request()
else:
return "Rate limited"
await limiter.acquire("user:123", timeout=2.0)
await process_request()
Key-based limiting (per-user, per-IP, per-tenant)
Limiters keep separate buckets per key. Use keys to isolate users, IPs, or any other dimension you need.
from limitpal import TokenBucket
limiter = TokenBucket(capacity=2, refill_rate=1)
# user:123 has its own bucket
limiter.allow("user:123")
limiter.allow("user:123") # consumes user:123 quota
limiter.allow("user:123") # likely False (rate limited)
# user:456 is independent
limiter.allow("user:456") # allowed, separate bucket
Composite limiters (combine strategies)
Use this when you need both burst control and a smooth global throughput limit at the same time. All limiters must allow the request. (Sync/Async)
from limitpal import AsyncCompositeLimiter, AsyncLeakyBucket, AsyncTokenBucket
per_user = AsyncTokenBucket(capacity=10, refill_rate=5)
global_smooth = AsyncLeakyBucket(capacity=50, leak_rate=20)
limiter = AsyncCompositeLimiter([per_user, global_smooth])
if await limiter.allow("user:123"):
await process_request()
else:
return "Rate limited"
Next steps
- Quick Start — basic usage (sync, async, blocking)
- Algorithms — Token Bucket vs Leaky Bucket
- Cookbook — real-world patterns
- API Reference — full API
- Examples — FastAPI middleware