Work Scheduling¶
Learn how to use the NUMA-aware executor for work distribution.
Basic Usage¶
use numaperf::{NumaExecutor, NodeId, StealPolicy, Topology};
use std::sync::Arc;
let topo = Arc::new(Topology::discover()?);
// Create executor
let exec = NumaExecutor::builder(Arc::clone(&topo))
.steal_policy(StealPolicy::LocalThenSocketThenRemote)
.workers_per_node(2)
.build()?;
// Submit work
exec.submit_to_node(NodeId::new(0), || {
println!("Running on node 0!");
});
// Shutdown when done
exec.shutdown();
Steal Policies¶
LocalOnly¶
Workers never steal from other nodes:
- Best locality - Work always runs on submitted node
- Risk: Idle workers if load is imbalanced
LocalThenSocketThenRemote (Default)¶
Tiered stealing - prefer nearby nodes:
let exec = NumaExecutor::builder(topo)
.steal_policy(StealPolicy::LocalThenSocketThenRemote)
.build()?;
- Balanced - Good locality with load balancing
- Recommended for most workloads
Any¶
Steal from any node:
- Best throughput - No idle workers
- Worst locality - Work may run far from data
Submitting Work¶
To Specific Node¶
Distribute Across Nodes¶
for (i, node) in topo.numa_nodes().iter().enumerate() {
let data = get_data_for_node(i);
exec.submit_to_node(node.id(), move || {
process(data);
});
}
Tracking Completion¶
With Atomics¶
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = Arc::new(AtomicUsize::new(0));
for _ in 0..100 {
let c = Arc::clone(&counter);
exec.submit_to_node(node_id, move || {
// Do work...
c.fetch_add(1, Ordering::SeqCst);
});
}
exec.shutdown();
println!("Completed: {}", counter.load(Ordering::SeqCst));
With Channels¶
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();
for i in 0..100 {
let tx = tx.clone();
exec.submit_to_node(node_id, move || {
let result = compute(i);
tx.send(result).unwrap();
});
}
drop(tx); // Drop the original sender so rx.iter() terminates once all workers finish
let results: Vec<_> = rx.iter().collect();
Configuration Options¶
let exec = NumaExecutor::builder(topo)
.steal_policy(StealPolicy::LocalThenSocketThenRemote)
.workers_per_node(4) // 4 workers per NUMA node
.hard_mode(HardMode::Strict) // Fail if pinning fails
.build()?;
Inspection¶
Shutdown¶
Pattern: Data-Local Processing¶
Submit work to the node that owns the data:
// Data partitioned by node
let node_data: Vec<Arc<Data>> = partition_data(&topo);
for (node_idx, data) in node_data.iter().enumerate() {
let node_id = topo.numa_nodes()[node_idx].id();
let data = Arc::clone(data);
exec.submit_to_node(node_id, move || {
// Process data on its local node
process(&data);
});
}
Pattern: Pipeline Processing¶
// Stage 1: Parse on node 0
for chunk in input_chunks {
exec.submit_to_node(NodeId::new(0), move || {
let parsed = parse(chunk);
stage2_queue.push(parsed);
});
}
// Stage 2: Process on node 1
while let Some(item) = stage2_queue.pop() {
exec.submit_to_node(NodeId::new(1), move || {
process(item);
});
}
Best Practices¶
- Match work to data location - Submit to node that owns data
- Use LocalThenSocketThenRemote for most workloads
- Use LocalOnly when locality is critical
- Set workers_per_node based on CPU cores per node
- Track locality with StatsCollector