Query Engine¶
The query engine transforms GraphQuery requests into executable plans and runs them against the storage engine.
Overview¶
┌─────────────────────────────────────────────────────────────────┐
│                          Query Engine                           │
│                                                                 │
│  GraphQuery ──► Planner ──► QueryPlan ──► Executor ──► Result   │
│                    │            │            │                  │
│                    ▼            ▼            ▼                  │
│                 Catalog       Cache       Storage               │
│                                              │                  │
│                                    ┌─────────┼─────────┐        │
│                                    ▼         ▼         ▼        │
│                                 Filter     Join    Aggregate    │
└─────────────────────────────────────────────────────────────────┘
Query Planner¶
The planner validates queries and produces execution plans.
Planning Steps¶
- Resolve root entity: look up the entity definition in the catalog
- Validate fields: ensure the requested fields exist
- Plan includes: resolve the relation for each include
- Check depth: enforce the maximum include depth
- Estimate fanout: calculate the expected result size
- Optimize order: reorder includes by estimated cost
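The first two planning steps can be sketched against a toy catalog. This is a minimal illustration, not the planner's code: the catalog is assumed to map entity names to field-name lists, and EntityDef and resolve_and_validate are hypothetical stand-ins.

```rust
use std::collections::HashMap;

/// Hypothetical, stripped-down catalog entry: only the field names matter here.
struct EntityDef {
    fields: Vec<String>,
}

/// Steps 1 and 2 of planning: resolve the root entity, then validate fields.
/// An empty field list means "project all fields", as in QueryPlan::fields.
fn resolve_and_validate<'a>(
    catalog: &'a HashMap<String, EntityDef>,
    root_entity: &str,
    fields: &[&str],
) -> Result<&'a EntityDef, String> {
    // 1. Resolve root entity
    let def = catalog
        .get(root_entity)
        .ok_or_else(|| format!("unknown entity type `{root_entity}`"))?;
    // 2. Validate fields: every requested field must exist on the entity
    for field in fields {
        if !def.fields.iter().any(|f| f.as_str() == *field) {
            return Err(format!("unknown field `{field}` on `{root_entity}`"));
        }
    }
    Ok(def)
}
```

Failing fast here means later steps (include planning, fanout estimation) can assume every name they see is valid.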
QueryPlan Structure¶
pub struct QueryPlan {
    /// Root entity type
    pub root_entity: String,
    /// Resolved entity definition
    pub root_entity_def: EntityDef,
    /// Fields to project (empty = all)
    pub fields: Vec<String>,
    /// Filter expression
    pub filter: Option<FilterExpr>,
    /// Sort order
    pub order_by: Vec<OrderSpec>,
    /// Pagination
    pub pagination: Option<Pagination>,
    /// Nested includes
    pub includes: Vec<IncludePlan>,
    /// Resource limits
    pub budget: FanoutBudget,
}
IncludePlan Structure¶
pub struct IncludePlan {
    /// Path from root (e.g., "posts.comments")
    pub path: String,
    /// Relation definition
    pub relation: RelationDef,
    /// Target entity definition
    pub target_entity_def: EntityDef,
    /// Fields to project
    pub fields: Vec<String>,
    /// Filter for related entities
    pub filter: Option<FilterExpr>,
    /// Sort order
    pub order_by: Vec<OrderSpec>,
    /// Pagination per parent
    pub pagination: Option<Pagination>,
}
Include Depth¶
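impl IncludePlan {
    /// Depth: 1 for "posts", 2 for "posts.comments"
    pub fn depth(&self) -> usize {
        self.path.matches('.').count() + 1
    }

    /// True for includes directly on the root entity (no dot in the path)
    pub fn is_top_level(&self) -> bool {
        !self.path.contains('.')
    }

    /// Parent include path: Some("posts") for "posts.comments", None at top level
    pub fn parent_path(&self) -> Option<&str> {
        self.path.rsplit_once('.').map(|(parent, _)| parent)
    }
}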
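These helpers are enough to enforce the depth limit and to check that every nested include has its parent requested as well. The sketch uses free functions over plain paths; validate_include_paths is hypothetical, and the parent-must-be-present rule is an assumption inferred from the include-ordering example under Include Optimization.

```rust
use std::collections::HashSet;

fn depth(path: &str) -> usize {
    path.matches('.').count() + 1
}

fn parent_path(path: &str) -> Option<&str> {
    path.rsplit_once('.').map(|(parent, _)| parent)
}

/// Checks every include path against max_depth and, as an assumption of this
/// sketch, requires each nested include's parent path to be requested too.
fn validate_include_paths(paths: &[&str], max_depth: usize) -> Result<(), String> {
    let requested: HashSet<&str> = paths.iter().copied().collect();
    for &path in paths {
        if depth(path) > max_depth {
            return Err(format!(
                "Query depth {} exceeds maximum {}",
                depth(path),
                max_depth
            ));
        }
        if let Some(parent) = parent_path(path) {
            if !requested.contains(parent) {
                return Err(format!("include `{path}` requires parent include `{parent}`"));
            }
        }
    }
    Ok(())
}
```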
Fanout Budget¶
Budgets cap how much work a single query can trigger, preventing runaway queries:
pub struct FanoutBudget {
    /// Max entities across all blocks
    pub max_entities: usize, // Default: 10,000
    /// Max edges (relationships)
    pub max_edges: usize, // Default: 50,000
    /// Max include depth
    pub max_depth: usize, // Default: 5
}
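Given the defaults noted in the comments, a Default implementation might look like this. Whether the engine actually implements Default this way is an assumption; the derives are added only to make the sketch testable.

```rust
/// Resource limits for a single query (mirrors FanoutBudget above).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FanoutBudget {
    pub max_entities: usize,
    pub max_edges: usize,
    pub max_depth: usize,
}

impl Default for FanoutBudget {
    /// The defaults listed in the struct comments.
    fn default() -> Self {
        Self {
            max_entities: 10_000,
            max_edges: 50_000,
            max_depth: 5,
        }
    }
}
```

Struct update syntax then tightens a single limit while keeping the rest, for example FanoutBudget { max_depth: 3, ..FanoutBudget::default() }.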
Budget Enforcement¶
// Check depth at planning time
if include.depth() > budget.max_depth {
    return Err(Error::InvalidData(format!(
        "Query depth {} exceeds maximum {}",
        include.depth(),
        budget.max_depth
    )));
}
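Depth can be rejected at planning time, but entity and edge totals are only known as blocks are fetched. A minimal sketch of runtime accounting, assuming a hypothetical BudgetTracker that the executor charges after each block; the engine's real mechanism is not shown in this section.

```rust
/// Hypothetical runtime counter for the entity and edge limits.
#[derive(Debug, Default)]
struct BudgetTracker {
    max_entities: usize,
    max_edges: usize,
    entities: usize,
    edges: usize,
}

impl BudgetTracker {
    fn new(max_entities: usize, max_edges: usize) -> Self {
        Self { max_entities, max_edges, ..Default::default() }
    }

    /// Records a fetched block; fails as soon as either running total
    /// crosses its limit.
    fn charge(&mut self, entities: usize, edges: usize) -> Result<(), String> {
        self.entities += entities;
        self.edges += edges;
        if self.entities > self.max_entities {
            return Err(format!(
                "Entity count {} exceeds maximum {}",
                self.entities, self.max_entities
            ));
        }
        if self.edges > self.max_edges {
            return Err(format!(
                "Edge count {} exceeds maximum {}",
                self.edges, self.max_edges
            ));
        }
        Ok(())
    }
}
```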
Fanout Estimation¶
pub fn estimate_fanout(cardinality: Cardinality) -> usize {
    match cardinality {
        Cardinality::OneToOne => 1,
        Cardinality::OneToMany => 10,
        Cardinality::ManyToMany => 25,
    }
}
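These constants matter because estimates compound along an include chain. Assuming each level multiplies the rows of the level above by its relation's fanout (an assumption about how the planner combines them), 1,000 root rows followed by a one-to-many include and then a many-to-many include give 1,000 × 10 = 10,000 and then 10,000 × 25 = 250,000 rows, well above the default max_edges of 50,000. estimate_chain is a hypothetical helper.

```rust
#[derive(Clone, Copy)]
enum Cardinality {
    OneToOne,
    OneToMany,
    ManyToMany,
}

fn estimate_fanout(cardinality: Cardinality) -> usize {
    match cardinality {
        Cardinality::OneToOne => 1,
        Cardinality::OneToMany => 10,
        Cardinality::ManyToMany => 25,
    }
}

/// Estimated rows at each level of an include chain such as
/// "posts" -> "posts.comments": each level multiplies the level above
/// by that relation's fanout (saturating instead of overflowing).
fn estimate_chain(root_rows: usize, chain: &[Cardinality]) -> Vec<usize> {
    let mut rows = root_rows;
    chain
        .iter()
        .map(|&c| {
            rows = rows.saturating_mul(estimate_fanout(c));
            rows
        })
        .collect()
}
```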
Include Optimization¶
The planner reorders includes so that cheaper ones run first, while a parent include (posts) always runs before its nested includes (posts.comments):
impl QueryPlan {
    pub fn optimize_include_order(&mut self) {
        // 1. Build dependency graph (each nested include depends on its parent path)
        let dependencies = build_dependencies(&self.includes);
        // 2. Estimate cost per include
        let costs = estimate_costs(&self.includes);
        // 3. Topological sort with cost-based ordering:
        //    process cheaper includes first while respecting dependencies
        self.includes = topological_sort_by_cost(
            &self.includes,
            &dependencies,
            &costs,
        );
    }
}
Example:
Before optimization:
    posts.comments  (depth 2, fanout 25)
    posts           (depth 1, fanout 10)
    profile         (depth 1, fanout 1)
After optimization:
    profile         (depth 1, fanout 1)   ← Cheapest first
    posts           (depth 1, fanout 10)  ← Required before posts.comments
    posts.comments  (depth 2, fanout 25)
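The ordering in this example can be reproduced with Kahn's topological sort, using a min-heap keyed by cost as the ready set. This is a sketch under simplifying assumptions: each include depends only on its parent path, the costs are the fanout numbers above, and Include and this topological_sort_by_cost are hypothetical stand-ins for the planner's own types and signature.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

/// Hypothetical stand-in for an include: its dotted path and estimated cost.
#[derive(Debug, Clone)]
struct Include {
    path: String,
    cost: usize,
}

fn parent_path(path: &str) -> Option<&str> {
    path.rsplit_once('.').map(|(parent, _)| parent)
}

/// Kahn-style topological sort whose ready set is a min-heap keyed by cost.
/// Each include has at most one parent, so popping a parent releases its children.
fn topological_sort_by_cost(includes: &[Include]) -> Vec<Include> {
    let index_of: HashMap<&str, usize> = includes
        .iter()
        .enumerate()
        .map(|(i, inc)| (inc.path.as_str(), i))
        .collect();

    // children[p] holds the includes that must wait for include p
    let mut children: Vec<Vec<usize>> = vec![Vec::new(); includes.len()];
    // Reverse turns the max-heap into a min-heap; ties go to the earlier index
    let mut ready = BinaryHeap::new();
    for (i, inc) in includes.iter().enumerate() {
        match parent_path(&inc.path).and_then(|p| index_of.get(p)) {
            Some(&parent) => children[parent].push(i),
            // Top-level includes (or ones whose parent was not requested) start ready
            None => ready.push(Reverse((inc.cost, i))),
        }
    }

    let mut ordered = Vec::with_capacity(includes.len());
    while let Some(Reverse((_, i))) = ready.pop() {
        ordered.push(includes[i].clone());
        for &child in &children[i] {
            ready.push(Reverse((includes[child].cost, child)));
        }
    }
    ordered
}
```

Ties on cost fall back to the original include order, which keeps the output deterministic.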
Query Executor¶
The executor runs plans against storage.
Execution Flow¶
pub async fn execute(&self, plan: &QueryPlan) -> Result<QueryResult> {
    // 1. Fetch root entities
    let root_entities = self.fetch_root_entities(plan).await?;

    // 2. Execute each include in plan order; earlier edge blocks let
    //    nested includes (e.g. posts.comments) find their parent IDs
    let mut edge_blocks = Vec::new();
    for include_plan in &plan.includes {
        let edges = self
            .execute_include(include_plan, &root_entities, &edge_blocks)
            .await?;
        edge_blocks.push(edges);
    }

    // 3. Assemble result
    Ok(QueryResult {
        entities: root_entities,
        edges: edge_blocks,
    })
}
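Passing edge_blocks into execute_include is what lets a nested include such as posts.comments find its parents, namely the children produced by the posts block. A minimal sketch of that lookup, assuming 128-bit integer IDs instead of [u8; 16] and a hypothetical EdgeBlock holding (parent_id, child_id) pairs. It relies on the include ordering placing parents first and returns None when it does not.

```rust
/// Hypothetical, simplified edge block: the include path plus
/// (parent_id, child_id) pairs.
struct EdgeBlock {
    path: String,
    edges: Vec<(u128, u128)>,
}

/// Parent IDs for an include: top-level includes join against the root
/// entities; nested ones join against the children of their parent include.
fn parent_ids_for(path: &str, root_ids: &[u128], edge_blocks: &[EdgeBlock]) -> Option<Vec<u128>> {
    match path.rsplit_once('.') {
        None => Some(root_ids.to_vec()),
        Some((parent, _)) => {
            // None if the parent include has not been executed yet
            let block = edge_blocks.iter().find(|b| b.path == parent)?;
            // A child can appear under several parents (many-to-many), so dedupe
            let mut ids: Vec<u128> = block.edges.iter().map(|&(_, child)| child).collect();
            ids.sort_unstable();
            ids.dedup();
            Some(ids)
        }
    }
}
```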
Root Entity Fetching¶
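async fn fetch_root_entities(&self, plan: &QueryPlan) -> Result<EntityBlock> {
    // 1. Prefer an index lookup when the filter allows one
    let index_ids = match &plan.filter {
        Some(filter) => self.try_index_lookup(filter).await?,
        None => None,
    };

    let mut entities: Vec<Entity> = match index_ids {
        // Index hit: load only the candidates (fetch_by_ids is assumed to
        // yield the entities), then re-check the full filter in case the
        // index covered only part of it
        Some(entity_ids) => self
            .fetch_by_ids(entity_ids)
            .await?
            .into_iter()
            .filter(|e| self.evaluate_filter(&plan.filter, e))
            .collect(),
        // 2. Fall back to a full scan with the filter applied
        None => self
            .storage
            .scan_entity_type(&plan.root_entity)
            .filter(|e| self.evaluate_filter(&plan.filter, e))
            .collect(),
    };

    // 3. Apply ordering (to index hits as well as scans)
    if !plan.order_by.is_empty() {
        entities.sort_by(|a, b| self.compare_by_order(&plan.order_by, a, b));
    }

    // 4. Apply pagination: skip `offset` entities, keep at most `limit`
    if let Some(pagination) = &plan.pagination {
        entities = entities
            .into_iter()
            .skip(pagination.offset)
            .take(pagination.limit)
            .collect();
    }

    Ok(EntityBlock::new(&plan.root_entity, entities))
}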
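compare_by_order is referenced above but not shown. A sketch of a multi-key comparator plus pagination, assuming entities are plain maps of integer fields and a hypothetical OrderSpec with a descending flag; the engine's OrderSpec and value types may differ.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

/// Hypothetical entity: field name -> integer value.
type Entity = HashMap<String, i64>;

/// Hypothetical sort key.
struct OrderSpec {
    field: String,
    descending: bool,
}

/// Compares two entities key by key; later keys only break ties.
/// A missing field compares as None, which sorts first when ascending
/// and last when descending.
fn compare_by_order(order_by: &[OrderSpec], a: &Entity, b: &Entity) -> Ordering {
    order_by.iter().fold(Ordering::Equal, |acc, spec| {
        acc.then_with(|| {
            let ord = a.get(&spec.field).cmp(&b.get(&spec.field));
            if spec.descending { ord.reverse() } else { ord }
        })
    })
}

/// Sort, then skip `offset` entities and keep at most `limit`.
fn order_and_paginate(
    mut entities: Vec<Entity>,
    order_by: &[OrderSpec],
    offset: usize,
    limit: usize,
) -> Vec<Entity> {
    entities.sort_by(|a, b| compare_by_order(order_by, a, b));
    entities.into_iter().skip(offset).take(limit).collect()
}
```

Sorting must happen before skip/take; paginating first would return an arbitrary page.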
Filter Evaluation¶
Filter expressions are evaluated recursively against one entity at a time. The And and Or arms use Rust's short-circuiting && and ||, so the right operand is evaluated only when it can change the result:
pub fn evaluate(filter: &FilterExpr, entity: &Entity) -> bool {
    match filter {
        FilterExpr::Eq { field, value } => {
            entity.get(field) == Some(value)
        }
        // A missing field counts as "not equal"
        FilterExpr::Ne { field, value } => {
            entity.get(field) != Some(value)
        }
        // A missing field never satisfies a comparison
        FilterExpr::Gt { field, value } => {
            entity.get(field).map(|v| v > value).unwrap_or(false)
        }
        FilterExpr::Like { field, pattern } => {
            entity
                .get_string(field)
                .map(|s| match_pattern(s, pattern))
                .unwrap_or(false)
        }
        FilterExpr::And { left, right } => {
            evaluate(left, entity) && evaluate(right, entity)
        }
        FilterExpr::Or { left, right } => {
            evaluate(left, entity) || evaluate(right, entity)
        }
        FilterExpr::Not { expr } => {
            !evaluate(expr, entity)
        }
        // ... other operators
    }
}
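A self-contained version of the evaluator, trimmed to a few operators, shows the recursion and the missing-field behavior. Value, Entity and this FilterExpr are simplified, hypothetical types; Like is omitted because match_pattern is not shown.

```rust
use std::collections::HashMap;

/// Hypothetical value type. The derived PartialOrd compares the variant
/// first, so an Int and a Str have a fixed but arbitrary order.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
enum Value {
    Int(i64),
    Str(String),
}

type Entity = HashMap<String, Value>;

/// A subset of the operators above.
enum FilterExpr {
    Eq { field: String, value: Value },
    Ne { field: String, value: Value },
    Gt { field: String, value: Value },
    And { left: Box<FilterExpr>, right: Box<FilterExpr> },
    Or { left: Box<FilterExpr>, right: Box<FilterExpr> },
    Not { expr: Box<FilterExpr> },
}

fn evaluate(filter: &FilterExpr, entity: &Entity) -> bool {
    match filter {
        FilterExpr::Eq { field, value } => entity.get(field) == Some(value),
        // A missing field counts as "not equal"
        FilterExpr::Ne { field, value } => entity.get(field) != Some(value),
        // A missing field never satisfies a comparison
        FilterExpr::Gt { field, value } => entity.get(field).map_or(false, |v| v > value),
        FilterExpr::And { left, right } => evaluate(left, entity) && evaluate(right, entity),
        FilterExpr::Or { left, right } => evaluate(left, entity) || evaluate(right, entity),
        FilterExpr::Not { expr } => !evaluate(expr, entity),
    }
}
```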
Join Strategies¶
The executor supports multiple join strategies:
Nested Loop Join¶
Scans the child entities once per parent, so the cost is O(N * M) for N parents and M children; suitable only for small parent sets:
fn nested_loop_join(
    parent_ids: &[[u8; 16]],
    relation: &RelationDef,
    storage: &StorageEngine,
) -> Result<Vec<([u8; 16], [u8; 16])>> {
    let mut edges = Vec::new();
    for parent_id in parent_ids {
        // Fetch children for this parent (a full scan of the child type each time)
        let children = storage
            .scan_entity_type(&relation.to_entity)
            .filter(|c| c.get(&relation.to_field) == Some(parent_id))
            .collect::<Vec<_>>();
        for child in children {
            edges.push((*parent_id, child.id()));
        }
    }
    Ok(edges)
}
Hash Join¶
Builds a hash set of the N parent IDs, then probes it once for each of the M children, so the cost is O(N + M); suited to larger parent sets:
use std::collections::HashSet;

fn hash_join(
    parent_ids: &[[u8; 16]],
    relation: &RelationDef,
    storage: &StorageEngine,
) -> Result<Vec<([u8; 16], [u8; 16])>> {
    // 1. Build phase: collect parent IDs into a hash set
    let parent_set: HashSet<[u8; 16]> = parent_ids.iter().copied().collect();
    // 2. Probe phase: scan children once, keeping those whose foreign key is a parent
    let edges = storage
        .scan_entity_type(&relation.to_entity)
        .filter_map(|child| {
            let fk = child.get(&relation.to_field)?;
            if parent_set.contains(fk) {
                Some((*fk, child.id()))
            } else {
                None
            }
        })
        .collect();
    Ok(edges)
}
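Both strategies produce the same edges and differ only in cost. The sketch runs them over in-memory children to check that, assuming a hypothetical Child row with an optional foreign key in place of StorageEngine and RelationDef. Output order differs (the nested loop groups by parent, the hash join follows child order), so results are compared after sorting.

```rust
use std::collections::HashSet;

type Id = [u8; 16];

/// Hypothetical child row: its own id and the foreign key to its parent.
struct Child {
    id: Id,
    parent_fk: Option<Id>,
}

/// O(N * M): rescans every child once per parent.
fn nested_loop_join(parent_ids: &[Id], children: &[Child]) -> Vec<(Id, Id)> {
    let mut edges = Vec::new();
    for parent_id in parent_ids {
        for child in children {
            if child.parent_fk.as_ref() == Some(parent_id) {
                edges.push((*parent_id, child.id));
            }
        }
    }
    edges
}

/// O(N + M): build a hash set of parent ids, then probe it once per child.
fn hash_join(parent_ids: &[Id], children: &[Child]) -> Vec<(Id, Id)> {
    let parent_set: HashSet<Id> = parent_ids.iter().copied().collect();
    children
        .iter()
        .filter_map(|child| {
            // Orphans (no foreign key) are skipped
            let fk = child.parent_fk?;
            parent_set.contains(&fk).then_some((fk, child.id))
        })
        .collect()
}
```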
Strategy Selection¶
fn select_join_strategy(
    parent_count: usize,
    estimated_child_count: usize,
) -> JoinStrategy {
    // Only parent_count is consulted; estimated_child_count is not used
    if parent_count < 100 {
        JoinStrategy::NestedLoop
    } else {
        JoinStrategy::Hash
    }
}
Plan Caching¶
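Query plans are cached by a fingerprint of the query's structure, so queries that differ only in literal parameter values share one plan:
Fingerprint Computation¶
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn compute_fingerprint(query: &GraphQuery) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Hash structure, not values
    query.root_entity.hash(&mut hasher);
    query.fields.hash(&mut hasher);
    if let Some(filter) = &query.filter {
        // Hash filter shape, not parameter values
        hash_filter_shape(filter, &mut hasher);
    }
    // Sort order is structural: a different order_by needs a different plan
    query.order_by.hash(&mut hasher);
    for include in &query.includes {
        include.path.hash(&mut hasher);
        include.fields.hash(&mut hasher);
        // Include filters change the plan as well, so hash their shape
        if let Some(filter) = &include.filter {
            hash_filter_shape(filter, &mut hasher);
        }
    }
    hasher.finish()
}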
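hash_filter_shape is not shown. One way to hash shape but not values is to hash each node's variant (via std::mem::discriminant) and its field names, recursing into children and skipping literals. The FilterExpr below is a simplified, hypothetical version with integer literals.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::mem;

/// Hypothetical filter with literal parameter values.
enum FilterExpr {
    Eq { field: String, value: i64 },
    Gt { field: String, value: i64 },
    And { left: Box<FilterExpr>, right: Box<FilterExpr> },
}

/// Hashes operators and field names but never literal values,
/// so `age = 30` and `age = 40` map to the same cached plan.
fn hash_filter_shape(filter: &FilterExpr, hasher: &mut impl Hasher) {
    mem::discriminant(filter).hash(hasher);
    match filter {
        FilterExpr::Eq { field, .. } | FilterExpr::Gt { field, .. } => field.hash(hasher),
        FilterExpr::And { left, right } => {
            hash_filter_shape(left, hasher);
            hash_filter_shape(right, hasher);
        }
    }
}

fn shape_fingerprint(filter: &FilterExpr) -> u64 {
    let mut hasher = DefaultHasher::new();
    hash_filter_shape(filter, &mut hasher);
    hasher.finish()
}
```

DefaultHasher::new() uses fixed keys, so fingerprints are stable within a process, which is all an in-memory plan cache needs.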
Cache Usage¶
pub async fn execute_query(&self, query: &GraphQuery) -> Result<QueryResult> {
    let fingerprint = compute_fingerprint(query);

    // Try cache
    let plan = if let Some(cached) = self.plan_cache.get(&fingerprint) {
        cached.clone()
    } else {
        // Plan and cache
        let plan = self.planner.plan(query)?;
        self.plan_cache.insert(fingerprint, plan.clone());
        plan
    };

    // Execute with query-specific values: the cached plan may have been built
    // from a query with different literals, so parameter values (such as
    // filter literals) are taken from `query`, not from the plan
    self.executor.execute(&plan, query).await
}
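A plan cache along these lines can be sketched with a HashMap. This version is an assumption, not the engine's cache: it stores Arc<QueryPlan> so a hit is a reference-count bump rather than a deep clone, takes &mut self (a shared engine would need a lock or a concurrent map), and never evicts.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical plan type; only the cost of cloning matters here.
#[derive(Debug)]
struct QueryPlan {
    root_entity: String,
}

/// Fingerprint -> shared plan.
#[derive(Default)]
struct PlanCache {
    plans: HashMap<u64, Arc<QueryPlan>>,
}

impl PlanCache {
    /// Returns the cached plan, or runs `plan` once and caches the result.
    /// Planning errors are returned and nothing is cached.
    fn get_or_plan<E>(
        &mut self,
        fingerprint: u64,
        plan: impl FnOnce() -> Result<QueryPlan, E>,
    ) -> Result<Arc<QueryPlan>, E> {
        if let Some(cached) = self.plans.get(&fingerprint) {
            return Ok(Arc::clone(cached));
        }
        let planned = Arc::new(plan()?);
        self.plans.insert(fingerprint, Arc::clone(&planned));
        Ok(planned)
    }
}
```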
Aggregations¶
The aggregate executor handles COUNT, SUM, AVG, MIN, MAX:
use std::cmp::Ordering;

pub enum AggregateFunction {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}

pub fn execute_aggregate(
    function: AggregateFunction,
    field: &str,
    entities: &[Entity],
) -> Value {
    match function {
        // Counts every entity, whether or not it has `field`
        AggregateFunction::Count => {
            Value::Int64(entities.len() as i64)
        }
        // Entities without a numeric value for `field` are skipped
        AggregateFunction::Sum => {
            let sum: f64 = entities
                .iter()
                .filter_map(|e| e.get_number(field))
                .sum();
            Value::Float64(sum)
        }
        AggregateFunction::Avg => {
            let values: Vec<f64> = entities
                .iter()
                .filter_map(|e| e.get_number(field))
                .collect();
            if values.is_empty() {
                Value::Null
            } else {
                Value::Float64(values.iter().sum::<f64>() / values.len() as f64)
            }
        }
        // Value holds floats, so it may only implement PartialOrd;
        // min_by/max_by with partial_cmp work either way, unlike min()/max()
        AggregateFunction::Min => {
            entities
                .iter()
                .filter_map(|e| e.get(field))
                .min_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal))
                .cloned()
                .unwrap_or(Value::Null)
        }
        AggregateFunction::Max => {
            entities
                .iter()
                .filter_map(|e| e.get(field))
                .max_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal))
                .cloned()
                .unwrap_or(Value::Null)
        }
    }
}
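Sum, Avg, Min and Max each walk the entities separately. As an alternative design (not what the code above does), a single accumulator can gather count, sum, min and max in one pass. The sketch is numeric only and assumes values already extracted as Option<f64>, as get_number appears to return them; NumericAccumulator is hypothetical.

```rust
/// Running totals for one pass over optional numeric values.
#[derive(Debug, Default)]
struct NumericAccumulator {
    count: usize,
    sum: f64,
    min: Option<f64>,
    max: Option<f64>,
}

impl NumericAccumulator {
    fn add(&mut self, value: Option<f64>) {
        // Missing values are skipped, like filter_map above
        let Some(v) = value else { return };
        self.count += 1;
        self.sum += v;
        self.min = Some(self.min.map_or(v, |m| m.min(v)));
        self.max = Some(self.max.map_or(v, |m| m.max(v)));
    }

    /// None when no value was seen, mirroring Value::Null above.
    fn avg(&self) -> Option<f64> {
        (self.count > 0).then(|| self.sum / self.count as f64)
    }
}
```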
Columnar Optimization¶
For large datasets, aggregations read the field's column from the columnar store instead of materializing whole entities:
pub fn execute_aggregate_columnar(
    function: AggregateFunction,
    field: &str,
    entity_type: &str,
    columnar: &ColumnarStore,
) -> Result<Value> {
    let projection = columnar.projection(entity_type)?;
    let column = projection.get_column(field)?;
    // Operate directly on column data
    match function {
        AggregateFunction::Sum => column.sum(),
        AggregateFunction::Avg => column.avg(),
        // ...
    }
}
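Column operations such as column.sum() are cheap because they scan one dense array rather than whole entities. A sketch of such a column, assuming a hypothetical Float64Column with a parallel validity vector marking nulls; the columnar store's real layout is not described in this section.

```rust
/// Hypothetical column layout: dense values plus a validity flag per row
/// (false = null).
struct Float64Column {
    values: Vec<f64>,
    validity: Vec<bool>,
}

impl Float64Column {
    /// Sums non-null values by walking two dense arrays in lockstep.
    fn sum(&self) -> f64 {
        self.values
            .iter()
            .zip(&self.validity)
            .filter(|&(_, &valid)| valid)
            .map(|(v, _)| v)
            .sum()
    }

    /// None when every row is null.
    fn avg(&self) -> Option<f64> {
        let count = self.validity.iter().filter(|&&valid| valid).count();
        (count > 0).then(|| self.sum() / count as f64)
    }
}
```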
Explain Service¶
Query plans can be explained for debugging. The explain result reports the root entity, the estimated row count, the access path, the join strategy and estimated fanout of each include, and any warnings:
pub fn explain(plan: &QueryPlan) -> ExplainResult {
    ExplainResult {
        root_entity: plan.root_entity.clone(),
        estimated_rows: estimate_rows(plan),
        access_path: determine_access_path(plan),
        includes: plan
            .includes
            .iter()
            .map(|i| ExplainInclude {
                path: i.path.clone(),
                strategy: determine_join_strategy(&i.relation),
                estimated_fanout: estimate_fanout(i.relation.cardinality),
            })
            .collect(),
        warnings: collect_warnings(plan),
    }
}
Example output:
Query Explain
─────────────
Root: User
Estimated rows: 1,000
Access path: TypeIndex scan
Includes:
1. posts (HashJoin, fanout ~10)
2. posts.comments (HashJoin, fanout ~25)
Warnings:
- No index on User.status, consider adding one
- Include depth 2 may result in large result set
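Output like the example above can be produced by a Display implementation on the explain result. The field names follow explain; representing the strategy as a String, the with_thousands helper, and the exact layout are assumptions of this sketch.

```rust
use std::fmt;

/// Hypothetical shapes for the explain output.
struct ExplainInclude {
    path: String,
    strategy: String,
    estimated_fanout: usize,
}

struct ExplainResult {
    root_entity: String,
    estimated_rows: usize,
    access_path: String,
    includes: Vec<ExplainInclude>,
    warnings: Vec<String>,
}

/// Formats 1000 as "1,000".
fn with_thousands(n: usize) -> String {
    let digits = n.to_string();
    let mut out = String::new();
    for (i, ch) in digits.chars().enumerate() {
        // Insert a comma wherever the remaining digit count is a multiple of 3
        if i > 0 && (digits.len() - i) % 3 == 0 {
            out.push(',');
        }
        out.push(ch);
    }
    out
}

impl fmt::Display for ExplainResult {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "Query Explain")?;
        writeln!(f, "─────────────")?;
        writeln!(f, "Root: {}", self.root_entity)?;
        writeln!(f, "Estimated rows: {}", with_thousands(self.estimated_rows))?;
        writeln!(f, "Access path: {}", self.access_path)?;
        writeln!(f, "Includes:")?;
        for (i, inc) in self.includes.iter().enumerate() {
            writeln!(
                f,
                "  {}. {} ({}, fanout ~{})",
                i + 1,
                inc.path,
                inc.strategy,
                inc.estimated_fanout
            )?;
        }
        if !self.warnings.is_empty() {
            writeln!(f, "Warnings:")?;
            for w in &self.warnings {
                writeln!(f, "  - {w}")?;
            }
        }
        Ok(())
    }
}
```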
Performance Tips¶
- Use indexes for filters: hash index for equality, B-tree for ranges
- Limit include depth: deep nesting multiplies result size
- Project only needed fields: reduces data transfer
- Paginate root entities: controls result size
- Filter at the source: include filters reduce join work
Next Steps¶
- Storage Engine: data organization
- Index Internals: index implementation
- Performance Guide: optimization tips