Real-time Dashboard Example¶
An analytics dashboard demonstrating Change Data Capture, aggregations, and performance optimization.
Overview¶
This example builds a real-time analytics dashboard with:

- Live data updates via CDC (Change Data Capture)
- Efficient aggregation queries
- Time-series data handling
- Columnar storage optimization
- WebSocket streaming
Schema¶
// schema.ormdb
// Events table - high volume append-only
entity Event {
id: uuid @id @default(uuid())
org_id: uuid
event_type: string
event_name: string
properties: json?
user_id: string?
session_id: string?
timestamp: timestamp @default(now())
// Denormalized for fast queries
date: string // YYYY-MM-DD
hour: int32 // 0-23
@index(org_id)
@index(event_type)
@index(timestamp)
@index([org_id, date])
@index([org_id, event_type, date])
@index([org_id, timestamp]) // Real-time range queries
}
// Pre-aggregated hourly stats
entity HourlyStats {
id: uuid @id @default(uuid())
org_id: uuid
date: string
hour: int32
event_type: string
event_count: int64
unique_users: int64
unique_sessions: int64
@unique([org_id, date, hour, event_type])
@index(org_id)
@index([org_id, date])
}
// Pre-aggregated daily stats
entity DailyStats {
id: uuid @id @default(uuid())
org_id: uuid
date: string
event_type: string
event_count: int64
unique_users: int64
unique_sessions: int64
@unique([org_id, date, event_type])
@index(org_id)
@index([org_id, date])
}
// Real-time counters (updated every few seconds)
entity RealtimeCounter {
id: uuid @id @default(uuid())
org_id: uuid
counter_name: string
value: int64
window_start: timestamp
window_end: timestamp
updated_at: timestamp
@unique([org_id, counter_name])
@index(org_id)
}
// Dashboard configuration
entity Dashboard {
id: uuid @id @default(uuid())
org_id: uuid
name: string
widgets: json
created_at: timestamp @default(now())
updated_at: timestamp @default(now())
@index(org_id)
}
Event Ingestion¶
High-Throughput Event Writer¶
// src/ingestion.rs
use std::sync::Arc;
use std::time::Duration;
use chrono::{Datelike, Timelike, Utc};
use tokio::sync::mpsc;
use uuid::Uuid;
use ormdb_core::{Database, Mutation, Value};
pub struct EventIngester {
db: Arc<Database>,
buffer: mpsc::Sender<Event>,
}
impl EventIngester {
pub fn new(db: Arc<Database>) -> Self {
let (tx, rx) = mpsc::channel(10_000);
// Spawn background batch writer
let db_clone = db.clone();
tokio::spawn(async move {
batch_writer(db_clone, rx).await;
});
Self { db, buffer: tx }
}
pub async fn ingest(&self, event: IngestEvent) -> Result<(), Error> {
let now = Utc::now();
let event = Event {
id: Uuid::new_v4(),
org_id: event.org_id,
event_type: event.event_type,
event_name: event.event_name,
properties: event.properties,
user_id: event.user_id,
session_id: event.session_id,
timestamp: now.timestamp_micros(),
date: now.format("%Y-%m-%d").to_string(),
hour: now.hour() as i32,
};
self.buffer.send(event).await?;
Ok(())
}
}
async fn batch_writer(db: Arc<Database>, mut rx: mpsc::Receiver<Event>) {
let mut batch = Vec::with_capacity(1000);
let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
tokio::select! {
Some(event) = rx.recv() => {
batch.push(event);
// Flush when batch is full
if batch.len() >= 1000 {
flush_batch(&db, &mut batch).await;
}
}
_ = interval.tick() => {
// Flush periodically even if batch not full
if !batch.is_empty() {
flush_batch(&db, &mut batch).await;
}
}
}
}
}
async fn flush_batch(db: &Database, batch: &mut Vec<Event>) {
let mutations: Vec<_> = batch.drain(..)
.map(|e| {
Mutation::create("Event")
.set("id", Value::Uuid(e.id.into_bytes()))
.set("org_id", Value::Uuid(e.org_id.into_bytes()))
.set("event_type", Value::String(e.event_type))
.set("event_name", Value::String(e.event_name))
.set_opt("properties", e.properties.map(|p| Value::Json(p.to_string())))
.set_opt("user_id", e.user_id.map(Value::String))
.set_opt("session_id", e.session_id.map(Value::String))
.set("timestamp", Value::Timestamp(e.timestamp))
.set("date", Value::String(e.date))
.set("hour", Value::Int32(e.hour))
})
.collect();
if let Err(e) = db.mutate_batch(mutations).await {
eprintln!("Failed to write event batch: {}", e);
}
}
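The ingester and batch writer above rely on an IngestEvent request payload and an internal Event buffer struct that are not shown. A minimal sketch of what those types could look like, assuming serde for deserialization and serde_json::Value for the properties field:
// Hypothetical supporting types for the ingestion pipeline above
use serde::Deserialize;
use uuid::Uuid;

/// Payload accepted from clients by `EventIngester::ingest`
#[derive(Deserialize)]
pub struct IngestEvent {
    pub org_id: Uuid,
    pub event_type: String,
    pub event_name: String,
    pub properties: Option<serde_json::Value>,
    pub user_id: Option<String>,
    pub session_id: Option<String>,
}

/// Fully populated row buffered by the batch writer
pub struct Event {
    pub id: Uuid,
    pub org_id: Uuid,
    pub event_type: String,
    pub event_name: String,
    pub properties: Option<serde_json::Value>,
    pub user_id: Option<String>,
    pub session_id: Option<String>,
    pub timestamp: i64, // microseconds since epoch
    pub date: String,   // YYYY-MM-DD
    pub hour: i32,      // 0-23
}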
Change Data Capture¶
CDC Subscriber for Aggregation¶
// src/cdc.rs
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
use ormdb_core::{CdcSubscriber, ChangeEvent, Database, FilterExpr, GraphQuery, Mutation, Value};
pub async fn start_aggregation_worker(db: Arc<Database>) {
let subscriber = db.subscribe_changes("Event").await.unwrap();
tokio::spawn(async move {
process_changes(db, subscriber).await;
});
}
async fn process_changes(db: Arc<Database>, mut subscriber: CdcSubscriber) {
let mut hourly_buffer: HashMap<AggKey, AggValue> = HashMap::new();
let mut flush_interval = tokio::time::interval(Duration::from_secs(5));
loop {
tokio::select! {
Some(change) = subscriber.next() => {
if let ChangeEvent::Insert { entity, .. } = change {
let key = AggKey {
org_id: entity.get("org_id").unwrap(),
date: entity.get("date").unwrap(),
hour: entity.get("hour").unwrap(),
event_type: entity.get("event_type").unwrap(),
};
let entry = hourly_buffer.entry(key).or_default();
entry.event_count += 1;
if let Some(user_id) = entity.get::<String>("user_id") {
entry.unique_users.insert(user_id);
}
if let Some(session_id) = entity.get::<String>("session_id") {
entry.unique_sessions.insert(session_id);
}
}
}
_ = flush_interval.tick() => {
flush_aggregations(&db, &mut hourly_buffer).await;
}
}
}
}
async fn flush_aggregations(
db: &Database,
buffer: &mut HashMap<AggKey, AggValue>,
) {
for (key, value) in buffer.drain() {
// Upsert hourly stats
let existing = db.query(
GraphQuery::new("HourlyStats")
.filter(FilterExpr::and(vec![
FilterExpr::eq("org_id", Value::Uuid(key.org_id.into_bytes())),
FilterExpr::eq("date", Value::String(key.date.clone())),
FilterExpr::eq("hour", Value::Int32(key.hour)),
FilterExpr::eq("event_type", Value::String(key.event_type.clone())),
]))
).await.ok().and_then(|r| r.entities().next());
if let Some(existing) = existing {
let id: Uuid = existing.get("id").unwrap();
let current_count: i64 = existing.get("event_count").unwrap();
let current_users: i64 = existing.get("unique_users").unwrap();
let current_sessions: i64 = existing.get("unique_sessions").unwrap();
// Note: adding per-flush distinct counts over-counts users and sessions seen
// in more than one flush window, so these unique_* values are approximate.
db.mutate(
Mutation::update("HourlyStats")
.filter(FilterExpr::eq("id", Value::Uuid(id.into_bytes())))
.set("event_count", Value::Int64(current_count + value.event_count))
.set("unique_users", Value::Int64(current_users + value.unique_users.len() as i64))
.set("unique_sessions", Value::Int64(current_sessions + value.unique_sessions.len() as i64))
).await.ok();
} else {
db.mutate(
Mutation::create("HourlyStats")
.set("id", Value::Uuid(Uuid::new_v4().into_bytes()))
.set("org_id", Value::Uuid(key.org_id.into_bytes()))
.set("date", Value::String(key.date))
.set("hour", Value::Int32(key.hour))
.set("event_type", Value::String(key.event_type))
.set("event_count", Value::Int64(value.event_count))
.set("unique_users", Value::Int64(value.unique_users.len() as i64))
.set("unique_sessions", Value::Int64(value.unique_sessions.len() as i64))
).await.ok();
}
}
}
#[derive(Hash, Eq, PartialEq)]
struct AggKey {
org_id: Uuid,
date: String,
hour: i32,
event_type: String,
}
#[derive(Default)]
struct AggValue {
event_count: i64,
unique_users: HashSet<String>,
unique_sessions: HashSet<String>,
}
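The CDC worker keeps HourlyStats current, but nothing above populates DailyStats. One option is a periodic rollup that recomputes the previous day directly from raw events, which keeps unique counts exact (summing hourly uniques would over-count users active in several hours). A sketch using the same query and mutation builders as above; the src/rollup.rs module name is illustrative, and scheduling (e.g. a once-a-day tokio task) plus idempotent re-runs are left out:
// src/rollup.rs (sketch) - recompute DailyStats for one org and date
pub async fn rollup_daily_stats(db: &Database, org_id: Uuid, date: &str) {
    let result = db.aggregate(
        AggregateQuery::new("Event")
            .filter(FilterExpr::and(vec![
                FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())),
                FilterExpr::eq("date", Value::String(date.into())),
            ]))
            .group_by("event_type")
            .aggregate("event_count", AggregateFunction::Count)
            .aggregate("unique_users", AggregateFunction::CountDistinct("user_id"))
            .aggregate("unique_sessions", AggregateFunction::CountDistinct("session_id"))
    ).await.unwrap();

    for row in result.rows() {
        // One DailyStats row per (org, date, event_type)
        db.mutate(
            Mutation::create("DailyStats")
                .set("id", Value::Uuid(Uuid::new_v4().into_bytes()))
                .set("org_id", Value::Uuid(org_id.into_bytes()))
                .set("date", Value::String(date.into()))
                .set("event_type", Value::String(row.get("event_type").unwrap()))
                .set("event_count", Value::Int64(row.get("event_count").unwrap()))
                .set("unique_users", Value::Int64(row.get("unique_users").unwrap()))
                .set("unique_sessions", Value::Int64(row.get("unique_sessions").unwrap()))
        ).await.ok();
    }
}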
Aggregation Queries¶
Time-Series Data¶
// src/queries/analytics.rs
use uuid::Uuid;
use ormdb_core::{AggregateFunction, AggregateQuery, Database, FilterExpr, GraphQuery, OrderSpec, Value};
/// Get event counts over time
pub async fn get_event_timeseries(
db: &Database,
org_id: Uuid,
event_type: Option<&str>,
start_date: &str,
end_date: &str,
granularity: Granularity,
) -> Vec<TimeseriesPoint> {
match granularity {
Granularity::Hour => {
let query = GraphQuery::new("HourlyStats")
.filter(FilterExpr::and(vec![
FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())),
FilterExpr::gte("date", Value::String(start_date.into())),
FilterExpr::lte("date", Value::String(end_date.into())),
]))
.order_by(OrderSpec::asc("date"))
.order_by(OrderSpec::asc("hour"));
let query = if let Some(et) = event_type {
query.filter(FilterExpr::eq("event_type", Value::String(et.into())))
} else {
query
};
let result = db.query(query).await.unwrap();
result.entities()
.map(|e| TimeseriesPoint {
timestamp: format!("{} {:02}:00", e.get::<String>("date").unwrap(), e.get::<i32>("hour").unwrap()),
value: e.get("event_count").unwrap(),
})
.collect()
}
Granularity::Day => {
let query = GraphQuery::new("DailyStats")
.filter(FilterExpr::and(vec![
FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())),
FilterExpr::gte("date", Value::String(start_date.into())),
FilterExpr::lte("date", Value::String(end_date.into())),
]))
.order_by(OrderSpec::asc("date"));
// ... similar processing
}
}
}
/// Get top events by count
pub async fn get_top_events(
db: &Database,
org_id: Uuid,
date: &str,
limit: usize,
) -> Vec<TopEvent> {
// Use columnar aggregation for efficiency
let result = db.aggregate(
AggregateQuery::new("Event")
.filter(FilterExpr::and(vec![
FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())),
FilterExpr::eq("date", Value::String(date.into())),
]))
.group_by("event_name")
.aggregate("count", AggregateFunction::Count)
.order_by(OrderSpec::desc("count"))
.limit(limit)
).await.unwrap();
result.rows()
.map(|r| TopEvent {
event_name: r.get("event_name").unwrap(),
count: r.get("count").unwrap(),
})
.collect()
}
/// Get unique user count
pub async fn get_unique_users(
db: &Database,
org_id: Uuid,
start_date: &str,
end_date: &str,
) -> i64 {
let result = db.aggregate(
AggregateQuery::new("Event")
.filter(FilterExpr::and(vec![
FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())),
FilterExpr::gte("date", Value::String(start_date.into())),
FilterExpr::lte("date", Value::String(end_date.into())),
FilterExpr::is_not_null("user_id"),
]))
.aggregate("unique_users", AggregateFunction::CountDistinct("user_id"))
).await.unwrap();
result.get("unique_users").unwrap_or(0)
}
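These helpers reference a few small types that are not defined above. A plausible set of definitions matching how they are used; the Serialize derives assume the values are returned straight to the API layer:
// Supporting types assumed by the query helpers above
use serde::Serialize;

/// Bucket size for time-series queries
pub enum Granularity {
    Hour,
    Day,
}

/// One point on a dashboard time-series chart
#[derive(Serialize)]
pub struct TimeseriesPoint {
    pub timestamp: String, // e.g. "2024-01-15 13:00"
    pub value: i64,
}

/// One row in the "Top Events" widget
#[derive(Serialize)]
pub struct TopEvent {
    pub event_name: String,
    pub count: i64,
}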
Real-time Counters¶
Counter Update Worker¶
// src/realtime.rs
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use uuid::Uuid;
use ormdb_core::{AggregateFunction, AggregateQuery, Database, FilterExpr, GraphQuery, Mutation, Value};
pub async fn start_realtime_counter_worker(db: Arc<Database>) {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
update_realtime_counters(&db).await;
}
}
async fn update_realtime_counters(db: &Database) {
let now = Utc::now();
let window_start = now - chrono::Duration::seconds(300); // Last 5 minutes
// Get all organizations (the Organization entity is defined elsewhere in the
// application schema and is not shown above)
let orgs = db.query(GraphQuery::new("Organization")).await.unwrap();
for org in orgs.entities() {
let org_id: Uuid = org.get("id").unwrap();
// Count events in window
let event_count = db.aggregate(
AggregateQuery::new("Event")
.filter(FilterExpr::and(vec![
FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())),
FilterExpr::gte("timestamp", Value::Timestamp(window_start.timestamp_micros())),
]))
.aggregate("count", AggregateFunction::Count)
).await.unwrap().get::<i64>("count").unwrap_or(0);
// Count active users
let active_users = db.aggregate(
AggregateQuery::new("Event")
.filter(FilterExpr::and(vec![
FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())),
FilterExpr::gte("timestamp", Value::Timestamp(window_start.timestamp_micros())),
FilterExpr::is_not_null("user_id"),
]))
.aggregate("count", AggregateFunction::CountDistinct("user_id"))
).await.unwrap().get::<i64>("count").unwrap_or(0);
// Upsert counters
upsert_counter(db, org_id, "events_5m", event_count, window_start, now).await;
upsert_counter(db, org_id, "active_users_5m", active_users, window_start, now).await;
}
}
async fn upsert_counter(
db: &Database,
org_id: Uuid,
name: &str,
value: i64,
window_start: DateTime<Utc>,
window_end: DateTime<Utc>,
) {
let existing = db.query(
GraphQuery::new("RealtimeCounter")
.filter(FilterExpr::and(vec![
FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())),
FilterExpr::eq("counter_name", Value::String(name.into())),
]))
).await.ok().and_then(|r| r.entities().next());
if let Some(existing) = existing {
let id: Uuid = existing.get("id").unwrap();
db.mutate(
Mutation::update("RealtimeCounter")
.filter(FilterExpr::eq("id", Value::Uuid(id.into_bytes())))
.set("value", Value::Int64(value))
.set("window_start", Value::Timestamp(window_start.timestamp_micros()))
.set("window_end", Value::Timestamp(window_end.timestamp_micros()))
.set("updated_at", Value::Timestamp(window_end.timestamp_micros()))
).await.ok();
} else {
db.mutate(
Mutation::create("RealtimeCounter")
.set("id", Value::Uuid(Uuid::new_v4().into_bytes()))
.set("org_id", Value::Uuid(org_id.into_bytes()))
.set("counter_name", Value::String(name.into()))
.set("value", Value::Int64(value))
.set("window_start", Value::Timestamp(window_start.timestamp_micros()))
.set("window_end", Value::Timestamp(window_end.timestamp_micros()))
.set("updated_at", Value::Timestamp(window_end.timestamp_micros()))
).await.ok();
}
}
WebSocket Streaming¶
Real-time Updates¶
// src/websocket.rs
use std::collections::HashMap;
use std::time::Duration;
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::extract::State;
use axum::response::IntoResponse;
use futures::{SinkExt, StreamExt};
use uuid::Uuid;
use ormdb_core::{ChangeEvent, Database, FilterExpr, GraphQuery, Value};
pub async fn ws_handler(
ws: WebSocketUpgrade,
State(db): State<Db>,
auth: AuthContext,
) -> impl IntoResponse {
ws.on_upgrade(|socket| handle_socket(socket, db, auth))
}
async fn handle_socket(socket: WebSocket, db: Db, auth: AuthContext) {
let (mut sender, mut receiver) = socket.split();
// Subscribe to CDC for this org's events
let org_id = auth.current_org_id.expect("Org required");
let mut cdc = db.subscribe_changes("Event").await.unwrap();
// Task to send updates
let send_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut event_buffer = Vec::new();
loop {
tokio::select! {
Some(change) = cdc.next() => {
if let ChangeEvent::Insert { entity, .. } = change {
// Filter to this org
if entity.get::<Uuid>("org_id") == Some(org_id) {
event_buffer.push(entity);
}
}
}
_ = interval.tick() => {
if !event_buffer.is_empty() {
let update = DashboardUpdate {
events: event_buffer.drain(..).collect(),
counters: get_realtime_counters(&db, org_id).await,
};
let msg = serde_json::to_string(&update).unwrap();
if sender.send(Message::Text(msg)).await.is_err() {
break;
}
}
}
}
}
});
// Task to receive commands
let recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
if let Message::Text(text) = msg {
// Handle client commands (e.g., change time range)
if let Ok(cmd) = serde_json::from_str::<ClientCommand>(&text) {
// Process command
}
}
}
});
// When either task finishes (e.g. the client disconnects), abort the other so
// the CDC subscription is released promptly
let send_abort = send_task.abort_handle();
let recv_abort = recv_task.abort_handle();
tokio::select! {
_ = send_task => recv_abort.abort(),
_ = recv_task => send_abort.abort(),
}
}
async fn get_realtime_counters(db: &Database, org_id: Uuid) -> HashMap<String, i64> {
let result = db.query(
GraphQuery::new("RealtimeCounter")
.filter(FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())))
).await.unwrap();
result.entities()
.map(|e| (e.get::<String>("counter_name").unwrap(), e.get::<i64>("value").unwrap()))
.collect()
}
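The handler serializes a DashboardUpdate for each push and parses incoming ClientCommand messages; neither type is shown above. A rough sketch, assuming the CDC stream yields a serializable ormdb_core::Entity and that clients only send time-range changes (the command fields are illustrative):
// Sketch of the WebSocket message types used above
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use ormdb_core::Entity;

/// Server -> client push, sent at most once per second
#[derive(Serialize)]
pub struct DashboardUpdate {
    pub events: Vec<Entity>,            // freshly ingested events for this org
    pub counters: HashMap<String, i64>, // realtime counters, e.g. "events_5m"
}

/// Client -> server command, e.g. changing the visible time range
#[derive(Deserialize)]
pub struct ClientCommand {
    pub action: String,             // e.g. "set_time_range"
    pub time_range: Option<String>, // e.g. "24h"
}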
Frontend Dashboard¶
React Dashboard Component¶
// frontend/src/components/Dashboard.tsx
import { useEffect, useState } from 'react';
import { useWebSocket } from '../hooks/useWebSocket';
import { LineChart, BarChart, StatCard } from './charts';
export function Dashboard({ orgId }: { orgId: string }) {
const [timeRange, setTimeRange] = useState('24h');
const [data, setData] = useState<DashboardData | null>(null);
// WebSocket for real-time updates
const { lastMessage, sendMessage } = useWebSocket(`/ws/dashboard`);
useEffect(() => {
if (lastMessage) {
const update = JSON.parse(lastMessage.data);
setData(prev => ({
...prev,
counters: update.counters,
recentEvents: [...update.events, ...(prev?.recentEvents || [])].slice(0, 100),
}));
}
}, [lastMessage]);
// Initial data load
useEffect(() => {
loadDashboardData(orgId, timeRange).then(setData);
}, [orgId, timeRange]);
if (!data) return <Loading />;
return (
<div className="dashboard">
<TimeRangeSelector value={timeRange} onChange={setTimeRange} />
<div className="stats-row">
<StatCard
title="Events (5m)"
value={data.counters.events_5m}
trend={data.eventsTrend}
/>
<StatCard
title="Active Users"
value={data.counters.active_users_5m}
trend={data.usersTrend}
/>
<StatCard
title="Total Events Today"
value={data.totalEventsToday}
/>
<StatCard
title="Unique Users Today"
value={data.uniqueUsersToday}
/>
</div>
<div className="charts-row">
<LineChart
title="Events Over Time"
data={data.eventTimeseries}
xKey="timestamp"
yKey="value"
/>
<BarChart
title="Top Events"
data={data.topEvents}
xKey="event_name"
yKey="count"
/>
</div>
<div className="recent-events">
<h3>Recent Events</h3>
<EventList events={data.recentEvents} />
</div>
</div>
);
}
Custom Hook for WebSocket¶
// frontend/src/hooks/useWebSocket.ts
import { useEffect, useRef, useState, useCallback } from 'react';
export function useWebSocket(url: string) {
const ws = useRef<WebSocket | null>(null);
const [lastMessage, setLastMessage] = useState<MessageEvent | null>(null);
const [isConnected, setIsConnected] = useState(false);
useEffect(() => {
const connect = () => {
ws.current = new WebSocket(`${window.location.protocol === 'https:' ? 'wss:' : 'ws:'}//${window.location.host}${url}`);
ws.current.onopen = () => setIsConnected(true);
ws.current.onclose = () => {
setIsConnected(false);
// Reconnect after delay
setTimeout(connect, 3000);
};
ws.current.onmessage = setLastMessage;
};
connect();
return () => {
ws.current?.close();
};
}, [url]);
const sendMessage = useCallback((data: any) => {
if (ws.current?.readyState === WebSocket.OPEN) {
ws.current.send(JSON.stringify(data));
}
}, []);
return { lastMessage, sendMessage, isConnected };
}
Performance Optimization¶
Columnar Storage for Analytics¶
ORMDB automatically uses columnar storage for aggregation queries:
// This query uses columnar storage internally
let result = db.aggregate(
AggregateQuery::new("Event")
.filter(FilterExpr::eq("org_id", Value::Uuid(org_id.into_bytes())))
.aggregate("total", AggregateFunction::Count)
.aggregate("unique_users", AggregateFunction::CountDistinct("user_id"))
).await?;
// Columnar operations:
// 1. Scan only org_id and user_id columns (not full rows)
// 2. Use dictionary encoding for string columns
// 3. SIMD-optimized aggregation functions
Index Strategy¶
// Composite indexes for common query patterns
@index([org_id, date]) // Daily queries
@index([org_id, event_type, date]) // Event type breakdown
@index([org_id, timestamp]) // Real-time range queries
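Each dashboard query in this example lines up with one of these composite indexes. A quick mapping, for orientation only, since index selection is handled by the query planner:
// Query                        Filter shape                        Index typically used
// get_top_events               org_id = ? AND date = ?             Event @index([org_id, date])
// update_realtime_counters     org_id = ? AND timestamp >= ?       Event @index([org_id, timestamp])
// get_event_timeseries         org_id = ? AND date range           HourlyStats @index([org_id, date])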
Pre-aggregation Strategy¶
| Query Type | Data Source | Latency |
|---|---|---|
| Last 5 min | RealtimeCounter | ~5ms |
| Today | HourlyStats | ~20ms |
| Last 7 days | DailyStats | ~30ms |
| Last 30 days | DailyStats | ~50ms |
| Custom range | Raw Events | ~200ms+ |
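In the dashboard API this table becomes a small routing step that picks the cheapest source before any query is built. A minimal sketch of that routing; the thresholds mirror the table above and the names are illustrative:
/// Which table a dashboard query should read from
pub enum StatsSource {
    RealtimeCounter,
    HourlyStats,
    DailyStats,
    RawEvents,
}

/// Route a request to the cheapest source that can answer it.
/// `window_minutes` is the requested range length; ad-hoc property
/// filters can only be answered from raw events.
pub fn pick_source(window_minutes: i64, needs_raw_filter: bool) -> StatsSource {
    if needs_raw_filter {
        StatsSource::RawEvents
    } else if window_minutes <= 5 {
        StatsSource::RealtimeCounter // pre-computed 5-minute counters
    } else if window_minutes <= 24 * 60 {
        StatsSource::HourlyStats // intra-day charts ("Today")
    } else {
        StatsSource::DailyStats // multi-day and monthly charts
    }
}
Custom property filters fall through to raw events, which is why that row carries the highest latency in the table above.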
Key Takeaways¶
- CDC enables real-time updates - Subscribe to changes to push live data to clients
- Pre-aggregate for speed - Compute hourly/daily stats in background
- Columnar for analytics - ORMDB optimizes aggregation queries
- Batch high-volume writes - Buffer events before writing
- WebSocket for live UI - Stream updates to dashboards
Next Steps¶
- Add custom dashboards (see the Tutorials)
- Implement alerting based on thresholds
- Export data (see the Operations Guide)