Performance Tuning & Benchmarking¶
Welcome to the deep-dive where we squeeze every last drop of performance out of AsyncMQ because milliseconds matter, and bragging rights at benchmark time are essential. 🏎️💨
In this chapter you’ll learn how to:

1. Measure baseline performance with realistic workloads.
2. Tune worker concurrency and rate limits.
3. Optimize delayed-job scanning and cron scheduling.
4. Minimize backend latency (Redis, Postgres, MongoDB).
5. Integrate monitoring & profiling tools.
6. Interpret benchmarks and avoid common pitfalls.
1. Benchmarking Your Workload¶
Before you tune, you must measure! Use a representative workload:
```python
import asyncio
import time

from asyncmq.backends.redis import RedisBackend
from asyncmq.logging import logger
from asyncmq.queues import Queue
from asyncmq.tasks import task

backend = RedisBackend(redis_url="redis://localhost:6379/0")
queue = Queue(name="perf", backend=backend, concurrency=10)


@task(queue="perf")
async def noop():
    """A minimal task for pure overhead measurement."""
    return True


async def enqueue_jobs(n: int):
    for _ in range(n):
        await noop.enqueue(backend)


async def run_benchmark(n: int):
    start = time.time()
    await enqueue_jobs(n)
    end_enqueue = time.time()
    logger.info(f"Enqueued {n} jobs in {end_enqueue - start:.2f}s")

    # Process jobs
    await queue.run()
    end_process = time.time()
    logger.info(f"Processed {n} jobs in {end_process - end_enqueue:.2f}s")


if __name__ == "__main__":
    asyncio.run(run_benchmark(1000))
```
- Metric Separation: Track enqueue vs. processing durations independently.
- Repeatable Tests: Run multiple iterations and average to smooth out noise.
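A minimal sketch of the repeated-runs idea, reusing `enqueue_jobs`, `time`, and `logger` from the benchmark above:

```python
# Sketch: repeat the enqueue benchmark several times and average the timings
# to smooth out noise; reuses enqueue_jobs(), time, and logger from above.
async def averaged_enqueue(n: int, iterations: int = 5) -> float:
    timings = []
    for _ in range(iterations):
        start = time.time()
        await enqueue_jobs(n)
        timings.append(time.time() - start)
    avg = sum(timings) / len(timings)
    logger.info(f"Avg enqueue over {iterations} runs of {n} jobs: {avg:.2f}s")
    return avg
```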
Tip: Isolate benchmarking on dedicated systems to avoid background load skewing results.
2. Concurrency & Rate Limits¶
2.1. Worker Concurrency¶
- I/O-bound tasks: scale concurrency up (10–50) for better throughput.
- CPU-bound tasks: keep concurrency low (1–4); heavy CPU work blocks the event loop, so consider offloading it to a process pool (see the sketch after the config examples below).
Tune via:
```shell
asyncmq worker start perf --concurrency 20
```
Or programmatically:
```python
queue = Queue(name="perf", backend=backend, concurrency=20)
```
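For the CPU-bound case, one option is to push the heavy work into a process pool so the worker’s event loop stays responsive. A minimal sketch; `crunch` is a hypothetical stand-in for your hot function:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

from asyncmq.tasks import task


# `crunch` is a hypothetical stand-in for a CPU-bound hot path; it must live
# at module level so the process pool can pickle it.
def crunch(data: bytes) -> int:
    return sum(data)


_pool = ProcessPoolExecutor(max_workers=4)


@task(queue="perf")
async def heavy(data: bytes) -> int:
    # Offload to a separate process so the worker's event loop stays responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_pool, crunch, data)
```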
2.2. Rate Limiting¶
Throttle job starts with a token-bucket limiter:
```python
queue = Queue(
    name="api_calls",
    backend=backend,
    rate_limit=5,       # max 5 jobs...
    rate_interval=1.0,  # ...per second
)
```
Ideal for external APIs with rate caps.
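For instance, a task bound to this queue starts at most five times per second regardless of worker concurrency. A sketch; the `httpx` client and URL parameter are illustrative, not part of AsyncMQ:

```python
import httpx

from asyncmq.tasks import task


@task(queue="api_calls")
async def fetch_status(url: str) -> int:
    # At most 5 of these start per second, enforced by the queue's token bucket.
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        return resp.status_code
```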
3. Scanning & Scheduling Overhead¶
3.1. Delayed Job Scanner¶
Tuning how often AsyncMQ checks for due delayed and repeatable jobs can dramatically influence both task latency and backend load. You now have two configuration options:
- Global Setting via your `Settings` class (applies to all queues):
```python
from asyncmq.backends.base import BaseBackend
from asyncmq.backends.redis import RedisBackend
from asyncmq.conf.global_settings import Settings as BaseSettings


class Settings(BaseSettings):
    backend: BaseBackend = RedisBackend(redis_url="redis://localhost:6379/0")

    # Lower scan interval to 0.2 seconds for sub-second delayed-job latency
    scan_interval: float = 0.2
```
- Per-Queue Override using the new `scan_interval` parameter on `Queue` (applies only to that instance):
```python
from asyncmq.backends.redis import RedisBackend
from asyncmq.queues import Queue

backend = RedisBackend(redis_url="redis://localhost:6379/0")

# Override to poll every 0.2s just for the 'perf' queue
queue = Queue(
    name="perf",
    backend=backend,
    concurrency=20,
    scan_interval=0.2,
)
```
Latency vs. Scan Frequency Tradeoff¶
| `scan_interval` | Approx. Max Delay | Backend Calls/sec |
| --- | --- | --- |
| 0.1s | ~0.1s | 10× baseline |
| 1.0s | ~1.0s | 1× baseline |
| 5.0s | up to 5.0s | 0.2× baseline |
- Lower intervals: near real-time but higher backend load.
- Higher intervals: lower load but increased scheduling latency.
Benchmark the delay between `delay_until` and actual execution to pick the sweet spot.
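A rough way to measure that gap, reusing `logger` from the earlier example. The `delay=` keyword below is an assumption; check the enqueue signature in your AsyncMQ version and adapt:

```python
import time

from asyncmq.logging import logger
from asyncmq.tasks import task


@task(queue="perf")
async def delay_probe(scheduled_for: float) -> None:
    # Log how far past the intended fire time the job actually ran.
    logger.info(f"fired {time.time() - scheduled_for:.3f}s after schedule")


async def probe(backend) -> None:
    fire_at = time.time() + 5.0
    # `delay=` is an assumed keyword; adapt to your enqueue signature.
    await delay_probe.enqueue(backend, fire_at, delay=5.0)
```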
3.2. Cron & Repeatable Scheduling¶
High-frequency or cron-based jobs (e.g., every minute) rely on the same scanner. Ensure your scanner interval is shorter than your cron window:
```python
# Schedule a job every minute
queue.add_repeatable(
    task_id="app.cleanup",
    cron="*/1 * * * *",
)
```
With `scan_interval=0.2`, jobs will fire within ~200ms of each minute boundary.
4. Backend Latency Optimization¶
4.1. Redis¶
- Pipelines: batch `LPUSH` + metadata writes into one round-trip (see the sketch below).
- Connection Pool: adjust `minsize`/`maxsize` to match worker concurrency.
- Key Expiration: set a TTL on job hashes to auto-clean old entries.
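To see what pipelining buys you, here is an illustrative sketch using `redis.asyncio` directly. The key names (`queue:perf`, `job:{id}`) are hypothetical; AsyncMQ’s RedisBackend manages its own key layout:

```python
import json

import redis.asyncio as redis


async def bulk_enqueue(client: redis.Redis, jobs: list[dict]) -> None:
    # Buffer all commands locally and send them in a single round-trip.
    async with client.pipeline(transaction=False) as pipe:
        for job in jobs:
            pipe.lpush("queue:perf", job["id"])                    # queue entry
            pipe.hset(f"job:{job['id']}",                          # metadata write
                      mapping={"payload": json.dumps(job)})
            pipe.expire(f"job:{job['id']}", 86400)                 # TTL auto-clean
        await pipe.execute()
```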
4.2. Postgres¶
- Indexes: on `status`, `run_at`, `priority` (see the sketch below).
- Prepared Statements: reuse queries for enqueue & dequeue.
- Pooling: `asyncpg.create_pool(max_size=20)`.
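A sketch combining the indexing and pooling ideas; the `asyncmq_jobs` table and column names are assumptions, so match them to the schema your Postgres backend actually creates:

```python
import asyncpg


async def prepare_postgres(dsn: str) -> asyncpg.Pool:
    # Size the pool to roughly match total worker concurrency.
    pool = await asyncpg.create_pool(dsn, max_size=20)
    async with pool.acquire() as conn:
        # Composite index covering the dequeue predicate and ordering.
        await conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_jobs_dequeue "
            "ON asyncmq_jobs (status, run_at, priority)"
        )
    return pool
```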
4.3. MongoDB¶
- TTL Index: expire delayed & completed docs automatically.
- Bulk Writes: batch-insert many jobs in one round-trip (see the sketch below).
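A sketch with Motor; the `asyncmq` database, `jobs` collection, and `finished_at` field are assumptions to align with your Mongo backend’s schema:

```python
from motor.motor_asyncio import AsyncIOMotorClient


async def prepare_mongo(uri: str) -> None:
    db = AsyncIOMotorClient(uri)["asyncmq"]
    # TTL index: MongoDB deletes docs ~1 hour after their `finished_at` time.
    await db.jobs.create_index("finished_at", expireAfterSeconds=3600)
    # Bulk write: one round-trip instead of N single inserts.
    await db.jobs.insert_many(
        [{"task": "noop", "i": i} for i in range(1000)],
        ordered=False,
    )
```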
5. Monitoring & Profiling¶
- Prometheus: expose counters/histograms via `event_emitter`:
```python
from prometheus_client import Counter, Histogram, start_http_server

from asyncmq.core.event import event_emitter

JOB_STARTED = Counter('asyncmq_job_started', 'Jobs started')
JOB_DURATION = Histogram('asyncmq_job_duration_seconds', 'Task duration')

event_emitter.on('job:started', lambda _: JOB_STARTED.inc())
event_emitter.on('job:completed', lambda p: JOB_DURATION.observe(
    p['timestamps']['finished_at'] - p['timestamps']['created_at']
))

start_http_server(8000)
```
- Profilers: use Py-Spy or cProfile to spot hotspots in worker loops (a cProfile sketch follows).
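For a quick start with cProfile, a minimal sketch that profiles the `run_benchmark` coroutine from section 1 and prints the 20 most expensive call sites:

```python
import asyncio
import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()
asyncio.run(run_benchmark(1000))  # run_benchmark from section 1
profiler.disable()

# Rank by cumulative time to surface the hottest call paths first.
pstats.Stats(profiler).sort_stats("cumulative").print_stats(20)
```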
6. Interpreting Results¶
- Enqueue vs Process: Large gaps suggest backend or worker bottlenecks.
- 95th/99th Percentiles: flag outlier tasks (see the sketch after this list).
- Queue Depth Trends: sustained depth >0 indicates under-provisioned workers.
- Retry Rates: high retry ratio points to flaky tasks or misconfigs.
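A sketch for computing those tail latencies from the same `job:completed` payload used in the Prometheus example; it assumes you have collected at least a few hundred samples:

```python
import statistics

from asyncmq.core.event import event_emitter

durations: list[float] = []
event_emitter.on('job:completed', lambda p: durations.append(
    p['timestamps']['finished_at'] - p['timestamps']['created_at']
))


def report_tail_latency() -> None:
    # quantiles(n=100) yields 99 cut points; index 94 -> p95, index 98 -> p99.
    cuts = statistics.quantiles(durations, n=100)
    print(f"p95={cuts[94]:.3f}s  p99={cuts[98]:.3f}s")
```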
Final tip: Automate benchmarks in CI to catch regressions—no surprises in production! 🎉
With these strategies, you’ll wield AsyncMQ like a power tool: fast, reliable, and scalable.
Next up: Security & Compliance—locking down your pipeline end-to-end.