Skip to content
Snippets Groups Projects
Commit 5cb99670 authored by Johannes Walcher's avatar Johannes Walcher
Browse files

added rupprechtinterface

parent 50896499
No related branches found
No related tags found
No related merge requests found
import asyncio
from rupprecht.rupprecht import Rupprecht
def main():
"""
Actually start the shit
"""
loop = asyncio.get_event_loop()
rup = Rupprecht()
loop.set_debug(True)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
loop.run_until_complete(rup.teardown())
if __name__ == "__main__":
main()
#!/usr/bin/python3
"""
generate packets for quigg rc switches
"""
import argparse
import serial
import asyncio
class Quigg1000:
def __init__(self, loop=None, serial=None, rupprecht=None, code=1337, subaddr=0):
"""
Create a new switch representation
The switch can either be connected to a raw-serial port (old arduino)
or it can be connected via a rupprecht-interface, that will handle all interfacing
"""
if loop == None:
loop=asyncio.get_event_loop()
self.loop = loop
self.serial = serial
self.rupprecht = rupprecht
self.code = code
self.subaddr = str(subaddr)
if serial != None and rupprecht != None:
raise ArgumentException("Only one of serial and rupprecht may be set")
async def state(self, state, debug=False):
"""
Set the switch to the value indicated by state (true or false)
"""
msg = self.__build_msg(state, debug=debug)
if debug:
print(msg)
await self.__send_msg(msg)
async def on(self, debug=False):
msg = self.__build_msg(True, debug=debug)
if debug:
print(msg)
await self.__send_msg(msg)
async def off(self, debug=False):
msg = self.__build_msg(False, debug=debug)
if debug:
print(msg)
await self.__send_msg(msg)
def __build_msg(self, state, dimm=0, debug=False):
"""
Build a quigg message.
Read the inline docs for details
state: 1 bit value: True: On, False: Off
dimm: ??
"""
# key on remote -> real addr mapping
key_lookup = {'1': 0, '2': 1, '4': 2, '3': 3}
# 12 bits set code addr
set_code = '{0:0>12b}'.format(self.code)
# 2 bit addr of rc switch
addr = '{0:0>2b}'.format(key_lookup[self.subaddr])[::-1]
# address all rf devices (not implemented)
allflag = '0'
# 1 bit state
state = '1' if int(state) else '0'
# 2 bit dimm information
dimm = '{0:0>2}'.format(dimm)
# repeat bit 13 / addr bit 0
internal = addr[0]
# build buffer with leading 1
buffer_string = '1' + set_code + addr + allflag + state + dimm + internal
# parity over last 7 bits
parity_bit = '1' if buffer_string[13:20].count('1') % 2 else '0'
# append parity bit
buffer_string += parity_bit
if debug:
print('set code:\t{}'.format(set_code))
print('addr:\t\t{}'.format(addr))
print('state:\t\t{}'.format(state))
print('dimm:\t\t{}'.format(dimm))
print('internal:\t{}'.format(internal))
print('parity range:\t{}'.format(buffer_string[13:20]))
print('parity bit:\t{}'.format(parity_bit))
print('\nbuffer string:\t{}'.format(buffer_string))
if len(buffer_string) != 21:
print('Bad packet length.')
return None
return buffer_string
async def __send_msg(self, msg):
if self.rupprecht:
await self.rupprecht.light(msg)
if self.serial:
try:
self.ser.write(str.encode(msg))
except serial.SerialException:
print('Serial communication failed.')
def main():
parser = argparse.ArgumentParser(description='Remote COntrol rc switches')
parser.add_argument('addr', help='number of the switch')
parser.add_argument('state', help='1 (on) or 0 (off)')
parser.add_argument('--set-code', '-c', help='set-code-number',
default=2816, type=int)
parser.add_argument('--dimm', help='dimm value', default=0)
parser.add_argument('--device',
help='serial device path/name (e.g. COM11 or /dev/ttyUSB3)',
default='/dev/ttyUSB3')
parser.add_argument('--debug', '-d', help='enable debug output',
default=False, action='store_true')
args = parser.parse_args()
try:
s = serial.Serial(args.device, 9600)
except serial.SerialException:
print('Serial communication failed.')
exit()
r = Quigg1000(serial=s, loop=loop, code=args.set_code, subaddr=args.addr)
loop=asyncio.get_event_loop()
loop.run_until_complete(r.state(args.state, args.debug))
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import asyncio
import json
import serial_asyncio
import serial
from hauptbahnhof import Hauptbahnhof
from . import rcswitch
class RupprechtError:
pass
class RupprechtCommandError(RupprechtError):
def __init__(self, command):
self.command = command
def __repr__(self):
return "RupprechtCommandError: {}".format(self.command)
class Rupprecht:
"""
Implement serial connection to rupprecht
"""
def __init__(self, loop=None):
if not loop:
loop = asyncio.get_event_loop()
self.loop = loop
self.hbf = Hauptbahnhof(loop)
self.hbf.subscribe('/haspa/led', self.command_led)
self.space_is_open = False
try:
r = RupprechtInterface("/dev/ttyACM0")
except serial.SerialException:
r = RupprechtInterface("/tmp/rupprechtemulator")
r.subscribe_button(self.button_message)
self.imposed_ids = {
'rupprecht-table': rcswitch.Quigg1000(code=1337, subaddr=1, rupprecht=r),
'rupprecht-alarm': rcswitch.Quigg1000(code=1337, subaddr=2, rupprecht=r),
'rupprecht-fan': rcswitch.Quigg1000(code=1337, subaddr=3, rupprecht=r)
}
async def teardown(self):
await self.hbf.teardown()
async def command_led(self, source, payload, msg):
for devid, value in msg.items():
try:
if value == 0:
self.imposed_ids[devid].off()
elif value == 1023:
self.imposed_ids[devid].on()
except KeyError:
pass
async def button_message(self, msg):
if msg['open'] and not self.space_is_open:
self.space_is_open = True
await self.hbf.publish('/haspa/status', 'open')
elif not msg['open'] and self.space_is_open:
self.space_is_open = False
await self.hbf.publish('/haspa/status', 'closed')
class RupprechtInterface:
class SerialProtocol(asyncio.Protocol):
def __init__(self, master):
self.master = master
self.buffer = []
def connection_made(self, transport):
#transport.serial.rts = False
self.master.transport = transport
def data_received(self, data):
for d in data:
if chr(d) == '\n':
self.master.received(self.buffer)
self.buffer = []
else:
self.buffer.append(chr(d))
def connection_lost(self, exc):
print('port closed, exiting')
self.master.loop.stop()
def __init__(self, serial_port, baudrate=115200, loop=None):
if loop == None:
loop = asyncio.get_event_loop()
self.loop = loop
self.serial_port = serial_port
self.baudrate = baudrate
self.transport = None
self.response_pending = False
self._queue = []
self.response_event = asyncio.Event(loop=loop)
self.ready = False
self.ready_event = asyncio.Event(loop=loop)
self.button_callbacks = [];
self.last_result = ''
coro = serial_asyncio.create_serial_connection(loop,
(lambda: RupprechtInterface.SerialProtocol(self)),
serial_port, baudrate=baudrate)
loop.run_until_complete(coro)
self.echo_allowed = True
def subscribe_button(self, callback):
self.button_callbacks.append(callback)
async def send_raw(self, msg, expect_response=True, force=False):
"""
Send the raw line to the serial stream
This will wait, until the preceding message has been processed.
"""
if not self.ready:
print("Waiting for client to become ready")
await self.ready_event.wait()
print("Client is now ready")
if self.echo_allowed and "CONFIG ECHO" not in msg:
await self.send_raw("CONFIG ECHO OFF")
# The queue is the message stack that has to be processed
self._queue.append(msg)
while self._queue:
self.response_event.clear()
# Await, if there was another message in the pipeline that has not
# yet been processed
if not force:
while self.response_pending:
print("before sending '{}' we need a response for '{}'".format(msg, self.last_command))
await self.response_event.wait()
self.response_event.clear()
print("Sending", msg)
# If the queue has been processed by somebody else in the meantime
if not self._queue:
break
self.response_pending = True
next_msg = self._queue.pop(0)
# Now actually send the data
self.last_command = next_msg
self.transport.write(next_msg.encode('ascii'))
#append a newline if the sender hasnt already
if next_msg[-1] != '\n':
self.transport.write(b'\n')
await self.response_event.wait()
self.response_event.clear()
try:
if str.startswith(self.last_result, "ERROR"):
raise RupprechtCommandError(msg)
elif str.startswith(self.last_result, "OK"):
if self.last_command == "CONFIG ECHO OFF":
self.echo_allowed = False
self.response_pending = False
self.response_event.set()
return True
else:
if not expect_response:
self.response_pending = False
return True
#raise RupprechtCommandError("Unknown result code: {}".format(self.last_result))
except Exception as e:
print("Exception while parsing response", msg, e)
break
def received(self, msg):
msg = ''.join(msg).strip()
if not msg:
return
print("received \033[03m{}\033[0m".format(msg))
if str.startswith(msg, "BUTTON"):
try:
self.buttons = json.loads(msg[len("BUTTON"):])
asyncio.gather(*[b(self.buttons) for b in self.button_callbacks], loop=self.loop)
except json.decoder.JSONDecodeError:
print("Invalid json received:", msg)
elif msg == "READY" and not self.ready:
self.ready = True
self.ready_event.set()
else:
self.last_result = msg
self.response_event.set()
async def help(self):
await self.send_raw("HELP")
async def config(self, key, value):
await self.send_raw("CONFIG {} {}".format(key, value))
async def text(self, msg):
await self.send_raw("TEXT {}".format(msg), expect_response=False)
async def button(self):
await self.send_raw("BUTTON", expect_response=False)
async def light(self, raw_msg):
await self.send_raw("LIGHT {}".format(raw_msg), expect_response=False)
import asyncio
from hauptbahnhof import Hauptbahnhof
from rupprecht.rupprecht import Rupprecht
messages = asyncio.Queue()
async def on_message(client, message, _):
print("Got message: %s"%message)
await messages.put(message)
async def test(loop):
testbf = Hauptbahnhof(loop=loop)
await asyncio.sleep(2)
# Now everythin should be set up
await testbf.publish("/haspa/led", {'rupprecht-table':1023})
# This module is not sending any status back
try:
await testbf.teardown()
except asyncio.CancelledError:
pass
def main():
loop = asyncio.get_event_loop()
lib = Rupprecht(loop=loop)
result = loop.run_until_complete(test(loop))
loop.run_until_complete(lib.teardown())
if result:
exit(0)
else:
exit(1)
if __name__=="__main__":
main()
import time
while True:
time.sleep(30)
print("READY")
for i in range(10):
time.sleep(30)
print("OK")
socat -d PTY,link=/tmp/rupprechtemulator,echo=0 "EXEC:python3 rupprechtemulator.py ...,pty,raw",echo=0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment