Schedulers¶
AsyncMQ scheduling is split into explicit loops instead of one Redis-only scheduler abstraction. That keeps the runtime backend-neutral while still covering the practical BullMQ feature set:
- delayed jobs
- repeatable jobs
- cron schedules
- interval schedules
- durable schedule storage
- schedule pause/resume/remove operations
- multi-worker scheduler ownership
Scheduler Components¶
AsyncMQ has three important timing-related loops:
Delayed job scanner¶
`asyncmq.core.delayed_scanner.delayed_job_scanner(...)`
Moves jobs whose delay_until timestamp is due back into the main queue.
Repeatable scheduler¶
`asyncmq.schedulers.repeatable_scheduler(...)`
Generates concrete job instances from repeatable definitions.
Stalled recovery scheduler¶
`asyncmq.core.stalled.stalled_recovery_scheduler(...)`
Detects active jobs that appear abandoned and re-enqueues them.
This page focuses on delayed and repeatable scheduling. See Workers for stalled recovery.
Delayed Jobs¶
Delayed jobs are regular jobs with a future delay_until timestamp.
You can create them directly:
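The sketch below assumes an enqueue call that accepts a relative delay, which the backend converts into a `delay_until` timestamp. The method name and the `delay` parameter are assumptions for illustration, not confirmed API from this page:

```python
# Sketch: enqueue a job that becomes runnable ~60 seconds from now.
# `enqueue` and `delay` are hypothetical names; the key point is that
# the backend stores a future delay_until timestamp for the job.
await queue.enqueue(
    "emails.send_welcome",
    kwargs={"user_id": 42},
    delay=60,
)
```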
Or they can be created indirectly by:
- retry backoff after handler failures
- dependency re-delay while waiting for parents
- debounce replacement workflows
The delayed scanner:
- polls the backend for due delayed entries
- moves them back into the normal queue
- sleeps for the configured interval
Queue.run() and run_worker(...) start this scanner automatically.
Repeatable Jobs¶
Repeatable jobs generate a new concrete job each time their schedule fires.
AsyncMQ supports two registration models.
Local repeatables¶
Use this when:
- the schedule is code-defined
- one worker process is intentionally the source of truth
- you are running tests or local development loops
Local repeatables live only in the current process.
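A local registration sketch, using the `add_repeatable(...)` call that also appears in the production example on this page; the task name and interval are illustrative:

```python
# Registered in code; the definition lives only in this process's memory.
queue.add_repeatable("cache.refresh_hot_keys", every=120)
```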
Durable repeatables¶
```python
await queue.upsert_repeatable(
    "billing.send_statement",
    cron="0 7 * * 1",
    kwargs={"region": "eu"},
    retries=3,
)
```
Use this when:
- schedules must survive producer or worker restarts
- operators create schedules through admin tooling or the dashboard
- multiple worker processes may consume the same queue
Durable repeatables are stored by the backend and discovered by workers.
every vs cron¶
Use every for simple fixed intervals:
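A minimal interval sketch, reusing the `add_repeatable(...)` call shown elsewhere on this page; the task name is illustrative:

```python
# Fire every 15 minutes, measured from the previous run.
queue.add_repeatable("metrics.flush", every=900)
```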
Use cron for wall-clock schedules:
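A wall-clock sketch, reusing the `upsert_repeatable(...)` cron form shown on this page:

```python
# Fire at 07:00 every Monday, regardless of when the worker started.
await queue.upsert_repeatable("billing.send_statement", cron="0 7 * * 1")
```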
Practical rule:
- `every` is easier when exact human calendar alignment does not matter
- `cron` is better when the run time should line up with business hours, calendar boundaries, or operator expectations
How Repeatable Scheduling Works¶
The repeatable scheduler does two separate jobs:
- process local in-memory repeatable definitions passed in through `Queue`
- poll durable backend-managed repeatables and advance the next-run marker
When a schedule is due, the scheduler:
- builds one concrete `Job`
- enqueues it into the target queue
- computes the next scheduled execution time
- stores that next run for durable schedules
Generated job instances are normal jobs. They still go through the standard worker lifecycle, retries, TTL, deduplication, events, and admin inspection.
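The due-check and next-run advancement for interval schedules can be sketched in plain Python. This is an illustrative model of the steps above, not AsyncMQ's actual implementation; in particular, skipping occurrences missed during downtime is an assumption:

```python
def advance_schedule(next_run: float, every: float, now: float) -> tuple[bool, float]:
    """Return (is_due, new_next_run) for an interval schedule.

    Models the steps above: when a schedule is due, one concrete job
    is enqueued and the stored next-run marker is advanced past `now`.
    """
    if now < next_run:
        return False, next_run
    # Skip occurrences missed while the process was down (assumed
    # behavior), so a restart does not flood the queue with backlog jobs.
    periods = int((now - next_run) // every) + 1
    return True, next_run + periods * every
```

For a durable schedule, the new next-run value would be persisted back to the backend so whichever worker holds the scheduler lock can continue from there.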
Scheduler Ownership¶
Multiple workers for the same queue may all start a repeatable scheduler. To avoid duplicate enqueue of durable schedules, AsyncMQ processes backend-managed repeatables under a queue-scoped scheduler lock.
What this means:
- Redis and Postgres coordinate scheduler ownership across processes
- in-memory and MongoDB coordinate only inside one process
- RabbitMQ inherits the coordination quality of its metadata store
This is the AsyncMQ equivalent of BullMQ's need for centralized schedule
advancement, but expressed through backend capability contracts instead of a
Redis-only QueueScheduler.
Schedule Management APIs¶
The queue API exposes a complete operational surface:
```python
records = await queue.list_repeatables()

next_run = await queue.upsert_repeatable(
    "reporting.rebuild",
    every=900,
    kwargs={"tenant": "acme"},
)

await queue.pause_repeatable({"task_id": "reporting.rebuild", "every": 900, "kwargs": {"tenant": "acme"}})
await queue.resume_repeatable({"task_id": "reporting.rebuild", "every": 900, "kwargs": {"tenant": "acme"}})
await queue.remove_repeatable({"task_id": "reporting.rebuild", "every": 900, "kwargs": {"tenant": "acme"}})
```
Operationally:
- `upsert_repeatable(...)` creates or updates the schedule definition
- `list_repeatables()` exposes next run time and paused status
- `pause_repeatable(...)` stops future occurrences without deleting the schedule
- `resume_repeatable(...)` recomputes the next run
- `remove_repeatable(...)` deletes the durable definition
The dashboard uses the same backend-facing model, so these APIs are suitable for future admin tooling as well.
Production Example¶
Code-defined bootstrap schedule plus durable tenant schedules:
```python
import anyio

from asyncmq.queues import Queue
from myapp import tasks  # noqa: F401


async def main() -> None:
    queue = Queue("maintenance", concurrency=4, scan_interval=2.0)

    # Local, code-owned housekeeping schedule.
    queue.add_repeatable("maintenance.cleanup_tmp", every=300)

    # Durable, operator-visible schedule.
    await queue.upsert_repeatable(
        "maintenance.rotate_keys",
        cron="0 2 * * 0",
        kwargs={"scope": "prod"},
        retries=5,
    )

    await queue.run()


anyio.run(main)
```
Failure and Recovery Behavior¶
Delayed and repeatable scheduling are restart-safe only to the extent the backend persists the underlying metadata.
Practical expectations:
- Redis and Postgres preserve durable schedules and delayed jobs well
- MongoDB preserves schedules but uses process-local coordination
- in-memory loses everything on process exit
- RabbitMQ durability depends on the broker plus the chosen metadata store
If a worker restarts:
- delayed jobs remain delayed in durable backends until scanned again
- durable repeatables are rediscovered from backend storage
- local repeatables must be re-registered by application code
Tuning¶
Use scan_interval to control how often scheduling loops poll:
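The knob is the same `scan_interval` constructor argument used in the production example on this page; the queue name is illustrative:

```python
# Poll the delayed and repeatable loops every 5 seconds.
queue = Queue("reports", scan_interval=5.0)
```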
Lower values:
- reduce delay between a due timestamp and actual enqueue
- increase backend polling and operational noise
Higher values:
- reduce backend churn
- increase scheduling latency
For most workloads, a small single-digit number of seconds is a reasonable starting point.
Debugging Checklist¶
If delayed or repeatable jobs are not appearing:
- confirm the worker was started with `Queue.run()` or `run_worker(...)`
- check the queue's `scan_interval`
- confirm the task module is imported
- list durable schedules with `queue.list_repeatables()`
- check backend capabilities for coordination guarantees
- verify the queue is not paused
Useful inspection commands:
```python
await queue.get_delayed()
await queue.list_repeatables()
await queue.get_job_counts("waiting", "delayed", "completed", "failed")
```
BullMQ Mapping¶
| BullMQ | AsyncMQ |
|---|---|
| delayed jobs | delayed scanner plus delay_until job metadata |
| repeatable jobs / job schedulers | Queue.upsert_repeatable(...) |
| local worker-owned schedules | Queue.add_repeatable(...) |
| remove repeatable | Queue.remove_repeatable(...) |
| pause/resume repeatable | Queue.pause_repeatable(...) / Queue.resume_repeatable(...) |
AsyncMQ intentionally separates schedule generation from worker execution loops instead of depending on Redis scheduler scripts.
Common Mistakes¶
- Using `add_repeatable(...)` for production schedules that must survive a restart.
- Forgetting that `scan_interval` affects both delayed and repeatable latency.
- Running multiple workers on a backend with only process-local locking and expecting cluster-wide schedule ownership.
- Treating repeatable jobs as a separate job type rather than a generator of normal jobs.