Goal
Move WordPress events (form leads, orders, comments) to external AI/CRM services reliably and securely using a Django gateway. Avoid direct browser-to-API calls, centralize secrets, add validation, idempotency, and backpressure.
Architecture
– WordPress: Triggers event, signs payload, sends to gateway
– Django API Gateway: Verifies HMAC, validates schema, enqueues job, returns 202
– Worker (Celery/RQ): Calls external APIs (OpenAI, CRM, Slack)
– Storage: Redis for queue, Postgres for idempotency and logs
– Observability: Structured logs, request tracing, dead-letter queue (DLQ)
Data flow
1) WP event → signed POST to /webhooks/wp
2) Django verifies signature, dedups, validates
3) Enqueue task with minimal payload + correlation_id
4) Worker performs outbound calls with retries and rate limits
5) Results stored and optional callback to WP or Slack alert
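Steps 3 and 4 only work for debugging if one identifier follows the event from the gateway into the worker. Below is a minimal stdlib sketch. It assumes the event_id is reused as the correlation_id, as the Observability notes suggest, and it uses a `logging.LoggerAdapter` to stamp that ID on every record. The helper names and JSON field names are hypothetical.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON line, including correlation_id if present."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": getattr(record, "correlation_id", None),
        })


def event_logger(base: logging.Logger, event_id: str) -> logging.LoggerAdapter:
    # Every call through the adapter carries extra={"correlation_id": event_id}
    return logging.LoggerAdapter(base, {"correlation_id": event_id})


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    log = logging.getLogger("wp_gateway")  # hypothetical logger name
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    event_logger(log, "wp_123").info("accepted")
```

The gateway and the worker would each build an adapter from the same event_id. This lets a log search on one ID return the whole path of an event.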
Security
– HMAC-SHA256 with rotating shared secret
– Required headers: X-WP-Signature, X-WP-Timestamp, X-Event-Id
– 5-minute timestamp window
– Enforce IP allowlist if possible
– No third-party API secrets in WordPress; it holds only the signing key
– All outbound secrets live in Django (env or vault)
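The signing scheme reduces to one canonical string, `<timestamp>.<raw body>`. It is signed with HMAC-SHA256 and base64-encoded, and both the WordPress snippet and the Django view compute it this way. The sketch below shows both sides without a framework. It assumes the 300-second window above, and the function names are hypothetical.

```python
import base64
import hashlib
import hmac
import time
from typing import Optional

MAX_SKEW_SECONDS = 300  # 5-minute timestamp window


def sign(secret: bytes, ts: str, body: bytes) -> str:
    """Return base64(HMAC-SHA256(secret, b"<ts>.<body>"))."""
    mac = hmac.new(secret, ts.encode() + b"." + body, hashlib.sha256).digest()
    return base64.b64encode(mac).decode()


def verify(secret: bytes, ts: str, body: bytes, signature: str,
           now: Optional[float] = None) -> bool:
    """Check freshness first, then compare signatures in constant time."""
    now = time.time() if now is None else now
    try:
        if abs(now - int(ts)) > MAX_SKEW_SECONDS:
            return False  # stale or future-dated: blocks replay of captured requests
    except ValueError:
        return False  # timestamp header is not an integer
    expected = sign(secret, ts, body)
    return hmac.compare_digest(expected.encode(), signature.encode())
```

The timestamp is inside the signed string, so an attacker cannot refresh an old request by editing only the header.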
WordPress sender (minimal plugin snippet)
– Sends compact JSON, signs body + timestamp
Note: replace ENDPOINT_URL and provide DJANGO_SHARED_SECRET through the environment; 'replace_me' is only a placeholder fallback.
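<?php
// Hook names are assumed: wp_insert_comment passes ($comment_ID, $comment),
// which matches the two accepted arguments (10, 2) registered below.
add_action('plugins_loaded', function () {
    add_action('wp_insert_comment', function ($comment_ID, $comment) {
        $payload = array(
            'event'    => 'comment.created',
            'event_id' => uniqid('wp_', true),
            'site'     => get_bloginfo('url'),
            'data'     => array(
                'id'         => (int) $comment_ID,
                'post_id'    => (int) $comment->comment_post_ID,
                'author'     => $comment->comment_author,
                'content'    => $comment->comment_content,
                'created_at' => $comment->comment_date_gmt,
            ),
        );
        $json   = wp_json_encode($payload);
        $ts     = (string) time();
        $secret = getenv('DJANGO_SHARED_SECRET') ?: 'replace_me';
        // Sign "<timestamp>.<body>" over the exact bytes that are sent
        $sig = base64_encode(hash_hmac('sha256', $ts . '.' . $json, $secret, true));
        $resp = wp_remote_post('https://ENDPOINT_URL/webhooks/wp', array(
            'timeout' => 5,
            'headers' => array(
                'Content-Type'   => 'application/json',
                'X-WP-Timestamp' => $ts,
                'X-WP-Signature' => $sig,
                'X-Event-Id'     => $payload['event_id'],
            ),
            'body'    => $json,
        ));
        if (is_wp_error($resp)) {
            error_log('Webhook delivery failed: ' . $resp->get_error_message());
        }
    }, 10, 2);
});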
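The signature covers the exact bytes that `wp_json_encode` produced. The gateway must therefore verify the raw request body and must not verify re-serialized JSON. By default, PHP's `json_encode` escapes forward slashes (for example `https:\/\/example.com`), but Python's `json.dumps` does not. A parse-and-dump round trip therefore changes the bytes. The small stdlib demonstration below uses a made-up sample body and the placeholder secret.

```python
import hashlib
import hmac
import json

SECRET = b"replace_me"  # placeholder secret, as in the WordPress snippet
TS = "1700000000"

# Bytes as PHP's json_encode would emit them by default: "/" escaped as "\/"
raw_from_wp = b'{"event":"comment.created","site":"https:\\/\\/example.com"}'


def mac(body: bytes) -> bytes:
    return hmac.new(SECRET, TS.encode() + b"." + body, hashlib.sha256).digest()


# Parsing and re-dumping gives equal data but different bytes, hence a different MAC
reserialized = json.dumps(json.loads(raw_from_wp), separators=(",", ":")).encode()
```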
Django: settings (env-driven)
– DJANGO_SHARED_SECRET
– REDIS_URL
– DATABASE_URL
– EXTERNAL_API_KEYS (OpenAI, HubSpot, Slack)
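Below is a minimal sketch for loading the variables above, assuming they come from the process environment. `CELERY_BROKER_URL` is the name Celery reads when it is configured with `namespace='CELERY'`. The per-service key names (`OPENAI_API_KEY`, `HUBSPOT_ACCESS_TOKEN`) are hypothetical. `DATABASE_URL` is returned raw, because turning it into Django's `DATABASES` dict needs a helper such as dj-database-url.

```python
import os
from collections.abc import Mapping


def require_env(name: str, environ: Mapping[str, str] = os.environ) -> str:
    """Fail at startup, not at the first webhook, when a required variable is missing."""
    value = environ.get(name, "")
    if not value:
        raise RuntimeError(f"missing required environment variable: {name}")
    return value


def gateway_settings(environ: Mapping[str, str] = os.environ) -> dict:
    return {
        "DJANGO_SHARED_SECRET": require_env("DJANGO_SHARED_SECRET", environ),
        "CELERY_BROKER_URL": require_env("REDIS_URL", environ),
        "DATABASE_URL": require_env("DATABASE_URL", environ),
        # Optional outbound credentials; names are illustrative
        "OPENAI_API_KEY": environ.get("OPENAI_API_KEY"),
        "HUBSPOT_ACCESS_TOKEN": environ.get("HUBSPOT_ACCESS_TOKEN"),
        "SLACK_WEBHOOK_URL": environ.get("SLACK_WEBHOOK_URL"),
    }
```

In settings.py you would assign the entries you need, for example `DJANGO_SHARED_SECRET = require_env("DJANGO_SHARED_SECRET")`.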
Django: URL and view (fast path, 202 response)
# views.py
import base64
import hashlib
import hmac
import json
import time

from django.conf import settings
from django.db import transaction
from django.http import HttpResponse, JsonResponse
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt

from .models import WebhookEvent
from .schemas import validate_wp_event
from .tasks import process_wp_event

MAX_SKEW_SECONDS = 300  # 5-minute timestamp window


def verify_sig(ts, raw, sig_b64):
    # Recompute HMAC-SHA256 over b"<timestamp>.<raw body>" and compare in constant time
    secret = settings.DJANGO_SHARED_SECRET.encode()
    mac = hmac.new(secret, ts.encode() + b'.' + raw, hashlib.sha256).digest()
    expected = base64.b64encode(mac)
    return hmac.compare_digest(expected, sig_b64.encode())


@csrf_exempt
def wp_webhook(request):
    if request.method != 'POST':
        return HttpResponse(status=405)
    ts = request.headers.get('X-WP-Timestamp')
    sig = request.headers.get('X-WP-Signature')
    eid = request.headers.get('X-Event-Id')
    if not (ts and sig and eid):
        return JsonResponse({'error': 'missing headers'}, status=400)
    try:
        if abs(time.time() - int(ts)) > MAX_SKEW_SECONDS:
            return JsonResponse({'error': 'stale'}, status=401)
        raw = request.body  # verify the exact bytes that were signed
        if not verify_sig(ts, raw, sig):
            return JsonResponse({'error': 'bad signature'}, status=401)
        body = json.loads(raw)
        validate_wp_event(body)  # raises ValueError on schema errors
        if body['event_id'] != eid:
            return JsonResponse({'error': 'event id mismatch'}, status=400)
    except ValueError:
        # Non-integer timestamp, invalid UTF-8/JSON, or schema error.
        # Optionally record the rejected request for inspection (DLQ).
        return JsonResponse({'error': 'invalid'}, status=400)

    # Idempotency: the unique event_id turns a redelivery into a no-op
    obj, created = WebhookEvent.objects.get_or_create(
        event_id=eid,
        defaults={'event_type': body['event'], 'payload': body, 'received_at': timezone.now()},
    )
    if not created:
        return JsonResponse({'status': 'duplicate'}, status=200)
    # Enqueue only after the row is committed so the worker can always load it
    transaction.on_commit(lambda: process_wp_event.delay(obj.id))
    return JsonResponse({'status': 'accepted', 'id': obj.id}, status=202)


# urls.py
from django.urls import path

from . import views

urlpatterns = [path('webhooks/wp', views.wp_webhook, name='wp_webhook')]
Django: model for idempotency and audit
# models.py
from django.db import models


class WebhookEvent(models.Model):
    event_id = models.CharField(max_length=128, unique=True)  # unique index backs idempotency
    event_type = models.CharField(max_length=64)
    payload = models.JSONField()
    received_at = models.DateTimeField()
    status = models.CharField(max_length=32, default='queued')  # queued, processing, done, failed
    last_error = models.TextField(null=True, blank=True)
Validation (Pydantic or Django validator)
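# schemas.py (Pydantic v2: Field(pattern=...); Pydantic v1 used regex= instead)
from pydantic import BaseModel, Field, ValidationError


class CommentData(BaseModel):
    id: int
    post_id: int
    author: str
    content: str
    created_at: str


class WPEvent(BaseModel):
    event: str = Field(pattern=r'^(comment\.created|order\.created|lead\.created)$')
    event_id: str
    site: str
    # Only comment payloads are modeled here; order/lead events need their own data models
    data: CommentData


def validate_wp_event(body):
    try:
        WPEvent.model_validate(body)  # also rejects non-object JSON bodies
    except ValidationError as e:
        raise ValueError(str(e)) from e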
Queue worker (Celery)
# core/celery.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

app = Celery('core')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
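# tasks.py
import os

import requests
from celery import shared_task
from django.db import transaction

from .models import WebhookEvent

RETRYABLE_STATUS = {429, 500, 502, 503, 504}  # transient; other 4xx are not retried


def _mark_failed(ev, exc):
    with transaction.atomic():
        ev.status = 'failed'
        ev.last_error = str(exc)
        ev.save(update_fields=['status', 'last_error'])


@shared_task(bind=True, max_retries=5)
def process_wp_event(self, event_id):
    with transaction.atomic():
        ev = WebhookEvent.objects.select_for_update().get(id=event_id)
        if ev.status == 'done':
            return  # already processed; a redelivered task is a no-op
        ev.status = 'processing'
        ev.save(update_fields=['status'])
    try:
        # Example outbound: send summary to Slack, push to CRM, call LLM
        slack_url = os.getenv('SLACK_WEBHOOK_URL')
        if slack_url:
            resp = requests.post(
                slack_url,
                json={'text': f"New comment by {ev.payload['data']['author']}"},
                timeout=5,
            )
            resp.raise_for_status()  # requests does not raise on 4xx/5xx by itself
        # Example: CRM
        # requests.post(CRM_URL, headers=auth, json=map_payload(ev.payload), timeout=10).raise_for_status()
        with transaction.atomic():
            ev.status = 'done'
            ev.last_error = None
            ev.save(update_fields=['status', 'last_error'])
    except (requests.ConnectionError, requests.Timeout, requests.HTTPError) as e:
        response = getattr(e, 'response', None)
        status = response.status_code if response is not None else None
        retryable = status is None or status in RETRYABLE_STATUS
        if retryable and self.request.retries < self.max_retries:
            # Exponential backoff: 30 s, 60 s, 120 s, ...
            raise self.retry(exc=e, countdown=30 * 2 ** self.request.retries)
        _mark_failed(ev, e)
        raise
    except Exception as e:
        _mark_failed(ev, e)
        raise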
Rate limiting and retries
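– Use Celery's per-task rate_limit (e.g., @shared_task(rate_limit="10/m")); Celery enforces it per worker instance, not globally
– Use exponential backoff: a plain self.retry() waits a fixed default_retry_delay, so pass a growing countdown or combine autoretry_for with retry_backoff=True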
– Handle 429/5xx with retry; do not retry 4xx validation errors
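The retry policy above fits in two small functions. Below is a stdlib sketch. It assumes a 30-second base delay and a hypothetical 10-minute cap. It also adds "full jitter", so that many tasks that failed together do not all retry at the same moment. The result would be passed as `countdown=` to `self.retry()`.

```python
import random
from typing import Optional

RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})


def should_retry(status: Optional[int]) -> bool:
    """None means no HTTP response (connection error or timeout): treat as transient."""
    return status is None or status in RETRYABLE_STATUS


def backoff_seconds(retries: int, base: float = 30.0, cap: float = 600.0,
                    rng: Optional[random.Random] = None) -> float:
    """Exponential backoff with full jitter: uniform in [0, min(cap, base * 2**retries)]."""
    ceiling = min(cap, base * (2 ** retries))
    if rng is None:
        return random.uniform(0, ceiling)
    return rng.uniform(0, ceiling)
```

Some APIs send a Retry-After header with a 429 response. When one is present, honoring it is usually better than the formula.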
Observability
– Log correlation_id = event_id across gateway and workers
– Send a request ID header (e.g., the event_id) when calling external APIs
– Export metrics: accepted_count, duplicate_count, processing_time
– Create DLQ table for permanently failed events
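One way to manage the DLQ and replay failed events is sketched below. The stdlib `sqlite3` module stands in for the Postgres table so the sketch runs anywhere. The table and column names, the `replay` helper, and the `enqueue` callback are all hypothetical; in the real system, `enqueue` would call `process_wp_event.delay`. Events are written to the DLQ once retries are exhausted. Replay re-enqueues them in bounded batches and counts each attempt, so a poison event cannot loop forever. Replay also relies on the worker being idempotent, because the same event_id is processed again.

```python
import sqlite3
from typing import Callable

SCHEMA = """
CREATE TABLE IF NOT EXISTS webhook_dlq (
    event_id     TEXT PRIMARY KEY,
    payload      TEXT NOT NULL,
    last_error   TEXT,
    replay_count INTEGER NOT NULL DEFAULT 0,
    resolved     INTEGER NOT NULL DEFAULT 0
)
"""

MAX_REPLAYS = 3  # beyond this, a human has to look at the event


def dead_letter(conn: sqlite3.Connection, event_id: str, payload: str, error: str) -> None:
    """Record a permanently failed event; the upsert keeps one row per event_id."""
    conn.execute(
        "INSERT INTO webhook_dlq (event_id, payload, last_error) VALUES (?, ?, ?) "
        "ON CONFLICT(event_id) DO UPDATE SET last_error = excluded.last_error, resolved = 0",
        (event_id, payload, error),
    )
    conn.commit()


def replay(conn: sqlite3.Connection, enqueue: Callable[[str, str], None], limit: int = 100) -> int:
    """Re-enqueue unresolved rows still under the replay cap; return how many were sent."""
    rows = conn.execute(
        "SELECT event_id, payload FROM webhook_dlq "
        "WHERE resolved = 0 AND replay_count < ? ORDER BY event_id LIMIT ?",
        (MAX_REPLAYS, limit),
    ).fetchall()
    for event_id, payload in rows:
        enqueue(event_id, payload)
        conn.execute(
            "UPDATE webhook_dlq SET replay_count = replay_count + 1, resolved = 1 "
            "WHERE event_id = ?",
            (event_id,),
        )
    conn.commit()
    return len(rows)
```

If a replayed event fails again, `dead_letter` marks it unresolved but keeps its replay_count. This is how the cap eventually stops it.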
Performance notes
– Return 202 within ~10–30 ms on average (local) by deferring work
– Keep Redis and Postgres in the same VPC/region as the Django gateway to reduce latency
– Batch external calls when possible; keep payloads small
Local testing (quick)
– Run ngrok for Django: ngrok http 8000
– Send a test request: curl -X POST … with signed headers
– Use Celery + Redis: celery -A core worker -Q celery -l info
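A stdlib Python sender can be easier than hand-signing a curl command, because it computes the signature for you. The sketch below assumes the local URL `http://localhost:8000/webhooks/wp` and the same `DJANGO_SHARED_SECRET` the gateway uses. `build_signed_request` and the sample payload are hypothetical. The function only builds the request; you send it with `urllib.request.urlopen(req)`.

```python
import base64
import hashlib
import hmac
import json
import time
import urllib.request
from typing import Optional


def build_signed_request(url: str, payload: dict, secret: bytes,
                         ts: Optional[str] = None) -> urllib.request.Request:
    body = json.dumps(payload, separators=(",", ":")).encode()
    ts = ts or str(int(time.time()))
    mac = hmac.new(secret, ts.encode() + b"." + body, hashlib.sha256).digest()
    return urllib.request.Request(
        url,
        data=body,  # sign and send the very same bytes
        method="POST",
        headers={
            "Content-Type": "application/json",
            "X-WP-Timestamp": ts,
            "X-WP-Signature": base64.b64encode(mac).decode(),
            "X-Event-Id": payload["event_id"],
        },
    )


SAMPLE = {
    "event": "comment.created",
    "event_id": "wp_test_1",
    "site": "http://localhost",
    "data": {"id": 1, "post_id": 1, "author": "Test", "content": "Hello",
             "created_at": "2024-01-01 00:00:00"},
}
```

If you send the same event twice within the timestamp window, the second response should be `{"status": "duplicate"}`. This is a quick check of the idempotency path.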
Deployment checklist
– Enforce HTTPS; HSTS enabled
– Rotate DJANGO_SHARED_SECRET; support dual keys during rotation
– DB unique index on event_id
– Set firewall/IP allowlist for /webhooks/wp if feasible
– Time sync (NTP) on servers
– Alert on failure rate > 2% over 5 minutes
Common pitfalls
– Missing idempotency → duplicate CRM leads
– Large payloads from WP → timeouts; send minimal fields
– Long synchronous processing in view → client timeouts
– Not validating event schema → brittle downstream handlers
Extending to AI use cases
– Enrich text with LLM before CRM insert (run in worker)
– Cache LLM results for repeated content (keyed by hash)
– Guardrails: token limits, cost caps, latency budgets per task
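Caching LLM results keyed by a content hash needs only `hashlib` and a key-value store. In this sketch a plain dict stands in for Redis or Django's cache, and `call_llm` is a hypothetical callable that wraps the real API. The key also includes the model name and a prompt version, which is an assumption not in the original list. This keeps a prompt change from serving stale results.

```python
import hashlib
from collections.abc import Callable, MutableMapping


def cache_key(model: str, prompt_version: str, text: str) -> str:
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"llm:{model}:{prompt_version}:{digest}"


def enrich_cached(text: str, call_llm: Callable[[str], str], cache: MutableMapping[str, str],
                  model: str = "example-model", prompt_version: str = "v1") -> str:
    key = cache_key(model, prompt_version, text)
    if key in cache:
        return cache[key]  # repeated content: no API call, no cost
    result = call_llm(text)
    cache[key] = result
    return result
```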
Outcome
You get a secure, observable pipeline for WordPress events that scales, avoids duplicates, and centralizes external API access with real backpressure and retries.