import os import uuid from datetime import datetime from typing import Dict, Any, List from sqlalchemy.orm import Session from app.models.video import Video, VideoStream, AudioStream, SubtitleStream from app.services.probe_parser import ProbeParser class VideoRegisterService: def __init__(self, db: Session): self.db = db self.parser = ProbeParser() def register_video(self, probe_json_path: str, absolute_file_path: str) -> Video: probe_data = self.parser.load_probe_json(probe_json_path) existing_video = ( self.db.query(Video).filter(Video.file_path == absolute_file_path).first() ) if existing_video: return self._update_video(existing_video, probe_data) return self._create_video(probe_data, absolute_file_path) def _create_video( self, probe_data: Dict[str, Any], absolute_file_path: str ) -> Video: video_metadata = self.parser.parse_video_metadata( probe_data, absolute_file_path ) video_stream_data = self.parser.parse_video_stream( probe_data.get("video_stream") ) audio_streams_data = self.parser.parse_audio_streams( probe_data.get("audio_streams", []) ) subtitle_streams_data = self.parser.parse_subtitle_streams( probe_data.get("subtitle_streams", []) ) video = Video(**video_metadata) if video_stream_data: video.video_streams.append(VideoStream(**video_stream_data)) for audio_data in audio_streams_data: video.audio_streams.append(AudioStream(**audio_data)) for subtitle_data in subtitle_streams_data: video.subtitle_streams.append(SubtitleStream(**subtitle_data)) self.db.add(video) self.db.commit() self.db.refresh(video) return video def _update_video(self, video: Video, probe_data: Dict[str, Any]) -> Video: video_metadata = self.parser.parse_video_metadata(probe_data, video.file_path) video_stream_data = self.parser.parse_video_stream( probe_data.get("video_stream") ) audio_streams_data = self.parser.parse_audio_streams( probe_data.get("audio_streams", []) ) subtitle_streams_data = self.parser.parse_subtitle_streams( probe_data.get("subtitle_streams", []) ) for key, value in video_metadata.items(): if value is not None: setattr(video, key, value) video.updated_at = datetime.utcnow() self.db.query(VideoStream).filter(VideoStream.video_id == video.id).delete() self.db.query(AudioStream).filter(AudioStream.video_id == video.id).delete() self.db.query(SubtitleStream).filter( SubtitleStream.video_id == video.id ).delete() if video_stream_data: video.video_streams.append(VideoStream(**video_stream_data)) for audio_data in audio_streams_data: video.audio_streams.append(AudioStream(**audio_data)) for subtitle_data in subtitle_streams_data: video.subtitle_streams.append(SubtitleStream(**subtitle_data)) self.db.commit() self.db.refresh(video) return video def register_batch(self, directory: str) -> List[Video]: videos = [] for root, dirs, files in os.walk(directory): for file in files: if file.endswith(".probe.json"): probe_json_path = os.path.join(root, file) video_filename = file.replace(".probe.json", "") possible_extensions = [ ".mp4", ".mov", ".avi", ".mkv", ".m4v", ".wmv", ".flv", ".webm", ] absolute_file_path = None for ext in possible_extensions: test_path = os.path.join(root, video_filename + ext) if os.path.exists(test_path): absolute_file_path = test_path break if not absolute_file_path: video_file = video_filename for f in os.listdir(root): if ( f.startswith(video_filename) and not f.endswith(".probe.json") and not f.endswith(".yolo.json") ): absolute_file_path = os.path.join(root, f) break if absolute_file_path: try: video = self.register_video( probe_json_path, absolute_file_path ) videos.append(video) print(f"Registered: {video.file_name}") except Exception as e: print(f"Error registering {probe_json_path}: {e}") else: print(f"Video file not found for: {probe_json_path}") return videos def get_video_by_id(self, video_id: uuid.UUID) -> Video: return self.db.query(Video).filter(Video.id == video_id).first() def get_video_by_path(self, file_path: str) -> Video: return self.db.query(Video).filter(Video.file_path == file_path).first() def search_videos( self, title=None, artist=None, codec_name=None, min_width=None, max_width=None, min_height=None, max_height=None, format_name=None, skip=0, limit=20, ): query = self.db.query(Video) if title: query = query.filter(Video.title.ilike(f"%{title}%")) if artist: query = query.filter(Video.artist.ilike(f"%{artist}%")) if format_name: query = query.filter(Video.format_name.ilike(f"%{format_name}%")) if min_width or max_width or min_height or max_height: query = query.join(VideoStream).filter(VideoStream.video_id == Video.id) if min_width: query = query.filter(VideoStream.width >= min_width) if max_width: query = query.filter(VideoStream.width <= max_width) if min_height: query = query.filter(VideoStream.height >= min_height) if max_height: query = query.filter(VideoStream.height <= max_height) if codec_name: query = query.join(VideoStream).filter( VideoStream.video_id == Video.id, VideoStream.codec_name.ilike(f"%{codec_name}%"), ) total = query.count() videos = query.offset(skip).limit(limit).all() return total, videos