Skip to content
Snippets Groups Projects
Commit 25ce6209 authored by Johannes Walcher's avatar Johannes Walcher
Browse files

added nsa: scan your home network

parent 36662de5
No related branches found
No related tags found
No related merge requests found
space_network="10.150.9.0/24"
import asyncio
from nsa.nsa import NSA
def main():
"""
Actually start the shit
"""
loop = asyncio.get_event_loop()
nsa = NSA()
loop.set_debug(True)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
loop.run_until_complete(nsa.teardown())
if __name__ == "__main__":
main()
import asyncio
from hauptbahnhof import Hauptbahnhof
class NSA:
"""
Scan the local hackerspace network for new and unknown devices to send back a result
"""
def __init__(self, loop=None):
if not loop:
loop = asyncio.get_event_loop()
self.loop = loop
self.hbf = Hauptbahnhof(loop)
self.hbf.subscribe('/haspa/nsa/scan', self.command_scan)
async def teardown(self):
"""
Clean up your stuff...
"""
await self.hbf.teardown()
async def command_scan(self, client, message, _):
"""
space.get_number_of_network_devices() -> int
Return the number of non-stationary, connected network devices.
"""
try:
cfg = self.hbf.config("arplist")
except FileNotFoundError as exc:
self.hbf.log.warning("Coult not find config:%s", str(exc))
cfg = {}
known_devices = []
try:
known_devices = message['blacklist']
except (KeyError, TypeError):
known_devices = []
try:
known_devices += cfg['spacedevices']
except KeyError:
self.hbf.log.warning("You might want to configure space devices")
# TODO use util/arp-scan
proc = await asyncio.create_subprocess_exec(
*['arp-scan', cfg['space_network']],
stdout=asyncio.subprocess.PIPE,
loop=self.loop)
output, _ = await proc.communicate()
for line in output.decode('utf8').split('\n'):
self.hbf.log.debug(line)
output = output.decode('utf8').split()
dev_list = []
for line in output:
if line.find(':') != -1 and len(line) == 17:
if line not in known_devices:
dev_list.append(line)
await self.hbf.publish('/haspa/nsa/result', {'count': len(dev_list)})
import asyncio
from hauptbahnhof import Hauptbahnhof
from nsa.nsa import NSA
messages = asyncio.Queue()
async def on_message(client, message, _):
print("Got message: %s"%message)
await messages.put(message)
async def test(loop):
testbf = Hauptbahnhof(loop=loop)
testbf.subscribe("/haspa/nsa/result", on_message)
await asyncio.sleep(2)
await testbf.publish("/haspa/nsa/scan", {}) # without blacklist
# Now everythin should be set up
msg = await asyncio.wait_for(messages.get(), 10) # wait max 10 secs
if msg['count'] > 0:
testbf.log.info("test successfull")
return True
else:
raise ValueError("test failed")
try:
await testbf.teardown()
except asyncio.CancelledError:
pass
def main():
loop = asyncio.get_event_loop()
lib = NSA(loop=loop)
result = loop.run_until_complete(test(loop))
loop.run_until_complete(lib.teardown())
if result:
exit(0)
else:
exit(1)
if __name__=="__main__":
main()
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment