Some refactoring and implemented renaming

This commit is contained in:
Maximilian Giller 2025-09-18 03:57:14 +02:00
parent fd9652bdec
commit d21ceb6a71
4 changed files with 142 additions and 73 deletions

View file

@ -1,51 +1,13 @@
import logging import logging
import imdbinfo as imdb import imdbinfo as imdb
from models import PathInfo, FileInfo, PathCategory, FileCategory from models import PathInfo, PathCategory
from pathlib import Path from pathlib import Path
import os
import re import re
import math
import tmdb import tmdb
def classify_show(info: PathInfo) -> PathInfo: def identify_path(path: str) -> PathInfo | None:
# Gather meta information for identifying episodes
episode_durations: set[int] = set()
for ep in imdb.get_all_episodes(info.imdb.imdb_id):
if ep.duration:
episode_durations.add(int(ep.duration / 60))
logging.debug(episode_durations)
# Go over all files
count = 0
for root, dirs, files in os.walk(info.path):
for filename in files:
filepath = Path(os.path.join(root, filename))
file = FileInfo(filepath, info)
if file.path.suffix != ".mkv" or file.duration_in_seconds is None:
continue
if (
math.floor(file.duration_in_seconds / 60) in episode_durations
or math.ceil(file.duration_in_seconds / 60) in episode_durations
):
print(f"{filename} {file.duration_in_seconds / 60} EPISODE")
count += 1
else:
print(f"{filename} {file.duration_in_seconds / 60}")
logging.info(f"Identified [{count}] episodes.")
return info
def classify_movie(info: PathInfo) -> PathInfo:
    """Placeholder for movie classification.

    Logs an error stating that movie classification is not implemented
    and hands ``info`` back unchanged.
    """
    message = "Movie classification not yet implemented."
    logging.error(message)
    return info
def classify_files(path: str) -> PathInfo | None:
p = Path(path) p = Path(path)
# Extract title and year # Extract title and year
@ -95,11 +57,9 @@ def classify_files(path: str) -> PathInfo | None:
if imdb_entry.kind == "tvSeries": if imdb_entry.kind == "tvSeries":
info.category = PathCategory.SHOW info.category = PathCategory.SHOW
logging.info(f"Path identified as containing SHOW.") logging.info(f"Path identified as containing SHOW.")
info = classify_show(info)
elif imdb_entry.kind == "movie": elif imdb_entry.kind == "movie":
info.category = PathCategory.MOVIE info.category = PathCategory.MOVIE
logging.info(f"Path identified as containing MOVIE.") logging.info(f"Path identified as containing MOVIE.")
info = classify_movie(info)
else: else:
info.category = PathCategory.UNKNOWN info.category = PathCategory.UNKNOWN
logging.error( logging.error(
@ -110,7 +70,7 @@ def classify_files(path: str) -> PathInfo | None:
if __name__ == "__main__": if __name__ == "__main__":
results = classify_files("/home/max/Media Library/testing/The Mentalist (2008)") results = identify_path("/home/max/Media Library/testing/The Mentalist (2008)")
print(results) print(results)
print("Done.") print("Done.")

View file

@ -1,16 +1,17 @@
from pathlib import Path from pathlib import Path
from file_classifier import classify_files from file_classifier import identify_path
from match_episodes import match_episodes_to_references from show import process_show
import argparse import argparse
import logging import logging
import os
import shutil
from models import PathCategory from models import PathCategory
import tmdb
def main(args: argparse.Namespace): def main(args: argparse.Namespace):
info = classify_files(args.input) info = identify_path(args.input)
if info is None: if info is None:
logging.error("Could not classify files.") logging.error("Could not classify files.")
@ -22,29 +23,14 @@ def main(args: argparse.Namespace):
) )
return return
# ==== Process SHOW ==== process_show(info)
if info.episodes is None:
logging.error(
"Episodes could not be identified, no reference matching possible."
)
return
if info.tmdb_id is None:
logging.error("TMDB entry not identified, cannot find reference images.")
return
# Match episodes to references
references = tmdb.download_episode_images(info.tmdb_id)
matches = match_episodes_to_references(
[str(f.path.absolute()) for f in info.episodes], references.flatten()
)
# Set new episode names
# TODO: Resolve matching results
# Rename files # Rename files
# TODO: Rename files info.output = info.path.name
for file in info.files:
os.makedirs(os.path.dirname(file.new_path), exist_ok=True)
shutil.move(file.path, file.new_path)
logging.info(f"Finished processing [{info.path}].") logging.info(f"Finished processing [{info.path}].")

View file

@ -136,6 +136,15 @@ class FileInfo:
video_stream: dict | None = None video_stream: dict | None = None
"""Meta information about the first video stream found.""" """Meta information about the first video stream found."""
season: str | None = None
"""What season does this file belong to. Otherwise None."""
episode: str | None = None
"""What episode does this file belong to. Otherwise None."""
output: str | None = None
"""Relative path between parent and new file path."""
@property @property
def video_bitrate(self) -> float | None: def video_bitrate(self) -> float | None:
"""Bitrate of video in bps (bits per second).""" """Bitrate of video in bps (bits per second)."""
@ -167,12 +176,30 @@ class FileInfo:
@property
def new_path(self) -> Path:
    """Target path of this file below the parent path.

    The relative part is chosen in this order:

    1. ``new_file_name`` if it is set,
    2. ``Season <SS>/<parent dir name> S<SS>E<EE><suffix>`` for files
       categorised as episodes that have both a season and an episode,
    3. otherwise the file's current path relative to the parent path.

    When ``output`` is set it is inserted between the parent path and
    that relative part.
    """

    def _pad(number: str) -> str:
        """Zero-pad numeric identifiers to two digits; keep others as-is."""
        try:
            return f"{int(number):02}"
        except ValueError:
            # Non-numeric identifiers are used verbatim. Only ValueError is
            # caught so that e.g. KeyboardInterrupt is no longer swallowed.
            return number

    file_path = self.path.relative_to(self.parent_path.path)
    if self.new_file_name:
        # NOTE(review): the name is used verbatim here, no suffix is appended
        # (unlike the episode branch below) - confirm that new_file_name is
        # expected to carry its own extension.
        file_path = self.new_file_name
    elif self.category == FileCategory.EPISODE and self.season and self.episode:
        season = _pad(self.season)
        episode = _pad(self.episode)
        file_path = path.join(
            f"Season {season}",
            f"{self.parent_path.path.name} S{season}E{episode}{self.path.suffix}",
        )

    if self.output:
        file_path = path.join(self.output, file_path)

    return Path(path.join(self.parent_path.path, file_path))
def read_metadata(self) -> dict: def read_metadata(self) -> dict:
"""Reads metadata using ffprobe.""" """Reads metadata using ffprobe."""
@ -221,6 +248,19 @@ class PathInfo:
files: list[FileInfo] = field(default_factory=list) files: list[FileInfo] = field(default_factory=list)
"""List of all files in the path.""" """List of all files in the path."""
_output: str | None = None
"""Change output directory for child items."""
@property
def output(self) -> str | None:
return self._output
@output.setter
def output(self, value: str | None):
self._output = value
for f in self.files:
f.output = value
def get_files_by_category(self, category: "FileCategory"):
    """Get all files of a specific category.

    Returns a lazy ``filter`` iterator over ``self.files`` that yields the
    entries whose ``category`` equals ``category``; wrap it in ``list()``
    if it has to be iterated more than once.
    """
    # The argument is compared against FileInfo.category, so it is a
    # category value, not a FileInfo (the previous annotation was wrong).
    return filter(lambda f: f.category == category, self.files)

View file

@ -1,9 +1,13 @@
from concurrent.futures import ProcessPoolExecutor, as_completed from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass, field from dataclasses import dataclass, field
import re
import numpy as np import numpy as np
import cv2 import cv2
import logging import logging
from models import PathInfo
import tmdb
def normalize_image(image: np.ndarray) -> np.ndarray:
    """Resize ``image`` to the fixed comparison size and return the result."""
    # Passed as cv2.resize's size argument, i.e. (320, 180).
    target_size = (160 * 2, 90 * 2)
    return cv2.resize(image, target_size)
@ -236,6 +240,85 @@ def match_episodes_to_references(
return results return results
def process_show(info: PathInfo) -> PathInfo:
    """Identify the episodes of a show and tag them with season/episode.

    Steps:

    1. Collect the known episode durations (whole minutes) from IMDb.
    2. Walk ``info.path`` and report every ``.mkv`` file whose duration,
       rounded down or up to whole minutes, equals one of those durations.
    3. Download the TMDB reference images and match them against the
       episodes that do not yet have both a season and an episode set.
    4. On a perfect match, parse the ``S[<season>]E[<episode>]`` tag of each
       matched file and store the values on the file.

    Args:
        info: Path information of the show. Its ``imdb``, ``path``,
            ``episodes`` and ``tmdb_id`` attributes are read.

    Returns:
        The same ``info`` object. It is returned early, with an error
        logged, when episodes or the TMDB id are missing or when the
        matching is not perfect.

    Raises:
        ValueError: If a matched reference tag does not have the
            ``S[...]E[...]`` form.
    """
    # Function-scope imports: these names are used below but are not part of
    # this module's visible import header.
    import math
    import os
    from pathlib import Path

    import imdbinfo as imdb
    from models import FileInfo

    # Gather meta information for identifying episodes
    # (dividing by 60 presumably converts seconds to minutes - TODO confirm
    # the unit of ep.duration).
    episode_durations: set[int] = {
        int(ep.duration / 60)
        for ep in imdb.get_all_episodes(info.imdb.imdb_id)
        if ep.duration
    }
    logging.debug(episode_durations)

    # Go over all files
    count = 0
    for root, _dirs, files in os.walk(info.path):
        for filename in files:
            file = FileInfo(Path(root) / filename, info)
            if file.path.suffix != ".mkv" or file.duration_in_seconds is None:
                continue

            minutes = file.duration_in_seconds / 60
            # Accept both roundings so a file a few seconds off still counts.
            if (
                math.floor(minutes) in episode_durations
                or math.ceil(minutes) in episode_durations
            ):
                print(f"{filename} {minutes} EPISODE")
                count += 1
            else:
                print(f"{filename} {minutes}")

    logging.info(f"Identified [{count}] episodes.")

    if info.episodes is None:
        logging.error(
            "Episodes could not be identified, no reference matching possible."
        )
        return info

    if info.tmdb_id is None:
        logging.error("TMDB entry not identified, cannot find reference images.")
        return info

    # Only episodes that still lack a season or an episode need matching.
    # The same list drives the result loop below, so files that were not
    # handed to the matcher are never looked up in its result.
    pending = [f for f in info.episodes if f.episode is None or f.season is None]
    if not pending:
        logging.info("No episodes without season/episode left to match.")
        return info

    # Match episodes to references
    references = tmdb.download_episode_images(info.tmdb_id)
    matches = match_episodes_to_references(
        [str(f.path.absolute()) for f in pending],
        references.flatten(),
    )
    logging.info(matches)

    # Set new episode names
    if not matches.perfect_match or matches.reference_by_episode is None:
        logging.error("Episodes not a perfect matching.")
        return info

    logging.info("Converting matching results to filenames.")
    pattern = re.compile(r"^S\[(?P<season>[^\]]+)\]E\[(?P<episode>[^\]]+)\]$")
    for ep in pending:
        season_episode = matches.reference_by_episode[str(ep.path.absolute())]
        m = pattern.match(season_episode)
        if m is None:
            # ValueError instead of BaseException, so ordinary
            # `except Exception` handlers see this error as well.
            raise ValueError(
                f"Could not match reference Season/Episode tag [{season_episode}]."
            )

        ep.season = m.group("season")
        ep.episode = m.group("episode")

    return info
if __name__ == "__main__": if __name__ == "__main__":
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)