- Add FastAPI server for video metadata registration
- PostgreSQL database models for videos, video_streams, audio_streams, subtitle_streams
- Batch registration script for .probe.json files
- RESTful API endpoints for CRUD operations
- Search functionality by title, artist, codec, resolution
121 lines
4.7 KiB
Python
121 lines
4.7 KiB
Python
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional
|
|
from pathlib import Path
|
|
|
|
|
|
class ProbeParser:
    """Stateless helpers that flatten a probe JSON document into plain dicts.

    The probe document is read as a mapping with the optional keys
    ``format``, ``video_stream``, ``audio_streams``, ``subtitle_streams``,
    ``streams`` and ``probed_at``.  Sections that are missing or ``null`` are
    treated as empty instead of raising.
    """

    # (output key, key in the probed video stream), in output order.
    _VIDEO_STREAM_FIELDS = (
        ("stream_index", "index"),
        ("codec_name", "codec_name"),
        ("codec_long_name", "codec_long_name"),
        ("profile", "profile"),
        ("level", "level"),
        ("width", "width"),
        ("height", "height"),
        ("coded_width", "coded_width"),
        ("coded_height", "coded_height"),
        ("aspect_ratio", "aspect_ratio"),
        ("pix_fmt", "pix_fmt"),
        ("field_order", "field_order"),
        ("frame_rate", "r_frame_rate"),
        ("start_time", "start_time"),
        ("duration", "duration"),
        ("bit_rate", "bit_rate"),
        ("nb_frames", "nb_frames"),
        ("color_range", "color_range"),
        ("color_space", "color_space"),
        ("has_b_frames", "has_b_frames"),
        ("sample_aspect_ratio", "sample_aspect_ratio"),
    )

    # (output key, key in the probed audio stream); "language" is added last.
    _AUDIO_STREAM_FIELDS = (
        ("stream_index", "index"),
        ("codec_name", "codec_name"),
        ("codec_long_name", "codec_long_name"),
        ("profile", "profile"),
        ("channels", "channels"),
        ("channel_layout", "channel_layout"),
        ("sample_rate", "sample_rate"),
        ("sample_fmt", "sample_fmt"),
        ("bit_rate", "bit_rate"),
        ("duration", "duration"),
    )

    @staticmethod
    def load_probe_json(probe_json_path: str) -> Dict[str, Any]:
        """Read and decode the JSON document stored at ``probe_json_path``.

        Args:
            probe_json_path: Path of the probe JSON file (read as UTF-8).

        Returns:
            The decoded JSON content.

        Raises:
            FileNotFoundError: If no file exists at ``probe_json_path``.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        try:
            # Open directly rather than testing os.path.exists() first: the
            # file could disappear between the check and the open.
            with open(probe_json_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"Probe JSON file not found: {probe_json_path}"
            ) from err

    @staticmethod
    def parse_video_metadata(
        probe_data: Dict[str, Any], absolute_file_path: str
    ) -> Dict[str, Any]:
        """Build the container-level metadata record for one probed file.

        Args:
            probe_data: Decoded probe document.
            absolute_file_path: Path of the media file; stored verbatim as
                ``file_path`` and split into ``file_name`` (base name without
                the extension) and ``file_extension`` (with leading dot).

        Returns:
            A dict with the keys ``file_path``, ``file_name``,
            ``file_extension``, ``file_size``, ``format_name``,
            ``format_long_name``, ``duration``, ``bit_rate``, ``nb_streams``,
            ``start_time``, ``title``, ``artist``, ``description`` and
            ``probed_at``.  Values absent from the probe document are ``None``,
            except ``start_time`` which is ``0`` when there is no video
            stream.  Values are passed through without type conversion, apart
            from ``probed_at`` which becomes a ``datetime`` (or ``None``).
        """
        # ``or {}`` also covers keys that are present but explicitly null.
        format_data = probe_data.get("format") or {}
        tags = format_data.get("tags") or {}
        video_stream = probe_data.get("video_stream")

        file_name = os.path.basename(absolute_file_path)
        name_without_ext, file_ext = os.path.splitext(file_name)

        return {
            "file_path": absolute_file_path,
            "file_name": name_without_ext,
            "file_extension": file_ext,
            "file_size": format_data.get("size"),
            "format_name": format_data.get("format_name"),
            "format_long_name": format_data.get("format_long_name"),
            "duration": format_data.get("duration"),
            "bit_rate": format_data.get("bit_rate"),
            "nb_streams": ProbeParser._count_streams(probe_data),
            "start_time": video_stream.get("start_time") if video_stream else 0,
            "title": tags.get("title"),
            "artist": tags.get("artist"),
            "description": tags.get("description"),
            "probed_at": ProbeParser._parse_timestamp(probe_data.get("probed_at")),
        }

    @staticmethod
    def _count_streams(probe_data: Dict[str, Any]) -> int:
        """Return the number of streams described by ``probe_data``.

        A ``streams`` list is authoritative when present.  Otherwise the
        pre-split ``video_stream`` / ``audio_streams`` / ``subtitle_streams``
        sections are counted, so documents without ``streams`` do not
        report zero.
        """
        streams = probe_data.get("streams")
        if streams is not None:
            return len(streams)
        count = 1 if probe_data.get("video_stream") else 0
        count += len(probe_data.get("audio_streams") or [])
        count += len(probe_data.get("subtitle_streams") or [])
        return count

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp, returning ``None`` if it is unusable."""
        if not value:
            return None
        try:
            # datetime.fromisoformat() only accepts a trailing "Z" from
            # Python 3.11 on; spell it as an explicit UTC offset instead.
            if isinstance(value, str) and value.endswith("Z"):
                value = value[:-1] + "+00:00"
            return datetime.fromisoformat(value)
        except (ValueError, TypeError):
            # Best effort: an unparseable timestamp must not block parsing.
            return None

    @staticmethod
    def _stream_language(stream: Dict[str, Any], *, tags_first: bool) -> Any:
        """Return a stream's language from ``tags.language`` or ``language``.

        ``tags_first`` selects which location wins when both are set; the
        other one is used only when the preferred one is ``None``.
        """
        tagged = (stream.get("tags") or {}).get("language")
        direct = stream.get("language")
        primary, fallback = (tagged, direct) if tags_first else (direct, tagged)
        return primary if primary is not None else fallback

    @staticmethod
    def parse_video_stream(video_stream: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Flatten the video stream section of a probe document.

        Args:
            video_stream: The probed video stream, or ``None``/empty.

        Returns:
            ``{}`` when ``video_stream`` is falsy.  Otherwise a dict with one
            entry per pair in ``_VIDEO_STREAM_FIELDS``; ``stream_index`` is
            read from ``index`` and ``frame_rate`` from ``r_frame_rate``.
            Keys missing from the input map to ``None``.
        """
        if not video_stream:
            return {}
        return {
            out_key: video_stream.get(src_key)
            for out_key, src_key in ProbeParser._VIDEO_STREAM_FIELDS
        }

    @staticmethod
    def parse_audio_streams(audio_streams: list) -> list:
        """Flatten each probed audio stream into a dict.

        Args:
            audio_streams: List of probed audio streams; ``None`` is treated
                as an empty list.

        Returns:
            One dict per input stream, in input order, with the keys of
            ``_AUDIO_STREAM_FIELDS`` plus ``language``.  ``language`` is taken
            from ``tags.language``, falling back to a top-level ``language``.
            Keys missing from the input map to ``None``.
        """
        result = []
        for audio in audio_streams or []:
            row = {
                out_key: audio.get(src_key)
                for out_key, src_key in ProbeParser._AUDIO_STREAM_FIELDS
            }
            row["language"] = ProbeParser._stream_language(audio, tags_first=True)
            result.append(row)
        return result

    @staticmethod
    def parse_subtitle_streams(subtitle_streams: list) -> list:
        """Flatten each probed subtitle stream into a dict.

        Args:
            subtitle_streams: List of probed subtitle streams; ``None`` is
                treated as an empty list.

        Returns:
            One dict per input stream, in input order, with the keys
            ``stream_index``, ``codec_name`` and ``language``.  ``language``
            is taken from the top-level ``language`` key, falling back to
            ``tags.language``.
        """
        return [
            {
                "stream_index": subtitle.get("index"),
                "codec_name": subtitle.get("codec_name"),
                "language": ProbeParser._stream_language(
                    subtitle, tags_first=False
                ),
            }
            for subtitle in subtitle_streams or []
        ]
|