Skip to content
Snippets Groups Projects
Commit 36fa2473 authored by Michael Loipführer's avatar Michael Loipführer
Browse files

improve number sent of mqtt messages

parent 8da7cf15
No related branches found
No related tags found
No related merge requests found
......@@ -2,9 +2,9 @@ import asyncio
import itertools
import json
import logging
from typing import Dict, List
from typing import Dict, List, Tuple, Set
from .node import create_nodes_from_config
from .node import create_nodes_from_config, Node
from .translation import Translation
from .config import Config
from .utils import MQTTUpdate, StateUpdate
......@@ -27,20 +27,24 @@ class State:
MQTTUpdate(node.topic, node.state_as_mqtt_message())
)
async def _update_node_state(self, topic: str, value: int) -> List[StateUpdate]:
update = StateUpdate(topic, value)
async def _update_node_state(self, topic: str, value: int) -> Tuple[List[StateUpdate], Set[Node]]:
"""
go through all of our registered nodes, check if the topic matches and update their internal state
:returns: List of applied state updates, list of changed nodes
"""
did_update = False
updated_nodes = set()
for node in self.nodes:
valid_node = node.set_state_for_topic(topic, value)
if valid_node:
did_update = True
await self.mqtt_update_queue.put(
MQTTUpdate(node.topic, node.state_as_mqtt_message())
)
updated_nodes.add(node)
updates = [StateUpdate(topic, value)] if did_update else []
return [update] if did_update else []
return updates, updated_nodes
async def update_node_topic(self, topic: str, value: int) -> List[StateUpdate]:
async def update_node_topic(self, topic: str, value: int) -> Tuple[List[StateUpdate], Set[Node]]:
"""
Process a value input on a topic with translation
"""
......@@ -49,21 +53,38 @@ class State:
# we expect a base topic here
return await self._update_node_state(topic, value)
updates: List[List[StateUpdate]] = await asyncio.gather(
updates: List[Tuple[List[StateUpdate], List[Node]]] = await asyncio.gather(
*[self._update_node_state(topic, value) for topic in topics]
)
return list(itertools.chain.from_iterable(updates))
state_updates = []
updated_nodes = set()
for update in updates:
# TODO: find a more pythonic way to do this
state_updates.extend(update[0])
updated_nodes = updated_nodes.union(update[1])
return state_updates, updated_nodes
async def process_updates(self, updates: Dict) -> None:
# process node updates
updates: List[List[StateUpdate]] = await asyncio.gather(
updates: List[Tuple[List[StateUpdate], List[Node]]] = await asyncio.gather(
*[
self.update_node_topic(topic, value)
for topic, value in updates.get("nodes", {}).items()
]
)
state_updates = []
updated_nodes = set()
for update in updates:
# TODO: find a more pythonic way to do this
state_updates.extend(update[0])
updated_nodes = updated_nodes.union(update[1])
# insert any other state update processing here
await self.ws_update_queue.put(list(itertools.chain.from_iterable(updates)))
await asyncio.gather(
self.ws_update_queue.put(state_updates),
*[self.mqtt_update_queue.put(MQTTUpdate(node.topic, node.state_as_mqtt_message())) for node in updated_nodes]
)
def to_dict(self) -> Dict:
state_dict = {"nodes": {}, **self._state}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment