Flows¶
Flows model job dependencies. They let you describe "run B after A" or "run D after B and C" without moving orchestration logic into ad hoc task code.
AsyncMQ's flow API is centered on `FlowProducer` plus job-level `depends_on` metadata.
What Problem Flows Solve¶
Use flows when correctness depends on ordering, not just convenience.
Typical cases:
- ETL pipelines
- fan-out / fan-in workloads
- document generation after several preprocessing steps
- multi-stage webhooks where later tasks should not run until earlier ones finish
If independent jobs can safely run in any order, do not use flows just for organization. Keep them separate and let the queue maximize throughput.
Basic Example¶
```python
from asyncmq.flow import FlowProducer
from asyncmq.jobs import Job

producer = FlowProducer(backend)

extract = Job(task_id="etl.extract", args=[], kwargs={})
transform = Job(task_id="etl.transform", args=[], kwargs={}, depends_on=[extract.id])
load = Job(task_id="etl.load", args=[], kwargs={}, depends_on=[transform.id])

ids = await producer.add_flow("etl", [extract, transform, load])
```
Fan-Out / Fan-In Example¶
```python
from asyncmq.flow import FlowProducer
from asyncmq.jobs import Job

producer = FlowProducer(backend)

ingest = Job(task_id="media.ingest", args=["asset-42"], kwargs={})
thumbnail = Job(task_id="media.thumbnail", args=["asset-42"], kwargs={}, depends_on=[ingest.id])
transcode = Job(task_id="media.transcode", args=["asset-42"], kwargs={}, depends_on=[ingest.id])
publish = Job(
    task_id="media.publish",
    args=["asset-42"],
    kwargs={},
    depends_on=[thumbnail.id, transcode.id],
)

await producer.add_flow("media", [ingest, thumbnail, transcode, publish])
```
Here:
- `thumbnail` and `transcode` wait for `ingest`
- `publish` waits for both branches
How add_flow() Works¶
`FlowProducer.add_flow(queue, jobs)` does four things:
- serializes every `Job` into the durable payload shape
- derives dependency edges as `(parent_id, child_id)` pairs
- tries the backend-native `atomic_add_flow(...)`
- falls back to sequential enqueue plus dependency registration when the backend does not support an atomic flow write
That gives AsyncMQ a single API surface across backends with different capabilities.
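The edge-derivation step above can be sketched in isolation. `SimpleJob` and `derive_edges` below are illustrative stand-ins, not AsyncMQ internals:

```python
from dataclasses import dataclass, field

@dataclass
class SimpleJob:
    """Stand-in for a job carrying an id and depends_on metadata."""
    id: str
    depends_on: list = field(default_factory=list)

def derive_edges(jobs):
    """Return (parent_id, child_id) pairs for every declared dependency."""
    return [(parent, job.id) for job in jobs for parent in job.depends_on]

jobs = [
    SimpleJob("extract"),
    SimpleJob("transform", depends_on=["extract"]),
    SimpleJob("load", depends_on=["transform"]),
]
print(derive_edges(jobs))  # [('extract', 'transform'), ('transform', 'load')]
```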
Atomic vs Fallback Behavior¶
If the backend implements `atomic_add_flow(...)`, the entire graph is written in one backend operation.
If it does not, AsyncMQ falls back to a safe sequential path:
- root jobs are enqueued first
- child dependencies are registered
- dependent jobs are then stored
Fallback mode is still correct for runtime execution, but it is not fully atomic. If a failure happens halfway through flow creation, you may have a partially created graph.
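The ordering of the fallback path can be simulated without a real backend. This is a hedged sketch, not AsyncMQ's actual fallback code; `FlowJob`, `RecordingBackend`, and the method names on it are invented for illustration:

```python
from dataclasses import dataclass, field

@dataclass
class FlowJob:
    id: str
    depends_on: list = field(default_factory=list)

class RecordingBackend:
    """Stand-in backend that records the order of write operations."""
    def __init__(self):
        self.ops = []
    def enqueue(self, job):
        self.ops.append(("enqueue", job.id))
    def add_dependency(self, parent_id, child_id):
        self.ops.append(("dep", parent_id, child_id))
    def store(self, job):
        self.ops.append(("store", job.id))

def fallback_add_flow(backend, jobs):
    roots = [j for j in jobs if not j.depends_on]
    children = [j for j in jobs if j.depends_on]
    for job in roots:                      # step 1: roots become runnable
        backend.enqueue(job)
    for child in children:                 # step 2: register dependency edges
        for parent_id in child.depends_on:
            backend.add_dependency(parent_id, child.id)
    for child in children:                 # step 3: store blocked children
        backend.store(child)

backend = RecordingBackend()
fallback_add_flow(backend, [FlowJob("a"), FlowJob("b", depends_on=["a"])])
print(backend.ops)  # [('enqueue', 'a'), ('dep', 'a', 'b'), ('store', 'b')]
```

A crash between any two of these steps is what produces the partially created graph mentioned above.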
Production advice:
- keep flow creation idempotent
- use stable `job_id` values when repeat submissions are possible
- prefer Redis or Postgres if atomic graph creation matters operationally
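One way to get stable ids is to derive them deterministically from the job's content, so a retried flow submission reuses the same ids. This is a sketch; how you attach the id to a `Job` (constructor argument vs. assignment) depends on your AsyncMQ version, so check before relying on it:

```python
import hashlib
import json

def stable_job_id(task_id: str, args: list, kwargs: dict) -> str:
    """Same task + payload always hashes to the same id."""
    payload = json.dumps([task_id, args, sorted(kwargs.items())], default=str)
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

# Resubmitting the identical job yields the identical id.
print(stable_job_id("etl.extract", [], {}) == stable_job_id("etl.extract", [], {}))  # True
```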
Runtime Dependency Resolution¶
At execution time, workers enforce dependencies in a backend-neutral way.
If a dequeued job still has unresolved parents:
- it is not executed
- it is moved back to `delayed` briefly
- it remains visible through `waiting-children` inspection APIs
When a parent completes, the backend removes that parent id from each affected child. Once the last unresolved parent is removed, the child becomes runnable.
This means flow correctness does not depend solely on producer-side setup. Workers still protect execution ordering.
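The parent-removal bookkeeping described above can be simulated in a few lines. This is an illustrative model, not the backend's actual data structure:

```python
def complete_parent(pending: dict, parent_id: str) -> list:
    """pending maps child_id -> set of unresolved parent ids.

    Removes the completed parent from every child's set and returns the
    children that just became runnable (no unresolved parents left).
    """
    runnable = []
    for child_id, parents in pending.items():
        parents.discard(parent_id)
        if not parents:
            runnable.append(child_id)
    for child_id in runnable:
        del pending[child_id]
    return runnable

# Fan-in from the media example: publish waits on two parents.
pending = {"publish": {"thumbnail", "transcode"}}
print(complete_parent(pending, "thumbnail"))  # [] - publish still blocked
print(complete_parent(pending, "transcode"))  # ['publish'] - now runnable
```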
Failure Semantics¶
AsyncMQ flow behavior is intentionally simple and explicit:
- child jobs run only after all parents complete
- failed parents do not automatically trigger descendant failure
- descendants remain blocked until the parent is retried, repaired, or removed
This is an important operational difference from BullMQ's broader flow ecosystem. AsyncMQ focuses on dependency gating rather than a large family of parent-state policy options.
If you want cascade failure semantics, model them in one of these ways:
- a supervisor task that inspects parent state and removes blocked descendants
- explicit operator workflows using queue inspection and retry/remove actions
- application-level compensation logic
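The supervisor-task option might look like the sketch below: scan blocked children, check whether any parent has failed, and remove those descendants. The read calls (`get_jobs`, `get_job_state`) match the inspection APIs documented on this page; `queue.remove(...)` and the `depends_on` field on the returned job dicts are assumptions, so substitute whatever your deployment actually exposes. A `FakeQueue` is included only so the sketch is runnable:

```python
import asyncio

async def reap_blocked_children(queue) -> list:
    """Remove waiting children whose parents have failed (hypothetical remove API)."""
    removed = []
    children = await queue.get_jobs(["waiting-children"], start=0, end=499)
    for child in children:
        parent_states = [await queue.get_job_state(p) for p in child["depends_on"]]
        if "failed" in parent_states:
            await queue.remove(child["id"])  # assumed removal call
            removed.append(child["id"])
    return removed

class FakeQueue:
    """Minimal stand-in so the sketch can run without a real backend."""
    def __init__(self):
        self.states = {"parent-1": "failed", "parent-2": "completed"}
        self.removed = []
    async def get_jobs(self, states, start=0, end=49):
        return [
            {"id": "child-1", "depends_on": ["parent-1"]},
            {"id": "child-2", "depends_on": ["parent-2"]},
        ]
    async def get_job_state(self, job_id):
        return self.states[job_id]
    async def remove(self, job_id):
        self.removed.append(job_id)

queue = FakeQueue()
print(asyncio.run(reap_blocked_children(queue)))  # ['child-1']
```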
Inspecting Flow State¶
The queue inspection APIs are the main way to inspect flow progress:
```python
waiting_children = await queue.get_waiting_children()
children_page = await queue.get_jobs(["waiting-children"], start=0, end=49)
parent_state = await queue.get_job_state(parent_id)
parent_job = await queue.get_job(parent_id)
```
Useful patterns:
- look at `waiting-children` to find blocked descendants
- inspect parent `failed` or `completed` state directly
- use `get_job_counts(...)` to monitor how much work is blocked on dependencies
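The count-based pattern could drive a simple health metric. The exact parameters `get_job_counts(...)` accepts may differ in your AsyncMQ version, so a stand-in queue is used here to keep the alerting logic itself runnable:

```python
import asyncio

async def blocked_ratio(queue) -> float:
    """Fraction of tracked jobs currently blocked on unfinished parents."""
    counts = await queue.get_job_counts("waiting", "active", "waiting-children")
    total = sum(counts.values()) or 1
    return counts["waiting-children"] / total

class FakeCountsQueue:
    """Stand-in; replace with your real queue object."""
    async def get_job_counts(self, *states):
        data = {"waiting": 5, "active": 3, "waiting-children": 2}
        return {s: data[s] for s in states}

print(asyncio.run(blocked_ratio(FakeCountsQueue())))  # 0.2
```

A rising ratio during an incident usually means failed parents are pinning descendants in place.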
Backend Notes¶
All built-in backends support dependency-aware execution, but they do not all offer the same write-time guarantees.
- Redis and Postgres are the strongest choices when you want more robust flow creation semantics and broader coordination guarantees
- MongoDB and in-memory keep the behavior portable but do not pretend to offer cross-process atomicity they do not have
- RabbitMQ flow behavior depends on its metadata store for dependency state
BullMQ Mapping¶
| BullMQ | AsyncMQ |
|---|---|
| `FlowProducer` | `FlowProducer` |
| parent/child dependencies | `Job(..., depends_on=[...])` |
| waiting children inspection | `queue.get_waiting_children()` |
| flow creation | `await flow_producer.add_flow(queue, jobs)` |
AsyncMQ preserves the practical DAG creation model while keeping storage and dependency bookkeeping portable across backends.
Common Mistakes¶
- Using flows for loosely related jobs that could just run independently.
- Assuming flow creation is atomic on every backend.
- Assuming children will fail automatically when a parent fails.
- Forgetting to inspect `waiting-children` during incidents.