import json
import os
from datetime import datetime
from typing import Dict, Any, Optional
from pathlib import Path


class ProbeParser:
    """Turn a probe JSON document into flat, storage-friendly dictionaries.

    Every method is static; the class only serves as a namespace.

    The probe document is expected to be a dict that may carry the keys
    ``format``, ``video_stream``, ``audio_streams``, ``subtitle_streams``,
    ``streams`` and ``probed_at``. Missing keys (and explicit ``null``
    values) are tolerated and surface as ``None`` / empty results.
    """

    @staticmethod
    def load_probe_json(probe_json_path: str) -> Dict[str, Any]:
        """Read and decode the probe JSON file at ``probe_json_path``.

        Args:
            probe_json_path: Path of the JSON file to read (UTF-8).

        Returns:
            The decoded JSON content.

        Raises:
            FileNotFoundError: If nothing exists at ``probe_json_path``.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        if not os.path.exists(probe_json_path):
            raise FileNotFoundError(f"Probe JSON file not found: {probe_json_path}")
        with open(probe_json_path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Parse an ISO-8601 string, returning ``None`` when it is unusable.

        A trailing ``Z`` is rewritten to ``+00:00`` because
        ``datetime.fromisoformat`` only accepts it from Python 3.11 on;
        without this, UTC timestamps were silently dropped on older
        interpreters.
        """
        if not value or not isinstance(value, str):
            return None
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            # Best effort: an unparseable timestamp is not worth failing for.
            return None

    @staticmethod
    def _stream_language(stream: Dict[str, Any], tags_first: bool) -> Optional[str]:
        """Return a stream's language from ``tags.language`` or ``language``.

        ``tags_first`` selects which location wins when both are present, so
        each caller keeps the lookup it historically used as first choice.
        """
        tagged = (stream.get("tags") or {}).get("language")
        direct = stream.get("language")
        first, second = (tagged, direct) if tags_first else (direct, tagged)
        return first if first is not None else second

    @staticmethod
    def parse_video_metadata(
        probe_data: Dict[str, Any], absolute_file_path: str
    ) -> Dict[str, Any]:
        """Build the file-level metadata record for one probed file.

        Args:
            probe_data: Decoded probe document.
            absolute_file_path: Path of the media file; stored verbatim and
                split into base name and extension.

        Returns:
            A dict with the keys ``file_path``, ``file_name`` (base name
            without extension), ``file_extension`` (including the dot),
            ``file_size``, ``format_name``, ``format_long_name``,
            ``duration``, ``bit_rate``, ``nb_streams``, ``start_time``,
            ``title``, ``artist``, ``description`` and ``probed_at``
            (a ``datetime`` or ``None``).
        """
        # ``or {}`` rather than a ``.get`` default: a JSON ``null`` would
        # otherwise come back as None and break the lookups below.
        format_data = probe_data.get("format") or {}
        tags = format_data.get("tags") or {}
        video_stream = probe_data.get("video_stream")

        name_without_ext, file_ext = os.path.splitext(
            os.path.basename(absolute_file_path)
        )

        return {
            "file_path": absolute_file_path,
            "file_name": name_without_ext,
            "file_extension": file_ext,
            "file_size": format_data.get("size"),
            "format_name": format_data.get("format_name"),
            "format_long_name": format_data.get("format_long_name"),
            "duration": format_data.get("duration"),
            "bit_rate": format_data.get("bit_rate"),
            # NOTE(review): this counts probe_data["streams"], a key nothing
            # else in this class reads -- confirm the producer emits it,
            # otherwise the value is always 0.
            "nb_streams": len(probe_data.get("streams") or []),
            "start_time": video_stream.get("start_time") if video_stream else 0,
            "title": tags.get("title"),
            "artist": tags.get("artist"),
            "description": tags.get("description"),
            "probed_at": ProbeParser._parse_timestamp(probe_data.get("probed_at")),
        }

    @staticmethod
    def parse_video_stream(video_stream: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Flatten the video stream entry into a record.

        Args:
            video_stream: The video stream dict, or ``None``/empty when the
                file has no video stream.

        Returns:
            ``{}`` when ``video_stream`` is falsy; otherwise a dict of the
            selected fields, where ``stream_index`` comes from ``index`` and
            ``frame_rate`` from ``r_frame_rate``. Absent fields are ``None``.
        """
        if not video_stream:
            return {}
        return {
            "stream_index": video_stream.get("index"),
            "codec_name": video_stream.get("codec_name"),
            "codec_long_name": video_stream.get("codec_long_name"),
            "profile": video_stream.get("profile"),
            "level": video_stream.get("level"),
            "width": video_stream.get("width"),
            "height": video_stream.get("height"),
            "coded_width": video_stream.get("coded_width"),
            "coded_height": video_stream.get("coded_height"),
            "aspect_ratio": video_stream.get("aspect_ratio"),
            "pix_fmt": video_stream.get("pix_fmt"),
            "field_order": video_stream.get("field_order"),
            "frame_rate": video_stream.get("r_frame_rate"),
            "start_time": video_stream.get("start_time"),
            "duration": video_stream.get("duration"),
            "bit_rate": video_stream.get("bit_rate"),
            "nb_frames": video_stream.get("nb_frames"),
            "color_range": video_stream.get("color_range"),
            "color_space": video_stream.get("color_space"),
            "has_b_frames": video_stream.get("has_b_frames"),
            "sample_aspect_ratio": video_stream.get("sample_aspect_ratio"),
        }

    @staticmethod
    def parse_audio_streams(audio_streams: Optional[list]) -> list:
        """Flatten each audio stream entry into a record.

        Args:
            audio_streams: List of audio stream dicts; ``None`` is treated
                as an empty list.

        Returns:
            One dict per input stream, in order. ``language`` is read from
            ``tags.language`` and falls back to a top-level ``language``.
        """
        return [
            {
                "stream_index": audio.get("index"),
                "codec_name": audio.get("codec_name"),
                "codec_long_name": audio.get("codec_long_name"),
                "profile": audio.get("profile"),
                "channels": audio.get("channels"),
                "channel_layout": audio.get("channel_layout"),
                "sample_rate": audio.get("sample_rate"),
                "sample_fmt": audio.get("sample_fmt"),
                "bit_rate": audio.get("bit_rate"),
                "duration": audio.get("duration"),
                "language": ProbeParser._stream_language(audio, tags_first=True),
            }
            for audio in audio_streams or []
        ]

    @staticmethod
    def parse_subtitle_streams(subtitle_streams: Optional[list]) -> list:
        """Flatten each subtitle stream entry into a record.

        Args:
            subtitle_streams: List of subtitle stream dicts; ``None`` is
                treated as an empty list.

        Returns:
            One dict per input stream, in order. ``language`` is read from
            the top-level ``language`` key and falls back to
            ``tags.language``, the location the audio parser reads.
        """
        return [
            {
                "stream_index": subtitle.get("index"),
                "codec_name": subtitle.get("codec_name"),
                "language": ProbeParser._stream_language(subtitle, tags_first=False),
            }
            for subtitle in subtitle_streams or []
        ]