Workers & Job Processing¶
Workers are the engine room of AsyncMQ—they breathe life into tasks by dequeuing jobs, enforcing limits, handling errors,
and emitting events at every stage. Buckle up for a deep dive into the process_job loop, rate limiting,
concurrency control, lifecycle management, and the Worker class helper.
1. The process_job Loop¶
```python
async def process_job(
    queue_name: str,
    limiter: CapacityLimiter,
    rate_limiter: RateLimiter | None = None,
    backend: BaseBackend | None = None,
) -> None:
    ...
```
- **Concurrency**: Controlled by a `CapacityLimiter` (semaphore-like).
- **Rate Limiting**: An optional `RateLimiter` (token-bucket) caps throughput.
- **Pause Support**: Respects `backend.is_queue_paused()`.
- **Task Group**: Spawns handlers asynchronously, never blocking the loop (see the sketch below).
Fun fact
Without the small sleep on empty dequeue, this loop would spin like a fidget spinner—CPU-intensive and dizzying.
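To make the loop's shape concrete, here is a minimal sketch under stated assumptions: `CapacityLimiter` comes from anyio, `backend.dequeue` is an assumed method name, and the `run_with_limits` stub stands in for the helper covered in section 2. This is illustrative, not AsyncMQ's actual source.

```python
import anyio


async def run_with_limits(queue_name, raw_job, limiter, rate_limiter, backend):
    """Stand-in for _run_with_limits; see the sketch in section 2."""
    ...


async def process_job_sketch(
    queue_name: str,
    limiter: anyio.CapacityLimiter,
    rate_limiter=None,
    backend=None,
) -> None:
    async with anyio.create_task_group() as tg:
        while True:
            # Pause support: skip dequeuing while the queue is paused.
            if await backend.is_queue_paused(queue_name):
                await anyio.sleep(0.1)
                continue
            # `dequeue` is an assumed method name for illustration.
            raw_job = await backend.dequeue(queue_name)
            if raw_job is None:
                # The small sleep from the fun fact: without it, this loop
                # busy-spins on an empty queue.
                await anyio.sleep(0.1)
                continue
            # Spawn the handler so the dequeue loop itself never blocks.
            tg.start_soon(
                run_with_limits, queue_name, raw_job, limiter, rate_limiter, backend
            )
```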
2. Applying Limits: _run_with_limits¶
- **limiter**: Only N jobs run in parallel.
- **rate_limiter**: If set, waits for a token before each job.
- **Delegation**: Calls `handle_job` for full lifecycle management, as sketched below.
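Conceptually, the gating looks like the following sketch; `rate_limiter.acquire()` is an assumed token-bucket API, and the `handle_job` stub stands in for the lifecycle function documented in section 3.

```python
import anyio


async def handle_job(queue_name, raw_job, backend):
    """Stand-in for the lifecycle function covered in section 3."""
    ...


async def run_with_limits_sketch(
    queue_name: str,
    raw_job: dict,
    limiter: anyio.CapacityLimiter,
    rate_limiter=None,
    backend=None,
) -> None:
    async with limiter:  # only N jobs run in parallel
        if rate_limiter is not None:
            await rate_limiter.acquire()  # wait for a token before this job
        await handle_job(queue_name, raw_job, backend)  # delegate lifecycle
```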
3. Lifecycle Management: handle_job¶
```python
async def handle_job(
    queue_name: str,
    raw_job: dict[str, Any],
    backend: BaseBackend | None = None,
) -> None:
    ...
```
Key Points¶
- **Cancellation**: Early exit if the job is cancelled, emitting a `job:cancelled` event.
- **Expiration (TTL)**: Jobs past their TTL go to the DLQ with a `job:expired` event.
- **Delay Handling**: Respects `delay_until`, requeueing without firing.
- **Events**: Emits `job:started`, `job:completed`, `job:failed`, `job:expired`, and `job:cancelled`.
- **Retries**: Honors `max_retries` and schedules retries with backoff via `next_retry_delay()`.
- **Sandbox Integration**: Calls `sandbox.run_handler` if enabled, otherwise invokes the handler directly (sync or async).
Tip
Listen to events via `event_emitter.on("job:failed", ...)` for alerting or custom fallback logic.
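For example, a minimal failure listener might look like this; the `event_emitter` import path and the payload fields are assumptions to check against your installation.

```python
from asyncmq.core.event import event_emitter  # assumed import path


def alert_on_failure(payload: dict) -> None:
    # Payload fields are illustrative; inspect your emitted events to confirm.
    print(f"Job {payload.get('id')} failed: {payload.get('error')}")


event_emitter.on("job:failed", alert_on_failure)
```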
4. The Worker Helper Class¶
```python
class Worker:
    def __init__(self, queue: Queue) -> None: ...
    def start(self): ...
    def stop(self): ...
```
- **Purpose**: Encapsulates worker startup and graceful shutdown.
- **Automatic Task Discovery**: Before registering and processing any jobs, the worker runs `autodiscover_tasks()`, which imports the task modules declared in your settings so your `@task(...)` handlers are registered.
- **CancelScope**: Allows `.stop()` to cleanly cancel processing.
- **Defaults**: Uses `settings.worker_concurrency`, `settings.rate_limit`, and `settings.rate_interval`.
Tip
Use `Worker(q).start()` in scripts or container entrypoints, and `.stop()` on application shutdown.
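A minimal entrypoint sketch following this tip; the import paths and the exact blocking semantics of `start()` are assumptions to verify against your AsyncMQ version.

```python
from asyncmq.queues import Queue    # assumed import path
from asyncmq.workers import Worker  # assumed import path

worker = Worker(Queue("emails"))

try:
    worker.start()  # process jobs until stopped
except KeyboardInterrupt:
    worker.stop()   # the CancelScope cleanly cancels processing
```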
5. Advanced Patterns & Tuning¶
- **Dynamic Scaling**: Change `settings.worker_concurrency` and restart workers for elastic scaling.
- **Burst Control**: Combine a high `concurrency` with a low `rate_limit` for bursts capped by API quotas (see the settings sketch below).
- **Custom Events**: Subscribe to `job:*` events to integrate with tracing or metrics.
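As a sketch of the burst-control pattern, assuming module-level settings names matching the defaults listed in section 4:

```python
# settings.py: high concurrency, low rate limit (assumed attribute names).
worker_concurrency = 50  # up to 50 handlers may be in flight at once...
rate_limit = 5           # ...but only 5 new job starts...
rate_interval = 1.0      # ...per one-second token-bucket window
```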
6. Testing & Best Practices¶
- **Unit Tests**: Swap the `backend` for an `InMemoryBackend` and simulate a handful of jobs; assert job states.
- **Simulate Failures**: Write a task that raises, then check DLQ routing and retry counts.
- **Pause/Resume**: Test `is_queue_paused()` logic by toggling pause state in backend mocks (see the test sketch after the tip below).
Tip
Use pytest-timeout to guard against hanging loops during tests.
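As an example of the pause/resume case, here is a test sketch using the anyio pytest plugin; the `InMemoryBackend` import path and the `pause_queue`/`resume_queue` helper names are assumptions.

```python
import pytest

from asyncmq.backends.memory import InMemoryBackend  # assumed import path


@pytest.mark.anyio
async def test_pause_is_respected():
    backend = InMemoryBackend()
    await backend.pause_queue("emails")            # assumed helper name
    assert await backend.is_queue_paused("emails")
    await backend.resume_queue("emails")           # assumed helper name
    assert not await backend.is_queue_paused("emails")
```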
7. Common Pitfalls & FAQs¶
- **Silent Failures**: A missing `await` on `process_job` or `Worker.start()` results in no work being done.
- **Overly Aggressive Sleeping**: A long sleep on a `None` dequeue inflates latency; adjust the `0.1` value thoughtfully.
- **Event Flooding**: `job:progress` or `job:started` events can flood your handlers if not throttled; unsubscribe or filter them (one throttling approach is sketched below).
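One generic way to tame a chatty subscriber is to throttle it client-side; the `event_emitter` import path is an assumption, while the throttle itself is plain Python.

```python
import time

from asyncmq.core.event import event_emitter  # assumed import path

_last_log = 0.0


def on_progress(payload: dict) -> None:
    global _last_log
    now = time.monotonic()
    if now - _last_log < 1.0:  # drop all but one event per second
        return
    _last_log = now
    print("progress:", payload)


event_emitter.on("job:progress", on_progress)
```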
8. Worker Lifecycle Hooks¶
AsyncMQ lets you define lifecycle hooks for workers: special functions that run automatically when a worker starts up or shuts down.
They're ideal for tasks like connecting to databases, warming caches, or closing resources gracefully.
Declaring Hooks¶
You can declare them directly in your AsyncMQ settings file:
```python
from asyncmq.protocols.lifespan import Lifespan


async def on_startup(backend, worker_id, queue):
    print(f"Worker {worker_id} started for queue: {queue}")
    # Example: await backend.connect_pool()


async def on_shutdown(backend, worker_id, queue):
    print(f"Worker {worker_id} shutting down for queue: {queue}")
    # Example: await backend.close_pool()


worker_on_startup = [on_startup]
worker_on_shutdown = [on_shutdown]
```
Hook Rules¶
- Hooks can be sync or async. AsyncMQ detects and awaits them automatically.
- You can define a single function or a list/tuple of multiple hooks.
- Hooks receive the following context arguments:
    - `backend`: the active backend instance
    - `worker_id`: the unique worker UUID
    - `queue`: the queue name the worker is processing
Example — Synchronous Hooks¶
```python
def warm_cache(backend, worker_id, queue):
    print("Warming cache for", queue)


def log_shutdown(backend, worker_id, queue):
    print("Flushing logs for", worker_id)


worker_on_startup = [warm_cache]
worker_on_shutdown = [log_shutdown]
```
Reusable Hook Modules¶
For complex setups, it's good practice to organize hooks in a reusable module (e.g. `lifecycle.py`, or whatever you prefer to call it):
```python
# myproject/lifecycle.py
async def connect_db(backend, worker_id, queue):
    await backend.connect_pool()


async def disconnect_db(backend, worker_id, queue):
    await backend.close_pool()
```
Then in your settings:
```python
from myproject import lifecycle

worker_on_startup = [lifecycle.connect_db]
worker_on_shutdown = [lifecycle.disconnect_db]
```
Behind the Scenes¶
Under the hood, AsyncMQ uses the same lifecycle engine across the platform:

- `run_hooks()`: executes hooks during startup and propagates exceptions.
- `run_hooks_safely()`: executes hooks during shutdown and swallows exceptions by default.
This ensures that startup failures are visible, while shutdown always cleans up gracefully.
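A conceptual sketch of the two modes (illustrative, not the library's source), showing how sync and async hooks are both supported and how shutdown swallows errors:

```python
import inspect


async def run_hooks_sketch(hooks, *args):
    for hook in hooks:
        result = hook(*args)
        if inspect.isawaitable(result):  # sync or async: await when needed
            await result                 # startup errors propagate


async def run_hooks_safely_sketch(hooks, *args):
    for hook in hooks:
        try:
            result = hook(*args)
            if inspect.isawaitable(result):
                await result
        except Exception as exc:  # swallow so shutdown always completes
            print(f"Shutdown hook {hook!r} failed: {exc}")
```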
By leveraging lifecycle hooks, you make your workers state-aware, resource-conscious, and self-cleaning: a professional touch that turns simple background jobs into a resilient orchestration layer. 🚀
With the worker framework mastered, you can pipeline tens of thousands of jobs, handle errors gracefully, and keep an observant eye on every lifecycle event, all while maintaining a cheeky grin. 😎