From f8ee93a9c26c94d866b4d3f5373f96a593984c45 Mon Sep 17 00:00:00 2001
From: Max
Date: Fri, 5 Sep 2025 15:07:04 +0200
Subject: [PATCH] Multiprocessing

---
 src/label_episodes.py | 92 +++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 88 insertions(+), 4 deletions(-)

diff --git a/src/label_episodes.py b/src/label_episodes.py
index 53826be..cd5fedc 100644
--- a/src/label_episodes.py
+++ b/src/label_episodes.py
@@ -1,5 +1,7 @@
+from concurrent.futures import ProcessPoolExecutor, as_completed
 import numpy as np
 import cv2
+import logging
 
 def normalize_image(image: np.ndarray) -> np.ndarray:
     return cv2.resize(image, (160*2,90*2))
@@ -117,14 +119,14 @@ def compare_episode_to_images(path_to_episode: str, images: list[np.ndarray]) ->
 
 def match_episodes_to_references(episodes: list[str], references: dict[str, list[np.ndarray]]) -> dict[str, dict[str, float]]:
     results = {}
-    
+
     # Normalize references
     print("Normalizing reference images ...")
     for ref in references.keys():
         for i in range(len(references[ref])):
             references[ref][i] = normalize_image(references[ref][i])
             # cv2.imwrite(f"{i}.png", references[ref][i])
-    
+
     # Compare to episodes
     for episode in episodes:
         print(f"Processing: {episode}")
@@ -132,11 +134,93 @@ def match_episodes_to_references(episodes: list[str], references: dict[str, list
         for ref, images in references.items():
             print(f"  -- Reference {ref}")
             results[episode][ref] = compare_episode_to_images(episode, images)
-    
+
         print(results[episode])
-    
+
     return results
+
+
+def _match_episode_batch(
+    frames: list[np.ndarray], references: dict[str, list[np.ndarray]], step_size: int
+) -> dict[str, float]:
+    """Worker: score every step_size-th frame of a batch against all references; return the max score per reference."""
+    local_results = {ref: 0.0 for ref in references.keys()}
+    for frame in frames[::step_size]:  # coarse pass: sample frames rather than scoring every one
+        for ref, ref_images in references.items():
+            for ref_img in ref_images:
+                score = compare_images(frame, ref_img)
+                local_results[ref] = max(local_results[ref], score)
+    return local_results
+
+
+def efficient_episode_matching(
+    path_to_episode: str,
+    references: dict[str, list[np.ndarray]],
+    batch_size: int = 1440,
+    step_size: int = 24,
+) -> dict[str, float]:
+    """Match one episode against all reference images, scoring frame batches in parallel processes.
+
+    Args:
+        path_to_episode (str): Path to the episode video file, read via cv2.VideoCapture.
+        references (dict[str, list[np.ndarray]]): Reference images keyed by reference name.
+        batch_size (int, optional): Batch size in frame count for multicore frame matching. At least step_size+1. Defaults to 1440, 24fps for 1 minute.
+        step_size (int, optional): Number of frames to skip on initial match-step. Defaults to 24.
+
+    Returns:
+        dict[str, float]: Highest compare_images score observed per reference name.
+    """
+
+    logging.info(f"Processing [{path_to_episode}]")
+
+    # Load and prepare episode
+    video = cv2.VideoCapture(path_to_episode)
+    frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
+    logging.info(f"Loading and preprocessing [{frame_count}] frames ...")
+
+    normalized_frames = []
+    while True:
+        success, frame = video.read()
+        if not success:
+            break
+
+        normalized_frames.append(normalize_image(frame))
+
+    video.release()
+
+    # Match frames with references
+    # Split frames into batches; overlap by step_size-1 so stepping never skips a batch boundary
+    batches = [
+        normalized_frames[i : i + batch_size + step_size - 1]
+        for i in range(0, len(normalized_frames), batch_size)
+    ]
+
+    # Parallel processing
+    logging.info(f"Matching [{len(batches)}] batches ...")
+    results: dict[str, float] = {ref: 0.0 for ref in references.keys()}
+    with ProcessPoolExecutor() as executor:
+        futures = [
+            executor.submit(_match_episode_batch, batch, references, step_size)
+            for batch in batches
+        ]
+        for i, future in enumerate(as_completed(futures)):
+            local_results = future.result()
+            logging.info(f"[{i + 1}/{len(batches)}] batches done")
+            # merge max results
+            for ref in results.keys():
+                results[ref] = max(results[ref], local_results[ref])
+
+    del normalized_frames  # Free memory
+
+    logging.info(f"Finished [{path_to_episode}]")
+    results_msg = "Results"
+    for ref, score in results.items():
+        results_msg += f"\n{ref}\t{score:.4f}"
+    logging.info(results_msg)
+
+    return results
+
+
+
 if __name__ == "__main__":
     episodes = [
         "./episodes/B1_t05.mkv",