Implemented most of the engine loop

This commit is contained in:
Maximilian Giller 2025-09-21 16:39:24 +02:00
parent 5b5e6c697a
commit 4b598679f6
2 changed files with 82 additions and 15 deletions

View file

@ -1,7 +1,16 @@
from decimal import Decimal from decimal import Decimal
import logging import logging
from models import EngineFixture, EngineTrack, Flickr from models import (
EngineFixture,
EngineKeyframe,
EngineTrack,
Flickr,
Interpolation,
Keyframe,
KeyframeValue,
Parameter,
)
class Engine: class Engine:
@ -25,6 +34,7 @@ class Engine:
def load_tracks(self, track_ids: list[str]): def load_tracks(self, track_ids: list[str]):
"""Loads specific tracks and resets the render paremeters.""" """Loads specific tracks and resets the render paremeters."""
self.render_time: Decimal = Decimal.from_float(0) self.render_time: Decimal = Decimal.from_float(0)
self.is_running: bool = True
self.tracks = [ self.tracks = [
EngineTrack(t) EngineTrack(t)
for s in self.flickr.sequences for s in self.flickr.sequences
@ -45,18 +55,56 @@ class Engine:
def next(self, step_size: Decimal): def next(self, step_size: Decimal):
"""Calculate light values for next step in time. Step size in seconds.""" """Calculate light values for next step in time. Step size in seconds."""
self.render_time += step_size if not self.is_running:
raise BaseException("Cannot render next step, already finished all tracks.")
self.render_time += step_size
still_running = False # Keep track if there are still running tracks
# Render all tracks
for t in self.tracks: for t in self.tracks:
if t.next is None: if not t.is_running:
continue continue
still_running = True
# Move keyframes forward # Move keyframes forward
t.step(step_size) t.step(step_size)
# TODO: Interpolate values if t.current is None:
raise BaseException(f"Track has no current keyframes.")
# Interpolate over keyframes of track
for k in t.current:
next_value: KeyframeValue = None
if t.next:
try:
next_value = t.next.get(k.parameter).value
except:
pass # next_value already None by default
interpolated_value = self.interpolate_value(k.value, next_value, k.interpolation)
# Update values of relevant fixtures
for f in self.fixtures.values():
if f.fixture.id in t.track.fixture_ids:
f.set(k.parameter, interpolated_value)
# Update running state
self.is_running = still_running
if not self.is_running:
logging.info("All tracks finished rendering.")
def propagate(self): def propagate(self):
"""Forward new fixture states to APIs.""" """Forward new fixture states to APIs."""
for f in self.fixtures.values(): for f in self.fixtures.values():
api = self.apis[f.fixture.api] api = self.apis[f.fixture.api]
# TODO: Pass update to API # TODO: Pass update to API
def interpolate_value(
self, current: KeyframeValue, next: KeyframeValue, interpolation: Interpolation
) -> KeyframeValue:
"""Interpolate between two keyframe values and return the interpolated value."""
cur_frame: Keyframe = current.get(parameter)
if
# TODO
return current.get(parameter).value

View file

@ -1,10 +1,14 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
import logging import logging
from typing import Iterator
from dataclasses_json import dataclass_json from dataclasses_json import dataclass_json
from enum import Enum from enum import Enum
from decimal import Decimal from decimal import Decimal
KeyframeValue = None | str | int | float | bool
@dataclass_json @dataclass_json
@dataclass @dataclass
class SpaceVector: class SpaceVector:
@ -100,7 +104,7 @@ class EngineFixture:
topic: str | None = None topic: str | None = None
"""Topic fixture currently displays.""" """Topic fixture currently displays."""
def set(self, parameter: Parameter, value: None | bool | float | int | str): def set(self, parameter: Parameter, value: KeyframeValue):
"""Set value of fixture based on parameter.""" """Set value of fixture based on parameter."""
match parameter: match parameter:
case Parameter.ON: case Parameter.ON:
@ -160,7 +164,7 @@ class Keyframe:
parameter: Parameter parameter: Parameter
"""Parameter targeted by frame value.""" """Parameter targeted by frame value."""
value: None | bool | float | int | str = None value: KeyframeValue = None
"""Targeted state value reached by reaching the keyframe.""" """Targeted state value reached by reaching the keyframe."""
interpolation: Interpolation = Interpolation.STEP interpolation: Interpolation = Interpolation.STEP
@ -177,6 +181,16 @@ class EngineKeyframe:
delta: Decimal delta: Decimal
"""Remaining time in seconds relative to previous keyframes.""" """Remaining time in seconds relative to previous keyframes."""
def __iter__(self) -> Iterator[Keyframe]:
return iter(self.keyframes)
def get(self, parameter: Parameter) -> Keyframe:
"""Get keyframe for parameter found among the keyframes. Assumed to exist only once. Exception raised, if parameter not defined by keyframes."""
for k in self.keyframes:
if k.parameter == parameter:
return k
raise ValueError(f"Parameter [{parameter}] not given among keyframes.")
@dataclass_json @dataclass_json
@dataclass @dataclass
@ -206,8 +220,8 @@ class EngineTrack:
keyframes: list[EngineKeyframe] = field(default_factory=list) keyframes: list[EngineKeyframe] = field(default_factory=list)
"""Relative keyframes of track.""" """Relative keyframes of track."""
previous: EngineKeyframe | None = None current: EngineKeyframe | None = None
"""Keyframes before next keyframe.""" """Keyframes before the next keyframes. Trigger time has been reached."""
@property @property
def next(self) -> EngineKeyframe | None: def next(self) -> EngineKeyframe | None:
@ -216,6 +230,11 @@ class EngineTrack:
return None return None
return self.keyframes[0] return self.keyframes[0]
@property
def is_running(self) -> bool:
"""Indicates wether end of track has been reached."""
return self.next is not None
def __post_init__(self): def __post_init__(self):
# Calculate relative keyframes # Calculate relative keyframes
sorted_keyframes = sorted(self.track.keyframes, key=lambda k: k.time) sorted_keyframes = sorted(self.track.keyframes, key=lambda k: k.time)
@ -238,11 +257,11 @@ class EngineTrack:
"""Step keyframes forward by given step.""" """Step keyframes forward by given step."""
while self.next and step_size > 0: while self.next and step_size > 0:
self.next.delta -= step_size self.next.delta -= step_size
if self.next.delta >= 0: if self.next.delta > 0:
return return
self.previous = self.keyframes.pop(0) self.current = self.keyframes.pop(0)
step_size = self.previous.delta.copy_abs() step_size = self.current.delta.copy_abs()
@dataclass_json @dataclass_json
@ -280,7 +299,7 @@ class Flickr:
def save(self, path: str) -> int: def save(self, path: str) -> int:
"""Write flickr data to file.""" """Write flickr data to file."""
with open(path, "x", encoding="UTF-8") as fp: with open(path, "x", encoding="UTF-8") as fp:
return fp.write(self.to_json()) return fp.write(self.to_json()) # type: ignore
@staticmethod @staticmethod
def load(path: str) -> "Flickr": def load(path: str) -> "Flickr":
@ -288,7 +307,7 @@ class Flickr:
with open(path, "r", encoding="UTF-8") as fp: with open(path, "r", encoding="UTF-8") as fp:
json = "\n".join(fp.readlines()) json = "\n".join(fp.readlines())
return Flickr.schema().loads(json) return Flickr.schema().loads(json) # type: ignore
if __name__ == "__main__": if __name__ == "__main__":
@ -322,7 +341,7 @@ if __name__ == "__main__":
"89idf", "89idf",
"Ceiling flicker", "Ceiling flicker",
["98iwd"], ["98iwd"],
[Keyframe(Decimal.from_float(0), "bri", 1)], [Keyframe(Decimal.from_float(0), Parameter.BRIGHTNESS, 1)],
) )
], ],
) )
@ -332,4 +351,4 @@ if __name__ == "__main__":
f.save("test.json") f.save("test.json")
a = Flickr.load("test.json") a = Flickr.load("test.json")
print(f.to_json()) print(f.to_json()) # type: ignore