From 049e0685066ec0e91372d55850ca762d1e7a90f6 Mon Sep 17 00:00:00 2001 From: Michael Loipfuehrer <michael.loipfuehrer@stusta.de> Date: Sun, 8 Mar 2020 19:40:17 +0100 Subject: [PATCH] move to non async --- .gitignore | 3 + babel/__main__.py | 3 +- babel/babel.py | 97 ++++++-------- conf/arplist.conf | 1 - conf/arplist.json | 3 + babel/config.json => conf/babel.json | 0 conf/hauptbahnhof.conf | 1 - conf/hauptbahnhof.json | 3 + hackerman/__main__.py | 13 +- hackerman/hackerman.py | 168 ++++++++++++------------ hackerman/test.py | 44 +------ hauptbahnhof/hauptbahnhof.py | 183 ++++++++------------------- mpd/__main__.py | 10 +- mpd/mpd.py | 41 +++--- nsa/__main__.py | 10 +- nsa/nsa.py | 69 +++++----- requirements.txt | 6 +- 17 files changed, 266 insertions(+), 389 deletions(-) delete mode 100644 conf/arplist.conf create mode 100644 conf/arplist.json rename babel/config.json => conf/babel.json (100%) delete mode 100644 conf/hauptbahnhof.conf create mode 100644 conf/hauptbahnhof.json diff --git a/.gitignore b/.gitignore index 0d6cb60..f191884 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,9 @@ auto-save-list tramp .\#* +# Jetbrains +.idea + # Org-mode .org-id-locations *_archive diff --git a/babel/__main__.py b/babel/__main__.py index 9591f9f..a7eec22 100644 --- a/babel/__main__.py +++ b/babel/__main__.py @@ -1,6 +1,6 @@ -import asyncio from babel.babel import Babel + def main(): """ Actually start the shit @@ -8,5 +8,6 @@ def main(): babel = Babel() babel.run() + if __name__ == "__main__": main() diff --git a/babel/babel.py b/babel/babel.py index 7dddbb7..e87e9f9 100644 --- a/babel/babel.py +++ b/babel/babel.py @@ -1,36 +1,22 @@ -import paho.mqtt.client as mqtt import json +from hauptbahnhof import Hauptbahnhof + # maximum recursion depth in translation -MAX_TTL=5 +MAX_TTL = 5 + -class Babel: +class Babel(Hauptbahnhof): def __init__(self, config="/etc/hauptbahnhof/realms.json"): - with open(config) as cfgfile: - if not self.parse_config(cfgfile): - raise RuntimeError("Error in config.") + super().__init__('babel') + self._load_config('babel') self.dfnode_state = {} - self.mqtt = mqtt.Client() - self.mqtt.on_message = self.on_message - self.mqtt.on_connect = lambda mqtt, obj, flags, rc: \ - self.subscribe_to_config(self.config) - self.mqtt.on_message = self.on_message - #self.mqtt.on_log = self.on_log - - self.connect() - - def connect(self): - self.mqtt.connect("knecht.stusta.de", 1883) - - def on_log(self, mqttc, obj, level, string): - print("mqtt: ", string) - - def run(self): - self.mqtt.loop_forever() + def on_connect(self, client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + self.subscribe_to_config() - def parse_config(self, config): - self.config = json.load(config) + def parse_config(self): # now process every path in the final translation result if it is parsable success = True @@ -45,24 +31,29 @@ class Babel: cfg = self.topicconfig(target) except KeyError: success = False - print("Cannot translate output path: ", target, "for translating from ", source) + self.log.error(f"Cannot translate output path: " + f"{target} for translating from {source}") continue try: if cfg['type'] == "dfnode": - _ = cfg['topic'] # require topic - _ = cfg['espid'] # require espid - index = cfg['index'] # 0 <= index < 8 + _ = cfg['topic'] # require topic + _ = cfg['espid'] # require espid + index = cfg['index'] # 0 <= index < 8 if int(index) < 0 or int(index) >= 8: - print("index must be between (including) 0 and (excluding) 8") + self.log.error( + "index must be between (including) 0 " + "and (excluding) 8") success = False elif cfg['type'] == "delock": - _ = cfg['topic'] # require topic + _ = cfg['topic'] # require topic except KeyError as e: - print("could not find expected element", e, "in path ", target) + self.log.error( + f"could not find expected " + f"element {e} in path {target}") success = False except ValueError as e: - print("could not convert to int", e) + self.log.error(f"could not convert to int {e}") success = False return success @@ -76,36 +67,32 @@ class Babel: return [path] return data - def subscribe_to_config(self, config): - try: - print("Connected") - # subscribe to all basechannels - baseconfig = self.make_baseconfig_tree(config['basechannels'], "") - - # subscribe to all translated topics - baseconfig += config['translation'].keys() - print("Subcribing to topics:\n\t", "\n\t".join(baseconfig)) - self.mqtt.subscribe([ - (topic, 0) for topic in baseconfig]) - except Exception as e: - print('subscribe', e) + def subscribe_to_config(self): + baseconfig = self.make_baseconfig_tree(self.config['basechannels'], "") + + # subscribe to all translated topics + baseconfig += self.config['translation'].keys() + self.log.info("Subcribing to topics: " + ", ".join(baseconfig)) + self._mqtt.subscribe([(topic, 0) for topic in baseconfig]) - def on_message(self, mqtt, userdata, msg): - print("msg", msg.topic, ":", msg.payload) + def on_message(self, msg): + self.log.debug(f"msg {msg.topic}: {msg.payload}") try: # TODO sanitize Payload: # has to be 0 - 100 integer or string payload = int(msg.payload) self.handle_message(msg.topic, payload, MAX_TTL) except Exception as e: - print('on_message', e) + self.log.warn( + f'Received invalid payload in topic {msg.topic}.' + f'Got error {e}.') def handle_message(self, topic, payload, ttl): if ttl <= 0: raise RuntimeError("ttl exceeded") if topic in self.config['translation']: for subtopic in self.config['translation'][topic]: - self.handle_message(subtopic, payload, ttl-1) + self.handle_message(subtopic, payload, ttl - 1) else: self.send_message(topic, payload) @@ -117,7 +104,7 @@ class Babel: elif cfg['type'] == "delock": self.send_delock(cfg, topic, payload) else: - print("Config had unknown type", cfg['type']) + self.log.warn(f"Config had unknown type {cfg['type']}") def topicconfig(self, topic): try: @@ -136,20 +123,20 @@ class Babel: self.dfnode_state[cfg['topic']][int(cfg['index'])] = payload - #print("Sending ", self.dfnode_state[cfg['topic']], "to", cfg['topic']) + # print("Sending ", self.dfnode_state[cfg['topic']], "to", cfg['topic']) payload = {cfg['espid']: self.dfnode_state[cfg['topic']]} - self.mqtt.publish(cfg['topic'], json.dumps(payload)) - + self.publish(cfg['topic'], json.dumps(payload)) def send_delock(self, cfg, topic, payload): msg = "OFF" if payload == 0 else "ON" - self.mqtt.publish(cfg['topic'], msg) + self.publish(cfg['topic'], msg) def test(): babel = Babel(config="./config.json") babel.run() + if __name__ == "__main__": test() diff --git a/conf/arplist.conf b/conf/arplist.conf deleted file mode 100644 index 61fe71b..0000000 --- a/conf/arplist.conf +++ /dev/null @@ -1 +0,0 @@ -space_network="10.150.9.0/24" diff --git a/conf/arplist.json b/conf/arplist.json new file mode 100644 index 0000000..177f5e0 --- /dev/null +++ b/conf/arplist.json @@ -0,0 +1,3 @@ +{ + "space_network": "10.150.9.0/24" +} \ No newline at end of file diff --git a/babel/config.json b/conf/babel.json similarity index 100% rename from babel/config.json rename to conf/babel.json diff --git a/conf/hauptbahnhof.conf b/conf/hauptbahnhof.conf deleted file mode 100644 index 701e02d..0000000 --- a/conf/hauptbahnhof.conf +++ /dev/null @@ -1 +0,0 @@ -host = "knecht.stusta.de" diff --git a/conf/hauptbahnhof.json b/conf/hauptbahnhof.json new file mode 100644 index 0000000..51154af --- /dev/null +++ b/conf/hauptbahnhof.json @@ -0,0 +1,3 @@ +{ + "host": "127.0.0.1" +} \ No newline at end of file diff --git a/hackerman/__main__.py b/hackerman/__main__.py index d8dc1e1..a6bc58f 100644 --- a/hackerman/__main__.py +++ b/hackerman/__main__.py @@ -1,20 +1,13 @@ -import asyncio - from hackerman.hackerman import Hackerman + def main(): """ Actually start the shit """ - loop = asyncio.get_event_loop() - hackerman = Hackerman(loop=loop) - loop.set_debug(True) - try: - loop.run_forever() - except KeyboardInterrupt: - pass + hackerman = Hackerman() + hackerman.run() - loop.run_until_complete(hackerman.teardown()) if __name__ == "__main__": main() diff --git a/hackerman/hackerman.py b/hackerman/hackerman.py index 0c26103..1b95d91 100644 --- a/hackerman/hackerman.py +++ b/hackerman/hackerman.py @@ -1,136 +1,146 @@ import asyncio +import json +import time +from json import JSONDecodeError + import requests import random from hauptbahnhof import Hauptbahnhof -class Hackerman: + +class Hackerman(Hauptbahnhof): """ Scan the local hackerspace network for new and unknown devices to send back a result """ - def __init__(self, loop=None): - if not loop: - loop = asyncio.get_event_loop() - - self.loop = loop - self.hbf = Hauptbahnhof("hackerman", loop) - self.hbf.subscribe('/haspa/status', self.command_status) - self.hbf.subscribe('/haspa/action', self.command_action) + def __init__(self): + super().__init__('hackerman') - async def teardown(self): - await self.hbf.teardown() + def on_connect(self, client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + self.subscribe('/haspa/status', self.command_status) + self.subscribe('/haspa/action', self.command_action) - async def command_status(self, client, message, _): + def command_status(self, client, userdata, msg,): """ React to a status change of the hackerspace - switch the lights, ... """ - del client try: - if message['haspa'] in ['open', 'offen', 'auf']: - await self.hbf.publish('/haspa/licht', 400) - await self.hbf.publish('/haspa/music/control', { - 'play': True - }) - elif message['haspa'] in ['close', 'zu', 'closed']: - await self.hbf.publish('/haspa/licht', 0) - await self.hbf.publish('/haspa/music/control', { - 'play': False - }) - else: - print("Haspa state undetermined: ", message['haspa']) - except KeyError: - print("/haspa/status message malformed: ", message) - - async def command_action(self, client, message, _): + message = json.loads(msg.payload) + except JSONDecodeError: + self.log.warn(f'malformed msg on topic {msg.topic}: {msg.payload}') + return + + if 'haspa' not in message: + self.log.warn(f"/haspa/status message malformed: {message}") + return + + if message['haspa'] in ['open', 'offen', 'auf']: + self.publish('/haspa/licht', 400) + self.publish('/haspa/music/control', { + 'play': True + }) + elif message['haspa'] in ['close', 'zu', 'closed']: + self.publish('/haspa/licht', 0) + self.publish('/haspa/music/control', { + 'play': False + }) + else: + self.log.warn(f"Haspa state undetermined: {message['haspa']}") + + async def command_action(self, client, userdata, msg): """ Handle actions like alarm or party """ - del client - print(message) + try: + message = json.loads(msg.payload) + except JSONDecodeError: + self.log.warn(f'malformed msg on topic {msg.topic}: {msg.payload}') + return + if 'action' in message: if message['action'] == 'alarm': - print("Performing alarm...") - await self.hbf.publish('/haspa/licht/alarm', 1) - await asyncio.sleep(2) - await self.hbf.publish('/haspa/licht/alarm', 0) + self.log.info("Performing alarm...") + self.publish('/haspa/licht/alarm', 1) + time.sleep(2) + self.publish('/haspa/licht/alarm', 0) elif message['action'] == 'strobo': for i in range(100): - await self.hbf.publish('/haspa/licht/c', 0) - await asyncio.sleep(0.05) - await self.hbf.publish('/haspa/licht/c', 1023) - await asyncio.sleep(0.03) + self.publish('/haspa/licht/c', 0) + time.sleep(0.05) + self.publish('/haspa/licht/c', 1023) + time.sleep(0.03) elif message['action'] == 'party': - await self.hbf.publish('/haspa/licht', 0) - await self.hbf.publish('/haspa/licht/c', 0) - await self.hbf.publish('/haspa/licht/w', 0) + self.publish('/haspa/licht', 0) + self.publish('/haspa/licht/c', 0) + self.publish('/haspa/licht/w', 0) delay = 0.05 sounds = [ - #('56', 3.5), # führer - ('97', 4.7), # sonnenschein - ('63', 5), #epische musik - ('110', 3.7),#dota + # ('56', 3.5), # führer + ('97', 4.7), # sonnenschein + ('63', 5), # epische musik + ('110', 3.7), # dota ('113', 9), # skrillex ] sound = random.choice(sounds) - if sound[0] == '97': await asyncio.sleep(1) for i in range(0, 300): if i == 100: requests.get("https://bot.stusta.de/set/" + sound[0]) - await self.hbf.publish('/haspa/licht/w', i * 10/3) - await asyncio.sleep(0.01) + self.publish('/haspa/licht/w', i * 10 / 3) + time.sleep(0.01) elif sound[0] == '113': requests.get("https://bot.stusta.de/set/" + sound[0]) for i in range(2): - await asyncio.sleep(1.5) - await self.hbf.publish('/haspa/licht/1/c', 1023) - await self.hbf.publish('/haspa/licht/1/c', 1023) - await self.hbf.publish('/haspa/licht/alarm', 1) + time.sleep(1.5) + self.publish('/haspa/licht/1/c', 1023) + self.publish('/haspa/licht/1/c', 1023) + self.publish('/haspa/licht/alarm', 1) - await asyncio.sleep(0.01) - await self.hbf.publish('/haspa/licht/c', 0) + time.sleep(0.01) + self.publish('/haspa/licht/c', 0) for o in range(40): - await self.hbf.publish('/haspa/licht/c', 20 * o) - await asyncio.sleep(0.01) + self.publish('/haspa/licht/c', 20 * o) + time.sleep(0.01) - await self.hbf.publish('/haspa/licht/c', 0) + self.publish('/haspa/licht/c', 0) - #await asyncio.sleep(1.49) + # time.sleep(1.49) for o in range(20): - await self.hbf.publish('/haspa/licht/1/c', 1023) - await self.hbf.publish('/haspa/licht/3/c', 0) + self.publish('/haspa/licht/1/c', 1023) + self.publish('/haspa/licht/3/c', 0) - await asyncio.sleep(0.01) + time.sleep(0.01) - await self.hbf.publish('/haspa/licht/1/c', 0) - await self.hbf.publish('/haspa/licht/3/c', 1023) - await asyncio.sleep(0.01) + self.publish('/haspa/licht/1/c', 0) + self.publish('/haspa/licht/3/c', 1023) + time.sleep(0.01) else: requests.get("https://bot.stusta.de/set/" + sound[0]) - for _ in range(int(sound[1]/(delay * 4))): - await self.hbf.publish('/haspa/licht/3/c', 0) - await self.hbf.publish('/haspa/licht/1/c', 1023) - await asyncio.sleep(delay) + for _ in range(int(sound[1] / (delay * 4))): + self.publish('/haspa/licht/3/c', 0) + self.publish('/haspa/licht/1/c', 1023) + time.sleep(delay) - await self.hbf.publish('/haspa/licht/1/c', 0) - await self.hbf.publish('/haspa/licht/4/c', 1023) - await asyncio.sleep(delay) + self.publish('/haspa/licht/1/c', 0) + self.publish('/haspa/licht/4/c', 1023) + time.sleep(delay) - await self.hbf.publish('/haspa/licht/4/c', 0) - await self.hbf.publish('/haspa/licht/2/c', 1023) - await asyncio.sleep(delay) + self.publish('/haspa/licht/4/c', 0) + self.publish('/haspa/licht/2/c', 1023) + time.sleep(delay) - await self.hbf.publish('/haspa/licht/2/c', 0) - await self.hbf.publish('/haspa/licht/3/c', 1023) - await asyncio.sleep(delay) + self.publish('/haspa/licht/2/c', 0) + self.publish('/haspa/licht/3/c', 1023) + time.sleep(delay) - await self.hbf.publish('/haspa/licht', 300) + self.publish('/haspa/licht', 300) diff --git a/hackerman/test.py b/hackerman/test.py index f467543..3ee40c2 100644 --- a/hackerman/test.py +++ b/hackerman/test.py @@ -1,45 +1,15 @@ -import asyncio -from hauptbahnhof import Hauptbahnhof - -from hackerman.hackerman import Hackerman - - -messages = asyncio.Queue() -async def on_message(client, message, _): - print("Got message: %s"%message) - await messages.put(message) - - -async def test(loop): - testbf = Hauptbahnhof("test", loop=loop) - testbf.subscribe("/haspa/licht", on_message) +import time - await asyncio.sleep(2) - - await testbf.publish("/haspa/status", {'haspa':'open'}) # without blacklist - - # Now everythin should be set up - msg = await asyncio.wait_for(messages.get(), 10) # wait max 10 secs - - assert(msg['table'] == 1023) - assert(msg['ledstrip'] == 400) +from hauptbahnhof import Hauptbahnhof - try: - await testbf.teardown() - except asyncio.CancelledError: - pass def main(): - loop = asyncio.get_event_loop() - lib = Hackerman(loop=loop) + testbf = Hauptbahnhof("test", ) + + time.sleep(2) - result = loop.run_until_complete(test(loop)) - loop.run_until_complete(lib.teardown()) + testbf.publish("/haspa/status", {'haspa': 'open'}) # without blacklist - if result: - exit(0) - else: - exit(1) -if __name__=="__main__": +if __name__ == "__main__": main() diff --git a/hauptbahnhof/hauptbahnhof.py b/hauptbahnhof/hauptbahnhof.py index deb8b8e..4bbbc90 100644 --- a/hauptbahnhof/hauptbahnhof.py +++ b/hauptbahnhof/hauptbahnhof.py @@ -1,168 +1,95 @@ import json -import asyncio -import traceback -import io import sys import logging +from json import JSONDecodeError +from typing import Dict, Callable, Any, Union, Optional + +import paho.mqtt.client as mqtt -import libconf -import aiomqtt class Hauptbahnhof: """ Hauptbahnhof manager with a lot of convenience methods """ - def __init__(self, name, loop=None): + def __init__(self, name): try: idx = sys.argv.index('--confdir') self._configbase = sys.argv[idx + 1] except (ValueError, KeyError): self._configbase = '/etc/hauptbahnhof' - if not loop: - loop = asyncio.get_event_loop() - - self.loop = loop # find master config - self._config = self.config("hauptbahnhof") - - self._host = self._config['host'] - - self._subscriptions = {} - self._mqtt = None - self._mqtt_queue = [] + self.config = {} + self._load_config("hauptbahnhof") - self._mqtt_start_task = self.loop.create_task(self.start()) - self._queue = asyncio.Queue() - self._message_process_task = self.loop.create_task(self.message_processing()) - self.connected = asyncio.Event(loop=self.loop) + self._mqtt = mqtt.Client(client_id=f'hauptbahnhof-{name}') + self._mqtt.on_message = self._on_message + self._mqtt.on_connect = self.on_connect logformat = '%(name)s | %(levelname)5s | %(message)s' logging.basicConfig(format=logformat) self.log = logging.getLogger(name) - self.log.setLevel(logging.INFO) + self.log.setLevel(logging.DEBUG) + self._mqtt.enable_logger(self.log) - async def teardown(self): - """ - The last one switches off the light - """ - self._mqtt_start_task.cancel() - self._message_process_task.cancel() + def run(self): + self._connect() + self._mqtt.loop_forever() - results = await asyncio.gather(self._mqtt_start_task, - self._message_process_task, - return_exceptions=True) - for r in results: - if isinstance (r, Exception): - if not isinstance(r, asyncio.CancelledError): - traceback.print_exception(type(r), r, r.__traceback__) + def _connect(self): + self.log.debug(f'Trying to connect to broker {self.config["host"]}') + self._mqtt.connect(self.config['host']) - async def start(self): - """ - Start the mqtt locally, and when it is connected send it back - """ + def on_connect(self, client, userdata, flags, rc): + self.log.info(f'Connected to mqqt broker on {self.config["host"]}') - mqtt = aiomqtt.Client(self.loop) - mqtt.loop_start() - - mqtt.on_message = self.on_message - mqtt.on_connect = lambda client, userdata, flags, rc: self.connected.set() + def _on_message(self, client, userdata, msg): try: - await mqtt.connect(self._host) - except: - self.log.error("Could not connect to %s", self._host) - self.loop.cancel() - raise - await self.connected.wait() - - self.log.info("Successfully connected to %s", self._config['host']) - for topic in self._subscriptions: - mqtt.subscribe(topic) - - self._mqtt = mqtt - while self._mqtt_queue: - topic, msg = self._mqtt_queue.pop(0) - self.log.debug("Topic: %s", topic) - self._mqtt.publish(topic, msg) - - # Now we have mqtt available! - self._mqtt = mqtt - - async def message_processing(self): - while True: - try: - (client, userdata, msg) = await self._queue.get() - # is called when a message is received - try: - payload = msg.payload.decode('utf-8') - except UnicodeDecodeError: - continue + self.on_message(msg) + except Exception as e: + self.log.warning( + msg=f'Invalid message payload received on ' + f'topic {msg.topic}. Got error {e}') - try: - futures = self._subscriptions[msg.topic] - except KeyError: - # Received unknown message - this is strange, log and ignore - self.log.info("Received unsubscribed message on %s with content: %s"%( - msg.topic, msg.payload)) + def on_message(self, msg) -> None: + self.log.debug(f'received {msg = }') + raise NotImplementedError() - try: - payloadobj = json.loads(payload) - except json.JSONDecodeError: - self.log.info("Invalid json received: %s"%payload) - continue - except: - continue - - try: - await asyncio.gather(*[fut(client, payloadobj, msg) for fut in futures]) - except Exception: - traceback.print_exc() - continue - except asyncio.CancelledError: - break - - except Exception: - traceback.print_exc() - continue - - def on_message(self, client, userdata, msg): - self._queue.put_nowait((client, userdata, msg)) - - - def config(self, module): + def _load_config(self, module: str, not_found_ok=False) -> Dict: """ Load a config from the pile of configs ( usually in /etc/hauptbahnhof/*.conf ) """ - with io.open('%s/%s.conf'%(self._configbase, module), 'r') as f: - return libconf.load(f) - - def subscribe(self, topic, coroutine): + try: + with open(f'{self._configbase}/{module}.json', 'r') as f: + self.config.update(json.load(f)) + return self.config + except FileNotFoundError as e: + if not_found_ok: + return self.config + raise e + + def subscribe(self, topic: str, callback: Optional[Callable] = None) -> None: """ Subscribe to topic """ - try: - self._subscriptions[topic].append(coroutine) - except KeyError: - self._subscriptions[topic] = [coroutine] + self._mqtt.subscribe(topic) + if callback: + self._mqtt.message_callback_add(topic, callback) + self.log.info(f'subscribed to topic {topic}') - if self._mqtt: - self._mqtt.subscribe(topic) - - async def publish(self, topic, message): + def publish(self, topic: str, msg: Union[str, int, float, Dict]) -> None: """ Publish a message on the given topic """ - jsonmsg = json.dumps(message) - - # Maybe the system is not already online? - if self._mqtt: - await self._mqtt.publish(topic, jsonmsg).wait_for_publish() + if isinstance(msg, dict): + try: + payload = json.dumps(msg) + except JSONDecodeError as e: + self.log.error( + msg=f'Got unserializable json dict when trying to' + f'send msg to topic {topic}.') + return + self._mqtt.publish(topic, payload) else: - self._mqtt_queue.append((topic, jsonmsg)) - - def mqtt(self): - """ - Get the underlying mqtt object. For debug use only! - """ - return self._mqtt + self._mqtt.publish(topic, msg) diff --git a/mpd/__main__.py b/mpd/__main__.py index 78bc25f..88e45a3 100644 --- a/mpd/__main__.py +++ b/mpd/__main__.py @@ -1,19 +1,13 @@ -import asyncio from mpd.mpd import MPD + def main(): """ Actually start the shit """ - loop = asyncio.get_event_loop() mpd = MPD() - loop.set_debug(True) - try: - loop.run_forever() - except KeyboardInterrupt: - pass + mpd.run() - loop.run_until_complete(mpd.teardown()) if __name__ == "__main__": main() diff --git a/mpd/mpd.py b/mpd/mpd.py index d831a53..264d551 100644 --- a/mpd/mpd.py +++ b/mpd/mpd.py @@ -1,42 +1,45 @@ -import asyncio -import subprocess +import json import re +import subprocess +from json import JSONDecodeError from hauptbahnhof import Hauptbahnhof -class MPD: + +class MPD(Hauptbahnhof): """ Implement Interfacing to a locally running mpd server """ - def __init__(self, loop=None): - if not loop: - loop = asyncio.get_event_loop() - - self.loop = loop - self.hbf = Hauptbahnhof("mpd", loop) - self.hbf.subscribe('/haspa/music/control', self.command_control) - self.hbf.subscribe('/haspa/music/song', self.command_song) + def __init__(self): + super().__init__('mpd') + def on_connect(self, client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + self.subscribe('/haspa/music/control', self.command_control) + self.subscribe('/haspa/music/song', self.command_song) - async def teardown(self): - await self.hbf.teardown() + def command_control(self, client, userdata, msg): + try: + message = json.loads(msg.payload) + except JSONDecodeError: + self.log.warn(f'malformed msg on topic {msg.topic}: {msg.payload}') + return - async def command_control(self, client, message, _): if 'play' in message: if message['play']: - self.hbf.log.info("Starting music") + self.log.info("Starting music") subprocess.call(['mpc', 'play']) else: - self.hbf.log.info("Stopping music") + self.log.info("Stopping music") subprocess.call(['mpc', 'pause']) if 'volume' in message: p = re.compile(r'^[-+]?[0-9]{3}$') if p.match(message['volume']): - self.hbf.log.info("adjusting volume") + self.log.info("adjusting volume") subprocess.call(['mpc', 'volume', message['volume']]) else: - self.hbf.log.info("Will not parse volume: %s", message['volume']) + self.log.info("Will not parse volume: %s", message['volume']) - async def command_song(self, client, message, _): + async def command_song(self, client, userdata, msg): pass diff --git a/nsa/__main__.py b/nsa/__main__.py index 478e9e5..6ffa550 100644 --- a/nsa/__main__.py +++ b/nsa/__main__.py @@ -1,19 +1,13 @@ -import asyncio from nsa.nsa import NSA + def main(): """ Actually start the shit """ - loop = asyncio.get_event_loop() nsa = NSA() - loop.set_debug(True) - try: - loop.run_forever() - except KeyboardInterrupt: - pass + nsa.run() - loop.run_until_complete(nsa.teardown()) if __name__ == "__main__": main() diff --git a/nsa/nsa.py b/nsa/nsa.py index e88ac62..18798fa 100644 --- a/nsa/nsa.py +++ b/nsa/nsa.py @@ -1,58 +1,51 @@ -import asyncio +import json +import subprocess +from json import JSONDecodeError from hauptbahnhof import Hauptbahnhof -class NSA: + +class NSA(Hauptbahnhof): """ Scan the local hackerspace network for new and unknown devices to send back a result """ - def __init__(self, loop=None): - if not loop: - loop = asyncio.get_event_loop() - - self.loop = loop - self.hbf = Hauptbahnhof("nsa", loop) - self.hbf.subscribe('/haspa/nsa/scan', self.command_scan) + def __init__(self): + super().__init__('nsa') + try: + self._load_config('arplist') + except FileNotFoundError as e: + self.log.error('no arplist found') + self.config = {} - async def teardown(self): - """ - Clean up your stuff... - """ - await self.hbf.teardown() + def on_connect(self, client, userdata, flags, rc): + super().on_connect(client, userdata, flags, rc) + self.subscribe('/haspa/nsa/scan', self.command_scan) - async def command_scan(self, client, message, _): + def command_scan(self, client, userdata, msg): """ space.get_number_of_network_devices() -> int Return the number of non-stationary, connected network devices. """ try: - cfg = self.hbf.config("arplist") - except FileNotFoundError as exc: - self.hbf.log.warning("Coult not find config:%s", str(exc)) - cfg = {} - - known_devices = [] - try: - known_devices = message['blacklist'] - except (KeyError, TypeError): - known_devices = [] + message = json.loads(msg.payload) + except JSONDecodeError: + self.log.warn(f'malformed msg on topic {msg.topic}: {msg.payload}') + return - try: - known_devices += cfg['spacedevices'] - except KeyError: - self.hbf.log.warning("You might want to configure space devices") + known_devices = message.get('blacklist', []) + known_devices += self.config.get('spacedevices', []) # TODO use util/arp-scan - proc = await asyncio.create_subprocess_exec( - *['arp-scan', cfg['space_network']], - stdout=asyncio.subprocess.PIPE, - loop=self.loop) + proc = subprocess.run( + ['arp-scan', self.config['space_network']], + stdout=subprocess.PIPE, + ) - output, _ = await proc.communicate() - for line in output.decode('utf8').split('\n'): - self.hbf.log.debug(line) - output = output.decode('utf8').split() + output = proc.stdout + for line in output.decode('utf-8').split('\n'): + self.log.debug(line) + output = output.decode('utf-8').split() dev_list = [] @@ -61,4 +54,4 @@ class NSA: if line not in known_devices: dev_list.append(line) - await self.hbf.publish('/haspa/nsa/result', {'count': len(dev_list)}) + self.publish('/haspa/nsa/result', {'count': len(dev_list)}) diff --git a/requirements.txt b/requirements.txt index 37f6209..0b8bd25 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,2 @@ -libconf==1.0.1 -paho-mqtt==1.3.1 -pyserial==3.4 -pyserial-asyncio==0.4 +paho-mqtt==1.5.0 +requests==2.23.0 -- GitLab