Skip to content
Snippets Groups Projects
Commit 27cf6821 authored by Johannes Walcher's avatar Johannes Walcher
Browse files

core: hardened message reception

parent 764175f4
No related branches found
No related tags found
No related merge requests found
......@@ -91,23 +91,40 @@ class Hauptbahnhof:
async def message_processing(self):
while True:
(client, userdata, msg) = await self._queue.get()
# is called when a message is received
payload = msg.payload.decode('utf-8')
try:
futures = self._subscriptions[msg.topic]
except KeyError:
# Received unknown message - this is strange, log and ignore
self.log("Received unsubscribed message on %s with content: %s"%(
msg.topic, msg.payload))
return
try:
payloadobj = json.loads(payload)
await asyncio.gather(*[fut(client, payloadobj, msg) for fut in futures])
except json.JSONDecodeError:
self.log("Invalid json received: %s"%payload)
(client, userdata, msg) = await self._queue.get()
# is called when a message is received
try:
payload = msg.payload.decode('utf-8')
except UnicodeDecodeError:
continue
try:
futures = self._subscriptions[msg.topic]
except KeyError:
# Received unknown message - this is strange, log and ignore
self.log("Received unsubscribed message on %s with content: %s"%(
msg.topic, msg.payload))
try:
payloadobj = json.loads(payload)
except json.JSONDecodeError:
self.log("Invalid json received: %s"%payload)
continue
except:
continue
try:
await asyncio.gather(*[fut(client, payloadobj, msg) for fut in futures])
except Exception:
traceback.print_exc()
continue
except asyncio.CancelledError:
break
except Exception:
traceback.print_exc()
continue
def on_message(self, client, userdata, msg):
self._queue.put_nowait((client, userdata, msg))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment