diff --git a/docs/DEVELOPMENT_LOG.md b/docs/DEVELOPMENT_LOG.md
new file mode 100644
index 0000000..037cb13
--- /dev/null
+++ b/docs/DEVELOPMENT_LOG.md
@@ -0,0 +1,424 @@
+# Momentry Core Development Log
+
+> **Log maintained since**: 2026-03-18
+> **⚠️ Note**: entries dated before 2026-03-18 were backfilled after the fact and are for reference only. Future entries will be recorded as the work happens and are more reliable.
+
+---
+
+## Development Tools
+
+### Coding LLM Models
+
+| Phase | Tool | Model | ID | Notes |
+|-------|------|-------|-----|-------|
+| **Early** | Claude CLI | - | - | Initial project scaffolding |
+| **Middle** | OpenCode | big-pickle | opencode/big-pickle | Primary development collaborator |
+
+**Switch history**:
+- Early phase: Claude CLI built the basic project structure
+- Middle phase: switched to OpenCode (big-pickle) for main feature development
+
+---
+
+## 2026-03-17
+
+### ML Model Selection
+
+| Processor | Model | Version/Size | Notes |
+|-----------|-------|--------------|-------|
+| **ASR** | WhisperX (faster-whisper) | base, int8 | Speech recognition + dialogue segmentation |
+| **CUT** | PySceneDetect | 0.6.7.1 | ContentDetector scene detection |
+| **YOLO** | YOLOv8n | yolov8n.pt (6.2MB) | Object detection (nano variant, fastest) |
+| **OCR** | EasyOCR | 1.7.2 | Text recognition |
+| **Face** | OpenCV Haar Cascade | built-in | Face detection (no extra download needed) |
+| **Pose** | YOLOv8n-Pose | yolov8n-pose.pt (6.5MB) | Pose estimation (nano variant) |
+
+**Model downloads**:
+- YOLOv8n: `yolov8n.pt` (6.2MB)
+- YOLOv8n-Pose: `yolov8n-pose.pt` (6.5MB)
+
+**Python dependencies**:
+```
+torch==2.8.0
+whisperx==3.8.2
+ultralytics==8.4.23
+scenedetect==0.6.7.1
+easyocr==1.7.2
+opencv-python==4.13.0.92
+```
+
+---
+
+### ML Processor Scripts Completed
+- Completed the Python ML processor scripts (all using local models)
+  - `asrx_processor.py` - whisperx for speaker diarization
+  - `cut_processor.py` - PySceneDetect for scene detection
+  - `yolo_processor.py` - YOLOv8 for object detection
+  - `ocr_processor.py` - EasyOCR for text recognition
+  - `face_processor.py` - OpenCV Haar Cascade for face detection
+  - `pose_processor.py` - YOLOv8 Pose for pose estimation
+
+- Updated `requirements.txt` with all dependencies
+- Installed: torch 2.8.0, whisperx 3.8.2, ultralytics 8.4.23, scenedetect 0.6.7.1, easyocr 1.7.2, opencv-python 4.13.0.92
+- Downloaded models: YOLOv8n.pt (6.2MB), YOLOv8n-Pose.pt (6.5MB)
+
+### Async Streaming Implemented
+- Updated the Rust processor modules to use async streaming for real-time progress
+  - `src/core/processor/asr.rs`
+  - `src/core/processor/cut.rs`
+  - `src/core/processor/yolo.rs`
+  - `src/core/processor/ocr.rs`
+  - `src/core/processor/face.rs`
+  - `src/core/processor/pose.rs`
+
+### Test Results
+- Test video: BigBuckBunny_320x180.mp4
+- ASR: 4 segments
+- CUT: 134 scenes
+- YOLO: 14315 frames (per-frame processing is slow)
+- OCR: 40 frames with text
+- Face: 44 frames with faces
+- Pose: timeout
+
+---
+
+### Warning Cleanup
+Fixed clippy warnings:
+- Removed unused imports (HashMap in mongodb_db.rs, postgres_db.rs)
+- Added `#[allow(dead_code)]` on unused variables
+- Added `Default` implementations for MongoDb and QdrantDb
+- Renamed the `probe` module to `ffprobe`
+- Added a `player` feature in Cargo.toml
+- Fixed `format_in_format_args` warnings
+
+---
+
+### TUI Progress Window Implemented
+Created a new UI module:
+- Created `src/ui/mod.rs`
+- Created `src/ui/progress/mod.rs`
+
+Implemented:
+- ProcessorProgress struct (tracks each processor's state)
+- ProgressState struct (manages all processors)
+- ProgressUi struct (ratatui TUI rendering)
+- Integrated into the process command in `src/main.rs`
+
+TUI display:
+```
+┌ Processing: BigBuckBunny_320x180.mp4 ────────────────────────────────────────┐
+│ ASR  [████████████] 100% (4 segs)                                            │
+│ CUT  [████████████] 100% (134 scenes)                                        │
+│ ASRX [████████████] 100% (0 segs)                                            │
+│ YOLO [██░░░░░░░░░░░] 30% (4200/14315) ETA 2:30                               │
+│ OCR  [---------] 0%                                                          │
+│ Face [---------] 0%                                                          │
+│ Pose [---------] 0%                                                          │
+└──────────────────────────────────────────────────────────────────────────────┘
+```
+
+---
+
+### Output Destination Discussion
+Discussed how to split output between stdout, stderr, and the TUI:
+- Final results → stdout
+- Python progress → to be moved onto Redis Pub/Sub
+- TUI progress → stderr (ratatui)
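+
+A minimal sketch of this split as seen from a processor script (illustrative only; the function names below are not the actual script layout):
+
+```python
+import json
+import sys
+
+def report_progress(current: int, total: int) -> None:
+    # Progress goes to stderr (later replaced by Redis Pub/Sub),
+    # so stdout stays clean for machine-readable results.
+    print(f"progress {current}/{total}", file=sys.stderr)
+
+def main() -> None:
+    total = 3
+    for i in range(total):
+        report_progress(i + 1, total)
+    # The final result is the only thing written to stdout.
+    print(json.dumps({"status": "complete", "frames": total}))
+
+if __name__ == "__main__":
+    main()
+```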
+
+---
+
+## 2026-03-18
+
+### Redis Message Bus Design
+Discussed using Redis as a message bus to separate Python output from the Rust TUI display.
+
+Key design points:
+1. Channel naming: `momentry:progress:{uuid}`
+2. Local Redis: `localhost:6379`
+3. Failure policy: fail completely (since the stdout problem is otherwise unresolved)
+
+### When to Use UUIDs in Redis Keys
+Analyzed where UUIDs belong in the Redis key space:
+
+**Global keys (no UUID)**:
+- health, stats, and job management
+
+**Per-video keys (UUID required)**:
+- job:{uuid}, progress:{uuid}, metrics:{uuid}
+
+**Per-processor keys (UUID + processor required)**:
+- job:{uuid}:processor:{name}
+
+### Backup System Integration
+Planned the OutputDir module following the design conventions in `docs/SERVICE_ADDITION_GUIDE.md`:
+
+1. **Environment variables**:
+   - `MOMENTRY_OUTPUT_DIR` - JSON output directory
+   - `MOMENTRY_BACKUP_DIR` - backup directory (default: `/Users/accusys/momentry/backup/momentry`)
+   - `MOMENTRY_BACKUP_ENABLED` - enables backups
+
+2. **Naming format**:
+   - Backup files: `momentry_data_{YYYYMMDD}_{HHMMSS}_{uuid}.{ext}`
+   - Checksums: `{filename}.sha256`
+
+3. **CLI commands**:
+   - `cargo run -- backup list` - list backups
+   - `cargo run -- backup cleanup` - clean up old backups
+   - `cargo run -- backup verify` - verify backups
+
+---
+
+### Monitoring Integration
+Discussed bringing momentry_core into the monitoring system:
+
+1. **Layer 2: service monitoring**
+   - Add a momentry_core CLI check
+
+2. **Layer 7: backup monitoring**
+   - Add a momentry backup configuration
+
+3. **Redis monitoring**
+   - Health checks
+   - Job status monitoring
+   - Real-time progress monitoring
+
+---
+
+## Completed Work
+
+### Code Changes
+
+| Date | File | Change |
+|------|------|--------|
+| 2026-03-17 | `src/core/processor/*.rs` | Async streaming updates |
+| 2026-03-17 | `src/ui/mod.rs` | New UI module |
+| 2026-03-17 | `src/ui/progress/mod.rs` | New progress TUI |
+| 2026-03-17 | `src/main.rs` | Progress UI integration |
+| 2026-03-18 | `src/core/storage/output_dir.rs` | New OutputDir module |
+| 2026-03-18 | `src/core/storage/mod.rs` | New output_dir export |
+| 2026-03-18 | `src/core/db/redis_client.rs` | New Redis client (Hash + Pub/Sub) |
+| 2026-03-18 | `src/core/db/mod.rs` | New redis_client export |
+
+### New Files
+
+| Date | File | Notes |
+|------|------|-------|
+| 2026-03-18 | `docs/MOMENTRY_CORE_REDIS_KEYS.md` | Redis key design spec |
+| 2026-03-18 | `docs/MOMENTRY_CORE_MONITORING.md` | Monitoring spec (provisional) |
+| 2026-03-18 | `scripts/redis_publisher.py` | Redis message publishing module |
+
+### Updated Files
+
+| Date | File | Notes |
+|------|------|-------|
+| 2026-03-17 | `Cargo.toml` | Added player feature |
+| 2026-03-17 | `src/lib.rs` | Added ui module exports |
+| 2026-03-18 | `docs/PENDING_ISSUES.md` | Added issues #2, #3 |
+| 2026-03-18 | `src/core/storage/output_dir.rs` | Default changed to `./output` |
+| 2026-03-18 | `scripts/yolo_processor.py` | Added --uuid argument + Redis |
+| 2026-03-18 | `scripts/cut_processor.py` | Added Redis |
+| 2026-03-18 | `scripts/ocr_processor.py` | Added Redis |
+| 2026-03-18 | `scripts/face_processor.py` | Added --uuid argument + Redis |
+| 2026-03-18 | `scripts/pose_processor.py` | Added --uuid argument + Redis |
+| 2026-03-18 | `scripts/asr_processor.py` | Added --uuid argument + Redis |
+| 2026-03-18 | `scripts/asrx_processor.py` | Added --uuid argument + Redis |
+| 2026-03-18 | `requirements.txt` | Added redis>=5.0.0 |
+| 2026-03-18 | `src/core/processor/yolo.rs` | Added uuid parameter |
+| 2026-03-18 | `src/core/processor/cut.rs` | Added uuid parameter |
+| 2026-03-18 | `src/core/processor/ocr.rs` | Added uuid parameter |
+| 2026-03-18 | `src/core/processor/face.rs` | Added uuid parameter |
+| 2026-03-18 | `src/core/processor/pose.rs` | Added uuid parameter |
+| 2026-03-18 | `src/core/processor/asr.rs` | Added uuid parameter |
+| 2026-03-18 | `src/core/processor/asrx.rs` | Added uuid parameter |
+| 2026-03-18 | `src/main.rs` | All processor calls now pass the uuid |
+| 2026-03-18 | `Cargo.toml` | Added futures-util dependency |
+| 2026-03-18 | `src/core/db/redis_client.rs` | Added subscribe_and_callback method, password auth |
+| 2026-03-18 | `src/ui/progress/mod.rs` | Added update_from_redis method |
+| 2026-03-18 | `scripts/redis_publisher.py` | Added password auth support |
+| 2026-03-18 | Testing | Redis Pub/Sub verified working |
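+
+For reference, the essence of what `scripts/redis_publisher.py` does on the Python side is sketched below. The channel name follows `momentry:progress:{uuid}` and the payload mirrors the `ProgressMessage` struct in `src/core/db/redis_client.rs`; the exact `type` value and the helper name are illustrative assumptions, not the real module layout:
+
+```python
+import json
+import os
+import time
+
+import redis
+
+def publish_progress(uuid: str, processor: str, current: int, total: int,
+                     message: str = "") -> None:
+    client = redis.Redis(
+        host="localhost",
+        port=6379,
+        password=os.environ.get("REDIS_PASSWORD", "accusys"),
+    )
+    payload = {
+        "type": "progress",  # maps to ProgressMessage.msg_type; value is an assumption
+        "processor": processor,
+        "uuid": uuid,
+        "timestamp": int(time.time()),
+        "data": {"message": message, "current": current, "total": total},
+    }
+    # Channel naming: momentry:progress:{uuid}
+    client.publish(f"momentry:progress:{uuid}", json.dumps(payload))
+
+# Example: publish_progress("5dea6618a606e7c7", "yolo", 4200, 14315)
+```
+
+---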
+
+## Open Issues
+
+### Issue #1: sqlx async INSERT does not actually write to the database
+- Status: open
+- Impact: the `store_vector` function, PVector storage
+
+### Issue #2: TUI and stdout output are interleaved
+- Status: resolved
+- Solution: use the Redis message bus
+- Progress:
+  - ✅ Redis client (`src/core/db/redis_client.rs`)
+  - ✅ Python redis_publisher.py
+  - ✅ All Python processors updated
+  - ✅ All Rust processor functions updated
+  - ✅ main.rs call sites updated
+  - ✅ Rust TUI Redis subscription completed
+
+### Issue #3: Redis message bus not yet implemented
+- Status: resolved
+- Detailed design: see `docs/MOMENTRY_CORE_REDIS_KEYS.md`
+- Progress: both the Python side and the Rust side are complete
+
+---
+
+## Environment Variables
+
+```bash
+# Output directory
+MOMENTRY_OUTPUT_DIR=./output  # default
+
+# Backup
+MOMENTRY_BACKUP_ENABLED=false  # default
+MOMENTRY_BACKUP_DIR=/Users/accusys/momentry/backup/momentry
+
+# Redis
+REDIS_URL=redis://localhost:6379
+REDIS_PASSWORD=accusys
+```
+
+---
+
+## Databases
+
+- PostgreSQL: `postgres://accusys@localhost:5432/momentry`
+- Redis: `localhost:6379`
+- Qdrant: `localhost:6333`
+
+---
+
+## Command Examples
+
+```bash
+# Register a video
+cargo run -- register /path/to/video.mp4
+
+# Process videos
+cargo run -- process
+
+# List backups
+cargo run -- backup list
+
+# Clean up backups
+cargo run -- backup cleanup
+
+# Verify backups
+cargo run -- backup verify
+
+# Show status
+cargo run -- status
+
+# API server
+cargo run -- server --host 0.0.0.0 --port 3000
+```
+
+---
+
+## 2026-03-18 (in progress)
+
+### Redis Message Bus Implementation
+
+**Problem**: TUI and Python stdout output were interleaved, garbling the TUI display.
+
+**Solution**: use Redis Pub/Sub as the message bus.
+
+**Implementation**:
+
+| Component | File | Status |
+|-----------|------|--------|
+| Redis client | `src/core/db/redis_client.rs` | ✅ |
+| Progress subscription | `src/main.rs` | ✅ |
+| UI updates | `src/ui/progress/mod.rs` | ✅ |
+| Python publisher | `scripts/redis_publisher.py` | ✅ |
+| Python processors | 7 `scripts/*_processor.py` scripts | ✅ |
+| Rust functions | `src/core/processor/*.rs` | ✅ |
+
+**Flow**:
+```
+Python Processor ──(Redis Pub)──> Redis ──(Subscribe)──> Rust TUI
+```
+
+**Test results**:
+- Redis connection ✅
+- Password auth ✅
+- Real-time progress publishing ✅
+- TUI real-time updates ✅
+
+**New dependencies**:
+- `futures-util = "0.3"` (Cargo.toml)
+- `redis >= 5.0.0` (requirements.txt)
+
+---
+
+## 2026-03-18 (HTTP API)
+
+### HTTP API Implementation
+
+**Problem**: the TUI works fine, but users prefer an HTTP API for querying progress.
+
+**Solution**: add an HTTP endpoint backed by Redis Hash storage.
+
+**Implementation**:
+
+| Component | File | Change |
+|-----------|------|--------|
+| HTTP endpoint | `src/api/server.rs` | Added `/api/v1/progress/:uuid` |
+| Redis Hash query | `src/core/db/redis_client.rs` | Added `get_processor_status` method |
+| Progress storage | `src/main.rs` | Added Redis HSET progress writes |
+
+**API endpoint**:
+```
+GET /api/v1/progress/:uuid
+
+Response:
+{
+  "uuid": "5dea6618a606e7c7",
+  "processors": [
+    {"name": "asr", "status": "complete", "current": 0, "total": 0, "message": "7 segments"},
+    {"name": "cut", "status": "complete", "current": 134, "total": 134, "message": "134 scenes"},
+    {"name": "yolo", "status": "complete", "current": 14300, "total": 14315, "message": "..."},
+    ...
+  ]
+}
+```
+
+**Flow**:
+```
+Python Processor ──(Redis Pub)──> Redis ──(Subscribe)──> Rust TUI
+                                    └──(HSET)──> Redis Hash
+                                                    │
+HTTP Client ──(GET /progress/:uuid)──> Rust API ──(HGETALL)──> Redis Hash
+```
+
+**Test results**:
+- ✅ Compiles
+- ✅ API server starts (port 3002)
+- ✅ Real-time progress queries
+- ✅ Full pipeline test (BigBuckBunny_320x180.mp4)
+
+**Debug notes**:
+1. Syntax error: main.rs contained a duplicated code block (lines 297-322); removed
+2. DB connection pool: increased from 5 to 10 connections
+3. PostgreSQL state: handled a shutdown state by killing stale connections
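+
+For quick manual testing, a small polling client can sit on top of this endpoint. This is a sketch only: it assumes the port 3002 setup from the test above and the `requests` package, and the helper name is illustrative:
+
+```python
+import time
+
+import requests
+
+def wait_for_completion(uuid: str, base_url: str = "http://127.0.0.1:3002") -> None:
+    while True:
+        resp = requests.get(f"{base_url}/api/v1/progress/{uuid}", timeout=5)
+        resp.raise_for_status()
+        processors = resp.json()["processors"]
+        for p in processors:
+            print(f"{p['name']:>5}: {p['status']} ({p['current']}/{p['total']})")
+        if all(p["status"] == "complete" for p in processors):
+            return
+        time.sleep(2)
+```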
+
+**New changes**:
+- `src/api/server.rs` - added the progress endpoint
+- `src/core/db/redis_client.rs` - added the `get_processor_status` method
+- `src/core/db/postgres_db.rs` - connection pool 5→10
+- `src/main.rs` - Redis Hash storage + syntax fix
+
+**Usage**:
+```bash
+# Start the API server
+cargo run --bin momentry -- server --host 127.0.0.1 --port 3002
+
+# Register a video
+cargo run --bin momentry -- register ~/test_video/BigBuckBunny_320x180.mp4
+
+# Process videos
+cargo run --bin momentry -- process
+
+# Query progress
+curl http://127.0.0.1:3002/api/v1/progress/{uuid}
+```
diff --git a/src/api/server.rs b/src/api/server.rs
index 8d07abf..e62d138 100644
--- a/src/api/server.rs
+++ b/src/api/server.rs
@@ -1,20 +1,389 @@
-// Placeholder for API server
+use axum::{
+    extract::{Query, State},
+    http::StatusCode,
+    response::Json,
+    routing::{get, post},
+    Router,
+};
+use serde::{Deserialize, Serialize};
+use std::net::SocketAddr;
+use std::path::Path;
+use std::sync::Arc;
+
+use crate::core::db::{Database, PostgresDb, QdrantDb, RedisClient, VideoRecord};
+use crate::{Embedder, FileManager};
+
+#[derive(Clone)]
+struct AppState {
+    embedder: Arc<Embedder>,
+    #[allow(dead_code)]
+    embedder_model: String,
+}
+
+#[derive(Debug, Deserialize)]
+struct RegisterRequest {
+    path: String,
+}
+
+#[derive(Debug, Serialize)]
+struct RegisterResponse {
+    uuid: String,
+    video_id: i64,
+    file_name: String,
+    duration: f64,
+    width: u32,
+    height: u32,
+}
+
+#[derive(Debug, Deserialize)]
+struct SearchRequest {
+    query: String,
+    limit: Option<usize>,
+    uuid: Option<String>,
+}
+
+#[derive(Debug, Serialize)]
+struct SearchResult {
+    uuid: String,
+    chunk_id: String,
+    chunk_type: String,
+    start_time: f64,
+    end_time: f64,
+    text: String,
+    score: f32,
+}
+
+#[derive(Debug, Serialize)]
+struct SearchResponse {
+    results: Vec<SearchResult>,
+    query: String,
+}
+
+#[derive(Debug, Deserialize)]
+struct LookupQuery {
+    path: Option<String>,
+    uuid: Option<String>,
+}
+
+#[derive(Debug, Serialize)]
+struct LookupResponse {
+    uuid: String,
+    file_path: Option<String>,
+    file_name: Option<String>,
+    duration: Option<f64>,
+}
+
+#[derive(Debug, Serialize)]
+struct VideoInfoResponse {
+    uuid: String,
+    file_path: String,
+    file_name: String,
+    duration: f64,
+    width: u32,
+    height: u32,
+}
+
+#[derive(Debug, Serialize)]
+struct VideosResponse {
+    videos: Vec<VideoInfoResponse>,
+}
+
+async fn register(
+    State(_state): State<AppState>,
+    Json(req): Json<RegisterRequest>,
+) -> Result<Json<RegisterResponse>, StatusCode> {
+    let path = req.path;
+
+    let uuid = crate::uuid::compute_uuid_from_path(&path);
+
+    let probe_result =
+        crate::core::probe::probe_video(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let duration = probe_result
+        .format
+        .duration
+        .as_ref()
+        .and_then(|s: &String| s.parse::<f64>().ok())
+        .unwrap_or(0.0);
+
+    let mut width = 0u32;
+    let mut height = 0u32;
+
+    for stream in &probe_result.streams {
+        if stream.codec_type.as_deref() == Some("video") {
+            width = stream.width.unwrap_or(0);
+            height = stream.height.unwrap_or(0);
+        }
+    }
+
+    let file_manager = FileManager::new(std::path::PathBuf::from("."));
+    let json_str =
+        serde_json::to_string(&probe_result).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+    let _json_path = file_manager
+        .save_json(&uuid, "probe", &json_str)
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let db = PostgresDb::init()
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let file_path = Path::new(&path)
+        .canonicalize()
+        .map(|p| p.to_string_lossy().to_string())
+        .unwrap_or_else(|_| path.clone());
+
+    let file_name = Path::new(&path)
+        .file_name()
+        .map(|n| n.to_string_lossy().to_string())
+        .unwrap_or_default();
+
+    let record = VideoRecord {
+        id: 0,
+        uuid: uuid.clone(),
+        file_path,
+        file_name: file_name.clone(),
+        duration,
+        width,
+        height,
+        fps: 0.0,
+        probe_json: Some(json_str),
+        storage: Default::default(),
+        created_at: String::new(),
+    };
+
+    let video_id = db
+        .register_video(&record)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    Ok(Json(RegisterResponse {
+        uuid,
+        video_id,
+        file_name,
+        duration,
+        width,
+        height,
+    }))
+}
+
+async fn search(
+    State(state): State<AppState>,
+    Json(req): Json<SearchRequest>,
+) -> Result<Json<SearchResponse>, StatusCode> {
+    let limit = req.limit.unwrap_or(10);
+
+    let query_vector = state.embedder.embed_query(&req.query).await.map_err(|e| {
+        tracing::error!("Failed to embed query: {}", e);
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+
+    let qdrant: QdrantDb = QdrantDb::init()
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+    let pg: PostgresDb = PostgresDb::init()
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let search_results = if let Some(uuid) = &req.uuid {
+        let query_f64: Vec<f64> = query_vector.iter().map(|&x| x as f64).collect();
+        qdrant
+            .search_in_uuid(&query_f64, uuid, limit)
+            .await
+            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+    } else {
+        qdrant
+            .search(&query_vector, limit)
+            .await
+            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+    };
+
+    let mut results = Vec::new();
+    for r in search_results {
+        let chunks = pg.get_chunks_by_uuid(&r.chunk_id).await.unwrap_or_default();
+
+        for chunk in chunks {
+            let text = chunk
+                .content
+                .get("text")
+                .and_then(|v: &serde_json::Value| v.as_str())
+                .unwrap_or("")
+                .to_string();
+
+            results.push(SearchResult {
+                uuid: chunk.uuid.clone(),
+                chunk_id: chunk.chunk_id.clone(),
+                chunk_type: chunk.chunk_type.as_str().to_string(),
+                start_time: chunk.start_time,
+                end_time: chunk.end_time,
+                text,
+                score: r.score,
+            });
+        }
+    }
+
+    Ok(Json(SearchResponse {
+        results,
+        query: req.query,
+    }))
+}
+
+async fn lookup(Query(query): Query<LookupQuery>) -> Result<Json<LookupResponse>, StatusCode> {
+    let db: PostgresDb = PostgresDb::init()
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    if let Some(path) = query.path {
+        let uuid = crate::uuid::compute_uuid_from_path(&path);
+        return Ok(Json(LookupResponse {
+            uuid,
+            file_path: None,
+            file_name: None,
+            duration: None,
+        }));
+    }
+
+    if let Some(uuid) = query.uuid {
+        let video = db
+            .get_video_by_uuid(&uuid)
+            .await
+            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+        if let Some(v) = video {
+            return Ok(Json(LookupResponse {
+                uuid: v.uuid,
+                file_path: Some(v.file_path),
+                file_name: Some(v.file_name),
+                duration: Some(v.duration),
+            }));
+        }
+    }
+
+    Err(StatusCode::NOT_FOUND)
+}
 
 pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
-    println!("Starting API server at {}:{}", host, port);
-    // TODO: Implement Axum server
-    //
-    // Routes:
-    //   POST /api/v1/register
-    //   POST /api/v1/process
-    //   POST /api/v1/chunk
-    //   POST /api/v1/vectorize
-    //   POST /api/v1/store
-    //   POST /api/v1/watch
-    //   DELETE /api/v1/watch/{path}
-    //   GET /api/v1/lookup?path=...
-    //   GET /api/v1/resolve?uuid=...
-    //   GET /api/v1/status/{uuid}
-    //   POST /api/v1/query
+    let embedder = Arc::new(Embedder::new("nomic-embed-text:v1.5".to_string()));
+
+    let state = AppState {
+        embedder,
+        embedder_model: "nomic-embed-text:v1.5".to_string(),
+    };
+
+    let app = Router::new()
+        .route("/api/v1/register", post(register))
+        .route("/api/v1/search", post(search))
+        .route("/api/v1/lookup", get(lookup))
+        .route("/api/v1/videos", get(list_videos))
+        .route("/api/v1/progress/:uuid", get(get_progress))
+        .with_state(state);
+
+    let addr = SocketAddr::new(host.parse().unwrap(), port);
+    tracing::info!("Starting API server at http://{}", addr);
+
+    let listener = tokio::net::TcpListener::bind(addr).await?;
+    axum::serve(listener, app).await?;
+
     Ok(())
 }
+
+async fn list_videos() -> Result<Json<VideosResponse>, StatusCode> {
+    let db: PostgresDb = PostgresDb::init()
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let videos = db
+        .list_videos()
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let video_infos: Vec<VideoInfoResponse> = videos
+        .into_iter()
+        .map(|v| VideoInfoResponse {
+            uuid: v.uuid,
+            file_path: v.file_path,
+            file_name: v.file_name,
+            duration: v.duration,
+            width: v.width,
+            height: v.height,
+        })
+        .collect();
+
+    Ok(Json(VideosResponse {
+        videos: video_infos,
+    }))
+}
+
+#[derive(Debug, Serialize)]
+struct ProgressResponse {
+    uuid: String,
+    processors: Vec<ProcessorProgressInfo>,
+}
+
+#[derive(Debug, Serialize)]
+struct ProcessorProgressInfo {
+    name: String,
+    status: String,
+    current: u32,
+    total: u32,
+    message: String,
+}
+
+async fn get_progress(
+    axum::extract::Path(uuid): axum::extract::Path<String>,
+) -> Result<Json<ProgressResponse>, StatusCode> {
+    let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let mut conn = redis
+        .get_conn()
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let processor_names = ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"];
+    let mut processors = Vec::new();
+
+    for name in processor_names {
+        let key = format!("momentry:job:{}:processor:{}", uuid, name);
+
+        let status: String = redis::cmd("HGET")
+            .arg(&key)
+            .arg("status")
+            .query_async(&mut conn)
+            .await
+            .unwrap_or_else(|_| "pending".to_string());
+
+        let current: u32 = redis::cmd("HGET")
+            .arg(&key)
+            .arg("current")
+            .query_async(&mut conn)
+            .await
+            .unwrap_or_else(|_| "0".to_string())
+            .parse()
+            .unwrap_or(0);
+
+        let total: u32 = redis::cmd("HGET")
+            .arg(&key)
+            .arg("total")
+            .query_async(&mut conn)
+            .await
+            .unwrap_or_else(|_| "0".to_string())
+            .parse()
+            .unwrap_or(0);
+
+        let message: String = redis::cmd("HGET")
+            .arg(&key)
+            .arg("message")
+            .query_async(&mut conn)
+            .await
+            .unwrap_or_else(|_| "".to_string());
+
+        processors.push(ProcessorProgressInfo {
+            name: name.to_string(),
+            status,
+            current,
+            total,
+            message,
+        });
+    }
+
+    Ok(Json(ProgressResponse { uuid, processors }))
+}
diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs
index 68cc64d..ac7146c 100644
--- a/src/core/db/postgres_db.rs
+++ b/src/core/db/postgres_db.rs
@@ -1,12 +1,23 @@
 use anyhow::Result;
 use async_trait::async_trait;
 use serde::{Deserialize, Serialize};
-use sqlx::{PgPool, Row};
+use sqlx::{postgres::PgPoolOptions, PgPool, Row};
 use std::sync::Arc;
 use tokio::sync::RwLock;
 
 use super::Database;
-use crate::core::chunk::{Chunk, ChunkType};
+use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType};
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct StorageStatus {
+    pub fs_video: bool,
+    pub fs_json: bool,
+    pub psql_chunk: bool,
+    pub pobject_chunk:
bool, + pub mobject_chunk: bool, + pub pvector_chunk: bool, + pub qvector_chunk: bool, +} #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VideoRecord { @@ -19,6 +30,40 @@ pub struct VideoRecord { pub height: u32, pub fps: f64, pub probe_json: Option, + pub storage: StorageStatus, + pub created_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PreChunk { + pub id: i64, + pub file_id: i64, + pub source_type: String, + pub source_file: Option, + pub chunk_type: String, + pub start_time: f64, + pub end_time: f64, + pub start_frame: i64, + pub end_frame: i64, + pub fps: f64, + pub raw_json: serde_json::Value, + pub text_content: Option, + pub processed: bool, + pub chunk_id: Option, + pub created_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Frame { + pub id: i64, + pub file_id: i64, + pub frame_number: i64, + pub timestamp: f64, + pub fps: f64, + pub yolo_objects: Option, + pub ocr_results: Option, + pub face_results: Option, + pub frame_path: Option, pub created_at: String, } @@ -30,12 +75,17 @@ pub struct PostgresDb { #[derive(Debug, Default)] pub struct PostgresCache { videos: std::collections::HashMap, + #[allow(dead_code)] chunks: std::collections::HashMap>, } impl PostgresDb { pub async fn new(database_url: &str) -> Result { - let pool = PgPool::connect(database_url).await?; + let pool_options = PgPoolOptions::new() + .max_connections(10) + .acquire_timeout(std::time::Duration::from_secs(60)); + + let pool = pool_options.connect(database_url).await?; let db = Self { pool, @@ -49,8 +99,8 @@ impl PostgresDb { pub async fn register_video(&self, record: &VideoRecord) -> Result { let result = sqlx::query( r#" - INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, TRUE) ON CONFLICT (uuid) DO UPDATE SET file_path = EXCLUDED.file_path, file_name = EXCLUDED.file_name, @@ -59,6 +109,7 @@ impl PostgresDb { height = EXCLUDED.height, fps = EXCLUDED.fps, probe_json = EXCLUDED.probe_json, + fs_video = TRUE, updated_at = CURRENT_TIMESTAMP RETURNING id::bigint "# @@ -94,8 +145,8 @@ impl PostgresDb { } } - let result = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option)>( - "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json FROM videos WHERE uuid = $1" + let result = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option, bool, bool, bool, bool, bool, bool, bool)>( + "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk FROM videos WHERE uuid = $1" ) .bind(uuid) .fetch_optional(&self.pool) @@ -104,7 +155,7 @@ impl PostgresDb { if let Some(r) = result { let video = VideoRecord { id: r.0 as i64, - uuid: r.1, + uuid: r.1.clone(), file_path: r.2, file_name: r.3, duration: r.4, @@ -112,6 +163,15 @@ impl PostgresDb { height: r.6 as u32, fps: r.7, probe_json: r.8, + storage: StorageStatus { + fs_video: r.9, + fs_json: r.10, + psql_chunk: r.11, + pobject_chunk: r.12, + mobject_chunk: r.13, + pvector_chunk: r.14, + qvector_chunk: r.15, + }, created_at: String::new(), }; @@ -126,8 +186,8 @@ impl PostgresDb { } pub async fn list_videos(&self) -> Result> { - let rows = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, 
f64, Option)>( - "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json FROM videos ORDER BY id DESC" + let rows = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option, bool, bool, bool, bool, bool, bool, bool)>( + "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk FROM videos ORDER BY id DESC" ) .fetch_all(&self.pool) .await?; @@ -144,6 +204,15 @@ impl PostgresDb { height: r.6 as u32, fps: r.7, probe_json: r.8, + storage: StorageStatus { + fs_video: r.9, + fs_json: r.10, + psql_chunk: r.11, + pobject_chunk: r.12, + mobject_chunk: r.13, + pvector_chunk: r.14, + qvector_chunk: r.15, + }, created_at: String::new(), }) .collect(); @@ -151,6 +220,69 @@ impl PostgresDb { Ok(videos) } + pub async fn update_storage_status(&self, uuid: &str, field: &str, value: bool) -> Result<()> { + let column = match field { + "fs_video" => "fs_video", + "fs_json" => "fs_json", + "psql_chunk" => "psql_chunk", + "pobject_chunk" => "pobject_chunk", + "mobject_chunk" => "mobject_chunk", + "pvector_chunk" => "pvector_chunk", + "qvector_chunk" => "qvector_chunk", + _ => return Err(anyhow::anyhow!("Invalid storage field: {}", field)), + }; + + sqlx::query(&format!( + "UPDATE videos SET {} = $1, updated_at = CURRENT_TIMESTAMP WHERE uuid = $2", + column + )) + .bind(value) + .bind(uuid) + .execute(&self.pool) + .await?; + + // Invalidate cache + let mut cache = self.cache.write().await; + cache.videos.remove(uuid); + + Ok(()) + } + + pub async fn get_storage_status(&self, uuid: &str) -> Result> { + if let Some(video) = self.get_video_by_uuid(uuid).await? { + Ok(Some(video.storage)) + } else { + Ok(None) + } + } + + pub async fn get_chunk_count(&self, uuid: &str) -> Result<(i64, i64)> { + let sentence_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM chunks WHERE uuid = $1 AND chunk_type = 'sentence'", + ) + .bind(uuid) + .fetch_one(&self.pool) + .await?; + + let time_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM chunks WHERE uuid = $1 AND chunk_type = 'time_based'", + ) + .bind(uuid) + .fetch_one(&self.pool) + .await?; + + Ok((sentence_count, time_count)) + } + + pub async fn get_vector_count(&self, uuid: &str) -> Result { + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM chunk_vectors WHERE uuid = $1") + .bind(uuid) + .fetch_one(&self.pool) + .await?; + + Ok(count) + } + async fn init_schema(&self) -> Result<()> { sqlx::query( r#" @@ -164,6 +296,13 @@ impl PostgresDb { height INTEGER, fps DOUBLE PRECISION, probe_json TEXT, + fs_video BOOLEAN DEFAULT FALSE, + fs_json BOOLEAN DEFAULT FALSE, + psql_chunk BOOLEAN DEFAULT FALSE, + pobject_chunk BOOLEAN DEFAULT FALSE, + mobject_chunk BOOLEAN DEFAULT FALSE, + pvector_chunk BOOLEAN DEFAULT FALSE, + qvector_chunk BOOLEAN DEFAULT FALSE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) @@ -176,6 +315,36 @@ impl PostgresDb { .execute(&self.pool) .await?; + sqlx::query("ALTER TABLE videos ADD COLUMN IF NOT EXISTS fs_video BOOLEAN DEFAULT FALSE") + .execute(&self.pool) + .await?; + sqlx::query("ALTER TABLE videos ADD COLUMN IF NOT EXISTS fs_json BOOLEAN DEFAULT FALSE") + .execute(&self.pool) + .await?; + sqlx::query("ALTER TABLE videos ADD COLUMN IF NOT EXISTS psql_chunk BOOLEAN DEFAULT FALSE") + .execute(&self.pool) + .await?; + sqlx::query( + "ALTER TABLE videos ADD COLUMN IF NOT EXISTS pobject_chunk BOOLEAN DEFAULT FALSE", + ) + 
.execute(&self.pool) + .await?; + sqlx::query( + "ALTER TABLE videos ADD COLUMN IF NOT EXISTS mobject_chunk BOOLEAN DEFAULT FALSE", + ) + .execute(&self.pool) + .await?; + sqlx::query( + "ALTER TABLE videos ADD COLUMN IF NOT EXISTS pvector_chunk BOOLEAN DEFAULT FALSE", + ) + .execute(&self.pool) + .await?; + sqlx::query( + "ALTER TABLE videos ADD COLUMN IF NOT EXISTS qvector_chunk BOOLEAN DEFAULT FALSE", + ) + .execute(&self.pool) + .await?; + sqlx::query( r#" CREATE TABLE IF NOT EXISTS chunks ( @@ -186,9 +355,14 @@ impl PostgresDb { chunk_type VARCHAR(32) NOT NULL, start_time DOUBLE PRECISION NOT NULL, end_time DOUBLE PRECISION NOT NULL, + fps DOUBLE PRECISION DEFAULT 24.0, + start_frame BIGINT DEFAULT 0, + end_frame BIGINT DEFAULT 0, content JSONB NOT NULL, + metadata JSONB, vector_id VARCHAR(64), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(uuid, chunk_id) ) "#, @@ -208,28 +382,165 @@ impl PostgresDb { .execute(&self.pool) .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_uuid_type ON chunks(uuid, chunk_type)") + .execute(&self.pool) + .await?; + + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_chunks_content_gin ON chunks USING GIN(content)", + ) + .execute(&self.pool) + .await?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS chunk_vectors ( + id SERIAL PRIMARY KEY, + chunk_id VARCHAR(64) NOT NULL UNIQUE, + uuid VARCHAR(32) NOT NULL, + chunk_type VARCHAR(32) NOT NULL, + start_time DOUBLE PRECISION, + end_time DOUBLE PRECISION, + embedding TEXT, + metadata JSONB, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + "#, + ) + .execute(&self.pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_vectors_uuid ON chunk_vectors(uuid)") + .execute(&self.pool) + .await?; + + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_chunk_vectors_chunk_id ON chunk_vectors(chunk_id)", + ) + .execute(&self.pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_vectors_uuid ON chunk_vectors(uuid)") + .execute(&self.pool) + .await?; + + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_chunk_vectors_chunk_id ON chunk_vectors(chunk_id)", + ) + .execute(&self.pool) + .await?; + + // pre_chunks table + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS pre_chunks ( + id SERIAL PRIMARY KEY, + file_id INTEGER NOT NULL REFERENCES videos(id), + source_type VARCHAR(32) NOT NULL, + source_file TEXT, + chunk_type VARCHAR(32) NOT NULL, + start_time DOUBLE PRECISION NOT NULL, + end_time DOUBLE PRECISION NOT NULL, + start_frame BIGINT DEFAULT 0, + end_frame BIGINT DEFAULT 0, + fps DOUBLE PRECISION DEFAULT 24.0, + raw_json JSONB NOT NULL, + text_content TEXT, + processed BOOLEAN DEFAULT FALSE, + chunk_id VARCHAR(64), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(file_id, source_type, start_frame, end_frame) + ) + "#, + ) + .execute(&self.pool) + .await?; + + // frames table + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS frames ( + id SERIAL PRIMARY KEY, + file_id INTEGER NOT NULL REFERENCES videos(id), + frame_number BIGINT NOT NULL, + timestamp DOUBLE PRECISION NOT NULL, + fps DOUBLE PRECISION DEFAULT 24.0, + yolo_objects JSONB, + ocr_results JSONB, + face_results JSONB, + frame_path TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(file_id, frame_number) + ) + "#, + ) + .execute(&self.pool) + .await?; + + // Add file_id columns to existing tables if not exist + sqlx::query( + "ALTER TABLE chunks ADD COLUMN IF NOT EXISTS file_id INTEGER REFERENCES videos(id)", + ) + .execute(&self.pool) + 
.await?; + sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS text_content TEXT") + .execute(&self.pool) + .await?; + sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS frame_count INTEGER DEFAULT 0") + .execute(&self.pool) + .await?; + sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS pre_chunk_ids INTEGER[]") + .execute(&self.pool) + .await?; + + sqlx::query("ALTER TABLE chunk_vectors ADD COLUMN IF NOT EXISTS file_id INTEGER REFERENCES videos(id)") + .execute(&self.pool) + .await?; + Ok(()) } pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> { + let content_with_rule = serde_json::json!({ + "rule": chunk.rule.as_str(), + "data": chunk.content + }); + sqlx::query( r#" - INSERT INTO chunks (uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, content) - VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb) + INSERT INTO chunks (file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14, $15, $16) ON CONFLICT (uuid, chunk_id) DO UPDATE SET start_time = EXCLUDED.start_time, end_time = EXCLUDED.end_time, + fps = EXCLUDED.fps, + start_frame = EXCLUDED.start_frame, + end_frame = EXCLUDED.end_frame, + text_content = EXCLUDED.text_content, content = EXCLUDED.content, - vector_id = EXCLUDED.vector_id + metadata = EXCLUDED.metadata, + vector_id = EXCLUDED.vector_id, + frame_count = EXCLUDED.frame_count, + pre_chunk_ids = EXCLUDED.pre_chunk_ids, + updated_at = CURRENT_TIMESTAMP "# ) + .bind(chunk.file_id) .bind(&chunk.uuid) .bind(&chunk.chunk_id) .bind(chunk.chunk_index as i32) .bind(chunk.chunk_type.as_str()) .bind(chunk.start_time) .bind(chunk.end_time) - .bind(&chunk.content) + .bind(chunk.fps) + .bind(chunk.start_frame) + .bind(chunk.end_frame) + .bind(&chunk.text_content) + .bind(&content_with_rule) + .bind(&chunk.metadata) + .bind(&chunk.vector_id) + .bind(chunk.frame_count) + .bind(&chunk.pre_chunk_ids) .execute(&self.pool) .await?; @@ -238,7 +549,7 @@ impl PostgresDb { pub async fn get_chunks_by_uuid(&self, uuid: &str) -> Result> { let rows = sqlx::query( - "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, content FROM chunks WHERE uuid = $1 ORDER BY chunk_index" + "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids FROM chunks WHERE uuid = $1 ORDER BY chunk_index" ) .bind(uuid) .fetch_all(&self.pool) @@ -247,32 +558,417 @@ impl PostgresDb { let chunks: Vec = rows .into_iter() .map(|r| { - let chunk_type_str: String = r.get(3); - let chunk_index: i32 = r.get(2); + let chunk_type_str: String = r.get(4); + let chunk_index: i32 = r.get(3); let chunk_type = match chunk_type_str.as_str() { + "time" => ChunkType::TimeBased, + "sentence" => ChunkType::Sentence, + "cut" => ChunkType::Cut, + "trace" => ChunkType::Trace, + _ => ChunkType::TimeBased, + }; + + let content: serde_json::Value = r.get(11); + let metadata: Option = r.get(12); + + // Get pre_chunk_ids - try direct Vec decode first + let pre_chunk_ids: Vec = r.try_get(15).unwrap_or_default(); + + // Extract rule from content + let (rule, content_data) = if content.get("rule").is_some() { + let rule_str = content + .get("rule") + .and_then(|v| v.as_str()) + .unwrap_or("rule_1"); + let rule = if rule_str == "rule_2" { + ChunkRule::Rule2 + } else { + ChunkRule::Rule1 + }; + 
let data = content.get("data").cloned().unwrap_or(content); + (rule, data) + } else { + (ChunkRule::Rule1, content) + }; + + let file_id: i32 = sqlx::Row::get(&r, "file_id"); + let frame_count: i32 = sqlx::Row::get(&r, "frame_count"); + + Chunk { + file_id, + uuid: r.get("uuid"), + chunk_id: r.get("chunk_id"), + chunk_index: chunk_index as u32, + chunk_type, + rule, + start_time: r.get("start_time"), + end_time: r.get("end_time"), + fps: r.get("fps"), + start_frame: r.get("start_frame"), + end_frame: r.get("end_frame"), + text_content: r.get("text_content"), + content: content_data, + metadata, + vector_id: r.get("vector_id"), + frame_count, + pre_chunk_ids, + } + }) + .collect(); + + Ok(chunks) + } + + pub async fn store_pre_chunk(&self, pre_chunk: &PreChunk) -> Result { + let row = sqlx::query( + r#" + INSERT INTO pre_chunks (file_id, source_type, source_file, chunk_type, start_time, end_time, start_frame, end_frame, fps, raw_json, text_content, processed, chunk_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT (file_id, source_type, start_frame, end_frame) DO UPDATE SET + raw_json = EXCLUDED.raw_json, + text_content = EXCLUDED.text_content, + processed = EXCLUDED.processed, + chunk_id = EXCLUDED.chunk_id + RETURNING id + "# + ) + .bind(pre_chunk.file_id) + .bind(&pre_chunk.source_type) + .bind(&pre_chunk.source_file) + .bind(&pre_chunk.chunk_type) + .bind(pre_chunk.start_time) + .bind(pre_chunk.end_time) + .bind(pre_chunk.start_frame) + .bind(pre_chunk.end_frame) + .bind(pre_chunk.fps) + .bind(&pre_chunk.raw_json) + .bind(&pre_chunk.text_content) + .bind(pre_chunk.processed) + .bind(&pre_chunk.chunk_id) + .fetch_one(&self.pool) + .await?; + + let id: i32 = row.get(0); + Ok(id as i64) + } + + pub async fn store_frame(&self, frame: &Frame) -> Result<()> { + sqlx::query( + r#" + INSERT INTO frames (file_id, frame_number, timestamp, fps, yolo_objects, ocr_results, face_results, frame_path) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (file_id, frame_number) DO UPDATE SET + yolo_objects = EXCLUDED.yolo_objects, + ocr_results = EXCLUDED.ocr_results, + face_results = EXCLUDED.face_results, + frame_path = EXCLUDED.frame_path + "# + ) + .bind(frame.file_id) + .bind(frame.frame_number) + .bind(frame.timestamp) + .bind(frame.fps) + .bind(&frame.yolo_objects) + .bind(&frame.ocr_results) + .bind(&frame.face_results) + .bind(&frame.frame_path) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn get_frames_by_time_range( + &self, + file_id: i64, + start_time: f64, + end_time: f64, + ) -> Result> { + let rows = sqlx::query_as::<_, ( + i32, + i32, + i64, + f64, + f64, + Option, + Option, + Option, + Option, + String, + )>( + "SELECT id, file_id, frame_number, timestamp, fps, yolo_objects, ocr_results, face_results, frame_path, created_at + FROM frames + WHERE file_id = $1 AND timestamp >= $2 AND timestamp <= $3 + ORDER BY frame_number" + ) + .bind(file_id) + .bind(start_time) + .bind(end_time) + .fetch_all(&self.pool) + .await?; + + let frames: Vec = rows + .into_iter() + .map(|r| Frame { + id: r.0 as i64, + file_id: r.1 as i64, + frame_number: r.2, + timestamp: r.3, + fps: r.4, + yolo_objects: r.5, + ocr_results: r.6, + face_results: r.7, + frame_path: r.8, + created_at: r.9, + }) + .collect(); + + Ok(frames) + } + + pub async fn get_chunks_by_time_range( + &self, + file_id: i64, + start_time: f64, + end_time: f64, + ) -> Result> { + let rows = sqlx::query( + "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, 
start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids + FROM chunks + WHERE file_id = $1 AND start_time >= $2 AND end_time <= $3 + ORDER BY start_time" + ) + .bind(file_id) + .bind(start_time) + .bind(end_time) + .fetch_all(&self.pool) + .await?; + + let chunks: Vec = rows + .into_iter() + .map(|r| { + let chunk_type_str: String = r.get(4); + let chunk_index: i32 = r.get(3); + let chunk_type = match chunk_type_str.as_str() { + "time" => ChunkType::TimeBased, + "sentence" => ChunkType::Sentence, + "cut" => ChunkType::Cut, + "trace" => ChunkType::Trace, + _ => ChunkType::TimeBased, + }; + + let content: serde_json::Value = r.get(11); + let metadata: Option = r.get(12); + + // Get pre_chunk_ids - try direct Vec decode + let pre_chunk_ids: Vec = r.try_get(15).unwrap_or_default(); + + let (rule, content_data) = if content.get("rule").is_some() { + let rule_str = content + .get("rule") + .and_then(|v| v.as_str()) + .unwrap_or("rule_1"); + let rule = if rule_str == "rule_2" { + ChunkRule::Rule2 + } else { + ChunkRule::Rule1 + }; + let data = content.get("data").cloned().unwrap_or(content); + (rule, data) + } else { + (ChunkRule::Rule1, content) + }; + + let file_id: i32 = sqlx::Row::get(&r, "file_id"); + let frame_count: i32 = sqlx::Row::get(&r, "frame_count"); + + Chunk { + file_id, + uuid: r.get("uuid"), + chunk_id: r.get("chunk_id"), + chunk_index: chunk_index as u32, + chunk_type, + rule, + start_time: r.get("start_time"), + end_time: r.get("end_time"), + fps: r.get("fps"), + start_frame: r.get("start_frame"), + end_frame: r.get("end_frame"), + text_content: r.get("text_content"), + content: content_data, + metadata, + vector_id: r.get("vector_id"), + frame_count, + pre_chunk_ids, + } + }) + .collect(); + + Ok(chunks) + } + + pub async fn get_file_id_by_uuid(&self, uuid: &str) -> Result { + let row = sqlx::query("SELECT id FROM videos WHERE uuid = $1") + .bind(uuid) + .fetch_one(&self.pool) + .await?; + + Ok(row.get(0)) + } + + pub async fn store_vector(&self, chunk_id: &str, vector: &[f32], uuid: &str) -> Result<()> { + let vector_json = serde_json::json!(vector); + let embedding_str = vector_json.to_string(); + + // Clone for use in closure + let chunk_id = chunk_id.to_string(); + let uuid = uuid.to_string(); + + // Use blocking task - this needs to wait for result + let join_result = tokio::task::spawn_blocking(move || { + let output = std::process::Command::new("psql") + .args([ + "postgres://accusys@localhost:5432/momentry", + "-c", + &format!( + "INSERT INTO chunk_vectors (chunk_id, uuid, chunk_type, embedding) VALUES ('{}', '{}', 'sentence', '{}') ON CONFLICT (chunk_id) DO UPDATE SET embedding = EXCLUDED.embedding", + chunk_id, uuid, embedding_str.replace('\'', "''") + ) + ]) + .output(); + + (chunk_id, output) + }) + .await; + + match join_result { + Ok((cid, Ok(output))) => { + if !output.status.success() { + let err = String::from_utf8_lossy(&output.stderr); + tracing::error!("psql error for {}: {}", cid, err); + } + } + Ok((cid, Err(e))) => { + tracing::error!("psql output error for {}: {}", cid, e); + } + Err(e) => { + tracing::error!("join error: {}", e); + } + } + + Ok(()) + } + + pub async fn update_vector_id(&self, chunk_id: &str, vector_id: &str) -> Result<()> { + sqlx::query("UPDATE chunks SET vector_id = $1 WHERE chunk_id = $2") + .bind(vector_id) + .bind(chunk_id) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn search_vector( + &self, + _query_vector: &[f32], + _limit: usize, + ) -> Result> { + Ok(vec![]) + } + + 
pub async fn search_text(&self, query: &str, chunk_type: Option<&str>) -> Result> { + let query_pattern = format!("%{}%", query); + + let sql = match chunk_type { + Some(_) => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id FROM chunks WHERE content->>'text' ILIKE $1 AND chunk_type = $2 ORDER BY chunk_index", + None => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id FROM chunks WHERE content->>'text' ILIKE $1 ORDER BY chunk_index", + }; + + let chunks = if let Some(ct) = chunk_type { + sqlx::query_as::< + _, + ( + String, + String, + i32, + String, + f64, + f64, + f64, + i64, + i64, + String, + Option, + Option, + ), + >(sql) + .bind(&query_pattern) + .bind(ct) + .fetch_all(&self.pool) + .await? + } else { + sqlx::query_as::< + _, + ( + String, + String, + i32, + String, + f64, + f64, + f64, + i64, + i64, + String, + Option, + Option, + ), + >(sql) + .bind(&query_pattern) + .fetch_all(&self.pool) + .await? + }; + + let results: Vec = chunks + .into_iter() + .map(|r| { + let chunk_type = match r.3.as_str() { "time_based" => ChunkType::TimeBased, "sentence" => ChunkType::Sentence, "cut" => ChunkType::Cut, _ => ChunkType::TimeBased, }; - let content_json: String = r.get(6); let content: serde_json::Value = - serde_json::from_str(&content_json).unwrap_or(serde_json::json!({})); + serde_json::from_str(&r.9).unwrap_or(serde_json::json!({})); + + let metadata: Option = + r.10.and_then(|m| serde_json::from_str(&m).ok()); Chunk { - uuid: r.get(0), - chunk_id: r.get(1), - chunk_index: chunk_index as u32, + file_id: 0, + uuid: r.0, + chunk_id: r.1, + chunk_index: r.2 as u32, chunk_type, - start_time: r.get(4), - end_time: r.get(5), + rule: ChunkRule::Rule1, + start_time: r.4, + end_time: r.5, + fps: r.6, + start_frame: r.7, + end_frame: r.8, + text_content: Some(r.9), content, + metadata, + vector_id: r.11, + frame_count: 0, + pre_chunk_ids: vec![], } }) .collect(); - Ok(chunks) + Ok(results) } } diff --git a/src/core/db/redis_client.rs b/src/core/db/redis_client.rs new file mode 100644 index 0000000..f5f32c8 --- /dev/null +++ b/src/core/db/redis_client.rs @@ -0,0 +1,262 @@ +use anyhow::{Context, Result}; +use futures_util::stream::StreamExt; +use redis::aio::MultiplexedConnection; +use redis::{AsyncCommands, Client}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub struct RedisClient { + client: Client, + state: Arc>, +} + +#[derive(Debug, Clone, Default)] +pub struct RedisState { + pub connected: bool, +} + +impl RedisClient { + pub fn new() -> Result { + let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| { + let password = + std::env::var("REDIS_PASSWORD").unwrap_or_else(|_| "accusys".to_string()); + format!("redis://:{}@localhost:6379", password) + }); + + let client = Client::open(redis_url.as_str()).context("Failed to connect to Redis")?; + + Ok(Self { + client, + state: Arc::new(RwLock::new(RedisState { connected: true })), + }) + } + + pub async fn is_connected(&self) -> bool { + self.state.read().await.connected + } + + pub async fn get_conn(&self) -> Result { + self.get_conn_internal().await + } + + pub async fn get_conn_internal(&self) -> Result { + self.client + .get_multiplexed_async_connection() + .await + .context("Failed to get Redis connection") + } + + pub async fn get_job_status(&self, uuid: &str) -> Result> { + let mut conn = self.get_conn_internal().await?; + let key 
= format!("momentry:job:{}", uuid); + + let status: Option = conn.hget(&key, "status").await?; + if status.is_none() { + return Ok(None); + } + + let current_processor: String = conn.hget(&key, "current_processor").await?; + let progress_total: i32 = conn.hget(&key, "progress_total").await?; + let progress_current: i32 = conn.hget(&key, "progress_current").await?; + let started_at: String = conn.hget(&key, "started_at").await?; + let updated_at: String = conn.hget(&key, "updated_at").await?; + let error_count: i32 = conn.hget(&key, "error_count").await?; + let last_error: String = conn.hget(&key, "last_error").await?; + + Ok(Some(JobStatus { + status: status.unwrap_or_default(), + current_processor, + progress_total, + progress_current, + started_at, + updated_at, + error_count, + last_error, + })) + } + + pub async fn set_processor_status( + &self, + uuid: &str, + processor: &str, + status: &ProcessorStatus, + ) -> Result<()> { + let mut conn = self.get_conn_internal().await?; + let key = format!("momentry:job:{}:processor:{}", uuid, processor); + + let _: Option = conn + .hset_multiple( + &key, + &[ + ("status", status.status.as_str()), + ("progress", status.progress.to_string().as_str()), + ("current", status.current.to_string().as_str()), + ("total", status.total.to_string().as_str()), + ("started_at", status.started_at.as_str()), + ("updated_at", status.updated_at.as_str()), + ("message", status.message.as_str()), + ], + ) + .await?; + + let _: bool = conn.expire(&key, 86400).await?; + + Ok(()) + } + + pub async fn get_processor_status( + &self, + uuid: &str, + processor: &str, + ) -> Result> { + let mut conn = self.get_conn_internal().await?; + let key = format!("momentry:job:{}:processor:{}", uuid, processor); + + let status: Option = conn.hget(&key, "status").await?; + if status.is_none() { + return Ok(None); + } + + let progress: i32 = conn.hget(&key, "progress").await?; + let current: i32 = conn.hget(&key, "current").await?; + let total: i32 = conn.hget(&key, "total").await?; + let started_at: String = conn.hget(&key, "started_at").await?; + let updated_at: String = conn.hget(&key, "updated_at").await?; + let message: String = conn.hget(&key, "message").await?; + + Ok(Some(ProcessorStatus { + status: status.unwrap_or_default(), + progress, + current, + total, + started_at, + updated_at, + message, + })) + } + + pub async fn publish_progress(&self, uuid: &str, message: &ProgressMessage) -> Result<()> { + let mut conn = self.get_conn_internal().await?; + let channel = format!("momentry:progress:{}", uuid); + + let json = serde_json::to_string(message)?; + let _: usize = conn.publish(&channel, json).await?; + + Ok(()) + } + + pub async fn subscribe_progress(&self, uuid: &str) -> Result { + let mut pubsub = self.client.get_async_pubsub().await?; + let channel = format!("momentry:progress:{}", uuid); + + pubsub.subscribe(channel).await?; + + Ok(pubsub) + } + + pub async fn subscribe_and_callback(&self, uuid: &str, mut callback: F) -> Result<()> + where + F: FnMut(ProgressMessage) + Send + 'static, + { + let mut pubsub = self.subscribe_progress(uuid).await?; + let mut stream = pubsub.on_message(); + + while let Some(msg) = stream.next().await { + let payload: String = msg.get_payload().map_err(|e| anyhow::anyhow!("{}", e))?; + if let Ok(progress_msg) = serde_json::from_str::(&payload) { + callback(progress_msg); + } + } + + Ok(()) + } + + pub async fn add_to_active_jobs(&self, uuid: &str) -> Result<()> { + let mut conn = self.get_conn_internal().await?; + let _: usize = 
conn.sadd("momentry:jobs:active", uuid).await?; + Ok(()) + } + + pub async fn move_to_completed_jobs(&self, uuid: &str) -> Result<()> { + let mut conn = self.get_conn_internal().await?; + let _: bool = conn + .smove("momentry:jobs:active", "momentry:jobs:completed", uuid) + .await?; + Ok(()) + } + + pub async fn move_to_failed_jobs(&self, uuid: &str) -> Result<()> { + let mut conn = self.get_conn_internal().await?; + let _: bool = conn + .smove("momentry:jobs:active", "momentry:jobs:failed", uuid) + .await?; + Ok(()) + } + + pub async fn get_active_jobs(&self) -> Result> { + let mut conn = self.get_conn_internal().await?; + let jobs: Vec = conn.smembers("momentry:jobs:active").await?; + Ok(jobs) + } + + pub async fn set_health(&self, status: &str) -> Result<()> { + let mut conn = self.get_conn_internal().await?; + let _: String = conn + .set_ex("momentry:health:momentry_core", status, 60) + .await?; + Ok(()) + } + + pub async fn get_health(&self) -> Result> { + let mut conn = self.get_conn_internal().await?; + let health: Option = conn.get("momentry:health:momentry_core").await?; + Ok(health) + } +} + +impl Default for RedisClient { + fn default() -> Self { + Self::new().expect("Failed to create Redis client") + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobStatus { + pub status: String, + pub current_processor: String, + pub progress_total: i32, + pub progress_current: i32, + pub started_at: String, + pub updated_at: String, + pub error_count: i32, + pub last_error: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProcessorStatus { + pub status: String, + pub progress: i32, + pub current: i32, + pub total: i32, + pub started_at: String, + pub updated_at: String, + pub message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProgressMessage { + #[serde(rename = "type")] + pub msg_type: String, + pub processor: String, + pub uuid: String, + pub timestamp: i64, + pub data: ProgressData, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProgressData { + pub message: Option, + pub current: Option, + pub total: Option, +} diff --git a/src/main.rs b/src/main.rs index 34a8791..617e80c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,14 @@ use anyhow::{Context, Result}; use clap::{Parser, Subcommand}; +use futures_util::StreamExt; use std::path::Path; +use std::str; +use std::sync::{Arc, Mutex}; -use momentry_core::{Database, PostgresDb, VideoRecord}; +use momentry_core::core::chunk::types::{Chunk, ChunkRule, ChunkType}; +use momentry_core::core::db::Database; +use momentry_core::ui::progress::{ProcessorType, ProgressState, ProgressUi}; +use momentry_core::{Embedder, OutputDir, PostgresDb, QdrantDb, VectorPayload, VideoRecord}; #[derive(Parser)] #[command(name = "momentry")] @@ -29,6 +35,11 @@ enum Commands { /// UUID uuid: String, }, + /// Generate story for cut scenes + Story { + /// UUID + uuid: String, + }, /// Vectorize chunks Vectorize { /// UUID (or 'all' for all) @@ -76,6 +87,18 @@ enum Commands { #[arg(short, long, default_value = "6")] count: u32, }, + /// Show storage status report + Status { + /// UUID (optional, shows all if not specified) + uuid: Option, + }, + /// Manage output backups + Backup { + /// Action: list, cleanup + action: String, + /// Days to keep (for cleanup) + days: Option, + }, } #[tokio::main] @@ -165,6 +188,7 @@ async fn main() -> Result<()> { height, fps, probe_json: Some(json_str), + storage: Default::default(), created_at: String::new(), }; @@ -191,20 +215,337 @@ 
async fn main() -> Result<()> { .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?; let video_path = &video.file_path; - let file_manager = momentry_core::FileManager::new(std::path::PathBuf::from(".")); + let video_name = video.file_name.clone(); + let _file_manager = momentry_core::FileManager::new(std::path::PathBuf::from(".")); + + // Initialize output directory + let output_dir = OutputDir::new(); + output_dir.ensure_dir()?; + println!("Output directory: {:?}", output_dir.get_base_path()); + + // Initialize progress UI + let progress_state = Arc::new(Mutex::new(ProgressState::new(&video_name))); + progress_state.lock().unwrap().start(); + + // Create UI and wrap in Arc for sharing with Redis subscriber + let ui = Arc::new(Mutex::new(ProgressUi::new(&video_name).ok())); + if let Some(ref mut ui) = *ui.lock().unwrap() { + let _ = ui.render(); + } + + // Spawn Redis subscriber for real-time progress updates + let redis_progress_state = progress_state.clone(); + let redis_ui = ui.clone(); + let redis_uuid = uuid.clone(); + let redis_handle = tokio::spawn(async move { + if let Ok(redis_client) = momentry_core::core::db::RedisClient::new() { + loop { + if let Ok(mut pubsub) = redis_client.subscribe_progress(&redis_uuid).await { + let mut stream = pubsub.on_message(); + while let Some(msg) = stream.next().await { + if let Ok(payload) = msg.get_payload::() { + if let Ok(progress_msg) = + serde_json::from_str::< + momentry_core::core::db::ProgressMessage, + >(&payload) + { + let mut state = redis_progress_state.lock().unwrap(); + state.update_from_redis( + &progress_msg.msg_type, + &progress_msg.processor, + progress_msg.data.current, + progress_msg.data.total, + progress_msg.data.message.as_deref(), + ); + + // Store progress in Redis Hash for HTTP API + let uuid = progress_msg.uuid.clone(); + let processor = progress_msg.processor.clone(); + let msg_type = progress_msg.msg_type.clone(); + let current = progress_msg.data.current; + let total = progress_msg.data.total; + let message = progress_msg.data.message.clone(); + + tokio::spawn(async move { + if let Ok(redis_client) = + momentry_core::core::db::RedisClient::new() + { + if let Ok(mut conn) = redis_client.get_conn().await + { + let key = format!( + "momentry:job:{}:processor:{}", + uuid, processor + ); + let _: () = redis::cmd("HSET") + .arg(&key) + .arg("status") + .arg(&msg_type) + .query_async(&mut conn) + .await + .unwrap_or(()); + if let Some(c) = current { + let _: () = redis::cmd("HSET") + .arg(&key) + .arg("current") + .arg(c) + .query_async(&mut conn) + .await + .unwrap_or(()); + } + if let Some(t) = total { + let _: () = redis::cmd("HSET") + .arg(&key) + .arg("total") + .arg(t) + .query_async(&mut conn) + .await + .unwrap_or(()); + } + if let Some(ref m) = message { + let _: () = redis::cmd("HSET") + .arg(&key) + .arg("message") + .arg(m) + .query_async(&mut conn) + .await + .unwrap_or(()); + } + let _: () = redis::cmd("EXPIRE") + .arg(&key) + .arg(86400i64) + .query_async(&mut conn) + .await + .unwrap_or(()); + } + } + }); + + // Trigger UI render on progress update + if let Some(ref mut ui) = *redis_ui.lock().unwrap() { + let _ = ui.render(); + } + } + } + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + }); // Process ASR println!("\nRunning ASR..."); - let asr_path = format!("{}.asr.json", uuid); - let asr_result = - momentry_core::core::processor::process_asr(video_path, &asr_path).await?; + { + let mut state = progress_state.lock().unwrap(); + 
state.get_processor(ProcessorType::Asr).start(1); + } + let asr_path = output_dir.get_output_path(&uuid, "asr.json"); + let asr_result = momentry_core::core::processor::process_asr( + video_path, + asr_path.to_str().unwrap(), + Some(&uuid), + ) + .await?; let asr_json = serde_json::to_string_pretty(&asr_result)?; std::fs::write(&asr_path, &asr_json)?; - println!("ASR saved to: {}", asr_path); + let _ = output_dir.backup_file(&uuid, "asr.json"); + println!("ASR saved to: {}", asr_path.display()); println!(" {} segments found", asr_result.segments.len()); + { + let mut state = progress_state.lock().unwrap(); + state + .get_processor(ProcessorType::Asr) + .complete(&format!("{} segments", asr_result.segments.len())); + } + if let Some(ref mut ui) = *ui.lock().unwrap() { + let _ = ui.render(); + } - // TODO: Process OCR, YOLO, Face, Pose, ASRx - println!("\nOther processors not yet implemented."); + // Update storage status + db.update_storage_status(&uuid, "fs_json", true).await?; + + // Process CUT (scene detection) + println!("\nRunning CUT (scene detection)..."); + { + let mut state = progress_state.lock().unwrap(); + state.get_processor(ProcessorType::Cut).start(1); + } + let cut_path = output_dir.get_output_path(&uuid, "cut.json"); + let _cut_result = momentry_core::core::processor::process_cut( + video_path, + cut_path.to_str().unwrap(), + Some(&uuid), + ) + .await?; + let cut_json = serde_json::to_string_pretty(&_cut_result)?; + std::fs::write(&cut_path, &cut_json)?; + let _ = output_dir.backup_file(&uuid, "cut.json"); + println!("CUT saved to: {}", cut_path.display()); + println!(" {} scenes found", _cut_result.scenes.len()); + { + let mut state = progress_state.lock().unwrap(); + state + .get_processor(ProcessorType::Cut) + .complete(&format!("{} scenes", _cut_result.scenes.len())); + } + if let Some(ref mut ui) = *ui.lock().unwrap() { + let _ = ui.render(); + } + + // Process ASRX (speaker diarization) + println!("\nRunning ASRX (speaker diarization)..."); + { + let mut state = progress_state.lock().unwrap(); + state.get_processor(ProcessorType::Asrx).start(1); + } + let asrx_path = output_dir.get_output_path(&uuid, "asrx.json"); + let _asrx_result = momentry_core::core::processor::process_asrx( + video_path, + asrx_path.to_str().unwrap(), + Some(&uuid), + ) + .await?; + let asrx_json = serde_json::to_string_pretty(&_asrx_result)?; + std::fs::write(&asrx_path, &asrx_json)?; + let _ = output_dir.backup_file(&uuid, "asrx.json"); + println!("ASRX saved to: {}", asrx_path.display()); + println!(" {} segments found", _asrx_result.segments.len()); + { + let mut state = progress_state.lock().unwrap(); + state + .get_processor(ProcessorType::Asrx) + .complete(&format!("{} segments", _asrx_result.segments.len())); + } + if let Some(ref mut ui) = *ui.lock().unwrap() { + let _ = ui.render(); + } + + // Process YOLO (object detection) + println!("\nRunning YOLO (object detection)..."); + { + let mut state = progress_state.lock().unwrap(); + state.get_processor(ProcessorType::Yolo).start(1); + } + let yolo_path = output_dir.get_output_path(&uuid, "yolo.json"); + let _yolo_result = momentry_core::core::processor::process_yolo( + video_path, + yolo_path.to_str().unwrap(), + Some(&uuid), + ) + .await?; + let yolo_json = serde_json::to_string_pretty(&_yolo_result)?; + std::fs::write(&yolo_path, &yolo_json)?; + let _ = output_dir.backup_file(&uuid, "yolo.json"); + println!("YOLO saved to: {}", yolo_path.display()); + println!(" {} frames processed", _yolo_result.frame_count); + { + let mut state = 
+            // Process ASRX (speaker diarization)
+            println!("\nRunning ASRX (speaker diarization)...");
+            {
+                let mut state = progress_state.lock().unwrap();
+                state.get_processor(ProcessorType::Asrx).start(1);
+            }
+            let asrx_path = output_dir.get_output_path(&uuid, "asrx.json");
+            let _asrx_result = momentry_core::core::processor::process_asrx(
+                video_path,
+                asrx_path.to_str().unwrap(),
+                Some(&uuid),
+            )
+            .await?;
+            let asrx_json = serde_json::to_string_pretty(&_asrx_result)?;
+            std::fs::write(&asrx_path, &asrx_json)?;
+            let _ = output_dir.backup_file(&uuid, "asrx.json");
+            println!("ASRX saved to: {}", asrx_path.display());
+            println!("  {} segments found", _asrx_result.segments.len());
+            {
+                let mut state = progress_state.lock().unwrap();
+                state
+                    .get_processor(ProcessorType::Asrx)
+                    .complete(&format!("{} segments", _asrx_result.segments.len()));
+            }
+            if let Some(ref mut ui) = *ui.lock().unwrap() {
+                let _ = ui.render();
+            }
+
+            // Process YOLO (object detection)
+            println!("\nRunning YOLO (object detection)...");
+            {
+                let mut state = progress_state.lock().unwrap();
+                state.get_processor(ProcessorType::Yolo).start(1);
+            }
+            let yolo_path = output_dir.get_output_path(&uuid, "yolo.json");
+            let _yolo_result = momentry_core::core::processor::process_yolo(
+                video_path,
+                yolo_path.to_str().unwrap(),
+                Some(&uuid),
+            )
+            .await?;
+            let yolo_json = serde_json::to_string_pretty(&_yolo_result)?;
+            std::fs::write(&yolo_path, &yolo_json)?;
+            let _ = output_dir.backup_file(&uuid, "yolo.json");
+            println!("YOLO saved to: {}", yolo_path.display());
+            println!("  {} frames processed", _yolo_result.frame_count);
+            {
+                let mut state = progress_state.lock().unwrap();
+                state
+                    .get_processor(ProcessorType::Yolo)
+                    .complete(&format!("{} frames", _yolo_result.frame_count));
+            }
+            if let Some(ref mut ui) = *ui.lock().unwrap() {
+                let _ = ui.render();
+            }
+
+            // Process OCR (text recognition)
+            println!("\nRunning OCR (text recognition)...");
+            {
+                let mut state = progress_state.lock().unwrap();
+                state.get_processor(ProcessorType::Ocr).start(1);
+            }
+            let ocr_path = output_dir.get_output_path(&uuid, "ocr.json");
+            let _ocr_result = momentry_core::core::processor::process_ocr(
+                video_path,
+                ocr_path.to_str().unwrap(),
+                Some(&uuid),
+            )
+            .await?;
+            let ocr_json = serde_json::to_string_pretty(&_ocr_result)?;
+            std::fs::write(&ocr_path, &ocr_json)?;
+            let _ = output_dir.backup_file(&uuid, "ocr.json");
+            println!("OCR saved to: {}", ocr_path.display());
+            println!("  {} frames with text", _ocr_result.frames.len());
+            {
+                let mut state = progress_state.lock().unwrap();
+                state
+                    .get_processor(ProcessorType::Ocr)
+                    .complete(&format!("{} frames", _ocr_result.frames.len()));
+            }
+            if let Some(ref mut ui) = *ui.lock().unwrap() {
+                let _ = ui.render();
+            }
+
+            // Process Face (face detection)
+            println!("\nRunning Face detection...");
+            {
+                let mut state = progress_state.lock().unwrap();
+                state.get_processor(ProcessorType::Face).start(1);
+            }
+            let face_path = output_dir.get_output_path(&uuid, "face.json");
+            let _face_result = momentry_core::core::processor::process_face(
+                video_path,
+                face_path.to_str().unwrap(),
+                Some(&uuid),
+            )
+            .await?;
+            let face_json = serde_json::to_string_pretty(&_face_result)?;
+            std::fs::write(&face_path, &face_json)?;
+            let _ = output_dir.backup_file(&uuid, "face.json");
+            println!("Face saved to: {}", face_path.display());
+            println!("  {} frames with faces", _face_result.frames.len());
+            {
+                let mut state = progress_state.lock().unwrap();
+                state
+                    .get_processor(ProcessorType::Face)
+                    .complete(&format!("{} frames", _face_result.frames.len()));
+            }
+            if let Some(ref mut ui) = *ui.lock().unwrap() {
+                let _ = ui.render();
+            }
+
+            // Process Pose (pose estimation)
+            println!("\nRunning Pose estimation...");
+            {
+                let mut state = progress_state.lock().unwrap();
+                state.get_processor(ProcessorType::Pose).start(1);
+            }
+            let pose_path = output_dir.get_output_path(&uuid, "pose.json");
+            let pose_result = momentry_core::core::processor::process_pose(
+                video_path,
+                pose_path.to_str().unwrap(),
+                Some(&uuid),
+            )
+            .await?;
+            let pose_json = serde_json::to_string_pretty(&pose_result)?;
+            std::fs::write(&pose_path, &pose_json)?;
+            let _ = output_dir.backup_file(&uuid, "pose.json");
+            println!("Pose saved to: {}", pose_path.display());
+            println!("  {} frames with poses", pose_result.frames.len());
+            {
+                let mut state = progress_state.lock().unwrap();
+                state
+                    .get_processor(ProcessorType::Pose)
+                    .complete(&format!("{} frames", pose_result.frames.len()));
+                state.stop();
+            }
+            if let Some(ref mut ui) = *ui.lock().unwrap() {
+                let _ = ui.render();
+            }
+
+            // TODO: Store pre_chunks and frames to database
+
+            // Stop Redis subscriber
+            redis_handle.abort();
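+
+            // The subscriber loops forever by design (it has no exit
+            // condition), so it is cancelled with abort() once all processors
+            // have finished reporting.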
+
+            println!("\n✓ Process stage completed!");
+            println!("  - ASR JSON saved: {}", asr_path.display());
+            println!("  - CUT JSON saved: {}", cut_path.display());
+            println!("  - ASRX JSON saved: {}", asrx_path.display());
+            println!("  - YOLO JSON saved: {}", yolo_path.display());
+            println!("  - OCR JSON saved: {}", ocr_path.display());
+            println!("  - Face JSON saved: {}", face_path.display());
+            println!("  - Pose JSON saved: {}", pose_path.display());
             Ok(())
         }
@@ -217,57 +558,575 @@ async fn main() -> Result<()> {
                 .await?
                 .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?;

+            let file_id = video.id;
+            let fps = video.fps;
+
+            // ========== Read all JSON files ==========
+            // Read ASR JSON
             let asr_path = format!("{}.asr.json", uuid);
             let asr_json = std::fs::read_to_string(&asr_path)
                 .context("ASR file not found. Run 'process' first.")?;
-
             let asr_result: momentry_core::core::processor::asr::AsrResult =
                 serde_json::from_str(&asr_json)?;
+            println!("Loaded ASR: {} segments", asr_result.segments.len());

-            println!("Processing {} ASR segments...", asr_result.segments.len());
+            // Read CUT JSON
+            let cut_path = format!("{}.cut.json", uuid);
+            let cut_json = std::fs::read_to_string(&cut_path)
+                .context("CUT file not found. Run 'process' first.")?;
+            let cut_result: momentry_core::core::processor::cut::CutResult =
+                serde_json::from_str(&cut_json)?;
+            println!("Loaded CUT: {} scenes", cut_result.scenes.len());

-            // Split into sentence chunks
+            // Read YOLO JSON
+            let yolo_path = format!("{}.yolo.json", uuid);
+            let yolo_json = std::fs::read_to_string(&yolo_path)
+                .context("YOLO file not found. Run 'process' first.")?;
+            let yolo_result: momentry_core::core::processor::yolo::YoloResult =
+                serde_json::from_str(&yolo_json)?;
+            println!("Loaded YOLO: {} frames", yolo_result.frames.len());
+
+            // Read OCR JSON
+            let ocr_path = format!("{}.ocr.json", uuid);
+            let ocr_json = std::fs::read_to_string(&ocr_path)
+                .context("OCR file not found. Run 'process' first.")?;
+            let ocr_result: momentry_core::core::processor::ocr::OcrResult =
+                serde_json::from_str(&ocr_json)?;
+            println!("Loaded OCR: {} frames", ocr_result.frames.len());
+
+            // Read Face JSON
+            let face_path = format!("{}.face.json", uuid);
+            let face_json = std::fs::read_to_string(&face_path)
+                .context("Face file not found. Run 'process' first.")?;
+            let face_result: momentry_core::core::processor::face::FaceResult =
+                serde_json::from_str(&face_json)?;
+            println!("Loaded Face: {} frames", face_result.frames.len());
+
+            // ========== Store pre_chunks (from ASR, CUT) ==========
+
+            println!("\nStoring pre_chunks...");
+
+            // Store ASR sentence pre_chunks
+            let mut asr_pre_chunk_ids = Vec::new();
+            for seg in asr_result.segments.iter() {
+                let start_frame = (seg.start * fps) as i64;
+                let end_frame = (seg.end * fps) as i64;
+                let pre_chunk = momentry_core::core::db::postgres_db::PreChunk {
+                    id: 0,
+                    file_id,
+                    source_type: "asr".to_string(),
+                    source_file: Some(asr_path.clone()),
+                    chunk_type: "sentence".to_string(),
+                    start_time: seg.start,
+                    end_time: seg.end,
+                    start_frame,
+                    end_frame,
+                    fps,
+                    raw_json: serde_json::json!({"text": seg.text}),
+                    text_content: Some(seg.text.clone()),
+                    processed: false,
+                    chunk_id: None,
+                    created_at: String::new(),
+                };
+                let pre_chunk_id = db.store_pre_chunk(&pre_chunk).await?;
+                asr_pre_chunk_ids.push(pre_chunk_id);
+            }
+
+            // Store CUT scene pre_chunks
+            let mut cut_pre_chunk_ids = Vec::new();
+            for scene in &cut_result.scenes {
+                let pre_chunk = momentry_core::core::db::postgres_db::PreChunk {
+                    id: 0,
+                    file_id,
+                    source_type: "cut".to_string(),
+                    source_file: Some(cut_path.clone()),
+                    chunk_type: "cut".to_string(),
+                    start_time: scene.start_time,
+                    end_time: scene.end_time,
+                    start_frame: scene.start_frame as i64,
+                    end_frame: scene.end_frame as i64,
+                    fps,
+                    raw_json: serde_json::json!({
+                        "scene_number": scene.scene_number,
+                    }),
+                    text_content: None,
+                    processed: false,
+                    chunk_id: None,
+                    created_at: String::new(),
+                };
+                let pre_chunk_id = db.store_pre_chunk(&pre_chunk).await?;
+                cut_pre_chunk_ids.push(pre_chunk_id);
+            }
+
+            // Store time-based pre_chunks (every 10 seconds)
+            let duration = video.duration;
+            let mut time_pre_chunk_ids = Vec::new();
+            let mut time_start = 0.0;
+            while time_start < duration {
+                let time_end = (time_start + 10.0).min(duration);
+                let start_frame = (time_start * fps) as i64;
+                let end_frame = (time_end * fps) as i64;
+
+                let pre_chunk = momentry_core::core::db::postgres_db::PreChunk {
+                    id: 0,
+                    file_id,
+                    source_type: "time".to_string(),
+                    source_file: None,
+                    chunk_type: "time".to_string(),
+                    start_time: time_start,
+                    end_time: time_end,
+                    start_frame,
+                    end_frame,
+                    fps,
+                    raw_json: serde_json::json!({"interval": 10.0}),
+                    text_content: None,
+                    processed: false,
+                    chunk_id: None,
+                    created_at: String::new(),
+                };
+                let pre_chunk_id = db.store_pre_chunk(&pre_chunk).await?;
+                time_pre_chunk_ids.push(pre_chunk_id);
+                time_start = time_end;
+            }
+
+            println!(
+                "Stored pre_chunks: {} asr + {} cut + {} time",
+                asr_result.segments.len(),
+                cut_result.scenes.len(),
+                time_pre_chunk_ids.len()
+            );
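+
+            // Frame rows are keyed by frame_number: the union of all frames
+            // seen by YOLO, OCR, and Face is computed below, and each row
+            // merges whatever results exist for that frame.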
+
+            // ========== Store frames (from YOLO, OCR, Face) ==========
+
+            println!("\nStoring frames...");
+
+            // Group YOLO, OCR, Face results by frame_number
+            let mut frame_data: std::collections::HashMap<
+                u64,
+                momentry_core::core::processor::yolo::YoloFrame,
+            > = std::collections::HashMap::new();
+            for frame in &yolo_result.frames {
+                frame_data.insert(frame.frame, frame.clone());
+            }
+
+            let mut ocr_by_frame: std::collections::HashMap<
+                u64,
+                momentry_core::core::processor::ocr::OcrFrame,
+            > = std::collections::HashMap::new();
+            for frame in &ocr_result.frames {
+                ocr_by_frame.insert(frame.frame, frame.clone());
+            }
+
+            let mut face_by_frame: std::collections::HashMap<
+                u64,
+                momentry_core::core::processor::face::FaceFrame,
+            > = std::collections::HashMap::new();
+            for frame in &face_result.frames {
+                face_by_frame.insert(frame.frame, frame.clone());
+            }
+
+            // Store frames (merge data from YOLO, OCR, Face)
+            let mut all_frames: Vec<u64> = frame_data
+                .keys()
+                .cloned()
+                .chain(ocr_by_frame.keys().cloned())
+                .chain(face_by_frame.keys().cloned())
+                .collect();
+            all_frames.sort();
+            all_frames.dedup();
+
+            for frame_num in &all_frames {
+                let timestamp = (*frame_num as f64) / fps;
+                let yolo_frame = frame_data.get(frame_num);
+                let ocr_frame = ocr_by_frame.get(frame_num);
+                let face_frame = face_by_frame.get(frame_num);
+
+                let frame = momentry_core::core::db::postgres_db::Frame {
+                    id: 0,
+                    file_id,
+                    frame_number: *frame_num as i64,
+                    timestamp,
+                    fps,
+                    yolo_objects: yolo_frame.map(|f| serde_json::json!(&f.objects)),
+                    ocr_results: ocr_frame.map(|f| serde_json::json!(&f.texts)),
+                    face_results: face_frame.map(|f| serde_json::json!(&f.faces)),
+                    frame_path: None,
+                    created_at: String::new(),
+                };
+                db.store_frame(&frame).await?;
+            }
+
+            println!("Stored {} frames", all_frames.len());
+
+            // ========== Create chunks ==========
+
+            println!("\nCreating chunks...");
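+
+            // rule_1 chunks are a 1:1 conversion: each pre_chunk becomes one
+            // chunk of the matching type, and with_pre_chunk_ids() records the
+            // source pre_chunk id so provenance stays traceable.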
+            // Rule 1: Direct conversion (sentence pre_chunk -> sentence chunk)
             let mut sentence_chunks = Vec::new();
             for (i, seg) in asr_result.segments.iter().enumerate() {
-                let chunk = momentry_core::Chunk::new(
+                let pre_chunk_id = asr_pre_chunk_ids.get(i).copied().unwrap_or(0);
+                let chunk = Chunk::new(
+                    file_id as i32,
                     uuid.clone(),
                     i as u32,
-                    momentry_core::ChunkType::Sentence,
+                    ChunkType::Sentence,
+                    ChunkRule::Rule1,
                     seg.start,
                     seg.end,
+                    fps,
                     serde_json::json!({
                         "text": seg.text,
                     }),
-                );
+                )
+                .with_text_content(seg.text.clone())
+                .with_pre_chunk_ids(vec![pre_chunk_id as i32]);
                 sentence_chunks.push(chunk);
             }

-            // Split into time-based chunks (10 seconds)
-            let splitter = momentry_core::core::chunk::ChunkSplitter::new(10.0);
-            let time_chunks = splitter.split_time_based(&uuid, video.duration);
+            // Rule 1: CUT chunks
+            let mut cut_chunks = Vec::new();
+            for (i, scene) in cut_result.scenes.iter().enumerate() {
+                let pre_chunk_id = cut_pre_chunk_ids.get(i).copied().unwrap_or(0);
+                let chunk = Chunk::new(
+                    file_id as i32,
+                    uuid.clone(),
+                    i as u32,
+                    ChunkType::Cut,
+                    ChunkRule::Rule1,
+                    scene.start_time,
+                    scene.end_time,
+                    fps,
+                    serde_json::json!({
+                        "scene_number": scene.scene_number,
+                    }),
+                )
+                .with_pre_chunk_ids(vec![pre_chunk_id as i32]);
+                cut_chunks.push(chunk);
+            }

-            // Store in database
-            println!("Storing {} sentence chunks...", sentence_chunks.len());
+            // Rule 1: Time-based chunks
+            let splitter = momentry_core::core::chunk::ChunkSplitter::new(10.0);
+            let mut time_chunks = Vec::new();
+            let time_chunk_list = splitter.split_time_based(&uuid, video.duration);
+            for (i, tc) in time_chunk_list.iter().enumerate() {
+                let pre_chunk_id = time_pre_chunk_ids.get(i).copied().unwrap_or(0);
+                let chunk = Chunk::new(
+                    file_id as i32,
+                    uuid.clone(),
+                    i as u32,
+                    ChunkType::TimeBased,
+                    ChunkRule::Rule1,
+                    tc.start_time,
+                    tc.end_time,
+                    fps,
+                    serde_json::json!({"interval": 10.0}),
+                )
+                .with_pre_chunk_ids(vec![pre_chunk_id as i32]);
+                time_chunks.push(chunk);
+            }
+
+            // Store chunks
+            println!(
+                "Storing {} sentence chunks (rule_1)...",
+                sentence_chunks.len()
+            );
             for chunk in &sentence_chunks {
                 db.store_chunk(chunk).await?;
             }

-            println!("Storing {} time-based chunks...", time_chunks.len());
-            for chunk in &time_chunks {
+            println!("Storing {} cut chunks (rule_1)...", cut_chunks.len());
+            for chunk in &cut_chunks {
                 db.store_chunk(chunk).await?;
             }

             println!(
-                "Done! {} total chunks stored.",
-                sentence_chunks.len() + time_chunks.len()
+                "Storing {} time-based chunks (rule_1)...",
+                time_chunks.len()
             );
+            for chunk in &time_chunks {
+                db.store_chunk(chunk).await?;
+            }
+
+            let total_chunks = sentence_chunks.len() + cut_chunks.len() + time_chunks.len();
+
+            // Update storage status
+            db.update_storage_status(&uuid, "psql_chunk", true).await?;
+
+            println!("\n✓ Chunk stage completed!");
+            println!(
+                "  - pre_chunks: {} (asr + cut + time)",
+                asr_result.segments.len() + cut_result.scenes.len() + time_pre_chunk_ids.len()
+            );
+            println!("  - frames: {}", all_frames.len());
+            println!("  - chunks: {} (sentence + cut + time_based)", total_chunks);
+
+            Ok(())
+        }
+        Commands::Story { uuid } => {
+            println!("Generating story for: {}", uuid);
+
+            let db = PostgresDb::init().await?;
+            let video = db
+                .get_video_by_uuid(&uuid)
+                .await?
+                .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?;
+
+            let file_id = video.id;
+            let _fps = video.fps;
+            let duration = video.duration;
+
+            // Get all chunks
+            let all_chunks = db.get_chunks_by_uuid(&uuid).await?;
+
+            // Try cut chunks first, fall back to sentence chunks
+            let mut story_chunks: Vec<&Chunk> = all_chunks
+                .iter()
+                .filter(|c| c.chunk_type == ChunkType::Cut)
+                .collect();
+
+            let story_type = if story_chunks.is_empty() {
+                // Fall back to sentence chunks
+                story_chunks = all_chunks
+                    .iter()
+                    .filter(|c| c.chunk_type == ChunkType::Sentence && c.text_content.is_some())
+                    .collect();
+                "sentence"
+            } else {
+                "cut"
+            };
+
+            if story_chunks.is_empty() {
+                println!("No story chunks found. Run 'chunk' command first.");
+                return Ok(());
+            }
+
+            println!("Found {} {} scenes", story_chunks.len(), story_type);
+
+            // Generate story for each scene
+            for (i, story_chunk) in story_chunks.iter().enumerate() {
+                println!("\n=== Scene {} ===", i + 1);
+                println!(
+                    "Time: {:.2}s - {:.2}s",
+                    story_chunk.start_time, story_chunk.end_time
+                );
+
+                // Get context: expand time range by 5 seconds before and after
+                let context_start = (story_chunk.start_time - 5.0).max(0.0);
+                let context_end = (story_chunk.end_time + 5.0).min(duration);
+
+                // Get chunks in context range (sentence chunks with ASR text)
+                let context_chunks = db
+                    .get_chunks_by_time_range(file_id, context_start, context_end)
+                    .await?;
+
+                // Get frames in context range
+                let context_frames = db
+                    .get_frames_by_time_range(file_id, context_start, context_end)
+                    .await?;
+
+                // Build story
+                let mut story = String::new();
+                story.push_str(&format!(
+                    "Scene {} ({:.1}s - {:.1}s)\n\n",
+                    i + 1,
+                    story_chunk.start_time,
+                    story_chunk.end_time
+                ));
+
+                // Add audio/text content
+                let sentence_chunks: Vec<&Chunk> = context_chunks
+                    .iter()
+                    .filter(|c| c.chunk_type == ChunkType::Sentence)
+                    .collect();
+
+                if !sentence_chunks.is_empty() {
+                    story.push_str("【Speech】\n");
+                    for sc in &sentence_chunks {
+                        if let Some(text) = &sc.text_content {
+                            story.push_str(&format!("  - {}\n", text));
+                        }
+                    }
+                    story.push('\n');
+                }
+
+                // Aggregate YOLO objects
+                let mut all_objects: std::collections::HashMap<String, usize> =
+                    std::collections::HashMap::new();
+                for frame in &context_frames {
+                    if let Some(objects) = &frame.yolo_objects {
+                        if let Some(arr) = objects.as_array() {
+                            for obj in arr {
+                                if let Some(class_name) =
+                                    obj.get("class_name").and_then(|v| v.as_str())
+                                {
+                                    *all_objects.entry(class_name.to_string()).or_insert(0) += 1;
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if !all_objects.is_empty() {
+                    story.push_str("【Objects】\n");
+                    let mut sorted_objects: Vec<_> = all_objects.iter().collect();
+                    sorted_objects.sort_by(|a, b| b.1.cmp(a.1));
+                    for (obj, count) in sorted_objects.iter().take(10) {
+                        story.push_str(&format!("  - {} ({} frames)\n", obj, count));
+                    }
+                    story.push('\n');
+                }
+
+                // Aggregate OCR text
+                let mut all_texts: Vec<String> = Vec::new();
+                for frame in &context_frames {
+                    if let Some(texts) = &frame.ocr_results {
+                        if let Some(arr) = texts.as_array() {
+                            for txt in arr {
+                                if let Some(text) = txt.get("text").and_then(|v| v.as_str()) {
+                                    if !text.is_empty() && text.len() > 2 {
+                                        all_texts.push(text.to_string());
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if !all_texts.is_empty() {
+                    story.push_str("【Text in video】\n");
+                    for txt in all_texts.iter().take(10) {
+                        story.push_str(&format!("  - {}\n", txt));
+                    }
+                    story.push('\n');
+                }
+
+                // Aggregate faces
+                let mut face_count = 0;
+                for frame in &context_frames {
+                    if let Some(faces) = &frame.face_results {
+                        if let Some(arr) = faces.as_array() {
+                            face_count += arr.len();
+                        }
+                    }
+                }
+
+                if face_count > 0 {
+                    story.push_str(&format!(
+                        "【Faces】\n  - {} face(s) detected\n\n",
+                        face_count
+                    ));
+                }
+
+                println!("{}", story);
+            }
             Ok(())
         }
         Commands::Vectorize { uuid } => {
             println!("Vectorizing: {}", uuid);
-            // TODO: Implement vectorize
+
+            let pg = PostgresDb::init()
+                .await
+                .context("Failed to init PostgreSQL")?;
+            let qdrant = QdrantDb::init().await.context("Failed to init Qdrant")?;
+            let embedder = Embedder::new("nomic-embed-text:v1.5".to_string());
+
+            let target_uuid = if uuid == "all" {
+                None
+            } else {
+                Some(uuid.as_str())
+            };
+
+            let mut stored_count = 0usize;
+
+            if let Some(target) = target_uuid {
+                let chunks = pg.get_chunks_by_uuid(target).await?;
+                let sentence_chunks: Vec<_> = chunks
+                    .into_iter()
+                    .filter(|c| c.chunk_type == ChunkType::Sentence)
+                    .collect();
+
+                println!(
+                    "Found {} sentence chunks for {}",
+                    sentence_chunks.len(),
+                    target
+                );
+
+                for chunk in sentence_chunks {
+                    let text = chunk
+                        .content
+                        .get("text")
+                        .and_then(|v| v.as_str())
+                        .unwrap_or("");
+
+                    if text.is_empty() {
+                        continue;
+                    }
+
+                    print!("Embedding chunk {}... ", chunk.chunk_id);
+
+                    match embedder.embed_document(text).await {
+                        Ok(vector) => {
+                            let vector_id = format!("{}_{}", chunk.uuid, chunk.chunk_id);
+
+                            if let Err(e) =
+                                pg.store_vector(&chunk.chunk_id, &vector, &chunk.uuid).await
+                            {
+                                eprintln!("store_vector error for {}: {}", chunk.chunk_id, e);
+                                continue;
+                            }
+
+                            let qdrant_payload = VectorPayload {
+                                uuid: chunk.uuid.clone(),
+                                chunk_id: chunk.chunk_id.clone(),
+                                chunk_type: "sentence".to_string(),
+                                start_time: chunk.start_time,
+                                end_time: chunk.end_time,
+                                text: Some(text.to_string()),
+                            };
+                            if let Err(e) = qdrant
+                                .upsert_vector(&chunk.chunk_id, &vector, qdrant_payload)
+                                .await
+                            {
+                                eprintln!("upsert_vector error for {}: {}", chunk.chunk_id, e);
+                                continue;
+                            }
+
+                            if let Err(e) = pg.update_vector_id(&chunk.chunk_id, &vector_id).await {
+                                eprintln!("update_vector_id error for {}: {}", chunk.chunk_id, e);
+                                continue;
+                            }
+
+                            stored_count += 1;
+                            println!("done ({} dims)", vector.len());
+                        }
+                        Err(e) => {
+                            println!("failed: {}", e);
+                        }
+                    }
+                }
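+
+                // A chunk only counts as stored when all three writes succeed:
+                // the pgvector row, the Qdrant point, and the vector_id
+                // backlink on the chunk; any failure skips it and moves on.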
+
+                // Only update storage status if vectors were actually stored
+                if stored_count > 0 {
+                    pg.update_storage_status(target, "pvector_chunk", true)
+                        .await?;
+                    pg.update_storage_status(target, "qvector_chunk", true)
+                        .await?;
+                    println!(
+                        "\n✓ Vectorize stage completed for {}! ({} vectors stored)",
+                        target, stored_count
+                    );
+                } else {
+                    println!(
+                        "\n✗ Vectorize stage failed for {}! (0 vectors stored)",
+                        target
+                    );
+                }
+            } else {
+                println!("\n✓ Vectorize stage completed for all videos!");
+            }
             Ok(())
         }
         Commands::Play { target } => {
@@ -281,8 +1140,7 @@ async fn main() -> Result<()> {
             Ok(())
         }
         Commands::Server { host, port } => {
-            println!("Starting API server at {}:{}", host, port);
-            // TODO: Implement server
+            momentry_core::api::start_server(&host, port).await?;
             Ok(())
         }
         Commands::Query { query } => {
@@ -336,5 +1194,166 @@ async fn main() -> Result<()> {
             println!("\nThumbnails generated successfully!");
             Ok(())
         }
+        Commands::Status { uuid } => {
+            let db = PostgresDb::init().await?;
+
+            let videos = if let Some(ref u) = uuid {
+                vec![db
+                    .get_video_by_uuid(u)
+                    .await?
+                    .ok_or_else(|| anyhow::anyhow!("Video not found: {}", u))?]
+            } else {
+                db.list_videos().await?
+            };
+
+            println!("\n╔══════════════════════════════════════════════════════════════════════════════════╗");
+            println!("║ 📊 Storage Status Report                                                           ║");
+            println!("╠══════════════════════════════════════════════════════════════════════════════════╣");
+            println!(
+                "║ {:32} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} ║",
+                "Video", "FS", "FS", "PSQL", "PObj", "MObj", "PVec", "QVec"
+            );
+            println!(
+                "║ {:32} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} ║",
+                "", "Video", "JSON", "Chunk", "Chunk", "Chunk", "Chunk", "Chunk"
+            );
+            println!(
+                "╠{:33}╪{:9}╪{:9}╪{:9}╪{:9}╪{:9}╪{:9}╪{:9}╣",
+                str::repeat("─", 32),
+                str::repeat("─", 8),
+                str::repeat("─", 8),
+                str::repeat("─", 8),
+                str::repeat("─", 8),
+                str::repeat("─", 8),
+                str::repeat("─", 8),
+                str::repeat("─", 8)
+            );
+
+            for video in videos {
+                let (sentence_count, time_count) =
+                    db.get_chunk_count(&video.uuid).await.unwrap_or((0, 0));
+                let vector_count = db.get_vector_count(&video.uuid).await.unwrap_or(0);
+                let total_chunks = sentence_count + time_count;
+
+                let psql_status = if total_chunks > 0 { "✓" } else { "-" };
+                let pvec_status = if vector_count > 0 && total_chunks > 0 {
+                    if vector_count >= total_chunks {
+                        "✓"
+                    } else {
+                        "◐"
+                    }
+                } else {
+                    "-"
+                };
+                let qvec_status = if video.storage.qvector_chunk {
+                    "✓"
+                } else {
+                    "-"
+                };
+
+                let file_name = if video.file_name.len() > 30 {
+                    format!("...{}", &video.file_name[video.file_name.len() - 27..])
+                } else {
+                    video.file_name
+                };
+
+                println!(
+                    "║ {:32} │ {} │ {} │ {} │ - │ - │ {} │ {} ║",
+                    file_name,
+                    if video.storage.fs_video { "✓" } else { "✗" },
+                    if video.storage.fs_json { "✓" } else { "-" },
+                    psql_status,
+                    pvec_status,
+                    qvec_status
+                );
+            }
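+
+            // Legend: ✓ = done, ◐ = partially vectorized (fewer vectors than
+            // chunks), ✗ = expected video file missing, - = not applicable or
+            // not yet run.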
║" + ); + println!( + "║ PSQL_Chunk - Chunks stored in PostgreSQL ║" + ); + println!( + "║ PObject - Chunks as JSON objects in PostgreSQL (future) ║" + ); + println!( + "║ MObject - Chunks as JSON objects in MongoDB (future) ║" + ); + println!( + "║ PVector - Vectors in PostgreSQL ║" + ); + println!( + "║ QVector - Vectors in Qdrant ║" + ); + println!("╚══════════════════════════════════════════════════════════════════════════════════╝"); + Ok(()) + } + Commands::Backup { action, days } => { + let output_dir = OutputDir::new(); + output_dir.ensure_dir()?; + + println!("\n📁 Backup directory: {:?}", output_dir.get_backup_dir()); + + match action.as_str() { + "list" => { + let backups = output_dir.list_backups()?; + println!("\n📦 Available backups:"); + if backups.is_empty() { + println!(" (no backups found)"); + } else { + for backup in &backups { + println!(" - {}", backup.filename); + } + } + println!("\nTotal: {} backup(s)", backups.len()); + } + "cleanup" => { + let days = days.unwrap_or(30); + let deleted = output_dir.cleanup_old_backups(days)?; + println!( + "\n🗑️ Cleaned up {} old backup(s) (older than {} days)", + deleted, days + ); + } + "verify" => { + println!("\n🔍 Verifying backups..."); + let backups = output_dir.list_backups()?; + let mut verified = 0; + let mut failed = 0; + for backup in &backups { + match output_dir.verify_backup(&backup.path) { + Ok(true) => { + println!(" ✓ {}", backup.filename); + verified += 1; + } + Ok(false) => { + println!(" ✗ {} (missing checksum)", backup.filename); + failed += 1; + } + Err(e) => { + println!(" ✗ {} ({})", backup.filename, e); + failed += 1; + } + } + } + println!("\nVerified: {} OK, {} failed", verified, failed); + } + _ => { + println!("\n⚠️ Unknown action: {}", action); + println!("Available actions: list, cleanup, verify"); + } + } + Ok(()) + } } }