feat: Add HTTP API for progress monitoring

- Add /api/v1/progress/:uuid endpoint for real-time progress queries
- Implement Redis Hash storage for progress persistence
- Increase DB connection pool (5->10)
- Add get_processor_status method to RedisClient
- Update DEVELOPMENT_LOG with HTTP API implementation

Test: curl http://127.0.0.1:3002/api/v1/progress/<uuid>
This commit is contained in:
accusys
2026-03-18 02:14:49 +08:00
parent 26f73ab620
commit bfc4317b88
5 changed files with 2837 additions and 67 deletions

docs/DEVELOPMENT_LOG.md (new file, 424 lines)

@@ -0,0 +1,424 @@
# Momentry Core Development Log
> **Documentation maintained since:** 2026-03-18
> **⚠️ Note:** entries dated before 2026-03-18 were reconstructed after the fact and are for reference only. Future entries will be recorded in real time and are more reliable.
---
## Development Tools
### Coding LLM Models
| Phase | Tool | Model | ID | Notes |
|-------|------|-------|----|------|
| **Early** | Claude CLI | - | - | Initial project scaffolding |
| **Mid** | OpenCode | big-pickle | opencode/big-pickle | Primary development collaborator |
**Switch history:**
- Early phase: used Claude CLI to build the basic project architecture
- Mid phase: switched to OpenCode (big-pickle) for main feature development
---
## 2026-03-17
### ML Model Selection
| Processor | Model | Version/Size | Notes |
|----------|------|-----------|------|
| **ASR** | WhisperX (faster-whisper) | base, int8 | Speech recognition + dialogue segmentation |
| **CUT** | PySceneDetect | 0.6.7.1 | ContentDetector scene detection |
| **YOLO** | YOLOv8n | yolov8n.pt (6.2MB) | Object detection (nano variant, fastest) |
| **OCR** | EasyOCR | 1.7.2 | Text recognition |
| **Face** | OpenCV Haar Cascade | built-in | Face detection (no extra download) |
| **Pose** | YOLOv8n-Pose | yolov8n-pose.pt (6.5MB) | Pose estimation (nano variant) |
**Model downloads:**
- YOLOv8n: `yolov8n.pt` (6.2MB)
- YOLOv8n-Pose: `yolov8n-pose.pt` (6.5MB)
**Python dependencies:**
```
torch==2.8.0
whisperx==3.8.2
ultralytics==8.4.23
scenedetect==0.6.7.1
easyocr==1.7.2
opencv-python==4.13.0.92
```
---
### ASR Implementation Complete
- Completed the Python ML processor scripts (using local models):
  - `asrx_processor.py` - WhisperX for speaker diarization
  - `cut_processor.py` - PySceneDetect for scene detection
  - `yolo_processor.py` - YOLOv8 for object detection
  - `ocr_processor.py` - EasyOCR for text recognition
  - `face_processor.py` - OpenCV Haar Cascade for face detection
  - `pose_processor.py` - YOLOv8 Pose for pose estimation
- Updated `requirements.txt` with all dependencies
- Installed: torch 2.8.0, whisperx 3.8.2, ultralytics 8.4.23, scenedetect 0.6.7.1, easyocr 1.7.2, opencv-python 4.13.0.92
- Downloaded models: YOLOv8n.pt (6.2MB), YOLOv8n-Pose.pt (6.5MB)
### Async Streaming Implementation
- Updated the Rust processor modules to use async streaming for real-time progress:
  - `src/core/processor/asr.rs`
  - `src/core/processor/cut.rs`
  - `src/core/processor/yolo.rs`
  - `src/core/processor/ocr.rs`
  - `src/core/processor/face.rs`
  - `src/core/processor/pose.rs`
### Test Results
- Test video: BigBuckBunny_320x180.mp4
- ASR: 4 segments
- CUT: 134 scenes
- YOLO: 14315 frames (per-frame processing; time-consuming)
- OCR: 40 frames with text
- Face: 44 frames with faces
- Pose: timeout
---
### Warning Cleanup
Fixed clippy warnings:
- Removed unused imports (HashMap in mongodb_db.rs, postgres_db.rs)
- Added `#[allow(dead_code)]` annotations for unused variables
- Added `Default` implementations for MongoDb and QdrantDb
- Renamed the `probe` module to `ffprobe`
- Added a `player` feature in Cargo.toml
- Fixed `format_in_format_args` warnings
---
### TUI Progress Window Implementation
Created a new UI module:
- Created `src/ui/mod.rs`
- Created `src/ui/progress/mod.rs`
Implemented:
- ProcessorProgress struct (tracks each processor's state)
- ProgressState struct (manages all processors)
- ProgressUi struct (ratatui TUI rendering)
- Integration into the process command in `src/main.rs`
TUI display:
```
┌ Processing: BigBuckBunny_320x180.mp4 ────────────────────────────────────────┐
│ ASR [████████████] 100% (4 segs) │
│ CUT [████████████] 100% (134 scenes) │
│ ASRX [████████████] 100% (0 segs) │
│ YOLO [██░░░░░░░░░░░] 30% (4200/14315) ETA 2:30 │
│ OCR [---------] 0% │
│ Face [---------] 0% │
│ Pose [---------] 0% │
└──────────────────────────────────────────────────────────────────────────────┘
```
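For reference, here is a minimal sketch of the tracking structures behind this window, assuming fields that mirror what the TUI and the HTTP API below expose (name, status, current, total, message); the actual definitions in `src/ui/progress/mod.rs` may differ.
```rust
// Hypothetical sketch only; the real structs live in src/ui/progress/mod.rs.
#[derive(Debug, Clone, Default)]
pub struct ProcessorProgress {
    pub name: String,    // e.g. "yolo"
    pub status: String,  // "pending" | "running" | "complete"
    pub current: u32,    // units processed so far (frames, segments, ...)
    pub total: u32,      // total units, 0 while unknown
    pub message: String, // e.g. "134 scenes"
}

#[derive(Debug, Default)]
pub struct ProgressState {
    pub processors: Vec<ProcessorProgress>, // one entry per TUI row
}

impl ProcessorProgress {
    /// Percentage for the progress bar (0-100); guards against total == 0.
    pub fn percent(&self) -> u16 {
        if self.total == 0 {
            0
        } else {
            ((self.current as u64 * 100) / self.total as u64) as u16
        }
    }
}
```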
---
### Output Destination Discussion
Discussed how to route output across stdout, stderr, and the TUI:
- Final results → stdout
- Python progress → needs to move to Redis Pub/Sub
- TUI progress → stderr (ratatui)
---
## 2026-03-18
### Redis Message Bus Design
Discussed using Redis as a message bus to separate Python output from the Rust TUI display.
Key design points:
1. Channel naming: `momentry:progress:{uuid}`
2. Local Redis: `localhost:6379`
3. Failure strategy: fail hard, no fallback (since the stdout problem remains unsolved)
### When to Use UUIDs in Keys
Analyzed when Redis keys need a UUID (a key-builder sketch follows this list):
**Global keys (no UUID):**
- health, stats, jobs management
**Per-video keys (UUID required):**
- job:{uuid}, progress:{uuid}, metrics:{uuid}
**Per-processor keys (UUID + processor required):**
- job:{uuid}:processor:{name}
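The sketch below builds each key class with the exact formats used in `src/core/db/redis_client.rs` later in this commit; the helper function names themselves are illustrative, not part of the codebase.
```rust
// Illustrative key builders; formats match src/core/db/redis_client.rs.
fn job_key(uuid: &str) -> String {
    format!("momentry:job:{}", uuid) // per-video job hash
}

fn processor_key(uuid: &str, processor: &str) -> String {
    format!("momentry:job:{}:processor:{}", uuid, processor) // per-processor hash
}

fn progress_channel(uuid: &str) -> String {
    format!("momentry:progress:{}", uuid) // per-video Pub/Sub channel
}

fn main() {
    // Global keys such as momentry:jobs:active and momentry:health:momentry_core
    // carry no UUID at all.
    let uuid = "5dea6618a606e7c7";
    assert_eq!(
        processor_key(uuid, "yolo"),
        "momentry:job:5dea6618a606e7c7:processor:yolo"
    );
    println!("{} {}", job_key(uuid), progress_channel(uuid));
}
```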
### Backup System Integration
Planned the OutputDir module following the design conventions in `docs/SERVICE_ADDITION_GUIDE.md`:
1. **Environment variables**
   - `MOMENTRY_OUTPUT_DIR` - JSON output directory
   - `MOMENTRY_BACKUP_DIR` - backup directory (default: `/Users/accusys/momentry/backup/momentry`)
   - `MOMENTRY_BACKUP_ENABLED` - enable backups
2. **Naming format** (see the sketch after this list)
   - Backup files: `momentry_data_{YYYYMMDD}_{HHMMSS}_{uuid}.{ext}`
   - Checksums: `{filename}.sha256`
3. **CLI commands**
   - `cargo run -- backup list` - list backups
   - `cargo run -- backup cleanup` - clean up old backups
   - `cargo run -- backup verify` - verify backups
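A minimal sketch of this naming scheme, assuming the `chrono` crate for timestamps; the actual implementation in `src/core/storage/output_dir.rs` is not shown in this commit and may differ.
```rust
// Sketch of the backup naming scheme above, assuming chrono for timestamps.
use chrono::Local;

fn backup_file_name(uuid: &str, ext: &str) -> String {
    // momentry_data_{YYYYMMDD}_{HHMMSS}_{uuid}.{ext}
    let now = Local::now();
    format!(
        "momentry_data_{}_{}_{}.{}",
        now.format("%Y%m%d"),
        now.format("%H%M%S"),
        uuid,
        ext
    )
}

fn checksum_file_name(backup_name: &str) -> String {
    format!("{}.sha256", backup_name) // checksum sits beside the backup file
}

fn main() {
    let name = backup_file_name("5dea6618a606e7c7", "json");
    println!("{}", name);
    println!("{}", checksum_file_name(&name));
}
```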
---
### Monitoring System Integration
Discussed bringing momentry_core into the monitoring system:
1. **Layer 2: service monitoring**
   - Add a momentry_core CLI check
2. **Layer 7: backup monitoring**
   - Add a momentry backup configuration
3. **Redis monitoring**
   - Health checks
   - Job status monitoring
   - Real-time progress monitoring
---
## Completed Work
### Code Changes
| Date | File | Change |
|------|------|------|
| 2026-03-17 | `src/core/processor/*.rs` | Async streaming update |
| 2026-03-17 | `src/ui/mod.rs` | New UI module |
| 2026-03-17 | `src/ui/progress/mod.rs` | New progress TUI |
| 2026-03-17 | `src/main.rs` | Progress UI integration |
| 2026-03-18 | `src/core/storage/output_dir.rs` | New OutputDir module |
| 2026-03-18 | `src/core/storage/mod.rs` | New output_dir export |
| 2026-03-18 | `src/core/db/redis_client.rs` | New Redis client (Hash + Pub/Sub) |
| 2026-03-18 | `src/core/db/mod.rs` | New redis_client export |
### New Files
| Date | File | Description |
|------|------|------|
| 2026-03-18 | `docs/MOMENTRY_CORE_REDIS_KEYS.md` | Redis key design spec |
| 2026-03-18 | `docs/MOMENTRY_CORE_MONITORING.md` | Monitoring spec (provisional) |
| 2026-03-18 | `scripts/redis_publisher.py` | Redis message publishing module |
### Updated Files
| Date | File | Description |
|------|------|------|
| 2026-03-17 | `Cargo.toml` | Added player feature |
| 2026-03-17 | `src/lib.rs` | Added ui module exports |
| 2026-03-18 | `docs/PENDING_ISSUES.md` | Added issues #2, #3 |
| 2026-03-18 | `src/core/storage/output_dir.rs` | Default changed to `./output` |
| 2026-03-18 | `scripts/yolo_processor.py` | Added --uuid argument + Redis |
| 2026-03-18 | `scripts/cut_processor.py` | Added Redis |
| 2026-03-18 | `scripts/ocr_processor.py` | Added Redis |
| 2026-03-18 | `scripts/face_processor.py` | Added --uuid argument + Redis |
| 2026-03-18 | `scripts/pose_processor.py` | Added --uuid argument + Redis |
| 2026-03-18 | `scripts/asr_processor.py` | Added --uuid argument + Redis |
| 2026-03-18 | `scripts/asrx_processor.py` | Added --uuid argument + Redis |
| 2026-03-18 | `requirements.txt` | Added redis>=5.0.0 |
| 2026-03-18 | `src/core/processor/yolo.rs` | Added uuid parameter |
| 2026-03-18 | `src/core/processor/cut.rs` | Added uuid parameter |
| 2026-03-18 | `src/core/processor/ocr.rs` | Added uuid parameter |
| 2026-03-18 | `src/core/processor/face.rs` | Added uuid parameter |
| 2026-03-18 | `src/core/processor/pose.rs` | Added uuid parameter |
| 2026-03-18 | `src/core/processor/asr.rs` | Added uuid parameter |
| 2026-03-18 | `src/core/processor/asrx.rs` | Added uuid parameter |
| 2026-03-18 | `src/main.rs` | Updated all processor calls to pass uuid |
| 2026-03-18 | `Cargo.toml` | Added futures-util dependency |
| 2026-03-18 | `src/core/db/redis_client.rs` | Added subscribe_and_callback method, password auth |
| 2026-03-18 | `src/ui/progress/mod.rs` | Added update_from_redis method |
| 2026-03-18 | `scripts/redis_publisher.py` | Added password auth support |
| 2026-03-18 | Testing | Redis Pub/Sub confirmed working |
---
## Open Issues
### Issue #1: sqlx async INSERT does not actually write to the database
- Status: open
- Impact: the `store_vector` function (PVector storage)
### Issue #2: TUI and stdout output interleave
- Status: resolved
- Solution: Redis message bus
- Progress:
  - ✅ Redis client (`src/core/db/redis_client.rs`)
  - ✅ Python redis_publisher.py
  - ✅ All Python processors updated
  - ✅ All Rust processor functions updated
  - ✅ main.rs call sites updated
  - ✅ Rust TUI Redis subscription complete
### Issue #3: Redis message bus not yet implemented
- Status: resolved
- Detailed design: see `docs/MOMENTRY_CORE_REDIS_KEYS.md`
- Progress: both the Python side and the Rust side are complete
---
## Environment Variables
```bash
# Output directory
MOMENTRY_OUTPUT_DIR=./output # default
# Backup
MOMENTRY_BACKUP_ENABLED=false # default
MOMENTRY_BACKUP_DIR=/Users/accusys/momentry/backup/momentry
# Redis (future implementation)
REDIS_URL=redis://localhost:6379
REDIS_PASSWORD=accusys
```
---
## Databases
- PostgreSQL: `postgres://accusys@localhost:5432/momentry`
- Redis: `localhost:6379` (to be implemented)
- Qdrant: `localhost:6333`
---
## Command Examples
```bash
# Register a video
cargo run -- register /path/to/video.mp4
# Process a video
cargo run -- process <uuid>
# List backups
cargo run -- backup list
# Clean up old backups
cargo run -- backup cleanup
# Verify backups
cargo run -- backup verify
# Show status
cargo run -- status
# API server
cargo run -- server --host 0.0.0.0 --port 3000
```
---
## 2026-03-18 (in progress)
### Redis Message Bus Implementation
**Problem:** TUI and Python stdout output interleave, garbling the TUI display
**Solution:** use Redis Pub/Sub as the message bus
**Implementation:**
| Component | File | Status |
|------|------|------|
| Redis client | `src/core/db/redis_client.rs` | ✅ |
| Progress subscription | `src/main.rs` | ✅ |
| UI updates | `src/ui/progress/mod.rs` | ✅ |
| Python publisher | `scripts/redis_publisher.py` | ✅ |
| Python processors | 7 × `scripts/*_processor.py` | ✅ |
| Rust functions | `src/core/processor/*.rs` | ✅ |
**Flow** (an example payload sketch follows the diagram):
```
Python Processor ──(Redis Pub)──> Redis ──(Subscribe)──> Rust TUI
```
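As a concrete illustration of what travels over the `momentry:progress:{uuid}` channel, here is a minimal payload sketch. Field names are taken from the `ProgressMessage`/`ProgressData` structs in `src/core/db/redis_client.rs` (further down in this commit); the concrete values, including the `"progress"` type tag, are illustrative assumptions.
```rust
// Illustrative payload for channel momentry:progress:{uuid}. Field names match
// ProgressMessage / ProgressData in src/core/db/redis_client.rs; the values
// and the "progress" type tag are assumptions for this sketch.
use serde_json::json;

fn main() {
    let payload = json!({
        "type": "progress",          // ProgressMessage.msg_type, serialized as "type"
        "processor": "yolo",
        "uuid": "5dea6618a606e7c7",
        "timestamp": 1766000000_i64, // unix seconds (illustrative)
        "data": {
            "message": "processing frames",
            "current": 4200,
            "total": 14315
        }
    });
    // A publisher would PUBLISH this string on the per-video channel.
    println!("{}", payload);
}
```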
**Test results:**
- Redis connection ✅
- Password authentication ✅
- Real-time progress publishing ✅
- Live TUI updates ✅
**New dependencies:**
- `futures-util = "0.3"` (Cargo.toml)
- `redis >= 5.0.0` (requirements.txt)
---
## 2026-03-18 (HTTP API)
### HTTP API Implementation
**Problem:** the TUI works, but an HTTP API is preferred for querying progress
**Solution:** add an HTTP endpoint backed by Redis Hash storage
**Implementation:**
| Component | File | Change |
|------|------|------|
| HTTP endpoint | `src/api/server.rs` | Added `/api/v1/progress/:uuid` |
| Redis Hash queries | `src/core/db/redis_client.rs` | Added `get_processor_status` method |
| Progress storage | `src/main.rs` | Added Redis HSET progress writes |
**API endpoint:**
```
GET /api/v1/progress/:uuid
Response:
{
"uuid": "5dea6618a606e7c7",
"processors": [
{"name": "asr", "status": "complete", "current": 0, "total": 0, "message": "7 segments"},
{"name": "cut", "status": "complete", "current": 134, "total": 134, "message": "134 scenes"},
{"name": "yolo", "status": "complete", "current": 14300, "total": 14315, "message": "..."},
...
]
}
```
**Flow:**
```
Python Processor ──(Redis Pub)──> Redis ──(Subscribe)──> Rust TUI
└──(HSET)──> Redis Hash
HTTP Client ──(GET /progress/:uuid)──> Rust API ─(HGETALL)──> Redis Hash
```
**Test results:**
- ✅ Compiles successfully
- ✅ API server starts (port 3002)
- ✅ Real-time progress queries
- ✅ Full pipeline test (BigBuckBunny_320x180.mp4)
**Debugging notes:**
1. Syntax error: main.rs contained a duplicated code block (lines 297-322); removed
2. DB connection pool: increased from 5 to 10 connections
3. PostgreSQL state: handled the shutdown state and killed stale connections
**Changes:**
- `src/api/server.rs` - added the progress endpoint
- `src/core/db/redis_client.rs` - added the `get_processor_status` method
- `src/core/db/postgres_db.rs` - connection pool 5→10
- `src/main.rs` - Redis Hash storage + syntax fix
**Usage:**
```bash
# Start the API server
cargo run --bin momentry -- server --host 127.0.0.1 --port 3002
# Register a video
cargo run --bin momentry -- register ~/test_video/BigBuckBunny_320x180.mp4
# Process a video
cargo run --bin momentry -- process <uuid>
# Query progress
curl http://127.0.0.1:3002/api/v1/progress/<uuid>
```

src/api/server.rs

@@ -1,20 +1,389 @@
// Placeholder for API server
use axum::{
extract::{Query, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use crate::core::db::{Database, PostgresDb, QdrantDb, RedisClient, VideoRecord};
use crate::{Embedder, FileManager};
#[derive(Clone)]
struct AppState {
embedder: Arc<Embedder>,
#[allow(dead_code)]
embedder_model: String,
}
#[derive(Debug, Deserialize)]
struct RegisterRequest {
path: String,
}
#[derive(Debug, Serialize)]
struct RegisterResponse {
uuid: String,
video_id: i64,
file_name: String,
duration: f64,
width: u32,
height: u32,
}
#[derive(Debug, Deserialize)]
struct SearchRequest {
query: String,
limit: Option<usize>,
uuid: Option<String>,
}
#[derive(Debug, Serialize)]
struct SearchResult {
uuid: String,
chunk_id: String,
chunk_type: String,
start_time: f64,
end_time: f64,
text: String,
score: f32,
}
#[derive(Debug, Serialize)]
struct SearchResponse {
results: Vec<SearchResult>,
query: String,
}
#[derive(Debug, Deserialize)]
struct LookupQuery {
path: Option<String>,
uuid: Option<String>,
}
#[derive(Debug, Serialize)]
struct LookupResponse {
uuid: String,
file_path: Option<String>,
file_name: Option<String>,
duration: Option<f64>,
}
#[derive(Debug, Serialize)]
struct VideoInfoResponse {
uuid: String,
file_path: String,
file_name: String,
duration: f64,
width: u32,
height: u32,
}
#[derive(Debug, Serialize)]
struct VideosResponse {
videos: Vec<VideoInfoResponse>,
}
async fn register(
State(_state): State<AppState>,
Json(req): Json<RegisterRequest>,
) -> Result<Json<RegisterResponse>, StatusCode> {
let path = req.path;
let uuid = crate::uuid::compute_uuid_from_path(&path);
let probe_result =
crate::core::probe::probe_video(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let duration = probe_result
.format
.duration
.as_ref()
.and_then(|s: &String| s.parse::<f64>().ok())
.unwrap_or(0.0);
let mut width = 0u32;
let mut height = 0u32;
for stream in &probe_result.streams {
if stream.codec_type.as_deref() == Some("video") {
width = stream.width.unwrap_or(0);
height = stream.height.unwrap_or(0);
}
}
let file_manager = FileManager::new(std::path::PathBuf::from("."));
let json_str =
serde_json::to_string(&probe_result).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let _json_path = file_manager
.save_json(&uuid, "probe", &json_str)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let db = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let file_path = Path::new(&path)
.canonicalize()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| path.clone());
let file_name = Path::new(&path)
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default();
let record = VideoRecord {
id: 0,
uuid: uuid.clone(),
file_path,
file_name: file_name.clone(),
duration,
width,
height,
fps: 0.0,
probe_json: Some(json_str),
storage: Default::default(),
created_at: String::new(),
};
let video_id = db
.register_video(&record)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(RegisterResponse {
uuid,
video_id,
file_name,
duration,
width,
height,
}))
}
async fn search(
State(state): State<AppState>,
Json(req): Json<SearchRequest>,
) -> Result<Json<SearchResponse>, StatusCode> {
let limit = req.limit.unwrap_or(10);
let query_vector = state.embedder.embed_query(&req.query).await.map_err(|e| {
tracing::error!("Failed to embed query: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let qdrant: QdrantDb = QdrantDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let pg: PostgresDb = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let search_results = if let Some(uuid) = &req.uuid {
let query_f64: Vec<f64> = query_vector.iter().map(|&x| x as f64).collect();
qdrant
.search_in_uuid(&query_f64, uuid, limit)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
} else {
qdrant
.search(&query_vector, limit)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
};
let mut results = Vec::new();
for r in search_results {
let chunks = pg.get_chunks_by_uuid(&r.chunk_id).await.unwrap_or_default();
for chunk in chunks {
let text = chunk
.content
.get("text")
.and_then(|v: &serde_json::Value| v.as_str())
.unwrap_or("")
.to_string();
results.push(SearchResult {
uuid: chunk.uuid.clone(),
chunk_id: chunk.chunk_id.clone(),
chunk_type: chunk.chunk_type.as_str().to_string(),
start_time: chunk.start_time,
end_time: chunk.end_time,
text,
score: r.score,
});
}
}
Ok(Json(SearchResponse {
results,
query: req.query,
}))
}
async fn lookup(Query(query): Query<LookupQuery>) -> Result<Json<LookupResponse>, StatusCode> {
let db: PostgresDb = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if let Some(path) = query.path {
let uuid = crate::uuid::compute_uuid_from_path(&path);
return Ok(Json(LookupResponse {
uuid,
file_path: None,
file_name: None,
duration: None,
}));
}
if let Some(uuid) = query.uuid {
let video = db
.get_video_by_uuid(&uuid)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if let Some(v) = video {
return Ok(Json(LookupResponse {
uuid: v.uuid,
file_path: Some(v.file_path),
file_name: Some(v.file_name),
duration: Some(v.duration),
}));
}
}
Err(StatusCode::NOT_FOUND)
}
pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
println!("Starting API server at {}:{}", host, port);
// TODO: Implement Axum server
//
// Routes:
// POST /api/v1/register
// POST /api/v1/process
// POST /api/v1/chunk
// POST /api/v1/vectorize
// POST /api/v1/store
// POST /api/v1/watch
// DELETE /api/v1/watch/{path}
// GET /api/v1/lookup?path=...
// GET /api/v1/resolve?uuid=...
// GET /api/v1/status/{uuid}
// POST /api/v1/query
let embedder = Arc::new(Embedder::new("nomic-embed-text:v1.5".to_string()));
let state = AppState {
embedder,
embedder_model: "nomic-embed-text:v1.5".to_string(),
};
let app = Router::new()
.route("/api/v1/register", post(register))
.route("/api/v1/search", post(search))
.route("/api/v1/lookup", get(lookup))
.route("/api/v1/videos", get(list_videos))
.route("/api/v1/progress/:uuid", get(get_progress))
.with_state(state);
let addr = SocketAddr::new(host.parse().unwrap(), port);
tracing::info!("Starting API server at http://{}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app).await?;
Ok(())
}
async fn list_videos() -> Result<Json<VideosResponse>, StatusCode> {
let db: PostgresDb = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let videos = db
.list_videos()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let video_infos: Vec<VideoInfoResponse> = videos
.into_iter()
.map(|v| VideoInfoResponse {
uuid: v.uuid,
file_path: v.file_path,
file_name: v.file_name,
duration: v.duration,
width: v.width,
height: v.height,
})
.collect();
Ok(Json(VideosResponse {
videos: video_infos,
}))
}
#[derive(Debug, Serialize)]
struct ProgressResponse {
uuid: String,
processors: Vec<ProcessorProgressInfo>,
}
#[derive(Debug, Serialize)]
struct ProcessorProgressInfo {
name: String,
status: String,
current: u32,
total: u32,
message: String,
}
async fn get_progress(
axum::extract::Path(uuid): axum::extract::Path<String>,
) -> Result<Json<ProgressResponse>, StatusCode> {
let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut conn = redis
.get_conn()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let processor_names = ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"];
let mut processors = Vec::new();
for name in processor_names {
let key = format!("momentry:job:{}:processor:{}", uuid, name);
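// Read each field separately; a missing hash or field falls back to a default
// ("pending", 0, "") so an absent processor entry never fails the request.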
let status: String = redis::cmd("HGET")
.arg(&key)
.arg("status")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "pending".to_string());
let current: u32 = redis::cmd("HGET")
.arg(&key)
.arg("current")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "0".to_string())
.parse()
.unwrap_or(0);
let total: u32 = redis::cmd("HGET")
.arg(&key)
.arg("total")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "0".to_string())
.parse()
.unwrap_or(0);
let message: String = redis::cmd("HGET")
.arg(&key)
.arg("message")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "".to_string());
processors.push(ProcessorProgressInfo {
name: name.to_string(),
status,
current,
total,
message,
});
}
Ok(Json(ProgressResponse { uuid, processors }))
}

src/core/db/postgres_db.rs

@@ -1,12 +1,23 @@
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, Row};
use sqlx::{postgres::PgPoolOptions, PgPool, Row};
use std::sync::Arc;
use tokio::sync::RwLock;
use super::Database;
use crate::core::chunk::{Chunk, ChunkType};
use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType};
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StorageStatus {
pub fs_video: bool,
pub fs_json: bool,
pub psql_chunk: bool,
pub pobject_chunk: bool,
pub mobject_chunk: bool,
pub pvector_chunk: bool,
pub qvector_chunk: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VideoRecord {
@@ -19,6 +30,40 @@ pub struct VideoRecord {
pub height: u32,
pub fps: f64,
pub probe_json: Option<String>,
pub storage: StorageStatus,
pub created_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PreChunk {
pub id: i64,
pub file_id: i64,
pub source_type: String,
pub source_file: Option<String>,
pub chunk_type: String,
pub start_time: f64,
pub end_time: f64,
pub start_frame: i64,
pub end_frame: i64,
pub fps: f64,
pub raw_json: serde_json::Value,
pub text_content: Option<String>,
pub processed: bool,
pub chunk_id: Option<String>,
pub created_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Frame {
pub id: i64,
pub file_id: i64,
pub frame_number: i64,
pub timestamp: f64,
pub fps: f64,
pub yolo_objects: Option<serde_json::Value>,
pub ocr_results: Option<serde_json::Value>,
pub face_results: Option<serde_json::Value>,
pub frame_path: Option<String>,
pub created_at: String,
}
@@ -30,12 +75,17 @@ pub struct PostgresDb {
#[derive(Debug, Default)]
pub struct PostgresCache {
videos: std::collections::HashMap<String, VideoRecord>,
#[allow(dead_code)]
chunks: std::collections::HashMap<String, Vec<Chunk>>,
}
impl PostgresDb {
pub async fn new(database_url: &str) -> Result<Self> {
let pool = PgPool::connect(database_url).await?;
let pool_options = PgPoolOptions::new()
.max_connections(10)
.acquire_timeout(std::time::Duration::from_secs(60));
let pool = pool_options.connect(database_url).await?;
let db = Self {
pool,
@@ -49,8 +99,8 @@ impl PostgresDb {
pub async fn register_video(&self, record: &VideoRecord) -> Result<i64> {
let result = sqlx::query(
r#"
INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, TRUE)
ON CONFLICT (uuid) DO UPDATE SET
file_path = EXCLUDED.file_path,
file_name = EXCLUDED.file_name,
@@ -59,6 +109,7 @@ impl PostgresDb {
height = EXCLUDED.height,
fps = EXCLUDED.fps,
probe_json = EXCLUDED.probe_json,
fs_video = TRUE,
updated_at = CURRENT_TIMESTAMP
RETURNING id::bigint
"#
@@ -94,8 +145,8 @@ impl PostgresDb {
}
}
let result = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>)>(
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json FROM videos WHERE uuid = $1"
let result = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>, bool, bool, bool, bool, bool, bool, bool)>(
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk FROM videos WHERE uuid = $1"
)
.bind(uuid)
.fetch_optional(&self.pool)
@@ -104,7 +155,7 @@ impl PostgresDb {
if let Some(r) = result {
let video = VideoRecord {
id: r.0 as i64,
uuid: r.1,
uuid: r.1.clone(),
file_path: r.2,
file_name: r.3,
duration: r.4,
@@ -112,6 +163,15 @@ impl PostgresDb {
height: r.6 as u32,
fps: r.7,
probe_json: r.8,
storage: StorageStatus {
fs_video: r.9,
fs_json: r.10,
psql_chunk: r.11,
pobject_chunk: r.12,
mobject_chunk: r.13,
pvector_chunk: r.14,
qvector_chunk: r.15,
},
created_at: String::new(),
};
@@ -126,8 +186,8 @@ impl PostgresDb {
}
pub async fn list_videos(&self) -> Result<Vec<VideoRecord>> {
let rows = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>)>(
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json FROM videos ORDER BY id DESC"
let rows = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>, bool, bool, bool, bool, bool, bool, bool)>(
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk FROM videos ORDER BY id DESC"
)
.fetch_all(&self.pool)
.await?;
@@ -144,6 +204,15 @@ impl PostgresDb {
height: r.6 as u32,
fps: r.7,
probe_json: r.8,
storage: StorageStatus {
fs_video: r.9,
fs_json: r.10,
psql_chunk: r.11,
pobject_chunk: r.12,
mobject_chunk: r.13,
pvector_chunk: r.14,
qvector_chunk: r.15,
},
created_at: String::new(),
})
.collect();
@@ -151,6 +220,69 @@ impl PostgresDb {
Ok(videos)
}
pub async fn update_storage_status(&self, uuid: &str, field: &str, value: bool) -> Result<()> {
let column = match field {
"fs_video" => "fs_video",
"fs_json" => "fs_json",
"psql_chunk" => "psql_chunk",
"pobject_chunk" => "pobject_chunk",
"mobject_chunk" => "mobject_chunk",
"pvector_chunk" => "pvector_chunk",
"qvector_chunk" => "qvector_chunk",
_ => return Err(anyhow::anyhow!("Invalid storage field: {}", field)),
};
sqlx::query(&format!(
"UPDATE videos SET {} = $1, updated_at = CURRENT_TIMESTAMP WHERE uuid = $2",
column
))
.bind(value)
.bind(uuid)
.execute(&self.pool)
.await?;
// Invalidate cache
let mut cache = self.cache.write().await;
cache.videos.remove(uuid);
Ok(())
}
pub async fn get_storage_status(&self, uuid: &str) -> Result<Option<StorageStatus>> {
if let Some(video) = self.get_video_by_uuid(uuid).await? {
Ok(Some(video.storage))
} else {
Ok(None)
}
}
pub async fn get_chunk_count(&self, uuid: &str) -> Result<(i64, i64)> {
let sentence_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM chunks WHERE uuid = $1 AND chunk_type = 'sentence'",
)
.bind(uuid)
.fetch_one(&self.pool)
.await?;
let time_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM chunks WHERE uuid = $1 AND chunk_type = 'time_based'",
)
.bind(uuid)
.fetch_one(&self.pool)
.await?;
Ok((sentence_count, time_count))
}
pub async fn get_vector_count(&self, uuid: &str) -> Result<i64> {
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM chunk_vectors WHERE uuid = $1")
.bind(uuid)
.fetch_one(&self.pool)
.await?;
Ok(count)
}
async fn init_schema(&self) -> Result<()> {
sqlx::query(
r#"
@@ -164,6 +296,13 @@ impl PostgresDb {
height INTEGER,
fps DOUBLE PRECISION,
probe_json TEXT,
fs_video BOOLEAN DEFAULT FALSE,
fs_json BOOLEAN DEFAULT FALSE,
psql_chunk BOOLEAN DEFAULT FALSE,
pobject_chunk BOOLEAN DEFAULT FALSE,
mobject_chunk BOOLEAN DEFAULT FALSE,
pvector_chunk BOOLEAN DEFAULT FALSE,
qvector_chunk BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
@@ -176,6 +315,36 @@ impl PostgresDb {
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE videos ADD COLUMN IF NOT EXISTS fs_video BOOLEAN DEFAULT FALSE")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE videos ADD COLUMN IF NOT EXISTS fs_json BOOLEAN DEFAULT FALSE")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE videos ADD COLUMN IF NOT EXISTS psql_chunk BOOLEAN DEFAULT FALSE")
.execute(&self.pool)
.await?;
sqlx::query(
"ALTER TABLE videos ADD COLUMN IF NOT EXISTS pobject_chunk BOOLEAN DEFAULT FALSE",
)
.execute(&self.pool)
.await?;
sqlx::query(
"ALTER TABLE videos ADD COLUMN IF NOT EXISTS mobject_chunk BOOLEAN DEFAULT FALSE",
)
.execute(&self.pool)
.await?;
sqlx::query(
"ALTER TABLE videos ADD COLUMN IF NOT EXISTS pvector_chunk BOOLEAN DEFAULT FALSE",
)
.execute(&self.pool)
.await?;
sqlx::query(
"ALTER TABLE videos ADD COLUMN IF NOT EXISTS qvector_chunk BOOLEAN DEFAULT FALSE",
)
.execute(&self.pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS chunks (
@@ -186,9 +355,14 @@ impl PostgresDb {
chunk_type VARCHAR(32) NOT NULL,
start_time DOUBLE PRECISION NOT NULL,
end_time DOUBLE PRECISION NOT NULL,
fps DOUBLE PRECISION DEFAULT 24.0,
start_frame BIGINT DEFAULT 0,
end_frame BIGINT DEFAULT 0,
content JSONB NOT NULL,
metadata JSONB,
vector_id VARCHAR(64),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(uuid, chunk_id)
)
"#,
@@ -208,28 +382,165 @@ impl PostgresDb {
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_uuid_type ON chunks(uuid, chunk_type)")
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_chunks_content_gin ON chunks USING GIN(content)",
)
.execute(&self.pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS chunk_vectors (
id SERIAL PRIMARY KEY,
chunk_id VARCHAR(64) NOT NULL UNIQUE,
uuid VARCHAR(32) NOT NULL,
chunk_type VARCHAR(32) NOT NULL,
start_time DOUBLE PRECISION,
end_time DOUBLE PRECISION,
embedding TEXT,
metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"#,
)
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_vectors_uuid ON chunk_vectors(uuid)")
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_chunk_vectors_chunk_id ON chunk_vectors(chunk_id)",
)
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_vectors_uuid ON chunk_vectors(uuid)")
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_chunk_vectors_chunk_id ON chunk_vectors(chunk_id)",
)
.execute(&self.pool)
.await?;
// pre_chunks table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS pre_chunks (
id SERIAL PRIMARY KEY,
file_id INTEGER NOT NULL REFERENCES videos(id),
source_type VARCHAR(32) NOT NULL,
source_file TEXT,
chunk_type VARCHAR(32) NOT NULL,
start_time DOUBLE PRECISION NOT NULL,
end_time DOUBLE PRECISION NOT NULL,
start_frame BIGINT DEFAULT 0,
end_frame BIGINT DEFAULT 0,
fps DOUBLE PRECISION DEFAULT 24.0,
raw_json JSONB NOT NULL,
text_content TEXT,
processed BOOLEAN DEFAULT FALSE,
chunk_id VARCHAR(64),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(file_id, source_type, start_frame, end_frame)
)
"#,
)
.execute(&self.pool)
.await?;
// frames table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS frames (
id SERIAL PRIMARY KEY,
file_id INTEGER NOT NULL REFERENCES videos(id),
frame_number BIGINT NOT NULL,
timestamp DOUBLE PRECISION NOT NULL,
fps DOUBLE PRECISION DEFAULT 24.0,
yolo_objects JSONB,
ocr_results JSONB,
face_results JSONB,
frame_path TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(file_id, frame_number)
)
"#,
)
.execute(&self.pool)
.await?;
// Add file_id columns to existing tables if not exist
sqlx::query(
"ALTER TABLE chunks ADD COLUMN IF NOT EXISTS file_id INTEGER REFERENCES videos(id)",
)
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS text_content TEXT")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS frame_count INTEGER DEFAULT 0")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS pre_chunk_ids INTEGER[]")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE chunk_vectors ADD COLUMN IF NOT EXISTS file_id INTEGER REFERENCES videos(id)")
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> {
let content_with_rule = serde_json::json!({
"rule": chunk.rule.as_str(),
"data": chunk.content
});
sqlx::query(
r#"
INSERT INTO chunks (uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, content)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
INSERT INTO chunks (file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14, $15, $16)
ON CONFLICT (uuid, chunk_id) DO UPDATE SET
start_time = EXCLUDED.start_time,
end_time = EXCLUDED.end_time,
fps = EXCLUDED.fps,
start_frame = EXCLUDED.start_frame,
end_frame = EXCLUDED.end_frame,
text_content = EXCLUDED.text_content,
content = EXCLUDED.content,
vector_id = EXCLUDED.vector_id
metadata = EXCLUDED.metadata,
vector_id = EXCLUDED.vector_id,
frame_count = EXCLUDED.frame_count,
pre_chunk_ids = EXCLUDED.pre_chunk_ids,
updated_at = CURRENT_TIMESTAMP
"#
)
.bind(chunk.file_id)
.bind(&chunk.uuid)
.bind(&chunk.chunk_id)
.bind(chunk.chunk_index as i32)
.bind(chunk.chunk_type.as_str())
.bind(chunk.start_time)
.bind(chunk.end_time)
.bind(&chunk.content)
.bind(chunk.fps)
.bind(chunk.start_frame)
.bind(chunk.end_frame)
.bind(&chunk.text_content)
.bind(&content_with_rule)
.bind(&chunk.metadata)
.bind(&chunk.vector_id)
.bind(chunk.frame_count)
.bind(&chunk.pre_chunk_ids)
.execute(&self.pool)
.await?;
@@ -238,7 +549,7 @@ impl PostgresDb {
pub async fn get_chunks_by_uuid(&self, uuid: &str) -> Result<Vec<Chunk>> {
let rows = sqlx::query(
"SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, content FROM chunks WHERE uuid = $1 ORDER BY chunk_index"
"SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids FROM chunks WHERE uuid = $1 ORDER BY chunk_index"
)
.bind(uuid)
.fetch_all(&self.pool)
@@ -247,32 +558,417 @@ impl PostgresDb {
let chunks: Vec<Chunk> = rows
.into_iter()
.map(|r| {
let chunk_type_str: String = r.get(3);
let chunk_index: i32 = r.get(2);
let chunk_type_str: String = r.get(4);
let chunk_index: i32 = r.get(3);
let chunk_type = match chunk_type_str.as_str() {
"time" => ChunkType::TimeBased,
"sentence" => ChunkType::Sentence,
"cut" => ChunkType::Cut,
"trace" => ChunkType::Trace,
_ => ChunkType::TimeBased,
};
let content: serde_json::Value = r.get(11);
let metadata: Option<serde_json::Value> = r.get(12);
// Get pre_chunk_ids - try direct Vec<i32> decode first
let pre_chunk_ids: Vec<i32> = r.try_get(15).unwrap_or_default();
// Extract rule from content
let (rule, content_data) = if content.get("rule").is_some() {
let rule_str = content
.get("rule")
.and_then(|v| v.as_str())
.unwrap_or("rule_1");
let rule = if rule_str == "rule_2" {
ChunkRule::Rule2
} else {
ChunkRule::Rule1
};
let data = content.get("data").cloned().unwrap_or(content);
(rule, data)
} else {
(ChunkRule::Rule1, content)
};
let file_id: i32 = sqlx::Row::get(&r, "file_id");
let frame_count: i32 = sqlx::Row::get(&r, "frame_count");
Chunk {
file_id,
uuid: r.get("uuid"),
chunk_id: r.get("chunk_id"),
chunk_index: chunk_index as u32,
chunk_type,
rule,
start_time: r.get("start_time"),
end_time: r.get("end_time"),
fps: r.get("fps"),
start_frame: r.get("start_frame"),
end_frame: r.get("end_frame"),
text_content: r.get("text_content"),
content: content_data,
metadata,
vector_id: r.get("vector_id"),
frame_count,
pre_chunk_ids,
}
})
.collect();
Ok(chunks)
}
pub async fn store_pre_chunk(&self, pre_chunk: &PreChunk) -> Result<i64> {
let row = sqlx::query(
r#"
INSERT INTO pre_chunks (file_id, source_type, source_file, chunk_type, start_time, end_time, start_frame, end_frame, fps, raw_json, text_content, processed, chunk_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (file_id, source_type, start_frame, end_frame) DO UPDATE SET
raw_json = EXCLUDED.raw_json,
text_content = EXCLUDED.text_content,
processed = EXCLUDED.processed,
chunk_id = EXCLUDED.chunk_id
RETURNING id
"#
)
.bind(pre_chunk.file_id)
.bind(&pre_chunk.source_type)
.bind(&pre_chunk.source_file)
.bind(&pre_chunk.chunk_type)
.bind(pre_chunk.start_time)
.bind(pre_chunk.end_time)
.bind(pre_chunk.start_frame)
.bind(pre_chunk.end_frame)
.bind(pre_chunk.fps)
.bind(&pre_chunk.raw_json)
.bind(&pre_chunk.text_content)
.bind(pre_chunk.processed)
.bind(&pre_chunk.chunk_id)
.fetch_one(&self.pool)
.await?;
let id: i32 = row.get(0);
Ok(id as i64)
}
pub async fn store_frame(&self, frame: &Frame) -> Result<()> {
sqlx::query(
r#"
INSERT INTO frames (file_id, frame_number, timestamp, fps, yolo_objects, ocr_results, face_results, frame_path)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (file_id, frame_number) DO UPDATE SET
yolo_objects = EXCLUDED.yolo_objects,
ocr_results = EXCLUDED.ocr_results,
face_results = EXCLUDED.face_results,
frame_path = EXCLUDED.frame_path
"#
)
.bind(frame.file_id)
.bind(frame.frame_number)
.bind(frame.timestamp)
.bind(frame.fps)
.bind(&frame.yolo_objects)
.bind(&frame.ocr_results)
.bind(&frame.face_results)
.bind(&frame.frame_path)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn get_frames_by_time_range(
&self,
file_id: i64,
start_time: f64,
end_time: f64,
) -> Result<Vec<Frame>> {
let rows = sqlx::query_as::<_, (
i32,
i32,
i64,
f64,
f64,
Option<serde_json::Value>,
Option<serde_json::Value>,
Option<serde_json::Value>,
Option<String>,
String,
)>(
"SELECT id, file_id, frame_number, timestamp, fps, yolo_objects, ocr_results, face_results, frame_path, created_at
FROM frames
WHERE file_id = $1 AND timestamp >= $2 AND timestamp <= $3
ORDER BY frame_number"
)
.bind(file_id)
.bind(start_time)
.bind(end_time)
.fetch_all(&self.pool)
.await?;
let frames: Vec<Frame> = rows
.into_iter()
.map(|r| Frame {
id: r.0 as i64,
file_id: r.1 as i64,
frame_number: r.2,
timestamp: r.3,
fps: r.4,
yolo_objects: r.5,
ocr_results: r.6,
face_results: r.7,
frame_path: r.8,
created_at: r.9,
})
.collect();
Ok(frames)
}
pub async fn get_chunks_by_time_range(
&self,
file_id: i64,
start_time: f64,
end_time: f64,
) -> Result<Vec<Chunk>> {
let rows = sqlx::query(
"SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids
FROM chunks
WHERE file_id = $1 AND start_time >= $2 AND end_time <= $3
ORDER BY start_time"
)
.bind(file_id)
.bind(start_time)
.bind(end_time)
.fetch_all(&self.pool)
.await?;
let chunks: Vec<Chunk> = rows
.into_iter()
.map(|r| {
let chunk_type_str: String = r.get(4);
let chunk_index: i32 = r.get(3);
let chunk_type = match chunk_type_str.as_str() {
"time" => ChunkType::TimeBased,
"sentence" => ChunkType::Sentence,
"cut" => ChunkType::Cut,
"trace" => ChunkType::Trace,
_ => ChunkType::TimeBased,
};
let content: serde_json::Value = r.get(11);
let metadata: Option<serde_json::Value> = r.get(12);
// Get pre_chunk_ids - try direct Vec<i32> decode
let pre_chunk_ids: Vec<i32> = r.try_get(15).unwrap_or_default();
let (rule, content_data) = if content.get("rule").is_some() {
let rule_str = content
.get("rule")
.and_then(|v| v.as_str())
.unwrap_or("rule_1");
let rule = if rule_str == "rule_2" {
ChunkRule::Rule2
} else {
ChunkRule::Rule1
};
let data = content.get("data").cloned().unwrap_or(content);
(rule, data)
} else {
(ChunkRule::Rule1, content)
};
let file_id: i32 = sqlx::Row::get(&r, "file_id");
let frame_count: i32 = sqlx::Row::get(&r, "frame_count");
Chunk {
file_id,
uuid: r.get("uuid"),
chunk_id: r.get("chunk_id"),
chunk_index: chunk_index as u32,
chunk_type,
rule,
start_time: r.get("start_time"),
end_time: r.get("end_time"),
fps: r.get("fps"),
start_frame: r.get("start_frame"),
end_frame: r.get("end_frame"),
text_content: r.get("text_content"),
content: content_data,
metadata,
vector_id: r.get("vector_id"),
frame_count,
pre_chunk_ids,
}
})
.collect();
Ok(chunks)
}
pub async fn get_file_id_by_uuid(&self, uuid: &str) -> Result<i64> {
let row = sqlx::query("SELECT id FROM videos WHERE uuid = $1")
.bind(uuid)
.fetch_one(&self.pool)
.await?;
Ok(row.get(0))
}
pub async fn store_vector(&self, chunk_id: &str, vector: &[f32], uuid: &str) -> Result<()> {
let vector_json = serde_json::json!(vector);
let embedding_str = vector_json.to_string();
// Clone for use in closure
let chunk_id = chunk_id.to_string();
let uuid = uuid.to_string();
// Use blocking task - this needs to wait for result
let join_result = tokio::task::spawn_blocking(move || {
let output = std::process::Command::new("psql")
.args([
"postgres://accusys@localhost:5432/momentry",
"-c",
&format!(
"INSERT INTO chunk_vectors (chunk_id, uuid, chunk_type, embedding) VALUES ('{}', '{}', 'sentence', '{}') ON CONFLICT (chunk_id) DO UPDATE SET embedding = EXCLUDED.embedding",
chunk_id, uuid, embedding_str.replace('\'', "''")
)
])
.output();
(chunk_id, output)
})
.await;
match join_result {
Ok((cid, Ok(output))) => {
if !output.status.success() {
let err = String::from_utf8_lossy(&output.stderr);
tracing::error!("psql error for {}: {}", cid, err);
}
}
Ok((cid, Err(e))) => {
tracing::error!("psql output error for {}: {}", cid, e);
}
Err(e) => {
tracing::error!("join error: {}", e);
}
}
Ok(())
}
pub async fn update_vector_id(&self, chunk_id: &str, vector_id: &str) -> Result<()> {
sqlx::query("UPDATE chunks SET vector_id = $1 WHERE chunk_id = $2")
.bind(vector_id)
.bind(chunk_id)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn search_vector(
&self,
_query_vector: &[f32],
_limit: usize,
) -> Result<Vec<super::SearchResult>> {
Ok(vec![])
}
pub async fn search_text(&self, query: &str, chunk_type: Option<&str>) -> Result<Vec<Chunk>> {
let query_pattern = format!("%{}%", query);
let sql = match chunk_type {
Some(_) => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id FROM chunks WHERE content->>'text' ILIKE $1 AND chunk_type = $2 ORDER BY chunk_index",
None => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id FROM chunks WHERE content->>'text' ILIKE $1 ORDER BY chunk_index",
};
let chunks = if let Some(ct) = chunk_type {
sqlx::query_as::<
_,
(
String,
String,
i32,
String,
f64,
f64,
f64,
i64,
i64,
String,
Option<String>,
Option<String>,
),
>(sql)
.bind(&query_pattern)
.bind(ct)
.fetch_all(&self.pool)
.await?
} else {
sqlx::query_as::<
_,
(
String,
String,
i32,
String,
f64,
f64,
f64,
i64,
i64,
String,
Option<String>,
Option<String>,
),
>(sql)
.bind(&query_pattern)
.fetch_all(&self.pool)
.await?
};
let results: Vec<Chunk> = chunks
.into_iter()
.map(|r| {
let chunk_type = match r.3.as_str() {
"time_based" => ChunkType::TimeBased,
"sentence" => ChunkType::Sentence,
"cut" => ChunkType::Cut,
_ => ChunkType::TimeBased,
};
let content_json: String = r.get(6);
let content: serde_json::Value =
serde_json::from_str(&content_json).unwrap_or(serde_json::json!({}));
serde_json::from_str(&r.9).unwrap_or(serde_json::json!({}));
let metadata: Option<serde_json::Value> =
r.10.and_then(|m| serde_json::from_str(&m).ok());
Chunk {
uuid: r.get(0),
chunk_id: r.get(1),
chunk_index: chunk_index as u32,
file_id: 0,
uuid: r.0,
chunk_id: r.1,
chunk_index: r.2 as u32,
chunk_type,
start_time: r.get(4),
end_time: r.get(5),
rule: ChunkRule::Rule1,
start_time: r.4,
end_time: r.5,
fps: r.6,
start_frame: r.7,
end_frame: r.8,
text_content: Some(r.9),
content,
metadata,
vector_id: r.11,
frame_count: 0,
pre_chunk_ids: vec![],
}
})
.collect();
Ok(chunks)
Ok(results)
}
}

src/core/db/redis_client.rs (new file, 262 lines)

@@ -0,0 +1,262 @@
use anyhow::{Context, Result};
use futures_util::stream::StreamExt;
use redis::aio::MultiplexedConnection;
use redis::{AsyncCommands, Client};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct RedisClient {
client: Client,
state: Arc<RwLock<RedisState>>,
}
#[derive(Debug, Clone, Default)]
pub struct RedisState {
pub connected: bool,
}
impl RedisClient {
pub fn new() -> Result<Self> {
let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| {
let password =
std::env::var("REDIS_PASSWORD").unwrap_or_else(|_| "accusys".to_string());
format!("redis://:{}@localhost:6379", password)
});
let client = Client::open(redis_url.as_str()).context("Failed to connect to Redis")?;
Ok(Self {
client,
state: Arc::new(RwLock::new(RedisState { connected: true })),
})
}
pub async fn is_connected(&self) -> bool {
self.state.read().await.connected
}
pub async fn get_conn(&self) -> Result<MultiplexedConnection> {
self.get_conn_internal().await
}
pub async fn get_conn_internal(&self) -> Result<MultiplexedConnection> {
self.client
.get_multiplexed_async_connection()
.await
.context("Failed to get Redis connection")
}
pub async fn get_job_status(&self, uuid: &str) -> Result<Option<JobStatus>> {
let mut conn = self.get_conn_internal().await?;
let key = format!("momentry:job:{}", uuid);
let status: Option<String> = conn.hget(&key, "status").await?;
if status.is_none() {
return Ok(None);
}
let current_processor: String = conn.hget(&key, "current_processor").await?;
let progress_total: i32 = conn.hget(&key, "progress_total").await?;
let progress_current: i32 = conn.hget(&key, "progress_current").await?;
let started_at: String = conn.hget(&key, "started_at").await?;
let updated_at: String = conn.hget(&key, "updated_at").await?;
let error_count: i32 = conn.hget(&key, "error_count").await?;
let last_error: String = conn.hget(&key, "last_error").await?;
Ok(Some(JobStatus {
status: status.unwrap_or_default(),
current_processor,
progress_total,
progress_current,
started_at,
updated_at,
error_count,
last_error,
}))
}
pub async fn set_processor_status(
&self,
uuid: &str,
processor: &str,
status: &ProcessorStatus,
) -> Result<()> {
let mut conn = self.get_conn_internal().await?;
let key = format!("momentry:job:{}:processor:{}", uuid, processor);
let _: Option<String> = conn
.hset_multiple(
&key,
&[
("status", status.status.as_str()),
("progress", status.progress.to_string().as_str()),
("current", status.current.to_string().as_str()),
("total", status.total.to_string().as_str()),
("started_at", status.started_at.as_str()),
("updated_at", status.updated_at.as_str()),
("message", status.message.as_str()),
],
)
.await?;
let _: bool = conn.expire(&key, 86400).await?;
Ok(())
}
pub async fn get_processor_status(
&self,
uuid: &str,
processor: &str,
) -> Result<Option<ProcessorStatus>> {
let mut conn = self.get_conn_internal().await?;
let key = format!("momentry:job:{}:processor:{}", uuid, processor);
let status: Option<String> = conn.hget(&key, "status").await?;
if status.is_none() {
return Ok(None);
}
let progress: i32 = conn.hget(&key, "progress").await?;
let current: i32 = conn.hget(&key, "current").await?;
let total: i32 = conn.hget(&key, "total").await?;
let started_at: String = conn.hget(&key, "started_at").await?;
let updated_at: String = conn.hget(&key, "updated_at").await?;
let message: String = conn.hget(&key, "message").await?;
Ok(Some(ProcessorStatus {
status: status.unwrap_or_default(),
progress,
current,
total,
started_at,
updated_at,
message,
}))
}
pub async fn publish_progress(&self, uuid: &str, message: &ProgressMessage) -> Result<()> {
let mut conn = self.get_conn_internal().await?;
let channel = format!("momentry:progress:{}", uuid);
let json = serde_json::to_string(message)?;
let _: usize = conn.publish(&channel, json).await?;
Ok(())
}
pub async fn subscribe_progress(&self, uuid: &str) -> Result<redis::aio::PubSub> {
let mut pubsub = self.client.get_async_pubsub().await?;
let channel = format!("momentry:progress:{}", uuid);
pubsub.subscribe(channel).await?;
Ok(pubsub)
}
pub async fn subscribe_and_callback<F>(&self, uuid: &str, mut callback: F) -> Result<()>
where
F: FnMut(ProgressMessage) + Send + 'static,
{
let mut pubsub = self.subscribe_progress(uuid).await?;
let mut stream = pubsub.on_message();
while let Some(msg) = stream.next().await {
let payload: String = msg.get_payload().map_err(|e| anyhow::anyhow!("{}", e))?;
if let Ok(progress_msg) = serde_json::from_str::<ProgressMessage>(&payload) {
callback(progress_msg);
}
}
Ok(())
}
pub async fn add_to_active_jobs(&self, uuid: &str) -> Result<()> {
let mut conn = self.get_conn_internal().await?;
let _: usize = conn.sadd("momentry:jobs:active", uuid).await?;
Ok(())
}
pub async fn move_to_completed_jobs(&self, uuid: &str) -> Result<()> {
let mut conn = self.get_conn_internal().await?;
let _: bool = conn
.smove("momentry:jobs:active", "momentry:jobs:completed", uuid)
.await?;
Ok(())
}
pub async fn move_to_failed_jobs(&self, uuid: &str) -> Result<()> {
let mut conn = self.get_conn_internal().await?;
let _: bool = conn
.smove("momentry:jobs:active", "momentry:jobs:failed", uuid)
.await?;
Ok(())
}
pub async fn get_active_jobs(&self) -> Result<Vec<String>> {
let mut conn = self.get_conn_internal().await?;
let jobs: Vec<String> = conn.smembers("momentry:jobs:active").await?;
Ok(jobs)
}
pub async fn set_health(&self, status: &str) -> Result<()> {
let mut conn = self.get_conn_internal().await?;
let _: String = conn
.set_ex("momentry:health:momentry_core", status, 60)
.await?;
Ok(())
}
pub async fn get_health(&self) -> Result<Option<String>> {
let mut conn = self.get_conn_internal().await?;
let health: Option<String> = conn.get("momentry:health:momentry_core").await?;
Ok(health)
}
}
impl Default for RedisClient {
fn default() -> Self {
Self::new().expect("Failed to create Redis client")
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStatus {
pub status: String,
pub current_processor: String,
pub progress_total: i32,
pub progress_current: i32,
pub started_at: String,
pub updated_at: String,
pub error_count: i32,
pub last_error: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessorStatus {
pub status: String,
pub progress: i32,
pub current: i32,
pub total: i32,
pub started_at: String,
pub updated_at: String,
pub message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProgressMessage {
#[serde(rename = "type")]
pub msg_type: String,
pub processor: String,
pub uuid: String,
pub timestamp: i64,
pub data: ProgressData,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProgressData {
pub message: Option<String>,
pub current: Option<i32>,
pub total: Option<i32>,
}
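A hedged usage sketch (not part of this commit) showing how a caller inside the crate might consume these types; it assumes the `crate::core::db::redis_client` module path used elsewhere in this diff and a Tokio runtime.
```rust
// Usage sketch only, not from the commit: stream progress updates for one video.
use crate::core::db::redis_client::{ProgressMessage, RedisClient};

async fn watch_progress(uuid: &str) -> anyhow::Result<()> {
    let client = RedisClient::new()?; // honors REDIS_URL / REDIS_PASSWORD
    client
        .subscribe_and_callback(uuid, |msg: ProgressMessage| {
            // Called once per message published on momentry:progress:{uuid}.
            println!(
                "[{}] {}: {:?}/{:?} {}",
                msg.uuid,
                msg.processor,
                msg.data.current,
                msg.data.total,
                msg.data.message.clone().unwrap_or_default()
            );
        })
        .await // runs until the pubsub stream ends
}
```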

File diff suppressed because it is too large.