#!/usr/bin/env python3
"""
Video YOLO Object Prescan - Pre-process video for object detection.

Saves detection results to a .yolo.json file next to the video.

Features:
- Pause/Resume: Press Ctrl+C to pause and save progress
- Resume from checkpoint: Automatically continues from last frame
- Auto-save: Configurable auto-save interval (default: 30 seconds)

Usage:
    python video_yolo_object_prescan.py VIDEO_PATH MODEL_PATH [--save-interval SECONDS]

Examples:
    # Default auto-save every 30 seconds
    python video_yolo_object_prescan.py video.mp4 yolov8n.pt

    # Auto-save every 60 seconds
    python video_yolo_object_prescan.py video.mp4 yolov8n.pt --save-interval 60

    # Auto-save every 15 seconds (for long videos)
    python video_yolo_object_prescan.py video.mp4 yolov8n.pt --save-interval 15
"""

import argparse
import json
import os
import signal
import sys
import time
from datetime import datetime

import cv2
from ultralytics import YOLO

# Class-id -> name table used to label detections (80 entries).
YOLO_NAMES = [
    "person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train",
    "truck", "boat", "traffic light", "fire hydrant", "stop sign",
    "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag",
    "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite",
    "baseball bat", "baseball glove", "skateboard", "surfboard",
    "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon",
    "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot",
    "hot dog", "pizza", "donut", "cake", "chair", "sofa", "pottedplant",
    "bed", "diningtable", "toilet", "tvmonitor", "laptop", "mouse", "remote",
    "keyboard", "cell phone", "microwave", "oven", "toaster", "sink",
    "refrigerator", "book", "clock", "vase", "scissors", "teddy bear",
    "hair drier", "toothbrush",
]


def format_time(seconds):
    """Format a duration in seconds as HH:MM:SS (fractions are truncated)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def get_detections_list(result):
    """Extract detection info from one YOLO result as a list of dicts.

    Args:
        result: A single result object returned by the model call. It must
            expose ``boxes`` with ``xyxy``, ``conf`` and ``cls`` tensors
            (presumably an ultralytics ``Results`` - confirm against the
            installed ultralytics version).

    Returns:
        A list with one dict per box, holding ``class_id``, ``class_name``,
        ``confidence`` and the ``x1, y1, x2, y2`` corner coordinates as
        plain Python numbers (JSON-serializable). Empty when there are no
        boxes.
    """
    if result.boxes is None:
        return []

    boxes = result.boxes.xyxy.cpu().numpy()
    confidences = result.boxes.conf.cpu().numpy()
    class_ids = result.boxes.cls.cpu().numpy().astype(int)

    detections = []
    for box, conf, class_id in zip(boxes, confidences, class_ids):
        x1, y1, x2, y2 = box
        class_id = int(class_id)
        # Guard both ends: a negative id must not index from the list's tail.
        if 0 <= class_id < len(YOLO_NAMES):
            class_name = YOLO_NAMES[class_id]
        else:
            class_name = "unknown"
        detections.append({
            'class_id': class_id,
            'class_name': class_name,
            'confidence': float(conf),
            'x1': float(x1),
            'y1': float(y1),
            'x2': float(x2),
            'y2': float(y2),
        })
    return detections


def load_existing_data(output_file):
    """Load a previously saved detection file, if one exists and is usable.

    Args:
        output_file: Path of the .yolo.json checkpoint.

    Returns:
        ``(data, last_frame)`` where ``last_frame`` is the highest frame
        number stored under ``data['frames']``; ``(None, 0)`` when the file
        is missing, unreadable, malformed, or contains no frames.
    """
    if not os.path.exists(output_file):
        return None, 0

    try:
        with open(output_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Frame keys are stringified frame numbers; the largest one is the
        # last frame that was processed.
        frames = data.get('frames', {})
        if frames:
            last_frame = max(int(k) for k in frames)
            return data, last_frame
    except (OSError, ValueError, TypeError, AttributeError) as e:
        # Best effort: a broken checkpoint means "start over", not "crash".
        print(f"Warning: Could not load existing file: {e}")

    return None, 0


# Incremented on every save so that a save started from the Ctrl+C handler
# never shares a temp file with an auto-save it interrupted.
_save_sequence = 0


def save_detection_data(output_file, detection_data, is_interrupted=False, silent=False):
    """Save detection data to a JSON file atomically.

    The data is written to a temporary file in the same directory and then
    moved over ``output_file`` with ``os.replace``, so an interrupted write
    never leaves a truncated checkpoint behind.

    Args:
        output_file: Destination path.
        detection_data: Dict to serialize. When it has a ``metadata`` entry,
            ``last_saved_at`` and ``status`` are updated in place.
        is_interrupted: When True, ``status`` is set to ``'interrupted'``.
            Otherwise it is set to ``'in_progress'`` unless it is already
            ``'completed'`` (so the final save keeps its completed status).
        silent: Accepted for backward compatibility; it does not change
            what is written or returned.

    Returns:
        ``(True, size_in_bytes)`` on success, ``(False, 0)`` on failure
        (the error is printed).
    """
    global _save_sequence

    try:
        if 'metadata' in detection_data:
            metadata = detection_data['metadata']
            metadata['last_saved_at'] = datetime.now().isoformat()
            if is_interrupted:
                metadata['status'] = 'interrupted'
            elif metadata.get('status') != 'completed':
                metadata['status'] = 'in_progress'

        _save_sequence += 1
        tmp_path = f"{output_file}.{os.getpid()}.{_save_sequence}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(detection_data, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, output_file)
        except BaseException:
            # Also reached when the Ctrl+C handler raises SystemExit in the
            # middle of a write: drop the partial temp file, keep the last
            # good checkpoint, and let the exception continue.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

        return True, os.path.getsize(output_file)
    except Exception as e:
        print(f"Error saving data: {e}")
        return False, 0


def _count_detections(detection_data):
    """Return the total number of detections stored across all frames."""
    return sum(
        len(f.get('detections', []))
        for f in detection_data.get('frames', {}).values()
    )


def _update_run_stats(metadata, frame_count, total_detections, elapsed):
    """Write timing and detection statistics into the metadata dict."""
    metadata['processing_time'] = elapsed
    metadata['total_detections'] = total_detections
    metadata['avg_detections_per_frame'] = (
        round(total_detections / frame_count, 2) if frame_count > 0 else 0
    )
    metadata['avg_time_per_frame'] = (
        round(elapsed / frame_count, 3) if frame_count > 0 else 0
    )


def _prior_processing_time(detection_data):
    """Return the processing time recorded in a checkpoint, or 0.0."""
    metadata = detection_data.get('metadata')
    if not isinstance(metadata, dict):
        return 0.0
    prior = metadata.get('processing_time', 0)
    if isinstance(prior, (int, float)) and prior > 0:
        return float(prior)
    return 0.0


def _percent(done, total):
    """Return ``done / total`` as a percentage, or 0.0 when total is unknown."""
    return (done / total) * 100 if total > 0 else 0.0


# Global variables for signal handling
detection_data_global = None
output_file_global = None
frame_count_global = 0
total_frames_global = 0
start_time_global = None


def signal_handler(signum, frame):
    """Handle Ctrl+C: save progress as 'interrupted' and exit with status 0.

    Reads the module-level ``*_global`` variables that ``prescan_video``
    keeps up to date. When no processing has started yet there is nothing
    to save and the process simply exits.
    """
    print(f"\n\n{'=' * 60}")
    print("⏸ PAUSED - Saving progress...")
    print(f"{'=' * 60}")

    if detection_data_global and output_file_global:
        elapsed = time.time() - start_time_global if start_time_global else 0
        total_detections = _count_detections(detection_data_global)

        # setdefault: a resumed checkpoint may lack a metadata section.
        metadata = detection_data_global.setdefault('metadata', {})
        _update_run_stats(metadata, frame_count_global, total_detections, elapsed)

        success, _ = save_detection_data(
            output_file_global, detection_data_global, is_interrupted=True
        )

        if success:
            print(f"✓ Progress saved to: {output_file_global}")
            print(f" Frames processed: {frame_count_global}/{total_frames_global}")
            print(f" Total detections: {total_detections}")
            print(f" Elapsed time: {elapsed:.1f}s")
            print(f"\n💡 Run the same command again to resume from frame {frame_count_global + 1}")
            print(f"{'=' * 60}\n")

    sys.exit(0)


def prescan_video(video_path, model_path, output_file, save_interval=30):
    """Run YOLO on every frame of a video and save the detections as JSON.

    Installs a SIGINT handler so Ctrl+C saves a checkpoint, offers to resume
    when ``output_file`` already holds frames, and auto-saves periodically.

    Args:
        video_path: Path to the input video.
        model_path: Path to the YOLO model weights.
        output_file: Path of the JSON file to write (and resume from).
        save_interval: Seconds between auto-saves.

    Returns:
        True when processing finished and the result was saved; False when
        an input is missing, the video cannot be opened, or the final save
        fails.
    """
    global detection_data_global, output_file_global, frame_count_global
    global total_frames_global, start_time_global

    if not os.path.exists(video_path):
        print(f"Error: Video file not found: {video_path}")
        return False

    if not os.path.exists(model_path):
        print(f"Error: YOLO model not found: {model_path}")
        return False

    # Nothing to save until processing starts (also clears any state left
    # over from an earlier call in the same process).
    detection_data_global = None

    # Set up signal handler
    signal.signal(signal.SIGINT, signal_handler)

    # Check for existing data (resume support)
    existing_data, last_processed_frame = load_existing_data(output_file)
    resume_mode = existing_data is not None and last_processed_frame > 0

    if resume_mode:
        print(f"\n{'=' * 60}")
        print(f"📂 Found existing data: {output_file}")
        print(f" Last processed frame: {last_processed_frame}")
        print(f"{'=' * 60}")
        response = input("\nResume from last checkpoint? (Y/n): ").strip().lower()
        if response == 'n':
            print("Starting from beginning...")
            resume_mode = False
            existing_data = None
            last_processed_frame = 0
        else:
            print("Resuming from checkpoint...")

    # Shift the start time back by the time already spent in earlier
    # sessions, so elapsed time, ETA and per-frame averages stay consistent
    # with the cumulative frame count after a resume.
    prior_elapsed = _prior_processing_time(existing_data) if resume_mode else 0.0
    start_time = time.time() - prior_elapsed
    start_time_global = start_time

    # Load YOLO model
    print(f"\nLoading YOLO model from: {model_path}")
    model = YOLO(model_path)
    print("✓ Model loaded successfully")

    # Open video; the capture is released on every exit path below.
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error: Cannot open video: {video_path}")
            return False

        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        total_duration = total_frames / fps if fps > 0 else 0
        total_frames_global = total_frames

        print("\nVideo Info:")
        print(f" Path: {video_path}")
        print(f" Resolution: {width}x{height}")
        print(f" FPS: {fps:.2f}")
        print(f" Total frames: {total_frames}")
        print(f" Duration: {total_duration:.1f}s ({format_time(total_duration)})")
        if resume_mode:
            print(f" Resume from: frame {last_processed_frame + 1}")
        print(f"\nOutput: {output_file}")
        print(f"Auto-save interval: {save_interval} seconds")
        print("=" * 60)

        # Initialize or load detection data
        if resume_mode and existing_data:
            detection_data = existing_data
            frame_count = last_processed_frame
            total_detections = _count_detections(detection_data)
            metadata = detection_data.setdefault("metadata", {})
            # A checkpoint may carry 'interrupted' or 'completed'; this run
            # is in progress again until it finishes.
            metadata["status"] = "in_progress"
            # Frame numbers are 1-based, so the count of processed frames is
            # also the 0-based index of the next frame to read.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count)
        else:
            detection_data = {
                "metadata": {
                    "video_path": os.path.abspath(video_path),
                    "model_path": os.path.abspath(model_path),
                    "width": width,
                    "height": height,
                    "fps": fps,
                    "total_frames": total_frames,
                    "total_duration": total_duration,
                    "processed_at": datetime.now().isoformat(),
                    "auto_save_interval": save_interval,
                    "status": "in_progress",
                },
                "frames": {},
            }
            metadata = detection_data["metadata"]
            frame_count = 0
            total_detections = 0

        # Set global variables for signal handler
        detection_data_global = detection_data
        output_file_global = output_file
        frame_count_global = frame_count

        print(f"\n{'Resuming' if resume_mode else 'Starting'} video processing...")
        print("💡 Press Ctrl+C to pause and save progress\n")

        # Process frames
        frames = detection_data["frames"]
        last_save_time = time.time()
        auto_save_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            current_time = frame_count / fps if fps > 0 else 0

            # Run YOLO detection
            results = model(frame, verbose=False)
            detections = get_detections_list(results[0])
            total_detections += len(detections)

            # Store detection data
            frames[str(frame_count)] = {
                "frame_number": frame_count,
                "time_seconds": round(current_time, 3),
                "time_formatted": format_time(current_time),
                "detections": detections,
            }
            # Publish the count only after the frame is stored, so a Ctrl+C
            # during inference reports frames that are really in the file.
            frame_count_global = frame_count

            # Progress indicator
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                progress = _percent(frame_count, total_frames)
                remaining = max(total_frames - frame_count, 0)
                eta = (elapsed / frame_count) * remaining
                print(f" Progress: {frame_count}/{total_frames} frames ({progress:.1f}%) - "
                      f"{len(detections)} objects - Elapsed: {elapsed:.1f}s, ETA: {eta:.1f}s")

            # Auto-save every save_interval seconds
            now = time.time()
            if now - last_save_time >= save_interval:
                # Record stats in every checkpoint so a resume after a crash
                # can still account for the time already spent.
                _update_run_stats(metadata, frame_count, total_detections, now - start_time)
                success, file_size = save_detection_data(
                    output_file, detection_data, is_interrupted=False, silent=True
                )
                if success:
                    auto_save_count += 1
                    elapsed = time.time() - start_time
                    progress = _percent(frame_count, total_frames)
                    print(f" 💾 Auto-saved (#{auto_save_count}): {frame_count}/{total_frames} frames ({progress:.1f}%) - "
                          f"Size: {file_size / 1024:.1f} KB - Elapsed: {elapsed:.1f}s")
                last_save_time = now
    finally:
        cap.release()

    processing_time = time.time() - start_time

    # Update final metadata
    _update_run_stats(metadata, frame_count, total_detections, processing_time)
    metadata["completed_at"] = datetime.now().isoformat()
    metadata["status"] = "completed"
    metadata["auto_save_count"] = auto_save_count

    # Save final data
    saved, file_size = save_detection_data(output_file, detection_data, is_interrupted=False)

    # Processing is over: a later Ctrl+C must not re-mark the file as
    # interrupted.
    detection_data_global = None

    if not saved:
        return False

    avg_objects = total_detections / frame_count if frame_count > 0 else 0
    avg_time = processing_time / frame_count if frame_count > 0 else 0

    # Print summary
    print(f"\n{'=' * 60}")
    print("✓ Detection complete!")
    print(f" Total frames processed: {frame_count}")
    print(f" Total objects detected: {total_detections}")
    print(f" Average objects per frame: {avg_objects:.2f}")
    print(f" Total processing time: {processing_time:.2f} seconds")
    print(f" Average time per frame: {avg_time:.3f} seconds")
    print(f" Auto-saves performed: {auto_save_count}")
    print(f" Results saved to: {output_file}")
    print(f" File size: {file_size / 1024:.2f} KB")
    print("=" * 60)

    return True


def main():
    """Parse command-line arguments and run the prescan.

    The output file is written next to the video as ``NAME.yolo.json``.
    The auto-save interval is clamped to the range 5-300 seconds. Exits
    with status 1 when the prescan fails.
    """
    parser = argparse.ArgumentParser(
        description='Video YOLO Object Prescan - Pre-process video for object detection',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Default auto-save every 30 seconds
  python %(prog)s video.mp4 yolov8n.pt

  # Auto-save every 60 seconds
  python %(prog)s video.mp4 yolov8n.pt --save-interval 60

  # Auto-save every 15 seconds (for long videos)
  python %(prog)s video.mp4 yolov8n.pt --save-interval 15

Features:
  - Press Ctrl+C to pause and save progress
  - Run again to resume from last checkpoint
  - Auto-save at configurable intervals (default: 30 seconds)
"""
    )
    parser.add_argument('video_path', help='Path to video file')
    parser.add_argument('model_path', help='Path to YOLO model file')
    parser.add_argument('--save-interval', type=int, default=30,
                        help='Auto-save interval in seconds (default: 30)')

    args = parser.parse_args()

    # Generate output filename
    video_dir = os.path.dirname(args.video_path)
    video_name = os.path.splitext(os.path.basename(args.video_path))[0]
    output_file = os.path.join(video_dir, f"{video_name}.yolo.json")

    # Validate save interval
    if args.save_interval < 5:
        print("Warning: Save interval too small (minimum 5 seconds). Using 5 seconds.")
        save_interval = 5
    elif args.save_interval > 300:
        print("Warning: Save interval too large (maximum 300 seconds). Using 300 seconds.")
        save_interval = 300
    else:
        save_interval = args.save_interval

    # Run prescan
    success = prescan_video(args.video_path, args.model_path, output_file, save_interval)

    if not success:
        print("\n✗ Error during detection processing")
        sys.exit(1)

    print("\n✓ Video YOLO prescan completed successfully!")


if __name__ == "__main__":
    main()