Data Flow

This document details how data flows through Polymathy from request to response.

Request Lifecycle

1. Request Reception

sequenceDiagram
    participant C as Client
    participant A as Actix-web
    participant H as Handler

    C->>A: GET /v1/search?q=rust
    A->>H: Parse query params
    H->>H: Create SearchQuery

The request enters through Actix-web, which parses the query parameters into a SearchQuery struct.
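
A minimal sketch of that step, assuming SearchQuery derives serde's Deserialize with the single field q that the handler later reads (the handler body here is illustrative, not Polymathy's actual logic):

use actix_web::{get, web, HttpResponse, Responder};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct SearchQuery {
    /// The raw search string from ?q=...
    pub q: String,
}

#[get("/v1/search")]
async fn search(query: web::Query<SearchQuery>) -> impl Responder {
    // Actix-web deserializes the query string before the handler runs,
    // so a malformed query never reaches this body.
    HttpResponse::Ok().body(format!("searching for: {}", query.q))
}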

2. SearxNG Query

sequenceDiagram
    participant H as Handler
    participant S as SearxNG

    H->>S: GET /search?q=rust&format=json
    S-->>H: JSON results with URLs

Polymathy forwards the search query to SearxNG:

// `Value` is serde_json::Value; `client` is a shared reqwest::Client.
let search_results: Value = client
    .get(searxng_url)
    .query(&[("q", query.q.as_str()), ("format", "json")])
    .send()
    .await?
    .json()
    .await?;

3. URL Extraction

From the SearxNG response, Polymathy extracts URLs:

let urls: Vec<String> = search_results["results"]
    .as_array()     // Option<&Vec<Value>>; None if "results" is missing
    .into_iter()
    .flatten()      // iterate the array when present, otherwise yield nothing
    .filter_map(|r| r["url"].as_str().map(String::from))
    .take(10)  // Max 10 URLs
    .collect();

4. Concurrent Content Processing

sequenceDiagram
    participant H as Handler
    participant P as Processor

    par For each URL
        H->>P: POST /v1/process
        P-->>H: ProcessedContent
    end

Each URL is processed concurrently:

use futures::future::join_all;
use serde_json::json;

let futures: Vec<_> = urls
    .iter()
    .map(|url| {
        async move {
            // One POST to the content processor per URL.
            let response = client
                .post(&processor_url)
                .json(&json!({
                    "config": {
                        "chunking_size": 100,
                        "chunking_type": "words",
                        "embedding_model": "AllMiniLML6V2"
                    },
                    "url": url
                }))
                .send()
                .await?;

            response.json::<ProcessedContent>().await
        }
    })
    .collect();

// join_all resolves every future; each entry is a per-URL Result,
// so one failed fetch never cancels the others.
let results = join_all(futures).await;

5. Content Processor Response

The processor returns a ProcessedContent structure:

{
  "url": "https://example.com/article",
  "config": {
    "chunking_type": "words",
    "chunking_size": 100,
    "embedding_model": "AllMiniLML6V2"
  },
  "chunks": {
    "chunk_0": "First chunk of content...",
    "chunk_1": "Second chunk of content...",
    "chunk_2": "Third chunk of content..."
  },
  "embeddings": {
    "chunk_0": [0.123, -0.456, ...],
    "chunk_1": [0.789, -0.012, ...],
    "chunk_2": [0.345, -0.678, ...]
  },
  "error": null
}
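
A plausible Rust shape for this payload, assuming serde derive. The field names come from the JSON above; the numeric types and the ProcessingConfig struct name are illustrative:

use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
pub struct ProcessedContent {
    pub url: String,
    pub config: ProcessingConfig,
    /// chunk_id -> chunk text
    pub chunks: HashMap<String, String>,
    /// chunk_id -> embedding vector
    pub embeddings: HashMap<String, Vec<f32>>,
    pub error: Option<String>,
}

#[derive(Debug, Deserialize)]
pub struct ProcessingConfig {
    pub chunking_type: String,
    pub chunking_size: u32,
    pub embedding_model: String,
}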

6. Chunk Collection

Chunks are collected into a thread-safe map:

// `chunk_counter` is a running index (e.g. a &mut u64) shared across all
// processed URLs so keys stay unique in the final response.
let chunk_map: Arc<Mutex<HashMap<u64, (String, String)>>> =
    Arc::new(Mutex::new(HashMap::new()));

for (chunk_id, chunk_text) in &processed_content.chunks {
    if !processed_content.embeddings.contains_key(chunk_id) {
        continue;  // Skip chunks without embeddings
    }

    let key: u64 = *chunk_counter;
    chunk_map.lock().unwrap().insert(
        key,
        (processed_content.url.clone(), chunk_text.clone())
    );
    *chunk_counter += 1;
}
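
The Arc/Mutex pairing matters because this loop runs inside the concurrent per-URL tasks from step 4: each task needs its own handle to the shared map. A self-contained sketch of that sharing pattern (the URLs and chunk text are dummies, not Polymathy's task body):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use futures::future::join_all;

#[tokio::main]
async fn main() {
    let chunk_map: Arc<Mutex<HashMap<u64, (String, String)>>> =
        Arc::new(Mutex::new(HashMap::new()));

    let tasks: Vec<_> = (0..3u64)
        .map(|i| {
            // Each task clones the Arc handle, not the map itself.
            let chunk_map = Arc::clone(&chunk_map);
            async move {
                chunk_map.lock().unwrap().insert(
                    i,
                    (format!("https://example.com/{i}"), format!("chunk {i}")),
                );
            }
        })
        .collect();

    join_all(tasks).await;
    println!("{} chunks collected", chunk_map.lock().unwrap().len());
}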

7. Response Formation

sequenceDiagram
    participant H as Handler
    participant C as Client

    H->>H: Serialize chunk_map
    H-->>C: JSON response

Final response structure:

{
  "0": ["https://example.com/article", "First chunk of content..."],
  "1": ["https://example.com/article", "Second chunk of content..."],
  "2": ["https://other-site.com/page", "Different content..."]
}
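
This shape falls out of serializing the chunk map directly: serde_json renders u64 keys as JSON strings and tuples as two-element arrays. A minimal demonstration:

use std::collections::HashMap;

fn main() {
    let mut chunk_map: HashMap<u64, (String, String)> = HashMap::new();
    chunk_map.insert(
        0,
        ("https://example.com/article".into(), "First chunk of content...".into()),
    );

    // Integer map keys become JSON strings and tuples become arrays,
    // matching the response structure shown above.
    println!("{}", serde_json::to_string_pretty(&chunk_map).unwrap());
}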

Error Handling

Errors are handled gracefully at each stage:

graph TD
    A[Request] --> B{SearxNG OK?}
    B -->|No| C[500 Error]
    B -->|Yes| D{URLs found?}
    D -->|No| E[Empty response]
    D -->|Yes| F{Process URLs}
    F -->|Failures| G[Skip failed URLs]
    F -->|Success| H[Collect chunks]
    H --> I[Return response]
    G --> H

Individual URL failures don't fail the entire request:

// Inside the per-URL task: a failed request is logged and swallowed,
// leaving the remaining URLs unaffected.
match client.post(&processor_url).send().await {
    Ok(response) => { /* process */ }
    Err(e) => {
        log::debug!("Error processing URL {}: {}", url, e);
        return Ok(());  // Continue with other URLs
    }
}
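
For the SearxNG branch of the graph, a failure before any URLs exist maps to a 500 instead. A sketch of that mapping in actix-web terms; the helper name and response body are illustrative, since the document does not specify them:

use actix_web::HttpResponse;

// Hypothetical helper: if the upstream search itself fails, there is
// nothing to process, so the whole request is rejected rather than
// partially answered.
fn searxng_failure(e: reqwest::Error) -> HttpResponse {
    log::error!("SearxNG request failed: {}", e);
    HttpResponse::InternalServerError().finish()
}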

Performance Considerations

  1. Concurrent Processing: URLs are processed in parallel
  2. Early Termination: Skip processing if content is empty (see the sketch after this list)
  3. Limited URLs: Max 10 URLs per query
  4. Async I/O: Non-blocking network operations
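
A sketch of the early-termination guard from item 2, with hypothetical names; the document only states that empty content is skipped:

/// Hypothetical guard: returns true when a fetched page produced usable text.
fn has_usable_content(extracted_text: &str) -> bool {
    !extracted_text.trim().is_empty()
}

// In the per-URL task:
// if !has_usable_content(&text) {
//     return Ok(());  // Nothing to chunk or embed; move on.
// }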