Skip to content
Snippets Groups Projects
Commit a9dcae38 authored by Johannes Walcher's avatar Johannes Walcher
Browse files

added hackerman: hackerspace manager

parent 5cb99670
No related branches found
No related tags found
No related merge requests found
import asyncio
from hackerman.hackerman import Hackerman
def main():
"""
Actually start the shit
"""
loop = asyncio.get_event_loop()
hackerman = Hackerman(loop=loop)
loop.set_debug(True)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
loop.run_until_complete(hackerman.teardown())
if __name__ == "__main__":
main()
import asyncio
from hauptbahnhof import Hauptbahnhof
class Hackerman:
"""
Scan the local hackerspace network for new and unknown devices to send back a result
"""
def __init__(self, loop=None):
if not loop:
loop = asyncio.get_event_loop()
self.loop = loop
self.hbf = Hauptbahnhof(loop)
self.hbf.subscribe('/haspa/status', self.command_status)
async def teardown(self):
await self.hbf.teardown()
async def command_status(self, client, message, _):
"""
React to a status change of the hackerspace - switch the lights, ...
"""
try:
if 'haspa' in message:
if message['haspa'] == 'open':
await self.hbf.publish('/haspa/power', {
'table':1023,
'fan':1023,
'ledstrip':400,
})
elif message['haspa'] == 'closed':
await self.hbf.publish('/haspa/power', {
'table':0,
'fan':0,
'ledstrip':0,
'alarm':0,
})
except KeyError:
raise # because - fuck you sender! i will die now, silently.
import asyncio
from hauptbahnhof import Hauptbahnhof
from hackerman.hackerman import Hackerman
messages = asyncio.Queue()
async def on_message(client, message, _):
print("Got message: %s"%message)
await messages.put(message)
async def test(loop):
testbf = Hauptbahnhof(loop=loop)
testbf.subscribe("/haspa/power", on_message)
await asyncio.sleep(2)
await testbf.publish("/haspa/status", {'haspa':'open'}) # without blacklist
# Now everythin should be set up
msg = await asyncio.wait_for(messages.get(), 10) # wait max 10 secs
assert(msg['table'] == 1023)
assert(msg['ledstrip'] == 400)
try:
await testbf.teardown()
except asyncio.CancelledError:
pass
def main():
loop = asyncio.get_event_loop()
lib = Hackerman(loop=loop)
result = loop.run_until_complete(test(loop))
loop.run_until_complete(lib.teardown())
if result:
exit(0)
else:
exit(1)
if __name__=="__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment