Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • stustanet/temperature-daemon
  • roman/temperature-daemon
  • 007638/temperature-daemon
3 results
Show changes
from abc import ABCMeta
# Registry of plugin classes, keyed by the value of their ``plugin_name()``.
# PluginMeta adds an entry for every class that is created with it.
PLUGINS = {}
class PluginMeta(ABCMeta):
    """
    Metaclass that records every class created with it in ``PLUGINS``.

    The registry key is whatever the freshly created class returns from its
    ``plugin_name()`` classmethod.
    """

    def __init__(cls, name, bases, classdict):
        super().__init__(name, bases, classdict)
        registry_key = cls.plugin_name()
        PLUGINS[registry_key] = cls
class Plugin(metaclass=PluginMeta):
    """
    Common base class of all plugins.

    Being created through PluginMeta, every subclass is looked up later by
    the name returned from ``plugin_name()``.
    """

    @classmethod
    def plugin_name(cls):
        """Return the lowercase class name identifying this plugin class."""
        lowered = cls.__name__.lower()
        return lowered

    @property
    def name(self):
        """Lowercase class name of this plugin instance."""
        return type(self).__name__.lower()
# import all plugins so the metaclass can populate the PLUGINS dict
from . import collectd, mail, prometheus, warnings
import asyncio
import time
from . import Plugin
class Collectd(Plugin):
    """
    Implements a super simple collectd interface for only sending temperature data

    Values are written as ``PUTVAL`` commands to the unix socket configured in
    ``[collectd] socketpath``.  The socket is opened lazily on the first send:
    connecting with ``asyncio.run()`` inside the constructor would bind the
    streams to a temporary event loop that is already closed by the time the
    plugin callbacks run.
    """

    def __init__(self, monitor):
        self.monitor = monitor
        self.config = monitor.config
        self.path = self.config['collectd']['socketpath']
        # Both stay None until the first successful (re)connect
        self._reader, self._writer = (None, None)
        self.last_store = 0

    async def reconnect(self):
        """
        optionally close and then reconnect to the unix socket

        Raises OSError if the socket cannot be opened; in that case the
        plugin is left in the "not connected" state.
        """
        if self._writer:
            self._writer.close()
            # Forget the dead streams so a failed connect is not mistaken
            # for a usable connection later on
            self._reader, self._writer = (None, None)
        self._reader, self._writer = await asyncio.open_unix_connection(
            path=self.path)

    async def _try_reconnect(self):
        """
        Reconnect, but report a failure instead of raising it.

        Returns True if the socket is connected afterwards.
        """
        try:
            await self.reconnect()
        except OSError as exc:
            print("Could not connect to collectd at {}: {}".format(self.path, exc))
            return False
        return True

    async def _send(self, identifier, interval, timestamp, value):
        """
        Send one value to collectd and wait up to one second for its answer.

        The collectd naming convention is:
        host "/" plugin ["-" plugin instance] "/" type ["-" type instance]
        Whereby:
        - host: local host name
        - plugin: the tab in CGP
        - plugin-instance: the graph
        - type the line in the graph
        - type instance : if there are more than one "temperature"s

        A value that cannot be delivered is dropped; the next call tries to
        connect again.
        """
        data = "PUTVAL \"{}/{}\" interval={} {}:{}\n".format(
            self.config['collectd']['hostname'],
            identifier,
            interval,
            timestamp,
            value)

        if self._writer is None and not await self._try_reconnect():
            return

        try:
            self._writer.write(data.encode('utf-8'))
            await self._writer.drain()
        except OSError as exc:
            # Only connection problems are handled here, so that task
            # cancellation is never swallowed
            print("Sending to collectd failed:", exc)
            await self._try_reconnect()
            return

        try:
            line = await asyncio.wait_for(self._reader.readline(), 1)
        except asyncio.TimeoutError:
            # Must be handled before OSError: since Python 3.11 the timeout
            # error is itself an OSError subclass
            print("Collectd did not respond.")
            return
        except OSError as exc:
            print("Reading from collectd failed:", exc)
            await self._try_reconnect()
            return

        if not line.decode('utf-8').strip():
            # readline() returns an empty result once the peer closed the socket
            print("Connection reset. reconnecting")
            await self._try_reconnect()

    async def send_sensor_values(self, sensor):
        """
        Store the temperature to collectd for fancy graphs
        """
        await self._send("tail-temperature/temperature-{}".format(sensor.name),
                         int(self.config['collectd']['interval']),
                         int(sensor.last_update),
                         sensor.temperature)

    ## Plugin Callbacks ##
    async def send_stats_graph(self, graph, stattype, stattime, statval):
        """
        to be called as a plugin callback to store stuff into collectd
        """
        await self._send("tail-{}/{}".format(graph, stattype),
                         int(self.config['collectd']['interval']),
                         int(stattime),
                         statval)

    async def sensor_update(self):
        """
        Receive sensor data to store them regularly into collectd

        Only sensors currently marked valid are sent.
        """
        for sensor in self.monitor.sensors.values():
            if sensor.valid:
                await self.send_sensor_values(sensor)
        self.last_store = time.time()
import time
from email.mime.text import MIMEText
from email.utils import formatdate
import smtplib
from . import Plugin
# Mail templates. Subjects and bodies are filled in with str.format() by the
# Mail plugin; the placeholders each template needs are listed above it.

# Unknown one-wire id seen on the line. Placeholders: {owid}, {config}, {temp}
UNKNOWN_SENSOR_SUBJECT = "WARNING: Unconfigured Sensor ID: {owid}"
UNKNOWN_SENSOR_BODY = """Hello Guys,
An unknown sensor has been connected to the temperature monitoring service.
Please add the following section to the list of known sensors in {config}.
[{owid}]
name=changeme
calibration=0
The current temperature of the sensor is {temp}
Regards, Temperature
"""
# A configured sensor did not report. Placeholders (body only): {owid}, {name}
SENSOR_MEASUREMENT_MISSED_SUBJECT = "WARNING: Sensor Measurement was missed"
SENSOR_MEASUREMENT_MISSED_BODY = """Hello Guys,
A sensor measurement was missed from the temperature monitoring.
This indicates either a problem with the hardware (check the wireing!) or the config.
ID: {owid}
NAME: {name}.
Please go check it!
Regards, Temperature
"""
# A sensor delivered an invalid value. Placeholders (body only): {owid}, {name}, {temp}
SENSOR_PROBLEM_SUBJECT = "WARNING: Sensor error"
SENSOR_PROBLEM_BODY = """Hello Guys,
A sensor measurement was invalid. This might mean, that the sensor was disconnected.
Please go and check the sensor with the id
ID: {owid}
NAME: {name}.
LAST: {temp}
Regards, Temperature
"""
# Nothing was received at all. Placeholder (body only): {time}
NO_DATA_SUBJECT = "WARNING: Did not receive any data"
NO_DATA_BODY = """Helly guys,
It has been {time} seconds since i have last received a temperature value.
This is unlikely - please come and check
Regards, Temperature
"""
# Data arrives but cannot be parsed. Placeholder (body only): {last_line}
NO_VALID_DATA_SUBJECT = "WARNING: Garbage data"
NO_VALID_DATA_BODY = """Helly guys,
We have data on the line - but it fails even the most simple verification.
The last received line was:
{last_line}
please check if the controller is going haywire.
I will try to fix this issue by reconnecting...
Regards, Temperature
"""
# Temperature warning (German text).
# Placeholders (body only): {temperatures}, {reason}, {alltemperatures}
SENSOR_TEMPERATURE_WARNING_SUBJECT = "Temperaturwarnung Serverraum"
SENSOR_TEMPERATURE_WARNING_BODY = """Hi Guys,
Die Temperaturen im Serverraum werden langsam Bedenklich:
{temperatures}
Auslöser: {reason}
Aktuelle Temperaturen:
{alltemperatures}
Bitte haltet die Temperaturen im Auge und fahrt eventuell heiß laufende Server herunter
with love,
Temperator"""
class Mail(Plugin):
    """
    Handle all the mail sending stuff

    Every ``err_*``/``temperature_warning`` callback renders one of the
    module level templates and hands it to ``send_mail``, which rate limits
    per subject.
    """

    def __init__(self, monitor):
        self.monitor = monitor
        self.config = self.monitor.config
        # subject -> unix timestamp of the last mail attempted with that subject
        self._mail_rate_limit = {}

    async def send_mail(self, subject, body, urgent=False):
        """
        Send a mail to the configured recipients

        subject/body: already formatted text of the mail.
        urgent: use the ``to_urgent`` recipient list instead of ``to``.

        Mails with the same subject are suppressed while the last one is
        younger than ``min_delay_between_messages`` seconds.  Delivery
        problems are printed, not raised, so a broken mail server cannot
        stop the monitoring.

        NOTE(review): smtplib is blocking, so the event loop stalls while the
        mail is handed to the server.
        """
        mailconf = self.config['mail']
        raw_recipients = mailconf['to_urgent'] if urgent else mailconf['to']
        # Strip once so header and SMTP envelope use the same clean addresses
        recipients = [addr.strip() for addr in raw_recipients.split(',')
                      if addr.strip()]

        msg = MIMEText(body, _charset="UTF-8")
        msg['Subject'] = subject
        msg['From'] = mailconf['from']
        msg['To'] = ",".join(recipients)
        msg['Date'] = formatdate(localtime=True)

        print("Notification: {}".format(subject))

        # Ratelimit the emails
        now = time.time()
        time_since_last_mail = now - self._mail_rate_limit.get(subject, 0)
        if time_since_last_mail < int(mailconf['min_delay_between_messages']):
            print("Not sending due to ratelimiting: {:d}".format(
                int(time_since_last_mail)))
            return

        print("Body: {}".format(body))
        # Stamp before sending: a failing server is then not retried on every
        # single measurement either
        self._mail_rate_limit[subject] = now

        # Optional setting; the previously hard coded host is the default
        host = mailconf.get('smtp_host', 'mail.stusta.mhn.de')
        try:
            with smtplib.SMTP(host) as smtp:
                smtp.sendmail(msg['From'], recipients, msg.as_string())
        except (OSError, smtplib.SMTPException) as exc:
            print("Sending mail failed: {}".format(exc))

    async def err_nodata(self, **kwargs):
        """Report that no data was received; optional kwarg: time (seconds)."""
        # The body needs {time}; callers without that information must not
        # make the notification itself fail with a KeyError
        kwargs.setdefault("time", "an unknown number of")
        await self.send_mail(
            NO_DATA_SUBJECT.format(**kwargs),
            NO_DATA_BODY.format(**kwargs))

    async def err_no_valid_data(self, **kwargs):
        """Report unparsable data; required kwarg: last_line."""
        await self.send_mail(
            NO_VALID_DATA_SUBJECT.format(**kwargs),
            NO_VALID_DATA_BODY.format(**kwargs))

    async def err_unknown_sensor(self, **kwargs):
        """Report an unconfigured sensor; required kwargs: owid, config, temp."""
        await self.send_mail(
            UNKNOWN_SENSOR_SUBJECT.format(**kwargs),
            UNKNOWN_SENSOR_BODY.format(**kwargs))

    async def err_problem_sensor(self, **kwargs):
        """Report an invalid measurement; required kwargs: owid, name, temp."""
        await self.send_mail(
            SENSOR_PROBLEM_SUBJECT,
            SENSOR_PROBLEM_BODY.format(**kwargs))

    async def err_missed_sensor(self, **kwargs):
        """Report a missing measurement; required kwargs: owid, name."""
        await self.send_mail(
            SENSOR_MEASUREMENT_MISSED_SUBJECT,
            SENSOR_MEASUREMENT_MISSED_BODY.format(**kwargs))

    async def temperature_warning(self, source, urgent=False, **kwargs):
        """
        Send the temperature warning mail.

        source: "tempdiff" (kwargs name1, temp1, name2, temp2, tempdiff) or
                "singlehot" (kwargs name, temp).  Any other source is still
                mailed, listing the raw kwargs, instead of failing.
        urgent: forwarded to ``send_mail``.
        """
        if source == "tempdiff":
            temperatures = "{name1}:{temp1}\n{name2}:{temp2}".format(**kwargs)
            reason = "Differenztemperatur: {tempdiff}".format(**kwargs)
        elif source == "singlehot":
            temperatures = "{name}:{temp}".format(**kwargs)
            reason = "Einzeltemperatur zu hoch"
        else:
            temperatures = "\n".join(
                "{}:{}".format(key, val) for key, val in kwargs.items())
            reason = str(source)

        alltemperatures = '\n'.join([
            "{}: {}".format(sensor.name, sensor.temperature) if sensor.valid
            else "{}: INVALID".format(sensor.name)
            for sensor in self.monitor.sensors.values()])

        await self.send_mail(
            SENSOR_TEMPERATURE_WARNING_SUBJECT,
            SENSOR_TEMPERATURE_WARNING_BODY.format(
                temperatures=temperatures,
                reason=reason,
                alltemperatures=alltemperatures),
            urgent=urgent
        )
import re
import asyncio
from prometheus_client import start_http_server, Gauge
from . import Plugin
# Splits a statistic name such as "temperature-floor-min" into the named
# groups "group" (here: floor) and "type" (here: min).
stats_name_re = re.compile(
    r'^temperature-(?P<group>\w+)-(?P<type>\w+)$'
)
class Prometheus(Plugin):
    """
    Publish sensor readings and aggregated statistics as Prometheus gauges.

    Constructing the plugin creates both gauges and starts the exporter's
    HTTP server on the configured address and port.
    """

    def __init__(self, monitor):
        self.loop = asyncio.get_event_loop()
        self.monitor = monitor
        self.config = monitor.config
        self.last_store = 0

        prom_conf = self.config["prometheus"]

        # One time series per sensor, labelled with the sensor name
        self.sensor_metrics = Gauge(
            name=prom_conf["sensor_metric_name"],
            documentation="Container Temperature Measurements",
            labelnames=["sensor"]
        )
        # One time series per (group, type) pair parsed from the stat name
        self.aggregated_metrics = Gauge(
            name=prom_conf["aggregated_metric_name"],
            documentation="Container Temperature Aggregations",
            labelnames=["group", "type"]
        )

        listen_address = prom_conf.get('address', 'localhost')
        listen_port = int(prom_conf["port"])
        start_http_server(addr=listen_address, port=listen_port)
        print("started prometheus http server")

    async def send_stats_graph(self, graph, stattype, stattime, statval):
        """
        Plugin callback: export one aggregated measurement.

        Stat names that do not match ``stats_name_re`` are ignored.
        """
        parsed = stats_name_re.match(stattype)
        if parsed is None:
            return
        labelled = self.aggregated_metrics.labels(
            group=parsed.group('group'), type=parsed.group('type'))
        labelled.set(statval)

    async def sensor_update(self):
        """
        Plugin callback: copy the temperature of every valid sensor into
        the per-sensor gauge.
        """
        for sensor in self.monitor.sensors.values():
            if not sensor.valid:
                continue
            self.sensor_metrics.labels(sensor=sensor.name).set(sensor.temperature)
import time
from . import Plugin
class Warnings(Plugin):
    """
    Generate all kind of warnings whenever needed and observe the sensor
    if they see a problematic situation in the container
    """

    # Every option of the [warning] section that this plugin reads
    _REQUIRED_OPTIONS = (
        'floor_sensors',
        'ceiling_sensors',
        'floor_ceiling_diff',
        'ceiling_warning_level',
        'ceiling_critical_level',
        'min_ceiling_warning',
    )

    def __init__(self, monitor):
        """
        Raises KeyError if the [warning] section or one of its options is
        missing, so a broken config is noticed at startup and not only once
        the first threshold is evaluated.
        """
        self.monitor = monitor
        # sensor name -> sensor object
        self.revmapping = {
            sensor.name: sensor
            for sensor in self.monitor.sensors.values()
        }

        self.warning_conf = self.monitor.config['warning']
        missing = [option for option in self._REQUIRED_OPTIONS
                   if option not in self.warning_conf]
        if missing:
            raise KeyError("missing [warning] options: {}".format(", ".join(missing)))

    def get_sensor(self, sensorname):
        """
        Resolve the reverse mapping and get back the sensor

        Raises KeyError for a name that no configured sensor has.
        """
        return self.revmapping[sensorname]

    def get_stats(self, sensorlist):
        """
        Calculate minimum, maximum, average and variance over the sensor names given

        Names are stripped of surrounding whitespace, empty names are
        skipped.  Only sensors that are valid and have a reading contribute.

        Returns (sensors, min, max, avg, var); ``([], 0, 0, 0, 0)`` if no
        sensor contributes.
        """
        names = [name.strip() for name in sensorlist]
        sensors = [self.get_sensor(name) for name in names if name]
        sensors = [sensor for sensor in sensors
                   if sensor.valid and sensor.temperature is not None]

        if not sensors:
            return [], 0, 0, 0, 0

        temps = [sensor.temperature for sensor in sensors]
        avg = sum(temps) / len(temps)
        var = sum((temp - avg) ** 2 for temp in temps) / len(temps)
        return sensors, min(temps), max(temps), avg, var

    async def _send_stat(self, stattype, stattime, statval):
        """Relay one statistic to all plugins implementing send_stats_graph."""
        await self.monitor.call_plugin(
            "send_stats_graph", graph="stats",
            stattype=stattype, stattime=stattime, statval=statval)

    async def sensor_update(self):
        """
        First generate stats and relay them to the other plugins, then use these
        stats to decide whether it is currently critical in the container, and
        if so, send warnings
        """
        floor_sensors, floor_min, floor_max, floor_avg, floor_var = self.get_stats(
            self.warning_conf['floor_sensors'].split(','))
        ceil_sensors, ceil_min, ceil_max, ceil_avg, ceil_var = self.get_stats(
            self.warning_conf['ceiling_sensors'].split(','))

        now = time.time()

        groups = (
            ("floor", floor_sensors, (floor_min, floor_max, floor_avg, floor_var)),
            ("ceil", ceil_sensors, (ceil_min, ceil_max, ceil_avg, ceil_var)),
        )
        for group, contributing, values in groups:
            if not contributing:
                continue
            for stat, value in zip(("min", "max", "avg", "var"), values):
                await self._send_stat(
                    "temperature-{}-{}".format(group, stat), now, value)

        if not (floor_sensors and ceil_sensors):
            # Else we already have sent warning messages for broken sensors
            return

        tempdiff = ceil_avg - floor_avg
        await self._send_stat("temperature-floor_ceil-diff", now, tempdiff)

        print("floor: min {:05.2f} max {:05.2f} avg {:05.2f} var {:05.2f}".format(
            floor_min, floor_max, floor_avg, floor_var))
        print("ceil: min {:05.2f} max {:05.2f} avg {:05.2f} var {:05.2f}".format(
            ceil_min, ceil_max, ceil_avg, ceil_var))

        # Here comes the warning magic
        # float() accepts everything int() did and additionally decimals
        # Critical: ceiling temperature > threshold (sane default: 45)
        if ceil_max > float(self.warning_conf['ceiling_critical_level']):
            await self.monitor.call_plugin("temperature_warning",
                                           source="singlehot",
                                           name="ceiling",
                                           temp=ceil_max,
                                           urgent=True)

        # Warning: ceiling temperature > threshold (sane default: 40)
        if ceil_avg > float(self.warning_conf['ceiling_warning_level']):
            await self.monitor.call_plugin("temperature_warning",
                                           source="singlehot",
                                           name="ceiling",
                                           temp=ceil_avg)

        # Warning: temperature difference > threshold (sane default: 17),
        # only considered once the ceiling itself is above min_ceiling_warning
        if (ceil_max > float(self.warning_conf['min_ceiling_warning'])
                and tempdiff > float(self.warning_conf['floor_ceiling_diff'])):
            await self.monitor.call_plugin("temperature_warning",
                                           source="tempdiff",
                                           name1="floor",
                                           name2="ceiling",
                                           temp1=floor_avg,
                                           temp2=ceil_avg,
                                           tempdiff=tempdiff)
#!/usr/bin/env python3
"""
This is the tempermonitoring system v2.
it is based on the work of the old temperature monitoring system and released
under the terms as stated in the LICENSE.md file.
Changelog:
2018-08 jotweh: reimplemented using a micropython-esp32
2020-04 milo: added prometheus plugin
Open issues:
- Temperature Limits
- Integrate USB Sensors
"""
import asyncio
import configparser
import sys
import time
from datetime import datetime
import serial_asyncio
import serial
from .plugins import PLUGINS
class Sensor:
    """
    One instance as sensor posing as measurement proxy
    """

    def __init__(self, config, owid):
        """
        config: mapping of section name -> options (e.g. a ConfigParser).
        owid: one-wire id of the sensor, used as the section name.

        A missing section is only reported; the sensor then keeps its
        defaults and uses the id as name.  A section without ``name`` or
        ``calibration``, or with a non-numeric calibration, raises
        RuntimeError.
        """
        self.temperature = None
        self.last_update = 0
        self.calibration = 0
        self.valid = True
        # Default to the id so that `name` exists on every code path
        self.name = owid

        if owid not in config:
            print(f"Invalid Config: missing section {owid}")
            return

        section = config[owid]
        if 'name' not in section or 'calibration' not in section:
            print(f"Invalid Config for: {owid}")
            raise RuntimeError(f"Invalid Config for: {owid}")

        self.name = section['name']
        try:
            # Config values are strings; keep the offset as a number
            self.calibration = float(section['calibration'])
        except ValueError as exc:
            print(f"Invalid Config for: {owid}")
            raise RuntimeError(f"Invalid Config for: {owid}") from exc

    def update(self, temperature):
        """
        Store a new measurement, and remember the time it was taken

        NOTE(review): the calibration value is stored but not applied here -
        confirm whether the offset is meant to be added to the reading.
        """
        self.temperature = float(temperature)
        self.last_update = time.time()
class TempMonitor:
    """
    Interact with the esp-one-wire interface that sends:
        one-wire-id1 temperature
        one-wire-id1 temperature
        one-wire-id1 temperature
    followed by an empty line as data packet
    """

    # Config sections that do not describe a sensor
    _RESERVED_SECTIONS = frozenset((
        'DEFAULT', 'serial', 'collectd', 'mail', 'warning', 'prometheus',
        'general'))

    def __init__(self, loop, configfile):
        """
        loop: event loop to run on; falls back to asyncio.get_event_loop().
        configfile: path of the ini file to read.

        Raises KeyError if one of the mandatory options is missing.  The
        reader task is scheduled on the loop right away.
        """
        self.loop = loop or asyncio.get_event_loop()

        self._configname = configfile
        self.config = configparser.ConfigParser()
        self.config.read(configfile)

        self.plugins = []
        self.sensors = {}

        self._last_store = 0
        self._reader, self._writer = (None, None)

        # Test if all necessary config fields are set, that are not part of the normal
        # startup
        configtest = [
            self.config['general']['plugins'],
            self.config['collectd']['hostname'],
            self.config['collectd']['interval'],
            self.config['mail']['from'],
            self.config['mail']['to'],
            self.config['mail']['to_urgent'],
            self.config['mail']['min_delay_between_messages'],
            self.config['serial']['timeout'],
            self.config['serial']['port'],
            self.config['serial']['baudrate'],
        ]
        del configtest

        print("connecting to", self.config['serial']['port'])

        for owid in self.config:
            # Skip all known and predefined sections
            if owid in self._RESERVED_SECTIONS:
                continue
            self.sensors[owid] = Sensor(self.config, owid)

        # Use self.loop, not the argument: the argument may be None
        self._run_task = self.loop.create_task(self.run())

    async def reconnect(self):
        """
        Connect to the ESP chip

        An already open connection is closed first.  On a serial error the
        loop is stopped and the exception re-raised.
        """
        if self._writer is not None:
            # Do not leak the previous serial connection
            self._writer.close()
            self._reader, self._writer = (None, None)

        try:
            self._reader, self._writer = await serial_asyncio.open_serial_connection(
                url=self.config['serial']['port'],
                baudrate=int(self.config['serial']['baudrate']),
                loop=self.loop)
        except serial.SerialException:
            print("Connection failed!")
            self.loop.stop()
            raise

        # upon startup we only see garbage. (micropython starting up),
        # also it will produce warnings if the recording is started in the middle
        # of a message, so wait until the end of a message block to start the game
        # If the baudrate is wrong during micropython startup - this will also be
        # skipped.
        while True:
            try:
                if (await self._reader.readline()).decode('ascii').strip() == "":
                    break
            except UnicodeError:
                continue

    async def run(self):
        """
        Read the protocol, update the sensors or trigger a collectd update
        """
        await self.reconnect()

        last_valid_data_received = time.time()
        line = ""
        reconnected_on_error = False
        while True:
            # After 10 seconds without a parsable line: notify, and reconnect
            # once per outage
            if time.time() - last_valid_data_received > 10:
                await self.call_plugin("err_no_valid_data", last_line=line)
                if not reconnected_on_error:
                    reconnected_on_error = True
                    await self.reconnect()

            # Wait for the next line
            try:
                line = await asyncio.wait_for(
                    self._reader.readline(),
                    timeout=int(self.config['serial']['timeout']))
                print("Received: ", line)
            except asyncio.TimeoutError:
                print("No Data")
                # The mail template needs the age of the last good value
                await self.call_plugin(
                    "err_nodata",
                    time=int(time.time() - last_valid_data_received))
                continue
            except serial.SerialException as exc:
                print("Problem with the serial connection - reconnecting: ", exc)
                await self.reconnect()
                continue

            try:
                line = line.decode('ascii').strip()
            except UnicodeError:
                print("Unicode error")
                continue

            if line == '':
                # Block has ended
                print("Done block, storing sensors")
                await self.store_sensors()
                print("Done")
                continue

            # Try to parse the line
            try:
                owid, temp = line.split()
                temp = float(temp)
            except ValueError as exc:
                print("Invaid line received: {}\n{}".format(line, exc))
                continue

            ## we have at least a valid line
            last_valid_data_received = time.time()
            reconnected_on_error = False

            sensor = self.sensors.get(owid, None)
            if sensor is None:
                # If the sensor is new - notify the operators
                print("Unknown sensor")
                await self.call_plugin("err_unknown_sensor",
                                       config=self._configname,
                                       owid=owid,
                                       temp=temp)
            elif not -1000 <= temp <= 1000:
                # Written as a range test so that NaN is rejected as well
                print("Sensor invalid")
                sensor.valid = False
                # if the sensor is giving bullshit data - notify the operators
                await self.call_plugin("err_problem_sensor",
                                       owid=owid,
                                       name=sensor.name,
                                       temp=temp)
            else:
                sensor.valid = True
                # in the unlikely event that everything is fine: log the data
                sensor.update(temp)

    async def teardown(self):
        """ Terminate all started tasks and close the serial connection """
        self._run_task.cancel()
        try:
            await self._run_task
        except asyncio.CancelledError:
            pass
        finally:
            if self._writer is not None:
                self._writer.close()
                self._reader, self._writer = (None, None)

    async def call_plugin(self, call, *args, **kwargs):
        """
        Call the given method on all plugins, proxying arguments

        Plugins without an attribute named ``call`` are skipped; coroutine
        functions are awaited.  Returns a dict plugin name -> return value.
        """
        result = {}
        for plugin in self.plugins:
            func = getattr(plugin, call, None)
            if not func:
                continue
            if asyncio.iscoroutinefunction(func):
                result[plugin.name] = await func(*args, **kwargs)
            else:
                result[plugin.name] = func(*args, **kwargs)
        return result

    async def store_sensors(self):
        """
        Prepare the sensors to be stored and maybe send an email

        A valid sensor that was not updated since the previous call is
        marked invalid and reported through ``err_missed_sensor``.
        """
        parts = []
        for owid, sensor in self.sensors.items():
            if not sensor.valid:
                parts.append("{}: INVALID; ".format(sensor.name))
                continue

            parts.append("{}: {}; ".format(sensor.name, sensor.temperature))
            if sensor.last_update <= self._last_store:
                sensor.valid = False
                isotime = datetime.utcfromtimestamp(sensor.last_update).isoformat()
                await self.call_plugin("err_missed_sensor",
                                       owid=owid,
                                       name=sensor.name,
                                       last_update=isotime)
        print("measurements: " + "".join(parts))

        await self.call_plugin("sensor_update")
        self._last_store = time.time()
def main():
    """
    Start the tempmonitor

    The config file defaults to /etc/tempermonitor.ini and can be overridden
    by passing exactly one command line argument.  Runs until interrupted.
    """
    # Create the loop explicitly: relying on get_event_loop() to create one
    # implicitly is deprecated.  Registering it as current loop keeps
    # get_event_loop() calls made while setting up the plugins working.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    configfile = "/etc/tempermonitor.ini"
    if len(sys.argv) == 2:
        configfile = sys.argv[1]

    print(f"Configuring temperature monitoring system from {configfile}.")
    monitor = TempMonitor(loop, configfile)

    # Tolerate "a, b" style lists and trailing commas
    active_plugins = [
        name.strip()
        for name in monitor.config["general"]["plugins"].split(",")
        if name.strip()
    ]
    print(f"Active plugins: {active_plugins}")
    for plugin in active_plugins:
        plugin_cls = PLUGINS.get(plugin)
        if plugin_cls is None:
            # A typo in the config must not silently disable e.g. the mails
            print(f"Unknown plugin: {plugin} (available: {sorted(PLUGINS)})")
            continue
        monitor.plugins.append(plugin_cls(monitor))
        print(f"Loaded plugin: {plugin}")

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        try:
            loop.run_until_complete(monitor.teardown())
        finally:
            # Release the loop even if the reader task ended with an error
            loop.close()
#!/usr/bin/env python3
import sys
# Answer every line read from stdin with "Ack" and flush immediately, so
# the reply is not held back in the output buffer.
for _request in sys.stdin:
    print("Ack", flush=True)
# IMPORTANT: Set the config to use the right socket path
# Run the sensor simulator (test.py) in the background behind a pseudo
# terminal linked at /tmp/temperature_pts; use that path as the serial port.
socat -d PTY,link=/tmp/temperature_pts,echo=0 "EXEC:python3 test.py ...,pty,raw",echo=0 &
# Listen on the unix socket /tmp/collectd_sock and hand each connection to
# the collectd mock; use that path as the collectd socketpath.
socat UNIX-LISTEN:/tmp/collectd_sock,fork EXEC:"python3 collectdmock.py"
#!/usr/bin/env python3
import random
import time
import sys
# Pause after every emitted data block, in seconds
delay = 0.1


def _random_temp():
    """Return a random temperature in the range [0, 40)."""
    return random.random() * 40


def _emit_block(*readings):
    """
    Print one "<id> <value>" line per reading, then the empty line that
    terminates a data block, and wait for ``delay`` seconds.
    """
    for sensor_id, value in readings:
        print("{} {}".format(sensor_id, value))
    print()
    time.sleep(delay)


def _emit_normal_blocks(count=3):
    """Emit ``count`` blocks in which both known sensors report normally."""
    for _ in range(count):
        _emit_block(("testsensor", _random_temp()),
                    ("sensortest", _random_temp()))


# Endless scenario loop: three normal blocks precede every special case.
# The scenario name goes to stderr so it does not end up in the data stream.
while True:
    _emit_normal_blocks()

    print("Sensor Error test", file=sys.stderr)
    _emit_block(("testsensor", 9001),
                ("sensortest", _random_temp()))

    _emit_normal_blocks()

    print("Missing sensor test", file=sys.stderr)
    _emit_block(("sensortest", _random_temp()))

    _emit_normal_blocks()

    print("Extra sensor test", file=sys.stderr)
    _emit_block(("idonotexist", _random_temp()),
                ("testsensor", _random_temp()),
                ("sensortest", _random_temp()))

    _emit_normal_blocks()

    print("Too Hot Test", file=sys.stderr)
    _emit_block(("testsensor", 45),
                ("sensortest", 40))

    _emit_normal_blocks()

    print("high diff test", file=sys.stderr)
    _emit_block(("testsensor", 45),
                ("sensortest", 25))