Add CLI command for video registration with UUID generation

This commit is contained in:
accusys
2026-03-11 21:56:44 +08:00
parent c979c0297a
commit d6ebbc8c3a
2 changed files with 340 additions and 1 deletions

224
cli.py Normal file
View File

@@ -0,0 +1,224 @@
import os
import sys
import json
import uuid
import hashlib
import argparse
from datetime import datetime
from pathlib import Path
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from app.config import get_settings
from app.models.video import Video, VideoStream, AudioStream, SubtitleStream
from app.services.probe_parser import ProbeParser
def generate_uuid_from_path(file_path: str) -> uuid.UUID:
hash_input = file_path.encode("utf-8")
hash_digest = hashlib.sha256(hash_input).hexdigest()
return uuid.UUID(hash_digest[:32])
def reverse_uuid_to_path(video_uuid: uuid.UUID, db_session) -> tuple:
video = db_session.query(Video).filter(Video.id == video_uuid).first()
if video:
return video.file_path, video.file_name, video.file_extension
return None, None, None
class VideoRegisterCLI:
def __init__(self, dry_run: bool = False):
self.settings = get_settings()
self.engine = create_engine(self.settings.database_url)
self.Session = sessionmaker(bind=self.engine)
self.parser = ProbeParser()
self.dry_run = dry_run
def register(self, probe_json_path: str) -> Video:
if not os.path.exists(probe_json_path):
raise FileNotFoundError(f"Probe JSON not found: {probe_json_path}")
probe_data = self.parser.load_probe_json(probe_json_path)
absolute_file_path = probe_data.get("video_path")
if not absolute_file_path:
raise ValueError("video_path not found in probe.json")
if not os.path.isabs(absolute_file_path):
probe_dir = os.path.dirname(probe_json_path)
absolute_file_path = os.path.join(probe_dir, absolute_file_path)
absolute_file_path = os.path.normpath(absolute_file_path)
if not os.path.exists(absolute_file_path):
raise FileNotFoundError(f"Video file not found: {absolute_file_path}")
file_uuid = generate_uuid_from_path(absolute_file_path)
session = self.Session()
try:
existing = session.query(Video).filter(Video.id == file_uuid).first()
if existing:
print(f"Video already registered: {existing.id}")
return existing
video_metadata = self.parser.parse_video_metadata(
probe_data, absolute_file_path
)
video_metadata["id"] = file_uuid
video_stream_data = self.parser.parse_video_stream(
probe_data.get("video_stream")
)
audio_streams_data = self.parser.parse_audio_streams(
probe_data.get("audio_streams", [])
)
subtitle_streams_data = self.parser.parse_subtitle_streams(
probe_data.get("subtitle_streams", [])
)
video = Video(**video_metadata)
if video_stream_data:
video.video_streams.append(VideoStream(**video_stream_data))
for audio_data in audio_streams_data:
video.audio_streams.append(AudioStream(**audio_data))
for subtitle_data in subtitle_streams_data:
video.subtitle_streams.append(SubtitleStream(**subtitle_data))
session.add(video)
original_extension = video.file_extension
new_filename = f"{file_uuid}{original_extension}"
new_path = os.path.join(os.path.dirname(absolute_file_path), new_filename)
if not self.dry_run:
if not os.path.exists(new_path):
import shutil
shutil.copy2(absolute_file_path, new_path)
print(f"Created: {new_path}")
else:
print(f"Target file already exists: {new_path}")
else:
print(f"[DRY RUN] Would rename: {absolute_file_path} -> {new_path}")
session.commit()
session.refresh(video)
print(f"\n{'=' * 60}")
print(f"Video registered successfully!")
print(f"UUID: {video.id}")
print(f"Original path: {video.file_path}")
print(f"New filename: {new_filename}")
print(f"{'=' * 60}\n")
return video
except Exception as e:
session.rollback()
raise e
finally:
session.close()
def query_and_verify(self, video_uuid: uuid.UUID):
session = self.Session()
try:
video = session.query(Video).filter(Video.id == video_uuid).first()
if not video:
print(f"Video not found: {video_uuid}")
return
print(f"\n{'=' * 60}")
print("SQL Query Result:")
print(f"{'=' * 60}")
print(f"\n--- videos table ---")
print(f"id: {video.id}")
print(f"file_path: {video.file_path}")
print(f"file_name: {video.file_name}")
print(f"file_extension: {video.file_extension}")
print(f"file_size: {video.file_size}")
print(f"format_name: {video.format_name}")
print(f"duration: {video.duration}")
print(f"title: {video.title}")
print(f"artist: {video.artist}")
print(f"registered_at: {video.registered_at}")
for vs in video.video_streams:
print(f"\n--- video_streams ---")
print(f"codec_name: {vs.codec_name}")
print(f"width x height: {vs.width} x {vs.height}")
print(f"color_space: {vs.color_space}")
print(f"color_range: {vs.color_range}")
for aud in video.audio_streams:
print(f"\n--- audio_streams ---")
print(f"codec_name: {aud.codec_name}")
print(f"channels: {aud.channels}")
print(f"sample_rate: {aud.sample_rate}")
print(f"language: {aud.language}")
print(f"\n{'=' * 60}")
print("Reverse Verification:")
print(f"{'=' * 60}")
recalculated_uuid = generate_uuid_from_path(video.file_path)
print(f"Original UUID: {video.id}")
print(f"Recalculated UUID: {recalculated_uuid}")
print(f"Match: {video.id == recalculated_uuid}")
expected_new_filename = f"{video.id}{video.file_extension}"
expected_new_path = os.path.join(
os.path.dirname(video.file_path), expected_new_filename
)
print(f"\nExpected new file: {expected_new_path}")
print(f"File exists: {os.path.exists(expected_new_path)}")
print(f"\n{'=' * 60}")
finally:
session.close()
def main():
parser = argparse.ArgumentParser(description="Video Registration CLI")
subparsers = parser.add_subparsers(dest="command", help="Commands")
register_parser = subparsers.add_parser("register", help="Register a video")
register_parser.add_argument("probe_json", help="Path to probe.json file")
register_parser.add_argument(
"--dry-run", action="store_true", help="Dry run mode (no file copy)"
)
verify_parser = subparsers.add_parser("verify", help="Verify a registered video")
verify_parser.add_argument("uuid", help="Video UUID to verify")
args = parser.parse_args()
cli = VideoRegisterCLI(dry_run=getattr(args, "dry_run", False))
if args.command == "register":
try:
video = cli.register(args.probe_json)
cli.query_and_verify(video.id)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
elif args.command == "verify":
try:
video_uuid = uuid.UUID(args.uuid)
cli.query_and_verify(video_uuid)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
else:
parser.print_help()
if __name__ == "__main__":
main()