import asyncio import logging class ActionQueue: def __init__(self) -> None: self.queued_actions: asyncio.Queue = asyncio.Queue() self.idle_action: tuple = (None,[],{}) self.queue_task = asyncio.create_task(self.run_queue()) self.idle_action_task = None def __del___(self): self.queue_task.cancel() if self.idle_action_task is not None: self.idle_action_task.cancel() async def run_queue(self): while True: try: if self.queued_actions.empty() and self.idle_action is not None and self.idle_action[0] is not None: self.idle_action_task = asyncio.create_task(self.idle_action[0](*(self.idle_action[1]), **(self.idle_action[2]))) action = await self.queued_actions.get() if self.idle_action_task is not None: self.idle_action_task.cancel() self.idle_action_task = None if action is not None and action[0] is not None: # If none -> Is idle update try: await action[0](*(action[1]), **(action[2])) except Exception as ex: logging.exception("Something went wrong during execution of action.", ex) except Exception as ex: logging.exception("Something went wrong in queue task.", ex) async def add_action_to_queue(self, action, *args, **kwargs): await self.queued_actions.put((action, args, kwargs)) async def set_idle_action(self, action, *args, **kwargs): self.idle_action = (action, args, kwargs) await self.queued_actions.put(None)