From d21ceb6a71904f8c1eb39550239256012d9e59be Mon Sep 17 00:00:00 2001 From: Maximilian Giller Date: Thu, 18 Sep 2025 03:57:14 +0200 Subject: [PATCH] Some refactoring and implemented renaming --- src/file_classifier.py | 46 ++--------------- src/main.py | 36 ++++--------- src/models.py | 50 ++++++++++++++++-- src/{match_episodes.py => show.py} | 83 ++++++++++++++++++++++++++++++ 4 files changed, 142 insertions(+), 73 deletions(-) rename src/{match_episodes.py => show.py} (80%) diff --git a/src/file_classifier.py b/src/file_classifier.py index 6f951d6..641f6fd 100644 --- a/src/file_classifier.py +++ b/src/file_classifier.py @@ -1,51 +1,13 @@ import logging import imdbinfo as imdb -from models import PathInfo, FileInfo, PathCategory, FileCategory +from models import PathInfo, PathCategory from pathlib import Path -import os import re -import math import tmdb -def classify_show(info: PathInfo) -> PathInfo: - # Gather meta information for identifying episodes - episode_durations: set[int] = set() - for ep in imdb.get_all_episodes(info.imdb.imdb_id): - if ep.duration: - episode_durations.add(int(ep.duration / 60)) - - logging.debug(episode_durations) - - # Go over all files - count = 0 - for root, dirs, files in os.walk(info.path): - for filename in files: - filepath = Path(os.path.join(root, filename)) - file = FileInfo(filepath, info) - - if file.path.suffix != ".mkv" or file.duration_in_seconds is None: - continue - if ( - math.floor(file.duration_in_seconds / 60) in episode_durations - or math.ceil(file.duration_in_seconds / 60) in episode_durations - ): - print(f"{filename} {file.duration_in_seconds / 60} EPISODE") - count += 1 - else: - print(f"{filename} {file.duration_in_seconds / 60}") - - logging.info(f"Identified [{count}] episodes.") - return info - - -def classify_movie(info: PathInfo) -> PathInfo: - logging.error(f"Movie classification not yet implemented.") - return info - - -def classify_files(path: str) -> PathInfo | None: +def identify_path(path: str) 
-> PathInfo | None: p = Path(path) # Extract title and year @@ -95,11 +57,9 @@ def classify_files(path: str) -> PathInfo | None: if imdb_entry.kind == "tvSeries": info.category = PathCategory.SHOW logging.info(f"Path identified as containing SHOW.") - info = classify_show(info) elif imdb_entry.kind == "movie": info.category = PathCategory.MOVIE logging.info(f"Path identified as containing MOVIE.") - info = classify_movie(info) else: info.category = PathCategory.UNKNOWN logging.error( @@ -110,7 +70,7 @@ def classify_files(path: str) -> PathInfo | None: if __name__ == "__main__": - results = classify_files("/home/max/Media Library/testing/The Mentalist (2008)") + results = identify_path("/home/max/Media Library/testing/The Mentalist (2008)") print(results) print("Done.") diff --git a/src/main.py b/src/main.py index 10ad6f2..deeed16 100644 --- a/src/main.py +++ b/src/main.py @@ -1,16 +1,17 @@ from pathlib import Path -from file_classifier import classify_files -from match_episodes import match_episodes_to_references +from file_classifier import identify_path +from show import process_show import argparse import logging +import os +import shutil from models import PathCategory -import tmdb def main(args: argparse.Namespace): - info = classify_files(args.input) + info = identify_path(args.input) if info is None: logging.error("Could not classify files.") @@ -22,29 +23,14 @@ def main(args: argparse.Namespace): ) return - # ==== Process SHOW ==== - if info.episodes is None: - logging.error( - "Episodes could not be identified, no reference matching possible." 
- ) - return + process_show(info) - if info.tmdb_id is None: - logging.error("TMDB entry not identified, cannot find reference images.") - return - - # Match episodes to references - references = tmdb.download_episode_images(info.tmdb_id) - matches = match_episodes_to_references( - [str(f.path.absolute()) for f in info.episodes], references.flatten() - ) - - # Set new episode names - # TODO: Resolve matching results - # Rename files - # TODO: Rename files - + info.output = info.path.name + for file in info.files: + os.makedirs(os.path.dirname(file.new_path), exist_ok=True) + shutil.move(file.path, file.new_path) + logging.info(f"Finished processing [{info.path}].") diff --git a/src/models.py b/src/models.py index 36a61bf..752c34b 100644 --- a/src/models.py +++ b/src/models.py @@ -136,6 +136,15 @@ class FileInfo: video_stream: dict | None = None """Meta information about the first video stream found.""" + season: str | None = None + """What season does this file belong to. Otherwise None.""" + + episode: str | None = None + """What episode does this file belong to. 
Otherwise None.""" + + output: str | None = None + """Relative path between parent and new file path.""" + @property def video_bitrate(self) -> float | None: """Bitrate of video in bps (bits per second).""" @@ -167,12 +176,30 @@ class FileInfo: @property def new_path(self) -> Path: """New Path.""" - return Path( - path.join( - self.parent_path.path, - f"{self.new_file_name}{self.path.suffix}", + file_path = self.path.relative_to(self.parent_path.path) + if self.new_file_name: + file_path = self.new_file_name + elif self.category == FileCategory.EPISODE and self.season and self.episode: + try: + season = int(self.season) + season = f"{season:02}" + except: + season = self.season + try: + episode = int(self.episode) + episode = f"{episode:02}" + except: + episode = self.episode + + file_path = path.join( + f"Season {season}", + f"{self.parent_path.path.name} S{season}E{episode}{self.path.suffix}", ) - ) + + if self.output: + file_path = path.join(self.output, file_path) + + return Path(path.join(self.parent_path.path, file_path)) def read_metadata(self) -> dict: """Reads metadata using ffprobe.""" @@ -221,6 +248,19 @@ class PathInfo: files: list[FileInfo] = field(default_factory=list) """List of all files in the path.""" + _output: str | None = None + """Change output directory for child items.""" + + @property + def output(self) -> str | None: + return self._output + + @output.setter + def output(self, value: str | None): + self._output = value + for f in self.files: + f.output = value + def get_files_by_category(self, category: FileInfo): """Get all files of a specific category.""" return filter(lambda f: f.category == category, self.files) diff --git a/src/match_episodes.py b/src/show.py similarity index 80% rename from src/match_episodes.py rename to src/show.py index a10c264..232be40 100644 --- a/src/match_episodes.py +++ b/src/show.py @@ -1,9 +1,13 @@ from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass, field 
+import re import numpy as np import cv2 import logging +from models import PathInfo +import tmdb + def normalize_image(image: np.ndarray) -> np.ndarray: return cv2.resize(image, (160 * 2, 90 * 2)) @@ -236,6 +240,85 @@ def match_episodes_to_references( return results +def process_show(info: PathInfo) -> PathInfo: + # Gather meta information for identifying episodes + episode_durations: set[int] = set() + for ep in imdb.get_all_episodes(info.imdb.imdb_id): + if ep.duration: + episode_durations.add(int(ep.duration / 60)) + + logging.debug(episode_durations) + + # Go over all files + count = 0 + for root, dirs, files in os.walk(info.path): + for filename in files: + filepath = Path(os.path.join(root, filename)) + file = FileInfo(filepath, info) + + if file.path.suffix != ".mkv" or file.duration_in_seconds is None: + continue + if ( + math.floor(file.duration_in_seconds / 60) in episode_durations + or math.ceil(file.duration_in_seconds / 60) in episode_durations + ): + print(f"{filename} {file.duration_in_seconds / 60} EPISODE") + count += 1 + else: + print(f"{filename} {file.duration_in_seconds / 60}") + + logging.info(f"Identified [{count}] episodes.") + + + + + + + + + + + if info.episodes is None: + logging.error( + "Episodes could not be identified, no reference matching possible." 
+ ) + return info + + if info.tmdb_id is None: + logging.error("TMDB entry not identified, cannot find reference images.") + return info + + # Match episodes to references + references = tmdb.download_episode_images(info.tmdb_id) + matches = match_episodes_to_references( + [ + str(f.path.absolute()) + for f in info.episodes + if f.episode is None or f.season is None + ], + references.flatten(), + ) + logging.info(matches) + + # Set new episode names + if not matches.perfect_match or matches.reference_by_episode is None: + logging.error("Episodes not a perfect matching.") + return info + + logging.info("Converting matching results to filenames.") + pattern = re.compile(r"^S\[(?P<season>[^\]]+)\]E\[(?P<episode>[^\]]+)\]$") + for ep in info.episodes: + season_episode = matches.reference_by_episode[str(ep.path.absolute())] + m = pattern.match(season_episode) + if m is None: + raise BaseException( + f"Could not match reference Season/Episode tag [{season_episode}]." + ) + ep.season = m.group("season") + ep.episode = m.group("episode") + return info + + if __name__ == "__main__": logger = logging.getLogger() logger.setLevel(logging.DEBUG)