Cookbook
Real-world patterns for using LimitPal in production apps.
Resilient HTTP client (retry + circuit breaker + rate limit)
Use a resilient executor to keep a flaky third-party API under control.
import requests
from limitpal import (
TokenBucket,
ResilientExecutor,
RetryPolicy,
CircuitBreaker,
)
limiter = TokenBucket(capacity=10, refill_rate=20) # burst + steady
retry = RetryPolicy(max_attempts=4, base_delay=0.2, backoff=2.0, jitter=0.1)
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=10.0)
executor = ResilientExecutor(
limiter=limiter,
retry_policy=retry,
circuit_breaker=breaker,
)
def fetch_user(user_id: str) -> dict:
resp = requests.get(f"https://api.example.com/users/{user_id}", timeout=5)
resp.raise_for_status()
return resp.json()
result = executor.run("third_party_api", lambda: fetch_user("123"))
Async API client (rate limit + retries)
import aiohttp
from limitpal import AsyncResilientExecutor, AsyncTokenBucket, RetryPolicy
limiter = AsyncTokenBucket(capacity=5, refill_rate=10)
retry = RetryPolicy(max_attempts=3, base_delay=0.1, backoff=2.0)
executor = AsyncResilientExecutor(limiter=limiter, retry_policy=retry)
async def fetch_json(url: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=5) as resp:
resp.raise_for_status()
return await resp.json()
data = await executor.run("api", lambda: fetch_json("https://api.example.com"))
Database query protection (spike control)
Protect your database from sudden spikes (e.g., a hot endpoint or cache miss storm).
from limitpal import TokenBucket
db_reads = TokenBucket(capacity=50, refill_rate=200) # short bursts, 200 qps steady
def load_profile(user_id: str) -> dict:
if not db_reads.allow("db:read"):
raise RuntimeError("DB rate limit exceeded")
return query_db(user_id)
Message queue consumer with backpressure
Block consumers when downstream is saturated.
from limitpal import LeakyBucket
limiter = LeakyBucket(capacity=500, leak_rate=100) # 100 msgs/sec
def handle_message(msg) -> None:
# Apply backpressure to the consumer loop.
limiter.acquire("mq", timeout=2.0)
process(msg)
Background jobs with smooth throughput
Use a leaky bucket to keep job processing stable.
from limitpal import LeakyBucket
limiter = LeakyBucket(capacity=100, leak_rate=20) # 20 jobs/sec
def handle_job(job) -> None:
# Block until there is room in the queue.
limiter.acquire("worker", timeout=5.0)
process(job)
Multi-tier rate limiting (per-user + per-tenant + global)
Protect user fairness, tenant budgets, and overall system capacity.
from limitpal import CompositeLimiter, TokenBucket
user_limiter = TokenBucket(capacity=5, refill_rate=5)
tenant_limiter = TokenBucket(capacity=200, refill_rate=200)
global_limiter = TokenBucket(capacity=1000, refill_rate=1000)
limiter = CompositeLimiter([user_limiter, tenant_limiter, global_limiter])
if limiter.allow("user:123"):
process_request()
Avoid memory growth with TTL + max buckets
For multi-tenant or unbounded keys, enable eviction.