Advanced Patterns¶
Welcome to the secret sauce of AsyncMQ where we go beyond the basics into Custom Backends, DAG Orchestration, and Kubernetes Scaling.
If you thought AsyncMQ was cool before, strap in: it’s about to get wild.
1. Custom Backends¶
Sometimes Redis, Postgres, or MongoDB just don’t cut it. Maybe you need to push tasks into an SQS queue, or write to Kafka, or integrate with a proprietary message bus. Here’s how:
1.1. Understand BaseBackend¶
All AsyncMQ backends implement the BaseBackend protocol (in asyncmq.backends.base), which defines methods such as:
async def enqueue(self, queue: str, job_dict: dict) -> None: ...
async def enqueue_delayed(self, queue: str, job_dict: dict, run_at: float) -> None: ...
async def dequeue(self, queue: str, timeout: int) -> dict | None: ...
async def add_dependencies(self, queue: str, job_dict: dict) -> None: ...
async def get_jobs(self, queue: str, state: State) -> list[dict]: ...
# plus DLQ, ack, cancel, pause/resume methods and a lot more
1.2. Implementing Your Backend¶
import json
from typing import Optional

import aiobotocore.session

from asyncmq.backends.base import BaseBackend


class SQSBackend(BaseBackend):
    def __init__(self, queue_url: str):
        self._session = aiobotocore.session.get_session()
        self.queue_url = queue_url
        self._client = None

    async def _get_client(self):
        # aiobotocore 2.x returns an async context manager from
        # create_client(), so the client must be entered from a coroutine
        # rather than created in __init__.
        if self._client is None:
            self._client = await self._session.create_client('sqs').__aenter__()
        return self._client

    async def enqueue(self, queue: str, job_dict: dict) -> None:
        client = await self._get_client()
        await client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps(job_dict),
        )

    async def dequeue(self, queue: str, timeout: int) -> Optional[dict]:
        client = await self._get_client()
        resp = await client.receive_message(
            QueueUrl=self.queue_url,
            WaitTimeSeconds=timeout,
            MaxNumberOfMessages=1,
        )
        msgs = resp.get('Messages')
        if not msgs:
            return None
        msg = msgs[0]
        body = json.loads(msg['Body'])
        # Acknowledge by deleting the message from the queue.
        await client.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=msg['ReceiptHandle'],
        )
        return body
You must still implement the remaining methods: enqueue_delayed (SQS supports this natively via DelaySeconds), add_dependencies (stored in DynamoDB, perhaps?), move_to_dlq, get_jobs (a DynamoDB scan?), and so on. The full contract is defined in BaseBackend and the accompanying BaseStore.
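For a taste, here is a minimal sketch of enqueue_delayed for the SQSBackend above, built on DelaySeconds. Note that SQS caps DelaySeconds at 900 seconds (15 minutes), so longer delays need the emulation trick in the tip below:
import time

class SQSBackend(BaseBackend):  # continuing the class defined above
    ...

    async def enqueue_delayed(self, queue: str, job_dict: dict, run_at: float) -> None:
        # SQS caps DelaySeconds at 900 (15 minutes); clamp to that range and
        # fall back to the secondary-store emulation for anything longer.
        delay = max(0, min(900, int(run_at - time.time())))
        client = await self._get_client()
        await client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps(job_dict),
            DelaySeconds=delay,
        )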
Why bother?
- Leverage existing infrastructure (SQS, Kafka...)
- Meet compliance requirements
- Offload scaling concerns to a managed service
Tip
If your queuing system doesn’t support delayed messages natively, emulate delays by storing messages in a secondary store (Redis sorted set, DynamoDB TTL) and poll for ready items in a custom scanner.
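Here's a rough sketch of that pattern with a Redis sorted set, assuming redis.asyncio and any backend exposing the enqueue() method shown above. The key name queue:delayed and the poll interval are illustrative, not part of AsyncMQ's API:
import asyncio
import json
import time

import redis.asyncio as redis


async def delayed_scanner(r: redis.Redis, backend, queue: str, poll_interval: float = 1.0) -> None:
    # Delayed jobs live in a sorted set scored by their run_at timestamp.
    key = f"{queue}:delayed"
    while True:
        now = time.time()
        # Pull everything whose score (run_at) is already in the past.
        due = await r.zrangebyscore(key, "-inf", now)
        for raw in due:
            # zrem returns 1 only for the scanner that actually removed the
            # member, so concurrent scanners won't double-enqueue a job.
            if await r.zrem(key, raw):
                await backend.enqueue(queue, json.loads(raw))
        await asyncio.sleep(poll_interval)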
2. DAG Orchestration with FlowProducer¶
Chain tasks into multi-step pipelines; no more ad-hoc glue scripts.
2.1. Building a DAG¶
from asyncmq.backends.redis import RedisBackend
from asyncmq.flow import FlowProducer
from asyncmq.jobs import Job
from asyncmq.logging import logger
backend = RedisBackend(redis_url="redis://localhost:6379/0")
flow = FlowProducer(backend=backend)
# Define jobs
job_ingest = Job(task_id="app.ingest_data", args=["s3://bucket/file.csv"], kwargs={})
job_transform = Job(
    task_id="app.transform",
    args=[],
    kwargs={"threshold": 0.75},
    depends_on=[job_ingest.id],
)
job_load = Job(
    task_id="app.load_to_db", args=["db_table"], kwargs={},
    depends_on=[job_transform.id],
)
# Enqueue the entire flow atomically if supported
job_ids = await flow.add_flow("data_pipeline", [job_ingest, job_transform, job_load])
logger.info(f"Enqueued pipeline jobs: {job_ids}")
2.2. Conditional and Parallel Steps¶
Use depends_on lists to fan out parallel tasks or gate steps based on multiple dependencies.
# Two parallel transforms after ingestion
job_transform_a = Job(..., depends_on=[job_ingest.id])
job_transform_b = Job(..., depends_on=[job_ingest.id])
# Final step waits for both
job_aggregate = Job(
    ..., depends_on=[job_transform_a.id, job_transform_b.id]
)
await flow.add_flow("parallel_pipeline", [job_ingest, job_transform_a, job_transform_b, job_aggregate])
2.3. Repeatable and Cron Jobs¶
# Run cleanup every day at midnight
job_cleanup = Job(
    task_id="app.cleanup_temp",
    args=[],
    cron="0 0 * * *",
)
await flow.add_flow("maintenance", [job_cleanup])
Note
Inside scoop: FlowProducer uses croniter to calculate next run times and schedules them via the delayed scanner.
Bonus: custom backoff and jitter can be injected if you need resilience.
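For the curious, computing a next run time with croniter looks roughly like this:
from datetime import datetime

from croniter import croniter

# "0 0 * * *" means daily at midnight; croniter walks the schedule
# forward from the given start time.
itr = croniter("0 0 * * *", datetime.now())
next_run = itr.get_next(datetime)  # the upcoming midnight, as a datetime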
3. Kubernetes Scaling¶
Your AsyncMQ workers can scale elegantly in Kubernetes. Let’s break down a typical setup:
3.1. Redis Deployment¶
Use the Bitnami Redis Helm chart or an equivalent managed service:
# redis-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
        - name: redis
          image: redis:7-alpine
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 256Mi
3.2. Worker Deployment¶
# asyncmq-worker.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: asyncmq-worker
spec:
  replicas: 3  # start with 3 workers
  selector:
    matchLabels:
      app: asyncmq-worker
  template:
    metadata:
      labels:
        app: asyncmq-worker
    spec:
      containers:
        - name: worker
          image: myrepo/asyncmq-worker:latest
          env:
            - name: ASYNCMQ_SETTINGS_MODULE
              value: "myapp.settings.Settings"
          args: ["worker", "start", "default"]
          resources:
            requests:
              cpu: 200m
              memory: 256Mi
            limits:
              cpu: 1
              memory: 512Mi
          readinessProbe:
            exec:
              command: ["/bin/sh", "-c", "asyncmq job list --queue default --state waiting"]
            initialDelaySeconds: 10
            periodSeconds: 15
3.3. Autoscaling with KEDA¶
Use KEDA to autoscale based on Redis list length:
# keda-scaledobject.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: asyncmq-worker-scaledobject
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: asyncmq-worker
  triggers:
    - type: redis
      metadata:
        address: "redis:6379"  # KEDA's Redis scaler expects host:port, no scheme
        listName: "default:jobs"
        listLength: "10"  # target list length per replica
3.4. Tips & Gotchas¶
- Resource Tuning: Keep your CPU and memory requests small for event loop tasks; avoid OOM kills.
- Probe wisely: Readiness should check that the worker process is up, not that jobs exist—otherwise scaling may misinterpret an empty queue as unhealthy.
- Secrets: Store Redis credentials in Kubernetes Secrets; mount as env vars or files.
- Logging & Monitoring: Combine event_emitter hooks with Prometheus or an ELK stack for real-time insights, as sketched below.
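For instance, a minimal sketch that bridges job events into Prometheus counters. The event_emitter import path and the event names are assumptions here, so check them against your AsyncMQ version; it also assumes the prometheus_client package:
from prometheus_client import Counter, start_http_server

# Import path and event names below are assumptions; adjust to your version.
from asyncmq.core.event import event_emitter

JOBS_COMPLETED = Counter("asyncmq_jobs_completed_total", "Jobs completed")
JOBS_FAILED = Counter("asyncmq_jobs_failed_total", "Jobs failed")

def on_completed(payload: dict) -> None:
    JOBS_COMPLETED.inc()

def on_failed(payload: dict) -> None:
    JOBS_FAILED.inc()

# Expose /metrics on :9100 for Prometheus to scrape, then register hooks.
start_http_server(9100)
event_emitter.on("job:completed", on_completed)
event_emitter.on("job:failed", on_failed)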
Joke
If your cluster autoscaler has better reflexes than you do on Monday mornings, you're doing it right.
That’s a wrap on Advanced Patterns! Up next: Performance Tuning & Benchmarking, where we’ll squeeze every last millisecond out of AsyncMQ.