AsyncMQ¶
⚡ Supercharge your async applications with tasks so fast, you'll think you're bending time itself. ⚡
Documentation: https://asyncmq.dymmond.com 📚
Source Code: https://github.com/dymmond/asyncmq
The only officially supported version is the latest release.
AsyncMQ is an asynchronous Python job queue focused on asyncio/anyio workloads.
It gives you:
- task registration via @task
- queue and worker runtime APIs
- delayed jobs, retries/backoff, TTL expiration, and dead-letter routing
- multiple backends (Redis, Postgres, MongoDB, RabbitMQ, in-memory)
- a CLI (asyncmq) and a built-in dashboard app
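To give a feel for the retries/backoff and TTL ideas listed above, here is a small illustrative sketch. The formula and function names are hypothetical examples for exposition, not AsyncMQ's actual retry policy or API:

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff with full jitter: the window grows 1s, 2s, 4s, ...
    per attempt, clamped at `cap`, and the actual delay is sampled from it."""
    return random.uniform(0, min(cap, base * 2 ** attempt))


def is_expired(enqueued_at: float, ttl: float, now: float) -> bool:
    """A job whose age exceeds its TTL is not run; instead it is dropped or
    routed to a dead-letter queue."""
    return now - enqueued_at > ttl


# Early attempts stay under the cap; later ones are clamped to it.
for attempt in range(4):
    print(f"attempt {attempt}: delay <= {min(60.0, 2.0 ** attempt):.1f}s")
```

Jitter (randomizing within the window rather than using the raw exponential value) is a common refinement that stops a burst of failed jobs from all retrying at the same instant.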
What AsyncMQ Is (and Is Not)¶
AsyncMQ is:
- a library-first queue/worker runtime you embed in Python apps
- backend-pluggable through a shared `BaseBackend` contract
- suitable for both local development and production deployments
AsyncMQ is not:
- a hosted queue service
- a guaranteed exactly-once execution system
- a replacement for domain-level idempotency in your task code
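Because delivery is at-least-once rather than exactly-once, task handlers should be idempotent. A minimal sketch of a domain-level idempotency guard, where the `processed` set is a stand-in for a durable store such as a database table or Redis set:

```python
processed: set[str] = set()  # stand-in for a durable store (DB table, Redis set, ...)


def send_welcome_once(order_id: str) -> bool:
    """Return True only the first time a given order_id is handled.

    If the queue redelivers the same job, the duplicate call becomes a no-op,
    so at-least-once delivery still produces effectively-once side effects.
    """
    if order_id in processed:
        return False  # duplicate delivery: skip the side effect
    processed.add(order_id)
    # ... perform the real side effect here (send email, charge card, ...)
    return True


print(send_welcome_once("order-42"))  # first delivery runs
print(send_welcome_once("order-42"))  # redelivery is a no-op
```

In production the dedupe key and the side effect should be committed atomically (for example, in the same database transaction), otherwise a crash between the two still leaves a window for duplicates.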
Architecture Overview¶
At runtime, AsyncMQ has four main layers:
- Task registration: `@task(queue=...)` stores handlers in `TASK_REGISTRY` and adds `.enqueue()` helpers.
- Queue API: `Queue` wraps backend operations (`enqueue`, `pause`, `list_jobs`, plus delayed/repeatable APIs).
- Worker runtime: `process_job`/`handle_job` run tasks and manage state transitions, retries, and acknowledgements.
- Backend and store: concrete backends persist job state and queue metadata.
For an end-to-end walkthrough, start with Core Concepts.
```mermaid
flowchart LR
    A["Producer"] --> B["@task/.enqueue"]
    B --> C["Queue API"]
    C --> D["Backend (waiting/delayed state)"]
    D --> E["Worker runtime"]
    E --> F["Task handler execution"]
    F --> G["Result/state update + ack/DLQ"]
```
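To make the flow concrete, here is a toy, dependency-free sketch of the same pipeline: a registry decorator, a queue over an in-memory "backend", and a worker loop that runs handlers and acknowledges jobs. This is illustrative only; AsyncMQ's real implementation of each layer differs:

```python
import asyncio

TASK_REGISTRY: dict[str, callable] = {}  # layer 1: task registration


def task(name: str):
    """Register a handler under a name, like a simplified @task decorator."""
    def wrap(fn):
        TASK_REGISTRY[name] = fn
        return fn
    return wrap


class Queue:  # layers 2 + 4: queue API over an in-memory "backend"
    def __init__(self) -> None:
        self.waiting: asyncio.Queue = asyncio.Queue()

    async def enqueue(self, name: str, *args) -> None:
        await self.waiting.put((name, args))  # job enters the "waiting" state


async def worker(queue: Queue, results: list) -> None:  # layer 3: worker runtime
    while True:
        name, args = await queue.waiting.get()  # waiting -> active
        try:
            results.append(await TASK_REGISTRY[name](*args))  # run the handler
        finally:
            queue.waiting.task_done()  # ack: job leaves the active state


@task("greet")
async def greet(who: str) -> str:
    return f"hello {who}"


async def main() -> list:
    q, results = Queue(), []
    w = asyncio.create_task(worker(q, results))
    await q.enqueue("greet", "world")
    await q.waiting.join()  # wait until every enqueued job is acknowledged
    w.cancel()
    return results


print(asyncio.run(main()))  # ['hello world']
```

The real system adds what this toy omits: durable persistence, delayed/repeatable scheduling, retries with backoff, and dead-letter routing on repeated failure.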
Feature Map¶
Minimal Quickstart (In-Memory)¶
Use the in-memory backend first so you can run the example without Redis or Postgres.
```python
# myapp/settings.py
from asyncmq.backends.memory import InMemoryBackend
from asyncmq.conf.global_settings import Settings


class AppSettings(Settings):
    backend = InMemoryBackend()
```
```python
# myapp/tasks.py
from asyncmq.tasks import task


@task(queue="emails", retries=2, ttl=300)
async def send_welcome(email: str) -> None:
    print(f"sent welcome email to {email}")
```
```python
# producer.py
import anyio

from asyncmq.queues import Queue
from myapp.tasks import send_welcome


async def main() -> None:
    queue = Queue("emails")
    job_id = await send_welcome.enqueue("alice@example.com", backend=queue.backend)
    print("enqueued", job_id)


anyio.run(main)
```
Next Steps¶
- Configure your target backend in Settings.
- Add failure handling and retries with Jobs and Workers.
- Add operations visibility with the Dashboard and CLI reference.
