Real-time Product Analytics with Postgres → ClickHouse → dbt → Grafana

If you run your app on Postgres and need sub-second analytics without crushing OLTP, this stack works reliably in production:
– Ingest: Postgres → ClickHouse (MaterializedPostgreSQL or Kafka + Debezium)
– Transform: dbt (dbt-clickhouse)
– Serve: Grafana dashboards + alerts

Use cases
– Funnel and retention analysis
– Feature usage and cohort metrics
– Operational dashboards (latency, errors, throughput)
– Finance-lite rollups (orders, MRR, refunds)

Reference architecture
– Source DB: Postgres 14+ on managed cloud (RDS/Cloud SQL)
– Analytics store: ClickHouse Cloud or self-managed cluster
– Replication: ClickHouse MaterializedPostgreSQL (for simplicity) or Kafka (for multi-sink / heavy transforms)
– Transform: dbt runner (GitHub Actions, Dagster, or Airflow)
– Viz: Grafana with ClickHouse plugin
– Storage/lineage: S3/GCS parquet exports (optional but recommended)
– Secrets: environment or Vault; network via PrivateLink/VPC peering

Table design in Postgres
– Prefer narrow event table + dimension tables
– Use UUID or bigint IDs; avoid wide JSONB for hot paths
– For events:
– event_id (UUID, PK)
– user_id
– occurred_at (timestamptz)
– event_name (text)
– properties (jsonb) — sparse, normalized over time
– For orders/subscriptions:
– immutable facts + status changes as events
– avoid in-place updates on hot rows

ClickHouse schema (denormalized for reads)
– Use MergeTree with time + shard key
– Example:

CREATE TABLE analytics.events
(
event_id UUID,
user_id String,
event_name LowCardinality(String),
occurred_at DateTime64(3, 'UTC'),
properties_json JSON,
ingest_ts DateTime DEFAULT now()
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(occurred_at)
ORDER BY (event_name, occurred_at, user_id)
SETTINGS index_granularity = 8192;

– Add projections or materialized views for frequent queries (daily rollups, funnels)

Option A: Direct Postgres → ClickHouse via MaterializedPostgreSQL
Good when you control Postgres and schema churn is moderate.

Steps
1) Enable logical replication in Postgres:
– rds.logical_replication = 1 (RDS) or wal_level = logical
– Create a replication user with REPLICATION
2) In ClickHouse:

CREATE DATABASE pg_repl
ENGINE = MaterializedPostgreSQL(
    'postgres-host:5432', 'app_db', 'replicator_user', 'REDACTED'
)
SETTINGS materialized_postgresql_schema = 'public';

-- Replicated tables appear under pg_repl; tune apply behavior with the
-- other materialized_postgresql_* settings as needed.

3) Hydrate analytics tables from the replicated ones:
CREATE TABLE analytics.events
ENGINE = MergeTree
PARTITION BY toYYYYMM(occurred_at)
ORDER BY (event_name, occurred_at, user_id)
AS SELECT
    event_id,
    user_id::String AS user_id,
    event_name,
    occurred_at,
    properties AS properties_json
FROM pg_repl.events;

Notes
– MaterializedPostgreSQL tails WAL and applies row-level changes
– For deletes/updates, ensure primary keys exist in Postgres
– For high write volume, separate heavy tables into their own ClickHouse databases to parallelize apply

Option B: Postgres → Kafka (Debezium) → ClickHouse
Use when you need multi-sink, schema registry, or custom buses.

– Debezium captures CDC to Kafka with Avro/JSON schema
– ClickHouse reads via Kafka engine + materialized views
– Handle upserts with ReplacingMergeTree or CollapsingMergeTree keyed by primary_id + version/flag
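The upsert handling above can be modeled in a few lines of Python: for each primary key, the row carrying the highest version wins, which is the state ReplacingMergeTree(version) converges to after merges. Field names here are illustrative, not your actual CDC schema.

```python
def collapse_upserts(rows):
    """Keep the highest-version row per primary_id, mimicking how
    ReplacingMergeTree(version) resolves duplicate rows after a merge."""
    latest = {}
    for row in rows:
        key = row["primary_id"]
        if key not in latest or row["version"] > latest[key]["version"]:
            latest[key] = row
    return list(latest.values())
```

Until a merge runs, ClickHouse may still return both versions, so queries that need exact results should apply the same logic via FINAL or argMax.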

dbt transforms (dbt-clickhouse)
– Create semantic models and rollups; keep them small and incremental
– Example model: sessions_daily.sql

{{
  config(
    materialized='incremental',
    unique_key='session_day_user',
    incremental_strategy='delete+insert'
  )
}}

WITH sessions AS (
  SELECT
    user_id,
    toStartOfDay(occurred_at) AS session_day,
    countIf(event_name = 'session_start') AS starts,
    countIf(event_name = 'session_end') AS ends
  FROM {{ source('analytics', 'events') }}
  {% if is_incremental() %}
  WHERE occurred_at >= date_sub(DAY, 3, today())
  {% endif %}
  GROUP BY user_id, session_day
)
SELECT
  concat(user_id, '_', toString(session_day)) AS session_day_user,
  user_id,
  session_day,
  starts,
  ends
FROM sessions

– Schedule:
– Small models every 5 minutes
– Heavy rollups hourly
– Backfills in off-peak

Grafana dashboards
– Install ClickHouse data source
– Panels:
– Events per minute with 1m/5m rollups
– DAU/WAU/MAU with asof joins
– Feature adoption by cohort week
– p50/p95 latency by endpoint
– Error rate by service/version
– Add alert rules on thresholds (error_rate > 2%, DAU drop > 20% d/d)
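The day-over-day DAU alert rule above reduces to a simple ratio check; a minimal sketch (thresholds and function name are illustrative):

```python
def dau_drop_alert(dau_today, dau_yesterday, threshold=0.20):
    """Return True when DAU fell more than `threshold` day-over-day,
    matching the alert rule above (DAU drop > 20% d/d)."""
    if dau_yesterday == 0:
        return False  # no baseline yesterday; avoid division by zero
    drop = (dau_yesterday - dau_today) / dau_yesterday
    return drop > threshold
```

In Grafana you would express the same ratio in SQL over the rollup table and attach the threshold to an alert rule.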

Performance and cost tips
– ClickHouse:
– Partition by month, order by (event_name, occurred_at, user_id)
– Use LowCardinality for strings
– Keep JSON for long tail props, but extract top N to typed columns for filters
– Set max_threads per query; use quotas for noisy tenants
– dbt:
– Favor incremental + small windows
– Stage raw to clean, then marts; limit cross-joins
– Postgres:
– Keep CDC slot monitored; alert on replication lag
– Index primary keys; avoid mass vacuum stalls
– Storage:
– TTL old raw events to S3 via ClickHouse S3 TTL or periodic exports
– Costs:
– Use ClickHouse Cloud autoscaling with sane concurrency caps
– Compress JSON and avoid SELECT *
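The "extract top N to typed columns" tip can be sketched in Python; HOT_KEYS and the property names are hypothetical, and in production this split would happen at ingest or in a materialized view:

```python
import json

# Hypothetical hot keys worth promoting to typed columns for fast filters;
# everything else stays in the JSON long tail.
HOT_KEYS = {"plan": str, "page_load_ms": int}

def split_properties(properties_json):
    """Split raw event properties into typed columns plus a residual JSON blob."""
    props = json.loads(properties_json)
    typed = {}
    for key, caster in HOT_KEYS.items():
        if key in props:
            typed[key] = caster(props.pop(key))
    return typed, json.dumps(props, sort_keys=True)
```

Filters on the typed columns then hit LowCardinality/numeric codecs instead of JSON parsing at query time.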

Reliability and ops
– Health checks:
– Replication delay (Postgres WAL LSN vs ClickHouse apply lag)
– Kafka consumer lag (if using Debezium)
– dbt run success rates and duration SLAs
– Schema changes:
– Additive first (new columns), then backfill, then drop
– Use dbt contracts/tests and column-level lineage
– Dedupe:
– For CDC upserts, use ReplacingMergeTree with a version column
– For at-least-once ingestion, dedupe on event_id in dbt staging
– Backfills:
– Snapshot Postgres to S3 (pg_dump or logical snapshot), bulk copy into ClickHouse, then reattach WAL
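The at-least-once dedupe mentioned above (first occurrence of each event_id wins) can be sketched as:

```python
def dedupe_events(batch, seen=None):
    """Drop repeated event_ids from an at-least-once stream; the first
    occurrence wins, mirroring a dedupe step in dbt staging."""
    seen = set() if seen is None else seen
    unique = []
    for event in batch:
        if event["event_id"] not in seen:
            seen.add(event["event_id"])
            unique.append(event)
    return unique
```

In dbt staging the same effect comes from `row_number() over (partition by event_id)` or `LIMIT 1 BY event_id` and keeping the first row.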

Security
– Restrict replication role to specific DB and tables
– Private networking between services
– Rotate secrets; limit Grafana viewer vs editor roles
– PII handling: hash or tokenize emails at ingestion; store raw PII in a separate restricted table

Quick start checklist
– Stand up ClickHouse Cloud and Grafana
– Enable Postgres logical replication
– Create MaterializedPostgreSQL database in ClickHouse
– Define analytics.events table and initial projections
– Configure dbt-clickhouse; build staging and marts
– Publish Grafana dashboards and alerts
– Add monitors for lag, error rates, and costs

What this delivers
– Sub-second reads on billions of events
– Minimal load on Postgres OLTP
– Clear, versioned transforms
– Actionable dashboards and alerts for product and ops

Real-Time Product Analytics with ClickHouse, Airbyte, dbt, and Superset

Overview
This is a production-ready analytics stack designed for product teams that need fast event queries and reliable, explainable metrics:
– Storage/engine: ClickHouse
– Ingestion: Event collector (HTTP) to ClickHouse; Airbyte for SaaS enrichment
– Transformations: dbt (dbt-clickhouse)
– Dashboards: Apache Superset
– Orchestration: Cron or Airflow (optional)
– Monitoring: ClickHouse system tables + alerting

Why this stack
– ClickHouse gives millisecond queries on billions of rows with low storage overhead.
– Airbyte reliably syncs SaaS sources (Stripe, HubSpot, PostHog export, S3 logs) into ClickHouse.
– dbt standardizes transformations, testing, and CI.
– Superset is fast, self-hostable, and permissionable.

Reference architecture (flow)
Client apps/web → Event Collector (NGINX + small Python/Go handler) → ClickHouse (raw_events) → dbt models/materialized views → Superset dashboards
Plus: Airbyte → ClickHouse (dim_*, stg_* tables) for enrichment joins

Event collection
– Send JSON over HTTPS from SDKs to your collector. Keep the collector stateless and append-only.
– Validate required keys, add server timestamps, and write line-delimited JSON batches to ClickHouse via HTTP insert.
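The collector steps above (validate, stamp, batch as NDJSON) might look like this stdlib-only sketch; the required keys and target table are assumptions:

```python
import json
import time

REQUIRED_KEYS = {"event_id", "user_id", "event_name"}

def to_ndjson_batch(raw_events):
    """Validate events, stamp a server-side received_at, and serialize to
    line-delimited JSON, ready for a ClickHouse HTTP insert such as
    INSERT INTO analytics.raw_events FORMAT JSONEachRow."""
    lines = []
    for event in raw_events:
        if not REQUIRED_KEYS.issubset(event):
            continue  # drop malformed events; production code should count these
        event = dict(event, received_at=time.time())
        lines.append(json.dumps(event, sort_keys=True))
    return "\n".join(lines)
```

The resulting string is POSTed as the request body to ClickHouse's HTTP port; keeping the handler stateless means any instance can take any batch.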

ClickHouse: core tables
Use ReplacingMergeTree for safe deduplication and TTL for hot/cold retention.

Example table (compact):
CREATE TABLE analytics.raw_events
(
event_time DateTime64(3, 'UTC'),
event_date Date MATERIALIZED toDate(event_time),
event_name LowCardinality(String),
event_id UUID,
user_id String,
session_id String,
device LowCardinality(String),
os LowCardinality(String),
country LowCardinality(FixedString(2)),
properties JSON,
received_at DateTime64(3, 'UTC') DEFAULT now64(3),
_ingest_version UInt32 DEFAULT 1
)
ENGINE = ReplacingMergeTree(_ingest_version)
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id, event_name, event_id)
TTL event_date + INTERVAL 180 DAY
SETTINGS index_granularity = 8192;

Notes
– ReplacingMergeTree dedupes rows with the same primary key using _ingest_version; update that when reprocessing.
– For very high volume, switch to ReplicatedReplacingMergeTree and add Kafka ingestion or buffering via S3.

Sessionization materialized view
CREATE MATERIALIZED VIEW analytics.mv_sessions
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id, session_id)
AS
SELECT
minState(event_time) AS session_start_state,
maxState(event_time) AS session_end_state,
user_id,
session_id,
toDate(min(event_time)) AS event_date,
anyState(country) AS country_state,
countState() AS events_state
FROM analytics.raw_events
GROUP BY user_id, session_id;

Session query:
SELECT
user_id,
session_id,
minMerge(session_start_state) AS session_start,
maxMerge(session_end_state) AS session_end,
anyMerge(country_state) AS country,
countMerge(events_state) AS events
FROM analytics.mv_sessions
WHERE event_date >= today() - 7
GROUP BY user_id, session_id;

Daily rollups (cost saver)
– Create a daily_agg_events table with SummingMergeTree for event counts by event_name, country, user_id segment, etc.
– Use a materialized view to fill it from raw_events.
– Point common dashboards to rollups; leave ad-hoc drill-downs on raw_events.
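The rollup shape described above is just a grouped count; a Python sketch of what the SummingMergeTree table converges to (column names assumed from raw_events):

```python
from collections import Counter
from datetime import datetime, timezone

def rollup_daily(events):
    """Aggregate raw events into (day, event_name, country) counts — the
    shape a daily SummingMergeTree rollup table would converge to."""
    counts = Counter()
    for e in events:
        day = datetime.fromtimestamp(e["event_time"], tz=timezone.utc).date().isoformat()
        counts[(day, e["event_name"], e["country"])] += 1
    return counts
```

Dashboards pointed at such a table scan thousands of rows per day instead of millions of raw events.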

Airbyte: enrichment data
– Destination: ClickHouse (native destination).
– Sources: Stripe, HubSpot, CRM, billing exports, S3 logs.
– Namespace to analytics_ext.*
– Keep staging (stg_*) tables raw; build dim_* models in dbt for clean joins.

dbt on ClickHouse
– Adapter: dbt-clickhouse.
– Store models in analytics.* schemas; configure incremental where possible.

Example dbt model (event → signup funnel):
{{ config(materialized='table') }}
SELECT
  e.user_id,
  minIf(e.event_time, e.event_name = 'page_view' AND JSONExtractString(e.properties, 'path') = '/signup') AS first_signup_page_at,
  minIf(e.event_time, e.event_name = 'signup_submitted') AS signup_submitted_at,
  minIf(e.event_time, e.event_name = 'signup_completed') AS signup_completed_at,
  if(signup_completed_at IS NOT NULL, 1, 0) AS completed
FROM {{ source('analytics', 'raw_events') }} e
WHERE e.event_date >= addDays(today(), -30)
GROUP BY e.user_id
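The funnel logic in that model — first timestamp per step, per user — can be mimicked in plain Python, which is handy for unit-testing event fixtures before they hit dbt (step names assumed from the model; the page_view path filter is elided here):

```python
def funnel_steps(user_events, steps=("page_view", "signup_submitted", "signup_completed")):
    """Return the first timestamp per funnel step for one user's
    (timestamp, event_name) pairs, mirroring the minIf() aggregations."""
    first_seen = {}
    for ts, name in sorted(user_events):
        if name in steps and name not in first_seen:
            first_seen[name] = ts
    return {step: first_seen.get(step) for step in steps}
```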

Data quality
– dbt tests: not_null on event_time, event_id; unique on event_id within a day; accepted_values on event_name.
– ClickHouse constraints: use CHECKs for schema sanity (e.g., length(country)=2).

Superset dashboards
– Connection: SQLAlchemy driver for ClickHouse.
– Datasets: expose models and rollups; hide raw tables from non-admins.
– Common charts:
– DAU/WAU/MAU from rollups
– Activation funnel (dbt model)
– Session length distribution (mv_sessions)
– L7 retention cohort
– Country/device breakdown
– Caching: enable datasource and chart-level caching; warm critical charts via a small cron job hitting chart APIs.

Monitoring and ops
– Lag/health:
– SELECT count() FROM system.mutations WHERE is_done = 0;
– SELECT * FROM system.parts WHERE active = 0; (stuck merges)
– SELECT * FROM system.disks; (space)
– Alert on:
– Insert failures from collector
– Mutations backlog
– Disk > 75%
– Queries > p95 threshold
– Backfill strategy:
– Insert to raw_events with higher _ingest_version to overwrite.
– Pause dependent materialized views or rebuild rollups after batch backfill.
– Backpressure:
– Use async inserts (async_insert=1) and min_bytes_to_use_direct_io for large loads.

Security
– TLS for ClickHouse HTTP/native.
– Users/roles: read-only for BI; writer for collector; admin for dbt/ops.
– Row-level security: create row policies for tenants or business units.
– Mask PII: store hashed user_id; keep mapping in a restricted dim_user table if needed.

Performance tips
– Keep properties JSON compact; extract hot keys into dedicated columns.
– LowCardinality on strings with medium/low cardinality (event_name, device).
– ORDER BY: put high-selectivity columns first (event_date, user_id).
– Use SAMPLE on exploratory queries; LIMIT 1 BY for distinct lists.
– Partition by month for most workloads; day if >1B/day.

Cost control
– TTL to move old partitions to cheaper storage or delete after 180 days.
– Rollups for dashboards; raw only for deep dives.
– Prefer ClickHouse Cloud autoscaling or size 2–3 medium nodes before sharding.

Minimal docker-compose (dev)
version: "3.8"
services:
  clickhouse:
    image: clickhouse/clickhouse-server:24.1
    ports: ["8123:8123", "9000:9000"]
    ulimits: { nofile: { soft: 262144, hard: 262144 } }
  superset:
    image: apache/superset:latest
    ports: ["8088:8088"]
    environment:
      - SUPERSET_SECRET_KEY=dev
  airbyte:
    image: airbyte/airbyte:latest
    ports: ["8000:8000"]  # use official deployment for prod
  collector:
    build: ./collector  # small Python/Go service that validates and inserts to ClickHouse

Rollout checklist
– Define event contract and versioning.
– Provision ClickHouse and create raw_events with TTL and dedupe.
– Stand up collector; load test inserts.
– Configure Airbyte syncs for enrichment.
– Add dbt models and tests; set up CI.
– Publish Superset datasets and dashboards with caching.
– Add monitoring, alerts, and a backfill playbook.

Shipping a Real-Time Telemetry Pipeline for AI Agents with OpenTelemetry, Kafka, ClickHouse, and Grafana

This build logs every AI agent run, tool call, token usage, latency, and error into a low-latency analytics stack you can operate. It is optimized for high write throughput, cheap retention, and fast aggregations.

Architecture
– Ingest: OpenTelemetry OTLP HTTP (collector) receives spans/events from agents, services, and workers.
– Queue: Kafka buffers bursts and decouples ingestion from storage.
– Storage: ClickHouse stores normalized spans and derived rollups via materialized views.
– Visualization: Grafana queries ClickHouse for real-time dashboards and SLOs.
– Optional: S3 object storage for cheap parquet archives via ClickHouse TTL + MOVES.

Data model
– Trace-level: trace_id, span_id, parent_span_id, service_name, operation, status_code, start_ts, end_ts, duration_ms, attributes (map), tenant_id, env.
– Event-level: event_ts, event_type, trace_id, payload_json, tenant_id, cost_usd, tokens_prompt, tokens_completion, model, tool_name.
– Derived rollups: per-minute and per-tenant aggregates for latency, error_rate, cost, token_count.

Docker Compose (minimal dev)
– Services: otel-collector, kafka, zookeeper, clickhouse-server, clickhouse-init (one-shot DDL), grafana.
– Expose:
– OTLP HTTP: 4318
– ClickHouse HTTP: 8123, Native: 9000
– Grafana: 3000
– Kafka: 9092 (internal), 29092 (host)

otel-collector.yaml (key points)
– receivers:
– otlp: protocols http
– processors:
– batch
– attributes (add env, tenant_id if available)
– exporters:
– kafka (topic: otel-spans, encoding: otlp_json)
– service:
– pipelines:
– traces: otlp -> batch -> kafka

Kafka topics
– otel-spans (partitions: 6+ for throughput)
– otel-events (optional for custom events)
– Use compression lz4 or zstd. Replication 3 in prod.

ClickHouse schema (core)
– Table: spans_raw (MergeTree)
– Columns: as above, plus attributes_json JSON
– Order by: (tenant_id, toDate(start_ts), service_name, operation, start_ts)
– TTL: start_ts + INTERVAL 30 DAY TO VOLUME 'slow', start_ts + INTERVAL 180 DAY DELETE
– Table: events_raw (MergeTree)
– Order by: (tenant_id, toDate(event_ts), event_type, event_ts)
– Kafka engines:
– spans_kafka ENGINE = Kafka reading topic otel-spans, format JSONEachRow
– Materialized view mv_spans_raw inserts into spans_raw with JSON extraction and type casting
– Rollups:
– mv_span_minute aggregates to spans_minute (AggregatingMergeTree) with quantiles and counts
– mv_cost_minute aggregates token and cost fields

Example ClickHouse DDL (abridged)
– Create database telemetry;
– Create table telemetry.spans_raw (…)
ENGINE = MergeTree
ORDER BY (tenant_id, toDate(start_ts), service_name, operation, start_ts)
TTL start_ts + INTERVAL 90 DAY DELETE;
– Create table telemetry.spans_kafka (…)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092', kafka_topic_list = 'otel-spans', kafka_group_name = 'ch-spans', kafka_format = 'JSONEachRow';
– Create materialized view telemetry.mv_spans_raw TO telemetry.spans_raw AS
SELECT
JSON_VALUE(_raw, '$.traceId') AS trace_id,
JSON_VALUE(_raw, '$.spanId') AS span_id,
...
FROM telemetry.spans_kafka;
– Create table telemetry.spans_minute (…)
ENGINE = AggregatingMergeTree
ORDER BY (tenant_id, service_name, operation, toStartOfMinute(start_ts));
– Create materialized view telemetry.mv_spans_minute TO telemetry.spans_minute AS
SELECT
tenant_id, service_name, operation, toStartOfMinute(start_ts) AS ts_min,
countState() AS c,
quantilesTimingState(0.5,0.9,0.99)(duration_ms) AS q,
sumState(if(status_code != 'OK', 1, 0)) AS errors
FROM telemetry.spans_raw
GROUP BY tenant_id, service_name, operation, ts_min;

Ingest from Python agents (OTLP)
– Install: pip install opentelemetry-sdk opentelemetry-exporter-otlp
– Configure:
– OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318
– Resource attributes: service.name, deployment.environment, tenant.id
– Emit spans per agent run and tool call. Add attributes:
– model, tokens.prompt, tokens.completion, cost.usd, user_id, workflow_id, tool_name.

Example Grafana panels
– Latency: SELECT toStartOfInterval(start_ts, INTERVAL 1 minute) AS t, quantilesExact(0.5,0.9,0.99)(duration_ms) FROM spans_raw WHERE tenant_id=$tenant AND $__timeFilter(start_ts) GROUP BY t
– Error rate: sum(errors)/sum(c) from spans_minute
– Cost $: sum(cost_usd) from events_raw or spans_raw attributes over time and by model
– Throughput: countDistinct(trace_id) per minute
– SLOs:
– Availability: 1 - (errors / c)
– Latency objective: p95(duration_ms) under target

Alerts
– Error rate > 2% for 5m
– P95 latency > target for 10m
– Cost per minute spike vs 7d baseline
– No data (ingestion stalled) for 5m
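Two of those rules — error rate over threshold and the no-data/ingestion-stall check — reduce to simple arithmetic over the per-minute rollups. A hedged sketch (thresholds, window, and function name are assumptions):

```python
import time

def evaluate_alerts(errors, total, last_ingest_ts, now=None,
                    error_rate_max=0.02, stall_seconds=300):
    """Evaluate two alert rules: error rate above 2% and no data
    ingested within a 5-minute window. Returns fired alert names."""
    now = time.time() if now is None else now
    alerts = []
    if total > 0 and errors / total > error_rate_max:
        alerts.append("error_rate")
    if now - last_ingest_ts > stall_seconds:
        alerts.append("no_data")
    return alerts
```

In practice Grafana evaluates these as SQL against spans_minute; the sketch is useful for testing the thresholds themselves.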

What this enables
– Real-time visibility into agent reliability, latency, token spend, and tool performance.
– Fast drilldowns by tenant, model, workflow, and tool to debug regressions.
– Predictable cost with cheap storage and controllable retention.

Repo starter checklist
– docker-compose.yml with all services and volumes
– otel-collector.yaml
– clickhouse-init.sql for tables, topics, views, and users
– grafana-provisioning dashboards and datasources
– Makefile targets: up, down, logs, migrate, backfill

Production-grade LLM Ops Metrics Pipeline with OpenTelemetry, ClickHouse, and Grafana (Docker Compose)

This post ships a real, minimal LLM ops pipeline for metrics, traces, and logs:
– OpenTelemetry SDK (your services) -> OTLP -> OpenTelemetry Collector
– Collector -> ClickHouse (fast, columnar, cheap)
– Grafana -> ClickHouse for dashboards and alerting

Why this stack:
– OTLP is a standard you can use across Python, Node, Go, and edge functions.
– ClickHouse handles high-cardinality metrics and traces at low cost.
– Grafana reads ClickHouse directly with a mature plugin.

What you get
– Docker Compose to run everything locally or on a small VM.
– OpenTelemetry Collector config with ClickHouse exporter.
– Python example for emitting traces, metrics, and logs.
– ClickHouse retention/partitioning for predictable costs.
– Example Grafana queries to visualize agent quality and latency.

Prerequisites
– Docker + Docker Compose
– A domain or IP (optional, for remote access)
– Grafana ClickHouse data source plugin (ID: vertamedia-clickhouse-datasource)

1) Docker Compose
Create docker-compose.yml:
version: "3.8"
services:
  clickhouse:
    image: clickhouse/clickhouse-server:24.1
    container_name: clickhouse
    ports:
      - "8123:8123"  # HTTP
      - "9000:9000"  # Native
    volumes:
      - ch-data:/var/lib/clickhouse
      - ./clickhouse/config.d:/etc/clickhouse-server/config.d
      - ./clickhouse/users.d:/etc/clickhouse-server/users.d
    ulimits:
      nofile:
        soft: 262144
        hard: 262144

  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.100.0
    container_name: otel-collector
    command: ["--config=/etc/otelcol/config.yaml"]
    volumes:
      - ./otel/config.yaml:/etc/otelcol/config.yaml
    ports:
      - "4317:4317"  # OTLP gRPC
      - "4318:4318"  # OTLP HTTP
    depends_on:
      - clickhouse

  grafana:
    image: grafana/grafana:10.4.3
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_INSTALL_PLUGINS=vertamedia-clickhouse-datasource
    volumes:
      - grafana-data:/var/lib/grafana
    depends_on:
      - clickhouse

volumes:
  ch-data:
  grafana-data:

2) ClickHouse minimal config (auth + profiles)
Create clickhouse/users.d/users.xml (the markup was stripped in rendering; reconstructed from the values shown — a default user and an otel user matching the collector config):

<clickhouse>
  <users>
    <default>
      <networks>
        <ip>::/0</ip>
      </networks>
      <profile>default</profile>
      <quota>default</quota>
    </default>
    <otel>
      <password>otel_password</password>
      <networks>
        <ip>::/0</ip>
      </networks>
      <profile>default</profile>
      <quota>default</quota>
    </otel>
  </users>
</clickhouse>

Optional hardening (recommended for internet-facing):
– Bind to private network only and proxy via VPN or Tailscale.
– Create dedicated DB and user with limited privileges.
Optional hardening (recommended for internet-facing):
– Bind to private network only and proxy via VPN or Tailscale.
– Create dedicated DB and user with limited privileges.

3) OpenTelemetry Collector config
Create otel/config.yaml:
receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:
    send_batch_size: 8192
    timeout: 5s
  memory_limiter:
    check_interval: 5s
    limit_mib: 512
    spike_limit_mib: 256
  attributes:
    actions:
      - key: service.environment
        action: insert
        value: prod

exporters:
  clickhouse:
    endpoint: tcp://clickhouse:9000?secure=false
    database: otel
    ttl: 168h  # 7 days default retention; we'll add TTLs too
    username: otel
    password: otel_password
    create_schema: true
    logs_table_name: otel_logs
    traces_table_name: otel_traces
    metrics_table_name: otel_metrics
    timeout: 10s

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [clickhouse]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [clickhouse]
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [clickhouse]

Notes:
– clickhouse exporter auto-creates schema when create_schema: true.
– You can split pipelines by environment or service using routing processors.

4) Start the stack
docker compose up -d
– ClickHouse UI (HTTP): http://localhost:8123
– Grafana: http://localhost:3000 (admin/admin by default)
– OTLP endpoint: grpc http://localhost:4317, http http://localhost:4318

5) Emit data from Python (traces, metrics, logs)
Install:
pip install opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-instrumentation-requests opentelemetry-api

Sample app (app.py):
import logging
import random
import time

from opentelemetry import metrics, trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

resource = Resource.create({
    "service.name": "agent-orchestrator",
    "service.version": "1.2.3",
    "service.environment": "prod",
    "deployment.region": "us-west-2",
})

# Traces
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)
span_exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(span_exporter))

# Metrics
metric_exporter = OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True)
reader = PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000)
metrics.set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
meter = metrics.get_meter("agent-metrics")
latency_hist = meter.create_histogram("agent.response_latency_ms")
tokens_counter = meter.create_counter("agent.output_tokens")
success_counter = meter.create_counter("agent.success")

# Logs: route stdlib logging through the OTLP pipeline via LoggingHandler
logger_provider = LoggerProvider(resource=resource)
set_logger_provider(logger_provider)
log_exporter = OTLPLogExporter(endpoint="http://localhost:4317", insecure=True)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
logger = logging.getLogger("agent-logger")
logger.setLevel(logging.INFO)
logger.addHandler(LoggingHandler(logger_provider=logger_provider))

def call_model(prompt):
    # fake work standing in for an LLM call
    start = time.time()
    time.sleep(random.uniform(0.05, 0.4))
    tokens = random.randint(50, 400)
    ok = random.random() > 0.1
    duration_ms = (time.time() - start) * 1000
    attributes = {"model": "gpt-4o-mini", "route": "answer", "customer_tier": "pro"}
    latency_hist.record(duration_ms, attributes)
    tokens_counter.add(tokens, attributes)
    success_counter.add(1 if ok else 0, attributes)
    with tracer.start_as_current_span("llm.call", attributes=attributes) as span:
        span.set_attribute("llm.prompt.len", len(prompt))
        span.set_attribute("llm.tokens", tokens)
        if not ok:
            span.set_attribute("error", True)
            logger.error("agent failure", extra=attributes)
        else:
            logger.info("agent success", extra=attributes)
    return ok, tokens, duration_ms

if __name__ == "__main__":
    while True:
        call_model("hello")
        time.sleep(1)

Run:
python app.py

6) Verify data landed
In ClickHouse:
– Show DBs: SHOW DATABASES;
– Data lives in database otel with tables otel_traces, otel_metrics, otel_logs (names from exporter).
– Basic checks:
SELECT count() FROM otel.otel_traces;
SELECT count() FROM otel.otel_metrics;
SELECT count() FROM otel.otel_logs;

7) Retention, partitioning, and compression
For cost control, add TTL and partitioning. If you let the exporter create schema, alter tables:
ALTER TABLE otel.otel_traces
MODIFY TTL toDateTime(timestamp) + INTERVAL 7 DAY
SETTINGS storage_policy = 'default';

ALTER TABLE otel.otel_metrics
MODIFY TTL toDateTime(timestamp) + INTERVAL 30 DAY;

ALTER TABLE otel.otel_logs
MODIFY TTL toDateTime(timestamp) + INTERVAL 14 DAY;

Optional: create your own tables with partitions by toYYYYMMDD(timestamp) and codecs (ZSTD(6)) for lower storage.

8) Grafana data source
– Login to Grafana -> Connections -> Data sources -> Add data source -> ClickHouse.
– URL: http://clickhouse:8123
– Auth: username otel, password otel_password
– Default database: otel
– Confirm connection.

9) Example Grafana panels (SQL)
– Agent P50/P95 latency (ms) by model, 15m
SELECT
model,
quantile(0.5)(value) AS p50,
quantile(0.95)(value) AS p95,
toStartOfInterval(timestamp, INTERVAL 15 minute) AS ts
FROM otel_metrics
WHERE name = 'agent.response_latency_ms'
AND timestamp >= now() - INTERVAL 24 HOUR
GROUP BY model, ts
ORDER BY ts ASC;

– Success rate by route (rolling 1h)
WITH
sumIf(value, name = 'agent.success') AS succ,
countIf(name = 'agent.success') AS total
SELECT
route,
toStartOfInterval(timestamp, INTERVAL 1 hour) AS ts,
if(total = 0, 0, succ / total) AS success_rate
FROM otel_metrics
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY route, ts
ORDER BY ts;

– Tokens per minute by customer_tier
SELECT
customer_tier,
toStartOfMinute(timestamp) AS ts,
sumIf(value, name = 'agent.output_tokens') AS tokens
FROM otel_metrics
WHERE timestamp >= now() - INTERVAL 6 HOUR
GROUP BY customer_tier, ts
ORDER BY ts;

– Error logs (last 1h)
SELECT
timestamp,
severity_text,
body,
JSONExtractString(attributes, 'error') AS err,
JSONExtractString(attributes, 'model') AS model,
JSONExtractString(attributes, 'route') AS route
FROM otel_logs
WHERE timestamp >= now() - INTERVAL 1 HOUR
AND (severity_number >= 17 OR JSONExtractBool(attributes, 'error') = 1)
ORDER BY timestamp DESC
LIMIT 200;

– Trace sample count by service
SELECT
service_name,
count() AS spans
FROM otel_traces
WHERE timestamp >= now() - INTERVAL 1 DAY
GROUP BY service_name
ORDER BY spans DESC;

10) Production notes
– Separate environments: run separate DBs or add service.environment in resource and filter in Grafana.
– Cardinality guardrails: cap dynamic attributes (e.g., customer_id) or hash/map to tiers. High-cardinality tags can blow up storage.
– Backpressure: tune batch processor send_batch_size and timeouts. Add queued_retry if you expect spikes.
– Ingestion SLOs: keep ClickHouse inserts under 50–100 MB per batch for stable performance on small VMs.
– Storage: start with 2–4 vCPU, 8–16 GB RAM, NVMe. Enable ZSTD compression and TTLs.
– Security: do not expose ClickHouse or Grafana admin to the internet. Use VPN, SSO, or OAuth proxy.
– Backups: S3-compatible backup via clickhouse-backup or object storage disks.
– Cost: This stack runs comfortably on a $20–40/month VPS for moderate load (tens of thousands of spans/min and metrics).

Extending the pipeline
– Add dbt for derived metrics (e.g., session-level aggregation).
– Add alerting in Grafana: p95 latency > threshold, success_rate < X, tokens/min anomaly.
– Add router-level tracing to attribute latency to providers and prompts.

This is a deployable baseline that turns your AI agent traffic into actionable, queryable telemetry with low operational overhead.

Data Pipelines & Dashboards Guide

Data pipelines and dashboards are the backbone of modern analytics. In a world where every interaction can generate useful information, the ability to collect, organize and visualize data quickly makes the difference between informed decisions and guesswork. A pipeline is a sequence of automated steps that moves data from where it is created—think web forms, CRM systems, e‑commerce transactions, or IoT sensors—to a centralized repository where it can be cleaned and transformed. Dashboards use this curated data to present key metrics in an easy‑to‑digest format so you and your team can monitor performance at a glance.

Building an effective pipeline starts with defining what data you need. For a service business, that might include leads captured through a contact form, bookings made through a scheduling tool, support tickets submitted via chat, and payments processed through a store. Each of these systems—WordPress, Google Sheets, email, CRMs, calendar services—stores information differently. The goal of a pipeline is to extract relevant fields from each source and normalize them into a consistent structure. An automation tool or custom script can listen for new submissions via webhooks, parse the payloads, then append the results to a spreadsheet or database. Where possible, enrich your records by adding context such as time stamps, source campaign or geographic location.

Transformation is the next critical step. Raw data often contains inconsistencies such as duplicate entries, missing values or inconsistent capitalization. Automated routines can remove duplicates based on email address or phone number, standardize text fields to a common format, validate addresses and flag incomplete submissions for follow‑up. You might also use AI models to classify leads by industry, detect sentiment in feedback messages or summarize long comments into tags. By cleaning and enriching your data at this stage, you ensure that downstream dashboards reflect accurate and actionable information.
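The cleaning routines described here — dedupe by email, standardize text fields, drop incomplete rows — can be sketched in a few lines of Python (field names are illustrative):

```python
def clean_leads(leads):
    """Normalize and dedupe lead records: lowercase emails, title-case
    names, drop rows missing an email, keep the first record per email."""
    seen = set()
    cleaned = []
    for lead in leads:
        email = (lead.get("email") or "").strip().lower()
        if not email or email in seen:
            continue  # incomplete or duplicate; flag for follow-up in production
        seen.add(email)
        cleaned.append({"email": email, "name": (lead.get("name") or "").strip().title()})
    return cleaned
```

The same shape works whether the sink is a Google Sheet row append or a database insert.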

Once your data is in good shape, you need a storage solution that supports easy querying. For small projects, a shared Google Sheet or Airtable base may suffice. Larger operations might prefer a relational database like MySQL or PostgreSQL or a cloud data warehouse. The key is to choose a platform that integrates smoothly with your data sources and reporting tools. When working with WordPress sites, it’s common to store form submissions in the database and then replicate them to a spreadsheet for analysis. API bridges can keep multiple systems synchronized so you always have a single source of truth.

Dashboards are the window into your pipeline. A well‑designed dashboard should answer the most important questions about your business without overwhelming the viewer. If you run a membership site, you might track new sign‑ups, cancellations, churn rate and lifetime value. An e‑commerce store would monitor sales revenue, average order value, cart abandonment and top products. A service agency would keep an eye on leads generated, consultations booked, conversion rates, and project profitability. Tools like Looker Studio (formerly Google Data Studio), Tableau, Power BI and Notion support rich visualizations such as bar charts, line graphs and funnel diagrams. Many allow you to embed dashboards into your WordPress admin panel or client portals so everyone sees the same information.

When designing dashboards, clarity is paramount. Group related metrics together and use consistent colors and scales to make comparisons easy. Include filters that let stakeholders drill down by date range, marketing channel or product category. Consider building multiple dashboards for different audiences: management might need high‑level KPIs, while marketing teams benefit from detailed campaign analytics. Scheduling automated email or Slack reports keeps the data top‑of‑mind; for instance, a daily summary could include new leads captured, meetings scheduled and revenue generated, while a weekly report might highlight trends and anomalies.

Security and privacy should be integral to your pipeline architecture. Always handle personal data in accordance with applicable laws such as GDPR or California’s CPRA. Limit access to the database or spreadsheets to those who need it, and use secure authentication for API connections. When sending automated reports, avoid including sensitive information in plain text. Instead, provide links to secure dashboards where users must log in. Regularly audit your integrations to ensure tokens haven’t expired and that revocations are respected.

Implementing a pipeline and dashboard system is an iterative process. Start with the most critical data points and add additional sources and metrics over time. Begin by documenting where your data originates, how often it changes and who needs to see it. Then choose automation tools or write scripts to handle extraction and loading. Create transformations that clean and enrich the data, and test the results with a small sample before scaling up. Design a dashboard that surfaces the metrics you care about, gather feedback from users and refine the visualizations. As your business evolves, revisit your pipeline to incorporate new systems, retire unused sources and adjust KPIs.

In summary, data pipelines and dashboards give you the infrastructure to run a data‑driven business. By automating the flow of information from your website and applications into a clean repository and presenting insights through intuitive visualizations, you empower your team to make decisions based on facts rather than hunches. Whether you’re tracking marketing performance, customer satisfaction, financial health or operational efficiency, investing time in a robust data pipeline will pay dividends in clearer insights and faster growth.

When you have reliable pipelines feeding your dashboards, decision making becomes easier. You can schedule summaries to be delivered to Slack or email and give team members access to the specific metrics they need.