BullMQ Migration Guide¶
This guide is for teams that already know BullMQ and want to move to AsyncMQ without losing the operational model they rely on.
The short version:
- most day-to-day concepts map cleanly
- AsyncMQ keeps the model async-first and Python-native
- AsyncMQ avoids Redis lock-in by expressing capabilities through backends
Concept Mapping¶
| BullMQ concept | AsyncMQ equivalent |
|---|---|
| Queue | asyncmq.queues.Queue |
| Worker | Queue.run(), run_worker(...), or Worker.run() |
| QueueScheduler | delayed scanner + repeatable scheduler + optional stalled recovery |
| FlowProducer | asyncmq.flow.FlowProducer |
| jobId | job_id= on Queue.add(...) |
| deduplication / debounce | deduplication={...} or debounce={...} |
| repeatable job scheduler | Queue.upsert_repeatable(...) |
| queue getters / counts / clean / drain / obliterate | AsyncMQ queue inspection/admin APIs |
Producer Migration¶
In BullMQ you would call queue.add(name, data, { jobId: "welcome-alice" }). The AsyncMQ equivalent:
```python
await queue.add(
    "myapp.tasks.send_email",
    kwargs={"email": "alice@example.com"},
    job_id="welcome-alice",
)
```
The practical semantics are the same: if a non-removed job with that id already exists in the queue, AsyncMQ returns the existing id and does not enqueue a duplicate.
Bulk Producer Migration¶
BullMQ:
```javascript
await queue.addBulk([
  { name: "sync-a", data: { id: 1 }, opts: { jobId: "sync-1" } },
  { name: "sync-b", data: { id: 2 } },
]);
```
AsyncMQ:
```python
await queue.add_bulk(
    [
        {"task_id": "sync.a", "kwargs": {"id": 1}, "jobId": "sync-1"},
        {"task_id": "sync.b", "kwargs": {"id": 2}},
    ]
)
```
AsyncMQ accepts both jobId and job_id in bulk payloads to make migration
easier.
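Accepting both spellings amounts to a small normalization pass; a sketch of that idea (illustrative, not AsyncMQ's code):

```python
def normalize_bulk(entries: list[dict]) -> list[dict]:
    """Normalize BullMQ-style "jobId" keys to "job_id" in bulk payloads."""
    out = []
    for entry in entries:
        entry = dict(entry)  # avoid mutating the caller's dicts
        if "jobId" in entry and "job_id" not in entry:
            entry["job_id"] = entry.pop("jobId")
        out.append(entry)
    return out
```

Running a pass like this over exported BullMQ payloads is also a convenient way to audit a migration before switching producers over.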
Deduplication Migration¶
BullMQ's deduplication / debounce job options map to the deduplication={...} and debounce={...} keyword arguments on Queue.add(...).
Notes:
- AsyncMQ uses seconds for timing values in Python APIs
- deduplication is stored on job metadata, not in Redis-only side keys
- the practical behavior is still simple deduplication, throttle windows, and debounce replacement
Worker Migration¶
In BullMQ the processor function is usually passed inline to new Worker(queueName, processor). In AsyncMQ the handler is defined separately and workers consume by queue:
```python
from asyncmq.tasks import task


@task(queue="emails", retries=3)
async def send_email(email: str) -> None:
    ...
```
This separation is the biggest ergonomic difference:
- BullMQ often passes the processor inline to the worker
- AsyncMQ resolves the handler through the Python task registry
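The registry model is worth internalizing; a minimal synchronous sketch of the idea (hypothetical names, not AsyncMQ internals):

```python
from typing import Callable

# Sketch of a task registry: @task registers a handler under a dotted
# name, and a worker resolves handlers by task_id at dispatch time.
REGISTRY: dict[str, Callable] = {}


def task(queue: str, retries: int = 0):
    def decorator(fn: Callable) -> Callable:
        task_id = f"{fn.__module__}.{fn.__name__}"
        REGISTRY[task_id] = fn
        fn.task_id = task_id
        return fn
    return decorator


@task(queue="emails", retries=3)
def send_email(email: str) -> str:
    return f"sent to {email}"


def dispatch(task_id: str, kwargs: dict):
    # Worker-side lookup: the job carries only the task_id string.
    return REGISTRY[task_id](**kwargs)
```

Because jobs carry a task_id string rather than a closure, any worker that imports the module can process the job, which is what makes the separation safe across processes.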
Scheduler Migration¶
BullMQ users often think in terms of QueueScheduler. AsyncMQ splits that role
into explicit loops:
- delayed job scanner
- repeatable scheduler
- optional stalled recovery scheduler
For most users, Queue.run() is enough because it starts delayed and
repeatable scheduling automatically.
If you enable stalled recovery, also run the stalled-recovery scheduler loop alongside the delayed and repeatable loops.
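The delayed-scanner loop's job can be modeled as polling a heap of (run_at, job_id) pairs for due entries (a model of the loop's responsibility, not AsyncMQ's code):

```python
import heapq


def promote_due(delayed: list[tuple[float, str]], waiting: list[str], now: float) -> int:
    """Sketch of a delayed-job scan: promote every job whose run_at
    timestamp has passed from the delayed heap into the waiting list."""
    moved = 0
    while delayed and delayed[0][0] <= now:
        _, job_id = heapq.heappop(delayed)
        waiting.append(job_id)
        moved += 1
    return moved
```

Queue.run() drives equivalent scans for you, which is why most migrations never touch the loops directly.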
Repeatable Job Migration¶
BullMQ repeatables map to durable AsyncMQ repeatables:
```python
await queue.upsert_repeatable(
    "reports.generate_daily",
    cron="0 6 * * *",
    kwargs={"tenant": "acme"},
)
```
AsyncMQ also has a local-only helper, add_repeatable(...). Use add_repeatable(...) only when the worker process itself is the intended source of truth for that schedule.
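The "upsert" half of that pairing matters for deploys: re-registering an existing schedule replaces it rather than stacking a duplicate. A dict-based sketch of that keying behavior (the real store lives in the backend):

```python
def upsert_repeatable(store: dict, task_id: str, cron: str, kwargs: dict) -> None:
    """Sketch: schedules are keyed by identity (task + cron here), so
    re-running registration code is idempotent."""
    store[(task_id, cron)] = {"task_id": task_id, "cron": cron, "kwargs": kwargs}
```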
Flow Migration¶
BullMQ's FlowProducer maps directly in spirit:
```python
from asyncmq.flow import FlowProducer
from asyncmq.jobs import Job

producer = FlowProducer(backend)

parent = Job(task_id="etl.extract", args=[], kwargs={})
child = Job(task_id="etl.transform", args=[], kwargs={}, depends_on=[parent.id])

await producer.add_flow("etl", [parent, child])
```
Important behavioral notes:
- AsyncMQ supports dependency gating and waiting-children inspection
- AsyncMQ does not try to clone every BullMQ flow-specific failure-policy API
If your BullMQ usage depends on advanced parent-state policies, model those policies explicitly in application logic or operator workflows.
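Dependency gating itself is simple to model: a child stays in waiting-children until every job it depends_on has completed. A sketch of that readiness check (not FlowProducer internals):

```python
def ready_jobs(jobs: dict[str, dict], completed: set[str]) -> list[str]:
    """Sketch of dependency gating: a job is ready only when all of
    its depends_on ids have reached the completed state."""
    return [
        job_id
        for job_id, job in jobs.items()
        if job_id not in completed
        and all(dep in completed for dep in job.get("depends_on", []))
    ]
```

Anything beyond this gate, such as "fail the parent if any child fails", is the kind of policy the note above suggests modeling in application logic.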
Queue Inspection Migration¶
Common mappings:
```python
await queue.get_job(job_id)
await queue.get_jobs(["waiting", "delayed"], start=0, end=49)
await queue.get_job_counts("waiting", "active", "completed", "failed")
await queue.clean_jobs(grace=3600, limit=1000, state="completed")
await queue.drain(include_delayed=True)
await queue.obliterate(force=True)
```
Intentional difference:
- paused and prioritized are inspection buckets for parity, not separate persisted backend buckets
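The counts call above can be modeled over any job store that tags jobs with a state (a sketch, including the zero-count behavior for empty states):

```python
from collections import Counter


def get_job_counts(jobs: list[dict], *states: str) -> dict[str, int]:
    """Sketch of get_job_counts: tally jobs per state, reporting zero
    for requested states that currently hold no jobs."""
    tally = Counter(job["state"] for job in jobs)
    return {state: tally.get(state, 0) for state in states}
```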
Events and Telemetry¶
BullMQ users may expect Redis-stream-based QueueEvents. AsyncMQ takes a
different route:
- local lifecycle events through the event emitter
- dashboard-oriented telemetry and SSE surfaces
- backend broadcast hooks where supported
This is one of the largest intentional differences because AsyncMQ is not built around Redis streams as the universal event substrate.
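The local event-emitter model can be sketched in a few lines (a hypothetical API for intuition, not AsyncMQ's emitter):

```python
from collections import defaultdict
from typing import Callable


class EventEmitter:
    """Minimal local lifecycle emitter: handlers subscribe to event
    names and are invoked in-process, with no Redis stream involved."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable]] = defaultdict(list)

    def on(self, event: str, handler: Callable) -> None:
        self._handlers[event].append(handler)

    def emit(self, event: str, payload: dict) -> None:
        for handler in self._handlers[event]:
            handler(payload)
```

The practical consequence for migrations: cross-process consumers that tailed QueueEvents should move to the dashboard/SSE surfaces or backend broadcast hooks, while in-process listeners map to local emitter subscriptions like the sketch above.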
Operational Differences to Expect¶
Naming¶
- BullMQ: camelCase-heavy JavaScript surface
- AsyncMQ: Pythonic snake_case surface
Timing units¶
- BullMQ commonly uses milliseconds
- AsyncMQ Python APIs use seconds
Backend model¶
- BullMQ is Redis-native
- AsyncMQ is backend-agnostic and lets coordination quality vary by backend
Queue scheduler model¶
- BullMQ historically centralized more behavior in Redis scheduler concepts
- AsyncMQ exposes explicit runtime loops and backend capability contracts
Migration Checklist¶
- replace inline BullMQ processors with @task(queue=...) handlers
- translate jobId to job_id
- convert timing values from milliseconds to seconds
- migrate repeatables to upsert_repeatable(...)
- review any flow logic that depends on advanced parent-failure policies
- choose the AsyncMQ backend that matches your coordination requirements
- wire delayed, repeatable, and stalled-recovery loops appropriately
- validate queue inspection and dashboard workflows in staging