Skip to content
Snippets Groups Projects
Commit 50896499 authored by Johannes Walcher's avatar Johannes Walcher
Browse files

initial commit, base hauptbahnhof architecture

parents
No related branches found
No related tags found
No related merge requests found
# Created by https://www.gitignore.io/api/vim,emacs,python
### Emacs ###
# -*- mode: gitignore; -*-
*~
\#*\#
/.emacs.desktop
/.emacs.desktop.lock
*.elc
auto-save-list
tramp
.\#*
# Org-mode
.org-id-locations
*_archive
# flymake-mode
*_flymake.*
# eshell files
/eshell/history
/eshell/lastdir
# elpa packages
/elpa/
# reftex files
*.rel
# AUCTeX auto folder
/auto/
# cask packages
.cask/
dist/
# Flycheck
flycheck_*.el
# server auth directory
/server/
# projectiles files
.projectile
projectile-bookmarks.eld
# directory configuration
.dir-locals.el
# saveplace
places
# url cache
url/cache/
# cedet
ede-projects.el
# smex
smex-items
# company-statistics
company-statistics-cache.el
# anaconda-mode
anaconda-mode/
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
.pytest_cache/
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule.*
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
### Vim ###
# swap
.sw[a-p]
.*.sw[a-p]
# session
Session.vim
# temporary
.netrwhist
# auto-generated tag files
tags
# End of https://www.gitignore.io/api/vim,emacs,python
Mosquitto Hauptbahnhof
---------------------------
A Hackerspace control system based on the mqtt protocol
## Functionality
The mqtt hauptbahnof provides you with the basic functionality to load common system
configuration and connects you to the mqtt server.
The modules used should be independent from each other and only communicate via mqtt
messages, enabling a distribution of the modules across the haspa-of-things.
## Launching
Each module is launched individually from its own submodule and should run independently.
That way if somebody else fucks up, everything still works fine.
You can launch with the following commandline:
```
python3 -m <modulename> --confdir <config directory>
```
To launch a test issue
```
python3 -m <modulename>.test --confdir <config directory>
```
## Configuration
The config directory has to contain at least a `hauptbahnof.conf` with the following
content:
```
host = <mqtt-server>
```
The modules may also load config files - they need to exist as well.
## Create your own module
Just simply copy a module that is already out there and does close to nothing, replace
this nonexistent functionality and you are set to go.
A module should instantiate a Hauptbahnhof object, since this manages all the connection
magic with the mqtt server.
using `Hauptbahnhof.subscribe("topic", callback)` a callback is registered upon a message.
The callback will receive the arguments `(client, messageobj, messageraw)` to process.
using `Hauptbahnhof.publish("topic", object)` to send a json-formatted message to another
Hauptbahnhof (and you yourself will also receive this message)
with `Hauptbahnhof.config("configfile")` you get the config structure of the file
`configfile.conf` from your config directory.
It is recommended to create a test, that tests the functionality of the modules
individually to avoid headaches for your friends and yourself if the beat you up.
host = "knecht.stusta.de"
from .hauptbahnhof import Hauptbahnhof
import json
import asyncio
import traceback
import io
import sys
import logging
import libconf
import aiomqtt
class Hauptbahnhof:
"""
Hauptbahnhof manager with a lot of convenience methods
"""
def __init__(self, loop=None):
try:
idx = sys.argv.index('--confdir')
self._configbase = sys.argv[idx + 1]
except (ValueError, KeyError):
self._configbase = '/etc/hauptbahnhof'
if not loop:
loop = asyncio.get_event_loop()
self.loop = loop
# find master config
self._config = self.config("hauptbahnhof")
self._subscriptions = {}
self._mqtt = None
self._mqtt_queue = []
self._mqtt_start_task = self.loop.create_task(self.start())
self._queue = asyncio.Queue()
self._message_process_task = self.loop.create_task(self.message_processing())
self.connected = asyncio.Event(loop=self.loop)
logformat = '%(asctime)s | %(name)s | %(levelname)5s | %(message)s'
logging.basicConfig(format=logformat)
self.log = logging.getLogger(__name__)
self.log.setLevel(logging.DEBUG)
async def teardown(self):
"""
The last one switches off the light
"""
self._mqtt_start_task.cancel()
self._message_process_task.cancel()
results = await asyncio.gather(self._mqtt_start_task,
self._message_process_task,
return_exceptions=True)
for r in results:
if isinstance (r, Exception):
if not isinstance(r, asyncio.CancelledError):
traceback.print_exception(type(r), r, r.__traceback__)
async def start(self):
"""
Start the mqtt locally, and when it is connected send it back
"""
mqtt = aiomqtt.Client(self.loop)
mqtt.loop_start()
mqtt.on_message = self.on_message
mqtt.on_connect = lambda client, userdata, flags, rc: self.connected.set()
try:
await mqtt.connect(self._config['host'])
except KeyError:
self.log.error("Could not connect to %s"%self._config['host'])
self.loop.cancel()
await self.connected.wait()
self.log.info("Connected to %s" % self._config['host'])
for topic in self._subscriptions:
mqtt.subscribe(topic)
while self._mqtt_queue:
topic, msg = self._mqtt_queue.pop(0)
self.log.debug("Topic: %s"%topic)
await self._mqtt.publish(topic, msg)
# Now we have mqtt available!
self._mqtt = mqtt
self.log.info("Startup done")
async def message_processing(self):
while True:
(client, userdata, msg) = await self._queue.get()
# is called when a message is received
payload = msg.payload.decode('utf-8')
try:
futures = self._subscriptions[msg.topic]
except KeyError:
# Received unknown message - this is strange, log and ignore
self.log("Received unsubscribed message on %s with content: %s"%(
msg.topic, msg.payload))
return
try:
payloadobj = json.loads(payload)
await asyncio.gather(*[fut(client, payloadobj, msg) for fut in futures])
except json.JSONDecodeError:
self.log("Invalid json received: %s"%payload)
except Exception as e:
traceback.print_exc()
def on_message(self, client, userdata, msg):
self._queue.put_nowait((client, userdata, msg))
def config(self, module):
"""
Load a config from the pile of configs ( usually in /etc/hauptbahnhof/*.conf )
"""
with io.open('%s/%s.conf'%(self._configbase, module), 'r') as f:
return libconf.load(f)
def subscribe(self, topic, coroutine):
"""
Subscribe to topic
"""
try:
self._subscriptions[topic].append(coroutine)
except KeyError:
self._subscriptions[topic] = [coroutine]
if self._mqtt:
self._mqtt.subscribe(topic)
async def publish(self, topic, message):
"""
Publish a message on the given topic
"""
jsonmsg = json.dumps(message)
# Maybe the system is not already online?
if self._mqtt:
await self._mqtt.publish(topic, jsonmsg).wait_for_publish()
else:
self._mqtt_queue.append((topic, jsonmsg))
def mqtt(self):
"""
Get the underlying mqtt object. For debug use only!
"""
return self._mqtt
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment