Buffer Pool Example¶
Per-node buffer pools for NUMA-local data access.
Overview¶
This example demonstrates:
- Creating per-node buffer pools
- NUMA-local memory allocation
- Simple bump allocator pattern
- Using NumaSharded for per-node state
- Cache padding to prevent false sharing
Running the Example¶
Full Source Code¶
use numaperf::{
    CachePadded, MemPolicy, NodeId, NodeMask, NumaRegion, NumaSharded, Prefault, Topology,
};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// A per-node buffer pool with NUMA-local allocation.
struct NodeBufferPool {
    /// The NUMA node this pool belongs to.
    node_id: NodeId,
    /// The allocated memory region.
    region: NumaRegion,
    /// Allocation offset (for simple bump allocation).
    offset: AtomicUsize,
}

impl NodeBufferPool {
    /// Create a new buffer pool for a specific node.
    fn new(node_id: NodeId, size: usize) -> Result<Self, numaperf::NumaError> {
        let nodes = NodeMask::single(node_id);
        let region = NumaRegion::anon(
            size,
            MemPolicy::Bind(nodes),
            Default::default(),
            Prefault::Touch,
        )?;
        Ok(Self {
            node_id,
            region,
            offset: AtomicUsize::new(0),
        })
    }

    /// Allocate bytes from this pool (simple bump allocator).
    fn alloc(&self, size: usize) -> Option<&mut [u8]> {
        let offset = self.offset.fetch_add(size, Ordering::SeqCst);
        if offset + size <= self.region.len() {
            // Safety: each successful call claims a disjoint [offset, offset + size)
            // range via the atomic fetch_add, so the returned slices never alias.
            let slice = unsafe {
                std::slice::from_raw_parts_mut(self.region.as_ptr().add(offset) as *mut u8, size)
            };
            Some(slice)
        } else {
            None
        }
    }

    /// Get remaining capacity.
    fn remaining(&self) -> usize {
        let offset = self.offset.load(Ordering::SeqCst);
        self.region.len().saturating_sub(offset)
    }

    /// Reset the pool (reuse memory).
    fn reset(&self) {
        self.offset.store(0, Ordering::SeqCst);
    }
}
fn main() -> Result<(), numaperf::NumaError> {
    println!("=== numaperf: Buffer Pool Example ===\n");

    // Discover topology
    let topo = Arc::new(Topology::discover()?);
    println!("Creating buffer pools for {} NUMA nodes", topo.node_count());
    println!();

    let pool_size = 1024 * 1024; // 1 MB per node

    // Create per-node buffer pools
    let mut node_pools = Vec::new();
    for node in topo.numa_nodes() {
        let pool = NodeBufferPool::new(node.id(), pool_size)?;
        println!(
            "  Node {}: {} bytes allocated",
            node.id(),
            pool.region.len()
        );
        node_pools.push(CachePadded::new(pool));
    }
    println!();

    // Demonstrate allocation from pools
    println!("Allocating from pools:");
    for (i, pool) in node_pools.iter().enumerate() {
        // Allocate some data
        if let Some(buf) = pool.alloc(1024) {
            buf.fill(i as u8);
            println!(
                "  Node {}: allocated 1024 bytes, {} remaining",
                pool.node_id,
                pool.remaining()
            );
        }
    }
    println!();

    // Demonstrate sharded counter for tracking
    println!("Using sharded counter for per-node statistics:");
    let counters = NumaSharded::new(&topo, || AtomicUsize::new(0));

    // Simulate work that updates node-local counters
    for i in 0..10 {
        let node_idx = i % topo.node_count();
        if let Some(counter) = counters.get(topo.numa_nodes()[node_idx].id()) {
            counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Read counters
    let mut total = 0;
    for (node_id, counter) in counters.iter() {
        let count = counter.load(Ordering::SeqCst);
        println!("  Node {}: {} operations", node_id, count);
        total += count;
    }
    println!("  Total: {} operations", total);
    println!();

    // Reset and reuse pools
    println!("Resetting pools for reuse...");
    for pool in &node_pools {
        pool.reset();
        println!(
            "  Node {}: {} bytes available",
            pool.node_id,
            pool.remaining()
        );
    }
    println!();

    println!("Buffer pool example complete.");
    Ok(())
}
Key Concepts¶
Per-Node Buffer Pools¶
Each NUMA node gets its own buffer pool with memory bound to that node:
fn new(node_id: NodeId, size: usize) -> Result<Self, NumaError> {
    let nodes = NodeMask::single(node_id);
    let region = NumaRegion::anon(
        size,
        MemPolicy::Bind(nodes), // Strictly on this node
        Default::default(),
        Prefault::Touch,        // Fault pages in immediately
    )?;
    // ...
}
Bump Allocator¶
A simple lock-free bump allocator for fast allocation:
fn alloc(&self, size: usize) -> Option<&mut [u8]> {
    let offset = self.offset.fetch_add(size, Ordering::SeqCst);
    if offset + size <= self.region.len() {
        // ... return a slice over [offset, offset + size)
    } else {
        None // Out of space
    }
}
Benefits:
- Lock-free (atomic offset)
- O(1) allocation
- No fragmentation (bump only)
- Bulk reset for reuse
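To show the allocator's behavior in isolation, here is a minimal, runnable sketch of the same bump pattern over a plain heap buffer. `BumpPool` is a hypothetical stand-in for `NodeBufferPool`: it swaps `NumaRegion` for a `Box<[u8]>` so it runs anywhere, and returns offsets rather than slices to stay in safe Rust.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Bump allocator over a plain heap buffer (stand-in for `NumaRegion`).
struct BumpPool {
    buf: Box<[u8]>,
    offset: AtomicUsize,
}

impl BumpPool {
    fn new(size: usize) -> Self {
        Self {
            buf: vec![0u8; size].into_boxed_slice(),
            offset: AtomicUsize::new(0),
        }
    }

    /// Claim `size` bytes; returns the start offset of the claimed range.
    fn alloc(&self, size: usize) -> Option<usize> {
        let offset = self.offset.fetch_add(size, Ordering::SeqCst);
        if offset + size <= self.buf.len() {
            Some(offset)
        } else {
            None // Out of space
        }
    }

    fn remaining(&self) -> usize {
        self.buf.len().saturating_sub(self.offset.load(Ordering::SeqCst))
    }

    fn reset(&self) {
        self.offset.store(0, Ordering::SeqCst);
    }
}

fn main() {
    let pool = BumpPool::new(4096);
    assert_eq!(pool.alloc(1024), Some(0));    // first allocation starts at 0
    assert_eq!(pool.alloc(1024), Some(1024)); // next one is bumped past it
    assert_eq!(pool.remaining(), 2048);
    assert_eq!(pool.alloc(8192), None);       // over capacity: refused
    pool.reset();
    assert_eq!(pool.remaining(), 4096);       // bulk reset reclaims everything
}
```

Note one caveat of the pattern: a failed `fetch_add` still advances the offset, so `remaining()` can drop to zero after a refused oversized request until the next `reset()`.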
Cache Padding¶
CachePadded<T> prevents false sharing between pools: it ensures each pool occupies its own cache line (128 bytes), so threads accessing different pools never contend for the same cache line and cause it to bounce between cores.
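The mechanism can be sketched in a few lines of plain Rust. `Padded` here is a hypothetical stand-in for `CachePadded<T>`: aligning the wrapper to 128 bytes also rounds its size up to 128, so adjacent values can never share a cache line.

```rust
/// Minimal stand-in for `CachePadded<T>`: 128-byte alignment forces
/// each value onto its own cache line.
#[repr(align(128))]
struct Padded<T>(T);

fn main() {
    // Alignment rounds the size up, so each padded counter fills a full line.
    assert!(std::mem::size_of::<Padded<u64>>() >= 128);

    // Adjacent array elements therefore land on distinct cache lines.
    let counters = [Padded(0u64), Padded(0u64)];
    let a = &counters[0] as *const Padded<u64> as usize;
    let b = &counters[1] as *const Padded<u64> as usize;
    assert!(b - a >= 128);
}
```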
NumaSharded for Per-Node State¶
let counters = NumaSharded::new(&topo, || AtomicUsize::new(0));

// Access the local node's counter (fast path)
counters.local(|counter| {
    counter.fetch_add(1, Ordering::Relaxed);
});

// Access a specific node's counter
counters.get(node_id).map(|counter| {
    counter.fetch_add(1, Ordering::SeqCst);
});
Sample Output¶
=== numaperf: Buffer Pool Example ===

Creating buffer pools for 2 NUMA nodes

  Node 0: 1048576 bytes allocated
  Node 1: 1048576 bytes allocated

Allocating from pools:
  Node 0: allocated 1024 bytes, 1047552 remaining
  Node 1: allocated 1024 bytes, 1047552 remaining

Using sharded counter for per-node statistics:
  Node 0: 5 operations
  Node 1: 5 operations
  Total: 10 operations

Resetting pools for reuse...
  Node 0: 1048576 bytes available
  Node 1: 1048576 bytes available

Buffer pool example complete.
Patterns¶
Thread-Local Pool Access¶
// In executor workers, access the local pool
let pool = &node_pools[current_node_index()];
let buf = pool.alloc(size)?;
Pool with Stats Tracking¶
struct TrackedPool {
    pool: NodeBufferPool,
    allocations: AtomicUsize,
    bytes_allocated: AtomicUsize,
}

impl TrackedPool {
    fn alloc(&self, size: usize) -> Option<&mut [u8]> {
        let result = self.pool.alloc(size);
        if result.is_some() {
            self.allocations.fetch_add(1, Ordering::Relaxed);
            self.bytes_allocated.fetch_add(size, Ordering::Relaxed);
        }
        result
    }
}
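The same tracking idea can be exercised standalone. This sketch replaces `NodeBufferPool` with a plain bump offset (a hypothetical `TrackedBump`) so the statistics path is runnable without numaperf; only successful allocations update the counters.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Stat-tracking bump allocator (stand-in for wrapping `NodeBufferPool`).
struct TrackedBump {
    capacity: usize,
    offset: AtomicUsize,
    allocations: AtomicUsize,
    bytes_allocated: AtomicUsize,
}

impl TrackedBump {
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            offset: AtomicUsize::new(0),
            allocations: AtomicUsize::new(0),
            bytes_allocated: AtomicUsize::new(0),
        }
    }

    /// Returns the start offset of the claimed range, updating stats on success.
    fn alloc(&self, size: usize) -> Option<usize> {
        let offset = self.offset.fetch_add(size, Ordering::SeqCst);
        if offset + size <= self.capacity {
            self.allocations.fetch_add(1, Ordering::Relaxed);
            self.bytes_allocated.fetch_add(size, Ordering::Relaxed);
            Some(offset)
        } else {
            None // failed allocations are not counted
        }
    }
}

fn main() {
    let pool = TrackedBump::new(1024);
    assert!(pool.alloc(256).is_some());
    assert!(pool.alloc(256).is_some());
    assert!(pool.alloc(4096).is_none()); // too big: no stats recorded
    assert_eq!(pool.allocations.load(Ordering::Relaxed), 2);
    assert_eq!(pool.bytes_allocated.load(Ordering::Relaxed), 512);
}
```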
Epoch-Based Reset¶
// Instead of freeing individual allocations, retire whole pools per epoch
struct EpochPool {
    pools: [NodeBufferPool; 2],
    current_epoch: AtomicUsize,
}

impl EpochPool {
    fn flip(&self) {
        // Switch epochs, then reset the pool that is now current so new
        // allocations start clean. The just-retired pool's data stays
        // valid for one more epoch as a grace period.
        let old = self.current_epoch.fetch_xor(1, Ordering::SeqCst);
        self.pools[old ^ 1].reset();
    }
}
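A runnable sketch of the flip, with two plain bump offsets standing in for the two `NodeBufferPool`s (the `EpochPool` below is a hypothetical simplification, not the crate's API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Double-buffered bump allocator: two offsets standing in for two pools.
struct EpochPool {
    offsets: [AtomicUsize; 2],
    capacity: usize,
    current: AtomicUsize,
}

impl EpochPool {
    fn new(capacity: usize) -> Self {
        Self {
            offsets: [AtomicUsize::new(0), AtomicUsize::new(0)],
            capacity,
            current: AtomicUsize::new(0),
        }
    }

    /// Bump-allocate from the current epoch's pool; returns a start offset.
    fn alloc(&self, size: usize) -> Option<usize> {
        let idx = self.current.load(Ordering::SeqCst) & 1;
        let offset = self.offsets[idx].fetch_add(size, Ordering::SeqCst);
        if offset + size <= self.capacity {
            Some(offset)
        } else {
            None
        }
    }

    /// Switch epochs and reset the newly current pool; the retired pool's
    /// contents survive one more epoch before being reused.
    fn flip(&self) {
        let old = self.current.fetch_xor(1, Ordering::SeqCst) & 1;
        self.offsets[old ^ 1].store(0, Ordering::SeqCst);
    }
}

fn main() {
    let pool = EpochPool::new(1024);
    assert_eq!(pool.alloc(512), Some(0));
    pool.flip();                          // epoch 1: fresh pool
    assert_eq!(pool.alloc(512), Some(0)); // allocations restart at offset 0
    pool.flip();                          // epoch 0's pool was reset on re-entry
    assert_eq!(pool.alloc(512), Some(0));
}
```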
When to Use This Pattern¶
- High-frequency allocations: Bump allocation is faster than malloc
- Deterministic latency: No heap fragmentation or allocation jitter
- NUMA-aware data processing: Data stays on the node where it's processed
- Network packet processing: Allocate buffers on the NIC's local node
Next Steps¶
- Diagnostics Example - Monitor locality effectiveness
- Memory Placement Example - Understand memory policies