AsyncMQ
⚡ Supercharge your async applications with tasks so fast, you’ll think you’re bending time itself. ⚡
Welcome to AsyncMQ, the modern task queue that brings powerful, flexible, and lightning-fast background processing to your Python stack. Whether you're building microservices, handling high-throughput data pipelines, or just want reliable delayed jobs, AsyncMQ has your back.
🚀 Why Choose AsyncMQ?
- Asyncio & AnyIO-Native: No more wrestling with threads or callbacks. AsyncMQ is built on `asyncio` and `anyio`, so your tasks are non-blocking and integrate seamlessly into modern async frameworks like FastAPI and Esmerald.
- Multi-Backend Flexibility: From the blistering speed of Redis to the ACID guarantees of Postgres, or the schema-less power of MongoDB, AsyncMQ supports multiple backends out of the box, and you can write your own by implementing `BaseBackend`.
- Advanced Scheduling
  - Delayed Jobs: Fire off tasks in the future with precise delays.
  - Repeatable & Cron: Create heartbeat jobs or cron-like schedules with a single line.
  - Scan & Scheduler: Smart polling intervals and cron logic keep your timings accurate without burning CPU cycles.
- Robust Error Handling
  - Retries & Backoff: Exponential or custom backoff strategies to gracefully handle failures.
  - Dead Letter Queue (DLQ): Automatically route permanently failed tasks for later inspection and reprocessing.
- Observability & Event Hooks: Tap into lifecycle events (`job:started`, `job:completed`, `job:failed`, `job:progress`) to power real-time dashboards, metrics, or custom alerts.
- Rate Limiting & Concurrency Control: Fine-tune throughput with token-bucket rate limiting and capacity limiters. Scale your worker concurrency without overloading downstream systems.
- Sandboxed Execution: Run untrusted or CPU-bound tasks in isolated subprocesses with timeouts to protect your main workers from rogue code or infinite loops.
- Flow/DAG Orchestration: Create complex task graphs with dependencies using `FlowProducer`. Enqueue entire pipelines atomically or fall back to safe sequential logic.
- CLI & Dev Experience: A feature-rich CLI for managing queues, jobs, and workers, with intuitive commands, JSON output support, and built-in help for every scenario.
- Seamless ASGI Integration: Out-of-the-box compatibility with FastAPI, Esmerald, or any ASGI application. Manage workers within your app’s lifecycle events.
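To make the "Retries & Backoff" idea above concrete, here is a minimal standard-library sketch of retrying an async callable with exponential backoff. It is not AsyncMQ's implementation: `backoff_delay`, `run_with_retries`, and all of their parameters are hypothetical names chosen for illustration.

```python
import asyncio


def backoff_delay(attempt: int, base: float = 0.5, factor: float = 2.0, cap: float = 30.0) -> float:
    """Delay before retry number `attempt` (1-based): base * factor**(attempt - 1), capped at `cap`."""
    return min(cap, base * factor ** (attempt - 1))


async def run_with_retries(func, *args, retries: int = 2, sleep=asyncio.sleep, **kwargs):
    """Await `func`; on failure, wait with exponential backoff and try again.

    At most `retries` retries are made. After the final failure the exception
    propagates; a queue would typically move the job to a dead letter queue
    at that point (assumption, not shown here).
    """
    attempt = 0
    while True:
        try:
            return await func(*args, **kwargs)
        except Exception:
            attempt += 1
            if attempt > retries:
                raise
            await sleep(backoff_delay(attempt))
```

The `sleep` parameter is injectable so the loop can be exercised in tests without actually waiting; in normal use the default `asyncio.sleep` keeps the event loop free while a retry is pending.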
Comparison with Other Python Task Queues
Feature | AsyncMQ | Celery | RQ | Dramatiq | Huey |
---|---|---|---|---|---|
AsyncIO Native | ✅ | ❌ | ❌ | ✅ | ❌ |
Pluggable Backends | ✅ Redis, Postgres, Mongo, In-Mem | RabbitMQ, Redis, SQS | Redis only | RabbitMQ, Redis | Redis, SQLite, In-Mem |
Rate Limiting | ✅ built-in | ❌ | ❌ | ✅ via addon | ❌ |
Cron & Repeatable Jobs | ✅ interval & cron expressions | ✅ | ✅ | ✅ | ✅ |
Progress Reporting | ✅ event hooks | ✅ callbacks | ❌ | ✅ hooks | ❌ |
Dead Letter Queue | ✅ | ✅ | ❌ | ✅ | ✅ |
Flow / DAG Support | ✅ `FlowProducer` | ✅ chords | ❌ | ✅ extensions | ❌ |
ASGI-Friendly | ✅ FastAPI/Esmerald integration | ❌ | ❌ | ❌ | ❌ |
CLI Management | ✅ rich, JSON-friendly | ✅ | ✅ | ✅ | ✅ |
Where AsyncMQ Shines
- True AsyncIO & AnyIO integration: zero thread hacks, full non-blocking background tasks.
- Backend flexibility: swap Redis, Postgres, MongoDB, or In-Memory with a single setting.
- Built-in rate limiting & concurrency control: protect downstream services out-of-the-box.
- Event-driven hooks: subscribe to `job:started`, `completed`, `failed`, `progress`, etc., for metrics and alerts.
- ASGI integration: manage workers within FastAPI or Esmerald lifecycles, with no extra wrappers needed.
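This page describes the built-in rate limiting as token-bucket based. As a rough illustration of that technique, and not AsyncMQ's own code, the sketch below shows a tiny token bucket. The class name `TokenBucket`, its parameters, and the injectable `clock` are assumptions made for the example.

```python
import time


class TokenBucket:
    """Allow about `rate` operations per `interval` seconds, with bursts of up to `rate`."""

    def __init__(self, rate: int, interval: float = 1.0, clock=time.monotonic):
        self.capacity = float(rate)
        self.refill_per_sec = rate / interval
        self.tokens = float(rate)  # start with a full bucket
        self.clock = clock
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; return False when the caller should wait."""
        now = self.clock()
        # Refill in proportion to the elapsed time, never above capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

A worker loop could call `try_acquire()` before picking up each job and sleep briefly when it returns `False`, which caps throughput toward a downstream service without blocking other coroutines.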
⚡ Core Features at a Glance
Category | Highlights |
---|---|
Task Definition | `@task` decorator, `.enqueue()`, progress callbacks, TTL, retries, dependencies, repeats |
Queue API | `add()`, `add_bulk()`, `add_repeatable()`, `pause()`, `resume()`, `clean()`, stats |
Worker Engine | `process_job()`, capacity & rate limiters, sandbox, event emitter, DLQ handling |
Scheduling | `delayed_job_scanner`, `repeatable_scheduler`, cron vs interval support |
Flow Orchestration | `FlowProducer.add_flow()`, atomic backend support, fallback mode for dependency wiring |
Configuration | Central `Settings` dataclass, env var override, dynamic tuning of concurrency & intervals |
Observability | `LoggingConfig` protocol, structured logs, event subscriptions, metrics integration |
CLI Management | `asyncmq queue`, `asyncmq job`, `asyncmq worker`, `asyncmq info` commands |
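The "fallback mode for dependency wiring" in the Flow Orchestration row amounts to running jobs in an order that respects their dependencies. The sketch below shows that general idea with the standard library's `graphlib`. It does not use or reproduce `FlowProducer`; the function `sequential_order` and the job names are hypothetical.

```python
from graphlib import TopologicalSorter


def sequential_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return job ids ordered so each job comes after everything it depends on.

    `dependencies` maps a job id to the set of job ids it depends on.
    Raises graphlib.CycleError if the graph contains a cycle.
    """
    return list(TopologicalSorter(dependencies).static_order())


# A hypothetical four-step pipeline in which each job depends on the previous one.
flow = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}
order = sequential_order(flow)
```

For this linear chain the only valid order is extract, transform, load, notify. With branching graphs several orders can be valid, and a cycle is rejected before any job would be enqueued.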
🎬 Quickstart
- Install

      pip install asyncmq

- Configure: override the default settings by importing `Settings` from `asyncmq.conf`, creating your own version, and making it global through the environment.

      export ASYNCMQ_SETTINGS_MODULE=myapp.settings.Settings

- Define a task

      from asyncmq.tasks import task

      @task(queue="emails", retries=2, ttl=120)
      async def send_welcome(email: str):
          # Imagine real email-sending logic here
          print(f"📧 Sent welcome to {email}")

- Enqueue a job

      import anyio
      from asyncmq.queues import Queue

      async def main():
          q = Queue("emails")
          # Enqueue on the queue's backend with delay=10
          await send_welcome.enqueue(q.backend, "alice@example.com", delay=10)

      anyio.run(main)

- Start a worker

      asyncmq worker start emails --concurrency 5
AsyncMQ is more than just a task queue: it’s the Swiss Army knife of async background processing. Dive into the Learn section to master every feature, or jump into the Features docs for quick reference.
Ready to bend time?
Get started today and experience async tasking at warp speed! 🎉