Files
video_probe/video_yolo_object_prescan.py
accusys f3e2d2dca7 Initial implementation of video_probe (Rust)
Core modules:
- probe.rs: ffprobe execution logic
- parser.rs: JSON parsing logic
- output.rs: Output formatting
- lib.rs: Library interface
- main.rs: CLI entry point

Features:
- Extract video metadata using ffprobe
- Parse video/audio/subtitle streams
- Save to JSON file
- Console summary output

Documentation:
- Added QUICKSTART.md
- Added ENVIRONMENT_SETUP_REPORT.md
2026-03-07 10:10:19 +08:00

429 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Video YOLO Object Prescan - Pre-process video for object detection
Saves detection results to .yolo.json file
Features:
- Pause/Resume: Press Ctrl+C to pause and save progress
- Resume from checkpoint: Automatically continues from last frame
- Auto-save: Configurable auto-save interval (default: 30 seconds)
Usage:
python video_yolo_object_prescan.py <video_path> <yolo_model_path> [--save-interval SECONDS]
Examples:
# Default auto-save every 30 seconds
python video_yolo_object_prescan.py video.mp4 yolov8n.pt
# Auto-save every 60 seconds
python video_yolo_object_prescan.py video.mp4 yolov8n.pt --save-interval 60
# Auto-save every 15 seconds (for long videos)
python video_yolo_object_prescan.py video.mp4 yolov8n.pt --save-interval 15
"""
import cv2
import sys
import os
import json
import time
import signal
import argparse
from datetime import datetime
from ultralytics import YOLO
# Class-index -> label table used by get_detections_list when serializing
# detections; indices past the end of the table are reported as "unknown".
# The 80 labels use legacy spellings such as "motorbike", "aeroplane", "sofa"
# and "tvmonitor".
# NOTE(review): presumably the loaded model emits class ids in this same order
# (looks like the COCO 80-class ordering) - confirm before using custom weights.
YOLO_NAMES = [
"person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck", "boat",
"traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat",
"dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack",
"umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball",
"kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket",
"bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
"sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair",
"sofa", "pottedplant", "bed", "diningtable", "toilet", "tvmonitor", "laptop", "mouse",
"remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator",
"book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush",
]
def format_time(seconds):
    """Render a duration in seconds as a zero-padded ``HH:MM:SS`` string.

    Fractional seconds are dropped and the hour field is not wrapped at 24,
    so long durations simply produce a larger hour value.
    """
    whole_hours, remainder = divmod(seconds, 3600)
    whole_minutes, leftover = divmod(remainder, 60)
    return "{:02d}:{:02d}:{:02d}".format(
        int(whole_hours), int(whole_minutes), int(leftover)
    )
def get_detections_list(result):
    """Convert one YOLO result into a list of JSON-serializable dicts.

    Every entry holds ``class_id``, ``class_name``, ``confidence`` and the
    box corners ``x1``/``y1``/``x2``/``y2`` as plain Python numbers.  An empty
    list is returned when ``result.boxes`` is ``None``.
    """
    found = result.boxes
    if found is None:
        return []

    # Pull the box data out as NumPy arrays (via .cpu().numpy()) so the rows
    # can be iterated and cast to built-in int/float for json.dump.
    corners = found.xyxy.cpu().numpy()
    scores = found.conf.cpu().numpy()
    labels = found.cls.cpu().numpy().astype(int)

    # NOTE(review): names come from the fixed module-level YOLO_NAMES table;
    # presumably the model's class order matches it - confirm for custom
    # weights.  Ids beyond the table are labelled "unknown".
    return [
        {
            'class_id': int(label),
            'class_name': YOLO_NAMES[label] if label < len(YOLO_NAMES) else "unknown",
            'confidence': float(score),
            'x1': float(x1),
            'y1': float(y1),
            'x2': float(x2),
            'y2': float(y2)
        }
        for (x1, y1, x2, y2), score, label in zip(corners, scores, labels)
    ]
def load_existing_data(output_file):
    """Read a previous results file so that processing can resume.

    Returns ``(data, last_frame)`` where ``last_frame`` is the highest frame
    number among the keys of ``data['frames']``.  Returns ``(None, 0)`` when
    the file does not exist, cannot be read or parsed (a warning is printed),
    or contains no frames.
    """
    nothing_to_resume = (None, 0)
    if not os.path.exists(output_file):
        return nothing_to_resume

    try:
        with open(output_file, 'r', encoding='utf-8') as handle:
            checkpoint = json.load(handle)
        recorded = checkpoint.get('frames', {})
        if recorded:
            # JSON object keys are strings, so compare them numerically.
            return checkpoint, max(map(int, recorded.keys()))
    except Exception as e:
        # Best effort: an unusable checkpoint only means starting over.
        print(f"Warning: Could not load existing file: {e}")
    return nothing_to_resume
def save_detection_data(output_file, detection_data, is_interrupted=False, silent=False):
    """Write ``detection_data`` to ``output_file`` as indented UTF-8 JSON.

    When the data has a ``metadata`` entry, ``last_saved_at`` is refreshed and
    ``status`` becomes ``'interrupted'`` (if ``is_interrupted``) or
    ``'in_progress'``.  A status that is already ``'completed'`` is left
    untouched so that saving a finished run does not downgrade it.

    The JSON is first written to a uniquely named temporary file next to the
    destination and then moved into place with ``os.replace``.  An
    interruption in the middle of a write therefore leaves the previous
    checkpoint intact instead of a truncated file.

    Args:
        output_file: Destination path of the JSON file.
        detection_data: Dict to serialize; its ``metadata`` entry is mutated.
        is_interrupted: Mark the checkpoint as interrupted.
        silent: Accepted for backward compatibility.  This function never
            prints on success, and the file size is returned either way.

    Returns:
        ``(True, size_in_bytes)`` on success; ``(False, 0)`` on failure, after
        printing the error.
    """
    import uuid  # local import: not part of the module-level import block

    tmp_path = None
    try:
        # Update metadata
        if 'metadata' in detection_data:
            metadata = detection_data['metadata']
            metadata['last_saved_at'] = datetime.now().isoformat()
            if is_interrupted:
                metadata['status'] = 'interrupted'
            elif metadata.get('status') != 'completed':
                metadata['status'] = 'in_progress'

        # A unique name per call keeps a save started from a signal handler
        # from sharing a file with a save it interrupted.
        tmp_path = f"{output_file}.{uuid.uuid4().hex}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(detection_data, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, output_file)
        tmp_path = None  # moved into place; nothing left to clean up
        return True, os.path.getsize(output_file)
    except Exception as e:
        print(f"Error saving data: {e}")
        return False, 0
    finally:
        # Also runs when SystemExit/KeyboardInterrupt passes through.
        if tmp_path is not None and os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
# Module-level state read by signal_handler.  The handler only receives
# (signum, frame), so prescan_video publishes its progress through these names.
detection_data_global = None  # results dict currently being filled
output_file_global = None  # path of the JSON results/checkpoint file
frame_count_global = 0  # number of the most recently read frame
total_frames_global = 0  # frame count reported for the opened video
start_time_global = None  # time.time() reference used to compute elapsed time
def signal_handler(signum, frame):
    """SIGINT handler: save a resumable checkpoint, print a summary, exit.

    Reads the module-level ``*_global`` state published by ``prescan_video``.
    When no detection data or output path has been published yet, nothing is
    saved.  The handler always finishes by calling ``sys.exit(0)``.

    Args:
        signum: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    print(f"\n\n{'=' * 60}")
    print("⏸ PAUSED - Saving progress...")
    print(f"{'=' * 60}")

    if detection_data_global and output_file_global:
        # Calculate stats
        elapsed = time.time() - start_time_global if start_time_global else 0
        stored_frames = detection_data_global.get('frames', {})
        total_detections = sum(
            len(f.get('detections', [])) for f in stored_frames.values()
        )
        # frame_count_global is incremented as soon as a frame is read, i.e.
        # before that frame's detections are stored.  Report what is actually
        # in the checkpoint so the figures match where a resume will start.
        frames_saved = len(stored_frames)
        last_saved_frame = max((int(k) for k in stored_frames), default=0)

        # Update metadata (created on demand so a checkpoint lacking it does
        # not raise here and lose the progress).
        metadata = detection_data_global.setdefault('metadata', {})
        metadata['processing_time'] = elapsed
        metadata['total_detections'] = total_detections
        metadata['avg_detections_per_frame'] = (
            round(total_detections / frames_saved, 2) if frames_saved > 0 else 0
        )
        metadata['avg_time_per_frame'] = (
            round(elapsed / frames_saved, 3) if frames_saved > 0 else 0
        )

        # Save data
        success, _ = save_detection_data(output_file_global, detection_data_global, is_interrupted=True)
        if success:
            print(f"✓ Progress saved to: {output_file_global}")
            print(f" Frames processed: {frames_saved}/{total_frames_global}")
            print(f" Total detections: {total_detections}")
            print(f" Elapsed time: {elapsed:.1f}s")
            print(f"\n💡 Run the same command again to resume from frame {last_saved_frame + 1}")

    print(f"{'=' * 60}\n")
    sys.exit(0)
def _progress_percent(done, total):
    """Return ``done`` as a percentage of ``total`` (0.0 if ``total`` is not positive)."""
    return (done / total) * 100 if total > 0 else 0.0


def prescan_video(video_path, model_path, output_file, save_interval=30):
    """Run YOLO detection on every frame of a video and save the results as JSON.

    If ``output_file`` already contains frames from an earlier run, the user is
    asked whether to resume; when resuming, the stored results are kept and
    the capture is positioned after the last stored frame.  While processing,
    progress is printed every 100 frames and a checkpoint is written whenever
    ``save_interval`` seconds have passed since the previous one.  SIGINT is
    routed to ``signal_handler`` for the rest of the process.

    Args:
        video_path: Path of the video to scan.
        model_path: Path of the YOLO weights passed to ``YOLO(...)``.
        output_file: Path of the JSON results/checkpoint file.
        save_interval: Minimum number of seconds between automatic checkpoints.

    Returns:
        ``True`` when the scan finished and the final results were written;
        ``False`` when an input file is missing, the video cannot be opened,
        or the final save fails.
    """
    global detection_data_global, output_file_global, frame_count_global
    global total_frames_global, start_time_global

    if not os.path.exists(video_path):
        print(f"Error: Video file not found: {video_path}")
        return False
    if not os.path.exists(model_path):
        print(f"Error: YOLO model not found: {model_path}")
        return False

    # Set up signal handler
    signal.signal(signal.SIGINT, signal_handler)

    # Check for existing data (resume support)
    existing_data, last_processed_frame = load_existing_data(output_file)
    resume_mode = existing_data is not None and last_processed_frame > 0
    if resume_mode:
        print(f"\n{'=' * 60}")
        print(f"📂 Found existing data: {output_file}")
        print(f" Last processed frame: {last_processed_frame}")
        print(f"{'=' * 60}")
        try:
            response = input("\nResume from last checkpoint? (Y/n): ").strip().lower()
        except EOFError:
            # No interactive stdin: fall back to the default answer (resume).
            response = ''
        if response == 'n':
            print("Starting from beginning...")
            resume_mode = False
            existing_data = None
            last_processed_frame = 0
        else:
            print("Resuming from checkpoint...")

    start_time = time.time()
    start_time_global = start_time

    # Load YOLO model
    print(f"\nLoading YOLO model from: {model_path}")
    model = YOLO(model_path)
    print("✓ Model loaded successfully")

    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open video: {video_path}")
        return False

    auto_save_count = 0
    prior_time = 0
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # NOTE(review): the reported frame count may be 0 or inexact for some
        # inputs, so every division by it below is guarded.
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        total_duration = total_frames / fps if fps > 0 else 0
        total_frames_global = total_frames

        print(f"\nVideo Info:")
        print(f" Path: {video_path}")
        print(f" Resolution: {width}x{height}")
        print(f" FPS: {fps:.2f}")
        print(f" Total frames: {total_frames}")
        print(f" Duration: {total_duration:.1f}s ({format_time(total_duration)})")
        if resume_mode:
            print(f" Resume from: frame {last_processed_frame + 1}")
        print(f"\nOutput: {output_file}")
        print(f"Auto-save interval: {save_interval} seconds")
        print("=" * 60)

        # Initialize or load detection data
        if resume_mode and existing_data:
            detection_data = existing_data
            metadata = detection_data.setdefault("metadata", {})
            detection_data.setdefault("frames", {})
            frame_count = last_processed_frame
            total_detections = sum(
                len(f.get('detections', []))
                for f in detection_data["frames"].values()
            )
            # Time spent by earlier runs, so totals and averages stay
            # consistent with the cumulative frame count.
            recorded_time = metadata.get("processing_time", 0)
            if isinstance(recorded_time, (int, float)):
                prior_time = recorded_time
            # Shift the published start time back by that amount so the
            # SIGINT handler reports cumulative elapsed time as well.
            start_time_global = start_time - prior_time
            # Frames are numbered from 1, so the next unread frame has the
            # zero-based index `frame_count`.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count)
        else:
            # Initialize new detection data
            detection_data = {
                "metadata": {
                    "video_path": os.path.abspath(video_path),
                    "model_path": os.path.abspath(model_path),
                    "width": width,
                    "height": height,
                    "fps": fps,
                    "total_frames": total_frames,
                    "total_duration": total_duration,
                    "processed_at": datetime.now().isoformat(),
                    "auto_save_interval": save_interval,
                    "status": "in_progress"
                },
                "frames": {}
            }
            metadata = detection_data["metadata"]
            frame_count = 0
            total_detections = 0
        session_start_frame = frame_count

        # Set global variables for signal handler
        detection_data_global = detection_data
        output_file_global = output_file
        frame_count_global = frame_count

        print(f"\n{'Resuming' if resume_mode else 'Starting'} video processing...")
        print("💡 Press Ctrl+C to pause and save progress\n")

        # Process frames
        last_save_time = time.time()
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            frame_count_global = frame_count
            current_time = frame_count / fps if fps > 0 else 0

            # Run YOLO detection
            results = model(frame, verbose=False)
            detections = get_detections_list(results[0])
            total_detections += len(detections)

            # Store detection data
            detection_data["frames"][str(frame_count)] = {
                "frame_number": frame_count,
                "time_seconds": round(current_time, 3),
                "time_formatted": format_time(current_time),
                "detections": detections
            }

            # Progress indicator
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                progress = _progress_percent(frame_count, total_frames)
                # Rate is based on frames handled in this run only; on resume
                # the earlier frames did not cost any of `elapsed`.
                session_frames = frame_count - session_start_frame
                remaining = max(total_frames - frame_count, 0)
                eta = (elapsed / session_frames) * remaining if session_frames > 0 else 0
                print(f" Progress: {frame_count}/{total_frames} frames ({progress:.1f}%) - "
                      f"{len(detections)} objects - Elapsed: {elapsed:.1f}s, ETA: {eta:.1f}s")

            # Auto-save every save_interval seconds
            current_time_val = time.time()
            if current_time_val - last_save_time >= save_interval:
                # Keep the running total in the checkpoint so it survives a crash.
                metadata["processing_time"] = prior_time + (current_time_val - start_time)
                success, _ = save_detection_data(output_file, detection_data, is_interrupted=False, silent=True)
                if success:
                    auto_save_count += 1
                    # Read the size from disk rather than trusting the value
                    # returned for a silent save.
                    file_size = os.path.getsize(output_file)
                    elapsed = time.time() - start_time
                    progress = _progress_percent(frame_count, total_frames)
                    print(f" 💾 Auto-saved (#{auto_save_count}): {frame_count}/{total_frames} frames ({progress:.1f}%) - "
                          f"Size: {file_size / 1024:.1f} KB - Elapsed: {elapsed:.1f}s")
                # Reset even after a failed save so it is retried after the
                # next interval instead of on every frame.
                last_save_time = current_time_val
    finally:
        # Release the capture on every exit path, including sys.exit() raised
        # by the SIGINT handler.
        cap.release()

    processing_time = prior_time + (time.time() - start_time)
    avg_detections = total_detections / frame_count if frame_count > 0 else 0
    avg_time = processing_time / frame_count if frame_count > 0 else 0

    # Update final metadata
    metadata["processing_time"] = processing_time
    metadata["total_detections"] = total_detections
    metadata["avg_detections_per_frame"] = round(avg_detections, 2) if frame_count > 0 else 0
    metadata["avg_time_per_frame"] = round(avg_time, 3) if frame_count > 0 else 0
    metadata["completed_at"] = datetime.now().isoformat()
    metadata["status"] = "completed"
    metadata["auto_save_count"] = auto_save_count

    # Save final data; without the file on disk the run has not succeeded.
    saved, _ = save_detection_data(output_file, detection_data, is_interrupted=False)
    if not saved:
        return False

    # Print summary
    print(f"\n{'=' * 60}")
    print(f"✓ Detection complete!")
    print(f" Total frames processed: {frame_count}")
    print(f" Total objects detected: {total_detections}")
    print(f" Average objects per frame: {avg_detections:.2f}")
    print(f" Total processing time: {processing_time:.2f} seconds")
    print(f" Average time per frame: {avg_time:.3f} seconds")
    print(f" Auto-saves performed: {auto_save_count}")
    print(f" Results saved to: {output_file}")
    print(f" File size: {os.path.getsize(output_file) / 1024:.2f} KB")
    print("=" * 60)
    return True
def main():
    """Command-line entry point.

    Parses the video path, model path and ``--save-interval`` option, derives
    the ``<video name>.yolo.json`` output path next to the video, clamps the
    interval to the 5-300 second range (printing a warning when it does), and
    runs ``prescan_video``.  Exits with status 1 when the prescan reports
    failure.
    """
    usage_notes = """
Examples:
# Default auto-save every 30 seconds
python %(prog)s video.mp4 yolov8n.pt
# Auto-save every 60 seconds
python %(prog)s video.mp4 yolov8n.pt --save-interval 60
# Auto-save every 15 seconds (for long videos)
python %(prog)s video.mp4 yolov8n.pt --save-interval 15
Features:
- Press Ctrl+C to pause and save progress
- Run again to resume from last checkpoint
- Auto-save at configurable intervals (default: 30 seconds)
"""
    parser = argparse.ArgumentParser(
        description='Video YOLO Object Prescan - Pre-process video for object detection',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes
    )
    parser.add_argument('video_path', help='Path to video file')
    parser.add_argument('model_path', help='Path to YOLO model file')
    parser.add_argument('--save-interval', type=int, default=30,
                        help='Auto-save interval in seconds (default: 30)')
    args = parser.parse_args()

    # The results file lives beside the video: <dir>/<stem>.yolo.json
    folder, file_name = os.path.split(args.video_path)
    stem = os.path.splitext(file_name)[0]
    output_file = os.path.join(folder, f"{stem}.yolo.json")

    # Clamp the requested interval and tell the user when it was adjusted.
    requested = args.save_interval
    save_interval = min(max(requested, 5), 300)
    if requested < 5:
        print("Warning: Save interval too small (minimum 5 seconds). Using 5 seconds.")
    elif requested > 300:
        print("Warning: Save interval too large (maximum 300 seconds). Using 300 seconds.")

    # Run prescan
    if prescan_video(args.video_path, args.model_path, output_file, save_interval):
        print("\n✓ Video YOLO prescan completed successfully!")
    else:
        print("\n✗ Error during detection processing")
        sys.exit(1)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()