Build a Secure Webhook Gateway for WordPress with Django, HMAC Verification, and Idempotent Jobs

This guide shows how to deploy a webhook gateway in Django that verifies third‑party signatures (e.g., Stripe, GitHub, HubSpot), normalizes events, runs idempotent background jobs, and safely updates WordPress via authenticated REST. It’s designed for multi-tenant SaaS sites and automation-heavy WordPress installs.

Architecture
– Sources: Third-party services send webhooks to a single Django gateway.
– Ingress: Django REST endpoint with HMAC/signature verification, timestamp tolerance, payload caps, and replay protection.
– Queue: Valid events normalized into a canonical schema and enqueued (Celery + Redis or RQ).
– Workers: Idempotent tasks execute business logic and call WordPress via secure REST.
– WordPress: Minimal plugin or functions.php adds authenticated REST routes and caches state.
– Observability: Structured logs, metrics, traces, and a dead-letter queue (DLQ).

Why a Django Gateway in Front of WordPress
– Security: Keeps secrets and signature logic server-side; network allowlisting on a single IP.
– Reliability: Centralizes retries, idempotency, and DLQ handling.
– Performance: Offloads heavy work from WordPress; reduces request time and PHP memory pressure.
– Control: One normalization layer supports many services and tenants.

Django Models (minimal)
– WebhookEvent
– id (uuid), source (str), event_type (str), received_at (datetime)
– raw_body (bytes), headers (json), signature_valid (bool), idempotency_key (str)
– status (received|queued|processed|failed|discarded), attempts (int), error (text)
– JobExecution
– id (uuid), event (fk), task_name (str), status, attempts, started_at, finished_at, error

Ingress Endpoint (Django REST Framework)
– POST /api/webhooks/{source}/ingest
– Steps:
1) Enforce POST, JSON, Content-Length cap (e.g., 512 KB).
2) Read raw body exactly as sent.
3) Validate timestamp header (e.g., Stripe’s t=… within 5 min).
4) Verify HMAC or provider signature with per-tenant secret.
5) Compute idempotency_key from provider event id + source + tenant.
6) Persist WebhookEvent with signature_valid flag and dedupe check.
7) If valid + new, enqueue Celery task; else return 200 for duplicates, 400/401 on invalid.

Example verification (Stripe-style)
– Signature header: Stripe-Signature = t=…,v1=HMAC_SHA256(secret, "{t}.{raw_body}")
– Reject if timestamp skew > 300s or if computed HMAC != v1.

Pseudo-code (concise)
– Verify
– raw = request.body
– sig_hdr = request.headers.get("Stripe-Signature")
– t, v1 = parse(sig_hdr)
– if abs(now - t) > 300: 401
– mac = hex(hmac_sha256(secret, f"{t}.{raw}"))
– if not hmac_compare(mac, v1): 401
– Idempotency
– key = f"{source}:{tenant}:{provider_event_id}"
– if exists_in_store(key): return 200 (duplicate)
– store key in Redis with TTL 24h
– Persist + enqueue
– save WebhookEvent(…)
– celery_delay(event_id)
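The verification and idempotency steps above can be sketched framework-free. This is a minimal sketch, not a Stripe SDK API: `parse_signature_header`, `TOLERANCE_SECONDS`, and `idempotency_key` are illustrative names for the logic described in the pseudo-code.

```python
import hashlib
import hmac
import time

TOLERANCE_SECONDS = 300  # reject events outside the 5-minute window

def parse_signature_header(header: str):
    """Parse 't=1700000000,v1=abc...' into (timestamp, hex digest)."""
    parts = dict(item.split("=", 1) for item in header.split(","))
    return int(parts["t"]), parts["v1"]

def verify_signature(secret: bytes, raw_body: bytes, header: str, now=None) -> bool:
    t, v1 = parse_signature_header(header)
    if abs((now if now is not None else time.time()) - t) > TOLERANCE_SECONDS:
        return False  # stale or future-dated timestamp: possible replay
    signed = f"{t}.".encode() + raw_body
    expected = hmac.new(secret, signed, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, v1)  # constant-time compare

def idempotency_key(source: str, tenant: str, provider_event_id: str) -> str:
    return f"{source}:{tenant}:{provider_event_id}"
```

The same `verify_signature` helper can back the Django view: read `request.body` once, verify, then dedupe on the key before enqueueing.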

Normalization
– Map provider payloads to a canonical object:
– actor { id, email }
– subject { id, type }
– action { type, reason }
– data { free-form JSON }
– occurred_at
– tenant_id
– Keep raw payload for auditing.
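The canonical object above can be expressed as a small dataclass. The field types and the `normalize_stripe` mapping are assumptions for illustration; real provider payloads will need their own mappers.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class CanonicalEvent:
    actor: dict                 # {"id": ..., "email": ...}
    subject: dict               # {"id": ..., "type": ...}
    action: dict                # {"type": ..., "reason": ...}
    data: dict                  # free-form provider payload
    occurred_at: datetime
    tenant_id: str
    raw: dict = field(default_factory=dict)  # original payload kept for auditing

def normalize_stripe(payload: dict, tenant_id: str) -> CanonicalEvent:
    # Hypothetical mapping for a Stripe-style event envelope.
    obj = payload.get("data", {}).get("object", {})
    return CanonicalEvent(
        actor={"id": obj.get("customer"), "email": obj.get("receipt_email")},
        subject={"id": obj.get("id"), "type": payload.get("type", "").split(".")[0]},
        action={"type": payload.get("type"), "reason": None},
        data=obj,
        occurred_at=datetime.fromtimestamp(payload.get("created", 0)),
        tenant_id=tenant_id,
        raw=payload,
    )
```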

Celery Task (idempotent)
– Load event by id; exit if already processed.
– Begin idempotency:
– Use Redis SETNX lock “job:{event.id}”
– Execute business logic:
– Transform event → downstream actions.
– Write records to DB; use UPSERTs for repeat events.
– Call WordPress REST with retries.
– Mark processed; release lock.
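The idempotent worker step above has this shape. `RedisLike` is a dict-backed stand-in for a real Redis client (`set(..., nx=True)` mirrors SETNX; `ex` is accepted but ignored here), and the business logic and WordPress call are placeholders.

```python
class RedisLike:
    """In-memory stand-in for the Redis lock store."""
    def __init__(self):
        self._d = {}
    def set(self, key, value, nx=False, ex=None):
        if nx and key in self._d:
            return None  # lock already held elsewhere
        self._d[key] = value
        return True
    def delete(self, key):
        self._d.pop(key, None)

def process_event(event_id: str, redis, processed: set) -> str:
    if event_id in processed:
        return "skipped"                        # already processed: exit early
    if not redis.set(f"job:{event_id}", "1", nx=True, ex=600):
        return "locked"                         # another worker holds the lock
    try:
        # ... business logic + WordPress REST call with retries would go here ...
        processed.add(event_id)                 # UPSERT-style completion record
        return "processed"
    finally:
        redis.delete(f"job:{event_id}")         # always release the lock
```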

WordPress Integration
– Auth options:
– Application Passwords (Basic over HTTPS) for server-to-server.
– JWT (recommended if you need scoped tokens and rotation).
– REST route examples:
– POST /wp-json/ai-guy/v1/user-sync
– POST /wp-json/ai-guy/v1/order-event
– Hardening:
– Require server IP allowlist or token signature.
– Validate JSON schema with WP_REST_Request.
– Enforce rate limits via transients or a small options cache.
– Return 202 for async processing; never block on heavy work in PHP.

Minimal WordPress route (pseudo)
– register_rest_route('ai-guy/v1', '/user-sync', [
– 'methods' => 'POST',
– 'permission_callback' => function($request) {
// Verify Authorization header or shared HMAC
return current_user_can('manage_options') || verify_server_token($request);
},
– 'callback' => function($request) {
$data = $request->get_json_params();
// validate schema; sanitize
// wp_insert_user or wp_update_user
// set/update user meta
return new WP_REST_Response(['ok' => true], 200);
}
])

Django → WordPress Request
– POST with:
– Authorization: Bearer or Basic (App Password)
– X-Request-Id: {uuid}
– X-Signature: HMAC_SHA256(shared_secret, raw_body)
– Retries: exponential backoff (e.g., 1s, 3s, 9s, 27s, jitter), max 5 tries.
– Idempotency: include Idempotency-Key header and handle at WordPress by short-circuiting duplicates.
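The outbound request shape can be sketched as below. The header names follow the list above; the shared secret, bearer token, and deriving the Idempotency-Key from the body hash are illustrative choices, and the actual HTTP call is omitted.

```python
import hashlib
import hmac
import random
import uuid

def signed_headers(shared_secret: bytes, raw_body: bytes, token: str) -> dict:
    """Build the signed headers for a Django -> WordPress call."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "X-Request-Id": str(uuid.uuid4()),
        "X-Signature": hmac.new(shared_secret, raw_body, hashlib.sha256).hexdigest(),
        "Idempotency-Key": hashlib.sha256(raw_body).hexdigest(),
    }

def backoff_schedule(tries: int = 5, base: float = 1.0, factor: float = 3.0, jitter: float = 0.2):
    """Delays between attempts: ~1s, 3s, 9s, 27s with +/-20% jitter for 5 tries."""
    return [base * factor**i * (1 + random.uniform(-jitter, jitter)) for i in range(tries - 1)]
```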

Security Controls
– Signature verification per provider; rotate secrets quarterly.
– Replay protection with timestamp and nonce storage (Redis SETEX).
– Payload limits and JSON schema validation.
– Network controls: only expose /ingest via a public path; restrict /admin with VPN.
– Secrets in environment variables; no secrets in code or DB exports.
– Audit fields (created_by, source_ip, user_agent).
– PII minimization and encryption at rest if needed.

Observability
– Structured JSON logs: event_id, tenant, source, outcome, latency_ms.
– Metrics:
– webhook.ingest.count, .fail.count
– job.duration.ms, job.retry.count
– wordpress.http.2xx/4xx/5xx
– Tracing: propagate traceparent to WordPress; annotate external calls.
– DLQ: failed events after N retries go to DLQ table + Slack/Email alert.

Performance & Scale
– Keep ingress fast: verify + enqueue only; never call WordPress inline.
– Use gunicorn with async workers or uvicorn + ASGI for bursty loads.
– Redis for locks, idempotency keys, and short-lived state.
– Batch downstream operations when possible (e.g., queue coalescing per user).
– Backpressure: pause workers on WP 5xx storms; circuit breaker per endpoint.
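A minimal circuit breaker for the WP 5xx-storm case might look like this: open after `threshold` consecutive failures, allow a probe after `cooldown` seconds. The threshold, cooldown, and injectable clock are illustrative choices, not a fixed API.

```python
import time

class CircuitBreaker:
    def __init__(self, threshold=5, cooldown=30.0, clock=time.monotonic):
        self.threshold, self.cooldown, self.clock = threshold, cooldown, clock
        self.failures = 0
        self.opened_at = None  # None means circuit is closed

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if self.clock() - self.opened_at >= self.cooldown:
            return True  # half-open: let one probe request through
        return False

    def record(self, ok: bool):
        if ok:
            self.failures, self.opened_at = 0, None  # success closes the circuit
        else:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = self.clock()        # trip the breaker
```

Workers would check `allow()` before each WordPress call and `record()` the outcome, pausing consumption while the breaker is open.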

Local Development
– Use ngrok or Cloudflare Tunnel for provider callbacks.
– Seed tenant/provider secrets in .env (never commit).
– Replay fixtures from saved JSON payloads.
– Integration tests:
– Valid signature → 202; invalid → 401
– Duplicate event → 200 no-op
– Worker retry logic on transient WordPress 5xx
– Contract tests for WordPress routes with schema checks.

Example .env (redacted)
– PROVIDER_SECRETS_STRIPE=tenantA:sk_live_xxx,tenantB:sk_live_yyy
– WORDPRESS_BASE_URL=https://example.com
– WORDPRESS_TOKEN=…
– HMAC_SHARED_SECRET=…

Cutover Plan
– Deploy Django gateway behind HTTPS (HSTS).
– Create provider webhook endpoints per tenant: https://api.yourdomain.com/api/webhooks/stripe
– Validate signatures live; mirror events to a non-production DLQ initially.
– Gradually enable WordPress writes per event type; monitor metrics.
– Add dashboards and alerts; document runbooks.

Failure Handling
– Provider 429/5xx at ingress: accept and queue; never call back providers.
– WordPress 4xx: mark failed, do not retry unless fixable (e.g., 409).
– WordPress 5xx/timeouts: exponential retry with jitter up to max window.
– Poison messages: move to DLQ with root-cause tag.

Deliverables checklist
– Django endpoint with signature verification and timestamp tolerance
– Redis idempotency + nonce store
– Celery worker with idempotent jobs and circuit breaker
– WordPress REST routes with auth, schema validation, and idempotent handling
– Observability: logs, metrics, traces, DLQ
– Runbooks: secret rotation, replay, backfills

This pattern keeps webhook security, reliability, and performance centralized, while WordPress remains a clean, fast presentation and light business layer. It’s battle-tested for multi-tenant automations and scales with your traffic.

Shipping a Production Support Agent: Brain + Hands with Django, Redis, and WordPress

This post walks through a production-ready support agent with a Brain + Hands separation, wired into WordPress on the front, and Django on the back. The goal: predictable behavior, fast responses, measurable quality, and easy handoff to humans.

Use case
– Tier-1 support for order status, returns, product info, and FAQ
– Handoff to human when confidence is low or user requests it
– Works in a WordPress site widget, Slack, and email (shared backend)

Architecture (high level)
– Front-end: WordPress chat widget (vanilla JS) -> Django REST endpoint
– Brain: LLM for reasoning + routing (no direct data access)
– Hands: Tools in Django (Postgres + Redis) exposed via function-calling schemas
– Memory: Short-term thread memory (Redis), long-term knowledge (Postgres + pgvector)
– Orchestrator: Deterministic state machine (Django service + Celery tasks)
– RAG: Product/FAQ index with embeddings; constrained retrieval
– Observability: Request logs, traces, tool latency, outcomes, cost
– Deployment: Docker, Nginx, Gunicorn, Celery, Redis, Postgres

Brain + Hands separation
– Brain (LLM): Planning, deciding which tool to call, assembling final answer. No raw DB/API keys. Receives tool specs only.
– Hands (Tools): Deterministic, side-effect aware, with strict input/output schemas. Tools never “think”—they do.

Core tools (Hands)
– search_kb(query, top_k): RAG over Postgres+pgvector. Returns citations with IDs and source.
– get_order(email|order_id): Reads order status from internal service.
– create_ticket(email, subject, body, priority): Creates support case in helpdesk.
– handoff_human(reason, transcript_excerpt): Flags for live agent queue with context.

Tool contracts (JSON schema examples)
– search_kb input: { query: string, top_k: integer }

Orchestrator (deterministic state machine)
– PLAN -> (TOOL_LOOP)* -> DRAFT -> GUARDRAIL -> RESPOND
– TOOL_LOOP limits to 3 tool calls per turn
– If Brain calls an unknown tool or wrong schema: correct and retry once, else fallback to handoff_human
– Timeouts: 3s per tool; overall SLA 6s; degrade mode returns partial + “We’re checking further via email” and opens ticket
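The turn loop (bounded tool calls, then draft, guardrail, respond) can be sketched deterministically. Here `brain` is an injected stand-in for the LLM client, `tools` is the Hands registry, and `guardrail_ok` is a placeholder filter; all names are illustrative.

```python
def guardrail_ok(text: str) -> bool:
    # Placeholder content filter; the real one checks safety, PII, citations.
    return bool(text) and "ERROR" not in text

def run_turn(brain, tools, max_tool_calls=3):
    """One turn: brain plans, tools execute, guardrail vets the draft."""
    observations, calls = [], 0
    while True:
        step = brain(observations)
        if step["type"] == "respond":
            draft = step["text"]
            return draft if guardrail_ok(draft) else "handoff_human"
        # Unknown tool or tool budget exhausted -> escalate to a human
        if calls >= max_tool_calls or step.get("name") not in tools:
            return "handoff_human"
        observations.append(tools[step["name"]](**step.get("args", {})))
        calls += 1
```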

Guardrails
– Content filter: block sensitive/abusive content; offer handoff
– PII sanitizer: mask tokens before vector search
– Citation checker: if answer references kb, verify at least one valid citation is present
– Safety fallback: neutral response + create_ticket when filter trips

RAG implementation
– Storage: Postgres with pgvector for embeddings
– Chunking: 512–800 tokens, overlap 80
– Metadata: doc_id, section, source, updated_at, allowed_channels
– Query: Hybrid BM25 + vector; re-rank top 8 to 3
– Response: Return only snippets + URLs; Brain composes final with citations “(See: Title)”
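The hybrid step above (merge BM25 and vector scores, re-rank top 8 down to 3) can be sketched as a weighted merge. The `alpha` weight and the per-chunk score dicts are assumptions; production systems usually normalize scores before mixing.

```python
def hybrid_rerank(bm25: dict, vector: dict, alpha=0.5, candidates=8, final=3):
    """Merge two score maps (chunk_id -> score), keep top `final` of top `candidates`."""
    ids = set(bm25) | set(vector)
    scored = sorted(
        ((alpha * bm25.get(i, 0.0) + (1 - alpha) * vector.get(i, 0.0), i) for i in ids),
        reverse=True,
    )[:candidates]
    return [i for _, i in scored[:final]]
```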

Error handling
– Tool failures: exponential backoff (200ms, 400ms); then circuit-break for 60s
– LLM failures: switch to fallback model on timeout; respond with concise generic + ticket
– Data drift: if RAG index empty or stale, disable search_kb and escalate

WordPress integration
– Front-end widget: Minimal JS injects a floating chat; posts to /api/agent/messages with thread_id and csrf token nonce
– Auth: Public sessions get rate-limited by IP + device fingerprint; logged-in users attach JWT from WordPress to Django via shared secret
– Webhooks: Ticket created -> WordPress admin notice and email; agent takeover -> support Slack channel

Django endpoints (concise)
– POST /api/agent/messages: { thread_id, user_msg }
– GET /api/agent/thread/{id}: returns last N messages + status
– POST /api/agent/feedback: thumbs_up/down, tags
– Admin: /admin/agent/tools, /admin/agent/kb, /admin/agent/metrics

Celery tasks
– run_brain_step(thread_id)
– execute_tool(call_id)
– rebuild_kb_index()
– nightly_eval() against golden test set

Model selection
– Primary: a function-calling LLM with low latency (e.g., GPT-4o-mini or Claude Sonnet-lite). Keep token limits reasonable.
– Fallback: cheaper model with same tool schema to maintain compatibility.
– Temperature: 0.2 for tool routing, 0.5 for final drafting.

Cost and latency targets
– P50: 1.4s response (no tools), 2.8s with RAG, 3.5s with order lookup
– P95: <5s
– Cost: X%

Deployment notes
– Docker services: web (Gunicorn), worker (Celery), scheduler (Celery Beat), redis, postgres, nginx
– Readiness probes: tool ping, RAG index freshness, model API status
– Secrets: mounted via Docker secrets; rotate quarterly
– Blue/green deploy: drain workers, warm RAG cache, switch traffic

Minimal data models
– threads(id, user_id, channel, status, created_at)
– messages(id, thread_id, role, content, tool_name?, tool_payload?, created_at)
– kb_docs(id, title, url, text, embedding, updated_at, allowed_channels)
– tickets(id, thread_id, external_id, status, priority, created_at)

Snippet: tool call flow (pseudo)
– User -> /messages
– Orchestrator builds context from Redis + last N messages
– Brain returns tool_call: search_kb
– Celery executes search_kb, stores items
– Brain drafts answer with citations
– Guardrail checks
– Respond; optionally create_ticket if unresolved

Rollout plan
– Phase 1: FAQ-only RAG; no order lookups; human-in-the-loop
– Phase 2: Enable get_order with safe whitelist; add evals
– Phase 3: Enable create_ticket + SLA timers
– Phase 4: Add Slack channel and email ingestion to same backend

What to avoid
– Letting the Brain call HTTP endpoints directly
– Unbounded memory growth in Redis
– RAG over unreviewed or user-generated content
– Returning tool stack traces to users

Repository checklist
– /orchestrator: state machine, guardrails
– /tools: deterministic functions, schemas, tests
– /brain: prompt templates, model client, retries
– /kb: loaders, chunker, embeddings, indexer
– /web: Django views, serializers, auth
– /ops: docker-compose, nginx, CI, eval harness, dashboards

This pattern gives you a predictable, support-ready agent that integrates cleanly with WordPress, scales under load, and stays auditable.

A production-ready pattern for AI in WordPress: async jobs, signed webhooks, and external workers

Why this pattern
– WordPress is great at routing and rendering, not long-running I/O.
– AI calls are slow, variable, and expensive; they need retries, quotas, and tracing.
– The solution: push jobs to an external worker and accept results via signed webhooks.

Architecture (high level)
– Client (WP admin or theme) submits an AI request to a WP REST route.
– WordPress writes a job row (pending), enqueues to an external queue (or HTTP to a worker gateway).
– Worker (Python/Node) pulls the job, calls the AI provider, then POSTs a signed webhook back to WordPress.
– WordPress verifies the signature, stores result, and invalidates relevant cache.
– Frontend polls or uses SSE/WS via a lightweight proxy for updates.

Database schema (custom table)
– wp_ai_jobs
– id (bigint PK)
– user_id (bigint)
– status (enum: pending, running, succeeded, failed)
– input_hash (char(64)) for idempotency
– request_json (longtext)
– result_json (longtext, nullable)
– error_text (text, nullable)
– created_at, updated_at (datetime)
– idempotency_key (varchar(64), unique)
– webhook_ts (datetime, nullable)

Create the table on plugin activation
– dbDelta with utf8mb4, proper indexes:
– INDEX status_created (status, created_at)
– UNIQUE idempotency_key (idempotency_key)
– INDEX input_hash (input_hash)

Plugin structure (minimal)
– ai-integration/
– ai-integration.php (bootstrap, routes, activation)
– includes/
– class-ai-controller.php (REST endpoints)
– class-ai-webhook.php (webhook verifier)
– class-ai-repo.php (DB access)
– class-ai-queue.php (enqueue out to worker)
– helpers.php (crypto, validation)
– Do not store secrets in options; put them in wp-config.php.

Secrets and config (wp-config.php)
– define('AI_WORKER_URL', 'https://worker.example.com/jobs');
– define('AI_WEBHOOK_SECRET', 'base64-32-bytes');
– define('AI_JWT_PRIVATE_KEY', '-----BEGIN PRIVATE KEY-----...');
– define('AI_QUEUE_TIMEOUT', 2); // seconds for outbound enqueue

REST endpoint: create job (POST /wp-json/ai/v1/jobs)
– Validate capability (logged-in or signed public token).
– Build idempotency_key from client or hash(input_json + user_id + model).
– Insert row (pending).
– Enqueue to worker:
– POST to AI_WORKER_URL with signed JWT (kid, iat, exp, sub=user_id, jti=idempotency_key).
– Timeout <= 2s. If enqueue fails, leave job pending; a retry worker (Action Scheduler) can re-enqueue.
– Return { job_id, status: "pending" }.

Example: tiny enqueue
– Headers: Authorization: Bearer
– Body: { job_id, idempotency_key, request: {…}, callback_url: "https://site.com/wp-json/ai/v1/webhook" }

Webhook endpoint: receive result (POST /wp-json/ai/v1/webhook)
– Require HMAC-SHA256 signature header: X-AI-Signature: base64(hmac(secret, body))
– Require idempotency_key and job_id in body.
– Verify:
– Constant-time compare HMAC.
– Check timestamp drift <= 2 minutes (X-AI-Timestamp).
– Enforce replay guard: cache "webhook:{jti}" in Redis for 10m.
– Update row (status to succeeded/failed, set result_json or error_text, webhook_ts).
– Return 204.

Minimal verification (PHP)
– $sig = base64_decode($_SERVER['HTTP_X_AI_SIGNATURE'] ?? '');
– $calc = hash_hmac('sha256', $rawBody, AI_WEBHOOK_SECRET, true);
– hash_equals($sig, $calc) or wp_die('invalid sig', 403);

Frontend polling pattern
– Client gets job_id, then polls GET /wp-json/ai/v1/jobs/{id} every 1–2s (cap at 30s).
– Cache-control: private, max-age=0. Use ETag from updated_at to 304 unchanged.
– Optional: stream via SSE proxied through PHP only if your infra supports long-lived requests without PHP-FPM worker starvation.

Idempotency and dedupe
– On create:
– If idempotency_key exists, return existing job.
– Also check input_hash + user_id within time window to reduce duplicates from flaky clients.

Rate limiting
– Per-user sliding window: e.g., 60 jobs/10m.
– Use wp_cache (Redis/Memcached). Key: rl:{user}:{minute-epoch}. Increment and check.
– On limit exceed, 429 with Retry-After.

Background retries
– Action Scheduler job scans pending/running older than N minutes:
– Re-enqueue if no worker ack.
– Mark failed if exceeded retry budget; store error_text.

Security checklist
– Do not accept webhooks without HMAC and timestamp.
– JWT to worker uses short exp (<=60s). Sign with ES256 or RS256; rotate keys quarterly.
– Sanitize and escape all fields when rendering.
– Disable file edits in prod; restrict wp-admin to known IPs if possible.
– Log minimal PII; encrypt sensitive request_json fields at rest if needed (sodium_crypto_secretbox).

Performance considerations
– Never call AI providers inside a WP page render path.
– Outbound enqueue must be non-blocking (<2s). Use Requests::post with short timeouts and no redirects.
– Store only necessary parts of result_json; large blobs to object storage (S3) with signed URLs.
– Use indexes to keep dashboard queries fast; paginate admin list by created_at DESC.
– Cache job summaries with wp_cache_set on read path; invalidate on webhook.

Worker reference (Python, outline)
– Pull from queue, call provider with circuit breaker and retry/backoff (e.g., 100ms→2s jitter).
– On completion, POST result to callback_url with:
– Headers: X-AI-Signature, X-AI-Timestamp
– Body: { job_id, idempotency_key, status, result_json, usage: {tokens, ms} }
– Keep results small; upload big artifacts elsewhere first.
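The worker's signed callback can be built like this: a compact JSON body plus the `X-AI-Signature` (base64 HMAC over the raw bytes, matching the PHP verifier) and `X-AI-Timestamp` headers. The actual POST via httpx/requests is omitted, and the payload fields follow the outline above.

```python
import base64
import hashlib
import hmac
import json
import time

def build_callback(secret: bytes, job_id: str, idempotency_key: str, status: str, result: dict):
    """Return (headers, body) for the signed webhook back to WordPress."""
    body = json.dumps({
        "job_id": job_id,
        "idempotency_key": idempotency_key,
        "status": status,
        "result_json": result,
    }, separators=(",", ":")).encode()  # sign exactly the bytes that are sent
    headers = {
        "Content-Type": "application/json",
        "X-AI-Timestamp": str(int(time.time())),
        "X-AI-Signature": base64.b64encode(
            hmac.new(secret, body, hashlib.sha256).digest()
        ).decode(),
    }
    return headers, body
```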

Minimal job table index DDL
– INDEX status_created (status, created_at)
– INDEX user_created (user_id, created_at)
– UNIQUE idempotency_key (idempotency_key)

Observability
– Add a request_id to all flows; return it to client.
– Store provider latency, tokens, and error codes in result_json. Useful for cost/perf dashboards.
– Emit Server-Timing headers on job reads: worker;dur=123,provider;dur=456.

Admin UI ideas
– List jobs with filters (status, user, model).
– Re-enqueue button (capability checked).
– Export CSV of usage by date/user.

Deployment checklist
– HTTPS everywhere; verify real client IP behind any CDN.
– Set AI_WEBHOOK_SECRET via environment, not version control.
– Protect webhook with allowlist of worker IPs if static.
– Enable object cache. Prefer Redis with persistence.
– Load test: 200 req/s create → ensure PHP-FPM pool and DB connections stay healthy.
– Back up the table and rotate old rows to cold storage monthly.

What to avoid
– Synchronous AI calls in templates.
– Storing provider keys in options.
– Webhooks without signature or timestamp.
– Unbounded job payload sizes.

This pattern scales from small sites to high-traffic publishers, keeps your PHP requests fast, and centralizes reliability and security where they belong: in the worker and webhook boundary.

Production RAG for WordPress: pgvector + FastAPI backend, secure webhook intake, and a shortcode chat UI

Overview
This tutorial wires WordPress to a production-grade RAG backend:
– Intake: WordPress Media upload triggers a signed webhook to the backend.
– Index: Backend fetches the file, chunks text, stores embeddings in Postgres/pgvector.
– Serve: FastAPI endpoint answers user questions via retrieval-augmented generation.
– Frontend: A WordPress shortcode renders a chat box that queries the backend.

We’ll keep the stack minimal and production-ready:
– WordPress (webhook + shortcode)
– Python FastAPI backend
– Postgres + pgvector
– OpenAI embeddings + model (swap as needed)
– Nginx or cloud proxy, HTTPS, and API key auth

Architecture
1) User uploads PDF/Doc to WordPress Media.
2) WordPress sends a webhook: {file_url, title, post_id, signature}.
3) Backend validates the HMAC, downloads file, extracts text, chunks, embeds, stores in pgvector with a collection/site scope.
4) Chat UI (shortcode) hits /rag/query with apiKey to return grounded answers.

Prereqs
– WordPress admin access
– Python 3.11+, FastAPI, uvicorn
– Postgres 14+ with pgvector
– OpenAI API key (or compatible embedding/LLM)
– A secret shared between WP and backend for webhook signing

Database setup (pgvector)
-- Enable extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Documents table
CREATE TABLE IF NOT EXISTS documents (
  id UUID PRIMARY KEY,
  site_id TEXT NOT NULL,
  doc_id TEXT NOT NULL, -- WP attachment ID or slug
  title TEXT,
  source_url TEXT,
  created_at TIMESTAMPTZ DEFAULT now()
);

-- Chunks table
CREATE TABLE IF NOT EXISTS doc_chunks (
  id UUID PRIMARY KEY,
  doc_id UUID REFERENCES documents(id) ON DELETE CASCADE,
  idx INT NOT NULL,
  content TEXT NOT NULL,
  embedding vector(1536), -- match embedding size
  token_count INT,
  created_at TIMESTAMPTZ DEFAULT now()
);

-- Index for ANN search
CREATE INDEX IF NOT EXISTS doc_chunks_embedding_ivfflat
ON doc_chunks USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Filter speedups
CREATE INDEX IF NOT EXISTS doc_chunks_doc_id_idx ON doc_chunks(doc_id);
CREATE INDEX IF NOT EXISTS documents_site_doc_idx ON documents(site_id, doc_id);

FastAPI backend (app/main.py)
– Provides /webhook/wp-media to index uploads.
– Provides /rag/query for Q&A.
– Uses HMAC-SHA256 signature (X-WP-Signature) header.

from fastapi import FastAPI, Header, HTTPException, Depends, Request
from pydantic import BaseModel
import hmac, hashlib, os, uuid, httpx
import asyncpg
from typing import List, Optional
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")  # shared with WP
DATABASE_URL = os.getenv("DATABASE_URL")  # postgres://…
EMBED_MODEL = "text-embedding-3-small"
GEN_MODEL = "gpt-4o-mini"

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["https://your-site.com"], allow_methods=["*"], allow_headers=["*"])

client = AsyncOpenAI(api_key=OPENAI_API_KEY)

async def db():
    if not hasattr(app.state, "pool"):
        app.state.pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=8)
    return app.state.pool

def verify_signature(raw_body: bytes, signature: str):
    mac = hmac.new(WEBHOOK_SECRET.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(mac, signature)

class WPWebhook(BaseModel):
    site_id: str
    file_url: str
    title: Optional[str] = None
    attachment_id: str

@app.post("/webhook/wp-media")
async def wp_media(request: Request, x_wp_signature: str = Header(None), pool=Depends(db)):
    # Read the raw bytes first so the HMAC is computed over the exact payload
    raw_body = await request.body()
    if not x_wp_signature or not verify_signature(raw_body, x_wp_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    webhook = WPWebhook.model_validate_json(raw_body)  # pydantic v2 (use parse_raw on v1)

    # Download file
    async with httpx.AsyncClient(timeout=60) as http:
        r = await http.get(webhook.file_url)
        r.raise_for_status()
        content = r.content

    # Extract text (PDF/doc). Minimal example uses pdfminer.six if PDF; else fallback.
    text = await extract_text_auto(webhook.file_url, content)
    chunks = simple_chunk(text, max_chars=1200, overlap=100)

    # Insert document
    doc_uuid = str(uuid.uuid4())
    async with pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO documents(id, site_id, doc_id, title, source_url) VALUES($1,$2,$3,$4,$5)",
            doc_uuid, webhook.site_id, webhook.attachment_id, webhook.title, webhook.file_url
        )

    # Embed and insert chunks
    embeddings = await embed_texts([c["content"] for c in chunks])
    async with pool.acquire() as conn:
        async with conn.transaction():
            for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
                await conn.execute(
                    "INSERT INTO doc_chunks(id, doc_id, idx, content, embedding, token_count) VALUES($1,$2,$3,$4,$5,$6)",
                    str(uuid.uuid4()), doc_uuid, i, chunk["content"], emb, chunk["tokens"]
                )
    return {"status": "ok", "doc_id": doc_uuid, "chunks": len(chunks)}

async def extract_text_auto(url: str, content: bytes) -> str:
    import mimetypes, tempfile, os
    mt = mimetypes.guess_type(url)[0] or ""
    if "pdf" in mt or url.lower().endswith(".pdf"):
        from pdfminer.high_level import extract_text
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f:
            f.write(content); f.flush()
        out = extract_text(f.name)
        os.unlink(f.name)
        return out or ""
    # Basic fallback
    try:
        return content.decode("utf-8", errors="ignore")
    except Exception:
        return ""

def simple_chunk(text: str, max_chars=1200, overlap=100):
    text = text.strip()
    if not text:
        return []
    chunks = []
    i = 0
    while i < len(text):
        end = min(i + max_chars, len(text))
        chunks.append({"content": text[i:end], "tokens": (end - i) // 4})  # rough estimate
        if end == len(text):
            break  # done; stepping back by `overlap` here would loop forever
        i = max(end - overlap, i + 1)  # always advance
    return chunks

async def embed_texts(texts: List[str]):
    if not texts:
        return []
    resp = await client.embeddings.create(model=EMBED_MODEL, input=texts)
    return [e.embedding for e in resp.data]

class QueryBody(BaseModel):
    site_id: str
    question: str
    k: int = 5
    api_key: Optional[str] = None  # simple per-site key

def require_site_key(key: Optional[str], site_id: str):
    expected = os.getenv(f"SITE_{site_id.upper()}_KEY")
    if expected and key != expected:
        raise HTTPException(status_code=401, detail="Invalid API key")

@app.post("/rag/query")
async def rag_query(q: QueryBody, pool=Depends(db)):
    require_site_key(q.api_key, q.site_id)
    # Embed question
    qemb = (await embed_texts([q.question]))[0]
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT c.content, 1 - (c.embedding <=> $1::vector) AS score
            FROM doc_chunks c
            JOIN documents d ON d.id = c.doc_id
            WHERE d.site_id = $2
            ORDER BY c.embedding <=> $1::vector
            LIMIT $3
            """,
            qemb, q.site_id, q.k
        )
    context = "\n\n".join([r["content"] for r in rows])

    prompt = f"You are a helpful assistant. Use the context to answer.\n\nContext:\n{context}\n\nQuestion: {q.question}\nAnswer concisely with citations like [chunk #]."
    messages = [{"role": "user", "content": prompt}]
    comp = await client.chat.completions.create(model=GEN_MODEL, messages=messages, temperature=0.2)
    answer = comp.choices[0].message.content
    return {"answer": answer, "hits": len(rows)}

Note: Always compute the webhook HMAC over the exact raw request bytes (request.body()), never over re-serialized JSON; re-encoding can reorder keys or change whitespace and make verification fail intermittently.

WordPress: webhook sender (plugin)
Create a small MU-plugin or standard plugin to post to the backend on upload.

add_action('add_attachment', function($post_ID){
    $post = get_post($post_ID);
    if (!$post || $post->post_type !== 'attachment') return;

    $file_url = wp_get_attachment_url($post_ID);
    $title = get_the_title($post_ID);
    $site_id = get_bloginfo('url'); // or a fixed slug
    $payload = array(
        'site_id' => $site_id,
        'file_url' => $file_url,
        'title' => $title,
        'attachment_id' => strval($post_ID),
    );
    $json = wp_json_encode($payload);
    $secret = getenv('AI_WEBHOOK_SECRET') ?: 'change-me';
    $sig = hash_hmac('sha256', $json, $secret);

    $resp = wp_remote_post('https://api.your-backend.com/webhook/wp-media', array(
        'headers' => array(
            'Content-Type' => 'application/json',
            'X-WP-Signature' => $sig
        ),
        'body' => $json,
        'timeout' => 30
    ));
});

Shortcode chat UI
Adds [ai_chat] shortcode and a minimal UI that posts to /rag/query.

function ai_chat_shortcode($atts){
    $a = shortcode_atts(array(
        'placeholder' => 'Ask about our docs…',
        'site_id' => get_bloginfo('url'),
    ), $atts);
    ob_start(); ?>
    <div id="ai-chat">
      <div id="ai-chat-log" style="min-height:120px;border:1px solid #ddd;padding:8px;margin-bottom:8px;"></div>
      <input id="ai-chat-q" type="text" placeholder="<?php echo esc_attr($a['placeholder']); ?>" style="width:100%;padding:8px;" />
      <button id="ai-chat-send" type="button">Send</button>
    </div>
    <script>
    (function(){
      const api = 'https://api.your-backend.com/rag/query';
      const siteId = '<?php echo esc_js($a['site_id']); ?>';
      const key = '<?php echo esc_js(get_option('ai_chat_api_key', '')); ?>';
      const log = document.getElementById('ai-chat-log');
      const q = document.getElementById('ai-chat-q');
      document.getElementById('ai-chat-send').addEventListener('click', async function(){
        const question = q.value.trim();
        if(!question) return;
        log.innerHTML += '<div>You: ' + question + '</div>';
        q.value = '';
        try {
          const r = await fetch(api, {
            method: 'POST',
            headers: {'Content-Type':'application/json'},
            body: JSON.stringify({site_id: siteId, question, api_key: key})
          });
          const data = await r.json();
          log.innerHTML += '<div>AI: ' + (data.answer || 'No answer') + '</div>';
        } catch(e){
          log.innerHTML += '<div>Error contacting AI backend.</div>';
        }
      });
    })();
    </script>
    <?php return ob_get_clean();
}
add_shortcode('ai_chat', 'ai_chat_shortcode');

Store the per-site API key via a small admin settings page, or define it in wp-config.php and expose it through a get_option fallback.

Security and performance
– Transport: Enforce HTTPS end-to-end. Set CORS to your WP origin only.
– Auth: Use HMAC for webhooks and per-site API keys for /rag/query. Rotate keys regularly.
– Limits: Cap file size on WP, and validate mimetypes server-side. Queue large files.
– Costs: Use a small embedding model for indexing; cache embeddings by hash.
– Indexing: Run embedding in a background worker if uploads are frequent. Return 202 and poll status.
– Vector search: Tune ivfflat lists and analyze to your data size. Consider HNSW (pgvector 0.7+).
– Token control: Limit k and compress context (dedupe, summarization).
– Observability: Log latency, chunk counts, and hit scores. Add simple eval prompts for regression checks.
– Deployment:
– Postgres: managed instance with pgvector.
– Backend: Fly.io/Render/VM with health checks, 2+ replicas, stickyless.
– Secrets: Use platform secrets, not hard-coded keys.
– CDN: Serve static JS/CSS via WP enqueue, cache API via short TTL if answers are stable.

Local testing
– Create .env with OPENAI_API_KEY, WEBHOOK_SECRET, DATABASE_URL, SITE_{SITEID}_KEY.
– Run: uvicorn app.main:app --host 0.0.0.0 --port 8080 --proxy-headers
– Post a test webhook with curl and validate doc/chunk counts.
– Use the [ai_chat] shortcode on a test page.

What to adjust
– Swap extractors (unstructured, textract) for DOCX/HTML.
– Replace OpenAI with local or Azure endpoints by changing embed/generation calls.
– Add per-document metadata filters (post type, tags) in the query.

Inbox To Revenue: Deploying an AI Triage Router For Customer Ops (Gmail → Slack → Airtable)

Overview
Most SMBs lose money in the inbox: late replies, dropped leads, and manual copying into CRMs. This post shows how to deploy an AI triage router that classifies emails, extracts fields, assigns ownership, and generates first responses. Stack uses Gmail API, a lightweight Python service, an LLM, Slack for notifications, and Airtable as the system of record.

Target outcomes
– Classify inbound messages into 6-10 business-specific buckets
– Extract structured fields with >95% precision on core attributes
– Auto-acknowledge within 2 minutes, human follow-up within SLA
– Track cycle time and conversion in Airtable

Reference architecture
– Ingestion: Gmail API watch + Pub/Sub (or AWS SES/SNS) pushes new email IDs
– Processing: Python service (Cloud Run/Lambda) pulls raw MIME, normalizes text, strips signatures/footers
– Reasoning: LLM call (gpt-4.1-mini or Claude Haiku) with tool-free JSON output
– Persistence: Airtable (Tickets table), plus Redis queue for retries
– Notification: Slack webhook (team channel + assignee DM)
– Controls: Policy engine (PII redaction), rate limiting, eval harness
– Observability: BigQuery or Postgres for logs; Grafana/Looker dashboards

Airtable schema (minimal)
– Tickets: ticket_id, source, received_at, status, category, priority, customer_email, company, subject, summary, due_at, assignee, confidence, fields_json, reply_draft, url
– Categories: id, name, routing_rule, sla_minutes
– Agents/Assignees: id, name, slack_id, skill_tags, workload_score

LLM extraction targets
– category (enum): lead, support, billing, vendor, spam, career, legal, other
– intent: short verb phrase
– priority: low/normal/high (SLA map)
– entities: company, contact_name, email, phone, product, plan, order_id
– summary: 1-2 lines
– reply_draft: brief, factual, safe-to-send
– confidence: 0-1

Prompt shape (system)
– You are a router for customer operations. Output valid JSON only. Do not invent data. Leave null if unknown. Categories limited to: [list]. Keep reply_draft under 120 words, plain text, no promises we cannot keep.

Guardrails
– Temperature 0.2 for determinism
– Response format enforced with JSON schema validation
– If validation fails, fallback to simpler extraction prompt or rules
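The validation-plus-fallback path can be stdlib-only. A minimal sketch: field names mirror the extraction targets above, while the keyword rules in the fallback are illustrative assumptions.

```python
import json

# Allowed values mirror the extraction targets listed above.
ALLOWED_CATEGORIES = {"lead", "support", "billing", "vendor", "spam", "career", "legal", "other"}
REQUIRED_KEYS = {"category", "intent", "priority", "summary", "confidence"}

def validate_extraction(raw):
    """Return the parsed dict if valid, else None (caller falls back to rules)."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not REQUIRED_KEYS.issubset(data):
        return None
    if data["category"] not in ALLOWED_CATEGORIES:
        return None
    conf = data["confidence"]
    if not isinstance(conf, (int, float)) or not 0 <= conf <= 1:
        return None
    return data

def fallback_extraction(subject):
    """Keyword rules used when LLM output fails validation; keywords are placeholders."""
    lowered = subject.lower()
    if any(k in lowered for k in ("invoice", "payment", "refund")):
        category = "billing"
    elif any(k in lowered for k in ("quote", "pricing", "demo")):
        category = "lead"
    else:
        category = "other"
    return {"category": category, "intent": None, "priority": "normal",
            "summary": subject[:120], "confidence": 0.3}
```

In production you would likely use jsonschema or pydantic for the structural check; the control flow stays the same.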

Routing rules (examples)
– lead → assignee with skill “sales” and workload_score < threshold; SLA 120 min
– billing → finance queue; SLA 240 min
– support with keywords (“down”, “outage”) → priority high; on-call Slack
– legal → do not auto-reply; escalate; redact attachments
– spam/marketing → closed; no Slack
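The example rules above can be sketched as a small routing function. Skill tags, the workload ordering, and SLA minutes here are illustrative assumptions, not a fixed API.

```python
# SLA map in minutes; values mirror the example rules above.
SLA_MINUTES = {"lead": 120, "billing": 240, "support": 240, "legal": 480}
OUTAGE_KEYWORDS = ("down", "outage")

def route(category, body, agents):
    """Return an assignment decision for a classified email."""
    decision = {
        "assignee": None,
        "priority": "normal",
        "auto_reply": True,
        "notify_slack": True,
        "sla_minutes": SLA_MINUTES.get(category),
    }
    if category == "lead":
        # Least-loaded agent carrying the "sales" skill
        sales = sorted((a for a in agents if "sales" in a["skill_tags"]),
                       key=lambda a: a["workload_score"])
        decision["assignee"] = sales[0]["name"] if sales else None
    elif category == "support" and any(k in body.lower() for k in OUTAGE_KEYWORDS):
        decision.update(priority="high", assignee="on-call")
    elif category == "legal":
        decision.update(auto_reply=False, assignee="escalation")
    elif category == "spam":
        decision.update(auto_reply=False, notify_slack=False)
    return decision
```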

Workflow
1) Watch: Gmail push notifies message_id
2) Normalize: Fetch MIME, remove tracking pixels, detect language
3) Safety: Strip PII from body preview; dedupe threads by Message-Id/In-Reply-To
4) LLM: Extract fields JSON, 2-shot examples per category
5) Persist: Upsert Ticket; compute due_at using SLA map; set status “new”
6) Notify: Post Slack summary with buttons (Claim, Reassign, Close, Send Draft)
7) Auto-acknowledge: If category in allowed list, send reply_draft to customer with footer “Human review in progress”
8) Measure: Log timings, confidence, corrections
9) Retrain: Periodic batch eval, update examples, adjust categories
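Step 5's due_at computation is a one-liner against the SLA map; the minutes below echo the routing-rule examples and the default for unmapped categories is an assumption.

```python
from datetime import datetime, timedelta, timezone

# SLA windows in minutes; tune per business.
SLA_MINUTES = {"lead": 120, "billing": 240, "support": 240}
DEFAULT_SLA_MINUTES = 480

def compute_due_at(category, received_at):
    """due_at = received_at + the category's SLA window (workflow step 5)."""
    return received_at + timedelta(minutes=SLA_MINUTES.get(category, DEFAULT_SLA_MINUTES))
```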

Slack message format
– Title: [category][priority] subject
– Summary: 1 line + key entities
– Buttons: Claim (assign to self), Approve Draft (sends), Request Edit (opens modal), Reassign (picker)
– Thread: Bot posts Airtable link + due_at countdown

Failure modes and handling
– LLM timeout → retry with backoff; if still failing, default to rule-based category using keyword regex
– Low confidence (<0.6) → tag “needs_review”; do not auto-send; ping triage channel
– Large threads → summarize last human message only; include thread_size in log
– Attachments → virus scan; extract PDF text for entity match (order_id, invoice #)

Costs and performance
– Cost: ~ $0.002–$0.01 per email with small LLM; less if batching summaries
– Latency: Target <2s end-to-end; use streaming only for UI if needed
– Accuracy: Start with 6 categories; aim 95% precision on category, 98% on email detection, 85% on entities; iterate with error review
– Throughput: Cloud Run min-instances=0 for idle; scale to 100 rps bursts

Security and compliance
– Service account with restricted Gmail scopes
– Do not store raw bodies in logs; keep hashed identifiers
– PII redaction before Slack
– Secrets in GCP Secret Manager or AWS Secrets Manager
– Data retention policy in Airtable (archive after 180 days)

Evaluation loop (weekly)
– Sample 100 tickets; compare category, entities, SLA hit rate
– Track “first meaningful response” time and close rate per category
– Capture human edits to reply_draft for fine-tuning examples
– Adjust routing thresholds and on-call hours

ROI model (simple)
– Baseline: 400 inbound/month, 5 min manual triage each → 33 hours
– Post-automation: 30 sec review each → 3.3 hours
– Net saved: ~30 hours/month; at $45/hour → ~$1,350/month
– Plus conversion lift from same-day lead replies (track won vs. response time)

Implementation notes
– Use Gmail HistoryId to avoid double-processing
– Cache model responses for identical threads within 10 minutes (Redis)
– JSON schema example keys must be stable to preserve analytics
– Keep examples business-specific; swap in real subject lines, product names
– Add language detection; route non-English to bilingual assignees

Minimal endpoint contract (POST /triage)
– Input: message_id, thread_id
– Output: ticket_id, category, confidence, actions_taken [ack_sent, slack_posted]

Go-live checklist
– 2-week shadow mode (no auto-send), collect corrections
– Thresholds tuned; legal/billing excluded from auto-ack
– On-call rotation confirmed; Slack permissions tested
– Dashboards: SLA breach count, average first response, category distribution
– Runbook for outages and LLM provider failover

Extensions
– CRM sync (HubSpot/Close) on category=lead
– Voice/voicemail ingestion via transcription
– Calendar links in reply_draft for sales
– Priority boost for repeat customers (email/domain match)

Bottom line
Start narrow, measure aggressively, and keep humans-in-the-loop where it matters. This pattern reliably turns inbox chaos into a predictable, SLA-driven pipeline that pays for itself in the first month.

June 2026 Update: Orchestrator v0.6, Secure API Proxy, and Faster WordPress AI Workflows

What shipped
– Agent Orchestrator v0.6 (Python/Django backend + Celery): cleaner task graphing, priority queues, and plugin-friendly hooks.
– Secure API Proxy (FastAPI): per-connector tokens, KMS-backed key encryption, request signing, and rate limiting.
– WordPress AI Plugin update: switched to Action Scheduler + async webhooks; added request signing and idempotency keys.
– Observability stack: OpenTelemetry traces → Tempo, metrics → Prometheus, logs → Loki; dashboard bundle in Grafana.

Why it matters
– Faster agent pipelines with fewer cold starts and smarter batching.
– Safer outbound integrations without exposing provider keys to the edge.
– Traceable requests from WordPress to backend to model provider for easier debugging and SLA tracking.

Key improvements
– Latency: median end-to-end agent run cut from 2.1s → 1.3s (p50), 5.8s → 3.9s (p95) on common retrieval+gen flows.
– Cost: 22% drop on evaluated runs via prompt caching + adaptive truncation.
– Reliability: HTTP 5xx reduced from 1.7% → 0.6% with circuit breakers and exponential backoff.
– Throughput: +41% jobs/min with dedicated high-priority queue for webhooks and cache-warm tasks.

Architecture notes
– Orchestrator v0.6
– Task DAGs with per-edge retry/backoff; dead-letter queue via Redis streams.
– Priority queues (high/standard/batch) mapped to Celery routes; predictable SLAs for web callbacks.
– Prompt cache (Redis) keyed by model+tools+hash(prompt); TTL default 30m, override per route.
– Secure API Proxy
– Secrets at rest: AES-GCM envelope encryption with AWS KMS; no raw provider keys on disk.
– Scoped service tokens per connector; HMAC-SHA256 request signing; replay protection with nonce+timestamp.
– Token bucket rate limiter (Redis) per provider+account; burst controls to avoid 429s.
– WordPress AI Plugin
– Action Scheduler replaces WP-Cron; async callbacks with signed webhooks.
– Idempotency-Key header to de-dupe retries; transient cache for 10 minutes.
– Admin logs show trace IDs linking to Grafana dashboards.
– Observability
– OpenTelemetry auto-instrumentation for Django, Celery, Requests, FastAPI.
– SLO panels: latency, error budget, cache hit rate, token spend per model.
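The prompt-cache keying described above (model+tools+hash(prompt), 30-minute default TTL) might look like the sketch below, with a plain dict standing in for Redis; the key layout and helper names are assumptions.

```python
import hashlib
import json
import time

def cache_key(model, tools, prompt):
    """Derive a deterministic cache key from model + tool set + prompt hash."""
    tools_digest = hashlib.sha256(json.dumps(sorted(tools)).encode()).hexdigest()[:12]
    prompt_digest = hashlib.sha256(prompt.encode("utf-8")).hexdigest()
    return f"pc:{model}:{tools_digest}:{prompt_digest}"

class PromptCache:
    """In-memory stand-in for the Redis prompt cache; TTL defaults to 30 minutes."""
    def __init__(self, ttl_seconds=1800):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry and entry[0] > time.time():
            return entry[1]
        return None

    def set(self, key, value):
        self._store[key] = (time.time() + self.ttl, value)
```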

Early results (staging + 3 client projects)
– Cache hit rate: 34–48% on repeated prompts with minor parameter variance (canonicalization enabled).
– Token spend: -18% on Anthropic, -24% on OpenAI, -11% on Google via selective truncation and structured tool calls.
– Time-to-first-byte (streamed): 420–600ms typical with warm model sessions.

Upgrade notes
– Backend: Python 3.11, Django 4.2 LTS, Celery 5.4; run migrations for task tables and token scopes.
– WordPress: update plugin to v1.7+, re-save settings to generate proxy credentials; rotate old API keys.
– Grafana: import dashboard bundle v0.6; set TEMPO_URL, LOKI_URL, PROM_URL envs in docker-compose.

Compatibility
– Models: OpenAI, Anthropic, Google, Mistral via proxy connectors.
– Hosting: Works on Docker Swarm or Kubernetes; sample manifests included.
– Rollback: Feature-flagged; set ORCH_FEATURE_V06=false to revert queue routing.

What’s next
– Multi-tenant usage limits and per-tenant budgets.
– Native RAG evaluation harness with dataset snapshots.
– Canary deploys for prompt changes with automatic rollback on SLO breach.

If you want this upgrade applied to your deployment, reply with your stack details (hosting, provider mix, WordPress version) and we’ll share the migration checklist.

Production metrics pipeline for AI agents with OpenTelemetry, Kafka, ClickHouse, and Metabase

Overview
This build logs AI agent metrics in near real time, scales to millions of events/day, and keeps query latency under 200 ms. It uses:

– OpenTelemetry SDKs to emit spans/metrics
– OpenTelemetry Collector to batch and forward
– Kafka for durable buffering
– ClickHouse for fast analytics
– Metabase for dashboards and alerts

Use cases
– Track request latency, token usage, cost per endpoint
– Error rates by model/provider
– Agent step timings and tool success
– Customer-level SLOs and anomaly alerts

Architecture
– Services/agents emit OTLP spans and metrics.
– Otel Collector scrubs PII, batches, retries.
– Kafka provides backpressure and replay.
– ClickHouse ingests via Kafka Engine to a MergeTree.
– Metabase connects directly to ClickHouse.

Data model
Two core tables keep it simple and fast:

– agent_events: per request/step event (span-like)
– ts (DateTime64), trace_id, span_id, parent_span_id
– service, env, agent, model, provider
– route, user_id_hash, status, error_type
– latency_ms, tokens_prompt, tokens_completion, cost_usd
– attrs (JSON)

– agent_metrics_1m: rollups by minute
– ts_min, service, env, agent, model, route
– calls, p50_ms, p95_ms, error_rate, tokens_total, cost_usd

ClickHouse DDL
-- Raw Kafka topic table
CREATE TABLE kafka_agent_events (
  ts DateTime64(3),
  trace_id String,
  span_id String,
  parent_span_id String,
  service LowCardinality(String),
  env LowCardinality(String),
  agent LowCardinality(String),
  model LowCardinality(String),
  provider LowCardinality(String),
  route LowCardinality(String),
  user_id_hash FixedString(32),
  status LowCardinality(String),
  error_type LowCardinality(String),
  latency_ms UInt32,
  tokens_prompt UInt32,
  tokens_completion UInt32,
  cost_usd Decimal(12,6),
  attrs JSON
) ENGINE = Kafka
SETTINGS
  kafka_broker_list = 'kafka:9092',
  kafka_topic_list = 'agent_events_v1',
  kafka_group_name = 'ch_agent_events_ingest',
  kafka_format = 'JSONEachRow',
  kafka_num_consumers = 4;

-- Final storage
CREATE TABLE agent_events (
  ts DateTime64(3),
  trace_id String,
  span_id String,
  parent_span_id String,
  service LowCardinality(String),
  env LowCardinality(String),
  agent LowCardinality(String),
  model LowCardinality(String),
  provider LowCardinality(String),
  route LowCardinality(String),
  user_id_hash FixedString(32),
  status LowCardinality(String),
  error_type LowCardinality(String),
  latency_ms UInt32,
  tokens_prompt UInt32,
  tokens_completion UInt32,
  cost_usd Decimal(12,6),
  attrs JSON
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (ts, service, env, agent, route)
TTL ts + INTERVAL 90 DAY DELETE;

-- Streaming materialization
CREATE MATERIALIZED VIEW mv_agent_events TO agent_events AS
SELECT
  ts, trace_id, span_id, parent_span_id,
  service, env, agent, model, provider, route,
  user_id_hash, status, error_type,
  latency_ms, tokens_prompt, tokens_completion, cost_usd, attrs
FROM kafka_agent_events;

-- Rollup by minute
-- Note: SummingMergeTree sums numeric columns on merge, so the quantile and
-- ratio columns are only exact within unmerged parts; use AggregatingMergeTree
-- with -State combinators if you need exact percentiles across merges.
CREATE MATERIALIZED VIEW mv_agent_metrics_1m
ENGINE = SummingMergeTree
PARTITION BY toYYYYMM(ts_min)
ORDER BY (ts_min, service, env, agent, model, route)
AS
SELECT
  toStartOfMinute(ts) AS ts_min,
  service, env, agent, model, route,
  count() AS calls,
  quantileExact(0.5)(latency_ms) AS p50_ms,
  quantileExact(0.95)(latency_ms) AS p95_ms,
  countIf(status != 'ok') / count() AS error_rate,
  sum(tokens_prompt + tokens_completion) AS tokens_total,
  sum(cost_usd) AS cost_usd
FROM agent_events
GROUP BY ts_min, service, env, agent, model, route;

OpenTelemetry Collector config
Receives spans/metrics over OTLP, drops PII, batches, pushes to Kafka.

receivers:
  otlp:
    protocols:
      http:
      grpc:

processors:
  attributes:
    actions:
      - key: user_email
        action: delete
      - key: user_id
        action: hash
  batch:
    timeout: 2s
    send_batch_size: 5000
  memory_limiter:
    check_interval: 5s
    limit_mib: 512
    spike_limit_mib: 256
  transform:
    error_mode: ignore
    trace_statements:
      - context: span
        statements:
          - set(name, Concat([attributes["agent"], attributes["route"]], ":")) where name == ""

exporters:
  kafka:
    brokers: ["kafka:9092"]
    topic: agent_events_v1
    encoding: json
    retry_on_failure:
      enabled: true
      max_elapsed_time: 120s

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, attributes, transform, batch]
      exporters: [kafka]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [kafka]

Client emission example (Python)
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import BatchSpanProcessor

provider = TracerProvider(resource=Resource.create({
    "service.name": "api",
    "deployment.environment": "prod",
    "agent": "order_bot",
    "model": "gpt-4.1",
    "provider": "openai",
    "route": "POST /v1/answer"
}))
trace.set_tracer_provider(provider)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4318/v1/traces"))
provider.add_span_processor(processor)

with trace.get_tracer("api").start_as_current_span("agent_step") as span:
    # do work…
    span.set_attribute("tokens_prompt", 512)
    span.set_attribute("tokens_completion", 128)
    span.set_attribute("cost_usd", 0.0032)
    span.set_attribute("status", "ok")
    span.set_attribute("user_id", "12345")  # will be hashed by the Collector

Metabase setup
– Add ClickHouse as a database (native driver).
– Set sync/scan to hourly for new fields.
– Create questions using native SQL.

Useful queries
– Errors by model (last 24h):
SELECT model, provider, count() AS errors
FROM agent_events
WHERE ts > now() - INTERVAL 1 DAY AND status != 'ok'
GROUP BY model, provider
ORDER BY errors DESC
LIMIT 20;

– P95 latency by route (last 7d):
SELECT route, round(quantile(0.95)(latency_ms)) AS p95_ms, count() AS calls
FROM agent_events
WHERE ts > now() - INTERVAL 7 DAY
GROUP BY route
ORDER BY p95_ms DESC;

– Cost by customer (requires user_id map):
SELECT user_id_hash, sum(cost_usd) AS cost
FROM agent_events
WHERE ts > now() - INTERVAL 30 DAY
GROUP BY user_id_hash
ORDER BY cost DESC
LIMIT 50;

– SLO burn (errors > 2% in 15m windows):
SELECT
toStartOfInterval(ts, INTERVAL 15 minute) AS win,
round(avg(status != 'ok') * 100, 2) AS error_pct
FROM agent_events
WHERE ts > now() - INTERVAL 1 DAY
GROUP BY win
ORDER BY win DESC;

Alerting pattern
– Use a saved question with a threshold and Metabase email/Slack alerts.
– For low-latency alerts, add a lightweight worker polling ClickHouse every minute and posting to Slack webhook if error_pct > threshold.
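Such a poller can hit ClickHouse's HTTP interface (port 8123) directly. In this sketch the endpoints, the 2% threshold, and the Slack message shape are assumptions; only the threshold check is pure logic.

```python
import json
import urllib.request

# 15-minute error-rate windows over the last hour, as JSONEachRow.
QUERY = """
SELECT toStartOfInterval(ts, INTERVAL 15 minute) AS win,
       round(avg(status != 'ok') * 100, 2) AS error_pct
FROM agent_events
WHERE ts > now() - INTERVAL 1 HOUR
GROUP BY win FORMAT JSONEachRow
"""

def breaching_windows(rows, threshold_pct=2.0):
    """Return the windows whose error percentage exceeds the threshold."""
    return [r for r in rows if r["error_pct"] > threshold_pct]

def poll_once(ch_url="http://clickhouse:8123/", slack_webhook=None):
    # POST the query body to ClickHouse's HTTP endpoint.
    req = urllib.request.Request(ch_url, data=QUERY.encode())
    with urllib.request.urlopen(req, timeout=10) as resp:
        rows = [json.loads(line) for line in resp.read().splitlines() if line]
    for win in breaching_windows(rows):
        payload = json.dumps({"text": f"SLO breach {win['win']}: {win['error_pct']}% errors"})
        if slack_webhook:
            urllib.request.urlopen(urllib.request.Request(
                slack_webhook, data=payload.encode(),
                headers={"Content-Type": "application/json"}), timeout=10)
```

Run poll_once on a one-minute schedule (cron, Celery beat, or a systemd timer) alongside the Metabase alerts.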

Performance notes
– ClickHouse tuning:
– Use LowCardinality for strings; keep ORDER BY narrow and aligned with filters.
– Prefer DateTime64 for precise latency windows.
– Use TTL to control storage; hot data on fast disks.
– Kafka:
– Start with 3 partitions; increase to match ingest QPS.
– Retain 24–48h for replay.
– Otel Collector:
– Batch size 5k–10k; enable compression if cross-AZ.
– Cost:
– ClickHouse on a single NVMe host handles 50–100k events/sec; scale via shards + replicas.

Security and compliance
– Hash or drop user PII at the Collector.
– Restrict Metabase to read-only users.
– Use network policies or private subnets; encrypt Kafka and ClickHouse at rest and in transit.
– Rotate Kafka credentials and ClickHouse users; audit queries.

Deployment tips
– Docker Compose for a single-node pilot; Terraform + Kubernetes for prod.
– Health checks: Kafka consumer lag, Otel Collector queue size, ClickHouse insert delays.
– Backfill by producing historical JSON to the Kafka topic; ClickHouse will materialize.

What to build next
– Enrich events with billing account or feature flags.
– Add model drift metrics (output length, refusal rate).
– Expose a public status dashboard per-customer via Metabase embeds.

A Secure WordPress→Django API Bridge for AI Automations (JWT, HMAC, Queues, Retries)

This guide shows how to connect WordPress to a Django backend for AI automations without exposing third-party API keys in WordPress. The pattern: WordPress collects events, signs and forwards them to Django; Django validates, enqueues, calls AI/CRMs, and posts results back.

Core goals
– Never store vendor API keys in WordPress
– Validate every request and avoid duplicate processing
– Keep WordPress fast; move heavy work off-request
– Add clear observability and safe retries

Reference architecture
– WordPress (origin): Triggers on events (form submit, order paid, post published). Sends minimal signed JSON to Django via HTTPS.
– Django API (ingress): DRF view validates auth + HMAC + schema + idempotency-key; enqueues job.
– Task queue: Celery or RQ processes jobs, calls LLMs/CRMs.
– Result callback: Optional webhook back to WordPress or store in Django and let WP fetch.
– Monitoring: Log, trace, alert on failures and high latency.

Data flow
1) WordPress creates payload: {event_type, object_ref, data, idempotency_key, occurred_at}
2) WP adds headers: Authorization: Bearer <jwt>, X-Signature: sha256=<hex>, X-Idempotency-Key: <uuid>
3) Django verifies: JWT, timestamp drift, HMAC signature, schema, idempotency
4) Django enqueues task; returns 202
5) Worker runs task with vendor secrets; stores outcome + optional callback

Authentication and integrity
– JWT: Short-lived (<=5 min). Issuer: WordPress; Audience: Django. Secret or asymmetric keys (RS256 preferred). Include jti (nonce).
– HMAC signature: sha256 over raw body using a shared per-site secret. Header: X-Signature: sha256=<hex digest>. Reject if timestamp skew >5 min.
– IP allowlist (optional) and HTTPS only.

Idempotency
– Client (WP) generates UUIDv4 per business event; send as X-Idempotency-Key and in JSON.
– Server stores key + status for 24–72h. If duplicate, return cached 202/200 without re-enqueue.
– Use Redis or Postgres unique index on (idempotency_key).
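An in-memory sketch of that idempotency check follows; in production this would be a Redis SET NX EX or a Postgres unique index as noted above, and the 72-hour TTL matches the retention window described.

```python
import time

class IdempotencyStore:
    """First caller claims the key; duplicates get the cached status back."""
    def __init__(self, ttl_seconds=72 * 3600):
        self.ttl = ttl_seconds
        self._seen = {}  # key -> (expires_at, cached_status)

    def claim(self, key, status=202):
        """Return (is_new, status) without re-enqueueing duplicates."""
        now = time.time()
        entry = self._seen.get(key)
        if entry and entry[0] > now:
            return False, entry[1]
        self._seen[key] = (now + self.ttl, status)
        return True, status
```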

WordPress implementation (concise)
– Store: JWT private key or client secret, HMAC secret, Django endpoint in wp_options via Secrets API or environment constants.
– Hook: add_action('your_event', function($payload) { … });
– Create compact JSON, e.g., json_encode([…], JSON_UNESCAPED_SLASHES).
– Compute HMAC: hash_hmac('sha256', $body, HMAC_SECRET).
– Issue JWT (RS256 if possible): include iss, aud, iat, exp, jti.
– Send with wp_remote_post to https://api.your-django.com/ingest with 3s connect and 5s timeout.
– Handle non-2xx: backoff (exponential with jitter), retry max 3, log to error_log or custom table.

Minimal PHP send example (inline, trimmed)
$body = json_encode($data);
$sig  = hash_hmac('sha256', $body, HMAC_SECRET);
$jwt  = generate_jwt_rs256($claims, PRIVATE_KEY);
wp_remote_post($url, [
  'method'  => 'POST',
  'headers' => [
    'Authorization'     => 'Bearer ' . $jwt,
    'X-Signature'       => 'sha256=' . $sig,
    'X-Idempotency-Key' => $idempotency_key,
    'Content-Type'      => 'application/json'
  ],
  'body'    => $body,
  'timeout' => 5
]);

Django ingress (DRF) key checks
– Verify Authorization: Bearer <jwt>. Validate iss, aud, exp, jti (cache jti for replay prevention).
– Verify X-Signature: recompute sha256 HMAC over raw body; constant-time compare.
– Enforce timestamp in body; reject if drift >300s.
– Validate schema: pydantic or DRF serializers.
– Enqueue Celery task and return 202 with request_id.

Python highlights (inline, trimmed)
– jwt.decode(token, public_key, algorithms=['RS256'], audience='django-api')
– hmac.compare_digest(sig_header, 'sha256=' + hex_hmac)
– serializer.is_valid(raise_exception=True)
– Task delay: process_event.delay(request_id, payload)
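The signature step reduces to recomputing the HMAC over the raw body and comparing in constant time, following the post's X-Signature: sha256=<hex> convention:

```python
import hashlib
import hmac

def verify_signature(raw_body: bytes, sig_header: str, secret: bytes) -> bool:
    """Recompute the body HMAC and compare against the header in constant time."""
    expected = "sha256=" + hmac.new(secret, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(sig_header, expected)
```

In the DRF view, pass request.body (the raw bytes, before any JSON parsing) so the digest matches what WordPress signed.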

Queue worker considerations
– Do not call LLMs/CRMs from the request thread.
– Set per-service timeouts (e.g., 10–15s), retries with backoff, and circuit breakers.
– Store vendor responses with redaction of PII/API keys.

Result handling patterns
– Callback to WordPress: WP registers a secret endpoint with HMAC verification; Django posts results.
– Or polling: WP cron fetches new results via GET /results?since=…
– Or dashboard: Keep results in Django, surface via internal UI.

Schema example (compact)
– event_type: string enum (form.submitted, order.paid)
– object_ref: string (post ID, order ID)
– idempotency_key: string UUID
– occurred_at: RFC3339 UTC timestamp
– data: object with validated subfields

Security checklist
– HTTPS everywhere; HSTS; TLS 1.2+
– JWT exp <= 5 minutes, rotate keys regularly
– HMAC secrets per WordPress site; rotate quarterly
– Validate content-type, size limits (e.g., <=256KB)
– Rate limit per IP and per JWT sub (e.g., 60/min)
– Store minimal PII; encrypt at rest if needed
– Log redacted payloads only

Performance notes
– Keep the ingress view thin: validate and enqueue only, so p95 stays low
– Alert on queue depth above threshold and on repeated signature failures

Local and deployment tips
– Run docker-compose: Nginx, Django, Postgres, Redis, Celery worker/beat
– Use separate Redis DBs for Celery and idempotency to avoid key churn collisions
– Keep WordPress outbound on a static egress IP; add IP to Django allowlist
– Use environment-based config; no secrets in Git
– Add a dry-run mode for new event types before production

Common pitfalls
– Doing LLM calls inside the Django view (timeouts, user-facing latency)
– Missing idempotency causing duplicate CRM/LLM actions
– Storing API keys in WordPress options without protection
– Ignoring replay windows and timestamp validation
– Returning 200 instead of 202, confusing clients about async work

When to use this bridge
– WordPress triggers backend AI workflows (summarize posts, generate assets, score leads)
– Commerce events sync to CRM with AI enrichment
– Form submissions drive multi-step automations with vendor APIs

Deliverables checklist
– WP: event hooks, JWT builder, HMAC signer, resilient http client, logging
– Django: DRF endpoint with JWT+HMAC+schema+idempotency, Celery queue, result store
– Ops: dashboards, alerts, runbooks, key rotation plan

Building a Production-Ready Support Agent for WordPress (Brain + Hands Architecture)

This post describes a production-ready customer support agent that runs inside WordPress while delegating orchestration and tools to a secure backend. It’s engineered for reliability, observability, and low latency.

Use case
– Answer product and account questions for a WooCommerce site
– Escalate to human when confidence is low
– Take actions: lookup orders, issue refunds (guardrails), update tickets, send emails

High-level architecture
– Frontend (WordPress plugin)
– Chat UI + streaming
– Session auth via signed JWT from WP
– Minimal logic; forwards events to backend
– Brain (Django service)
– Orchestrator with policy, planning, and tool routing
– Provider-agnostic LLM client with timeouts, retries, circuit breakers
– Memory: short-term conversation state + RAG for docs + customer context
– Hands (tools)
– WooCommerce API (read + guarded write)
– Vector search for KB (Postgres pgvector or Pinecone)
– Email/Slack/webhook notifications
– Ticketing (GitHub Issues, Linear, or HelpScout)
– Infra
– Postgres (sessions, logs, transcripts, evals)
– Redis (queues, rate limits, short-term state)
– Celery/RQ workers for long I/O tasks
– S3/GCS for attachments
– Nginx + Gunicorn/Uvicorn
– Feature flags via environment or LaunchDarkly

Brain + Hands separation
– Brain responsibilities
– Interpret user intent, decide tool plans, enforce policies
– Choose between “answer from RAG” vs “call WooCommerce” vs “escalate”
– Keep tight token budgets and latency budgets
– Hands responsibilities
– Deterministic, audited side effects
– Strict schemas, input validation, and permission checks
– Idempotent operations with clear error codes

Data model (Postgres)
– users: wp_user_id, email, roles
– sessions: session_id, user_id, created_at, last_seen
– messages: session_id, role, content, tool_calls, latencies
– kb_docs: id, url, title, chunk_id, embedding
– eval_runs: scenario_id, score, metrics, model, timestamp
– actions_log: tool_name, request, response, status, cost

Tooling interfaces (Hands)
– Tool schema example
– name: get_order_status
– input: {order_id: string}
– output: {status, items[], total, updated_at}
– errors: NOT_FOUND, UNAUTHORIZED, RATE_LIMIT, UPSTREAM_5XX
– Guarded write tool
– name: issue_refund
– preconditions: user identity verified, order within policy window, amount <= soft_limit
– RAG search tool
– name: kb_search
– input: {query: string, top_k: int<=5}
– output: [{title, url, snippet, score}]
– Messaging tools
– name: notify_human
– name: create_ticket

Prompting and policies (Brain)
– System prompt
– You are a support agent for ACME Store. Answer strictly from tools or KB. If unsure, escalate.
– Never fabricate order details. Use get_order_status.
– For refunds, always run verify_identity then check_refund_policy.
– Summarize actions taken at the end, link to sources.
– Planning logic
– If user mentions an order identifier → get_order_status
– If how-to/product question → kb_search then compose
– If policy question not in KB → escalate
– Style rules
– Short answers, steps, and links to sources
– Don’t expose internal error traces

Memory strategy
– Short-term: last 10 messages in Redis keyed by session_id
– Long-term: store all messages in Postgres for analytics
– Entity memory: customer profile (name, last orders, subscription) fetched on session start, cached 15 minutes
– RAG: nightly KB crawl (docs, FAQs, policies), chunked and embedded; invalidate on content updates

Orchestration flow
1) WP plugin sends user message + session_id to Django
2) Brain assembles context: last 10 messages, customer profile, latency budget
3) Brain calls plan() to choose tools
4) Execute tool calls with timeouts and retries (exponential backoff, jitter)
5) If tools fail with 5xx, trigger fallback: reduced context + alternative provider
6) Compose final answer with citations
7) Stream tokens back to WP; log telemetry

Error handling and guardrails
– Timeouts: 2.5s LLM planning; 3s tool I/O (retry up to 2x)
– Circuit breakers: open per-tool after 5 failures/60s, route to fallback answer + escalate
– Input validation: pydantic/dataclasses schemas for all tool inputs
– PII scrubbing before logs; hashed identifiers
– Rate limits: per-user and global; shed load with friendly message
– High-risk actions (refunds) require dual confirmation path or human approval token
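The per-tool breaker policy above (open after 5 failures in 60 s, then route to fallback) fits in a few lines. This sliding-window sketch takes an injectable clock purely for testing; a production version would live behind each tool client.

```python
import time

class CircuitBreaker:
    """Open after max_failures within window_seconds; callers then use the fallback path."""
    def __init__(self, max_failures=5, window_seconds=60):
        self.max_failures = max_failures
        self.window = window_seconds
        self._failures = []  # timestamps of recent failures

    def record_failure(self, now=None):
        now = time.time() if now is None else now
        self._failures = [t for t in self._failures if t > now - self.window]
        self._failures.append(now)

    def is_open(self, now=None):
        now = time.time() if now is None else now
        return sum(1 for t in self._failures if t > now - self.window) >= self.max_failures
```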

Latency and cost controls
– Small model for planning, larger model only when composing long answers
– Token budget caps by policy; truncate history with windowing
– Cache KB search for 60s per session
– Turn off streaming if bandwidth-constrained; send final only

Observability
– Structured logs: request_id, session_id, tool_name, duration_ms, token_usage, cost_usd
– Traces: OpenTelemetry spans around LLM calls and tools
– Dashboards: success rate, handoffs, mean latency, top errors, containment rate
– Red-teaming playbook: simulate tricky prompts weekly; auto-generate counterfactual tests

Evaluation harness
– Scenarios: “Where is my order?”, “Refund outside window”, “Change shipping address”
– Metrics: exactness, policy adherence, source coverage, latency
– Offline evals on synthetic+real transcripts
– Canary release: 5% traffic to new Brain version, compare containment and CSAT

WordPress integration (plugin)
– Shortcode to render chat widget
– Auth: WP nonce → backend JWT (scoped to session only)
– Endpoints
– POST /chat/send
– GET /chat/stream?session_id=…
– POST /webhooks/order_updated
– Admin settings
– Backend URL, API key, rate limit per minute
– Feature flags: enable_refunds, enable_streaming
– Privacy: transcript retention days, PII masking rules

Security notes
– Store only necessary fields; encrypt at rest
– Strict CORS and allowed origins
– Signed webhooks for WooCommerce events
– Secrets via env vars (no secrets in WP DB)

Deployment checklist
– Docker images for Django API and workers
– Nginx reverse proxy with request size and timeout limits
– Postgres + Redis managed services
– Migrate DB, seed KB embeddings
– Health checks, readiness probes
– Runbooks for incident response and on-call rotation

Model/providers
– Start: gpt-4o-mini or claude-3-haiku for planning; gpt-4o or claude-3.5-sonnet for final answers
– Fall back chain: Provider A → Provider B → cached KB-only answer → human handoff
– Track provider performance per scenario; auto-shift traffic via feature flags

Rollout plan
– Week 1: shadow mode (agent suggests, human replies)
– Week 2: low-risk actions enabled, no refunds
– Week 3: enable refunds under $50 with approval token
– Ongoing: weekly evals, error budget, model updates behind canaries

What to build first
– KB pipeline (crawler, chunker, embeddings)
– Minimal Brain with plan() + kb_search + compose
– Observability + eval harness
– Then add WooCommerce read tools, finally guarded write tools

This blueprint keeps WordPress simple and pushes orchestration, policy, and tools into a hardened backend. You get fast responses, safe actions, measurable performance, and a path to scale.

A production-grade pattern for AI endpoints in WordPress (secure proxy, caching, rate limits, observability)

This post shows a real, production-ready pattern for running AI inference from WordPress without exposing API keys to the browser. We’ll build a secure REST endpoint, add caching and rate limits, handle retries and webhooks, and log everything for observability.

Use case examples:
– Generate product descriptions or summaries from authenticated admin screens
– Enrich form submissions (classify, route, extract fields)
– Power a custom block or dashboard tool that fetches AI results server-side

High-level architecture
– Frontend (block/admin page) → WP REST endpoint (server) → AI provider (OpenAI/Anthropic/etc.)
– Caching at the WordPress layer (transient or object cache)
– Rate limiting per user/site to prevent abuse
– Background jobs for long-running tasks via Action Scheduler
– Optional webhooks from AI provider back to WordPress
– Audit logs stored in a custom table with PII minimization

Prerequisites
– WordPress 6.4+
– PHP 8.1+
– Persistent object cache (Redis/Memcached) recommended
– Action Scheduler plugin or equivalent job runner
– Environment configuration for secrets (wp-config.php or environment variables)

1) Minimal plugin scaffold
Create wp-content/plugins/ai-secure-proxy/ai-secure-proxy.php

<?php
/**
 * Plugin Name: AI Secure Proxy
 * Description: Server-side AI proxy with caching, rate limits, and audit logs.
 */
if (!defined('ABSPATH')) exit;

class AISP_Proxy {
  const NAMESPACE  = 'aisp/v1';
  const CAPABILITY = 'edit_posts'; // adjust to your roles
  const OPT_PREFIX = 'aisp_';
  const RATE_LIMIT = 60; // requests per user per hour

  public function __construct() {
    add_action('rest_api_init', [$this, 'register_routes']);
    register_activation_hook(__FILE__, [$this, 'install']);
  }

  public function install() {
    global $wpdb;
    $table = $wpdb->prefix . 'aisp_logs';
    $charset = $wpdb->get_charset_collate();
    $sql = "CREATE TABLE IF NOT EXISTS $table (
      id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
      user_id BIGINT UNSIGNED NULL,
      route VARCHAR(191) NOT NULL,
      req_hash CHAR(64) NOT NULL,
      tokens_in INT UNSIGNED DEFAULT 0,
      tokens_out INT UNSIGNED DEFAULT 0,
      duration_ms INT UNSIGNED DEFAULT 0,
      status_code INT DEFAULT 0,
      error TEXT NULL,
      created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    ) $charset;";
    require_once ABSPATH . 'wp-admin/includes/upgrade.php';
    dbDelta($sql);
  }

  public function register_routes() {
    register_rest_route(self::NAMESPACE, '/infer', [
      'methods' => WP_REST_Server::CREATABLE,
      'permission_callback' => [$this, 'can_access'],
      'callback' => [$this, 'handle_infer'],
      'args' => [
        'prompt' => ['required' => true, 'type' => 'string', 'minLength' => 1, 'maxLength' => 8000],
        'model'  => ['required' => false, 'type' => 'string', 'default' => 'gpt-4o-mini'],
        'cache'  => ['required' => false, 'type' => 'boolean', 'default' => true]
      ]
    ]);

    register_rest_route(self::NAMESPACE, '/webhook', [
      'methods' => WP_REST_Server::CREATABLE,
      'permission_callback' => '__return_true',
      'callback' => [$this, 'handle_webhook'],
    ]);
  }

public function can_access(WP_REST_Request $req) {
// Require logged-in user with capability. For public use, implement signed HMAC header instead.
return current_user_can(self::CAPABILITY);
}

private function rate_key($user_id) {
return self::OPT_PREFIX . ‘rate_’ . $user_id . ‘_’ . gmdate(‘YmdH’);
}

private function is_rate_limited($user_id) {
$key = $this->rate_key($user_id);
$count = (int) wp_cache_get($key, ”, false, $found);
if (!$found) $count = (int) get_option($key, 0);
return $count >= self::RATE_LIMIT;
}

private function bump_rate($user_id) {
$key = $this->rate_key($user_id);
$count = (int) get_option($key, 0) + 1;
update_option($key, $count, false);
wp_cache_set($key, $count, ”, 3600);
}

    public function handle_infer(WP_REST_Request $req) {
        $user_id = get_current_user_id();
        if ($this->is_rate_limited($user_id)) {
            return new WP_Error('rate_limited', 'Rate limit exceeded', ['status' => 429]);
        }

        $prompt = trim($req->get_param('prompt'));
        $model = sanitize_text_field($req->get_param('model'));
        $use_cache = (bool) $req->get_param('cache');

        // Hash normalized inputs so identical requests share a cache key
        $req_hash = hash('sha256', wp_json_encode(['m' => $model, 'p' => $prompt]));

        $cache_key = self::OPT_PREFIX . 'c_' . $req_hash;
        if ($use_cache) {
            $cached = wp_cache_get($cache_key);
            if ($cached !== false) {
                $this->log($user_id, '/infer', $req_hash, 0, 0, 1, 200, null);
                return new WP_REST_Response(['cached' => true, 'result' => $cached], 200);
            }
        }

        $start = microtime(true);
        $resp = $this->call_ai_provider($model, $prompt);
        $duration_ms = (int) round((microtime(true) - $start) * 1000);

        if (is_wp_error($resp)) {
            $this->log($user_id, '/infer', $req_hash, 0, 0, $duration_ms, 500, $resp->get_error_message());
            return $resp;
        }

        $result = [
            'text' => $resp['text'] ?? '',
            'tokens_in' => $resp['tokens_in'] ?? 0,
            'tokens_out' => $resp['tokens_out'] ?? 0,
            'model' => $model,
        ];

        if ($use_cache && !empty($result['text'])) {
            wp_cache_set($cache_key, $result, '', self::CACHE_TTL);
        }

        $this->bump_rate($user_id);
        $this->log($user_id, '/infer', $req_hash, (int) $result['tokens_in'], (int) $result['tokens_out'], $duration_ms, 200, null);

        return new WP_REST_Response(['cached' => false, 'result' => $result], 200);
    }

    private function call_ai_provider($model, $prompt) {
        // Secrets via environment or wp-config
        $api_key = getenv('OPENAI_API_KEY') ?: (defined('OPENAI_API_KEY') ? OPENAI_API_KEY : '');
        if (!$api_key) return new WP_Error('config_error', 'Missing AI API key', ['status' => 500]);

        // Endpoint and body shown for OpenAI's Responses API; swap for your provider.
        $url = 'https://api.openai.com/v1/responses';
        $body = [
            'model' => $model,
            'input' => $prompt,
        ];

        $attempts = 0;
        $max_attempts = 3;
        $last_err = null;

        while ($attempts < $max_attempts) {
            $attempts++;
            $response = wp_remote_post($url, [
                'timeout' => 15,
                'headers' => [
                    'Authorization' => 'Bearer ' . $api_key,
                    'Content-Type' => 'application/json',
                ],
                'body' => wp_json_encode($body),
            ]);

            if (is_wp_error($response)) {
                $last_err = $response;
            } else {
                $code = wp_remote_retrieve_response_code($response);
                $data = json_decode(wp_remote_retrieve_body($response), true);

                // Simple provider-agnostic parse; exact field names vary by provider and API version
                if ($code >= 200 && $code < 300) {
                    $text = $data['output'][0]['content'][0]['text']
                        ?? ($data['choices'][0]['message']['content'] ?? '');
                    $tokens_in = (int) ($data['usage']['input_tokens'] ?? ($data['usage']['prompt_tokens'] ?? 0));
                    $tokens_out = (int) ($data['usage']['output_tokens'] ?? ($data['usage']['completion_tokens'] ?? 0));
                    return ['text' => $text, 'tokens_in' => $tokens_in, 'tokens_out' => $tokens_out];
                }

                // Retry on 429/5xx with exponential backoff
                if (in_array($code, [429, 500, 502, 503, 504], true)) {
                    $last_err = new WP_Error('ai_retry', 'Transient AI error: ' . $code);
                } else {
                    return new WP_Error('ai_error', 'AI provider error: ' . $code, ['status' => $code, 'data' => $data]);
                }
            }

            // Backoff before the next attempt
            usleep((int) (pow(2, $attempts - 1) * 200000)); // 200ms, 400ms, 800ms
        }

        return $last_err ?: new WP_Error('ai_error', 'Unknown AI error', ['status' => 500]);
    }

    public function handle_webhook(WP_REST_Request $req) {
        // Optional: verify HMAC signature from provider
        $shared = getenv('AI_WEBHOOK_SECRET') ?: '';
        $sig = $req->get_header('x-aisig');
        if ($shared && $sig) {
            $calc = hash_hmac('sha256', $req->get_body(), $shared);
            if (!hash_equals($calc, $sig)) {
                return new WP_Error('forbidden', 'Invalid signature', ['status' => 403]);
            }
        }
        // Process the event (store job result, update post meta, etc.)
        do_action('aisp_webhook_received', $req->get_json_params());
        return new WP_REST_Response(['ok' => true], 200);
    }

    private function log($user_id, $route, $req_hash, $tin, $tout, $dur_ms, $code, $error) {
        global $wpdb;
        $table = $wpdb->prefix . 'aisp_logs';
        $wpdb->insert($table, [
            'user_id' => $user_id ?: null,
            'route' => $route,
            'req_hash' => $req_hash,
            'tokens_in' => $tin,
            'tokens_out' => $tout,
            'duration_ms' => $dur_ms,
            'status_code' => $code,
            'error' => $error,
        ], ['%d', '%s', '%s', '%d', '%d', '%d', '%d', '%s']);
    }
}

new AI_Secure_Proxy();

2) Secure frontend usage
– Admin page or block should never expose provider keys.
– Call the endpoint with wp.apiFetch (which attaches the REST nonce automatically) or plain fetch with an X-WP-Nonce header.
Example:

const res = await wp.apiFetch({
    path: '/ai-secure-proxy/v1/infer',
    method: 'POST',
    data: { prompt, model: 'gpt-4o-mini', cache: true }
});

3) Authentication options
– Logged-in only (capability-based): safest for back-office tools.
– Public usage: require HMAC-signed requests.
– Client sends X-Sign: HMAC_SHA256(body, PUBLIC_CLIENT_SECRET).
– Server validates before processing.
– Consider IP allowlists for server-to-server.

4) Caching strategy
– Key by normalized inputs and model.
– Use persistent object cache for speed and TTL-based eviction.
– Invalidate on model/version changes or when content sources update.

5) Rate limiting
– Example above uses per-user/hour counters.
– For high-traffic public endpoints, use Redis INCR with TTL or a token bucket.
– Return 429 with Retry-After.
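
A token bucket differs from the hourly counter above in that it allows a burst, then enforces a sustained rate. A minimal in-memory sketch (JavaScript; in production the state would live in Redis so all PHP workers share it):

```javascript
// Token bucket: `capacity` controls burst size, `refillPerSec` the sustained rate.
class TokenBucket {
  constructor(capacity, refillPerSec, now = Date.now() / 1000) {
    this.capacity = capacity;
    this.refillPerSec = refillPerSec;
    this.tokens = capacity;
    this.last = now;
  }

  // Returns true if the request may proceed, false if it should receive a 429.
  allow(now = Date.now() / 1000) {
    // Refill proportionally to elapsed time, capped at capacity.
    this.tokens = Math.min(this.capacity, this.tokens + (now - this.last) * this.refillPerSec);
    this.last = now;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}
```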

6) Background jobs
– For long prompts or batch operations, offload to Action Scheduler:
– POST creates a job and returns job_id.
– Worker picks up job, calls AI, stores result in post meta or custom table.
– Optional webhook updates job status when provider supports async.
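
The create/poll flow above can be sketched as a tiny in-memory job store (JavaScript for brevity; in the real plugin the worker is an Action Scheduler callback and results go to post meta or the custom table — all names here are hypothetical):

```javascript
// In-memory job store: POST handler calls createJob() and returns the id at once.
const jobs = new Map();
let nextId = 1;

function createJob(prompt) {
  const id = `job_${nextId++}`;
  jobs.set(id, { status: 'queued', prompt, result: null });
  return id;
}

// Worker pass: pick up queued jobs, run inference, persist the result.
function workerTick(runInference) {
  for (const job of jobs.values()) {
    if (job.status !== 'queued') continue;
    job.status = 'running';
    job.result = runInference(job.prompt); // the AI call happens here
    job.status = 'done';
  }
}

// Status endpoint the frontend polls.
function jobStatus(id) {
  return jobs.get(id) ?? { status: 'unknown' };
}
```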

7) Webhooks
– Verify HMAC signature.
– Idempotency: store last seen event IDs to avoid duplicate processing.
– Enqueue work; do not block the webhook handler.
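
The idempotency point can be sketched with a bounded seen-ID set (JavaScript; in WordPress the IDs would live in a transient or dedicated table rather than memory — the class name is illustrative):

```javascript
// Remember recently seen event IDs and skip duplicate deliveries.
// A bounded Map doubles as a crude insertion-order LRU so memory stays flat.
class SeenEvents {
  constructor(maxSize = 10000) {
    this.maxSize = maxSize;
    this.ids = new Map();
  }

  // Returns true the first time an event ID is seen, false on replays.
  markIfNew(eventId) {
    if (this.ids.has(eventId)) return false;
    this.ids.set(eventId, Date.now());
    if (this.ids.size > this.maxSize) {
      this.ids.delete(this.ids.keys().next().value); // evict oldest insertion
    }
    return true;
  }
}
```

The webhook handler checks `markIfNew` first, enqueues the work, and returns 200 immediately; duplicates fall through to a no-op.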

8) Observability
– Log request hash, tokens, duration, status code.
– Build admin UI with filters by user/date/status.
– Emit structured JSON logs, or use error_log for quick triage in staging.
– Add metrics: P95 latency, cache hit ratio, 4xx/5xx rates.
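
The two headline metrics are easy to derive from the logs table. A sketch of P95 (nearest-rank method) and cache hit ratio over a window of rows (JavaScript; function names are illustrative):

```javascript
// Nearest-rank percentile over a window of request durations (ms).
function percentile(values, p) {
  if (values.length === 0) return 0;
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(0, rank - 1)];
}

// Cache hit ratio from counts of hits and misses.
function cacheHitRatio(hits, misses) {
  const total = hits + misses;
  return total === 0 ? 0 : hits / total;
}
```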

9) Security checklist
– Store API keys in environment or wp-config, never the DB.
– Enforce capability or signed requests.
– Validate and length-limit inputs.
– Strip PII where possible before sending to provider.
– Set conservative timeouts; implement retries with backoff.
– Disable indexing for admin tools; use HTTPS everywhere.

10) Performance notes
– Keep endpoint thin; avoid loading unnecessary WP subsystems.
– Enable OPcache and a persistent object cache.
– Batch multiple small prompts into one call when possible.
– Circuit breaker: if upstream is failing consistently, short-circuit for a cooldown period.
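
The circuit-breaker idea can be sketched as follows (JavaScript; thresholds and names are illustrative — in the plugin the open/closed state could be a transient checked before `call_ai_provider`):

```javascript
// After `threshold` consecutive failures, short-circuit calls for `cooldownSec`
// instead of hammering a failing upstream.
class CircuitBreaker {
  constructor(threshold = 5, cooldownSec = 60) {
    this.threshold = threshold;
    this.cooldownSec = cooldownSec;
    this.failures = 0;
    this.openedAt = null;
  }

  canRequest(now = Date.now() / 1000) {
    if (this.openedAt === null) return true;
    if (now - this.openedAt >= this.cooldownSec) {
      this.openedAt = null; // half-open: allow a probe request through
      this.failures = 0;
      return true;
    }
    return false;
  }

  recordSuccess() { this.failures = 0; this.openedAt = null; }

  recordFailure(now = Date.now() / 1000) {
    this.failures++;
    if (this.failures >= this.threshold) this.openedAt = now;
  }
}
```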

11) Deployment tips
– Separate staging keys and webhooks.
– Run a load test (k6/Locust) with realistic prompts.
– Monitor Redis hit ratio and PHP-FPM slow logs.
– Backup logs table and rotate old entries.

Extending the pattern
– Add SSE endpoint for streaming tokens to an admin tool.
– Implement content moderation via a pre-check model before saving results.
– Build a “dry run” mode for previews without committing changes.

This approach keeps AI logic server-side, with guardrails for cost, performance, and security—ready for production in WordPress environments.