diff --git a/rupprecht/rupprecht.py b/rupprecht/rupprecht.py index 11e36ffbd93c2c849e9c0ee64ee5145564eb5ec2..c55709cec742dccd7cd3d3aec76e9b515dcf7e8a 100644 --- a/rupprecht/rupprecht.py +++ b/rupprecht/rupprecht.py @@ -75,123 +75,78 @@ class Rupprecht: self.rupprecht.text("Status:Closed... StuStaNet.e.V....") class RupprechtInterface: - class SerialProtocol(asyncio.Protocol): - def __init__(self, master): - self.master = master - self.buffer = [] + def __init__(self, serial_port, baudrate=115200, loop=None): + self.loop = loop or asyncio.get_event_loop() - def connection_made(self, transport): - #transport.serial.rts = False - self.master.transport = transport - def data_received(self, data): - for d in data: - if chr(d) == '\n': - self.master.received(self.buffer) - self.buffer = [] - else: - self.buffer.append(chr(d)) + self.button_callbacks = []; - def connection_lost(self, exc): - print('port closed, exiting') - self.master.loop.stop() + coro = serial_asyncio.open_serial_connection(loop=self.loop, url=serial_port, + baudrate=baudrate) + self.reader, self.writer = self.loop.run_until_complete(coro) + self.button_queue = asyncio.Queue(loop=loop) + self.data_queue = asyncio.Queue(loop=loop) + self.receive_task = self.loop.create_task(self.receive_messages()) + self.callback_task = self.loop.create_task(self.handlecallbacks()) - def __init__(self, serial_port, baudrate=115200, loop=None): - if loop == None: - loop = asyncio.get_event_loop() - self.loop = loop - self.serial_port = serial_port - self.baudrate = baudrate - self.transport = None - self.response_pending = False - self._queue = [] - self.response_event = asyncio.Event(loop=loop) - self.ready = False - self.ready_event = asyncio.Event(loop=loop) - self.button_callbacks = []; - self.last_result = '' - coro = serial_asyncio.create_serial_connection(loop, - (lambda: RupprechtInterface.SerialProtocol(self)), - serial_port, baudrate=baudrate) - loop.run_until_complete(coro) - self.echo_allowed = True + + async def teardown(self): + try: + self.receive_task.cancel() + await self.receive_task + except asyncio.CancelledError: + pass + + try: + self.callback_task.cancel() + await self.callback_task + except asyncio.CancelledError: + pass + + async def receive_messages(self): + while True: + line = await self.reader.readline() + line = line.strip() + if str.startswith(line, "BUTTON"): + await self.button_queue.put(line[len("BUTTON"):]) + else: + await self.data_queue.put(line) + + async def handlecallbacks(self): + await self.send_raw("CONFIG ECHO OFF") + # filter out the last echo + await self.data_queue.get() + + while True: + button_msg = await self.button_queue.get() + try: + buttons = json.loads(button_msg) + except json.JSONDecodeError: + print("Invalid json: ", button_msg) + continue + + for callback in self.button_callbacks: + try: + callback(buttons) + except Exception as e: + print("Exception while executing callback for", button_msg , e) def subscribe_button(self, callback): self.button_callbacks.append(callback) - async def send_raw(self, msg, expect_response=True, force=False): + async def send_raw(self, msg, expect_response=True): """ Send the raw line to the serial stream This will wait, until the preceding message has been processed. """ - if not self.ready: - print("Waiting for client to become ready") - await self.ready_event.wait() - print("Client is now ready") - - if self.echo_allowed and "CONFIG ECHO" not in msg: - await self.send_raw("CONFIG ECHO OFF") - - # The queue is the message stack that has to be processed - self._queue.append(msg) - while self._queue: - self.response_event.clear() - # Await, if there was another message in the pipeline that has not - # yet been processed - - print("Sending", msg) - # If the queue has been processed by somebody else in the meantime - if not self._queue: - break - self.response_pending = True - next_msg = self._queue.pop(0) - - # Now actually send the data - self.last_command = next_msg - self.transport.write(next_msg.encode('ascii')) - #append a newline if the sender hasnt already - if next_msg[-1] != '\n': - self.transport.write(b'\n') - - await self.response_event.wait() - self.response_event.clear() - try: - if str.startswith(self.last_result, "ERROR"): - raise RupprechtCommandError(msg) - elif str.startswith(self.last_result, "OK"): - if self.last_command == "CONFIG ECHO OFF": - self.echo_allowed = False - self.response_pending = False - self.response_event.set() - return True - else: - if not expect_response: - self.response_pending = False - return True - #raise RupprechtCommandError("Unknown result code: {}".format(self.last_result)) - except Exception as e: - print("Exception while parsing response", msg, e) - break - - def received(self, msg): - msg = ''.join(msg).strip() - if not msg: - return - print("received \033[03m{}\033[0m".format(msg)) - if str.startswith(msg, "BUTTON"): - try: - buttons = json.loads(msg[len("BUTTON"):]) - asyncio.gather(*[b(buttons) for b in self.button_callbacks], loop=self.loop) - except json.decoder.JSONDecodeError: - print("Invalid json received:", msg) - elif msg == "READY" and not self.ready: - self.ready = True - self.ready_event.set() - else: - self.last_result = msg - self.response_event.set() + + self.writer.write(msg.encode('ascii')) + await self.writer.drain() + if expect_response: + return await self.data_queue.get() + return None async def help(self): await self.send_raw("HELP")