Workers & Job Processing¶
Workers are the engine room of AsyncMQ—they breathe life into tasks by dequeuing jobs, enforcing limits, handling errors, and emitting events at every stage. Buckle up for a deep dive into the process_job loop, rate limiting, concurrency control, lifecycle management, and the Worker class helper.
1. The process_job Loop¶
```python
async def process_job(
    queue_name: str,
    limiter: CapacityLimiter,
    rate_limiter: RateLimiter | None = None,
    backend: BaseBackend | None = None,
) -> None:
    ...
```
- Concurrency: Controlled by a CapacityLimiter (semaphore-like).
- Rate Limiting: Optional RateLimiter (token-bucket) caps throughput.
- Pause Support: Respects backend.is_queue_paused().
- Task Group: Spawns handlers asynchronously, never blocking the loop.
Fun
Without the small sleep on empty dequeue, this loop would spin like a fidget spinner—CPU-intensive and dizzying.
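The loop's overall shape can be sketched with plain asyncio and a hand-rolled in-memory backend stand-in. The queue name, sleep interval, and backend methods below are illustrative assumptions, not AsyncMQ's actual internals:

```python
import asyncio


class InMemoryQueueBackend:
    """Tiny illustrative backend stand-in (not AsyncMQ's BaseBackend)."""

    def __init__(self, jobs):
        self._jobs = list(jobs)
        self.paused = False

    async def is_queue_paused(self, queue_name: str) -> bool:
        return self.paused

    async def dequeue(self, queue_name: str):
        return self._jobs.pop(0) if self._jobs else None


async def process_loop(backend, limiter: asyncio.Semaphore, stop: asyncio.Event) -> None:
    """Simplified shape of the dequeue loop: poll, respect pause, spawn handlers."""
    running = []
    while not stop.is_set():
        if await backend.is_queue_paused("default"):
            await asyncio.sleep(0.01)  # back off while the queue is paused
            continue
        job = await backend.dequeue("default")
        if job is None:
            await asyncio.sleep(0.01)  # small sleep avoids a hot spin on empty dequeue
            continue
        await limiter.acquire()  # cap the number of in-flight handlers
        running.append(asyncio.ensure_future(_run_one(job, limiter)))
    await asyncio.gather(*running)  # drain in-flight handlers on shutdown


async def _run_one(job, limiter: asyncio.Semaphore) -> None:
    try:
        await job()
    finally:
        limiter.release()
```

Note how the handler runs in its own task: the loop keeps polling while jobs execute, which is the "never blocking the loop" property described above.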
2. Applying Limits: _run_with_limits¶
- limiter: Only N jobs run in parallel.
- rate_limiter: If set, waits for a token before each job.
- Delegation: Calls handle_job for full lifecycle management.
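A token-bucket limiter of the kind described above can be sketched as follows. The class name, refill policy, and parameters are assumptions for illustration, not AsyncMQ's RateLimiter:

```python
import asyncio
import time


class TokenBucketLimiter:
    """Allow at most `rate` acquisitions per `interval` seconds (illustrative)."""

    def __init__(self, rate: int, interval: float) -> None:
        self.rate = rate
        self.interval = interval
        self.tokens = float(rate)  # bucket starts full
        self.updated = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at the bucket size.
            self.tokens = min(
                self.rate,
                self.tokens + (now - self.updated) * self.rate / self.interval,
            )
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Sleep just long enough for the next token to accrue.
            await asyncio.sleep((1 - self.tokens) * self.interval / self.rate)
```

Because the bucket starts full, short bursts pass through immediately; sustained load is smoothed to the configured rate.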
3. Lifecycle Management: handle_job¶
```python
async def handle_job(
    queue_name: str,
    raw_job: dict[str, Any],
    backend: BaseBackend | None = None,
) -> None:
    ...
```
Key Points¶
- Cancellation: Early exit if the job is cancelled, emitting a job:cancelled event.
- Expiration (TTL): Jobs past their TTL go to the DLQ with a job:expired event.
- Delay Handling: Respects delay_until, requeuing without firing.
- Events: Emits job:started, job:completed, job:failed, job:expired, and job:cancelled.
- Retries: Honors max_retries, scheduling retries with backoff via next_retry_delay().
- Sandbox Integration: Calls sandbox.run_handler if enabled, otherwise invokes the handler directly (sync or async).
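For illustration, an exponential-backoff next_retry_delay could look like this. The base, cap, and absence of jitter are assumptions; AsyncMQ's actual policy may differ:

```python
def next_retry_delay(retries: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff: base * 2**retries seconds, capped at `cap` (illustrative)."""
    return min(cap, base * (2 ** retries))
```

With these defaults, successive retries wait 1s, 2s, 4s, 8s, ... until hitting the 60-second ceiling, so a persistently failing job cannot hammer the queue.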
Tip
Listen to events via event_emitter.on("job:failed", ...)
for alerting or custom fallback logic.
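The on/emit subscription pattern can be shown with a minimal stand-in emitter (illustrative only, not AsyncMQ's event_emitter):

```python
from collections import defaultdict


class SimpleEmitter:
    """Minimal on/emit shape, illustrating the subscription pattern."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)

    def on(self, event: str, handler) -> None:
        # Register a callback for a named event.
        self._handlers[event].append(handler)

    def emit(self, event: str, payload) -> None:
        # Invoke every handler subscribed to this event.
        for handler in self._handlers[event]:
            handler(payload)
```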
4. The Worker Helper Class¶
```python
class Worker:
    def __init__(self, queue: Queue) -> None: ...
    def start(self): ...
    def stop(self): ...
```
- Purpose: Encapsulates worker startup & graceful shutdown.
- CancelScope: Allows .stop() to cleanly cancel processing.
- Defaults: Uses settings.worker_concurrency, settings.rate_limit, and settings.rate_interval.
Tip
Use Worker(q).start()
in scripts or container entrypoints, and .stop()
on application shutdown.
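The start/stop shape can be sketched with asyncio task cancellation standing in for AnyIO's CancelScope. MiniWorker and its processing callback are illustrative placeholders, not AsyncMQ's Worker:

```python
import asyncio
import contextlib


class MiniWorker:
    """Illustrative start/stop wrapper: start() runs a loop, stop() cancels it cleanly."""

    def __init__(self, process) -> None:
        self._process = process  # coroutine function invoked repeatedly
        self._task = None

    async def start(self) -> None:
        self._task = asyncio.ensure_future(self._run())
        with contextlib.suppress(asyncio.CancelledError):
            await self._task  # returns cleanly once stop() cancels the loop

    async def _run(self) -> None:
        while True:
            await self._process()

    def stop(self) -> None:
        if self._task is not None:
            # Analogous to cancelling the worker's CancelScope.
            self._task.cancel()
```

The key property mirrored here is that stop() interrupts the loop at an await point, so shutdown never strands a half-scheduled iteration.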
5. Advanced Patterns & Tuning¶
- Dynamic Scaling: Change settings.worker_concurrency and restart workers for elastic scaling.
- Burst Control: Combine high concurrency with a low rate_limit for bursts capped by API quotas.
- Custom Events: Subscribe to job:* events to integrate with tracing or metrics.
6. Testing & Best Practices¶
- Unit Tests: Swap the backend for InMemoryBackend, simulate a handful of jobs, and assert job states.
- Simulate Failures: Write a task that raises, then check DLQ routing and retry counts.
- Pause/Resume: Test is_queue_paused() logic by toggling the pause state in backend mocks.
Tip
Use pytest-timeout
to guard against hanging loops during tests.
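The pause/resume test idea can be shaped like this with a hand-rolled backend stub. PausableBackend and drain are illustrative helpers, not AsyncMQ APIs:

```python
import asyncio


class PausableBackend:
    """Backend stub for tests: hands out jobs only while unpaused."""

    def __init__(self, jobs):
        self._jobs = list(jobs)
        self.paused = False

    async def is_queue_paused(self, queue_name: str) -> bool:
        return self.paused

    async def dequeue(self, queue_name: str):
        if self.paused or not self._jobs:
            return None
        return self._jobs.pop(0)


async def drain(backend, queue_name: str = "default"):
    """Pull jobs until the backend reports empty (or paused)."""
    seen = []
    while (job := await backend.dequeue(queue_name)) is not None:
        seen.append(job)
    return seen
```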
7. Common Pitfalls & FAQs¶
- Silent Failures: A missing await on process_job or Worker.start() results in no work.
- Overly Aggressive Sleeping: A long sleep on a None dequeue inflates latency; tune the 0.1-second value thoughtfully.
- Event Flooding: job:progress or job:started events can flood handlers if not throttled; unsubscribe or filter in handlers.
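One way to tame event flooding is a small throttling wrapper around a handler before subscribing it. This helper is an illustrative sketch, not part of AsyncMQ:

```python
import time


def throttled(handler, min_interval: float):
    """Wrap an event handler so it fires at most once per `min_interval` seconds."""
    last_fired = [float("-inf")]  # mutable cell holding the last-fire timestamp

    def wrapper(payload):
        now = time.monotonic()
        if now - last_fired[0] >= min_interval:
            last_fired[0] = now
            handler(payload)
        # Otherwise silently drop the event.

    return wrapper
```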
With the worker framework mastered, you can pipeline tens of thousands of jobs, handle errors gracefully, and keep an observant eye on every lifecycle event, all while maintaining a cheeky grin. 😎