Add video_player.py - Video YOLO Player with full functionality
This commit is contained in:
743
video_player.py
Normal file
743
video_player.py
Normal file
@@ -0,0 +1,743 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Video YOLO Player - Play video with YOLO object detection overlay
|
||||
Shows two windows: Original Video and YOLO Detection
|
||||
|
||||
Usage:
|
||||
python video_yolo_player.py <video_path> <yolo_model_path>
|
||||
|
||||
Controls:
|
||||
y/Y - Toggle live YOLO detection (blue boxes)
|
||||
p/P - Toggle pre-scanned YOLO data (green boxes)
|
||||
i/I - Show video probe information
|
||||
Space - Pause/Resume
|
||||
s/S - Toggle sound
|
||||
b/B - Toggle status bar
|
||||
h/H - Hide current window
|
||||
1/2/3 - Toggle windows
|
||||
←/→ - Seek ±5s
|
||||
Shift+←/→ - Seek ±30s
|
||||
q/ESC - Quit
|
||||
"""
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import shutil
|
||||
import json
|
||||
import platform
|
||||
from datetime import datetime
|
||||
from typing import Tuple, Dict, Any, Optional
|
||||
from ultralytics import YOLO
|
||||
|
||||
# Path to the ffplay binary used for audio playback. Prefer whatever is on
# PATH; fall back to the Homebrew install location on Apple-Silicon macOS.
FFPLAY = shutil.which('ffplay') or '/opt/homebrew/bin/ffplay'

# Build metadata shown in the command-window title bar and startup banner.
BUILD_VERSION = "2.0.0"
BUILD_TIME = "2026-03-06 12:00:00"
|
||||
|
||||
|
||||
def get_window_rect(win_name: str) -> Tuple[int, int, int, int]:
    """Return the on-screen geometry of an OpenCV window as (x, y, w, h)."""
    x, y, w, h = cv2.getWindowImageRect(win_name)
    return (int(x), int(y), int(w), int(h))
|
||||
|
||||
|
||||
def get_screen_resolution() -> Tuple[int, int]:
    """Best-effort detection of the primary screen resolution.

    Uses ``system_profiler`` on macOS, ``xrandr`` on Linux and the Win32
    ``GetSystemMetrics`` API on Windows. Any failure (missing tool, timeout,
    unparseable output) falls through to a 1920x1080 default.

    Returns:
        (width, height) in pixels.
    """
    system = platform.system()

    if system == "Darwin":  # macOS
        try:
            result = subprocess.run(
                ['system_profiler', 'SPDisplaysDataType'],
                capture_output=True, text=True, timeout=5
            )
            for line in result.stdout.split('\n'):
                if 'Resolution:' in line:
                    match = re.search(r'(\d+)\s*x\s*(\d+)', line)
                    if match:
                        # NOTE(review): on Retina displays this is likely the
                        # physical pixel count, not the logical desktop size
                        # — confirm before relying on it for window layout.
                        return int(match.group(1)), int(match.group(2))
        except Exception:
            pass

    elif system == "Linux":
        try:
            result = subprocess.run(
                ['xrandr'], capture_output=True, text=True, timeout=5
            )
            # Fix: xrandr prints the '*' marker on the mode lines, not on the
            # "<output> connected WxH+X+Y" line, so the previous check
            # (' connected' in line and '*' in line) never matched and Linux
            # always got the 1920x1080 fallback. Parse the connected line's
            # geometry first, then fall back to the '*'-marked current mode.
            for line in result.stdout.split('\n'):
                if ' connected' in line:
                    match = re.search(r'(\d+)x(\d+)\+\d+\+\d+', line)
                    if match:
                        return int(match.group(1)), int(match.group(2))
                if '*' in line:
                    match = re.search(r'(\d+)x(\d+)', line)
                    if match:
                        return int(match.group(1)), int(match.group(2))
        except Exception:
            pass

    elif system == "Windows":
        try:
            import ctypes
            user32 = ctypes.windll.user32
            # 0 = SM_CXSCREEN, 1 = SM_CYSCREEN (primary display size).
            return user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
        except Exception:
            pass

    # Conservative default when detection fails on any platform.
    return 1920, 1080
|
||||
|
||||
|
||||
# COCO 80-class label list in YOLO class-id order, using the older Darknet
# spellings ("aeroplane", "sofa", "tvmonitor", "diningtable", ...). Used by
# get_detections_list() to map numeric class ids to human-readable names.
YOLO_NAMES = [
    "person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck", "boat",
    "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat",
    "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack",
    "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball",
    "kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket",
    "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
    "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair",
    "sofa", "pottedplant", "bed", "diningtable", "toilet", "tvmonitor", "laptop", "mouse",
    "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator",
    "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush",
]
|
||||
|
||||
|
||||
def format_time(seconds: float) -> str:
    """Format a non-negative duration in seconds as ``HH:MM:SS``."""
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"
|
||||
|
||||
|
||||
def format_time_with_frame(seconds: float, frame_num: int, fps: float) -> Tuple[str, str]:
    """Format a timestamp as (``HH:MM:SS.ff``, ``Frame: N``).

    The ``.ff`` part is the frame index within the current second
    (``frame_num % fps``), or 0 when fps is unknown.
    """
    mm, ss = divmod(int(seconds), 60)
    hh, mm = divmod(mm, 60)
    frame_in_sec = int(frame_num % fps) if fps > 0 else 0
    return f"{hh:02d}:{mm:02d}:{ss:02d}.{frame_in_sec:02d}", f"Frame: {frame_num}"
|
||||
|
||||
|
||||
def load_probe_data(video_path: str) -> Optional[Dict]:
    """Load the sidecar ``<name>.probe.json`` next to *video_path*.

    Returns the parsed JSON dict, or ``None`` when the file is missing or
    cannot be read/parsed (an error message is printed in that case).
    """
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    probe_file = os.path.join(os.path.dirname(video_path), base_name + ".probe.json")

    if not os.path.exists(probe_file):
        return None

    try:
        with open(probe_file, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    except Exception as e:
        print(f"Error loading probe file: {e}")
        return None
|
||||
|
||||
|
||||
def load_yolo_data(video_path: str) -> Optional[Dict]:
    """Load the sidecar ``<name>.yolo.json`` next to *video_path*.

    Returns the parsed pre-scan dict, or ``None`` when the file is missing
    or cannot be read/parsed (an error message is printed in that case).
    """
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    yolo_file = os.path.join(os.path.dirname(video_path), base_name + ".yolo.json")

    if not os.path.exists(yolo_file):
        return None

    try:
        with open(yolo_file, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    except Exception as e:
        print(f"Error loading YOLO file: {e}")
        return None
|
||||
|
||||
|
||||
def get_detections_list(result) -> list:
    """Convert an ultralytics result's boxes into a list of plain dicts.

    Each entry carries ``class_id``, ``class_name`` (looked up in
    YOLO_NAMES, "unknown" for out-of-range ids), ``confidence`` and the
    ``x1``/``y1``/``x2``/``y2`` corner coordinates as floats.
    """
    if result.boxes is None:
        return []

    xyxy = result.boxes.xyxy.cpu().numpy()
    confs = result.boxes.conf.cpu().numpy()
    cls_ids = result.boxes.cls.cpu().numpy().astype(int)

    detections = []
    for coords, conf, cid in zip(xyxy, confs, cls_ids):
        x1, y1, x2, y2 = coords
        name = YOLO_NAMES[cid] if cid < len(YOLO_NAMES) else "unknown"
        detections.append({
            'class_id': int(cid),
            'class_name': name,
            'confidence': float(conf),
            'x1': float(x1),
            'y1': float(y1),
            'x2': float(x2),
            'y2': float(y2)
        })

    return detections
|
||||
|
||||
|
||||
def draw_detections(frame: np.ndarray, detections: list, color: Tuple[int, int, int], label_prefix: str = "") -> np.ndarray:
    """Draw detection boxes and labels on a copy of *frame*.

    Args:
        frame: BGR image to annotate (the input is left unmodified).
        detections: dicts with 'x1','y1','x2','y2','class_name','confidence'.
        color: BGR color for the box outline and label background.
        label_prefix: text prepended to each label (e.g. "[PRE] ").

    Returns:
        A new annotated image.
    """
    annotated_frame = frame.copy()

    for det in detections:
        x1, y1, x2, y2 = int(det['x1']), int(det['y1']), int(det['x2']), int(det['y2'])
        class_name = det['class_name']
        conf = det['confidence']

        cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

        label = f"{label_prefix}{class_name} {conf:.1%}"
        (label_w, label_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)

        # Fix: clamp the label background to the frame. Previously it was
        # drawn at y1 - label_h - 10, which is negative (off-frame, so the
        # label is invisible) for boxes touching the top edge.
        top = max(y1 - label_h - 10, 0)
        cv2.rectangle(annotated_frame, (x1, top), (x1 + label_w, top + label_h + 10), color, -1)

        # Black text on the green (pre-scan) background for contrast,
        # white text on every other color.
        text_color = (255, 255, 255) if color != (0, 255, 0) else (0, 0, 0)
        cv2.putText(annotated_frame, label, (x1, top + label_h + 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color, 2)

    return annotated_frame
|
||||
|
||||
|
||||
def draw_time_overlay(frame: np.ndarray, current_time: float, current_frame: int,
                      total_time: float, total_frames: int, fps: float,
                      object_count: int = 0, is_paused: bool = False,
                      sound_on: bool = False, live_yolo: bool = False,
                      pre_yolo: bool = False) -> np.ndarray:
    """Draw a semi-transparent status bar at the bottom of the video frame.

    The bar shows current/total timecode, frame counter, object count, the
    active YOLO modes, sound state and a [PAUSED] marker.

    Args:
        frame: BGR image to annotate (the input is left unmodified).
        current_time: playback position in seconds.
        current_frame: current frame index.
        total_time: video duration in seconds.
        total_frames: total frame count.
        fps: frames per second (0 disables the frame-in-second digits).
        object_count: number of detections to report.
        is_paused / sound_on / live_yolo / pre_yolo: state flags for display.

    Returns:
        A new annotated image.
    """
    # Fix: operate on a copy. The previous version wrote into the caller's
    # array (cv2.addWeighted with dst=frame, cv2.line, cv2.putText), so the
    # player's paused loop — which re-renders the same cached frame every
    # iteration — kept re-blending the black bar, progressively darkening it.
    frame = frame.copy()
    height, width = frame.shape[:2]

    time_str, frame_str = format_time_with_frame(current_time, current_frame, fps)
    total_time_str, total_frame_str = format_time_with_frame(total_time, total_frames, fps)

    mode_parts = []
    if live_yolo:
        mode_parts.append("LIVE-YOLO")
    if pre_yolo:
        mode_parts.append("PRE-YOLO")
    mode_str = f" [{'+'.join(mode_parts)}]" if mode_parts else ""

    sound_label = " [SOUND]" if sound_on else ""
    time_text = f"{time_str} / {total_time_str} | {frame_str}/{total_frames} | Objects: {object_count}{mode_str}{sound_label}"
    if is_paused:
        time_text = f"[PAUSED] {time_text}"

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.55
    thickness = 2
    padding = 10

    (text_w, text_h), baseline = cv2.getTextSize(time_text, font, font_scale, thickness)
    bar_height = text_h + baseline + padding * 3

    # Blend a 60%-opaque black bar over the bottom of the frame.
    overlay = frame.copy()
    cv2.rectangle(overlay, (0, height - bar_height), (width, height), (0, 0, 0), -1)
    cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)
    cv2.line(frame, (0, height - bar_height), (width, height - bar_height), (100, 100, 100), 1)

    # Center the text horizontally and vertically within the bar.
    text_x = (width - text_w) // 2
    text_y = height - bar_height // 2 + text_h // 2

    text_color = (255, 100, 100) if is_paused else (255, 255, 255)

    # Drop shadow (1px offset black) behind the main text for readability.
    cv2.putText(frame, time_text, (text_x + 1, text_y + 1), font, font_scale, (0, 0, 0), thickness + 1)
    cv2.putText(frame, time_text, (text_x, text_y), font, font_scale, text_color, thickness)

    return frame
|
||||
|
||||
|
||||
def play_video(video_path: str, model_path: str, probe_data: Optional[Dict], yolo_data: Optional[Dict]):
    """Interactive playback loop: three OpenCV windows with YOLO overlays.

    Opens *video_path* and shows an "Original" window, a "YOLO Detection"
    window (pre-scanned green boxes and/or live blue boxes) and a "Command"
    panel that accepts typed seek commands. Audio is played by spawning an
    external ffplay process. Loops the video at EOF; 'q' quits.

    Args:
        video_path: path to the video file.
        model_path: path to the YOLO weights, loaded lazily on first use.
        probe_data: parsed .probe.json dict, or None (disables 'i' info).
        yolo_data: parsed .yolo.json dict, or None (disables 'p' mode).
    """

    print(f"\nOpening video: {video_path}")
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error: Cannot open video: {video_path}")
        return

    # Basic stream properties; fps may be 0 for some containers, guard division.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    total_duration = total_frames / fps if fps > 0 else 0

    print(f"Video info: {width}x{height} @ {fps:.2f} fps, {total_frames} frames")

    # Load YOLO model (lazy loading - only when needed)
    model = None

    # Extract YOLO detections by frame: {frame_number(int): [detection dicts]}
    yolo_detections_by_frame = {}
    if yolo_data and 'frames' in yolo_data:
        for frame_num_str, frame_data in yolo_data['frames'].items():
            yolo_detections_by_frame[int(frame_num_str)] = frame_data.get('detections', [])
        print(f"Loaded {len(yolo_detections_by_frame)} frames from .yolo.json")

    # Screen resolution detection and window layout
    screen_w, screen_h = get_screen_resolution()
    print(f"Detected screen resolution: {screen_w}x{screen_h}")

    GAP = 10
    margin = 40

    # Two video windows side by side; forced 16:9 aspect, capped to screen.
    available_width = screen_w - 3 * margin
    w_vid = min(width, available_width // 2)
    h_vid = int(w_vid * 9 / 16)
    h_vid = min(h_vid, screen_h - margin * 2 - 200)

    # Command panel spans both video windows below them.
    w_cmd = w_vid * 2 + GAP
    h_cmd = 280

    WIN_ORIGINAL = "1: Original Video"
    WIN_YOLO = "2: YOLO Detection"
    WIN_CMD = "3: Command"

    x_start = margin
    y_start = margin

    # Initial geometry per window: (x, y, w, h).
    INIT_GEOM = {
        WIN_ORIGINAL: (x_start, y_start, w_vid, h_vid),
        WIN_YOLO: (x_start + w_vid + GAP, y_start, w_vid, h_vid),
        WIN_CMD: (x_start, y_start + h_vid + GAP + 30, w_cmd, h_cmd),
    }

    print(f"Window layout: Original={w_vid}x{h_vid}, YOLO={w_vid}x{h_vid}, Command={w_cmd}x{h_cmd}")

    def make_win(name):
        # Create, size and place one window from INIT_GEOM.
        x, y, w, h = INIT_GEOM[name]
        cv2.namedWindow(name, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(name, w, h)
        cv2.moveWindow(name, x, y)

    make_win(WIN_ORIGINAL)
    make_win(WIN_YOLO)
    make_win(WIN_CMD)

    # Trackbar: tb_code_val holds the last programmatic position so the
    # callback can tell user drags from our own setTrackbarPos updates.
    tb_code_val = {"v": 0}
    seek_request = {"frame": None}

    def on_progress(val):
        # Only user-initiated trackbar moves become seek requests.
        if val != tb_code_val["v"]:
            seek_request["frame"] = val

    for wn in (WIN_ORIGINAL, WIN_YOLO):
        cv2.createTrackbar("Progress", wn, 0, max(total_frames - 1, 1), on_progress)

    win_geom = dict(INIT_GEOM)          # last-known geometry, for re-show
    win_visible = {WIN_ORIGINAL: True, WIN_YOLO: True, WIN_CMD: True}
    last_shown = WIN_ORIGINAL           # target for the 'h' (hide) key

    sound_process = None

    def start_audio(pos_secs):
        # Spawn a detached ffplay (audio only) starting at pos_secs.
        stop_audio()
        try:
            return subprocess.Popen(
                [FFPLAY, '-nodisp', '-autoexit',
                 '-ss', f'{max(0, pos_secs):.2f}', video_path],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )
        except Exception as e:
            print(f"Audio error: {e}")
            return None

    def stop_audio():
        # NOTE(review): pkill -f matches ANY ffplay process on the system,
        # not just the one this player spawned — consider terminating
        # sound_process directly instead.
        subprocess.run(['pkill', '-f', 'ffplay'],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def do_seek(target_frame):
        # Jump to target_frame, refresh both display frames and detections,
        # restart audio if enabled, and sync the trackbars.
        nonlocal frame_count, current_frame, annotated_frame, object_count, sound_process

        target_frame = max(0, min(total_frames - 1, int(target_frame)))
        cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        ret, f = cap.read()
        if not ret:
            return

        # +1 matches the main loop's convention of counting after cap.read().
        frame_count = target_frame + 1
        current_frame = f.copy()

        # Update detections
        object_count = 0
        annotated_frame = f.copy()

        if pre_yolo_mode and frame_count in yolo_detections_by_frame:
            dets = yolo_detections_by_frame[frame_count]
            object_count += len(dets)
            annotated_frame = draw_detections(annotated_frame, dets, (0, 255, 0), "[PRE] ")

        # Live detection only if the model was already lazily loaded.
        if live_yolo_mode and model is not None:
            r = model(f, verbose=False)[0]
            live_dets = get_detections_list(r)
            object_count += len(live_dets)
            annotated_frame = draw_detections(annotated_frame, live_dets, (255, 0, 0), "[LIVE] ")

        if sound_on:
            sound_process = start_audio(frame_count / fps)

        # Update trackbar
        tb_code_val["v"] = frame_count
        for wn in (WIN_ORIGINAL, WIN_YOLO):
            if win_visible.get(wn):
                cv2.setTrackbarPos("Progress", wn, frame_count)

        print(f"Seek → frame {frame_count} ({frame_count/fps:.2f}s)")

    def seek_delta(delta_secs):
        # Relative seek in seconds, converted to frames.
        do_seek(frame_count + int(delta_secs * fps))

    # Command-line state
    cmd_input = ""
    cmd_log = []

    def cmd_log_add(line):
        # Append to the scrollback, keeping at most 12 lines.
        cmd_log.append(line)
        if len(cmd_log) > 12:
            cmd_log.pop(0)

    def execute_command(s):
        # Parse and run one typed command: 'i'/'info'/'probe', '+N'/'-N'
        # seconds, 'hh:mm:ss[.ff]' absolute time, or a bare frame number.
        # Any parse error lands in the log as a "!!" line.
        s = s.strip()
        if not s:
            return

        try:
            if s.lower() in ('i', 'info', 'probe'):
                # Show probe information
                if probe_data:
                    cmd_log_add(">> Video Probe Info:")
                    fmt = probe_data.get('format', {})
                    cmd_log_add(f" Format: {fmt.get('format_long_name', 'N/A')}")
                    # NOTE(review): ffprobe JSON usually stores duration/size
                    # as strings; ':.2f' on a str raises and is swallowed by
                    # the except below — confirm the probe writer emits numbers.
                    cmd_log_add(f" Duration: {fmt.get('duration', 0):.2f}s")
                    cmd_log_add(f" Size: {fmt.get('size', 0) / 1024 / 1024:.2f} MB")
                    vs = probe_data.get('video_stream', {})
                    if vs:
                        cmd_log_add(f" Video: {vs.get('codec_name')} {vs.get('width')}x{vs.get('height')}")
                    cmd_log_add(f" Audio: {len(probe_data.get('audio_streams', []))} streams")
                else:
                    cmd_log_add("!! No .probe.json found")
                return

            if s.startswith(('+', '-')):
                seek_delta(float(s))
                cmd_log_add(f">> seek {float(s):+.1f}s")
                return

            if ':' in s:
                parts = s.split(':')
                hh = int(parts[0])
                mm = int(parts[1])
                ss_parts = parts[2].split('.')
                ss = int(ss_parts[0])
                ff = int(ss_parts[1]) if len(ss_parts) > 1 else 0
                total_s = hh*3600 + mm*60 + ss + ff/fps
                do_seek(int(total_s * fps))
                cmd_log_add(f">> seek {s}")
                return

            do_seek(int(float(s)))
            cmd_log_add(f">> seek frame {int(float(s))}")

        except Exception as e:
            cmd_log_add(f"!! {e}")

    print("\nPlaying video...")
    print("Keys: q/ESC=quit space=pause s=sound b=statusbar")
    print(" y=live YOLO p=pre YOLO i=probe info h=hide 1/2/3=toggle windows")
    print(" ←/→=±5s Shift+←/→=±30s")
    print("Command: <frame> | hh:mm:ss[.ff] | +/-secs | i (probe info)")

    # Playback state.
    frame_count = 0
    is_paused = False
    sound_on = False
    show_statusbar = True
    current_frame = None        # last decoded frame (clean)
    annotated_frame = None      # last decoded frame with detection boxes
    object_count = 0

    # YOLO modes
    live_yolo_mode = False
    pre_yolo_mode = False

    while True:
        if not is_paused:
            ret, frame = cap.read()

            if not ret:
                print("End of video, looping...")
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                frame_count = 0
                continue

            current_frame = frame.copy()
            frame_count += 1

            # Process detections
            object_count = 0
            annotated_frame = frame.copy()

            # Pre-scanned (green) boxes from the .yolo.json sidecar.
            if pre_yolo_mode and frame_count in yolo_detections_by_frame:
                dets = yolo_detections_by_frame[frame_count]
                object_count += len(dets)
                annotated_frame = draw_detections(annotated_frame, dets, (0, 255, 0), "[PRE] ")

            # Live (blue) boxes; model weights are loaded on first use.
            if live_yolo_mode:
                if model is None:
                    print("Loading YOLO model for live detection...")
                    model = YOLO(model_path)
                    print("✓ Model loaded")

                results = model(frame, verbose=False)
                result = results[0]
                live_dets = get_detections_list(result)
                object_count += len(live_dets)
                annotated_frame = draw_detections(annotated_frame, live_dets, (255, 0, 0), "[LIVE] ")

        current_time = frame_count / fps if fps > 0 else 0

        # Periodic console progress while playing.
        if frame_count % 30 == 0 and not is_paused:
            print(f"Frame: {frame_count}/{total_frames}, Objects: {object_count}")

        # Nothing decoded yet (only possible before the first frame).
        if current_frame is None:
            continue

        # Handle trackbar seek
        if seek_request["frame"] is not None:
            do_seek(seek_request["frame"])
            seek_request["frame"] = None

        # Sync trackbar
        if not seek_request["frame"]:
            tb_code_val["v"] = frame_count
            for wn in (WIN_ORIGINAL, WIN_YOLO):
                if win_visible.get(wn):
                    cv2.setTrackbarPos("Progress", wn, frame_count)

        # Render status bar
        overlay_args = (current_time, frame_count, total_duration, total_frames, fps,
                        object_count, is_paused, sound_on, live_yolo_mode, pre_yolo_mode)

        if win_visible[WIN_ORIGINAL]:
            if show_statusbar:
                frame_out = draw_time_overlay(current_frame, *overlay_args)
            else:
                frame_out = current_frame.copy()
            cv2.imshow(WIN_ORIGINAL, frame_out)
            last_shown = WIN_ORIGINAL

        if win_visible[WIN_YOLO] and annotated_frame is not None:
            if show_statusbar:
                ann_out = draw_time_overlay(annotated_frame, *overlay_args)
            else:
                ann_out = annotated_frame.copy()
            cv2.imshow(WIN_YOLO, ann_out)
            last_shown = WIN_YOLO

        # Command window
        if win_visible[WIN_CMD]:
            # The panel is redrawn from scratch every iteration.
            cmd_h, cmd_w = 320, w_cmd
            panel = np.zeros((cmd_h, cmd_w, 3), dtype=np.uint8)

            # Title bar with build info
            cv2.rectangle(panel, (0, 0), (cmd_w, 28), (40, 40, 80), -1)
            title = f"3: Command | v{BUILD_VERSION} | {BUILD_TIME}"
            cv2.putText(panel, title, (6, 18), cv2.FONT_HERSHEY_SIMPLEX, 0.48, (180, 220, 255), 1)

            # Examples section
            examples = [
                "Examples: 123 | 00:01:30 | +10 | -5 | i (probe info)"
            ]
            y = 50
            for ex in examples:
                cv2.putText(panel, ex, (8, y), cv2.FONT_HERSHEY_SIMPLEX, 0.42, (150, 150, 150), 1)
                y += 20

            # Separator
            cv2.line(panel, (0, y), (cmd_w, y), (60, 60, 60), 1)
            y += 15

            # Log lines: green for results (">>"), red for errors ("!!").
            for line in cmd_log[-8:]:
                color = (80, 200, 80) if line.startswith(">>") else \
                        (80, 80, 200) if line.startswith("!!") else (180, 180, 180)
                cv2.putText(panel, line, (8, y), cv2.FONT_HERSHEY_SIMPLEX, 0.50, color, 1)
                y += 22

            # Status line
            mode_str = f"Live:{'Y' if live_yolo_mode else 'N'} Pre:{'Y' if pre_yolo_mode else 'N'}"
            s_line = (f" [{format_time(current_time)} f:{frame_count}/{total_frames}]"
                      f" {mode_str}"
                      f" Pause:{'Y' if is_paused else 'N'}"
                      f" Sound:{'Y' if sound_on else 'N'}")
            cv2.putText(panel, s_line, (6, cmd_h - 38),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.42, (120, 200, 255), 1)

            # Input prompt
            cv2.line(panel, (0, cmd_h - 28), (cmd_w, cmd_h - 28), (80, 80, 80), 1)
            prompt = f"> {cmd_input}_"
            cv2.putText(panel, prompt, (8, cmd_h - 8),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 200), 1)

            # Focus indicator
            if cmd_input:
                cv2.putText(panel, "[TYPING]", (cmd_w - 100, cmd_h - 8),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.40, (255, 200, 0), 1)

            cv2.imshow(WIN_CMD, panel)
            last_shown = WIN_CMD

        # Key handling: waitKeyEx keeps the full code (needed for arrows),
        # key_char is the low byte used for ordinary characters.
        key = cv2.waitKeyEx(30 if not is_paused else 100)
        key_char = key & 0xFF

        # Once the command buffer is non-empty, printable keys go to it.
        in_focus_mode = bool(cmd_input)

        if key_char == 13:  # Enter
            if cmd_input.strip():
                cmd_log_add(f"> {cmd_input}")
                execute_command(cmd_input)
            cmd_input = ""

        elif key_char in (8, 127):  # Backspace / Delete
            cmd_input = cmd_input[:-1]

        elif 32 <= key_char <= 126:
            # NOTE(review): digits, '+', '-', ':' are always captured into
            # the command buffer by this first branch, so the 1/2/3 window
            # toggles below appear unreachable — confirm intended precedence.
            # Also on Linux, arrow keycode 65361 & 0xFF == 81 ('Q'), which
            # would land in the quit branch — confirm on that platform.
            if in_focus_mode or chr(key_char) in ('+', '-', '0','1','2','3','4','5','6','7','8','9', ':'):
                cmd_input += chr(key_char)
            elif key_char == ord('q') or key_char == ord('Q') or key_char == 27:
                print("Quitting...")
                break
            elif key_char == ord(' '):
                is_paused = not is_paused
                # Keep audio in step with the pause state.
                if sound_on:
                    if is_paused:
                        stop_audio()
                        sound_process = None
                    else:
                        sound_process = start_audio(frame_count / fps)
                print(f"{'Paused' if is_paused else 'Resumed'}")
            elif key_char == ord('b') or key_char == ord('B'):
                show_statusbar = not show_statusbar
                print(f"Status bar {'ON' if show_statusbar else 'OFF'}")
            elif key_char == ord('s') or key_char == ord('S'):
                sound_on = not sound_on
                if sound_on:
                    sound_process = start_audio(frame_count / fps)
                    print(f"Sound ON (at {frame_count/fps:.1f}s)")
                else:
                    stop_audio()
                    sound_process = None
                    print("Sound OFF")
            elif key_char == ord('y') or key_char == ord('Y'):
                live_yolo_mode = not live_yolo_mode
                print(f"Live YOLO {'ON' if live_yolo_mode else 'OFF'}")
            elif key_char == ord('p') or key_char == ord('P'):
                # Pre-scan mode needs the .yolo.json sidecar.
                if yolo_data:
                    pre_yolo_mode = not pre_yolo_mode
                    print(f"Pre-scanned YOLO {'ON' if pre_yolo_mode else 'OFF'}")
                else:
                    print("No .yolo.json file found")
                    cmd_log_add("!! No .yolo.json found")
            elif key_char == ord('h') or key_char == ord('H'):
                # Hide the most recently rendered window, remembering its
                # geometry so a later toggle restores it in place.
                target = last_shown
                if target and win_visible.get(target):
                    win_geom[target] = get_window_rect(target)
                    win_visible[target] = False
                    cv2.destroyWindow(target)
                    print(f"Hidden: {target}")
            elif key_char == ord('1'):
                win_visible[WIN_ORIGINAL] = not win_visible[WIN_ORIGINAL]
                if not win_visible[WIN_ORIGINAL]:
                    win_geom[WIN_ORIGINAL] = get_window_rect(WIN_ORIGINAL)
                    cv2.destroyWindow(WIN_ORIGINAL)
                else:
                    # Re-create at last-known geometry, with its trackbar.
                    g = win_geom.get(WIN_ORIGINAL, INIT_GEOM[WIN_ORIGINAL])
                    cv2.namedWindow(WIN_ORIGINAL, cv2.WINDOW_NORMAL)
                    cv2.resizeWindow(WIN_ORIGINAL, g[2], g[3])
                    cv2.moveWindow(WIN_ORIGINAL, g[0], g[1])
                    cv2.createTrackbar("Progress", WIN_ORIGINAL,
                                       frame_count, max(total_frames-1,1), on_progress)
                print(f"[1] Original: {'ON' if win_visible[WIN_ORIGINAL] else 'OFF'}")
            elif key_char == ord('2'):
                win_visible[WIN_YOLO] = not win_visible[WIN_YOLO]
                if not win_visible[WIN_YOLO]:
                    win_geom[WIN_YOLO] = get_window_rect(WIN_YOLO)
                    cv2.destroyWindow(WIN_YOLO)
                else:
                    g = win_geom.get(WIN_YOLO, INIT_GEOM[WIN_YOLO])
                    cv2.namedWindow(WIN_YOLO, cv2.WINDOW_NORMAL)
                    cv2.resizeWindow(WIN_YOLO, g[2], g[3])
                    cv2.moveWindow(WIN_YOLO, g[0], g[1])
                    cv2.createTrackbar("Progress", WIN_YOLO,
                                       frame_count, max(total_frames-1,1), on_progress)
                print(f"[2] YOLO: {'ON' if win_visible[WIN_YOLO] else 'OFF'}")
            elif key_char == ord('3'):
                win_visible[WIN_CMD] = not win_visible[WIN_CMD]
                if not win_visible[WIN_CMD]:
                    win_geom[WIN_CMD] = get_window_rect(WIN_CMD)
                    cv2.destroyWindow(WIN_CMD)
                else:
                    g = win_geom.get(WIN_CMD, INIT_GEOM[WIN_CMD])
                    cv2.namedWindow(WIN_CMD, cv2.WINDOW_NORMAL)
                    cv2.resizeWindow(WIN_CMD, g[2], g[3])
                    cv2.moveWindow(WIN_CMD, g[0], g[1])
                print(f"[3] Command: {'ON' if win_visible[WIN_CMD] else 'OFF'}")

        # Arrow key seek (codes cover Windows / macOS / Linux variants)
        elif key in (2424832, 63234, 65361):  # ←
            seek_delta(-5)
        elif key in (2555904, 63235, 65363):  # →
            seek_delta(5)
        elif key in (2162688, 63232, 65360):  # Shift+←
            seek_delta(-30)
        elif key in (2293760, 63233, 65367):  # Shift+→
            seek_delta(30)

    # Cleanup
    stop_audio()
    cap.release()
    cv2.destroyAllWindows()
    print("Done!")
|
||||
|
||||
|
||||
def main():
    """CLI entry point: validate args, load sidecar data, start playback."""
    if len(sys.argv) < 3:
        print(f"Usage: python {sys.argv[0]} <video_path> <yolo_model_path>")
        print(f"Example: python {sys.argv[0]} video.mp4 yolov8n.pt")
        sys.exit(1)

    video_path, model_path = sys.argv[1], sys.argv[2]

    banner = "=" * 60
    print("\n" + banner)
    print("Video YOLO Player v" + BUILD_VERSION)
    print(banner)

    # Optional ffprobe sidecar (shown via the 'i' command).
    probe_data = load_probe_data(video_path)
    if probe_data is not None:
        print("✓ Found .probe.json")
    else:
        print("⚠ No .probe.json found (run video_probe.py first)")

    # Optional pre-scanned detections sidecar (green boxes, 'p' key).
    yolo_data = load_yolo_data(video_path)
    if yolo_data is not None:
        print("✓ Found .yolo.json")
    else:
        print("⚠ No .yolo.json found (run video_yolo_object_prescan.py first)")

    print(banner)

    play_video(video_path, model_path, probe_data, yolo_data)
|
||||
|
||||
|
||||
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user