Files
video_register/app/services/video_register.py
accusys b4aa7b96d3 Initial commit: Video metadata registration service
- Add FastAPI server for video metadata registration
- PostgreSQL database models for videos, video_streams, audio_streams, subtitle_streams
- Batch registration script for .probe.json files
- RESTful API endpoints for CRUD operations
- Search functionality by title, artist, codec, resolution
2026-03-11 00:30:31 +08:00

203 lines
7.0 KiB
Python

import os
import uuid
from datetime import datetime
from typing import Dict, Any, List
from sqlalchemy.orm import Session
from app.models.video import Video, VideoStream, AudioStream, SubtitleStream
from app.services.probe_parser import ProbeParser
class VideoRegisterService:
def __init__(self, db: Session):
self.db = db
self.parser = ProbeParser()
def register_video(self, probe_json_path: str, absolute_file_path: str) -> Video:
probe_data = self.parser.load_probe_json(probe_json_path)
existing_video = (
self.db.query(Video).filter(Video.file_path == absolute_file_path).first()
)
if existing_video:
return self._update_video(existing_video, probe_data)
return self._create_video(probe_data, absolute_file_path)
def _create_video(
self, probe_data: Dict[str, Any], absolute_file_path: str
) -> Video:
video_metadata = self.parser.parse_video_metadata(
probe_data, absolute_file_path
)
video_stream_data = self.parser.parse_video_stream(
probe_data.get("video_stream")
)
audio_streams_data = self.parser.parse_audio_streams(
probe_data.get("audio_streams", [])
)
subtitle_streams_data = self.parser.parse_subtitle_streams(
probe_data.get("subtitle_streams", [])
)
video = Video(**video_metadata)
if video_stream_data:
video.video_streams.append(VideoStream(**video_stream_data))
for audio_data in audio_streams_data:
video.audio_streams.append(AudioStream(**audio_data))
for subtitle_data in subtitle_streams_data:
video.subtitle_streams.append(SubtitleStream(**subtitle_data))
self.db.add(video)
self.db.commit()
self.db.refresh(video)
return video
def _update_video(self, video: Video, probe_data: Dict[str, Any]) -> Video:
video_metadata = self.parser.parse_video_metadata(probe_data, video.file_path)
video_stream_data = self.parser.parse_video_stream(
probe_data.get("video_stream")
)
audio_streams_data = self.parser.parse_audio_streams(
probe_data.get("audio_streams", [])
)
subtitle_streams_data = self.parser.parse_subtitle_streams(
probe_data.get("subtitle_streams", [])
)
for key, value in video_metadata.items():
if value is not None:
setattr(video, key, value)
video.updated_at = datetime.utcnow()
self.db.query(VideoStream).filter(VideoStream.video_id == video.id).delete()
self.db.query(AudioStream).filter(AudioStream.video_id == video.id).delete()
self.db.query(SubtitleStream).filter(
SubtitleStream.video_id == video.id
).delete()
if video_stream_data:
video.video_streams.append(VideoStream(**video_stream_data))
for audio_data in audio_streams_data:
video.audio_streams.append(AudioStream(**audio_data))
for subtitle_data in subtitle_streams_data:
video.subtitle_streams.append(SubtitleStream(**subtitle_data))
self.db.commit()
self.db.refresh(video)
return video
def register_batch(self, directory: str) -> List[Video]:
videos = []
for root, dirs, files in os.walk(directory):
for file in files:
if file.endswith(".probe.json"):
probe_json_path = os.path.join(root, file)
video_filename = file.replace(".probe.json", "")
possible_extensions = [
".mp4",
".mov",
".avi",
".mkv",
".m4v",
".wmv",
".flv",
".webm",
]
absolute_file_path = None
for ext in possible_extensions:
test_path = os.path.join(root, video_filename + ext)
if os.path.exists(test_path):
absolute_file_path = test_path
break
if not absolute_file_path:
video_file = video_filename
for f in os.listdir(root):
if (
f.startswith(video_filename)
and not f.endswith(".probe.json")
and not f.endswith(".yolo.json")
):
absolute_file_path = os.path.join(root, f)
break
if absolute_file_path:
try:
video = self.register_video(
probe_json_path, absolute_file_path
)
videos.append(video)
print(f"Registered: {video.file_name}")
except Exception as e:
print(f"Error registering {probe_json_path}: {e}")
else:
print(f"Video file not found for: {probe_json_path}")
return videos
def get_video_by_id(self, video_id: uuid.UUID) -> Video:
return self.db.query(Video).filter(Video.id == video_id).first()
def get_video_by_path(self, file_path: str) -> Video:
return self.db.query(Video).filter(Video.file_path == file_path).first()
def search_videos(
self,
title=None,
artist=None,
codec_name=None,
min_width=None,
max_width=None,
min_height=None,
max_height=None,
format_name=None,
skip=0,
limit=20,
):
query = self.db.query(Video)
if title:
query = query.filter(Video.title.ilike(f"%{title}%"))
if artist:
query = query.filter(Video.artist.ilike(f"%{artist}%"))
if format_name:
query = query.filter(Video.format_name.ilike(f"%{format_name}%"))
if min_width or max_width or min_height or max_height:
query = query.join(VideoStream).filter(VideoStream.video_id == Video.id)
if min_width:
query = query.filter(VideoStream.width >= min_width)
if max_width:
query = query.filter(VideoStream.width <= max_width)
if min_height:
query = query.filter(VideoStream.height >= min_height)
if max_height:
query = query.filter(VideoStream.height <= max_height)
if codec_name:
query = query.join(VideoStream).filter(
VideoStream.video_id == Video.id,
VideoStream.codec_name.ilike(f"%{codec_name}%"),
)
total = query.count()
videos = query.offset(skip).limit(limit).all()
return total, videos