Compare commits

..

1 Commits

Author SHA1 Message Date
accusys
d6ebbc8c3a Add CLI command for video registration with UUID generation 2026-03-11 21:56:44 +08:00
2 changed files with 340 additions and 1 deletions

117
README.md
View File

@@ -1,3 +1,118 @@
# video_register # video_register
影片註冊服務 影片元數據註冊服務
## 功能
- 讀取 `.probe.json` 文件(由 video_probe 生成)
- 根據檔案路徑自動生成 UUID 作為檔案 ID
- 將影片元數據註冊到 PostgreSQL 資料庫
- 支援 SQL 查詢搜尋影片元數據
- 註冊後將檔案複製為 UUID 命名(測試期間保留原始檔)
## 安裝
```bash
pip install -r requirements.txt
```
## 配置
在專案根目錄創建 `.env` 文件:
```env
DATABASE_URL=postgresql://accusys@localhost:5432/video_register
```
## 使用方式
### CLI 命令
#### 註冊影片
```bash
python cli.py register <probe.json 路徑>
```
選項:
- `--dry-run`: 測試模式,不實際複製檔案
範例:
```bash
python cli.py register ../test_video/Old_Time_Movie_Show_-_Charade_1963.HD.probe.json
python cli.py register ../test_video/Old_Time_Movie_Show_-_Charade_1963.HD.probe.json --dry-run
```
#### 驗證已註冊的影片
```bash
python cli.py verify <UUID>
```
範例:
```bash
python cli.py verify 1636719d-c31f-78ac-f1dd-8ab0b0b36c66
```
### API 服務
啟動 FastAPI 服務:
```bash
uvicorn app.main:app --reload
```
API 文件http://localhost:8000/docs
## probe.json 格式
輸入的 `probe.json` 必須包含 `video_path` 欄位:
```json
{
"video_path": "/absolute/path/to/video.mov",
"probed_at": "2026-03-10T23:26:04",
"format": { ... },
"video_stream": { ... },
"audio_streams": [...],
"subtitle_streams": [...]
}
```
## UUID 生成規則
UUID 透過 SHA-256 雜湊檔案路徑生成,確保:
- 相同路徑產生相同的 UUID
- 可從 UUID 反向計算驗證
## 資料庫結構
### videos 表
- `id`: UUID (主鍵)
- `file_path`: 原始檔案路徑
- `file_name`: 檔案名稱(不含副檔名)
- `file_extension`: 副檔名
- `file_size`: 檔案大小
- `format_name`: 格式名稱
- `duration`: 時長
- `title`: 標題
- `artist`: 藝術家
### video_streams 表
- `video_id`: 關聯的 videos ID
- `codec_name`: 編碼名稱
- `width`, `height`: 解析度
- `color_space`: 色彩空間
- `color_range`: 色彩範圍
### audio_streams 表
- `video_id`: 關聯的 videos ID
- `codec_name`: 編碼名稱
- `channels`: 聲道數
- `sample_rate`: 取樣率
- `language`: 語言
### subtitle_streams 表
- `video_id`: 關聯的 videos ID
- `codec_name`: 編碼名稱
- `language`: 語言

224
cli.py Normal file
View File

@@ -0,0 +1,224 @@
import os
import sys
import json
import uuid
import hashlib
import argparse
from datetime import datetime
from pathlib import Path
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from app.config import get_settings
from app.models.video import Video, VideoStream, AudioStream, SubtitleStream
from app.services.probe_parser import ProbeParser
def generate_uuid_from_path(file_path: str) -> uuid.UUID:
hash_input = file_path.encode("utf-8")
hash_digest = hashlib.sha256(hash_input).hexdigest()
return uuid.UUID(hash_digest[:32])
def reverse_uuid_to_path(video_uuid: uuid.UUID, db_session) -> tuple:
video = db_session.query(Video).filter(Video.id == video_uuid).first()
if video:
return video.file_path, video.file_name, video.file_extension
return None, None, None
class VideoRegisterCLI:
def __init__(self, dry_run: bool = False):
self.settings = get_settings()
self.engine = create_engine(self.settings.database_url)
self.Session = sessionmaker(bind=self.engine)
self.parser = ProbeParser()
self.dry_run = dry_run
def register(self, probe_json_path: str) -> Video:
if not os.path.exists(probe_json_path):
raise FileNotFoundError(f"Probe JSON not found: {probe_json_path}")
probe_data = self.parser.load_probe_json(probe_json_path)
absolute_file_path = probe_data.get("video_path")
if not absolute_file_path:
raise ValueError("video_path not found in probe.json")
if not os.path.isabs(absolute_file_path):
probe_dir = os.path.dirname(probe_json_path)
absolute_file_path = os.path.join(probe_dir, absolute_file_path)
absolute_file_path = os.path.normpath(absolute_file_path)
if not os.path.exists(absolute_file_path):
raise FileNotFoundError(f"Video file not found: {absolute_file_path}")
file_uuid = generate_uuid_from_path(absolute_file_path)
session = self.Session()
try:
existing = session.query(Video).filter(Video.id == file_uuid).first()
if existing:
print(f"Video already registered: {existing.id}")
return existing
video_metadata = self.parser.parse_video_metadata(
probe_data, absolute_file_path
)
video_metadata["id"] = file_uuid
video_stream_data = self.parser.parse_video_stream(
probe_data.get("video_stream")
)
audio_streams_data = self.parser.parse_audio_streams(
probe_data.get("audio_streams", [])
)
subtitle_streams_data = self.parser.parse_subtitle_streams(
probe_data.get("subtitle_streams", [])
)
video = Video(**video_metadata)
if video_stream_data:
video.video_streams.append(VideoStream(**video_stream_data))
for audio_data in audio_streams_data:
video.audio_streams.append(AudioStream(**audio_data))
for subtitle_data in subtitle_streams_data:
video.subtitle_streams.append(SubtitleStream(**subtitle_data))
session.add(video)
original_extension = video.file_extension
new_filename = f"{file_uuid}{original_extension}"
new_path = os.path.join(os.path.dirname(absolute_file_path), new_filename)
if not self.dry_run:
if not os.path.exists(new_path):
import shutil
shutil.copy2(absolute_file_path, new_path)
print(f"Created: {new_path}")
else:
print(f"Target file already exists: {new_path}")
else:
print(f"[DRY RUN] Would rename: {absolute_file_path} -> {new_path}")
session.commit()
session.refresh(video)
print(f"\n{'=' * 60}")
print(f"Video registered successfully!")
print(f"UUID: {video.id}")
print(f"Original path: {video.file_path}")
print(f"New filename: {new_filename}")
print(f"{'=' * 60}\n")
return video
except Exception as e:
session.rollback()
raise e
finally:
session.close()
def query_and_verify(self, video_uuid: uuid.UUID):
session = self.Session()
try:
video = session.query(Video).filter(Video.id == video_uuid).first()
if not video:
print(f"Video not found: {video_uuid}")
return
print(f"\n{'=' * 60}")
print("SQL Query Result:")
print(f"{'=' * 60}")
print(f"\n--- videos table ---")
print(f"id: {video.id}")
print(f"file_path: {video.file_path}")
print(f"file_name: {video.file_name}")
print(f"file_extension: {video.file_extension}")
print(f"file_size: {video.file_size}")
print(f"format_name: {video.format_name}")
print(f"duration: {video.duration}")
print(f"title: {video.title}")
print(f"artist: {video.artist}")
print(f"registered_at: {video.registered_at}")
for vs in video.video_streams:
print(f"\n--- video_streams ---")
print(f"codec_name: {vs.codec_name}")
print(f"width x height: {vs.width} x {vs.height}")
print(f"color_space: {vs.color_space}")
print(f"color_range: {vs.color_range}")
for aud in video.audio_streams:
print(f"\n--- audio_streams ---")
print(f"codec_name: {aud.codec_name}")
print(f"channels: {aud.channels}")
print(f"sample_rate: {aud.sample_rate}")
print(f"language: {aud.language}")
print(f"\n{'=' * 60}")
print("Reverse Verification:")
print(f"{'=' * 60}")
recalculated_uuid = generate_uuid_from_path(video.file_path)
print(f"Original UUID: {video.id}")
print(f"Recalculated UUID: {recalculated_uuid}")
print(f"Match: {video.id == recalculated_uuid}")
expected_new_filename = f"{video.id}{video.file_extension}"
expected_new_path = os.path.join(
os.path.dirname(video.file_path), expected_new_filename
)
print(f"\nExpected new file: {expected_new_path}")
print(f"File exists: {os.path.exists(expected_new_path)}")
print(f"\n{'=' * 60}")
finally:
session.close()
def main():
parser = argparse.ArgumentParser(description="Video Registration CLI")
subparsers = parser.add_subparsers(dest="command", help="Commands")
register_parser = subparsers.add_parser("register", help="Register a video")
register_parser.add_argument("probe_json", help="Path to probe.json file")
register_parser.add_argument(
"--dry-run", action="store_true", help="Dry run mode (no file copy)"
)
verify_parser = subparsers.add_parser("verify", help="Verify a registered video")
verify_parser.add_argument("uuid", help="Video UUID to verify")
args = parser.parse_args()
cli = VideoRegisterCLI(dry_run=getattr(args, "dry_run", False))
if args.command == "register":
try:
video = cli.register(args.probe_json)
cli.query_and_verify(video.id)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
elif args.command == "verify":
try:
video_uuid = uuid.UUID(args.uuid)
cli.query_and_verify(video_uuid)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
else:
parser.print_help()
if __name__ == "__main__":
main()