Jobs¶
asyncmq.jobs.Job is AsyncMQ's durable unit of work. Producers create jobs,
backends persist them, workers execute them, and admin APIs inspect them.
If you understand the Job payload, you understand most of AsyncMQ's public
runtime contract.
What a Job Contains¶
```python
Job(
    task_id: str,
    args: list[Any],
    kwargs: dict[str, Any],
    retries: int = 0,
    max_retries: int = 3,
    backoff: float | int | Callable | None = 1.5,
    ttl: int | None = None,
    job_id: str | None = None,
    created_at: float | None = None,
    priority: int = 5,
    repeat_every: float | int | None = None,
    depends_on: list[str] | None = None,
    deduplication: dict[str, Any] | None = None,
)
```
The most important fields are:
- `task_id`: the registry key that resolves to the actual handler function.
- `args` and `kwargs`: the serialized call payload.
- `job_id`: the stable identifier stored as `id` in the payload.
- `retries` and `max_retries`: attempt counters used by the worker.
- `backoff`: how the next retry delay is computed after a failure.
- `ttl`: staleness limit measured from `created_at`.
- `priority`: waiting-queue ordering metadata.
- `delay_until`: runtime scheduling field used for delayed jobs and retries.
- `depends_on`: parent job ids that must complete before this job can run.
- `deduplication`: producer-side metadata for BullMQ-style deduplication, debounce, and throttle behavior.
Lifecycle Semantics¶
The default status of a newly created job is waiting.
Typical transitions are:
- `waiting -> active -> completed`
- `waiting -> active -> delayed -> active -> completed`
- `waiting -> active -> failed`
- `waiting -> delayed -> waiting -> active`
- `waiting-children -> waiting -> active`
- `waiting`/`active` -> `expired`
AsyncMQ does not materialize every inspection bucket as a separate persisted state. In particular:
- `waiting-children` is derived from unresolved `depends_on` metadata.
- `paused` is a queue-level control, not a job-level persisted state.
- `prioritized` is ordering metadata on waiting jobs, not a separate job bucket.
That design keeps the model portable across Redis, Postgres, MongoDB, RabbitMQ metadata stores, and in-memory backends.
Retries and Backoff¶
When a handler raises an exception:
- the traceback is stored on the job payload
- `retries` is incremented
- the worker computes `next_retry_delay()`
- the job is re-enqueued into the delayed queue if attempts remain
- otherwise the job is marked `failed` and moved to the DLQ
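The retry decision can be sketched in plain Python. `next_state` and its return values are illustrative names, not AsyncMQ internals, and whether the comparison against `max_retries` is strict may differ in the actual implementation:

```python
# Illustrative sketch of the worker's retry decision, not AsyncMQ's real code.
def next_state(retries: int, max_retries: int) -> str:
    """Return where a failed job goes: the delayed queue or the DLQ."""
    retries += 1  # the failed attempt is recorded first
    if retries < max_retries:
        return "delayed"  # attempts remain: re-enqueue with a backoff delay
    return "failed"       # exhausted: mark failed and move to the DLQ

assert next_state(retries=0, max_retries=3) == "delayed"  # first failure retries
assert next_state(retries=2, max_retries=3) == "failed"   # attempts exhausted
```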
Backoff supports several modes:
- a numeric exponential base: `1.5`, `2`, `3`
- a callable taking the current retry count
- a callable taking no arguments
- `None` for immediate retry
Example:
```python
from asyncmq.tasks import task


def bounded_backoff(attempt: int) -> float:
    return min(60.0, 2.0 ** attempt)


@task(queue="billing", retries=5, backoff=bounded_backoff)
async def charge_invoice(invoice_id: str) -> None:
    ...
```
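All four backoff modes can be resolved by one small helper. This is a sketch of the semantics described above, with `resolve_backoff` as an illustrative name, not AsyncMQ's actual `next_retry_delay()`:

```python
import inspect


def resolve_backoff(backoff, retries: int) -> float:
    """Compute the next retry delay for the four backoff modes (illustrative)."""
    if backoff is None:
        return 0.0                       # None: retry immediately
    if callable(backoff):
        params = inspect.signature(backoff).parameters
        if len(params) == 0:
            return float(backoff())      # zero-argument callable
        return float(backoff(retries))   # callable taking the retry count
    return float(backoff) ** retries     # numeric exponential base

assert resolve_backoff(None, 4) == 0.0
assert resolve_backoff(2, 3) == 8.0                                # 2 ** 3
assert resolve_backoff(lambda n: min(60.0, 2.0 ** n), 10) == 60.0  # capped
```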
TTL and Expiration¶
ttl is measured from created_at, not from the moment a worker first sees
the job.
That means TTL protects you from stale work such as:
- confirmation emails that no longer matter
- rebuild jobs superseded by newer requests
- time-sensitive downstream calls
If a job has already expired when the worker evaluates it:
- the status becomes `expired`
- the job is acknowledged
- a `job:expired` event is emitted
- the payload is moved to the DLQ
Use TTL when executing stale work is worse than dropping it.
Delays, Priority, and Scheduling Metadata¶
AsyncMQ stores delayed execution directly on the job payload through
delay_until.
This field is used for:
- producer-created delayed jobs
- retry backoff
- debounce replacement windows
Priority is separate from delay:
- delay controls when a job becomes eligible
- priority controls which eligible waiting job should be consumed first
BullMQ users should think of this as the same practical model, expressed as a backend-neutral Python payload instead of Redis-only state structures.
Dependencies and waiting-children¶
Jobs may declare depends_on=[parent_job_id, ...].
Workers will not execute the child until all parents have completed. While dependencies remain unresolved:
- the job appears in `waiting-children` inspection views
- runtime gating keeps it from executing
- completion of a parent removes that parent id from the child payload
Example:
```python
from asyncmq.flow import FlowProducer
from asyncmq.jobs import Job

extract = Job(task_id="etl.extract", args=[], kwargs={})
transform = Job(task_id="etl.transform", args=[], kwargs={}, depends_on=[extract.id])
load = Job(task_id="etl.load", args=[], kwargs={}, depends_on=[transform.id])

# The jobs are then enqueued together through the FlowProducer so the
# dependency metadata is persisted as one unit.
```
Important operational detail:
- children are released by parent completion
- a permanently failed parent does not automatically fail descendants
- blocked descendants remain inspectable until you retry, remove, or repair the parent
That is simpler than BullMQ's richer flow-specific failure policies, but it is explicit and backend-neutral.
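The parent-completion bookkeeping described above can be sketched as list subtraction on the child payload; the function name and the derived-status handling are illustrative only:

```python
def on_parent_completed(child: dict, parent_id: str) -> dict:
    """Remove a completed parent from depends_on; release the child when empty."""
    remaining = [p for p in child.get("depends_on", []) if p != parent_id]
    child["depends_on"] = remaining
    # waiting-children is derived, not persisted; modeled here as a status for clarity.
    child["status"] = "waiting" if not remaining else "waiting-children"
    return child

child = {"id": "load", "depends_on": ["extract", "transform"], "status": "waiting-children"}
on_parent_completed(child, "extract")
assert child["status"] == "waiting-children"  # one parent still pending
on_parent_completed(child, "transform")
assert child["status"] == "waiting"           # all parents done: runnable
```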
job_id and Deduplication¶
job_id and deduplication solve different problems:
- `job_id` prevents a duplicate record for one exact queue-scoped identity
- `deduplication` prevents logically equivalent work from being created again
Example:
```python
job_id = await queue.add(
    "search.rebuild",
    kwargs={"scope": "products"},
    job_id="rebuild-products",
    deduplication={"id": "search:products", "ttl": 30.0},
)
```
Use job_id when an operator or API needs to refer to one specific job.
Use deduplication when several producers may request the same logical work.
See Deduplication for mode-specific behavior.
Serialization Contract¶
Job.to_dict() and Job.from_dict() define the payload shape that custom
backends, admin tools, and external integrations should expect.
Important keys:
- `id`: stable job identifier
- `task`: task registry id
- `args` and `kwargs`: serialized call arguments
- `status`: current runtime state
- `delay_until`: next eligible execution time
- `created_at`: creation timestamp
- `result`: stored completion result, if available
- `last_error` and `error_traceback`: failure metadata
- `depends_on`: unresolved parent ids
- `deduplication`: active deduplication metadata, when present
If you implement a custom backend, preserve unknown keys rather than filtering them out. That keeps forward compatibility with newer runtime metadata.
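Preserving unknown keys is straightforward if a backend copies the payload dict whole and only reads the keys it knows. A sketch, with `store` and the `trace_context` key purely hypothetical:

```python
KNOWN_KEYS = {"id", "task", "args", "kwargs", "status", "delay_until", "created_at"}


def store(payload: dict) -> dict:
    """A backend write that keeps unknown keys instead of filtering to KNOWN_KEYS."""
    record = dict(payload)  # copy everything, known or not
    record.setdefault("status", "waiting")
    return record

payload = {"id": "j1", "task": "etl.extract", "args": [], "kwargs": {},
           "trace_context": {"trace_id": "abc"}}  # a hypothetical newer key
stored = store(payload)
assert stored["trace_context"] == {"trace_id": "abc"}  # survives the round trip
```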
Production Advice¶
- Keep payloads JSON-serializable and stable across deploys.
- Keep job arguments small; store large blobs externally and pass references.
- Make handlers idempotent because retries, retries after restarts, and manual replays are normal.
- Use explicit `max_retries` and bounded backoff for external systems.
- Use TTL for work that becomes harmful when stale.
- Use queue inspection APIs to clean completed and failed jobs instead of relying on backend-specific manual deletes.
Common Mistakes¶
- Treating `ttl` as a worker timeout. It is a staleness window, not a hard execution deadline.
- Using huge payloads that make backends and dashboards expensive to operate.
- Depending on `waiting-children` jobs to fail automatically when a parent fails.
- Forgetting that `retries` is attempt history, not remaining attempts.