#!/usr/bin/env python3
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
#########################################################################
# Copyright 2019- Martin Sinn m.sinn@gmx.de
#########################################################################
# This file is part of SmartHomeNG
#
# SmartHomeNG is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SmartHomeNG is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SmartHomeNG If not, see <http://www.gnu.org/licenses/>.
#########################################################################
import threading
from lib.module import Modules
from lib.model.smartplugin import SmartPlugin
from lib.shtime import Shtime
from lib.translation import translate as lib_translate
import os
[Doku]class MqttPlugin(SmartPlugin):
_item_values = {} # dict of dicts
# Initialization of SmartPlugin class called by super().__init__() from the plugin's __init__() method
def __init__(self):
"""
Initialization Routine for the mqtt extension class to SmartPlugin
"""
super().__init__()
# get instance of MQTT module
try:
self.mod_mqtt = Modules.get_instance().get_module('mqtt') # try/except to handle running in a core version that does not support modules
except:
self.mod_mqtt = None
if self.mod_mqtt == None:
self.logger.error("Module 'mqtt' not loaded. The plugin is not starting")
self._init_complete = False
return False
self._subscribed_topics_lock = threading.Lock()
self._subscribed_topics = {} # subscribed topics (a dict of dicts)
self._subscribe_current_number = 0 # current number of the subscription entry
self._subscriptions_started = False
# get broker configuration (for display in web interface)
self.broker_config = self.mod_mqtt.get_broker_config()
[Doku] def translate(self, txt, vars=None, block=None):
"""
Returns translated text for class SmartPlugin
"""
txt = str(txt)
if block:
self.logger.warning(f"unsuported 3. parameter '{block}' used in translation function _( ... )")
if self._add_translation is None:
# test initially, if plugin has additional translations
translation_fn = os.path.join(self._plugin_dir, 'locale.yaml')
self._add_translation = os.path.isfile(translation_fn)
if self._add_translation:
return lib_translate(txt, vars, plugin_translations='plugin/'+self.get_shortname(), additional_translations='module/mqtt')
else:
return lib_translate(txt, vars, additional_translations='module/mqtt')
[Doku] def start_subscriptions(self):
"""
Start subscription to all topics
Should be called from the run method of a plugin
"""
if self.mod_mqtt:
with self._subscribed_topics_lock:
for topic in self._subscribed_topics:
# start subscription to all items for this topic
for item_path in self._subscribed_topics[topic]:
self._start_subscription(topic, item_path)
self._subscriptions_started = True
return
[Doku] def stop_subscriptions(self):
"""
Stop subscription to all topics
Should be called from the stop method of a plugin
"""
if self.mod_mqtt:
with self._subscribed_topics_lock:
for topic in self._subscribed_topics:
# stop subscription to all items for this topic
for item_path in self._subscribed_topics[topic]:
current = str(self._subscribed_topics[topic][item_path]['current'])
if item_path == '*no_item*':
self.logger.info(f"Unsubscribing from topic {topic}")
else:
self.logger.info(f"Unsubscribing from topic {topic} for item {item_path}")
self.mod_mqtt.unsubscribe_topic(self.get_shortname() + '-' + current, topic)
self._subscriptions_started = False
return
def _start_subscription(self, topic, item_path):
current = str(self._subscribed_topics[topic][item_path]['current'])
qos = self._subscribed_topics[topic][item_path].get('qos', None)
payload_type = self._subscribed_topics[topic][item_path].get('payload_type', None)
callback = self._subscribed_topics[topic][item_path].get('callback', None)
bool_values = self._subscribed_topics[topic][item_path].get('bool_values', None)
if item_path == '*no_item*':
self.logger.info(f"Subscribing to topic {topic}, payload_type '{payload_type}' - callback={callback}")
else:
self.logger.info(f"Subscribing to topic {topic}, payload_type '{payload_type}' - for item '{item_path}'")
self.mod_mqtt.subscribe_topic(self.get_shortname() + '-' + current, topic, callback=callback,
qos=qos, payload_type=payload_type, bool_values=bool_values)
return
[Doku] def add_subscription(self, topic, payload_type, bool_values=None, item=None, callback=None):
"""
Add mqtt subscription to subscribed_topics list
subscribing is done directly, if subscriptions have been started by self.start_subscriptions()
:param topic: topic to subscribe to
:param payload_type: payload type of the topic (for this subscription to the topic)
:param bool_values: bool values (for this subscription to the topic)
:param item: item that should receive the payload as value. Used by the standard handler (if no callback function is specified)
:param callback: a plugin can provide an own callback function, if special handling of the payload is needed
:return:
"""
with self._subscribed_topics_lock:
# test if topic is new
if not self._subscribed_topics.get(topic, None):
self._subscribed_topics[topic] = {}
# add this item to topic
if item is None:
item_path = '*no_item*'
else:
item_path = item.property.path
self._subscribed_topics[topic][item_path] = {}
self._subscribe_current_number += 1
self._subscribed_topics[topic][item_path]['current'] = self._subscribe_current_number
self._subscribed_topics[topic][item_path]['item'] = item
self._subscribed_topics[topic][item_path]['qos'] = None
self._subscribed_topics[topic][item_path]['payload_type'] = payload_type
if callback:
self._subscribed_topics[topic][item_path]['callback'] = callback
else:
self._subscribed_topics[topic][item_path]['callback'] = self._on_mqtt_message
self._subscribed_topics[topic][item_path]['bool_values'] = bool_values
if self._subscriptions_started:
# directly subscribe to added subscription, if subscribtions are started
self._start_subscription(topic, item_path)
return
[Doku] def publish_topic(self, topic, payload, item=None, qos=None, retain=False, bool_values=None):
"""
Publish a topic to mqtt
:param topic: topic to publish
:param payload: payload to publish
:param item: item (if relevant)
:param qos: qos for this message (optional)
:param retain: retain flag for this message (optional)
:param bool_values: bool values (for publishing this topic, optional)
:return:
"""
self.mod_mqtt.publish_topic(self.get_shortname(), topic, payload, qos, retain, bool_values)
if item is not None:
self.logger.dbghigh(f"publish_topic: Item '{item.property.path}' -> topic '{topic}', payload '{payload}', QoS '{qos}', retain '{retain}'")
# Update dict for periodic updates of the web interface
self._update_item_values(item, payload)
else:
self.logger.dbghigh(f"publish_topic: topic '{topic}', payload '{payload}', QoS '{qos}', retain '{retain}'")
return
# ----------------------------------------------------------------------------------------
# methods to handle the broker connection
# ----------------------------------------------------------------------------------------
_broker_version = '?'
_broker = {}
broker_config = {}
broker_monitoring = False
[Doku] def get_broker_info(self):
if self.mod_mqtt:
(self._broker, self.broker_monitoring) = self.mod_mqtt.get_broker_info()
[Doku] def broker_uptime(self):
"""
Return formatted uptime of broker
"""
if self.shtime is None:
self.shtime = Shtime.get_instance()
try:
return self.shtime.seconds_to_displaystring(int(self._broker['uptime']))
except Exception as e:
return '-'
[Doku] def mqtt_init(self):
"""
Dummy method - should not be called any more
:return: Bool value True
:rtype: bool
"""
self.logger.warning("'mqtt_init()' method called. it is not used anymore. The Plugin should remove the call to mqtt_init(), use 'super.__init__()' instead")
pass
return True
# -----------------------------------------------------------------------
def _on_mqtt_message(self, topic, payload, qos=None, retain=None):
"""
Callback function to handle received messages
:param topic:
:param payload:
:param qos:
:param retain:
"""
self.logger.debug(f"_on_mqtt_message: Received topic '{topic}', payload '{payload} (type {type(payload)})', QoS '{qos}', retain '{retain}' ")
# get item for topic
if self._subscribed_topics.get(topic, None):
# at least 1 item has subscribed to this topic
for item_path in self._subscribed_topics[topic]:
item = self._subscribed_topics[topic][item_path].get('item', None)
if item != None:
try:
log_info = (float(payload) != float(item()))
except:
log_info = (str(payload) != str(item()))
if log_info:
self.logger.dbghigh(f"_on_mqtt_message: Received topic '{topic}', payload '{payload}' (item-type {item.type()}), QoS '{qos}', retain '{retain}' for item '{item.property.path}' (value={item()})")
else:
self.logger.debug(f"_on_mqtt_message: Received topic '{topic}', payload '{payload}' (item-type {item.type()}), QoS '{qos}', retain '{retain}' for item '{item.property.path}' (value={item()})")
item(payload, self.get_shortname())
# Update dict for periodic updates of the web interface
self._update_item_values(item, payload)
else:
self.logger.error(f"_on_mqtt_message: No definition found for subscribed topic '{topic}'")
return
def _update_item_values(self, item, payload):
"""
Update dict for periodic updates of the web interface
:param item:
:param payload:
"""
if not self._item_values.get(item.property.path):
self._item_values[item.property.path] = {}
if isinstance(payload, bool):
self._item_values[item.property.path]['value'] = str(payload)
else:
self._item_values[item.property.path]['value'] = payload
self._item_values[item.property.path]['last_update'] = item.last_update().strftime('%d.%m.%Y %H:%M:%S')
self._item_values[item.property.path]['last_change'] = item.last_change().strftime('%d.%m.%Y %H:%M:%S')
return
from lib.model.smartplugin import SmartPluginWebIf
# try:
# from jinja2 import Environment, FileSystemLoader
# except:
# pass
# from lib.module import Modules
class MqttPluginWebIf(SmartPluginWebIf):
def translate(self, txt, vars=None):
"""
Returns translated text for class SmartPluginWebIf
This method extends the jinja2 template engine _( ... ) -> translate( ... )
"""
txt = str(txt)
if self.plugin._add_translation is None:
# test initially, if plugin has additional translations
translation_fn = os.path.join(self.plugin._plugin_dir, 'locale.yaml')
self.plugin._add_translation = os.path.isfile(translation_fn)
if self.plugin._add_translation:
return lib_translate(txt, vars, plugin_translations='plugin/' + self.plugin.get_shortname(), module_translations='module/http', additional_translations='module/mqtt')
else:
return lib_translate(txt, vars, module_translations='module/http', additional_translations='module/mqtt')