import asyncio from typing import Optional from enum import Enum class QueueState(Enum): STOPPED = 0 IDLE = 1 POPPING = 2 # As in: Popping actions from the queue and performing them class ActionQueue: def __init__(self, idle_action=None) -> None: self.queued_actions: list = [] self.idle_action: Optional[any] = None self.state = QueueState.STOPPED self.background_task = None if idle_action: self.set_idle_action(idle_action) async def run_queue(self): while len(self.queued_actions) > 0: self.state = QueueState.POPPING action = self.queued_actions.pop(0) action[0](*(action[1]), **(action[2])) self.state = QueueState.IDLE self.idle_action() async def stop_queue(self): if self.background_task is None: return self.state = QueueState.STOPPED self.background_task.cancel() async def restart_queue(self): await self.stop_queue() self.background_task = asyncio.create_task(self.run_queue()) async def add_action_to_queue(self, action, *args, **kwargs): self.queued_actions.append((action, args, kwargs)) if self.state == QueueState.IDLE: await self.restart_queue() async def set_idle_action(self, action): self.idle_action = action if self.state == QueueState.IDLE: await self.restart_queue()