# Source code for modules.mqtt

#!/usr/bin/env python3
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
#  Copyright 2018-      Martin Sinn                         m.sinn@gmx.de
#  This file is part of SmartHomeNG.
#  MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity
#  protocol. It was designed as an extremely lightweight publish/subscribe
#  messaging transport.
#  SmartHomeNG is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#  SmartHomeNG is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#  You should have received a copy of the GNU General Public License
#  along with SmartHomeNG.  If not, see <http://www.gnu.org/licenses/>.

import threading
import logging
import json
# import os
import platform
import socket    # for gethostbyname
import inspect
import datetime

from importlib.metadata import version

import paho.mqtt.client as mqtt

from lib.model.module import Module
from lib.shtime import Shtime
from lib.utils import Utils
from lib.scheduler import Scheduler

class Mqtt(Module):
    """
    MQTT module for SmartHomeNG

    Holds the configuration of the broker connection and the bookkeeping of
    subscribed topics. The paho-mqtt client itself is created later, when the
    connection to the broker is established.
    """
    version = '1.8.0'
    longname = 'MQTT module for SmartHomeNG'

    __plugif_CallbackTopics = {}         # for plugin interface
    __plugif_Sub = None

    scheduler = None

    _broker_version = '?'
    _broker = {}

    def __init__(self, sh):
        """
        Initialization Routine for the module

        Reads the module parameters, resolves the broker host name, normalizes
        the last-will/birth topics and detects the installed paho-mqtt major
        version. On a configuration or name-resolution error
        ``self._init_complete`` is set to False and initialization is aborted.

        :param sh: SmartHomeNG main object (stored as ``self._sh``)
        """
        # TODO: set the shortname differently (or wait until the module loader sets it while loading)
        self._shortname = self.__class__.__name__.lower()

        self.logger = logging.getLogger(__name__)
        self._sh = sh
        self.shtime = Shtime.get_instance()

        # per-instance store for the broker information, so that the mutable
        # class-level default is not shared between instances
        self._broker = {}

        self.logger.debug("Module '{}': Initializing".format(self._shortname))

        # get the parameters for the plugin (as defined in metadata plugin.yaml)
        # the broker password is masked, it must not end up in the log file
        logged_parameters = dict(self._parameters)
        if logged_parameters.get('password'):
            logged_parameters['password'] = '***'
        self.logger.debug("Module '{}': Parameters = '{}'".format(self._shortname, logged_parameters))
        try:
            self.broker_hostname = self._parameters['broker_host']
            self.broker_port = self._parameters['broker_port']
            self.broker_client = self._parameters['broker_client']
            self.broker_monitoring = self._parameters['broker_monitoring']

            self.qos = self._parameters['qos']
            self.last_will_topic = self._parameters['last_will_topic']
            self.last_will_payload = self._parameters['last_will_payload']
            self.birth_topic = self._parameters['birth_topic']
            self.birth_payload = self._parameters['birth_payload']
            self.bool_values = self._parameters['bool_values']

            # self.publish_items = self._parameters['publish_items']
            # self.items_topic_prefix = self._parameters['items_topic_prefix']

            self.username = self._parameters['user']
            self.password = self._parameters['password']

            # self.tls = self._parameters['tls']
            # self.ca_certs = self._parameters['ca_certs']
            # self.acl = self._parameters['acl'].lower()
        except KeyError as e:
            self.logger.critical(
                "Module '{}': Inconsistent module (invalid metadata definition: {} not defined)".format(self._shortname, e))
            self._init_complete = False
            return

        # resolve the broker name, if no ip address is specified
        try:
            self.broker_ip = socket.gethostbyname(self.broker_hostname)
        except Exception as e:
            self.logger.error("Error resolving '{}': {}".format(self.broker_hostname, e))
            self._init_complete = False
            return
        if self.broker_ip == self.broker_hostname:
            # the configured host already was an ip address, there is no separate host name
            self.broker_hostname = ''

        # handle last_will and birth topic configuration (a single trailing '/' is removed)
        if (self.last_will_topic != '') and (self.last_will_topic[-1] == '/'):
            self.last_will_topic = self.last_will_topic[:-1]
        if self.birth_topic == '':
            # without an own birth topic, the birth message goes to the last-will topic
            self.birth_topic = self.last_will_topic
        elif self.birth_topic[-1] == '/':
            self.birth_topic = self.birth_topic[:-1]

        # if self.items_topic_prefix [-1] != '/':
        #     self.items_topic_prefix = self.items_topic_prefix + '/'

        if self.password == '':
            self.password = None

        #
        # check paho.mqtt version
        #
        # The version string is queried exactly once inside the try block, so a
        # failing metadata lookup cannot raise again while logging the error.
        self.paho_ver = 0
        paho_version = '?'
        try:
            paho_version = version('paho_mqtt')
            self.paho_ver = int(paho_version.split('.')[0])
        except Exception as e:
            self.logger.error(f'unable to determin paho_mqtt version: {e}')

        if self.paho_ver not in (1, 2):
            self.logger.error(f'unsupported version of paho_mqtt installed: {paho_version}, module will possibly not work')

        # tls ...
        # ca_certs ...

        # _subscribed_topics is a datastructure to keep track of subscribed topics
        # and the needed additional information
        #  - who subscribed to the topic
        #  - kind of subscriber (logic, plugin, ...)
        #  - datatype of payload
        #
        # <topic1>:
        #     <subscriber1_name>:
        #         subscriber_type: 'logic'
        #         callback: 'logic1name'
        #         payload_type: 'str'
        #     <subscriber2_name>:
        #         subscriber_type: 'logic'
        #         callback: 'logic2name'
        #         payload_type: 'dict'
        # <topic2>:
        #     <subscriber3_name>:
        #         subscriber_type: 'plugin'
        #         callback: obj_callback3
        #         payload_type: 'str'
        #     <subscriber4_name>:
        #
        self._subscribed_topics_lock = threading.Lock()
        self._subscribed_topics = {}        # subscribed topics

        self.logicpayloadtypes = {}         # payload types for subscribed topics for triggering logics

        # ONLY used for multiinstance handling of plugins?
        # # needed because self.set_attr_value() can only set but not add attributes
        # self.at_instance_name = self.get_instance_name()
        # if self.at_instance_name != '':
        #     self.at_instance_name = '@' + self.at_instance_name

        self.scheduler = Scheduler.get_instance()

        self._network_connected_to_broker = False
        self._connected = False
        self._got_disconnected = False
        self._connect_result = ''

        # tls ...
        # ca_certs ...
def start(self):
    """
    Start the mqtt module

    Connects to the broker, publishes the birth message (if a birth topic and
    a birth payload are configured) and starts the network loop of the client.

    It is called by lib.module and should not be called otherwise.
    """
    call_info = self.translate("Methode '{method}' aufgerufen", {'method': 'start()'})
    self.logger.dbghigh(call_info)

    self._connect_to_broker()

    birth_configured = (self.birth_topic != '') and (self.birth_payload != '')
    if birth_configured:
        self._client.publish(self.birth_topic, self.birth_payload, self.qos, retain=True)

    self._client.loop_start()
    self.logger.debug("MQTT client loop started")

    # give the thread of the paho client a name that identifies this module
    try:
        self._client._thread.name = 'modules.' + self.get_fullname() + ".paho_client"
    except Exception:
        self.logger.warning("Unable to set name for paho thread")
def stop(self):
    """
    Stop the mqtt module

    Stops the network loop of the client and then disconnects from the broker.

    It is called by lib.module and should not be called otherwise.
    """
    call_info = self.translate("Methode '{method}' aufgerufen", {'method': 'stop()'})
    self.logger.dbghigh(call_info)

    self._client.loop_stop()
    self.logger.debug("MQTT client loop stopped")

    self._disconnect_from_broker()
# ----------------------------------------------------------------------------------------
#  methods to handle the broker connection
# ----------------------------------------------------------------------------------------

def _connect_to_broker(self):
    """
    Establish connection to MQTT broker

    Creates the paho client (always using the version 1 callback API),
    configures last will and credentials, registers the callbacks and
    triggers the first network connect.
    """
    clientname = platform.uname()[1] + '.' + self.broker_client
    self.logger.info("Connecting to broker '{}:{}'. Starting mqtt client '{}'".format(self.broker_ip, self.broker_port, clientname))

    if self.paho_ver <= 1:
        self.logger.info('starting mqtt client with API version 1')
        self._client = mqtt.Client(client_id=clientname)
    else:
        # paho-mqtt 2.x needs the callback API version as first argument
        self.logger.info('starting mqtt client with API version 2')
        self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id=clientname)

    # set testament, if configured
    if (self.last_will_topic != '') and (self.last_will_payload != ''):
        # the last will is only retained, if a birth message is configured too
        retain = (self.birth_topic != '') and (self.birth_payload != '')
        self._client.will_set(self.last_will_topic, self.last_will_payload, self.qos, retain=retain)
        self.logger.debug("- Last will set to topic '{}' and payload '{}' with retain set to '{}'".format(self.last_will_topic, self.last_will_payload, retain))

    if self.username != '':
        self._client.username_pw_set(self.username, self.password)
        self.logger.debug("- Using broker login information user '{}' and password".format(self.username))

    self._client.on_connect = self._on_connect
    self._client.on_disconnect = self._on_disconnect
    self._client.on_log = self._on_mqtt_log
    self._client.on_message = self._on_mqtt_message

    self._network_connected_to_broker = False
    if not self._network_connect_to_broker(from_init=True):
        self.logger.warning("MQTT broker can not be reached. No messages are sent/received until the broker can be reached")
    return


def _network_connect_to_broker(self, from_init=False):
    """
    Establish network connect to broker

    If the connect fails, a one-time scheduler task is added that retries
    the connect 10 seconds later.

    :param from_init: True, if called while setting up the client (a failure
                      is then logged as an error)
    :type from_init: bool

    :return: success
    :rtype: bool
    """
    try:
        self._client.connect(self.broker_ip, self.broker_port, 60)
        self.logger.debug("- Sending connect request to broker")
    except Exception as e:
        if from_init:
            self.logger.error('Connection error: {0}'.format(e))
        # schedule a task that is run once to try to connect again
        retry_at = self.shtime.now() + datetime.timedelta(seconds=10)
        self.scheduler.add('module.mqtt: connect to broker', self._network_connect_to_broker, next=retry_at)
        return False

    if not from_init:
        self.logger.warning("MQTT broker has been reached. Connection is starting")
    self._network_connected_to_broker = True
    return True


def _disconnect_from_broker(self):
    """
    Stop all communication with MQTT broker

    Unsubscribes the broker-info topics and all topics subscribed by plugins
    and logics, publishes the (retained) shutdown message if last will and
    birth message are configured, and disconnects the client.
    """
    self._unsubscribe_broker_infos()

    # work on a snapshot: the entries are dicts of subscribers (keyed by the
    # name of the subscribing plugin/logic), not items
    with self._subscribed_topics_lock:
        subscriptions = {topic: list(subscribers) for topic, subscribers in self._subscribed_topics.items()}
    for topic, subscribers in subscriptions.items():
        self.logger.debug("Unsubscribing topic '{}' for subscribers {}".format(str(topic), subscribers))
        self._client.unsubscribe(topic)

    self.logger.info("Stopping mqtt client '{}'. Disconnecting from broker.".format(self._client._client_id.decode('utf-8')))
    if (self.last_will_topic != '') and (self.last_will_payload != ''):
        if (self.birth_topic != '') and (self.birth_payload != ''):
            # a regular shutdown is marked in the payload to tell it apart from a lost connection
            shutdown_payload = self.last_will_payload + ' (shutdown)'
            self._client.publish(self.last_will_topic, shutdown_payload, self.qos, retain=True)
            self.logger.debug("- Disconnect: Last will sent with topic '{}' and payload '{}' and retain set to '{}'".format(self.last_will_topic, shutdown_payload, True))
    self._client.loop_stop()
    self._connected = False
    self._client.disconnect()


def _log_brokerinfo(self, payload):
    """
    Log info about broker connection

    :param payload: payload of the broker's version message
    """
    payload = self.cast_from_mqtt('str', payload)
    endpoint = str(self.broker_ip) + ':' + str(self.broker_port)
    if self.broker_hostname == '':
        address = endpoint
    else:
        address = self.broker_hostname + ' (' + endpoint + ')'
    self.logger.info("Connected to broker '{}' at address {}".format(str(payload), address))
def broker_uptime(self):
    """
    Return formatted uptime of broker

    :return: uptime as formatted by ``shtime.seconds_to_displaysting()``, or
             '-' if no (valid) uptime has been received from the broker
    :rtype: str
    """
    try:
        return self.shtime.seconds_to_displaysting(int(self._broker['uptime']))
    except Exception:
        # no uptime known yet or not convertible; only regular exceptions are
        # handled, so KeyboardInterrupt/SystemExit are not swallowed
        return '-'
def get_broker_info(self):
    """
    Return the collected broker information together with the monitoring setting

    :return: Tuple of (broker information, value of ``broker_monitoring``)
    :rtype: tuple
    """
    broker_info = self._broker
    monitoring = self.broker_monitoring
    return broker_info, monitoring
def get_broker_config(self):
    """
    Return the configuration of the broker connection

    The host is the configured host name if there is one, otherwise the ip
    address ('localhost' if the ip address is empty). The password is never
    returned, it is replaced by '-'.

    :return: Broker configuration
    :rtype: dict
    """
    if self.broker_hostname:
        host = self.broker_hostname
    elif self.broker_ip == '':
        host = 'localhost'
    else:
        host = self.broker_ip

    # broker_config['acl'] = self.acl
    return {
        'host': host,
        'port': self.broker_port,
        'user': self.username,
        'password': '-',
        'qos': self.qos,
    }
# ---------------------------------------------------------------------------------------- # methods to handle mqtt # ---------------------------------------------------------------------------------------- def _add_subscription_definition(self, topic, subscription_source, subscriber_type, callback, payload_type, bool_values, qos=None): """ Add a subscription definition to a defined topic in the _subscribed_topics data :param topic: :param subscription_source: :param subscriber_type: :param callback: :param payload_type: """ if self._subscribed_topics[topic].get(subscription_source, None): self.logger.warning("_add_subscription_definition: Subscription to topic '{}' from subscription_source '{}' already exisis, overwriting it".format(topic, subscription_source)) self._subscribed_topics[topic][subscription_source] = {} self._subscribed_topics[topic][subscription_source]['subscriber_type'] = subscriber_type.lower() self._subscribed_topics[topic][subscription_source]['callback'] = callback self._subscribed_topics[topic][subscription_source]['payload_type'] = payload_type self._subscribed_topics[topic][subscription_source]['bool_values'] = bool_values #self._subscribed_topics[topic]['qos'] = qos self.logger.info("_add_subscription_definition: {} '{}' is subscribing to topic '{}'".format(subscriber_type, subscription_source, topic)) return
def subscribe_topic(self, source, topic, callback=None, qos=None, payload_type='str', item_type=None, bool_values=None):
    """
    method to subscribe to a topic

    this function is to be called from plugins, which are utilizing the mqtt module

    :param source:       name of plugin or logic which wants to subscribe to a topic
    :param topic:        topic to subscribe to
    :param callback:     plugin callback function or name of logic for logics-callbacks
    :param qos:          Optional: quality of service (0-2) otherwise the default of the mqtt plugin will be used
    :param payload_type: Optional: 'str', 'num', 'bool', 'list', 'dict', 'scene', 'bytes', 'dict/str'
                         (case-insensitive, stored in lower case; invalid values fall back to 'str')
    :param item_type:    Type of item (currently not evaluated by this method)
    :param bool_values:  List with the two values used for representation of bool items
                         (an invalid specification is ignored)
    :type source:        str
    :type topic:         str
    :type callback:      str (if logic) or function (if called from MqttPlugin class)
    :type qos:           int
    :type payload_type:  str
    :type item_type:     str or None
    :type bool_values:   list or None
    """
    if bool_values is None:
        bool_values = self.bool_values

    # must be called directly from here: _get_caller_type() inspects the frame of our caller
    source_type = self._get_caller_type()
    frames = inspect.stack()
    self.logger.debug("'{}()' - called from {} by '{}()'".format(frames[0][3], source_type, frames[1][3]))

    if qos is None:
        qos = self.qos

    if bool_values:
        if not (isinstance(bool_values, list) and len(bool_values) == 2):
            self.logger.warning("subscribe_topic: topic '{}', source '{}': Invalid bool_values specified ('{}') - Ignoring bool_values".format(topic, source, bool_values))
            # really ignore the invalid specification, as announced in the warning
            bool_values = None

    # normalize the payload type, the casting routines compare against lower case names
    normalized_type = str(payload_type).lower()
    if normalized_type in ('str', 'num', 'bool', 'list', 'dict', 'scene', 'bytes', 'dict/str'):
        payload_type = normalized_type
    else:
        self.logger.warning("Invalid payload-datatype '{}' specified for {} '{}', ignored".format(payload_type, source_type, callback))
        payload_type = 'str'

    # check and registration happen under one lock, so that two concurrent
    # subscribers of a new topic cannot overwrite each other's definition
    with self._subscribed_topics_lock:
        is_new_topic = not self._subscribed_topics.get(topic, None)
        if is_new_topic:
            self.logger.info("subscribe_topic: Adding topic '{}'".format(topic))
            self._subscribed_topics[topic] = {}
        else:
            self.logger.info("subscribe_topic: A MQTT Subscription to topic '{}' already exists".format(topic))
        self._add_subscription_definition(topic, source, source_type, callback, payload_type, bool_values, qos)

    if is_new_topic:
        # subscribe at the broker without holding the lock
        try:
            result, mid = self._client.subscribe(topic, qos=qos)
            self.logger.info("subscribe_topic: mqtt module is subscribing to topic '{}' with qos={} at broker (result={}, mid={})".format(topic, qos, result, mid))
        except Exception as e:
            self.logger.critical("subscribe_topic: mqtt module tried to subscribe to topic '{}' for callback {}, exception: {}".format(topic, callback, e))
    return
def unsubscribe_topic(self, source, topic):
    """
    method to unsubscribe from a topic

    this function is to be called from plugins, which are utilizing the mqtt module

    The subscription of ``source`` is removed. The topic is only unsubscribed
    at the broker, when no other subscriber is left for it.

    :param source: name of the subscriber whose subscription is to be removed
    :param topic:  topic to unsubscribe from
    """
    source_type = self._get_caller_type()
    frames = inspect.stack()
    self.logger.debug("'{}()' - called from {} by '{}()'".format(frames[0][3], source_type, frames[1][3]))

    subscribers = self._subscribed_topics.get(topic, None)
    if not subscribers:
        # nobody has subscribed to this topic
        self.logger.info("unsubscribe_topic: NO MQTT Subscription to topic '{}' exists".format(topic))
        return

    if not subscribers.get(source, None):
        # the topic is subscribed, but not by this source
        self.logger.info("unsubscribe_topic: Topic '{}' is not subscribed by '{}'".format(topic, source))
        return

    self.logger.info("unsubscribe_topic: Subscription to topic '{}' for '{}' is removed".format(topic, source))

    with self._subscribed_topics_lock:
        # drop the subscription of this source ...
        del self._subscribed_topics[topic][source]
        # ... and the topic itself, once its last subscriber is gone
        last_subscriber_removed = (self._subscribed_topics[topic] == {})
        if last_subscriber_removed:
            del self._subscribed_topics[topic]

    if last_subscriber_removed:
        # the broker is contacted after the lock has been released (to avoid deadlock)
        self._client.unsubscribe(topic)
    return
def _trigger_logic(self, subscription_dict, topic, payload): """ This method is called by on_mqtt_message to trigger the right logic :param subscription_dict: :param topic: Topic of message received via mqtt :param payload: Payload of message received via mqtt :return: :rtype: bool """ datatype = subscription_dict.get('payload_type', 'foo') bool_values = subscription_dict.get('bool_values', None) payload = self.cast_from_mqtt(datatype, payload, bool_values) logic = subscription_dict.get('callback', None) subscription_found = False if logic: self.logger.info("_trigger_logic: Using topic '{}', payload '{} (type {})' for triggering logic '{}'".format(topic, payload, datatype, logic)) self._sh.logics.trigger_logic(logic, source='mqtt', by=topic, value=payload) subscription_found = True return subscription_found def _callback_to_plugin(self, plugin_name, subscription_dict, topic, payload, qos, retain): """ This method is called by on_mqtt_message to callback the right plugin :param plugin_name: Name of the plugin with the callback function :param subscription_dict: :param topic: Topic of message received via mqtt :param payload: Payload of message received via mqtt :param qos: :param retain: :return: :rtype: bool """ datatype = subscription_dict.get('payload_type', 'foo') bool_values = subscription_dict.get('bool_values', None) payload = self.cast_from_mqtt(datatype, payload, bool_values) plugin = subscription_dict.get('callback', None) subscription_found = False if plugin: self.logger.info("_callback_to_plugin: Using topic '{}', payload '{}' (type {}) for callback to plugin '{}' {}".format(topic, payload, datatype, plugin_name, plugin)) #s elf._sh.logics.trigger_logic(logic, source='mqtt', by=topic, value=payload) plugin(topic, payload, qos, retain) subscription_found = True else: self.logger.error("_callback_to_plugin: callback for plugin '{}' not defined".format(plugin_name)) return subscription_found def _on_mqtt_message(self, client, userdata, message): """ 
Callback function to handle received messages for items and logics :param client: the client instance for this callback :param userdata: the private user data as set in Client() or userdata_set() :param message: an instance of MQTTMessage. This is a class with members topic, payload, qos, retain. """ self.logger.debug("_on_mqtt_message: RECEIVED topic '{}', payload '{}, QoS '{}', retain '{}'".format(message.topic, message.payload, message.qos, message.retain)) with self._subscribed_topics_lock: subscibed_topics = list(self._subscribed_topics.keys()) # look for subscriptions to the received topic subscription_found = False for topic in subscibed_topics: topics_equal = False if (topic.find('+') != -1) or (topic.find('#') != -1): topics_equal = False wc_topic = topic.split('/') msg_topic = message.topic.split('/') equal = False if (len(wc_topic) == len(msg_topic)) or (wc_topic[len(wc_topic) - 1] == '#' and (len(wc_topic) <= len(msg_topic))): equal = True for i in range(len(wc_topic)): if not (wc_topic[i] == msg_topic[i] or wc_topic[i] == '+' or wc_topic[i] == '#'): equal = False if equal: topics_equal = True if (topic == message.topic) or topics_equal: topic_dict = self._subscribed_topics[topic] for subscription in list(topic_dict): self.logger.debug("_on_mqtt_message: subscription '{}': {}".format(subscription, topic_dict[subscription])) subscriber_type = topic_dict[subscription].get('subscriber_type', None) try: if subscriber_type == 'plugin': subscription_found = self._callback_to_plugin(subscription, topic_dict[subscription], message.topic, message.payload, message.qos, message.retain) elif subscriber_type == 'logic': subscription_found = self._trigger_logic(topic_dict[subscription], message.topic, message.payload) else: self.logger.error("_on_mqtt_message: received topic for unknown subscriber_type '{}'".format(subscriber_type)) except UnicodeDecodeError: self.logger.warning(f"_on_mqtt_message: received ill-formed message with topic '{message.topic}', payload 
'{message.payload}', discarding") return if not subscription_found: if not self._handle_broker_infos(message): self.logger.error("_on_mqtt_message: Received topic '{}', payload '{}', QoS '{}', retain '{}' WITHOUT matching item/logic".format( message.topic, message.payload, message.qos, message.retain)) # ---------------------------------------------------------------------------------------- def _get_qos_forTopic(self, topic, item): """ Return the configured QoS for a topic/item as an integer :param item: item to get the QoS for :return: Quality of Service (0..2) """ #qos = self.get_iattr_value(item.conf, 'mqtt_qos') #qos = item.get('qos', None) #qos = self._subscribed_topics[topic]['qos'] qos = None if qos == None: qos = self.qos return int(qos) def _on_mqtt_log(self, client, userdata, level, buf): if str(buf).startswith('Caught exception'): self.logger.error(f"_on_log: {buf} - client={client}, userdata={userdata}, level={level}") else: self.logger.debug(f"_on_log: {buf} - client={client}, userdata={userdata}, level={level} - dir(client)={dir(client)}") self.logger.debug(f"_on_log: {buf} - client={client}, userdata={userdata}, level={level} - dir(client.__class__)={dir(client.__class__)}") return def _subscribe_broker_infos(self): """ Subscribe to broker's infos This method is called from on_connect """ self._client.subscribe('$SYS/broker/version', qos=0) self._client.subscribe('$SYS/broker/clients/active', qos=0) self._client.subscribe('$SYS/broker/subscriptions/count', qos=0) self._client.subscribe('$SYS/broker/messages/stored', qos=0) if self.broker_monitoring: self._client.subscribe('$SYS/broker/uptime', qos=0) self._client.subscribe('$SYS/broker/retained messages/count', qos=0) self._client.subscribe('$SYS/broker/load/messages/received/1min', qos=0) self._client.subscribe('$SYS/broker/load/messages/received/5min', qos=0) self._client.subscribe('$SYS/broker/load/messages/received/15min', qos=0) self._client.subscribe('$SYS/broker/load/messages/sent/1min', 
qos=0) self._client.subscribe('$SYS/broker/load/messages/sent/5min', qos=0) self._client.subscribe('$SYS/broker/load/messages/sent/15min', qos=0) return def _handle_broker_infos(self, message): if message.topic == '$SYS/broker/clients/active': self._broker['active_clients'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/subscriptions/count': self._broker['subscriptions'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/messages/stored': self._broker['stored_messages'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/retained messages/count': self._broker['retained_messages'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/uptime': self._broker['uptime'] = message.payload.decode('utf-8').split(' ')[0] elif message.topic == '$SYS/broker/load/messages/received/1min': self._broker['msg_rcv_1min'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/load/messages/received/5min': self._broker['msg_rcv_5min'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/load/messages/received/15min': self._broker['msg_rcv_15min'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/load/messages/sent/1min': self._broker['msg_snt_1min'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/load/messages/sent/5min': self._broker['msg_snt_5min'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/load/messages/sent/15min': self._broker['msg_snt_15min'] = message.payload.decode('utf-8') elif message.topic == '$SYS/broker/version': self._log_brokerinfo(message.payload) self._broker['version'] = message.payload.decode('utf-8') # self._client.unsubscribe('$SYS/broker/version') else: return False self.logger.debug("_handle_broker_infos: $SYS/broker info = '{}'".format(self._broker)) return True def _unsubscribe_broker_infos(self): """ Unsubscribe from broker's infos This method is called from DisconnectFromBroker """ 
self._client.unsubscribe('$SYS/broker/version') self._client.unsubscribe('$SYS/broker/clients/active') self._client.unsubscribe('$SYS/broker/subscriptions/count') self._client.unsubscribe('$SYS/broker/messages/stored') if self.broker_monitoring: self._client.unsubscribe('$SYS/broker/uptime') self._client.unsubscribe('$SYS/broker/retained messages/count') self._client.unsubscribe('$SYS/broker/load/messages/received/1min') self._client.unsubscribe('$SYS/broker/load/messages/received/5min') self._client.unsubscribe('$SYS/broker/load/messages/received/15min') self._client.unsubscribe('$SYS/broker/load/messages/sent/1min') self._client.unsubscribe('$SYS/broker/load/messages/sent/5min') self._client.unsubscribe('$SYS/broker/load/messages/sent/15min') return def _on_connect(self, client, userdata, flags, reason_code, properties=None): """ Callback function called on connect """ if reason_code == 0: if self._got_disconnected: self.logger.notice("Reconnected to broker") self._got_disconnected = False else: self.logger.info(f"Connection returned result '{str(reason_code)}' (userdata={userdata})") self._connected = True self._subscribe_broker_infos() # subscribe to topics to listen for items for topic in self._subscribed_topics: item = self._subscribed_topics[topic] self._client.subscribe(topic, qos=self._get_qos_forTopic(topic, item)) self.logger.info(f"self._subscribed_topics = {self._subscribed_topics}") return msg = f"Connection returned result '{reason_code}': {str(reason_code)} (client={client}, userdata={userdata}, self._client={self._client})" if (self.paho_ver == 1 and reason_code == 5) or (self.paho_ver == 2 and reason_code == 'Not authorized'): self.logger.error(msg) self._disconnect_from_broker() else: self.logger.warning(msg) def _on_disconnect(self, client, userdata, reason_code, properties=None): """ Callback function called on disconnect """ if reason_code == 0: self.logger.info(f"Disconnection was successful ({reason_code})") elif reason_code == "Unspecified 
error": self.logger.warning(f"Disconnected from broker with error '{reason_code}'") self._got_disconnected = True else: self.logger.notice(f"Disconnection returned result '{reason_code}'") return # --------------------------------------------------------------------------------- # Following functions build the interface for other plugins which want to use MQTT # def _get_caller_type(self): """ determine if called from logic or plugin :return: caller type ('Plugin' | 'Logic' | 'Unknown') :rtype: str """ caller = inspect.stack()[2][1] split = caller.split('/') self.logger.debug("_get_caller_type: inspect.stack()[2][1] = '{}', split = {}".format(caller, split)) if split[-3] == 'lib' and split[-2] == 'model': source_type = 'Plugin' elif split[-3] == 'plugins': source_type = 'Plugin' elif split[-2] == 'logics': source_type = 'Logic' else: source_type = 'Unknown' self.logger.info("_get_caller_type: inspect.stack()[2][1] = '{}', split = {}".format(caller, split)) return source_type
def publish_topic(self, source, topic, payload, qos=None, retain=False, bool_values=None):
    """
    method to publish a topic

    this method is to be called from plugins or logics

    :param source:      name of plugin or logic which wants to publish a topic
    :param topic:       topic to publish to
    :param payload:     payload to publish
    :param qos:         quality of service (optional) otherwise the default of the mqtt plugin will be used
    :param retain:      retain flag (optional)
    :param bool_values: values used for the representation of bool payloads (optional),
                        otherwise the default of the mqtt module will be used

    :return: False, if not connected to the broker or if publishing raised an exception, else True
    :rtype: bool
    """
    if bool_values is None:
        bool_values = self.bool_values

    source_type = self._get_caller_type()
    frames = inspect.stack()
    self.logger.info("'{}()' - called from {} by '{}()'".format(frames[0][3], source_type, frames[1][3]))

    if not self._connected:
        return False

    effective_qos = self.qos if qos is None else qos
    self.logger.info("{} '{}' is publishing topic '{}' with payload '{}' (qos={}, retain={})".format(source_type, source, topic, payload, effective_qos, retain))

    payload = self.cast_to_mqtt(payload, bool_values)
    try:
        self._client.publish(topic=topic, payload=payload, qos=effective_qos, retain=retain)
        self.logger.info("{} '{}' has published topic '{}' with payload '{}'".format(source_type, source, topic, payload))
    except Exception as e:
        self.logger.error("{}: Publish exception '{}'".format(inspect.stack()[0][3], e))
        return False
    else:
        return True
# ---------------------------------------------------------------------------------------- # casting methods # ----------------------------------------------------------------------------------------
def cast_from_mqtt(self, datatype, raw_data, bool_values=None):
    """
    Cast payload data to SmartHomeNG datatypes

    :param datatype:    datatype to which the data should be casted to
    :param raw_data:    data as received from the mqtt broker
    :param bool_values: values used for the representation of bool payloads (optional),
                        otherwise the default of the mqtt module will be used
    :type datatype:     str
    :type raw_data:     bytes | str
    :type bool_values:  list or None

    :return: data casted to the datatype of the item it should be written to
             ('num' payloads are returned as the decoded string, 'bytes' and
             'foo' payloads are returned unchanged)
    :rtype: str | bool | list | dict | bytes

    :raises UnicodeDecodeError: if a bytes payload that has to be decoded is not valid utf-8
    """
    if bool_values is None:
        bool_values = self.bool_values

    # raw payloads are passed through without decoding, binary data need not be valid utf-8
    if datatype in ('bytes', 'foo'):
        return raw_data

    if isinstance(raw_data, bytes):
        str_data = raw_data.decode('utf-8')
    else:
        self.logger.info("cast_from_mqtt: type(raw_data)={}".format(type(raw_data)))
        str_data = str(raw_data)

    if datatype == 'str':
        data = str(str_data)
    elif datatype == 'num':
        data = str_data
    elif datatype == 'bool':
        self.logger.debug("cast_from_mqtt: datatype 'bool', str_data = '{}', bool_values = ‘{}‘".format(str_data, bool_values))
        if bool_values:
            try:
                # the position in bool_values is the bool value (first entry -> False)
                data = bool(bool_values.index(str_data.strip()))
            except Exception:
                # payload is none of the configured values (or bool_values is unusable)
                data = Utils.to_bool(str_data, default=False)
        else:
            data = Utils.to_bool(str_data, default=False)
    elif datatype == 'list':
        if not ((len(str_data) > 0) and (str_data[0] == '[')):
            # a plain comma separated list is wrapped, to make it a json list
            str_data = '[' + str_data + ']'
        try:
            data = json.loads(str_data)
        except Exception as e:
            self.logger.error("cast_from_mqtt: datatype 'list', error '{}', data = ‘{}‘".format(e, str_data))
            data = str_data
    elif datatype == 'dict':
        try:
            if str_data == '':
                data = {}
            else:
                data = json.loads(str_data)
        except Exception as e:
            self.logger.error("cast_from_mqtt: datatype 'dict', error '{}', data = ‘{}‘".format(e, str_data))
            data = {}
    elif datatype == 'scene':
        data = '0'
        if Utils.is_int(str_data):
            # NOTE(review): the former check '>= 0 and < 0' could never be true;
            # the intended upper limit is not visible here, so every
            # non-negative number is accepted - confirm the valid range
            if int(str_data) >= 0:
                data = str_data
    elif datatype == 'dict/str':
        # a dict if the payload is valid json, otherwise the payload as string
        try:
            if str_data == '':
                data = {}
            else:
                data = json.loads(str_data)
        except Exception:
            data = str_data
    else:
        self.logger.warning("cast_from_mqtt: Casting '{}' to '{}' is not implemented".format(raw_data, datatype))
        data = raw_data
    return data
def cast_to_mqtt(self, data, bool_values=None):
    """
    Cast SmartHomeNG datatypes to payload data

    :param data:        data which should be casted to a payload compatible format
    :param bool_values: values used for the representation of bool payloads (optional),
                        otherwise the default of the mqtt module will be used
    :type data:         str | bool | int | float | list | dict | bytes
    :type bool_values:  list or None

    :return: data casted from the SmartHomeNG datatype to the payload format
             (bytes are passed through unchanged, everything else becomes a
             str; if the cast fails, ``str(data)`` is returned)
    :rtype: str | bytes
    """
    if bool_values is None:
        bool_values = self.bool_values

    try:
        self.logger.debug("cast_to_mqtt: data = '{}', type(data) = '{}', bool_values ='{}'".format(data, type(data), bool_values))
        if isinstance(data, (bytes, str)):
            payload_data = data
        elif isinstance(data, bool):
            # bool has to be checked before int, since bool is a subclass of int
            self.logger.info(" : data = '{}', type(data) = '{}', bool_values ='{}'".format(data, type(data), bool_values))
            if bool_values:
                # False -> first entry, True -> second entry
                payload_data = str(bool_values[data])
            else:
                payload_data = 'true' if data else 'false'
            self.logger.info(" : payload_data = '{}', type(payload_data) = '{}', bool_values ='{}'".format(payload_data, type(payload_data), bool_values))
        elif isinstance(data, (int, float)):
            payload_data = str(data)
        elif isinstance(data, (list, dict)):
            payload_data = json.dumps(data)
        else:
            self.logger.warning("cast_to_mqtt: Casting '{}' type = '{}' to payload fat is not implemented".format(data, type(data)))
            payload_data = str(data)
    except Exception as e:
        self.logger.error("cast_to_mqtt: Cast exception'{}'".format(e))
        # fall back to the plain string representation, so that a payload is
        # always defined when the cast fails
        payload_data = str(data)
    return payload_data