AsyncMQ

⚡ Supercharge your async applications with tasks so fast, you'll think you're bending time itself. ⚡


Documentation: https://asyncmq.dymmond.com 📚

Source Code: https://github.com/dymmond/asyncmq

The officially supported version is always the latest release.


AsyncMQ is an asynchronous Python job queue focused on asyncio/anyio workloads.

It gives you:

  • task registration via @task
  • queue and worker runtime APIs
  • delayed jobs, retries/backoff, TTL expiration, and dead-letter routing
  • multiple backends (Redis, Postgres, MongoDB, RabbitMQ, in-memory)
  • a CLI (asyncmq) and a built-in dashboard app

What AsyncMQ Is (and Is Not)

AsyncMQ is:

  • a library-first queue/worker runtime you embed in Python apps
  • backend-pluggable through a shared BaseBackend contract
  • suitable for both local development and production deployments

AsyncMQ is not:

  • a hosted queue service
  • a guaranteed exactly-once execution system
  • a replacement for domain-level idempotency in your task code

Architecture Overview

At runtime, AsyncMQ has four main layers:

  1. Task registration: @task(queue=...) stores handlers in TASK_REGISTRY and adds .enqueue() helpers.
  2. Queue API: Queue wraps backend operations (enqueue, pause, list_jobs, delayed/repeatable APIs).
  3. Worker runtime: process_job/handle_job run tasks, manage state transitions, retries, and acknowledgements.
  4. Backend and store: concrete backends persist job state and queue metadata.
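To make the four layers concrete, here is a deliberately tiny, self-contained model of them in plain asyncio. It is not AsyncMQ's implementation and uses none of its APIs: `toy_task`, `REGISTRY`, `ToyQueue`, `ToyBackend`, and `run_worker` are hypothetical names, and the state labels other than "waiting" are assumptions chosen for the illustration.

```python
import asyncio
from collections import deque
from typing import Any, Awaitable, Callable

Handler = Callable[..., Awaitable[Any]]

# Layer 1: task registration (a toy registry, not AsyncMQ's TASK_REGISTRY).
REGISTRY: dict[str, Handler] = {}


def toy_task(func: Handler) -> Handler:
    """Register a coroutine function under its own name."""
    REGISTRY[func.__name__] = func
    return func


# Layer 4: backend and store (job records plus a FIFO of waiting job ids).
class ToyBackend:
    def __init__(self) -> None:
        self.jobs: dict[int, dict[str, Any]] = {}
        self.waiting: deque[int] = deque()


# Layer 2: queue API (wraps backend operations).
class ToyQueue:
    def __init__(self, name: str, backend: ToyBackend) -> None:
        self.name = name
        self.backend = backend

    async def enqueue(self, task_name: str, *args: Any) -> int:
        job_id = len(self.backend.jobs) + 1
        self.backend.jobs[job_id] = {
            "task": task_name, "args": args, "state": "waiting", "result": None,
        }
        self.backend.waiting.append(job_id)
        return job_id


# Layer 3: worker runtime (runs handlers and records state transitions).
async def run_worker(backend: ToyBackend) -> None:
    while backend.waiting:
        job = backend.jobs[backend.waiting.popleft()]
        job["state"] = "active"
        try:
            job["result"] = await REGISTRY[job["task"]](*job["args"])
            job["state"] = "completed"
        except Exception as exc:
            job["result"] = repr(exc)
            job["state"] = "failed"


@toy_task
async def add(a: int, b: int) -> int:
    return a + b


async def demo() -> dict[str, Any]:
    backend = ToyBackend()
    queue = ToyQueue("math", backend)
    job_id = await queue.enqueue("add", 2, 3)
    await run_worker(backend)
    return backend.jobs[job_id]


job = asyncio.run(demo())
```

The point of the split is that the producer only touches the queue layer, the worker only touches the registry and the backend, and the backend is the single place where job state lives.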

For an end-to-end walkthrough, start with Core Concepts.

flowchart LR
    A["Producer"] --> B["@task/.enqueue"]
    B --> C["Queue API"]
    C --> D["Backend (waiting/delayed state)"]
    D --> E["Worker runtime"]
    E --> F["Task handler execution"]
    F --> G["Result/state update + ack/DLQ"]
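The last step of the flow (result/state update, then ack or DLQ) can be illustrated with a small retry loop. This is a generic sketch, assuming exponential backoff and a list as the dead-letter store; AsyncMQ's actual retry and backoff configuration is not described on this page, and `backoff_delay`, `run_with_retries`, and the state strings are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable


def backoff_delay(attempt: int, base: float = 0.01, factor: float = 2.0) -> float:
    """Delay before retry number `attempt` (1-based): base * factor**(attempt - 1)."""
    return base * factor ** (attempt - 1)


async def run_with_retries(
    handler: Callable[[], Awaitable[None]],
    max_retries: int,
    dead_letter: list[str],
) -> str:
    """Run `handler`, retrying up to `max_retries` times before dead-lettering."""
    failures = 0
    while True:
        try:
            await handler()
            return "completed"
        except Exception as exc:
            failures += 1
            if failures > max_retries:
                dead_letter.append(repr(exc))  # retries exhausted: route to DLQ
                return "dead-lettered"
            await asyncio.sleep(backoff_delay(failures))


calls = 0


async def flaky() -> None:
    """Fail on the first two calls, succeed on the third."""
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient failure")


async def always_fails() -> None:
    raise ValueError("permanent failure")


async def demo() -> tuple[str, str, list[str]]:
    dlq: list[str] = []
    first = await run_with_retries(flaky, max_retries=2, dead_letter=dlq)
    second = await run_with_retries(always_fails, max_retries=2, dead_letter=dlq)
    return first, second, dlq


first_state, second_state, dlq = asyncio.run(demo())
```

With `max_retries=2`, a handler gets three attempts in total: the flaky handler recovers on its third call, while the permanently failing one ends up in the dead-letter list.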

Feature Map

Minimal Quickstart (In-Memory)

Start with the in-memory backend so you can run the example without Redis or Postgres.

# myapp/settings.py
from asyncmq.backends.memory import InMemoryBackend
from asyncmq.conf.global_settings import Settings


class AppSettings(Settings):
    backend = InMemoryBackend()

# shell: tell AsyncMQ which settings class to load
export ASYNCMQ_SETTINGS_MODULE=myapp.settings.AppSettings

# myapp/tasks.py
from asyncmq.tasks import task


@task(queue="emails", retries=2, ttl=300)
async def send_welcome(email: str) -> None:
    print(f"sent welcome email to {email}")

# producer.py
import anyio
from asyncmq.queues import Queue
from myapp.tasks import send_welcome


async def main() -> None:
    queue = Queue("emails")
    job_id = await send_welcome.enqueue("alice@example.com", backend=queue.backend)
    print("enqueued", job_id)


anyio.run(main)

# shell: start a worker for the "emails" queue
asyncmq worker start emails --concurrency 1

Next Steps

  1. Configure your target backend in Settings.
  2. Add failure handling and retries with Jobs and Workers.
  3. Add scheduling and orchestration with Schedulers and Flows.
  4. Add operations visibility with the Dashboard, CLI reference, and Production Operations.