From d6ebbc8c3a19a9132ff82cea03e6cf20426c6127 Mon Sep 17 00:00:00 2001 From: accusys Date: Wed, 11 Mar 2026 21:56:44 +0800 Subject: [PATCH] Add CLI command for video registration with UUID generation --- README.md | 117 +++++++++++++++++++++++++++- cli.py | 224 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 340 insertions(+), 1 deletion(-) create mode 100644 cli.py diff --git a/README.md b/README.md index cc8c4ac..ea646ac 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,118 @@ # video_register -影片註冊服務 \ No newline at end of file +影片元數據註冊服務 + +## 功能 + +- 讀取 `.probe.json` 文件(由 video_probe 生成) +- 根據檔案路徑自動生成 UUID 作為檔案 ID +- 將影片元數據註冊到 PostgreSQL 資料庫 +- 支援 SQL 查詢搜尋影片元數據 +- 註冊後將檔案複製為 UUID 命名(測試期間保留原始檔) + +## 安裝 + +```bash +pip install -r requirements.txt +``` + +## 配置 + +在專案根目錄創建 `.env` 文件: + +```env +DATABASE_URL=postgresql://accusys@localhost:5432/video_register +``` + +## 使用方式 + +### CLI 命令 + +#### 註冊影片 + +```bash +python cli.py register +``` + +選項: +- `--dry-run`: 測試模式,不實際複製檔案 + +範例: +```bash +python cli.py register ../test_video/Old_Time_Movie_Show_-_Charade_1963.HD.probe.json +python cli.py register ../test_video/Old_Time_Movie_Show_-_Charade_1963.HD.probe.json --dry-run +``` + +#### 驗證已註冊的影片 + +```bash +python cli.py verify +``` + +範例: +```bash +python cli.py verify 1636719d-c31f-78ac-f1dd-8ab0b0b36c66 +``` + +### API 服務 + +啟動 FastAPI 服務: + +```bash +uvicorn app.main:app --reload +``` + +API 文件:http://localhost:8000/docs + +## probe.json 格式 + +輸入的 `probe.json` 必須包含 `video_path` 欄位: + +```json +{ + "video_path": "/absolute/path/to/video.mov", + "probed_at": "2026-03-10T23:26:04", + "format": { ... }, + "video_stream": { ... }, + "audio_streams": [...], + "subtitle_streams": [...] +} +``` + +## UUID 生成規則 + +UUID 透過 SHA-256 雜湊檔案路徑生成,確保: +- 相同路徑產生相同的 UUID +- 可從 UUID 反向計算驗證 + +## 資料庫結構 + +### videos 表 +- `id`: UUID (主鍵) +- `file_path`: 原始檔案路徑 +- `file_name`: 檔案名稱(不含副檔名) +- `file_extension`: 副檔名 +- `file_size`: 檔案大小 +- `format_name`: 格式名稱 +- `duration`: 時長 +- `title`: 標題 +- `artist`: 藝術家 + +### video_streams 表 +- `video_id`: 關聯的 videos ID +- `codec_name`: 編碼名稱 +- `width`, `height`: 解析度 +- `color_space`: 色彩空間 +- `color_range`: 色彩範圍 + +### audio_streams 表 +- `video_id`: 關聯的 videos ID +- `codec_name`: 編碼名稱 +- `channels`: 聲道數 +- `sample_rate`: 取樣率 +- `language`: 語言 + +### subtitle_streams 表 +- `video_id`: 關聯的 videos ID +- `codec_name`: 編碼名稱 +- `language`: 語言 diff --git a/cli.py b/cli.py new file mode 100644 index 0000000..0304025 --- /dev/null +++ b/cli.py @@ -0,0 +1,224 @@ +import os +import sys +import json +import uuid +import hashlib +import argparse +from datetime import datetime +from pathlib import Path + +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker + +from app.config import get_settings +from app.models.video import Video, VideoStream, AudioStream, SubtitleStream +from app.services.probe_parser import ProbeParser + + +def generate_uuid_from_path(file_path: str) -> uuid.UUID: + hash_input = file_path.encode("utf-8") + hash_digest = hashlib.sha256(hash_input).hexdigest() + return uuid.UUID(hash_digest[:32]) + + +def reverse_uuid_to_path(video_uuid: uuid.UUID, db_session) -> tuple: + video = db_session.query(Video).filter(Video.id == video_uuid).first() + if video: + return video.file_path, video.file_name, video.file_extension + return None, None, None + + +class VideoRegisterCLI: + def __init__(self, dry_run: bool = False): + self.settings = get_settings() + self.engine = create_engine(self.settings.database_url) + self.Session = sessionmaker(bind=self.engine) + self.parser = ProbeParser() + self.dry_run = dry_run + + def register(self, probe_json_path: str) -> Video: + if not os.path.exists(probe_json_path): + raise FileNotFoundError(f"Probe JSON not found: {probe_json_path}") + + probe_data = self.parser.load_probe_json(probe_json_path) + + absolute_file_path = probe_data.get("video_path") + if not absolute_file_path: + raise ValueError("video_path not found in probe.json") + + if not os.path.isabs(absolute_file_path): + probe_dir = os.path.dirname(probe_json_path) + absolute_file_path = os.path.join(probe_dir, absolute_file_path) + absolute_file_path = os.path.normpath(absolute_file_path) + + if not os.path.exists(absolute_file_path): + raise FileNotFoundError(f"Video file not found: {absolute_file_path}") + + file_uuid = generate_uuid_from_path(absolute_file_path) + + session = self.Session() + try: + existing = session.query(Video).filter(Video.id == file_uuid).first() + if existing: + print(f"Video already registered: {existing.id}") + return existing + + video_metadata = self.parser.parse_video_metadata( + probe_data, absolute_file_path + ) + video_metadata["id"] = file_uuid + + video_stream_data = self.parser.parse_video_stream( + probe_data.get("video_stream") + ) + audio_streams_data = self.parser.parse_audio_streams( + probe_data.get("audio_streams", []) + ) + subtitle_streams_data = self.parser.parse_subtitle_streams( + probe_data.get("subtitle_streams", []) + ) + + video = Video(**video_metadata) + + if video_stream_data: + video.video_streams.append(VideoStream(**video_stream_data)) + + for audio_data in audio_streams_data: + video.audio_streams.append(AudioStream(**audio_data)) + + for subtitle_data in subtitle_streams_data: + video.subtitle_streams.append(SubtitleStream(**subtitle_data)) + + session.add(video) + + original_extension = video.file_extension + new_filename = f"{file_uuid}{original_extension}" + new_path = os.path.join(os.path.dirname(absolute_file_path), new_filename) + + if not self.dry_run: + if not os.path.exists(new_path): + import shutil + + shutil.copy2(absolute_file_path, new_path) + print(f"Created: {new_path}") + else: + print(f"Target file already exists: {new_path}") + else: + print(f"[DRY RUN] Would rename: {absolute_file_path} -> {new_path}") + + session.commit() + session.refresh(video) + + print(f"\n{'=' * 60}") + print(f"Video registered successfully!") + print(f"UUID: {video.id}") + print(f"Original path: {video.file_path}") + print(f"New filename: {new_filename}") + print(f"{'=' * 60}\n") + + return video + + except Exception as e: + session.rollback() + raise e + finally: + session.close() + + def query_and_verify(self, video_uuid: uuid.UUID): + session = self.Session() + try: + video = session.query(Video).filter(Video.id == video_uuid).first() + + if not video: + print(f"Video not found: {video_uuid}") + return + + print(f"\n{'=' * 60}") + print("SQL Query Result:") + print(f"{'=' * 60}") + + print(f"\n--- videos table ---") + print(f"id: {video.id}") + print(f"file_path: {video.file_path}") + print(f"file_name: {video.file_name}") + print(f"file_extension: {video.file_extension}") + print(f"file_size: {video.file_size}") + print(f"format_name: {video.format_name}") + print(f"duration: {video.duration}") + print(f"title: {video.title}") + print(f"artist: {video.artist}") + print(f"registered_at: {video.registered_at}") + + for vs in video.video_streams: + print(f"\n--- video_streams ---") + print(f"codec_name: {vs.codec_name}") + print(f"width x height: {vs.width} x {vs.height}") + print(f"color_space: {vs.color_space}") + print(f"color_range: {vs.color_range}") + + for aud in video.audio_streams: + print(f"\n--- audio_streams ---") + print(f"codec_name: {aud.codec_name}") + print(f"channels: {aud.channels}") + print(f"sample_rate: {aud.sample_rate}") + print(f"language: {aud.language}") + + print(f"\n{'=' * 60}") + print("Reverse Verification:") + print(f"{'=' * 60}") + + recalculated_uuid = generate_uuid_from_path(video.file_path) + print(f"Original UUID: {video.id}") + print(f"Recalculated UUID: {recalculated_uuid}") + print(f"Match: {video.id == recalculated_uuid}") + + expected_new_filename = f"{video.id}{video.file_extension}" + expected_new_path = os.path.join( + os.path.dirname(video.file_path), expected_new_filename + ) + print(f"\nExpected new file: {expected_new_path}") + print(f"File exists: {os.path.exists(expected_new_path)}") + + print(f"\n{'=' * 60}") + + finally: + session.close() + + +def main(): + parser = argparse.ArgumentParser(description="Video Registration CLI") + subparsers = parser.add_subparsers(dest="command", help="Commands") + + register_parser = subparsers.add_parser("register", help="Register a video") + register_parser.add_argument("probe_json", help="Path to probe.json file") + register_parser.add_argument( + "--dry-run", action="store_true", help="Dry run mode (no file copy)" + ) + + verify_parser = subparsers.add_parser("verify", help="Verify a registered video") + verify_parser.add_argument("uuid", help="Video UUID to verify") + + args = parser.parse_args() + + cli = VideoRegisterCLI(dry_run=getattr(args, "dry_run", False)) + + if args.command == "register": + try: + video = cli.register(args.probe_json) + cli.query_and_verify(video.id) + except Exception as e: + print(f"Error: {e}") + sys.exit(1) + elif args.command == "verify": + try: + video_uuid = uuid.UUID(args.uuid) + cli.query_and_verify(video_uuid) + except Exception as e: + print(f"Error: {e}") + sys.exit(1) + else: + parser.print_help() + + +if __name__ == "__main__": + main()