Skip to content

AsyncMQ

AsyncMQ Logo

⚡ Supercharge your async applications with tasks so fast, you'll think you're bending time itself. ⚡

Test Suite Package version Supported Python versions CodSpeed


Documentation: https://asyncmq.dymmond.com 📚

Source Code: https://github.com/dymmond/asyncmq

The official supported version is always the latest released.


AsyncMQ is an asynchronous Python job queue focused on asyncio/anyio workloads.

It gives you: - task registration via @task - queue and worker runtime APIs - delayed jobs, retries/backoff, TTL expiration, and dead-letter routing - multiple backends (Redis, Postgres, MongoDB, RabbitMQ, in-memory) - a CLI (asyncmq) and a built-in dashboard app

What AsyncMQ Is (and Is Not)

AsyncMQ is:

  • a library-first queue/worker runtime you embed in Python apps
  • backend-pluggable through a shared BaseBackend contract
  • suitable for both local development and production deployments

AsyncMQ is not:

  • a hosted queue service
  • a guaranteed exactly-once execution system
  • a replacement for domain-level idempotency in your task code

Architecture Overview

At runtime, AsyncMQ has four main layers:

  1. Task registration: @task(queue=...) stores handlers in TASK_REGISTRY and adds .enqueue() helpers.
  2. Queue API: Queue wraps backend operations (enqueue, pause, list_jobs, delayed/repeatable APIs).
  3. Worker runtime: process_job/handle_job run tasks, manage state transitions, retries, and acknowledgements.
  4. Backend and store: concrete backends persist job state and queue metadata.

For an end-to-end walkthrough, start with Core Concepts.

flowchart LR
    A["Producer"] --> B["@task/.enqueue"]
    B --> C["Queue API"]
    C --> D["Backend (waiting/delayed state)"]
    D --> E["Worker runtime"]
    E --> F["Task handler execution"]
    F --> G["Result/state update + ack/DLQ"]

Feature Map

Minimal Quickstart (In-Memory)

Use in-memory backend first so you can run without Redis/Postgres.

# myapp/settings.py
from asyncmq.backends.memory import InMemoryBackend
from asyncmq.conf.global_settings import Settings


class AppSettings(Settings):
    backend = InMemoryBackend()
export ASYNCMQ_SETTINGS_MODULE=myapp.settings.AppSettings
# myapp/tasks.py
from asyncmq.tasks import task


@task(queue="emails", retries=2, ttl=300)
async def send_welcome(email: str) -> None:
    print(f"sent welcome email to {email}")
# producer.py
import anyio
from asyncmq.queues import Queue
from myapp.tasks import send_welcome


async def main() -> None:
    queue = Queue("emails")
    job_id = await send_welcome.enqueue("alice@example.com", backend=queue.backend)
    print("enqueued", job_id)


anyio.run(main)
asyncmq worker start emails --concurrency 1

Next Steps

  1. Configure your target backend in Settings.
  2. Add failure handling and retries with Jobs and Workers.
  3. Add operations visibility with the Dashboard and CLI reference.