Core Concepts & Architecture¶
This deep dive explains exactly how AsyncMQ moves your tasks from producer to worker, how jobs are stored, retried, and expired, and why each design decision matters. Code references correspond to modules in the AsyncMQ source.
1. Queue Abstraction (asyncmq.queues.Queue)¶
A `Queue` object ties together the queue name, backend storage, concurrency, and rate-limit settings.

- Provide a name and a backend (the backend is optional; it defaults to `settings.backend`).
- Optionally tune concurrency (defaults to 3) and rate limiting (disabled by default).
```python
from asyncmq.backends.redis import RedisBackend
from asyncmq.queues import Queue

queue = Queue(
    name="default",
    backend=RedisBackend(redis_url="redis://localhost:6379/0"),
    concurrency=5,
    rate_limit=10,      # max 10 jobs per rate_interval
    rate_interval=1.0,  # per second
)
```
- Why `concurrency=3` by default? It balances throughput without overwhelming your event loop or external resources. Override it for CPU-bound or I/O-heavy tasks.
- Rate limiting uses a token bucket (`asyncmq.rate_limiter.RateLimiter`) to throttle job starts, ideal for honoring API rate caps; see the sketch below.
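For intuition, here is a minimal token-bucket sketch. It illustrates the general technique only; it is not AsyncMQ's actual `RateLimiter` implementation:

```python
import asyncio
import time

class TokenBucket:
    """Allows at most `rate` acquisitions per `interval` seconds."""

    def __init__(self, rate: int, interval: float = 1.0):
        self.rate = rate
        self.interval = interval
        self.tokens = float(rate)
        self.updated = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at the bucket size.
            refill = (now - self.updated) * self.rate / self.interval
            self.tokens = min(float(self.rate), self.tokens + refill)
            self.updated = now
            if self.tokens >= 1.0:
                self.tokens -= 1.0
                return
            # Sleep just long enough for one token to accumulate.
            await asyncio.sleep((1.0 - self.tokens) * self.interval / self.rate)
```

A worker would call `await bucket.acquire()` before starting each job, so job starts never exceed the configured rate.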
1.1. Enqueuing Jobs (delay)¶
- Create a `Job` instance via the task decorator (see the Tasks section).
- Get a `payload` dict via `job.to_dict()`.
```python
await queue.enqueue(payload)                  # immediate
await queue.enqueue_delayed(payload, run_at)  # schedule for the future
```
Under the hood, `enqueue()` delegates to `BaseBackend.enqueue`:

- Redis: `LPUSH queue:default:jobs <payload>`
- Memory: Python `list.insert(0, payload)`

Delayed jobs are stored separately (a Sorted Set in Redis) and polled by `delayed_job_scanner`.
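For example, to schedule a job roughly 30 seconds out (assuming `run_at` is an absolute Unix timestamp, consistent with the sorted-set storage described above):

```python
import time

payload = job.to_dict()

await queue.enqueue(payload)                            # picked up as soon as a worker is free
await queue.enqueue_delayed(payload, time.time() + 30)  # due ~30 seconds from now
```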
1.1.1. Alternatively, using delay()¶

For those familiar with a Celery-style interface, AsyncMQ also provides a `.delay()` method. It does exactly what `enqueue` combined with `enqueue_delayed` does.
```python
await queue.delay(payload)          # immediate
await queue.delay(payload, run_at)  # schedule for the future
```
Under the hood, `delay()` delegates to `.enqueue(...)` and, if a `run_at` is provided, to `.enqueue_delayed(...)`.
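A minimal sketch of that delegation (illustrative, not the actual source):

```python
# Hypothetical simplification of Queue.delay(); the real method lives in asyncmq.queues.
async def delay(self, payload: dict, run_at: float | None = None):
    if run_at is None:
        return await self.enqueue(payload)               # immediate
    return await self.enqueue_delayed(payload, run_at)   # scheduled
```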
1.2. Inspecting & Controlling¶
```python
from asyncmq.core.enums import State

await queue.list_jobs(state=State.WAITING)
await queue.pause()   # workers stop dequeuing new jobs
await queue.resume()  # workers resume
stats = await queue.queue_stats()
```
- `pause()`/`resume()` flip a backend flag (`pause_queue`/`resume_queue`), preventing `dequeue()` from returning jobs.
- `queue_stats()` aggregates counts of jobs per state.
1.3. Running Workers via Code¶
AsyncIO-native:
```python
# Asynchronous entry point
await queue.run()

# or
queue.start()  # blocking; calls the async run() internally via anyio.run
```
This uses `asyncmq.runners.run_worker`, passing your `queue.concurrency`, `rate_limit`, and any repeatable jobs registered on this `Queue` instance.

- Why `queue.start()`? It provides a simple blocking call for synchronous scripts, as in the example below.
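For instance, a minimal long-running service entry point might look like this (the queue configuration is illustrative):

```python
from asyncmq.queues import Queue

queue = Queue(name="default", concurrency=5)

if __name__ == "__main__":
    # Blocks forever, running the worker loops internally via anyio.run.
    queue.start()
```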
2. Task Registration (the asyncmq.tasks.task decorator)¶
The `@task` decorator wraps your function to:

- Register metadata in `TASK_REGISTRY` (`module.func_name`, queue, retries, TTL, progress).
- Attach an `enqueue(backend, *args, **kwargs)` helper to schedule background execution.
- Wrap the execution logic so workers can call your function uniformly.
```python
from asyncmq.backends.redis import RedisBackend
from asyncmq.tasks import task

backend = RedisBackend(redis_url=...)

@task(queue="default", retries=2, ttl=60)
async def greet(name: str):
    print(f"Hello {name}")

# Enqueue a job explicitly: decorating alone does NOT enqueue on call.
# Here the backend is passed explicitly.
await greet.enqueue(backend, "Alice", delay=5, priority=3)
```
- Why an explicit `.enqueue()`? It separates function invocation from scheduling, preventing accidental background jobs.
- Retries and TTL are managed per job: if your task raises, the worker decrements the retry count and either re-enqueues the job or moves it to the DLQ, as the example below illustrates.
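For example, with `retries=2`, a job that keeps raising is attempted three times in total before landing in the DLQ (`flaky_api_call` is a hypothetical helper):

```python
from asyncmq.tasks import task

@task(queue="default", retries=2, ttl=60)
async def sync_remote(record_id: int):
    # Any exception raised here consumes one retry; once retries are
    # exhausted, the worker moves the job to the dead-letter queue.
    await flaky_api_call(record_id)  # hypothetical helper that may raise
```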
2.1. Without specifying a backend¶
You don't need to specify a `backend` if you already have one declared in `settings.backend`, which defaults to Redis.
```python
from asyncmq.tasks import task

@task(queue="default", retries=2, ttl=60)
async def greet(name: str):
    print(f"Hello {name}")

# Enqueue a job explicitly: decorating alone does NOT enqueue on call.
await greet.enqueue("Alice", delay=5, priority=3)
```
3. Workers & Job Processing¶
3.1. Worker Components¶
Workers consist of three concurrent coroutines (run via `asyncio.gather` in `run_worker`):

- `process_job`: dequeues and handles jobs, respecting concurrency (via a semaphore) and rate limits.
- `delayed_job_scanner`: polls delayed storage (every `interval`, default 2s) to re-enqueue due jobs; a sketch follows below.
- `repeatable_scheduler` (optional): runs if you've added repeatable jobs via `Queue.add_repeatable` or `FlowProducer`.
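A minimal sketch of what such a scanner loop does. The backend methods come from the `BaseBackend` contract listed in section 4.1, but the exact signatures here are assumptions:

```python
import asyncio

async def delayed_scanner_sketch(backend, queue_name: str, interval: float = 2.0):
    """Poll delayed storage and promote due jobs back onto the waiting queue."""
    while True:
        for payload in await backend.get_due_delayed(queue_name):
            await backend.enqueue(queue_name, payload)                # promote to waiting
            await backend.remove_delayed(queue_name, payload["id"])   # avoid double delivery
        await asyncio.sleep(interval)
```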
3.2. Job Lifecycle in process_job (asyncmq.workers.process_job)¶
- Dequeue: `job_data = await backend.dequeue(queue_name)`
- Instantiate: `job = Job.from_dict(job_data)`
- Cancellation check: `is_job_cancelled`; ack and emit `job:cancelled`
- TTL check: `job.is_expired()` → update state to `expired`, `move_to_dlq`, emit `job:expired`
- Dependency check: if `job.depends_on` is unresolved, re-enqueue the job to wait
- Emit event: `job:started`
- Execute: call the function from `TASK_REGISTRY` with `await`, or thread-wrap it for sync functions
- Success:
  - emit `job:completed`
  - `save_job_result`
  - if `repeat_every` is set, schedule the next run
- Failure:
  - emit `job:failed`
  - if `retries_left > 0`, `await backend.enqueue(...)`
  - else, move to the DLQ via `backend.move_to_dlq`
- Ack: remove from the active queue via `backend.ack` (the sketch below condenses this flow)
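A condensed, illustrative version of that flow, assuming simplified backend signatures (`run_registered_task` is a hypothetical stand-in for the `TASK_REGISTRY` lookup and invocation):

```python
from asyncmq.jobs import Job

async def handle_job_sketch(backend, queue_name: str, job_data: dict) -> None:
    job = Job.from_dict(job_data)

    if job.is_expired():
        await backend.update_job_state(queue_name, job.id, "expired")
        await backend.move_to_dlq(queue_name, job.to_dict())
        return

    try:
        result = await run_registered_task(job)  # await, or thread-wrap sync functions
        await backend.save_job_result(queue_name, job.id, result)
    except Exception:
        if job.retries_left > 0:
            await backend.enqueue(queue_name, job.to_dict())      # retry
        else:
            await backend.move_to_dlq(queue_name, job.to_dict())  # give up
    finally:
        await backend.ack(queue_name, job.id)
```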
Note

Why separate `handle_job` and `process_job`?

- `process_job` manages the loop, concurrency, and rate limiting.
- `handle_job` focuses on the per-job lifecycle (TTL, retries, events).
3.3. CLI Worker vs. Queue.run()¶
- CLI (`asyncmq worker start default --concurrency 2`): uses `start_worker` in `asyncmq.runners`, which spawns `worker_loop`s (a single coroutine per worker) without rate limiting or a delayed scanner by default.
- `queue.run()`: uses `run_worker`, which integrates concurrency, rate limiting, the delayed scanner, and repeatable scheduling.
Tip

Use the CLI for quick testing; use `queue.run()` in long-running services to leverage the full feature set.
4. Backends & Persistence¶
AsyncMQ decouples queue operations (`BaseBackend`) from low-level storage (`BaseStore` in `asyncmq.stores`).
Backend | Driver | Data Structures | Use Case
---|---|---|---
InMemoryBackend | none (in-process) | Python dicts, lists, heapq | Tests & prototypes; fast, ephemeral
RedisBackend | redis.py | Lists (waiting), ZSets (delayed) | High throughput
PostgresBackend | postgres.py (asyncpg) | Tables with indexes | ACID, complex queries
MongoBackend | mongodb.py (motor) | Collections with TTL indices | Flexible schemas
The default backend used by AsyncMQ is Redis. The reason is that AsyncMQ was heavily inspired by BullMQ, which only works with Redis; AsyncMQ is more flexible, supporting more than just Redis and allowing you to extend it with your own store and backend if required.

Note

In the end, we simply liked BullMQ so much that we wanted something like it in Python.
4.1. Implementing Custom Backends¶
Subclass `asyncmq.backends.base.BaseBackend` and implement all abstract methods: `enqueue`, `dequeue`, `ack`, `move_to_dlq`, `enqueue_delayed`, `get_due_delayed`, `remove_delayed`, `update_job_state`, `save_job_result`, `get_job_state`, `add_dependencies`, `resolve_dependency`, `pause_queue`, `resume_queue`, and the repeatable methods.
Why this separation? It enables adapting AsyncMQ to cloud queues, message brokers, or other persistence layers without touching core logic.
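A skeletal starting point might look like this. The method names follow the abstract-method list above, but the signatures shown are illustrative assumptions, not verbatim from `BaseBackend`:

```python
from asyncmq.backends.base import BaseBackend

class MyCloudBackend(BaseBackend):
    async def enqueue(self, queue_name: str, payload: dict) -> None:
        ...  # push the payload onto your broker or store

    async def dequeue(self, queue_name: str) -> dict | None:
        ...  # pop the next waiting job, or None if the queue is empty/paused

    async def ack(self, queue_name: str, job_id: str) -> None:
        ...  # remove the job from the active set

    async def move_to_dlq(self, queue_name: str, payload: dict) -> None:
        ...  # park the job in the dead-letter queue

    # ...plus enqueue_delayed, get_due_delayed, remove_delayed,
    # update_job_state, save_job_result, get_job_state, add_dependencies,
    # resolve_dependency, pause_queue, resume_queue, and the repeatable
    # methods, with the semantics described in sections 1-3.
```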
5. Scheduling & Flows¶
Beyond individual tasks and delays, AsyncMQ lets you orchestrate multiple, interdependent jobs as a flow using FlowProducer. This is ideal for batch pipelines, DAGs, or any scenario where jobs must run in a specific order.
```python
from asyncmq.backends.redis import RedisBackend
from asyncmq.flow import FlowProducer
from asyncmq.jobs import Job

# 1. Initialize your backend and flow producer.
#    Passing a backend is not mandatory; FlowProducer reads the default
#    backend from the settings.
backend = RedisBackend(redis_url="redis://localhost:6379/0")
flow = FlowProducer(backend=backend)

# 2. Create Job objects for each step in your pipeline:
#    - job1 runs first
#    - job2 depends_on job1 and runs only after job1 completes
job1 = Job(
    task_id="app.say_hello",  # must match your registered task ID
    args=["Alice"],
    kwargs={},
    # Other params (retries, ttl, priority) use defaults.
)
job2 = Job(
    task_id="app.process_data",
    args=[42],
    kwargs={"verbose": True},
    depends_on=[job1.id],
)

# 3. Atomically enqueue the flow to the "default" queue.
#    If your backend supports `atomic_add_flow`, this happens in one operation;
#    otherwise, AsyncMQ falls back to sequential enqueue + dependency registration.
job_ids = await flow.add_flow("default", [job1, job2])
print(f"Enqueued jobs: {job_ids}")
```
- Atomic Flow: backends like `PostgresBackend` can enqueue all jobs and link dependencies in a single transaction via `atomic_add_flow` (if implemented).
- Fallback Logic: for other backends, AsyncMQ sequentially calls `enqueue` for each job and then registers dependencies with `add_dependencies`.
- Repeatable Jobs: to schedule a job to re-run automatically, set the `repeat_every` field on a `Job` (in seconds) when constructing it. Workers re-enqueue these on successful completion; see the example below.
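For instance, a job constructed like this (the task ID and interval are illustrative) is re-enqueued every 60 seconds after each successful run:

```python
heartbeat = Job(
    task_id="app.heartbeat",  # must match a registered task ID
    args=[],
    kwargs={},
    repeat_every=60,  # seconds; workers schedule the next run on success
)
```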
6. Event System (asyncmq.core.event.event_emitter)¶
AsyncMQ emits structured events at key points:
- `job:started`
- `job:progress` (when `progress=True`)
- `job:completed`
- `job:failed`
- `job:expired`
- `job:cancelled`
Subscribe to events to integrate metrics, logging, or side-effects:
```python
from asyncmq.core.event import event_emitter

def on_complete(payload):
    print(f"Job {payload['id']} completed at {payload['timestamps']['finished_at']}")

event_emitter.on("job:completed", on_complete)
```
Under the hood, the emitter uses `anyio.EventStream` for async-safe pub/sub.
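In the same way, you can route failures into your logging stack (note: the `error` key in the payload is an assumption; inspect the payload your version emits):

```python
import logging

from asyncmq.core.event import event_emitter

def on_failed(payload):
    # Log the job ID plus whatever failure detail the payload carries.
    logging.error("Job %s failed: %s", payload["id"], payload.get("error"))

event_emitter.on("job:failed", on_failed)
```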
That concludes a precise, code-verified exploration of AsyncMQ’s core.