Advanced Workflows¶
Advanced usage patterns and customizations for Direktor.
Custom Pipeline¶
Override Script Generation¶
from direktor.core.audio import generate_audio
from direktor.core.transcript import generate_transcript
from direktor.core.images import generate_image_prompts, generate_images
from direktor.core.video import create_video
from direktor.core.utils import create_temp_dir
def custom_pipeline(input_file, custom_script):
"""Run pipeline with a custom script."""
temp_dir = create_temp_dir(input_file)
# Save custom script
script_file = f"{temp_dir}/podcast_script.txt"
with open(script_file, "w") as f:
f.write(custom_script)
# Continue with remaining stages
audio_file = generate_audio(custom_script, temp_dir)
transcript = generate_transcript(audio_file, temp_dir)
prompts = generate_image_prompts(transcript, temp_dir)
images = generate_images(prompts, temp_dir)
video = create_video(audio_file, images, prompts, temp_dir)
return video
# Use with custom script
my_script = """
Hey everyone! Today I'm going to share something amazing with you.
Let's dive right into the fascinating world of quantum computing.
"""
video = custom_pipeline("input.txt", my_script)
Custom Image Style Pipeline¶
import json
from direktor.core.images import generate_images
from direktor.core.utils import create_temp_dir
def add_style_to_prompts(prompts, style):
"""Add a consistent style to all prompts."""
styled_prompts = []
for prompt in prompts:
styled_prompts.append({
"time": prompt["time"],
"prompt": f"{prompt['prompt']}, {style}"
})
return styled_prompts
# Load existing prompts
with open("temp/abc123/image_prompts.json", "r") as f:
prompts = json.load(f)
# Add cyberpunk style
style = "cyberpunk aesthetic, neon lights, rain-soaked streets, high contrast"
styled_prompts = add_style_to_prompts(prompts, style)
# Save and regenerate
with open("temp/abc123/image_prompts.json", "w") as f:
json.dump(styled_prompts, f)
images = generate_images(styled_prompts, "temp/abc123")
Integration Examples¶
Web Application Integration¶
from flask import Flask, request, jsonify
from direktor import generate_video
import threading
import uuid
app = Flask(__name__)
jobs = {}
def process_video(job_id, text):
"""Background video processing."""
try:
# Write text to temp file
input_file = f"/tmp/{job_id}.txt"
with open(input_file, "w") as f:
f.write(text)
generate_video(input_file)
jobs[job_id] = {"status": "completed"}
except Exception as e:
jobs[job_id] = {"status": "failed", "error": str(e)}
@app.route("/generate", methods=["POST"])
def start_generation():
text = request.json.get("text")
job_id = str(uuid.uuid4())
jobs[job_id] = {"status": "processing"}
thread = threading.Thread(target=process_video, args=(job_id, text))
thread.start()
return jsonify({"job_id": job_id})
@app.route("/status/<job_id>")
def check_status(job_id):
return jsonify(jobs.get(job_id, {"status": "not_found"}))
Webhook Notifications¶
import requests
from direktor import generate_video
def generate_with_webhook(input_file, webhook_url):
"""Generate video and notify via webhook."""
try:
generate_video(input_file)
requests.post(webhook_url, json={
"status": "success",
"input_file": input_file
})
except Exception as e:
requests.post(webhook_url, json={
"status": "failed",
"input_file": input_file,
"error": str(e)
})
generate_with_webhook("article.txt", "https://example.com/webhook")
Custom Content Processing¶
Pre-processing Pipeline¶
import re
from direktor import generate_video
from direktor.core.narrative import optimize_content
def preprocess_content(text):
"""Custom content preprocessing."""
# Remove URLs
text = re.sub(r'http\S+', '', text)
# Remove citations like [1], [2]
text = re.sub(r'\[\d+\]', '', text)
# Expand abbreviations
abbreviations = {
"AI": "artificial intelligence",
"ML": "machine learning",
"NLP": "natural language processing"
}
for abbr, full in abbreviations.items():
text = re.sub(rf'\b{abbr}\b', full, text)
return text
# Read and preprocess
with open("technical_article.txt", "r") as f:
raw_text = f.read()
processed_text = preprocess_content(raw_text)
optimized_text = optimize_content(processed_text)
# Save processed text
with open("processed_article.txt", "w") as f:
f.write(optimized_text)
# Generate video
generate_video("processed_article.txt")
Multi-Language Support¶
from openai import OpenAI
def translate_and_generate(input_file, target_language="Spanish"):
"""Translate content and generate video."""
client = OpenAI()
with open(input_file, "r") as f:
original_text = f.read()
# Translate
response = client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[
{
"role": "system",
"content": f"Translate the following text to {target_language}. Maintain the same tone and style."
},
{
"role": "user",
"content": original_text
}
]
)
translated_text = response.choices[0].message.content
# Save translated text
translated_file = f"translated_{input_file}"
with open(translated_file, "w") as f:
f.write(translated_text)
# Generate video
from direktor import generate_video
generate_video(translated_file)
Error Handling and Recovery¶
Robust Processing¶
import logging
from direktor import generate_video
from direktor.core.config import validate_env_vars
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def robust_generate(input_file, max_retries=3):
"""Generate video with retry logic."""
# Validate configuration first
try:
validate_env_vars()
except EnvironmentError as e:
logger.error(f"Configuration error: {e}")
return None
for attempt in range(max_retries):
try:
logger.info(f"Attempt {attempt + 1}/{max_retries}")
generate_video(input_file)
logger.info("Video generation successful")
return True
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
logger.info("Retrying...")
else:
logger.error("All retries exhausted")
return False
return False
robust_generate("article.txt")
Stage-Level Recovery¶
import os
from direktor import generate_video
def recover_and_continue(input_file):
"""Detect failed stage and continue from there."""
# Determine output directory
import hashlib
with open(input_file, "rb") as f:
file_hash = hashlib.md5(f.read()).hexdigest()
temp_dir = os.path.join("temp", file_hash)
# Check what stages are complete
stages = {
1: "podcast_script.txt",
2: "audio.mp3",
3: "transcript.json",
4: "image_prompts.json",
5: "images",
6: "output.mp4"
}
last_complete = 0
for stage, output in stages.items():
path = os.path.join(temp_dir, output)
if os.path.exists(path):
last_complete = stage
else:
break
if last_complete == 6:
print("Video already complete!")
return
# Resume from next stage
next_stage = last_complete + 1
print(f"Resuming from stage {next_stage}")
generate_video(input_file, stage=next_stage)
recover_and_continue("article.txt")