Compare commits

...

2 Commits

Author SHA1 Message Date
accusys
c979c0297a Merge branch 'main' of http://localhost:3000/warren/video_register 2026-03-11 00:44:41 +08:00
accusys
b4aa7b96d3 Initial commit: Video metadata registration service
- Add FastAPI server for video metadata registration
- PostgreSQL database models for videos, video_streams, audio_streams, subtitle_streams
- Batch registration script for .probe.json files
- RESTful API endpoints for CRUD operations
- Search functionality by title, artist, codec, resolution
2026-03-11 00:30:31 +08:00
16 changed files with 709 additions and 0 deletions

1
.env.example Normal file
View File

@@ -0,0 +1 @@
DATABASE_URL=postgresql://accusys@localhost:5432/video_register

1
app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# app package

1
app/api/__init__.py Normal file
View File

@@ -0,0 +1 @@
# api package

View File

@@ -0,0 +1 @@
# routes package

66
app/api/routes/videos.py Normal file
View File

@@ -0,0 +1,66 @@
import os
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.models.schemas import VideoSchema, VideoListResponse, RegisterRequest
from app.services.video_register import VideoRegisterService
# Router for all video endpoints; every route below is served under /videos
# and grouped under the "videos" tag.
router = APIRouter(prefix="/videos", tags=["videos"])
@router.post("", response_model=VideoSchema)
def register_video(request: RegisterRequest, db: Session = Depends(get_db)):
    """Register a video from its probe JSON file, or refresh an existing row.

    Args:
        request: Paths of the probe JSON file and of the video file it
            describes, both as seen by the server's filesystem.
        db: Request-scoped database session.

    Returns:
        The Video produced by ``VideoRegisterService.register_video``.

    Raises:
        HTTPException: 404 when either path is not an existing regular file,
            422 when the probe file cannot be decoded as JSON.
    """
    # Local import: only needed to name the decode error below.
    import json

    # isfile instead of exists: a directory at either path would pass an
    # existence check and then fail later with an unhandled error.
    if not os.path.isfile(request.probe_json_path):
        raise HTTPException(status_code=404, detail="Probe JSON file not found")
    if not os.path.isfile(request.absolute_file_path):
        raise HTTPException(status_code=404, detail="Video file not found")

    service = VideoRegisterService(db)
    try:
        video = service.register_video(
            request.probe_json_path, request.absolute_file_path
        )
    except FileNotFoundError as exc:
        # The probe file disappeared between the check above and the read.
        raise HTTPException(
            status_code=404, detail="Probe JSON file not found"
        ) from exc
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # Malformed client-supplied input is a client error, not a 500.
        raise HTTPException(
            status_code=422, detail="Probe JSON file is not valid JSON"
        ) from exc
    return video
@router.get("/{video_id}", response_model=VideoSchema)
def get_video(video_id: UUID, db: Session = Depends(get_db)):
    """Return the video whose primary key is *video_id*.

    Raises:
        HTTPException: 404 when the service finds no such video.
    """
    found = VideoRegisterService(db).get_video_by_id(video_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Video not found")
@router.get("", response_model=VideoListResponse)
def search_videos(
    title: Optional[str] = Query(None, description="Search by title"),
    artist: Optional[str] = Query(None, description="Search by artist"),
    codec_name: Optional[str] = Query(None, description="Search by video codec"),
    min_width: Optional[int] = Query(None, description="Minimum video width"),
    max_width: Optional[int] = Query(None, description="Maximum video width"),
    min_height: Optional[int] = Query(None, description="Minimum video height"),
    max_height: Optional[int] = Query(None, description="Maximum video height"),
    format_name: Optional[str] = Query(None, description="Search by format name"),
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """Search registered videos and return one page of matches.

    Every query parameter is forwarded unchanged to
    ``VideoRegisterService.search_videos``. ``skip`` must be >= 0 and
    ``limit`` between 1 and 100.

    Returns:
        A mapping with ``total`` and ``videos`` as returned by the service.
    """
    criteria = {
        "title": title,
        "artist": artist,
        "codec_name": codec_name,
        "min_width": min_width,
        "max_width": max_width,
        "min_height": min_height,
        "max_height": max_height,
        "format_name": format_name,
        "skip": skip,
        "limit": limit,
    }
    total, videos = VideoRegisterService(db).search_videos(**criteria)
    return dict(total=total, videos=videos)

15
app/config.py Normal file
View File

@@ -0,0 +1,15 @@
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
    """Application settings, loaded from the environment and a ``.env`` file.

    ``database_url`` falls back to a local PostgreSQL database when it is not
    supplied by the environment or by ``.env``.
    """

    database_url: str = "postgresql://accusys@localhost:5432/video_register"

    # Pydantic v2 configuration; replaces the deprecated inner ``class Config``
    # and matches the ``model_config`` style used by the response schemas.
    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
@lru_cache
def get_settings() -> Settings:
    """Return the cached Settings instance, building it on the first call."""
    cached_settings = Settings()
    return cached_settings

17
app/database.py Normal file
View File

@@ -0,0 +1,17 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from app.config import get_settings
# Settings are resolved when this module is first imported.
settings = get_settings()
# Engine for the configured database URL; echo=False turns off SQL statement
# logging.
engine = create_engine(settings.database_url, echo=False)
# Session factory bound to the engine. Autocommit and autoflush are both
# disabled, so callers must commit explicitly.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that the ORM models inherit from.
Base = declarative_base()
def get_db():
    """Yield a new database session and close it when the caller is done.

    Written as a generator so it can be used with FastAPI's ``Depends``; the
    ``finally`` clause closes the session even when the consumer raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

18
app/main.py Normal file
View File

@@ -0,0 +1,18 @@
from fastapi import FastAPI
from app.api.routes import videos
from app.database import engine, Base
# Create any tables that are missing from the database as soon as this module
# is imported. Only tables whose models are already registered on Base are
# created.
Base.metadata.create_all(bind=engine)
# The ASGI application object.
app = FastAPI(
    title="Video Register API",
    description="API for registering and searching video metadata",
    version="1.0.0",
)
# Mount the video endpoints defined in app.api.routes.videos.
app.include_router(videos.router)
@app.get("/health")
def health_check():
    """Liveness endpoint: always answers with status "ok"."""
    payload = {"status": "ok"}
    return payload

1
app/models/__init__.py Normal file
View File

@@ -0,0 +1 @@
# models package

103
app/models/schemas.py Normal file
View File

@@ -0,0 +1,103 @@
from datetime import datetime
from uuid import UUID
from typing import Optional, List
from pydantic import BaseModel, ConfigDict
# Response schema for one video stream of a registered video.
# Every field is optional and defaults to None.
class VideoStreamSchema(BaseModel):
    # from_attributes=True lets the model be populated from object attributes
    # (e.g. an ORM row) instead of only from dicts.
    model_config = ConfigDict(from_attributes=True)
    stream_index: Optional[int] = None
    codec_name: Optional[str] = None
    codec_long_name: Optional[str] = None
    profile: Optional[str] = None
    level: Optional[int] = None
    # Displayed frame size.
    width: Optional[int] = None
    height: Optional[int] = None
    coded_width: Optional[int] = None
    coded_height: Optional[int] = None
    aspect_ratio: Optional[str] = None
    pix_fmt: Optional[str] = None
    field_order: Optional[str] = None
    # Carried as text rather than as a number.
    frame_rate: Optional[str] = None
    start_time: Optional[float] = None
    duration: Optional[float] = None
    bit_rate: Optional[int] = None
    nb_frames: Optional[int] = None
    color_range: Optional[str] = None
    color_space: Optional[str] = None
    has_b_frames: Optional[int] = None
    sample_aspect_ratio: Optional[str] = None
# Response schema for one audio stream of a registered video.
# Every field is optional and defaults to None.
class AudioStreamSchema(BaseModel):
    # from_attributes=True lets the model be populated from object attributes
    # (e.g. an ORM row) instead of only from dicts.
    model_config = ConfigDict(from_attributes=True)
    stream_index: Optional[int] = None
    codec_name: Optional[str] = None
    codec_long_name: Optional[str] = None
    profile: Optional[str] = None
    channels: Optional[int] = None
    channel_layout: Optional[str] = None
    sample_rate: Optional[int] = None
    sample_fmt: Optional[str] = None
    bit_rate: Optional[int] = None
    duration: Optional[float] = None
    language: Optional[str] = None
# Response schema for one subtitle stream of a registered video.
# Every field is optional and defaults to None.
class SubtitleStreamSchema(BaseModel):
    # from_attributes=True lets the model be populated from object attributes
    # (e.g. an ORM row) instead of only from dicts.
    model_config = ConfigDict(from_attributes=True)
    stream_index: Optional[int] = None
    codec_name: Optional[str] = None
    language: Optional[str] = None
# Response schema for a registered video together with its streams.
# Only id, file_path and file_name are required; the rest default to None
# (or to an empty list for the stream collections).
class VideoSchema(BaseModel):
    # from_attributes=True lets the model be populated from object attributes
    # (e.g. an ORM row) instead of only from dicts.
    model_config = ConfigDict(from_attributes=True)
    id: UUID
    file_path: str
    file_name: str
    file_extension: Optional[str] = None
    file_size: Optional[int] = None
    # Container-level information.
    format_name: Optional[str] = None
    format_long_name: Optional[str] = None
    duration: Optional[float] = None
    bit_rate: Optional[int] = None
    nb_streams: Optional[int] = None
    start_time: Optional[float] = None
    # Descriptive tags.
    title: Optional[str] = None
    artist: Optional[str] = None
    description: Optional[str] = None
    # Timestamps.
    probed_at: Optional[datetime] = None
    registered_at: Optional[datetime] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    # Nested stream lists, one schema per stream kind.
    video_streams: List[VideoStreamSchema] = []
    audio_streams: List[AudioStreamSchema] = []
    subtitle_streams: List[SubtitleStreamSchema] = []
# Response envelope for a list of videos: a count plus the video records.
class VideoListResponse(BaseModel):
    total: int
    videos: List[VideoSchema]
# Search criteria bundled as a model. All filters are optional; skip and
# limit default to 0 and 20.
# NOTE(review): the field names match the keyword arguments of the search
# service, but no caller of this model is visible here -- confirm it is used.
class VideoSearchQuery(BaseModel):
    title: Optional[str] = None
    artist: Optional[str] = None
    codec_name: Optional[str] = None
    min_width: Optional[int] = None
    max_width: Optional[int] = None
    min_height: Optional[int] = None
    max_height: Optional[int] = None
    format_name: Optional[str] = None
    skip: int = 0
    limit: int = 20
# Request body for registering a video: two required string paths.
class RegisterRequest(BaseModel):
    # Path of the probe JSON file describing the video.
    probe_json_path: str
    # Path of the video file itself.
    absolute_file_path: str

116
app/models/video.py Normal file
View File

@@ -0,0 +1,116 @@
import uuid
from datetime import datetime
from sqlalchemy import (
Column,
String,
Integer,
BigInteger,
Float,
Text,
ForeignKey,
DateTime,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
class Video(Base):
    """ORM model for the ``videos`` table.

    ``file_path`` is unique and required, so it identifies a video alongside
    the UUID primary key. Stream rows hang off the three relationships and are
    deleted together with the video (``all, delete-orphan`` cascade).
    """

    __tablename__ = "videos"
    # Primary key generated client-side with uuid4.
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    file_path = Column(String(512), unique=True, nullable=False)
    file_name = Column(String(255), nullable=False)
    file_extension = Column(String(10))
    file_size = Column(BigInteger)
    # Container-level information.
    format_name = Column(String(50))
    format_long_name = Column(String(100))
    duration = Column(Float)
    bit_rate = Column(BigInteger)
    nb_streams = Column(Integer)
    start_time = Column(Float, default=0)
    # Descriptive tags.
    title = Column(String(255))
    artist = Column(String(255))
    description = Column(Text)
    probed_at = Column(DateTime)
    # Timestamps default to datetime.utcnow, i.e. naive datetimes in UTC.
    registered_at = Column(DateTime, default=datetime.utcnow)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Also refreshed by the ORM on every UPDATE of the row.
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # Child collections; removing a child from a collection deletes its row.
    video_streams = relationship(
        "VideoStream", back_populates="video", cascade="all, delete-orphan"
    )
    audio_streams = relationship(
        "AudioStream", back_populates="video", cascade="all, delete-orphan"
    )
    subtitle_streams = relationship(
        "SubtitleStream", back_populates="video", cascade="all, delete-orphan"
    )
class VideoStream(Base):
    """ORM model for the ``video_streams`` table: one video stream of a video."""

    __tablename__ = "video_streams"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Owning video; the database removes this row when the video is deleted.
    video_id = Column(
        UUID(as_uuid=True), ForeignKey("videos.id", ondelete="CASCADE"), nullable=False
    )
    stream_index = Column(Integer)
    codec_name = Column(String(30))
    codec_long_name = Column(String(100))
    profile = Column(String(30))
    level = Column(Integer)
    width = Column(Integer)
    height = Column(Integer)
    coded_width = Column(Integer)
    coded_height = Column(Integer)
    aspect_ratio = Column(String(20))
    pix_fmt = Column(String(30))
    field_order = Column(String(20))
    # Stored as text rather than as a number.
    frame_rate = Column(String(20))
    start_time = Column(Float)
    duration = Column(Float)
    bit_rate = Column(BigInteger)
    nb_frames = Column(BigInteger)
    color_range = Column(String(10))
    color_space = Column(String(20))
    has_b_frames = Column(Integer)
    sample_aspect_ratio = Column(String(20))
    # Back-reference to the owning Video.
    video = relationship("Video", back_populates="video_streams")
class AudioStream(Base):
    """ORM model for the ``audio_streams`` table: one audio stream of a video."""

    __tablename__ = "audio_streams"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Owning video; the database removes this row when the video is deleted.
    video_id = Column(
        UUID(as_uuid=True), ForeignKey("videos.id", ondelete="CASCADE"), nullable=False
    )
    stream_index = Column(Integer)
    codec_name = Column(String(30))
    codec_long_name = Column(String(100))
    profile = Column(String(30))
    channels = Column(Integer)
    channel_layout = Column(String(30))
    sample_rate = Column(Integer)
    sample_fmt = Column(String(20))
    bit_rate = Column(BigInteger)
    duration = Column(Float)
    language = Column(String(10))
    # Back-reference to the owning Video.
    video = relationship("Video", back_populates="audio_streams")
class SubtitleStream(Base):
    """ORM model for the ``subtitle_streams`` table: one subtitle stream of a video."""

    __tablename__ = "subtitle_streams"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Owning video; the database removes this row when the video is deleted.
    video_id = Column(
        UUID(as_uuid=True), ForeignKey("videos.id", ondelete="CASCADE"), nullable=False
    )
    stream_index = Column(Integer)
    codec_name = Column(String(30))
    language = Column(String(10))
    # Back-reference to the owning Video.
    video = relationship("Video", back_populates="subtitle_streams")

1
app/services/__init__.py Normal file
View File

@@ -0,0 +1 @@
# services package

View File

@@ -0,0 +1,120 @@
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional
from pathlib import Path
class ProbeParser:
    """Turn a probe JSON document into plain dicts of model field values.

    All methods are static. Keys that are missing from the probe data, or
    present with an explicit ``null``, yield ``None`` (or an empty result)
    instead of raising.
    """

    @staticmethod
    def load_probe_json(probe_json_path: str) -> Dict[str, Any]:
        """Read and decode the probe JSON file at *probe_json_path*.

        Raises:
            FileNotFoundError: If the path does not exist.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        if not os.path.exists(probe_json_path):
            raise FileNotFoundError(f"Probe JSON file not found: {probe_json_path}")
        with open(probe_json_path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Parse an ISO-8601 string; return None when absent or unparseable."""
        if not value:
            return None
        try:
            return datetime.fromisoformat(value)
        except (ValueError, TypeError):
            # Best effort: a bad timestamp must not block registration.
            return None

    @staticmethod
    def _stream_language(stream: Dict[str, Any], tags_first: bool) -> Optional[str]:
        """Return a stream's language from its ``tags`` or its top level.

        *tags_first* chooses which location wins when both are present; the
        other one is used only as a fallback.
        """
        tagged = (stream.get("tags") or {}).get("language")
        direct = stream.get("language")
        first, second = (tagged, direct) if tags_first else (direct, tagged)
        return first if first is not None else second

    @staticmethod
    def parse_video_metadata(
        probe_data: Dict[str, Any], absolute_file_path: str
    ) -> Dict[str, Any]:
        """Build the file-level field values for a video.

        Args:
            probe_data: Decoded probe JSON document.
            absolute_file_path: Path of the video file; stored as ``file_path``
                and split into ``file_name`` (without extension) and
                ``file_extension`` (with its leading dot).

        Returns:
            A dict of field values. ``start_time`` comes from the
            ``video_stream`` entry and is 0 when there is none;
            ``nb_streams`` is the length of the ``streams`` list.
        """
        # ``or {}`` / ``or []`` also cover keys present with a null value.
        format_data = probe_data.get("format") or {}
        tags = format_data.get("tags") or {}
        video_stream = probe_data.get("video_stream")

        base_name = os.path.basename(absolute_file_path)
        name_without_ext, file_ext = os.path.splitext(base_name)

        return {
            "file_path": absolute_file_path,
            "file_name": name_without_ext,
            "file_extension": file_ext,
            "file_size": format_data.get("size"),
            "format_name": format_data.get("format_name"),
            "format_long_name": format_data.get("format_long_name"),
            "duration": format_data.get("duration"),
            "bit_rate": format_data.get("bit_rate"),
            "nb_streams": len(probe_data.get("streams") or []),
            "start_time": video_stream.get("start_time") if video_stream else 0,
            "title": tags.get("title"),
            "artist": tags.get("artist"),
            "description": tags.get("description"),
            "probed_at": ProbeParser._parse_timestamp(probe_data.get("probed_at")),
        }

    @staticmethod
    def parse_video_stream(video_stream: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Map one video stream entry to field values.

        Returns an empty dict when *video_stream* is None or empty.
        ``stream_index`` is read from ``index`` and ``frame_rate`` from
        ``r_frame_rate``; all other keys keep their names.
        """
        if not video_stream:
            return {}
        return {
            "stream_index": video_stream.get("index"),
            "codec_name": video_stream.get("codec_name"),
            "codec_long_name": video_stream.get("codec_long_name"),
            "profile": video_stream.get("profile"),
            "level": video_stream.get("level"),
            "width": video_stream.get("width"),
            "height": video_stream.get("height"),
            "coded_width": video_stream.get("coded_width"),
            "coded_height": video_stream.get("coded_height"),
            "aspect_ratio": video_stream.get("aspect_ratio"),
            "pix_fmt": video_stream.get("pix_fmt"),
            "field_order": video_stream.get("field_order"),
            "frame_rate": video_stream.get("r_frame_rate"),
            "start_time": video_stream.get("start_time"),
            "duration": video_stream.get("duration"),
            "bit_rate": video_stream.get("bit_rate"),
            "nb_frames": video_stream.get("nb_frames"),
            "color_range": video_stream.get("color_range"),
            "color_space": video_stream.get("color_space"),
            "has_b_frames": video_stream.get("has_b_frames"),
            "sample_aspect_ratio": video_stream.get("sample_aspect_ratio"),
        }

    @staticmethod
    def parse_audio_streams(audio_streams: list) -> list:
        """Map each audio stream entry to field values.

        ``language`` is read from the stream's ``tags`` first and from the
        stream itself as a fallback. ``None`` is treated as an empty list.
        """
        return [
            {
                "stream_index": audio.get("index"),
                "codec_name": audio.get("codec_name"),
                "codec_long_name": audio.get("codec_long_name"),
                "profile": audio.get("profile"),
                "channels": audio.get("channels"),
                "channel_layout": audio.get("channel_layout"),
                "sample_rate": audio.get("sample_rate"),
                "sample_fmt": audio.get("sample_fmt"),
                "bit_rate": audio.get("bit_rate"),
                "duration": audio.get("duration"),
                "language": ProbeParser._stream_language(audio, tags_first=True),
            }
            for audio in audio_streams or []
        ]

    @staticmethod
    def parse_subtitle_streams(subtitle_streams: list) -> list:
        """Map each subtitle stream entry to field values.

        ``language`` is read from the stream itself first and from its
        ``tags`` as a fallback. ``None`` is treated as an empty list.
        """
        return [
            {
                "stream_index": subtitle.get("index"),
                "codec_name": subtitle.get("codec_name"),
                "language": ProbeParser._stream_language(
                    subtitle, tags_first=False
                ),
            }
            for subtitle in subtitle_streams or []
        ]

View File

@@ -0,0 +1,202 @@
import os
import uuid
from datetime import datetime
from typing import Dict, Any, List
from sqlalchemy.orm import Session
from app.models.video import Video, VideoStream, AudioStream, SubtitleStream
from app.services.probe_parser import ProbeParser
class VideoRegisterService:
    """Create, refresh and query ``Video`` rows from probe JSON documents.

    The service works on the SQLAlchemy session it is given and commits after
    every successful create or update.
    """

    # Suffix that marks a probe file during batch discovery.
    _PROBE_SUFFIX = ".probe.json"
    # Companion files that must never be mistaken for the video itself.
    _SIDECAR_SUFFIXES = (".probe.json", ".yolo.json")
    # Extensions tried, in this order, when pairing a probe file with a video.
    _VIDEO_EXTENSIONS = (
        ".mp4",
        ".mov",
        ".avi",
        ".mkv",
        ".m4v",
        ".wmv",
        ".flv",
        ".webm",
    )

    def __init__(self, db: Session):
        """Bind the service to the session *db* and create its probe parser."""
        self.db = db
        self.parser = ProbeParser()

    def register_video(self, probe_json_path: str, absolute_file_path: str) -> Video:
        """Register the video at *absolute_file_path* from its probe JSON file.

        A video already stored under the same ``file_path`` is updated in
        place; otherwise a new row is created.

        Raises:
            FileNotFoundError: If the probe JSON file does not exist.
        """
        probe_data = self.parser.load_probe_json(probe_json_path)
        existing_video = (
            self.db.query(Video).filter(Video.file_path == absolute_file_path).first()
        )
        if existing_video:
            return self._update_video(existing_video, probe_data)
        return self._create_video(probe_data, absolute_file_path)

    def _build_streams(self, probe_data: Dict[str, Any]):
        """Parse *probe_data* into unsaved stream objects.

        Returns a ``(video_streams, audio_streams, subtitle_streams)`` tuple
        of lists. The video list holds at most one entry because the probe
        data carries a single ``video_stream``.
        """
        video_stream_data = self.parser.parse_video_stream(
            probe_data.get("video_stream")
        )
        audio_streams_data = self.parser.parse_audio_streams(
            probe_data.get("audio_streams", [])
        )
        subtitle_streams_data = self.parser.parse_subtitle_streams(
            probe_data.get("subtitle_streams", [])
        )
        video_streams = [VideoStream(**video_stream_data)] if video_stream_data else []
        audio_streams = [AudioStream(**data) for data in audio_streams_data]
        subtitle_streams = [SubtitleStream(**data) for data in subtitle_streams_data]
        return video_streams, audio_streams, subtitle_streams

    def _create_video(
        self, probe_data: Dict[str, Any], absolute_file_path: str
    ) -> Video:
        """Insert a new video with its streams, commit, and return it."""
        video_metadata = self.parser.parse_video_metadata(
            probe_data, absolute_file_path
        )
        video_streams, audio_streams, subtitle_streams = self._build_streams(
            probe_data
        )
        video = Video(**video_metadata)
        video.video_streams.extend(video_streams)
        video.audio_streams.extend(audio_streams)
        video.subtitle_streams.extend(subtitle_streams)
        self.db.add(video)
        self.db.commit()
        self.db.refresh(video)
        return video

    def _update_video(self, video: Video, probe_data: Dict[str, Any]) -> Video:
        """Overwrite *video* with fresh probe data, commit, and return it.

        Metadata values that parse to None leave the stored value untouched.
        All stream rows are replaced by the newly parsed ones.
        """
        # Parse everything before touching the database, so a parse failure
        # cannot leave the stream rows deleted in the open transaction.
        video_metadata = self.parser.parse_video_metadata(probe_data, video.file_path)
        video_streams, audio_streams, subtitle_streams = self._build_streams(
            probe_data
        )
        for key, value in video_metadata.items():
            if value is not None:
                setattr(video, key, value)
        video.updated_at = datetime.utcnow()
        self.db.query(VideoStream).filter(VideoStream.video_id == video.id).delete()
        self.db.query(AudioStream).filter(AudioStream.video_id == video.id).delete()
        self.db.query(SubtitleStream).filter(
            SubtitleStream.video_id == video.id
        ).delete()
        video.video_streams.extend(video_streams)
        video.audio_streams.extend(audio_streams)
        video.subtitle_streams.extend(subtitle_streams)
        self.db.commit()
        self.db.refresh(video)
        return video

    def _find_video_file(self, root: str, video_filename: str):
        """Return the path of the video belonging to a probe file, or None.

        Known extensions are tried first. Failing that, the first regular
        file in *root* (in sorted order) whose name starts with
        *video_filename* and is not a sidecar file is used.
        """
        if not video_filename:
            # A probe file named exactly ".probe.json" names no video.
            return None
        for ext in self._VIDEO_EXTENSIONS:
            candidate = os.path.join(root, video_filename + ext)
            if os.path.isfile(candidate):
                return candidate
        for name in sorted(os.listdir(root)):
            if not name.startswith(video_filename):
                continue
            if name.endswith(self._SIDECAR_SUFFIXES):
                continue
            candidate = os.path.join(root, name)
            # Directories sharing the prefix are not videos.
            if os.path.isfile(candidate):
                return candidate
        return None

    def register_batch(self, directory: str) -> List[Video]:
        """Register every video under *directory* that has a ``.probe.json``.

        Walks the tree recursively. Progress and failures are printed; a
        failure on one file does not stop the batch.

        Returns:
            The videos that were registered successfully.
        """
        # NOTE(review): paths are built from *directory* as given, so a
        # relative argument yields relative ``file_path`` values -- confirm
        # this is intended.
        videos = []
        for root, _dirs, files in os.walk(directory):
            for file in files:
                if not file.endswith(self._PROBE_SUFFIX):
                    continue
                probe_json_path = os.path.join(root, file)
                # Strip only the trailing suffix; str.replace would also
                # remove occurrences in the middle of the name.
                video_filename = file[: -len(self._PROBE_SUFFIX)]
                absolute_file_path = self._find_video_file(root, video_filename)
                if not absolute_file_path:
                    print(f"Video file not found for: {probe_json_path}")
                    continue
                try:
                    video = self.register_video(probe_json_path, absolute_file_path)
                    videos.append(video)
                    print(f"Registered: {video.file_name}")
                except Exception as e:
                    # Reset the session: after a failed flush/commit it
                    # refuses further work until rolled back, which would
                    # make every remaining file fail too.
                    self.db.rollback()
                    print(f"Error registering {probe_json_path}: {e}")
        return videos

    def get_video_by_id(self, video_id: uuid.UUID) -> Video:
        """Return the video with primary key *video_id*, or None if absent."""
        return self.db.query(Video).filter(Video.id == video_id).first()

    def get_video_by_path(self, file_path: str) -> Video:
        """Return the video stored under *file_path*, or None if absent."""
        return self.db.query(Video).filter(Video.file_path == file_path).first()

    def search_videos(
        self,
        title=None,
        artist=None,
        codec_name=None,
        min_width=None,
        max_width=None,
        min_height=None,
        max_height=None,
        format_name=None,
        skip=0,
        limit=20,
    ):
        """Search videos and return ``(total, page)``.

        Text filters are case-insensitive substring matches. Size and codec
        filters must all be satisfied by the same video stream. Filters that
        are None, empty or 0 are ignored.

        Returns:
            A tuple of the total number of matching videos and the list of
            at most *limit* videos starting at offset *skip*.
        """
        query = self.db.query(Video)
        if title:
            query = query.filter(Video.title.ilike(f"%{title}%"))
        if artist:
            query = query.filter(Video.artist.ilike(f"%{artist}%"))
        if format_name:
            query = query.filter(Video.format_name.ilike(f"%{format_name}%"))

        stream_criteria = []
        if min_width:
            stream_criteria.append(VideoStream.width >= min_width)
        if max_width:
            stream_criteria.append(VideoStream.width <= max_width)
        if min_height:
            stream_criteria.append(VideoStream.height >= min_height)
        if max_height:
            stream_criteria.append(VideoStream.height <= max_height)
        if codec_name:
            stream_criteria.append(VideoStream.codec_name.ilike(f"%{codec_name}%"))

        if stream_criteria:
            combined = stream_criteria[0]
            for criterion in stream_criteria[1:]:
                combined = combined & criterion
            # One EXISTS sub-select instead of JOINs: combining a size filter
            # with codec_name no longer joins video_streams twice, and a
            # video with several matching streams is counted once.
            query = query.filter(Video.video_streams.any(combined))

        total = query.count()
        # OFFSET/LIMIT without ORDER BY gives no stable row order, so pages
        # could overlap or skip rows between requests.
        videos = (
            query.order_by(Video.created_at, Video.id)
            .offset(skip)
            .limit(limit)
            .all()
        )
        return total, videos

7
requirements.txt Normal file
View File

@@ -0,0 +1,7 @@
fastapi==0.115.0
uvicorn[standard]==0.32.0
sqlalchemy==2.0.35
psycopg2-binary==2.9.10
pydantic==2.9.2
pydantic-settings==2.6.0
python-dotenv==1.0.1

39
scripts/register_batch.py Normal file
View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
import os
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.database import SessionLocal
from app.services.video_register import VideoRegisterService
def main():
    """Command-line entry point: register every probed video in a directory.

    Expects the directory as the first command-line argument. Exits with
    status 1 when the argument is missing or is not a directory. The database
    session is always closed, even when registration raises.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python register_batch.py <directory>")
        print("Example: python register_batch.py ../test_video")
        sys.exit(1)

    directory = cli_args[0]
    if not os.path.isdir(directory):
        print(f"Error: Directory not found: {directory}")
        sys.exit(1)

    separator = "=" * 60
    print(f"Scanning directory: {directory}")
    print(separator)

    session = SessionLocal()
    try:
        registered = VideoRegisterService(session).register_batch(directory)
        print(separator)
        print(f"Total registered: {len(registered)} videos")
    finally:
        session.close()
# Run the batch registration only when executed as a script, not on import.
if __name__ == "__main__":
    main()