Testing action queue

This commit is contained in:
Maximilian Giller 2023-12-22 19:09:18 +01:00
parent fb98e38e00
commit 123ee48529
2 changed files with 41 additions and 2 deletions

View file

@ -23,10 +23,10 @@ class ActionQueue:
while len(self.queued_actions) > 0: while len(self.queued_actions) > 0:
self.state = QueueState.POPPING self.state = QueueState.POPPING
action = self.queued_actions.pop(0) action = self.queued_actions.pop(0)
action[0](*(action[1]), **(action[2])) await action[0](*(action[1]), **(action[2]))
self.state = QueueState.IDLE self.state = QueueState.IDLE
self.idle_action() await self.idle_action()
async def stop_queue(self): async def stop_queue(self):
if self.background_task is None: if self.background_task is None:

39
src/testing.py Normal file
View file

@ -0,0 +1,39 @@
import asyncio
from handler.action_queue import ActionQueue
async def idle_a():
while True:
print("Idleling ...")
await asyncio.sleep(1)
async def action_a():
print("Starting action A")
await asyncio.sleep(1)
print("Ended action A")
async def action_b():
print("Starting action B")
await asyncio.sleep(1)
print("Ended action B")
async def main():
queue = ActionQueue(idle_a)
await asyncio.sleep(7)
await queue.add_action_to_queue(action_a)
await queue.add_action_to_queue(action_b)
await asyncio.sleep(10)
asyncio.run(main)