Queues & Queue
API¶
Queues are the traffic directors of AsyncMQ, orchestrating the flow of jobs from producers to workers.
This guide covers everything about the Queue
class—its constructor knobs, core methods, advanced patterns,
and best practices—served with a dash of humor to keep you awake. 🚦
1. What Is Queue
?¶
The Queue
class (in asyncmq.queues
) is your main interface to:
- Schedule jobs (single, bulk, delayed, repeatable)
- Inspect queue state (waiting, active, completed, failed)
- Control worker behavior (pause, resume, run, start)
- Clean up old or unwanted jobs
Behind the scenes, Queue
delegates to a BaseBackend
for persistence and uses run_worker
to launch workers.
2. Constructor Parameters¶
from asyncmq.queues import Queue
Queue(
name: str,
backend: BaseBackend | None = None,
concurrency: int = 3,
rate_limit: int | None = None,
rate_interval: float = 1.0,
scan_interval: float | None = None,
)
Param | Type | Default | Purpose & Tuning Tips |
---|---|---|---|
name |
str |
— | Identifier for this queue. Keep it short and meaningful (e.g., "email", "reports"). |
backend |
BaseBackend or None |
settings.backend |
Storage driver (Redis, Postgres, etc.). Override for custom or test backends. |
concurrency |
int |
3 |
Max simultaneous jobs. High for I/O-bound, low for CPU-bound. |
rate_limit |
int or None |
None |
Token-bucket max calls per rate_interval . Block with 0 , disable with None . |
rate_interval |
float |
1.0 |
Window (seconds) for rate limiting. |
scan_interval |
float or None |
settings.scan_interval |
Polling frequency for delayed & repeatable jobs. Lower = lower latency but more backend ops. |
🔧 Pro Tip: Set
scan_interval
shorter than your shortest delay or cron window to avoid missing schedules.
3. Core Methods¶
3.1. add(...)
(Single Job)¶
Schedules a single job:
job_id = await queue.add(
task_id="app.send_email",
args=["alice@example.com"],
kwargs={"subject": "Hi"},
retries=2,
ttl=300,
backoff=None,
priority=5,
delay=10.0,
)
- Returns
job_id
(UUID). delay
(sec): schedule future execution.priority
: lower numbers run first.retries
&ttl
: control failure and expiry.
3.2. add_bulk([...])
(Batch Enqueue)¶
Efficiently enqueue many jobs:
ids = await queue.add_bulk([
{"task_id": "app.process", "args": [1]},
{"task_id": "app.process", "args": [2]},
])
- Use case: high-throughput ingestion or batch jobs.
- Backend support: must implement
bulk_enqueue
.
3.3. add_repeatable(...)
(Periodic Jobs)¶
Register repeatable definitions:
queue.add_repeatable(
task_id="app.cleanup",
every=3600, # seconds
args=None,
cron=None,
)
every
: interval in seconds or a cron-like string.cron
: cron expression (requires repeatable scheduler).- Note: Jobs enqueued when
run()
starts.
3.4. pause()
/ resume()
¶
Temporarily halt or resume workers:
await queue.pause()
# ... maintenance window ...
await queue.resume()
- Prevents new dequeues; in-flight jobs finish.
3.5. clean(state, older_than)
(Purge)¶
Purge jobs by state & age:
await queue.clean("completed", older_than=time.time() - 3600)
state
: e.g., "completed", "failed", "expired".older_than
: Unix timestamp threshold.
4. Running Workers¶
4.1. run()
(Async)¶
Starts the worker with:
- Concurrency & rate limiting
- Delayed-job scanner (using
scan_interval
) - Repeatable scheduler (for
_repeatables
)
# In an async context
await queue.run()
4.2. start()
(Blocking)¶
Convenience wrapper using AnyIO:
# In a script
queue.start()
- Blocks until cancellation (SIGINT) or error.
Tip
Combine with ASGI lifespan events to manage workers in web apps.
5. Inspecting & Utilities¶
queue.enqueue(payload)
/enqueue_delayed
: low-level methods.queue.delay(payload, run_at)
: alias for enqueue + enqueue_delayed.queue.get_due_delayed()
,list_delayed()
,remove_delayed(job_id)
: manage delayed set.list_repeatables()
,pause_repeatable()
,resume_repeatable()
: introspect repeatables.cancel_job(job_id)
,is_job_cancelled(job_id)
: cancel or check cancellations.queue_stats()
: counts per state.list_jobs(state)
: list jobs by state.
stats = await queue.queue_stats()
jobs = await queue.list_jobs("waiting")
6. Advanced Patterns & Examples¶
6.1. Dependency Chains¶
# Step A
id_a = await queue.add("app.step_a")
# Step B after A
await queue.add("app.step_b", depends_on=[id_a])
6.2. Parallel Fan-Out¶
# A triggers two parallel tasks, then aggregates
id_a = await queue.add("app.ingest")
await queue.add_bulk([
{"task_id": "app.transform_a", "depends_on": [id_a]},
{"task_id": "app.transform_b", "depends_on": [id_a]},
])
# aggregator waits on both
await queue.add("app.aggregate", depends_on=[id_a, id_b])
6.3. Cron via FlowProducer
¶
For full cron support, see the Advanced Patterns guide. You can also register a repeatable with cron
in add_repeatable
.
7. Testing & Best Practices¶
- Use InMemoryBackend for unit tests:
from asyncmq.backends.inmemory import InMemoryBackend
q = Queue(name="test", backend=InMemoryBackend())
delay=0.1
and await anyio.sleep(0.2)
.
* Assert Stats: stats = await q.queue_stats()
.
Warning
Don’t call queue.run()
in tests without a timeout—tests will hang!
8. Common Pitfalls & FAQs¶
- Missing Backend: If you omit
backend
, uses globalsettings.backend
—watch for surprises if you override per-queue. - Zero
rate_limit
: Blocks all processing (by design)—use carefully. - Repeatable Jobs Not Firing: Ensure you call
run()
(orstart()
) so the scheduler runs. - Cleaning Too Aggressively: Purging mid-run can orphan dependencies—schedule cleanups during off-hours.
Armed with the Queue
API and these examples, you can tame any workload from simple enqueues to complex
DAG orchestrations—while having a laugh or two along the way. 🎉