Multiprocessing

This commit is contained in:
Maximilian Giller 2025-09-05 15:07:04 +02:00
parent 99d25c5430
commit f8ee93a9c2

View file

@ -1,5 +1,7 @@
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np import numpy as np
import cv2 import cv2
import logging
def normalize_image(image: np.ndarray) -> np.ndarray: def normalize_image(image: np.ndarray) -> np.ndarray:
return cv2.resize(image, (160*2,90*2)) return cv2.resize(image, (160*2,90*2))
@ -117,14 +119,14 @@ def compare_episode_to_images(path_to_episode: str, images: list[np.ndarray]) ->
def match_episodes_to_references(episodes: list[str], references: dict[str, list[np.ndarray]]) -> dict[str, dict[str, float]]: def match_episodes_to_references(episodes: list[str], references: dict[str, list[np.ndarray]]) -> dict[str, dict[str, float]]:
results = {} results = {}
# Normalize references # Normalize references
print("Normalizing reference images ...") print("Normalizing reference images ...")
for ref in references.keys(): for ref in references.keys():
for i in range(len(references[ref])): for i in range(len(references[ref])):
references[ref][i] = normalize_image(references[ref][i]) references[ref][i] = normalize_image(references[ref][i])
# cv2.imwrite(f"{i}.png", references[ref][i]) # cv2.imwrite(f"{i}.png", references[ref][i])
# Compare to episodes # Compare to episodes
for episode in episodes: for episode in episodes:
print(f"Processing: {episode}") print(f"Processing: {episode}")
@ -132,11 +134,93 @@ def match_episodes_to_references(episodes: list[str], references: dict[str, list
for ref, images in references.items(): for ref, images in references.items():
print(f" -- Reference {ref}") print(f" -- Reference {ref}")
results[episode][ref] = compare_episode_to_images(episode, images) results[episode][ref] = compare_episode_to_images(episode, images)
print(results[episode]) print(results[episode])
return results return results
def _match_episode_batch(
frames: list[np.ndarray], references: dict[str, list[np.ndarray]], step_size: int
) -> dict[str, float]:
"""Worker function: compare a batch of frames to all references, return local max per reference."""
local_results = {ref: 0.0 for ref in references.keys()}
for frame in frames:
for ref, ref_images in references.items():
for ref_img in ref_images:
score = compare_images(frame, ref_img)
local_results[ref] = max(local_results[ref], score)
return local_results
def efficient_episode_matching(
path_to_episode: str,
references: dict[str, list[np.ndarray]],
batch_size: int = 1440,
step_size: int = 24,
) -> dict[str, float]:
"""_summary_
Args:
path_to_episode (str): _description_
references (dict[str, list[np.ndarray]]): _description_
batch_size (int, optional): Batch size in frame count for multicore frame matching. At least step_size+1. Defaults to 1440, 24fps for 1 minute.
step_size (int, optional): Number of frames to skip on intial match-step. Defaults to 24.
Returns:
dict[str, float]: _description_
"""
logging.info(f"Processing [{path_to_episode}]")
# Load and prepare episode
video = cv2.VideoCapture(path_to_episode)
frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
logging.info(f"Loading and preprocessing [{frame_count}] frames ...")
normalized_frames = []
while True:
success, frame = video.read()
if not success:
break
normalized_frames.append(normalize_image(frame))
video.release()
# Match frames with references
# Split frames into batches
batches = [
normalized_frames[i : i + batch_size + step_size - 1]
for i in range(0, len(normalized_frames), batch_size)
]
# Parallel processing
logging.info(f"Matching [{len(batches)}] batches ...")
results: dict[str, float] = {ref: 0.0 for ref in references.keys()}
with ProcessPoolExecutor() as executor:
futures = [
executor.submit(_match_episode_batch, batch, references, step_size)
for batch in batches
]
for i, future in enumerate(as_completed(futures)):
logging.info(f"[{i}/{len(batches)}] batches done")
local_results = future.result()
# merge max results
for ref in results.keys():
results[ref] = max(results[ref], local_results[ref])
del normalized_frames # Free memory
logging.info(f"Finished [{path_to_episode}]")
results_msg = "Results"
for ref, score in results.items():
results_msg += f"\n{ref}\t{score:.4f}"
logging.info(results_msg)
return results
if __name__ == "__main__": if __name__ == "__main__":
episodes = [ episodes = [
"./episodes/B1_t05.mkv", "./episodes/B1_t05.mkv",