Change Data Capture (CDC)
Stream real-time data changes from ORMDB.
Overview
Change Data Capture (CDC) allows you to:
- Stream changes in real-time
- Build event-driven architectures
- Sync data to external systems
- Maintain caches and search indexes
- Create audit trails
Change Log
ORMDB records every mutation in a change log. Each entry has this structure:
```rust
pub struct ChangeEntry {
    pub lsn: u64,                     // Log Sequence Number
    pub timestamp: i64,               // Microseconds since epoch
    pub entity: String,               // Entity type
    pub entity_id: [u8; 16],          // Entity ID
    pub operation: Operation,         // Insert, Update, Delete
    pub old_data: Option<EntityData>, // Previous values (update/delete)
    pub new_data: Option<EntityData>, // New values (insert/update)
}

pub enum Operation {
    Insert,
    Update,
    Delete,
}
```
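The field comments say which snapshots each operation carries. Inserts have only `new_data`, deletes have only `old_data`, and updates have both. The sketch below turns that reading into a consumer-side sanity check. It does not use the ORMDB client. It replaces `EntityData` with a hypothetical `BTreeMap<String, String>` stand-in, and it assumes the server always fills both fields as the comments describe.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for ORMDB's EntityData: field name -> value.
type EntityData = BTreeMap<String, String>;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operation {
    Insert,
    Update,
    Delete,
}

// Trimmed-down copy of ChangeEntry with only the fields this check needs.
pub struct ChangeEntry {
    pub operation: Operation,
    pub old_data: Option<EntityData>,
    pub new_data: Option<EntityData>,
}

/// Returns true when the presence of old/new data matches the operation,
/// following the field comments on ChangeEntry.
pub fn is_well_formed(change: &ChangeEntry) -> bool {
    match change.operation {
        Operation::Insert => change.old_data.is_none() && change.new_data.is_some(),
        Operation::Update => change.old_data.is_some() && change.new_data.is_some(),
        Operation::Delete => change.old_data.is_some() && change.new_data.is_none(),
    }
}
```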
Streaming Changes
Basic Streaming
```rust
// The module path for Operation is assumed here
use ormdb_client::cdc::{ChangeStream, Operation};

// Start streaming from the current position
let mut stream = client.stream_changes().await?;

while let Some(change) = stream.next().await? {
    match change.operation {
        Operation::Insert => {
            println!("New {}: {:?}", change.entity, change.entity_id);
        }
        Operation::Update => {
            println!("Updated {}: {:?}", change.entity, change.entity_id);
        }
        Operation::Delete => {
            println!("Deleted {}: {:?}", change.entity, change.entity_id);
        }
    }
}
```
With the TypeScript client:

```typescript
const stream = client.streamChanges();

for await (const change of stream) {
  switch (change.operation) {
    case "insert":
      console.log(`New ${change.entity}: ${change.entityId}`);
      break;
    case "update":
      console.log(`Updated ${change.entity}: ${change.entityId}`);
      break;
    case "delete":
      console.log(`Deleted ${change.entity}: ${change.entityId}`);
      break;
  }
}
```
Start from Specific Position
```rust
// Start from a specific LSN (useful for recovery)
let stream = client.stream_changes()
    .from_lsn(12345)
    .await?;

// Start from a timestamp (start_time is an i64; its unit is assumed to
// match ChangeEntry::timestamp, i.e. microseconds since the Unix epoch)
let stream = client.stream_changes()
    .from_timestamp(start_time)
    .await?;

// Start from the beginning (full replay of all retained changes)
let stream = client.stream_changes()
    .from_beginning()
    .await?;
```
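`from_timestamp` needs a starting point such as `start_time`. This page does not say which unit it expects. Assuming it uses the same unit as `ChangeEntry::timestamp` (microseconds since the Unix epoch), a start time 24 hours in the past can be computed with the standard library alone. `micros_since_epoch_ago` is a hypothetical helper:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Microseconds since the Unix epoch for the instant `ago` before now.
/// Assumes from_timestamp() expects the same unit as ChangeEntry::timestamp.
pub fn micros_since_epoch_ago(ago: Duration) -> i64 {
    let start = SystemTime::now()
        .checked_sub(ago)
        .expect("duration reaches before the platform's earliest time");
    let since_epoch = start
        .duration_since(UNIX_EPOCH)
        .expect("start time is before the Unix epoch");
    // as_micros() returns u128; realistic dates fit comfortably in i64.
    since_epoch.as_micros() as i64
}
```

For example, call `let start_time = micros_since_epoch_ago(Duration::from_secs(24 * 60 * 60));` before building the stream.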
Filter by Entity
```rust
// Only stream specific entities
let stream = client.stream_changes()
    .entities(vec!["User", "Order"])
    .await?;

// Exclude entities
let stream = client.stream_changes()
    .exclude_entities(vec!["Session", "Log"])
    .await?;
```
Filter by Operation
```rust
// Only inserts
let stream = client.stream_changes()
    .operations(vec![Operation::Insert])
    .await?;

// Inserts and updates (no deletes)
let stream = client.stream_changes()
    .operations(vec![Operation::Insert, Operation::Update])
    .await?;
```
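The filters are applied by the client library or server, and this page does not say how `entities` and `exclude_entities` combine when both are set. The sketch below models one plausible reading, which can be handy for unit-testing a consumer's own expectations. Under this reading, an empty list means "no restriction", exclusions win over inclusions, and a change must pass both the entity filter and the operation filter. `ChangeFilter` is a hypothetical type, not part of `ormdb_client`.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operation {
    Insert,
    Update,
    Delete,
}

/// Hypothetical client-side model of the stream filters.
#[derive(Default)]
pub struct ChangeFilter {
    pub entities: Vec<String>,         // empty = all entities
    pub exclude_entities: Vec<String>, // always removed, even if included
    pub operations: Vec<Operation>,    // empty = all operations
}

impl ChangeFilter {
    /// True if a change to `entity` with operation `op` passes every filter.
    pub fn matches(&self, entity: &str, op: Operation) -> bool {
        let included = self.entities.is_empty()
            || self.entities.iter().any(|e| e.as_str() == entity);
        let excluded = self.exclude_entities.iter().any(|e| e.as_str() == entity);
        let op_ok = self.operations.is_empty() || self.operations.contains(&op);
        included && !excluded && op_ok
    }
}
```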
Checkpointing
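Persist the LSN of the last processed change so the consumer can resume after a failure:

```rust
// load_checkpoint, save_checkpoint and process_change are application-defined
let mut last_lsn: u64 = load_checkpoint().unwrap_or(0);

// If from_lsn is inclusive, the checkpointed change is delivered again,
// so processing must be idempotent (see Best Practices)
let mut stream = client.stream_changes()
    .from_lsn(last_lsn)
    .await?;

let mut processed: u64 = 0;
while let Some(change) = stream.next().await? {
    process_change(&change).await?;
    last_lsn = change.lsn;
    processed += 1;

    // Checkpoint every 1000 changes. Delivered LSNs are not necessarily
    // consecutive (filters, gaps), so `change.lsn % 1000 == 0` may never match.
    if processed % 1000 == 0 {
        save_checkpoint(last_lsn)?;
    }
}

// Save the final position when the stream ends
save_checkpoint(last_lsn)?;
```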
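The application supplies `load_checkpoint` and `save_checkpoint` above. Below is a minimal file-backed sketch, assuming a local file is an acceptable checkpoint store (Redis or a database table are common alternatives). Unlike the calls above, these hypothetical versions take the file path as an argument. Each save writes a temporary file and renames it over the old one, so a crash mid-write cannot leave a truncated LSN behind. On POSIX systems, `fs::rename` is atomic when both paths are on the same filesystem.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

/// Reads the stored LSN. Returns Ok(None) if no checkpoint exists yet.
pub fn load_checkpoint(path: &Path) -> io::Result<Option<u64>> {
    match fs::read_to_string(path) {
        Ok(text) => text
            .trim()
            .parse::<u64>()
            .map(Some)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}

/// Writes the LSN to a temporary file, flushes it to disk, then renames it
/// over the previous checkpoint.
pub fn save_checkpoint(path: &Path, lsn: u64) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    let mut file = fs::File::create(&tmp)?;
    writeln!(file, "{lsn}")?;
    file.sync_all()?;
    fs::rename(&tmp, path)
}
```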
Use Cases
Search Index Sync
Keep Elasticsearch/Typesense in sync:
```typescript
const stream = client.streamChanges({ entities: ["Product"] });

for await (const change of stream) {
  switch (change.operation) {
    case "insert":
    case "update":
      await searchClient.index("products", {
        id: change.entityId,
        ...change.newData,
      });
      break;
    case "delete":
      await searchClient.delete("products", change.entityId);
      break;
  }
}
```
Cache Invalidation
```rust
let mut stream = client.stream_changes()
    .entities(vec!["User", "Product"])
    .await?;

while let Some(change) = stream.next().await? {
    // Key format: "<Entity>:<hex-encoded id>" (uses the `hex` crate)
    let cache_key = format!("{}:{}", change.entity, hex::encode(change.entity_id));
    cache.invalidate(&cache_key).await?;
}
```
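The cache key above depends on the `hex` crate. To avoid that dependency, you can build the same lowercase key with `std::fmt::Write`. `cache_key` is a hypothetical helper name:

```rust
use std::fmt::Write;

/// Builds the same "Entity:hexid" key as
/// format!("{}:{}", entity, hex::encode(id)), without the `hex` crate.
/// hex::encode produces lowercase digits, and so does {:02x}.
pub fn cache_key(entity: &str, entity_id: &[u8; 16]) -> String {
    let mut key = String::with_capacity(entity.len() + 1 + 32);
    key.push_str(entity);
    key.push(':');
    for byte in entity_id {
        // Two lowercase hex digits per byte, zero-padded
        write!(key, "{byte:02x}").expect("writing to a String cannot fail");
    }
    key
}
```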
Event Publishing
```typescript
const stream = client.streamChanges();

for await (const change of stream) {
  const event = {
    type: `${change.entity}.${change.operation}`,
    timestamp: change.timestamp,
    data: {
      entityId: change.entityId,
      old: change.oldData,
      new: change.newData,
    },
  };
  await messageQueue.publish("events", event);
}
```
Audit Log
```rust
// Replays only changes still within the retention window
let mut stream = client.stream_changes()
    .from_beginning()
    .await?;

while let Some(change) = stream.next().await? {
    // Extract user context (if available) before fields are moved out of `change`
    let user_id = extract_user_id(&change);
    let audit_entry = AuditEntry {
        timestamp: change.timestamp,
        entity_type: change.entity,
        entity_id: change.entity_id,
        operation: change.operation,
        old_values: change.old_data,
        new_values: change.new_data,
        user_id,
    };
    audit_log.append(audit_entry).await?;
}
```
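Full `old_data`/`new_data` snapshots show what the row looked like, but audit reviews often ask which fields changed. Below is a sketch of a field-level diff. It uses a hypothetical `BTreeMap<String, String>` stand-in for `EntityData`, because this page does not show the real type's API:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical stand-in for EntityData: field name -> value.
type EntityData = BTreeMap<String, String>;

/// One changed field: (name, value before, value after).
/// None means the field was absent on that side.
pub type FieldChange = (String, Option<String>, Option<String>);

/// Lists fields whose values differ between the old and new snapshots.
/// Works for every operation: inserts have no old side, deletes no new side.
pub fn changed_fields(old: Option<&EntityData>, new: Option<&EntityData>) -> Vec<FieldChange> {
    let empty = EntityData::new();
    let old = old.unwrap_or(&empty);
    let new = new.unwrap_or(&empty);
    // Union of field names from both sides, in sorted order
    let names: BTreeSet<&String> = old.keys().chain(new.keys()).collect();
    names
        .into_iter()
        .filter_map(|name| {
            let before = old.get(name);
            let after = new.get(name);
            (before != after).then(|| (name.clone(), before.cloned(), after.cloned()))
        })
        .collect()
}
```

With such a stand-in, `changed_fields(change.old_data.as_ref(), change.new_data.as_ref())` could be stored in the audit entry next to the full snapshots or in place of them.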
Real-Time Notifications
```typescript
const stream = client.streamChanges({ entities: ["Order"] });

for await (const change of stream) {
  if (change.operation === "update") {
    const oldStatus = change.oldData?.status;
    const newStatus = change.newData?.status;
    // Notify only on the transition into "shipped"
    if (oldStatus !== newStatus && newStatus === "shipped") {
      await sendNotification({
        type: "order_shipped",
        orderId: change.entityId,
        userId: change.newData?.userId,
      });
    }
  }
}
```
Data Replication
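```rust
// Replicate to a secondary database. If from_lsn is inclusive, the first
// change may already be applied, so target writes should tolerate replays.
let mut stream = source_client.stream_changes()
    .from_lsn(last_replicated_lsn)
    .await?;

while let Some(change) = stream.next().await? {
    match change.operation {
        Operation::Insert => {
            let data = change.new_data.expect("insert carries new_data");
            target_client.insert(&change.entity, data).await?;
        }
        Operation::Update => {
            let data = change.new_data.expect("update carries new_data");
            target_client.update(&change.entity, change.entity_id, data).await?;
        }
        Operation::Delete => {
            target_client.delete(&change.entity, change.entity_id).await?;
        }
    }
    // Record progress so a restart resumes here instead of replaying everything
    last_replicated_lsn = change.lsn;
    save_checkpoint(last_replicated_lsn)?;
}
```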
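Replaying changes is only safe when applying a change twice has the same effect as applying it once. The sketch below shows the idea against an in-memory stand-in for the target database, not the `target_client` API. Inserts and updates become upserts keyed by `(entity, entity_id)`, deleting a missing row is a no-op, and changes at or below the last applied LSN are skipped. It assumes LSNs increase monotonically in stream order. `Replica` and the trimmed `ChangeEntry` are hypothetical.

```rust
use std::collections::{BTreeMap, HashMap};

// Hypothetical stand-in for EntityData: field name -> value.
type EntityData = BTreeMap<String, String>;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operation {
    Insert,
    Update,
    Delete,
}

// Trimmed-down ChangeEntry with the fields replication needs.
pub struct ChangeEntry {
    pub lsn: u64,
    pub entity: String,
    pub entity_id: [u8; 16],
    pub operation: Operation,
    pub new_data: Option<EntityData>,
}

/// In-memory replica that applies changes idempotently.
#[derive(Default)]
pub struct Replica {
    pub rows: HashMap<(String, [u8; 16]), EntityData>,
    pub last_applied_lsn: Option<u64>,
}

impl Replica {
    /// Returns true if the change was applied, false if it was a replay.
    pub fn apply(&mut self, change: &ChangeEntry) -> bool {
        if matches!(self.last_applied_lsn, Some(last) if change.lsn <= last) {
            return false;
        }
        let key = (change.entity.clone(), change.entity_id);
        match change.operation {
            // Upsert: overwriting an existing row is harmless on replay
            Operation::Insert | Operation::Update => {
                let data = change.new_data.clone().expect("insert/update carries new_data");
                self.rows.insert(key, data);
            }
            // Removing an absent row is a no-op
            Operation::Delete => {
                self.rows.remove(&key);
            }
        }
        self.last_applied_lsn = Some(change.lsn);
        true
    }
}
```

In a real target, store `last_applied_lsn` in the same transaction as the data it covers. Otherwise a crash between the two writes brings the replay problem back.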
Batch Processing
Process changes in batches for efficiency:
```rust
use std::time::Duration;

let mut stream = client.stream_changes()
    .batch_size(100)
    .batch_timeout(Duration::from_secs(1))
    .await?;

while let Some(batch) = stream.next_batch().await? {
    // This consumer only indexes inserts; updates and deletes are skipped
    let inserts: Vec<_> = batch.iter()
        .filter(|c| matches!(c.operation, Operation::Insert))
        .collect();
    if !inserts.is_empty() {
        bulk_index_to_search(inserts).await?;
    }
    // Checkpoint at the batch boundary, once the whole batch is handled
    if let Some(last) = batch.last() {
        save_checkpoint(last.lsn)?;
    }
}
```
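The client handles `batch_size` and `batch_timeout`, and this page does not define their exact semantics. A common rule, and the one this sketch assumes, is to release a batch when it is full or when its oldest change has waited for the timeout, whichever comes first. `Batcher` is a hypothetical type. It takes the current time as a parameter so it can be tested without sleeping.

```rust
use std::time::{Duration, Instant};

/// Releases a batch when it holds `size` items or when its first item
/// has waited at least `timeout`.
pub struct Batcher<T> {
    size: usize,
    timeout: Duration,
    items: Vec<T>,
    first_at: Option<Instant>,
}

impl<T> Batcher<T> {
    pub fn new(size: usize, timeout: Duration) -> Self {
        assert!(size > 0, "batch size must be positive");
        Self { size, timeout, items: Vec::with_capacity(size), first_at: None }
    }

    /// Adds an item; returns the batch if this push filled it.
    pub fn push(&mut self, item: T, now: Instant) -> Option<Vec<T>> {
        if self.items.is_empty() {
            self.first_at = Some(now);
        }
        self.items.push(item);
        if self.items.len() >= self.size { self.take() } else { None }
    }

    /// Returns the pending partial batch if its timeout has elapsed.
    pub fn poll_timeout(&mut self, now: Instant) -> Option<Vec<T>> {
        match self.first_at {
            Some(first) if now.duration_since(first) >= self.timeout => self.take(),
            _ => None,
        }
    }

    fn take(&mut self) -> Option<Vec<T>> {
        self.first_at = None;
        if self.items.is_empty() {
            None
        } else {
            Some(std::mem::replace(&mut self.items, Vec::with_capacity(self.size)))
        }
    }
}
```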
Configuration
Server Configuration
Retention Policy
```bash
# View CDC status
ormdb admin cdc status

# Output:
# CDC Status:
#   Enabled: true
#   Current LSN: 1,234,567
#   Oldest LSN: 1,000,000
#   Retention: 168 hours
#   Disk usage: 256 MB

# Manually trim old changes
ormdb admin cdc trim --before-lsn 1100000
```
Error Handling
Reconnection
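```rust
use std::time::Duration;

let mut last_lsn: u64 = load_checkpoint().unwrap_or(0);

loop {
    match client.stream_changes().from_lsn(last_lsn).await {
        Ok(mut stream) => loop {
            match stream.next().await {
                Ok(Some(change)) => {
                    // A processing error propagates via `?` and stops the consumer
                    process_change(&change).await?;
                    last_lsn = change.lsn;
                }
                Ok(None) => break, // Stream ended; reconnect
                Err(e) => {
                    log::error!("Stream error: {}", e);
                    break; // Reconnect from last_lsn
                }
            }
        },
        Err(e) => {
            log::error!("Connection error: {}", e);
        }
    }
    // Back off before reconnecting
    tokio::time::sleep(Duration::from_secs(5)).await;
}
```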
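The loop above waits a fixed 5 seconds between attempts. A common refinement is exponential backoff with a cap, resetting the attempt counter after a successful connection. Random jitter is often added as well, but it is left out here to keep the sketch dependency-free. `backoff_delay` is a hypothetical helper:

```rust
use std::time::Duration;

/// Exponential backoff: base, 2*base, 4*base, ... capped at `max`.
/// `attempt` starts at 0 and should be reset after a successful connection.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // checked_shl returns None once attempt >= 32; saturate instead of overflowing
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}
```

In the reconnect loop, this would replace the fixed sleep with `tokio::time::sleep(backoff_delay(attempt, Duration::from_secs(1), Duration::from_secs(60))).await`, with `attempt` incremented after each failure.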
Handling Gaps
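```rust
let mut stream = client.stream_changes()
    .from_lsn(last_lsn)
    .await?;

while let Some(change) = stream.next().await? {
    // Only meaningful on unfiltered streams: with entities/operations
    // filters, skipped LSNs are expected and do not indicate a gap
    if change.lsn > last_lsn + 1 {
        log::warn!("Gap detected: {} -> {}", last_lsn, change.lsn);
        // Handle gap (e.g., trigger full sync)
    }
    process_change(&change).await?;
    last_lsn = change.lsn;
}
```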
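The in-stream check above only notices a gap after streaming has started. A consumer that was stopped for longer than the retention window (168 hours in the sample `ormdb admin cdc status` output) can also decide up front by comparing its checkpoint with the oldest retained LSN. This page does not cover how to read that value through the client library, so the sketch takes it as a parameter. `ResumePlan` and `plan_resume` are hypothetical.

```rust
/// What a restarting consumer should do, given its checkpoint and the
/// oldest LSN the server still retains.
#[derive(Debug, PartialEq, Eq)]
pub enum ResumePlan {
    /// No checkpoint yet: take an initial snapshot, or use from_beginning()
    /// if the full history is still retained.
    Bootstrap,
    /// Resume streaming with from_lsn(lsn).
    Resume(u64),
    /// Changes after the checkpoint may have been trimmed: resync fully.
    Resync,
}

pub fn plan_resume(checkpoint: Option<u64>, oldest_retained_lsn: u64) -> ResumePlan {
    match checkpoint {
        None => ResumePlan::Bootstrap,
        // The next change needed is checkpoint + 1. If the log now starts
        // later than that, something in between may be gone. This is
        // conservative when LSNs are not contiguous.
        Some(lsn) if oldest_retained_lsn > lsn.saturating_add(1) => ResumePlan::Resync,
        Some(lsn) => ResumePlan::Resume(lsn),
    }
}
```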
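Best Practices
1. Always Checkpoint
Persist the LSN of the last processed change so a restarted consumer resumes where it left off instead of replaying from the beginning or skipping changes.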
2. Process Idempotently
```typescript
// Handle reprocessing after failures
async function processChange(change: Change) {
  // Use upsert instead of insert
  await searchClient.upsert("products", {
    id: change.entityId,
    ...change.newData,
  });
}
```
3. Monitor Lag
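```rust
let current_lsn = client.current_lsn().await?;
// Lag in log entries; saturating_sub guards against underflow on a stale reading
let lag = current_lsn.saturating_sub(last_processed_lsn);
if lag > 10_000 {
    alert("CDC consumer is falling behind!"); // alert() is application-defined
}
```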
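LSN lag counts unprocessed log entries, which is hard to interpret on its own. Pairing it with time lag gives an alert threshold in seconds. Time lag is computed from the `timestamp` of the last processed change (microseconds since the Unix epoch). Below is a std-only sketch with hypothetical helper names:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Unprocessed log entries; never underflows.
pub fn lsn_lag(current_lsn: u64, last_processed_lsn: u64) -> u64 {
    current_lsn.saturating_sub(last_processed_lsn)
}

/// Age of the last processed change in microseconds, clamped at zero
/// in case of clock skew between server and consumer.
pub fn time_lag_micros(last_change_timestamp: i64, now_micros: i64) -> i64 {
    (now_micros - last_change_timestamp).max(0)
}

/// Current time in the same unit as ChangeEntry::timestamp.
pub fn now_micros() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_micros() as i64
}
```

Time lag keeps growing while the source is idle, even when the consumer is fully caught up. It is usually only a concern when LSN lag is also non-zero.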
4. Use Separate Consumers
```yaml
# Run multiple consumers for different purposes
consumers:
  - name: search-sync
    entities: [Product, Category]
    checkpoint: redis://search-checkpoint

  - name: analytics
    entities: [Order, User]
    checkpoint: redis://analytics-checkpoint

  - name: audit
    entities: ["*"]
    checkpoint: redis://audit-checkpoint
```
5. Handle Schema Changes
```rust
while let Some(change) = stream.next().await? {
    // Check for schema compatibility (is_compatible_schema is application-defined)
    if !is_compatible_schema(&change) {
        log::warn!("Schema change detected, restarting consumer");
        break;
    }
    process_change(&change).await?;
}
```
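The application supplies `is_compatible_schema` above. One possible approach, sketched here, is for each consumer to declare the fields it depends on per entity. A change then counts as compatible when every such field is present in the snapshots it carries, and extra fields are ignored. This version takes the parts of a change as separate arguments, with a hypothetical `BTreeMap<String, String>` stand-in for `EntityData`, so it runs without the client crate:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for EntityData: field name -> value.
type EntityData = BTreeMap<String, String>;

/// `required` maps entity name -> fields this consumer reads.
/// Entities not listed are always considered compatible.
pub fn is_compatible_schema(
    required: &BTreeMap<&str, Vec<&str>>,
    entity: &str,
    old_data: Option<&EntityData>,
    new_data: Option<&EntityData>,
) -> bool {
    let Some(fields) = required.get(entity) else {
        return true; // this consumer does not depend on the entity's shape
    };
    // Every snapshot the change carries must contain every required field
    [old_data, new_data]
        .into_iter()
        .flatten()
        .all(|data| fields.iter().all(|f| data.contains_key(*f)))
}
```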
API Reference
ChangeStreamBuilder
| Method | Description |
|---|---|
| `from_lsn(lsn)` | Start from a specific LSN |
| `from_timestamp(ts)` | Start from a timestamp |
| `from_beginning()` | Start from the earliest available change |
| `entities(list)` | Filter by entity types |
| `exclude_entities(list)` | Exclude entity types |
| `operations(list)` | Filter by operation types |
| `batch_size(n)` | Set batch size |
| `batch_timeout(d)` | Set batch timeout |
ChangeEntry Fields
| Field | Type | Description |
|---|---|---|
| `lsn` | `u64` | Log Sequence Number |
| `timestamp` | `i64` | Change timestamp (microseconds since epoch) |
| `entity` | `String` | Entity type name |
| `entity_id` | `[u8; 16]` | Entity UUID |
| `operation` | `Operation` | Insert/Update/Delete |
| `old_data` | `Option<EntityData>` | Previous values (update/delete) |
| `new_data` | `Option<EntityData>` | New values (insert/update) |
Next Steps
- Real-time Dashboard Example - CDC in action
- Operations Guide - Production CDC deployment
- Monitoring - Monitor CDC consumers