Why this matters
AI automations break when incoming webhooks aren’t verified, idempotent, or buffered. Vendors (OpenAI, Slack, Stripe, HubSpot, Notion) will retry, reorder, and burst traffic. You need a hardened ingestion layer that can authenticate requests, dedupe events, queue downstream work, and expose safe status back to WordPress.
What we’ll build
– A Django webhook gateway with:
  – HMAC signature verification
  – Request schema validation
  – Idempotency + dedup
  – Async dispatch via Celery/Redis
  – Dead-letter and retry with jitter
  – Tenant scoping and secret rotation
  – Structured logging and metrics
– A WordPress integration that:
  – Receives event summaries via authenticated REST
  – Triggers follow-up actions without blocking vendors
  – Displays job status to editors securely
Reference architecture
– Internet → Nginx/ALB → Django /webhooks/* (stateless)
– Validation + signature check → persist event envelope (Postgres)
– Put work on Celery queue (Redis/RabbitMQ)
– Workers call AI/CRM APIs → write results → optional WP callback
– Observability: OpenTelemetry + Prometheus + Sentry
– Backpressure: queue length alerts + vendor 429 handling
– Storage: events (immutable), jobs (state machine), results (artifact links)
Core models (Django)
– WebhookEvent(id, vendor, external_id, received_at, signature_ok, payload_hash, raw, status), unique on (vendor, external_id, payload_hash)
– Job(id, event_id, type, state[pending|running|succeeded|failed|deadletter], attempts, next_run_at, last_error)
– Result(job_id, summary, artifacts_url, wp_post_id)
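The Job state field is easiest to reason about as an explicit state machine, so that a worker cannot, for example, move a succeeded job back to running. A minimal sketch in plain Python (no Django), assuming a transition policy where failed jobs are either retried or dead-lettered and dead-lettered jobs can only be replayed manually; the `JobState` enum, the `ALLOWED` table, and `advance` are illustrative, not part of the models above.

```python
from enum import Enum


class JobState(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    DEADLETTER = "deadletter"


# Assumed policy: failed jobs go back to pending (retry) or to deadletter
# once attempts are exhausted; deadletter -> pending is a manual replay.
ALLOWED = {
    JobState.PENDING: {JobState.RUNNING},
    JobState.RUNNING: {JobState.SUCCEEDED, JobState.FAILED},
    JobState.FAILED: {JobState.PENDING, JobState.DEADLETTER},
    JobState.SUCCEEDED: set(),  # terminal
    JobState.DEADLETTER: {JobState.PENDING},
}


def advance(current: JobState, target: JobState) -> JobState:
    """Return target if the transition is legal, else raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```

In the Django model, the same check would run before saving a new state, ideally inside the same transaction that writes it.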
Signature verification (example: HMAC SHA-256)
– Store vendor secrets per tenant. Rotate with overlap window.
– Reject if timestamp skew > 5 minutes.
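The view below checks a single secret. To honor a rotation overlap window, verification can accept a signature made with any currently active secret (the new one plus, until the window closes, the old one). A minimal stdlib sketch, assuming the same `{timestamp}.{raw body}` hex HMAC-SHA256 scheme as the view; the `sign`/`verify` names and the injectable `now` parameter (for testing) are illustrative.

```python
import hashlib
import hmac
import time
from typing import Iterable, Optional

MAX_SKEW_SECONDS = 300  # 5 minutes, per the rule above


def sign(secret: str, ts: str, body: bytes) -> str:
    """Hex HMAC-SHA256 over b"{ts}." + raw body (assumed vendor scheme)."""
    msg = ts.encode("utf-8") + b"." + body
    return hmac.new(secret.encode("utf-8"), msg, hashlib.sha256).hexdigest()


def verify(secrets: Iterable[str], body: bytes, ts: str, sig: str,
           now: Optional[float] = None) -> bool:
    """Accept if the timestamp is fresh and ANY active secret matches.

    `secrets` holds the current secret plus, during the rotation overlap
    window, the previous one.
    """
    try:
        ts_int = int(ts)
    except ValueError:
        return False
    now = time.time() if now is None else now
    if abs(now - ts_int) > MAX_SKEW_SECONDS:
        return False
    # Each comparison uses compare_digest to avoid leaking timing information.
    return any(hmac.compare_digest(sign(s, ts, body), sig) for s in secrets)
```

Once the overlap window closes, removing the old secret from the list is all that rotation requires on the gateway side.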
Minimal Django view (compressed for clarity)
– Assumes the vendor sends X-Signature (hex HMAC-SHA256 of "{timestamp}.{raw body}"), X-Timestamp (Unix seconds), and X-Event-Id headers.
import hashlib
import hmac
import json
import time

from django.db import IntegrityError, transaction
from django.http import HttpResponseBadRequest, HttpResponseForbidden, JsonResponse
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST

from .models import WebhookEvent
from .tasks import process_event


def verify_sig(secret: str, body: bytes, ts: str, sig: str) -> bool:
    # Sign the raw bytes "{timestamp}." + body; no decode/re-encode round trip
    msg = ts.encode("utf-8") + b"." + body
    mac = hmac.new(secret.encode("utf-8"), msg, hashlib.sha256).hexdigest()
    return hmac.compare_digest(mac, sig)


@csrf_exempt
@require_POST  # non-POST requests get 405 Method Not Allowed
def webhook_gateway(request, vendor_slug):
    ts = request.headers.get("X-Timestamp")
    sig = request.headers.get("X-Signature")
    event_id = request.headers.get("X-Event-Id")
    if not (ts and sig and event_id):
        return HttpResponseBadRequest("Missing headers")
    try:
        if abs(time.time() - int(ts)) > 300:  # reject > 5 min clock skew
            return HttpResponseBadRequest("Stale")
    except ValueError:
        return HttpResponseBadRequest("Bad timestamp")
    body = request.body
    # lookup_secret is your secret-manager helper (not shown); assumed to return None for unknown vendors
    tenant_secret = lookup_secret(vendor_slug)
    if tenant_secret is None or not verify_sig(tenant_secret, body, ts, sig):
        return HttpResponseForbidden("Bad signature")
    try:
        payload = json.loads(body)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError are both ValueErrors
        return HttpResponseBadRequest("Bad JSON")
    # Idempotency by vendor + event_id + hash; relies on a unique constraint in Postgres
    payload_hash = hashlib.sha256(body).hexdigest()
    try:
        with transaction.atomic():
            evt = WebhookEvent.objects.create(
                vendor=vendor_slug,
                external_id=event_id,
                received_at=timezone.now(),
                signature_ok=True,
                payload_hash=payload_hash,
                status="queued",
                raw=payload,
            )
    except IntegrityError:
        # Already stored (duplicate retry): acknowledge 200 to stop vendor retries
        return JsonResponse({"ok": True, "duplicate": True})
    # Enqueue only once the row is committed, so the worker can always read it
    transaction.on_commit(lambda: process_event.delay(evt.id))
    return JsonResponse({"ok": True})
Schema validation
– Validate incoming JSON early with Pydantic (or Django forms) to enforce required fields and types.
– Store the raw payload plus a normalized, versioned schema for downstream tasks.
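from datetime import datetime

from pydantic import BaseModel, Field, ValidationError  # ValidationError: raised on bad input


class NormalizedEvent(BaseModel):
    event_type: str
    subject_id: str
    occurred_at: datetime  # parsed from ISO 8601, so events can be ordered reliably
    data: dict = Field(default_factory=dict)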
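One way to keep "raw payload plus a normalized, versioned schema" manageable is a per-vendor mapping function that stamps a schema version on its output. A dependency-free sketch using a dataclass in place of the Pydantic model; the vendor name `acme`, its payload fields (`type`, `object.id`, `created` as epoch seconds), and `SCHEMA_VERSION` are hypothetical.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Dict

SCHEMA_VERSION = 2  # bump whenever the normalized shape changes


@dataclass(frozen=True)
class NormalizedEvent:
    event_type: str
    subject_id: str
    occurred_at: datetime
    data: Dict[str, Any] = field(default_factory=dict)
    schema_version: int = SCHEMA_VERSION


def _from_acme(raw: Dict[str, Any]) -> NormalizedEvent:
    # Hypothetical vendor shape: {"type": ..., "object": {"id": ...}, "created": epoch}
    return NormalizedEvent(
        event_type=str(raw["type"]),
        subject_id=str(raw["object"]["id"]),
        occurred_at=datetime.fromtimestamp(int(raw["created"]), tz=timezone.utc),
        data=dict(raw.get("object", {})),
    )


# One mapper per vendor; a breaking vendor change means a new or updated mapper.
NORMALIZERS: Dict[str, Callable[[Dict[str, Any]], NormalizedEvent]] = {
    "acme": _from_acme,
}


def normalize(vendor: str, raw: Dict[str, Any]) -> NormalizedEvent:
    """Map a vendor payload to the internal shape; ValueError if unknown or malformed."""
    try:
        return NORMALIZERS[vendor](raw)
    except (KeyError, TypeError, ValueError) as exc:
        raise ValueError(f"cannot normalize {vendor!r} payload") from exc
```

Because the raw payload is stored alongside the normalized form, older events can be re-normalized when a mapper changes.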
Retry, backoff, and dead-letter (Celery)
– Use exponential backoff with jitter for transient errors (timeouts, 5xx, 429).
– Cap attempts (e.g., max 7). Move to dead-letter queue after cap.
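from celery import shared_task

from .models import WebhookEvent

# normalize, start_job, generate_summary, post_to_wp, save_result,
# mark_success, mark_failed and mark_deadletter are your own helpers (not shown).


class TransientError(Exception):
    """Raise for retryable outbound failures (timeouts, 5xx, 429)."""


RETRYABLE = (TimeoutError, ConnectionError, TransientError)


@shared_task(bind=True, autoretry_for=RETRYABLE,
             retry_backoff=True, retry_backoff_max=300, retry_jitter=True, max_retries=7)
def process_event(self, event_id):
    evt = WebhookEvent.objects.get(id=event_id)
    # Normalize: vendor + raw payload -> NormalizedEvent
    nevt = normalize(evt.vendor, evt.raw)
    # Route by event_type
    if nevt.event_type == "assistant.completed":
        # start_job should get-or-create, so a retry reuses the same Job row
        job = start_job(evt, "summarize_and_publish")
        try:
            summary = generate_summary(nevt.data)  # call LLM w/ timeouts + rate limit
            wp_id = post_to_wp(summary)  # see below
            save_result(job, summary, wp_id)
            mark_success(evt, job)
        except RETRYABLE as e:
            if self.request.retries >= self.max_retries:
                mark_deadletter(evt, job, str(e))  # cap reached: park for replay
            raise  # otherwise autoretry_for reschedules with backoff + jitter
        except Exception as e:
            mark_failed(evt, job, str(e))  # permanent error: not retried
            raise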
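Celery's retry_backoff and retry_jitter options compute the delays for the task above. Outbound clients that retry on their own can use the same idea: exponential backoff capped at a maximum, randomized with "full jitter", and a hard attempt cap after which the job is dead-lettered. A plain-Python sketch; the constants mirror the task options, but this is not Celery's internal implementation.

```python
import random
from typing import Optional

BASE_SECONDS = 1.0
CAP_SECONDS = 300.0  # like retry_backoff_max above
MAX_ATTEMPTS = 7     # like max_retries above


def backoff_delay(attempt: int, rng: random.Random) -> float:
    """Full jitter: uniform in [0, min(cap, base * 2**attempt)].

    `attempt` is the number of retries already made (0 for the first retry).
    """
    ceiling = min(CAP_SECONDS, BASE_SECONDS * (2 ** attempt))
    return rng.uniform(0.0, ceiling)


def next_action(attempt: int, rng: random.Random) -> Optional[float]:
    """Return the delay before the next retry, or None to dead-letter."""
    if attempt >= MAX_ATTEMPTS:
        return None
    return backoff_delay(attempt, rng)
```

Full jitter spreads retries from many workers across the whole window, which avoids synchronized retry storms against a vendor that is already returning 429s.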
Vendor rate limits and circuit-breaking
– Wrap outbound API calls with:
  – Client-side rate limiter (token bucket)
  – Timeout per call (e.g., 8–15s)
  – Retries with backoff on 429/5xx
  – Circuit breaker to pause a noisy vendor for a cooldown period
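A minimal, single-process sketch of two of those wrappers: a token-bucket rate limiter and a consecutive-failure circuit breaker. The class names, parameters, and the injectable `clock` (used to make the behavior testable) are assumptions for illustration.

```python
import time
from typing import Callable, Optional


class TokenBucket:
    """Client-side rate limiter: `rate` tokens/second, bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.clock = clock
        self.updated = clock()

    def try_acquire(self, n: float = 1.0) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


class CircuitBreaker:
    """Opens after `threshold` consecutive failures; allows calls again after `cooldown` s."""

    def __init__(self, threshold: int, cooldown: float,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.cooldown = cooldown
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None  # None means closed

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open after the cooldown: calls go through, and one more
        # failure re-opens the circuit because `failures` is still high.
        return self.clock() - self.opened_at >= self.cooldown

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = self.clock()
```

Keeping one bucket and one breaker per vendor key stops a noisy vendor from throttling the others. Both hold per-process state; enforcing a limit across many Celery workers would need a shared store such as Redis.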
Security essentials
– Verify signatures (HMAC, RSA, or vendor-specific).
– Expose an allowlisted path per vendor and reject unknown vendor slugs. Never share a secret across tenants.
– Limit payload size (e.g., 512 KB) and reject unexpected content types.
– Log only necessary fields; redact PII and secrets before storage.
– Rotate secrets with dual-valid window. Store in AWS Secrets Manager or Vault.
– Use HTTPS only; set strict TLS and HSTS.
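A minimal sketch of key-based redaction applied before a payload is logged or stored. The deny-list is hypothetical and would need tuning per vendor; matching on key names will not catch PII embedded in free-text fields.

```python
from typing import Any

# Hypothetical deny-list of key names (compared case-insensitively).
SENSITIVE_KEYS = {"email", "phone", "authorization", "api_key", "token", "password", "secret"}
REDACTED = "[REDACTED]"


def redact(value: Any) -> Any:
    """Return a copy with sensitive keys masked, recursing into dicts and lists."""
    if isinstance(value, dict):
        return {
            k: REDACTED if str(k).lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v) for v in value]
    return value
```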
Idempotency and ordering
– Deduplicate by (vendor, external_id, payload_hash).
– Design handlers to be idempotent: repeated processing yields same state.
– Do not assume ordering; handle out-of-order updates by comparing occurred_at.
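Comparing occurred_at amounts to a "newer wins" rule: apply an update only if it is strictly newer than the stored state. That also makes replays harmless, because reapplying the same event changes nothing. An in-memory sketch; `SubjectState` and `apply_update` are illustrative names. In Postgres the same rule can be expressed as a conditional update (`... WHERE subject_id = %s AND occurred_at < %s`) so concurrent workers cannot race.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict


@dataclass
class SubjectState:
    status: str
    occurred_at: datetime


def apply_update(store: Dict[str, SubjectState], subject_id: str,
                 status: str, occurred_at: datetime) -> bool:
    """Apply only if newer than the stored state; return True if state changed.

    A late-arriving older event cannot overwrite a newer one, and a replayed
    event (same occurred_at) is ignored.
    """
    current = store.get(subject_id)
    if current is not None and occurred_at <= current.occurred_at:
        return False  # stale or duplicate
    store[subject_id] = SubjectState(status, occurred_at)
    return True
```

Events with identical timestamps are treated as duplicates here; if a vendor can emit distinct events in the same instant, a vendor sequence number makes a better tiebreaker.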
WordPress integration (safe, async)
– Create a minimal WP endpoint (WP REST API) that accepts:
  – A bearer JWT signed by Django (short TTL, audience = wp)
  – An event summary and artifacts (URLs, not blobs)
  – A correlation key (event_id or job_id)
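To show what "short TTL, audience = wp" means on the wire, here is a stdlib-only HS256 JWT sketch, with a matching verify function that mirrors the checks the WP side performs. It assumes a secret shared between Django and WP (an asymmetric algorithm such as RS256 would avoid that); the `iss`/`sub` values and function names are illustrative. In production, a maintained library such as PyJWT (`jwt.encode(payload, key, algorithm="HS256")`) is the safer choice.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(s: str) -> bytes:
    return base64.urlsafe_b64decode(s + "=" * (-len(s) % 4))


def sign_wp_token(secret: bytes, job_id: str, ttl: int = 60,
                  now: Optional[int] = None) -> str:
    """HS256 JWT with aud="wp" and a short expiry."""
    iat = int(time.time()) if now is None else now
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"iss": "django-webhook-gw", "aud": "wp", "sub": job_id,
              "iat": iat, "exp": iat + ttl}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    sig = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(sig)}"


def verify_wp_token(secret: bytes, token: str,
                    now: Optional[int] = None) -> Optional[Dict[str, Any]]:
    """Return the claims if signature, alg, audience and expiry check out."""
    try:
        h64, c64, s64 = token.split(".")
        header = json.loads(_b64url_decode(h64))
        claims = json.loads(_b64url_decode(c64))
        sig = _b64url_decode(s64)
        signing_input = f"{h64}.{c64}".encode("ascii")
    except ValueError:  # also covers binascii.Error and Unicode errors
        return None
    if not isinstance(header, dict) or not isinstance(claims, dict):
        return None
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    if header.get("alg") != "HS256" or not hmac.compare_digest(expected, sig):
        return None
    current = int(time.time()) if now is None else now
    exp = claims.get("exp")
    if claims.get("aud") != "wp" or not isinstance(exp, int) or current >= exp:
        return None
    return claims
```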
Example WP callback payload
{
  "job_id": "j_123",
  "summary": "Conversation complete. 3 actions taken.",
  "artifacts": [{"type": "file", "url": "https://s3/…"}],
  "source": "django-webhook-gw"
}
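On the Django side, the `post_to_wp` helper from the Celery task could be as small as the sketch below, which uses only urllib. Building the request separately from sending it keeps the payload and headers testable offline. The function names, the default 10-second timeout, and the example URL are assumptions; retries on failure are left to the task's backoff settings.

```python
import json
import urllib.request
from typing import Any, Dict


def build_wp_callback(url: str, token: str, payload: Dict[str, Any]) -> urllib.request.Request:
    """Prepare the outbound POST to the WP ingest route (nothing is sent here)."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


def send_wp_callback(req: urllib.request.Request, timeout: float = 10.0) -> Dict[str, Any]:
    """Send with a bounded timeout; non-2xx responses raise urllib.error.HTTPError."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())
```

With default permalinks disabled, the `ai-gw/v1/ingest` route registered below would typically live at `https://<your-site>/wp-json/ai-gw/v1/ingest`.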
WP receives the request, validates the JWT, sanitizes the content, and then:
– Creates or updates a post or post_meta
– Triggers follow-up actions, such as notifying editors
WP never sits on the inbound vendor webhook path: every call to WP is outbound from Django, so a slow WP site cannot trigger vendor retries.
Minimal WP endpoint (PHP, condensed)
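add_action('rest_api_init', function () {
    register_rest_route('ai-gw/v1', '/ingest', [
        'methods' => 'POST',
        'callback' => 'ai_gw_ingest',
        // Reject unauthenticated calls before the callback runs.
        // ai_gw_verify_jwt() is your helper: strip "Bearer ", check signature, aud = wp, exp.
        'permission_callback' => function (WP_REST_Request $request) {
            if (ai_gw_verify_jwt($request->get_header('authorization'))) {
                return true;
            }
            return new WP_Error('forbidden', 'bad token', ['status' => 403]);
        },
    ]);
});

function ai_gw_ingest(WP_REST_Request $request) {
    $body = $request->get_json_params() ?? [];
    $summary = sanitize_text_field($body['summary'] ?? '');
    $job_id = sanitize_text_field($body['job_id'] ?? '');
    // Keep only well-formed artifact links (URLs, not blobs).
    $artifacts = [];
    foreach ((array) ($body['artifacts'] ?? []) as $a) {
        if (is_array($a) && !empty($a['url'])) {
            $artifacts[] = ['type' => sanitize_key($a['type'] ?? ''), 'url' => esc_url_raw($a['url'])];
        }
    }
    // ai_gw_upsert_post() is your helper: look up the post by job_id, then create or update it.
    $post_id = ai_gw_upsert_post($job_id, $summary, $artifacts);
    return rest_ensure_response(['ok' => true, 'post_id' => $post_id]);
}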
Operational visibility
– Emit structured logs (JSON) with event_id, vendor, job_id, state, duration.
– Metrics:
  – Ingress rate, 2xx/4xx/5xx responses, signature failures
  – Queue depth, worker concurrency, task latency p50/p95
  – Vendor error rates, circuit breaker opens
  – WP callback success/failure
– Tracing: correlate inbound webhook → job → vendor calls → WP callback with a single trace_id.
– Dashboards + alerts on queue saturation, rising 429s, or dead-letter growth.
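Structured logs are straightforward with the standard logging module: a custom Formatter turns each record into one JSON object and copies correlation fields passed through `extra`. A minimal sketch; the field names (including `duration_ms`) are assumptions matching the list above.

```python
import json
import logging

CONTEXT_FIELDS = ("event_id", "vendor", "job_id", "state", "duration_ms")


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object with correlation fields."""

    def format(self, record: logging.LogRecord) -> str:
        doc = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "msg": record.getMessage(),
        }
        for key in CONTEXT_FIELDS:
            if hasattr(record, key):  # set via logger.info(..., extra={...})
                doc[key] = getattr(record, key)
        if record.exc_info:
            doc["exc"] = self.formatException(record.exc_info)
        return json.dumps(doc, default=str)
```

Usage: attach it to a handler (`handler.setFormatter(JsonFormatter())`), then log with `logger.info("job finished", extra={"event_id": evt_id, "vendor": "stripe", "job_id": job_id, "state": "succeeded", "duration_ms": 412})`.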
Performance and resilience tips
– Keep the webhook view non-blocking; do not call external APIs inline.
– Bound payload sizes; store large attachments in S3 via pre-signed URLs.
– Treat records as copy-on-write (append new versions instead of updating in place) and version schemas so migrations don't stall ingestion.
– Scale Celery workers horizontally; route each vendor to its own queue for isolation.
– Apply rate limiters per vendor key to avoid global throttling.
– Warm up LLM clients and reuse HTTP sessions (keep-alive or HTTP/2 connection reuse).
Testing checklist
– Unit tests: signature, schema, idempotency, clock skew.
– Integration tests: vendor replay + out-of-order events.
– Chaos tests: drop 10% of worker tasks, ensure retries converge.
– Load tests: burst 1000 RPS for 60s; verify queue health and P95 latency.
– Security: secret rotation, JWT audience, RBAC for dashboards.
Deployment notes
– Run Django behind Nginx or ALB with request size/time limits.
– Use Postgres with unique constraint on (vendor, external_id, payload_hash).
– Redis or RabbitMQ for Celery; prefer Redis for simplicity, RabbitMQ for strict routing.
– Store secrets in AWS Secrets Manager and grant access through an IAM role, not env files.
– Keep backups and a runbook for replaying dead-lettered jobs.
When to use this pattern
– Any AI agent or automation that must react to external events reliably.
– Consolidating multiple vendor webhooks into one secure, observable gateway.
– Feeding WordPress AI-generated summaries without triggering vendor retries or blocking editor workflows.
Deliverables you can deploy today
– Django app with /webhooks/ endpoint + Celery worker
– Pydantic schemas per vendor
– JWT-signed WP REST callback client
– Terraform/IaC for Redis/ALB/ASG and secret manager
– Grafana dashboards + SLO alerts