From 27cf6821f238de42a5b6cbdf9406df2c033c5d33 Mon Sep 17 00:00:00 2001
From: johannes walcher <johannes.walcher@stusta.de>
Date: Mon, 19 Nov 2018 11:41:36 +0100
Subject: [PATCH] core: hardened message reception

---
 hauptbahnhof/hauptbahnhof.py | 45 +++++++++++++++++++++++++-----------
 1 file changed, 31 insertions(+), 14 deletions(-)

diff --git a/hauptbahnhof/hauptbahnhof.py b/hauptbahnhof/hauptbahnhof.py
index d5639ce..bb75f19 100644
--- a/hauptbahnhof/hauptbahnhof.py
+++ b/hauptbahnhof/hauptbahnhof.py
@@ -91,23 +91,40 @@ class Hauptbahnhof:
 
     async def message_processing(self):
         while True:
-            (client, userdata, msg) = await self._queue.get()
-            # is called when a message is received
-            payload = msg.payload.decode('utf-8')
             try:
-                futures = self._subscriptions[msg.topic]
-            except KeyError:
-                # Received unknown message - this is strange, log and ignore
-                self.log("Received unsubscribed message on %s with content: %s"%(
-                    msg.topic, msg.payload))
-                return
-            try:
-                payloadobj = json.loads(payload)
-                await asyncio.gather(*[fut(client, payloadobj, msg) for fut in futures])
-            except json.JSONDecodeError:
-                self.log("Invalid json received: %s"%payload)
+                (client, userdata, msg) = await self._queue.get()
+                # is called when a message is received
+                try:
+                    payload = msg.payload.decode('utf-8')
+                except UnicodeDecodeError:
+                    continue
+
+                try:
+                    futures = self._subscriptions[msg.topic]
+                except KeyError:
+                    # Received unknown message - this is strange, log and ignore
+                    self.log("Received unsubscribed message on %s with content: %s"%(
+                        msg.topic, msg.payload))
+
+                try:
+                    payloadobj = json.loads(payload)
+                except json.JSONDecodeError:
+                    self.log("Invalid json received: %s"%payload)
+                    continue
+                except:
+                    continue
+
+                try:
+                    await asyncio.gather(*[fut(client, payloadobj, msg) for fut in futures])
+                except Exception:
+                    traceback.print_exc()
+                    continue
+            except asyncio.CancelledError:
+                break
+
             except Exception:
                 traceback.print_exc()
+                continue
 
     def on_message(self, client, userdata, msg):
         self._queue.put_nowait((client, userdata, msg))
-- 
GitLab