Core Concepts¶
Objects You Work With¶
@task: registers a callable inTASK_REGISTRYand adds.enqueue()/.delay()/.send().Job: serializable unit of work (task,args,kwargs, retries, ttl, status, etc.).Queue: high-level API bound to a queue name and backend.Worker/runner functions: consume jobs and execute handlers.BaseBackend: persistence and coordination contract.
End-to-End Flow¶
- Producer calls
my_task.enqueue(..., backend=...). - AsyncMQ creates a
Joband serializes it (Job.to_dict()). - Backend stores the job as
waiting(ordelayed). - Worker dequeues, runs
handle_job, and resolves task function fromTASK_REGISTRY. - Worker updates state/result, retries or DLQ routing, and acknowledges via backend
ack.
sequenceDiagram
participant P as Producer
participant Q as Queue/Backend
participant W as Worker
participant T as Task Handler
P->>Q: enqueue(job payload)
W->>Q: dequeue()
W->>W: state=active, checks(deps/cancel/ttl)
W->>T: execute(task)
alt success
W->>Q: state=completed + save result + ack
else failure(retries left)
W->>Q: state=delayed + enqueue_delayed + ack
else failure(retries exhausted)
W->>Q: state=failed + move_to_dlq + ack
end
Job States¶
State values in runtime:
waitingactivecompletedfaileddelayedexpired
Typical transitions:
waiting -> active -> completedwaiting -> active -> delayed(retry with backoff)waiting -> active -> failed(retries exhausted)waiting/active -> expired(TTL exceeded)
Delayed and Repeatable Work¶
- Delayed jobs are scheduled through
enqueue_delayedand moved back to the main queue bydelayed_job_scanner. - Local repeatables are registered with
Queue.add_repeatable(...), while durable backend-managed schedules useQueue.upsert_repeatable(...); both are consumed byrepeatable_schedulerwhen that queue's worker runs.
Dependencies and Flows¶
- Jobs may include
depends_onIDs. - Worker checks parent states before execution; unresolved dependencies are re-delayed briefly.
FlowProducer.add_flow()writes dependency graphs, using backend atomic flow support when available.
Stalled Recovery¶
If enable_stalled_check=True, workers write heartbeats for active jobs.
Recovery loop is provided by asyncmq.core.stalled.stalled_recovery_scheduler(...) and must be started by your application if you want automatic re-enqueue of stale active jobs.