Data Flow¶
This document details how data flows through Polymathy from request to response.
Request Lifecycle¶
1. Request Reception¶
sequenceDiagram
    participant C as Client
    participant A as Actix-web
    participant H as Handler

    C->>A: GET /v1/search?q=rust
    A->>H: Parse query params
    H->>H: Create SearchQuery
The request enters through Actix-web, which parses the query parameters into a SearchQuery struct.
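In the real handler, Actix-web's extractors deserialize the query string via serde. As a std-only illustration of what that amounts to, here is a minimal sketch; the `parse_search_query` helper is hypothetical (and skips percent-decoding), only the `q` field is taken from the endpoint shown above:

```rust
use std::collections::HashMap;

/// Mirrors the query parameters of `GET /v1/search?q=...`.
/// (The real service lets Actix-web deserialize this via serde.)
#[derive(Debug, PartialEq)]
struct SearchQuery {
    q: String,
}

/// Hypothetical helper: parse a raw query string into a SearchQuery.
/// Simplification: no percent-decoding, first `q` wins is not handled.
fn parse_search_query(raw: &str) -> Option<SearchQuery> {
    let params: HashMap<&str, &str> = raw
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .collect();
    params.get("q").map(|q| SearchQuery { q: q.to_string() })
}

fn main() {
    let query = parse_search_query("q=rust&format=json").unwrap();
    assert_eq!(query.q, "rust");
}
```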
2. SearxNG Search¶
sequenceDiagram
    participant H as Handler
    participant S as SearxNG

    H->>S: GET /search?q=rust&format=json
    S-->>H: JSON results with URLs
Polymathy forwards the search query to SearxNG:
let search_results: Value = client
    .get(searxng_url)
    .query(&[("q", &query.q), ("format", &"json".to_string())])
    .send()
    .await?
    .json()
    .await?;
3. URL Extraction¶
From the SearxNG response, Polymathy extracts URLs:
let urls: Vec<String> = search_results["results"]
    .as_array()
    .into_iter()
    .flatten() // a missing "results" array is treated as empty
    .filter_map(|r| r["url"].as_str().map(String::from))
    .take(10) // Max 10 URLs
    .collect();
4. Concurrent Content Processing¶
sequenceDiagram
    participant H as Handler
    participant P as Processor

    par For each URL
        H->>P: POST /v1/process
        P-->>H: ProcessedContent
    end
Each URL is processed concurrently:
let futures: Vec<_> = urls
    .iter()
    .map(|url| {
        // Clone per future so each task owns its handles
        // (reqwest clients are cheap to clone).
        let client = client.clone();
        let processor_url = processor_url.clone();
        async move {
            let response = client
                .post(&processor_url)
                .json(&json!({
                    "config": {
                        "chunking_size": 100,
                        "chunking_type": "words",
                        "embedding_model": "AllMiniLML6V2"
                    },
                    "url": url
                }))
                .send()
                .await?;
            response.json::<ProcessedContent>().await
        }
    })
    .collect();

let results = join_all(futures).await;
5. Content Processor Response¶
The processor returns a ProcessedContent structure:
{
  "url": "https://example.com/article",
  "config": {
    "chunking_type": "words",
    "chunking_size": 100,
    "embedding_model": "AllMiniLML6V2"
  },
  "chunks": {
    "chunk_0": "First chunk of content...",
    "chunk_1": "Second chunk of content...",
    "chunk_2": "Third chunk of content..."
  },
  "embeddings": {
    "chunk_0": [0.123, -0.456, ...],
    "chunk_1": [0.789, -0.012, ...],
    "chunk_2": [0.345, -0.678, ...]
  },
  "error": null
}
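The JSON above suggests a struct along these lines. This is a std-only sketch with field names inferred from the response (the real definition presumably derives serde's `Deserialize`); the `aligned_chunk_ids` helper is an illustrative addition, modeling the "skip chunks without embeddings" rule used in the next step:

```rust
use std::collections::HashMap;

/// Sketch of the processor response, mirroring the JSON above.
#[derive(Debug)]
struct ProcessedContent {
    url: String,
    chunks: HashMap<String, String>,       // "chunk_0" -> chunk text
    embeddings: HashMap<String, Vec<f32>>, // "chunk_0" -> embedding vector
    error: Option<String>,
}

/// Illustrative helper: chunk ids that have both text and an embedding.
fn aligned_chunk_ids(content: &ProcessedContent) -> Vec<String> {
    content
        .chunks
        .keys()
        .filter(|id| content.embeddings.contains_key(*id))
        .cloned()
        .collect()
}

fn main() {
    let mut chunks = HashMap::new();
    chunks.insert("chunk_0".to_string(), "First chunk...".to_string());
    chunks.insert("chunk_1".to_string(), "Second chunk...".to_string());

    // chunk_1 has no embedding, so only chunk_0 survives.
    let mut embeddings = HashMap::new();
    embeddings.insert("chunk_0".to_string(), vec![0.123, -0.456]);

    let content = ProcessedContent {
        url: "https://example.com/article".to_string(),
        chunks,
        embeddings,
        error: None,
    };
    assert_eq!(aligned_chunk_ids(&content), vec!["chunk_0".to_string()]);
}
```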
6. Chunk Collection¶
Chunks are collected into a thread-safe map:
let chunk_map: Arc<Mutex<HashMap<u64, (String, String)>>> =
    Arc::new(Mutex::new(HashMap::new()));

// `chunk_counter` is a running index shared across all processed URLs.
for (chunk_id, chunk_text) in &processed_content.chunks {
    if !processed_content.embeddings.contains_key(chunk_id) {
        continue; // Skip chunks without embeddings
    }

    let key: u64 = *chunk_counter;
    chunk_map.lock().unwrap().insert(
        key,
        (processed_content.url.clone(), chunk_text.clone()),
    );
    *chunk_counter += 1;
}
7. Response Formation¶
sequenceDiagram
    participant H as Handler
    participant C as Client

    H->>H: Serialize chunk_map
    H-->>C: JSON response
Final response structure:
{
  "0": ["https://example.com/article", "First chunk of content..."],
  "1": ["https://example.com/article", "Second chunk of content..."],
  "2": ["https://other-site.com/page", "Different content..."]
}
Error Handling¶
Errors are handled gracefully at each stage:
graph TD
    A[Request] --> B{SearxNG OK?}
    B -->|No| C[500 Error]
    B -->|Yes| D{URLs found?}
    D -->|No| E[Empty response]
    D -->|Yes| F{Process URLs}
    F -->|Failures| G[Skip failed URLs]
    F -->|Success| H[Collect chunks]
    H --> I[Return response]
    G --> H
Individual URL failures don't fail the entire request:
match client.post(&processor_url).send().await {
    Ok(response) => { /* process */ }
    Err(e) => {
        log::debug!("Error processing URL {}: {}", url, e);
        return Ok(()); // Continue with other URLs
    }
}
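The same keep-going behaviour shows up again at the collection step: after `join_all`, per-URL errors can simply be filtered out rather than propagated. A std-only sketch, with plain `Result` values standing in for the processor calls:

```rust
fn main() {
    // Stand-ins for the per-URL outcomes returned by join_all.
    let results: Vec<Result<&str, &str>> = vec![
        Ok("chunks from url 1"),
        Err("timeout fetching url 2"),
        Ok("chunks from url 3"),
    ];

    // Failed URLs are dropped; successful ones are kept.
    let successes: Vec<&str> = results.into_iter().filter_map(|r| r.ok()).collect();

    assert_eq!(successes, vec!["chunks from url 1", "chunks from url 3"]);
}
```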
Performance Considerations¶
- Concurrent Processing: URLs are processed in parallel
- Early Termination: Skip processing if content is empty
- Limited URLs: Max 10 URLs per query
- Async I/O: Non-blocking network operations
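The fan-out/fan-in shape behind these points can be sketched without async at all, using OS threads (std only). This is an analogue, not Polymathy's implementation, and `process_url` is a dummy stand-in for the processor call:

```rust
use std::thread;

/// Hypothetical stand-in for the per-URL processor call.
fn process_url(url: &str) -> usize {
    url.len() // pretend "processing" = measuring the URL
}

fn main() {
    let urls = vec!["https://a.example", "https://bb.example"];

    // Fan out: one worker per URL (the real service caps this at 10 URLs).
    let handles: Vec<_> = urls
        .into_iter()
        .map(|url| thread::spawn(move || process_url(url)))
        .collect();

    // Fan in: wait for every worker, analogous to join_all.
    let results: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    assert_eq!(results, vec![17, 18]);
}
```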