Quellcode für lib.metadata
#!/usr/bin/env python3
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
#########################################################################
# Copyright 2017- Martin Sinn m.sinn@gmx.de
#########################################################################
# This file is part of SmartHomeNG
#
# SmartHomeNG is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SmartHomeNG is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SmartHomeNG. If not, see <http://www.gnu.org/licenses/>.
#########################################################################
import logging
import os
import sys
import collections
from lib.utils import Utils
from lib.utils import Version
import lib.shyaml as shyaml
from lib.constants import (YAML_FILE, FOO, META_DATA_TYPES, META_DATA_DEFAULTS, DIR_PLUGINS, DIR_MODULES)
from lib.model.sdp.globals import SDP_VERSION
# Name of the section of a module's metadata file that holds the parameter definitions
META_MODULE_PARAMETER_SECTION = 'parameters'
# Names of the sections of a plugin's metadata file that are read by the Metadata class
META_PLUGIN_SECTION = 'plugin'
META_PLUGIN_PARAMETER_SECTION = 'parameters'
META_PLUGIN_ITEMATTRIBUTE_SECTION = 'item_attributes'
META_PLUGIN_ITEMATTRIBUTEPREFIX_SECTION = 'item_attribute_prefixes'
META_PLUGIN_LOGIC_PARAMETER_SECTION = 'logic_parameters'
META_PLUGIN_FUNCTION_SECTION = 'plugin_functions'
# Name of the section holding item-struct definitions (read for plugins and for modules)
META_STRUCT_SECTION = 'item_structs'
# For reference only: META_DATA_TYPES and META_DATA_DEFAULTS are imported from lib.constants.
# The two commented lines below are not evaluated.
# META_DATA_TYPES=['bool', 'int', 'float','num', 'scene', 'str', ['list','list(subtype)'], 'dict', 'ip', 'ipv4', 'ipv6', 'mac', 'knx_ga', 'foo']
# META_DATA_DEFAULTS={'bool': False, 'int': 0, 'float': 0.0, 'scene': 0, 'str': '', 'list': [], 'dict': {}, 'OrderedDict': {}, 'num': 0, 'scene': 0, 'ip': '0.0.0.0', 'ipv4': '0.0.0.0', 'mac': '00:00:00:00:00:00', 'knx_ga': '', 'foo': None}
logger = logging.getLogger(__name__)
# global variables to take definitions of multiple plugins
# (filled by Metadata.__init__: every instance adds the item attribute definitions and
# the item attribute prefix definitions of its addon, keyed by attribute / prefix name)
all_itemdefinitions = {}
all_itemprefixdefinitions = {}
# NOTE(review): not assigned anywhere in the visible part of this file - presumably a
# cache built from the keys of all_itemprefixdefinitions; confirm against the rest of the module
all_prefixes_tuple = None
[Doku]class Metadata():
_version = '?'
def __init__(self, sh, addon_name, addon_type, classpath=''):
    """
    Initializes the metadata for an addon (plugin or module) from the definition file

    The metadata file is loaded, its definition sections are read and checked and the
    item attribute and item attribute prefix definitions of the addon are added to the
    module level dicts ``all_itemdefinitions`` and ``all_itemprefixdefinitions``.

    If ``classpath`` is empty and ``addon_type`` is neither 'plugin' nor 'module', no
    metadata is loaded. The instance is then left with empty definitions.

    :param sh: SmartHomeNG main object
    :param addon_name: name of the addon (it is converted to lower case)
    :param addon_type: 'plugin' or 'module'
    :param classpath: dotted path of the addon; if empty, the location of the metadata
                      file is derived from addon_type and addon_name
    :type sh: object
    :type addon_name: str
    :type addon_type: str
    :type classpath: str
    """
    global all_itemdefinitions
    global all_itemprefixdefinitions

    self._sh = sh
    self._addon_name = addon_name.lower()
    self._addon_type = addon_type
    self._log_premsg = "{} '{}': ".format(addon_type, self._addon_name)

    # Set every attribute up front, so that the instance is usable (with empty
    # definitions) even if no metadata file can be determined or loaded
    self.relative_filename = None
    self.meta = None
    self.addon_metadata = None
    self.global_parameters = {}
    self.parameters = None
    self._paramlist = []
    self.itemdefinitions = None
    self.itemprefixdefinitions = None
    self.all_itemprefixdefinitions = {}
    self._itemdeflist = []
    self._itemprefixdeflist = []
    self.itemstructs = None
    self._itemstructlist = []
    self.logic_parameters = None
    self._logic_paramlist = []
    self.plugin_functions = None
    self._plugin_functionlist = []

    if classpath == '':
        if addon_type == 'plugin':
            addon_type_dir = DIR_PLUGINS
        elif addon_type == 'module':
            addon_type_dir = DIR_MODULES
        else:
            logger.error(self._log_premsg + "Unknown addon type '{}', no metadata loaded".format(addon_type))
            return
        self.relative_filename = os.path.join(addon_type_dir, self._addon_name, addon_type + YAML_FILE)
    else:
        self.relative_filename = os.path.join(classpath.replace('.', os.sep), addon_type + YAML_FILE)

    # read complete definitions from metadata file
    filename = os.path.join(self._sh.get_basedir(), self.relative_filename)
    self.meta = shyaml.yaml_load(filename, ordered=True)

    # Add dummy prefixes only, if all_itemprefixdefinitions is still empty
    if not all_itemprefixdefinitions:
        dummy_prefixes = (
            # dummy 'my_' prefix for user's attributes for logics, etc.
            ('my_', {'de': 'Attribute für verschiedene Tests',
                     'en': 'Attributes for various tests'}),
            # dummy '_' prefix for hidden user's attributes for logics, etc.
            ('_', {'de': 'Attribute für internes Handling (z.B. in structs',
                   'en': 'Attributes for internal handling (e.g. in structs)'}),
        )
        for prefix_name, description in dummy_prefixes:
            all_itemprefixdefinitions[prefix_name] = {'type': 'foo',
                                                      'description': description,
                                                      'listtype': ['foo'], 'listlen': 0, '_addon_name': 'lib_metadata',
                                                      '_addon_type': 'plugin', '_name': prefix_name, '_type': 'prefix'}
        logger.info(f"Definierte spezielle Präfixe für Namen von Attributen: {list(all_itemprefixdefinitions.keys())}")

    if self.meta is None:
        # nothing could be loaded: all definitions stay empty
        return

    # read parameter and item definition sections
    if self._addon_type == 'module':
        self.parameters = self.meta.get(META_MODULE_PARAMETER_SECTION)
        self.itemstructs = self.meta.get(META_STRUCT_SECTION)
    else:
        self.global_parameters = self.get_global_plugin_parameters()
        self.parameters = self.meta.get(META_PLUGIN_PARAMETER_SECTION, {})
        if self.parameters is None or isinstance(self.parameters, str):
            # if plugin parameter section is empty or is declared as NONE
            self.parameters = self.global_parameters
        else:
            # global parameters replace plugin parameters of the same name
            self.parameters.update(self.global_parameters)
        self.itemdefinitions = self.meta.get(META_PLUGIN_ITEMATTRIBUTE_SECTION)
        self.itemprefixdefinitions = self.meta.get(META_PLUGIN_ITEMATTRIBUTEPREFIX_SECTION)
        self.itemstructs = self.meta.get(META_STRUCT_SECTION)
        self.logic_parameters = self.meta.get(META_PLUGIN_LOGIC_PARAMETER_SECTION)
        self.plugin_functions = self.meta.get(META_PLUGIN_FUNCTION_SECTION)

    # test validity of parameter definition section
    if self.parameters == 'NONE':
        self.parameters = None
    if self.parameters is not None:
        for param_name, param_definition in self.parameters.items():
            if param_definition is not None:
                # name and kind of the definition are needed for log messages later on
                param_definition['_name'] = param_name
                param_definition['_type'] = 'parameter'
        self._paramlist = list(self.parameters.keys())
        logger.info(self._log_premsg + "Metadata paramlist = '{}'".format(str(self._paramlist)))
        self._test_definitions(self._paramlist, self.parameters)
    else:
        logger.debug(self._log_premsg + "has no parameter definitions in metadata")

    # test validity of item definition section
    if self.itemdefinitions == 'NONE':
        self.itemdefinitions = None
    if self.itemdefinitions is not None:
        self._itemdeflist = list(self.itemdefinitions.keys())
        logger.info(self._log_premsg + "Metadata itemdeflist = '{}'".format(str(self._itemdeflist)))
        self._test_definitions(self._itemdeflist, self.itemdefinitions)
    else:
        logger.debug(self._log_premsg + "has no item definitions in metadata")

    # test validity of item-prefix definition section
    if self.itemprefixdefinitions == 'NONE':
        self.itemprefixdefinitions = None
    if self.itemprefixdefinitions is not None:
        self._itemprefixdeflist = list(self.itemprefixdefinitions.keys())
        logger.info(self._log_premsg + "Metadata itemprefixdeflist = '{}'".format(str(self._itemprefixdeflist)))
        self._test_definitions(self._itemprefixdeflist, self.itemprefixdefinitions)
    else:
        logger.debug(self._log_premsg + "has no item-prefix definitions in metadata")

    # build dict for checking of item attributes and their values
    if self.itemdefinitions is not None:
        for attr_name in self.itemdefinitions:
            attr_definition = self.itemdefinitions[attr_name]
            all_itemdefinitions[attr_name] = attr_definition
            attr_definition['_addon_name'] = self._addon_name
            attr_definition['_addon_type'] = self._addon_type
            attr_definition['_name'] = attr_name
            attr_definition['_type'] = 'attribute'

    # build dict for checking of item attribute prefixes and their values
    if self.itemprefixdefinitions is not None:
        # add all prefixes loaded from metadata of the plugin
        for prefix_name in self.itemprefixdefinitions:
            prefix_definition = self.itemprefixdefinitions[prefix_name]
            all_itemprefixdefinitions[prefix_name] = prefix_definition
            prefix_definition['_addon_name'] = self._addon_name
            prefix_definition['_addon_type'] = self._addon_type
            prefix_definition['_name'] = prefix_name
            prefix_definition['_type'] = 'prefix'

    # test validity of logic-parameter definition section
    if self.logic_parameters == 'NONE':
        self.logic_parameters = None
    if self.logic_parameters is not None:
        self._logic_paramlist = list(self.logic_parameters.keys())
        logger.info(self._log_premsg + "Metadata logic_paramlist = '{}'".format(str(self._logic_paramlist)))
        self._test_definitions(self._logic_paramlist, self.logic_parameters)
    else:
        logger.debug(self._log_premsg + "has no logic-parameter definitions in metadata")

    # plugin-function definition section (the definitions themselves are not tested)
    if self.plugin_functions == 'NONE':
        self.plugin_functions = None
    if self.plugin_functions is not None:
        self._plugin_functionlist = list(self.plugin_functions.keys())
        logger.info(self._log_premsg + "Metadata plugin_functionlist = '{}'".format(str(self._plugin_functionlist)))
        # the results are not used; the calls write the definition strings of all
        # four variants to the log
        for with_type, with_default in ((False, False), (True, False), (False, True), (True, True)):
            self.get_plugin_function_defstrings(with_type=with_type, with_default=with_default)
    else:
        logger.debug(self._log_premsg + "has no plugin-function definitions in metadata")

    # structs definition section (the definitions themselves are not tested)
    if self.itemstructs is None:
        logger.info(self._log_premsg + "has no item-struct definitions in metadata")
    elif self.itemstructs == 'NONE':
        self.itemstructs = None
    else:
        if isinstance(self.itemstructs, dict):
            self._itemstructlist = list(self.itemstructs.keys())
        logger.info(self._log_premsg + "Metadata itemstructlist = '{}'".format(self._itemstructlist))

    # Read global metadata for addon (either 'plugin' or 'module')
    self.addon_metadata = self.meta.get(addon_type)
    return
def get_global_plugin_parameters(self):
    """
    Build the definitions of the parameters that are added to every plugin

    - 'webif_pagelength' is defined, if the http module is loaded
    - 'instance' is defined, if the 'plugin' section of the metadata sets 'multi_instance'

    As a side effect, the 'plugin' section of the metadata is stored in ``self.pluginsettings``.

    :return: parameter definitions, keyed by the name of the parameter
    :rtype: dict
    """
    result = {}

    http_module = self._sh.modules.get_module('http')
    if http_module is not None:
        # only if http module is loaded:
        # global plugin parameter 'webif_pagelength'
        # The description (in all available languages) is taken from the metadata of the http module
        http_definition = http_module._metadata.meta['parameters']['webif_pagelength']
        result['webif_pagelength'] = {
            'type': 'int',
            'valid_list': [-1, 0, 25, 50, 100],
            'description': http_definition.get('description', {'en': 'No description found!'}),
            # fall back to 0, if the http module does not have the attribute
            'default': getattr(http_module, '_webif_pagelength', 0),
        }

    self.pluginsettings = self.meta.get(META_PLUGIN_SECTION)
    # the metadata file may have no 'plugin' section at all
    if self.pluginsettings is not None and self.pluginsettings.get('multi_instance', False):
        # only for multi-instance Plugins:
        # global plugin parameter 'instance'
        result['instance'] = {
            'type': 'str',
            'description': {
                'de': "Falls mehrere Instanzen eines Multi-Instance Plugins konfiguriert sind, muss hier ein eindeutiger Instanz-Name angegeben werden (eine Instanz darf ohnen Namen bleiben). Falls nur eine Instanz konfiguriert ist, sollte hier kein Name vergeben werden.",
                'en': "If several instances of a multi-instance plugin are configured, a unique instance name must be specified here (one instance may remain without a name). If only one instance is configured, no name should be assigned here.",
                'fr': "Si plusieurs instances d'un plug-in multi-instance sont configurées, un nom d'instance unique doit être spécifié ici (une instance peut rester sans nom). Si une seule instance est configurée, aucun nom ne doit être attribué ici.",
            },
        }
    return result
def get_plugin_function_defstrings(self, with_type=False, with_default=True):
    """
    Build the documentation strings of the plugin's functions

    used e.g. for code completion in logic editor

    Each string has the form ``name(param[:type][=default], ...)``. The functions are
    listed in sorted order. The resulting list is also written to the log.

    :param with_type: add the type of each parameter (if one is defined)
    :param with_default: add the default value of each parameter (if one is defined)
    :type with_type: bool
    :type with_default: bool

    :return: one definition string per plugin function
    :rtype: list of str
    """
    docstr_list = []
    if self.plugin_functions is not None:
        for func_name in sorted(self.plugin_functions):
            # a function or a parameter may be declared without any further details
            func_definition = self.plugin_functions[func_name] or {}
            param_definitions = func_definition.get('parameters') or {}

            param_strings = []
            for par_name in param_definitions:
                par_definition = param_definitions[par_name] or {}
                par_string = par_name

                par_type = par_definition.get('type', None)
                if with_type and par_type is not None:
                    par_string += ':' + str(par_type)

                par_default = par_definition.get('default', None)
                if with_default and par_default is not None:
                    default = str(par_default)
                    if par_definition.get('type', 'foo') == 'str':
                        if default == 'None*':
                            # 'None*' in the metadata stands for the value None (instead of a string)
                            default = 'None'
                        else:
                            default = " '" + default + "'"
                    par_string += '=' + default

                param_strings.append(par_string)

            docstr_list.append(func_name + '(' + ', '.join(param_strings) + ')')

    logger.info(self._log_premsg + "Metadata get_plugin_function_defstrings -> '{}'".format(docstr_list))
    return docstr_list
def _test_definitions(self, definition_list, definition_dict):
    """
    Test parameter or item-attribute definitions for validity

    Every definition gets the entries 'listtype' and 'listlen'. A type written as
    ``list(<length>, <subtype>, ...)`` is split up: 'type' is set to 'list', 'listlen'
    to the length (if the first element is an integer) and 'listtype' to the list of
    subtypes. A type that is not contained in META_DATA_TYPES (and is not such a list
    declaration) is replaced by FOO.

    :param definition_list: is not evaluated, the names are always taken from definition_dict
    :param definition_dict: definitions to test, the dict is modified in place
    :type definition_list: list
    :type definition_dict: dict
    """
    # the list that was passed in is replaced by the keys of the dict
    definition_list = list(definition_dict.keys())

    for name in definition_list:
        entry = definition_dict[name]
        if entry is None:
            logger.info(self._log_premsg + "definition = '{}'".format(name))
            continue

        typ = str(entry.get('type', FOO)).lower()
        # to be implemented: timeframe
        entry['listtype'] = [FOO]
        entry['listlen'] = 0
        if entry.get('type', FOO) == 'list':
            logger.debug(self._log_premsg + "definition = '{}' of type '{}'".format(name, typ))

        if typ not in META_DATA_TYPES:
            if typ.startswith('list(') and typ.endswith(')'):
                # list with specified length and/or datatype(s)
                logger.debug(self._log_premsg + "definition = '{}' of type '{}'".format(name, typ))
                entry['type'] = 'list'
                parts = typ[5:-1].strip().split(',')
                if len(parts) > 0:
                    if Utils.is_int(parts[0]):
                        # the first element is the length of the list
                        entry['listlen'] = max(int(parts[0]), 0)
                        parts.pop(0)
                        if len(parts) == 0:
                            parts = [FOO]
                    if len(parts) > 0:
                        checked_parts = []
                        for part in parts:
                            if part.strip() in META_DATA_TYPES:
                                checked_parts.append(part.strip())
                            else:
                                checked_parts.append(FOO)
                                logger.error(self._log_premsg + "definition = '{}': Invalid subtype '{}' specified, using '{}' instead (META_DATA_TYPES={})".format(name, part, FOO, META_DATA_TYPES))
                        parts = checked_parts
                    entry['listtype'] = parts
            else:
                logger.error(self._log_premsg + "Invalid definition in metadata file '{}': type '{}' for parameter '{}' -> using type '{}' instead".format(self.relative_filename, typ, name, FOO))
                entry['type'] = FOO

        if entry.get('type', FOO) == 'list':
            logger.debug(self._log_premsg + "definition = '{}' list of subtype_list = {}, listlen={}".format(name, entry['listtype'], entry['listlen']))
        else:
            logger.debug(self._log_premsg + "definition = '{}' list of listparam = >{}<, listlen={}".format(name, entry['listtype'], entry['listlen']))
    return
def _strip_quotes(self, string):
if type(string) is str:
string = string.strip()
if len(string) >= 2:
if string[0] in ['"', "'"]: # check if string starts with ' or "
if string[0] == string[-1]: # and end with it
if string.count(string[0]) == 2: # if they are the only one
string = string[1:-1] # remove them
return string
# ------------------------------------------------------------------------
# Methods for global values
#
[Doku] def get_string(self, key):
"""
Return the value for a global key as a string
:param key: global key to look up (in section 'plugin' or 'module')
:type key: str
:return: value for the key
:rtype: str
"""
if self.addon_metadata == None:
return ''
return self.addon_metadata.get(key, '')
[Doku] def get_mlstring(self, mlkey):
"""
Return the value for a global multilanguage-key as a string
It trys to lookup th value for the default language.
If the value for the default language is empty, it trys to look up the value for English.
If there is no value for the default language and for English, it trys to lookup the value for German.
:param key: global multilanguage-key to look up (in section 'plugin' or 'module')
:type key: str
:return: value for the key
:rtype: str
"""
if self.addon_metadata is None:
return ''
key_dict = self.addon_metadata.get(mlkey)
if key_dict is None:
return ''
try:
result = key_dict.get(self._sh.get_defaultlanguage(), '')
except:
return ''
if result == '':
result = key_dict.get('en','')
if result == '':
result = key_dict.get('de','')
return result
[Doku] def get_bool(self, key):
"""
Return the value for a global key as a bool
:param key: global key to look up (in section 'plugin' or 'module')
:type key: str
:return: value for the key
:rtype: bool
"""
if self.addon_metadata is None:
return False
return Utils.to_bool(self.addon_metadata.get(key, ''))
def test_shngcompatibility(self):
    """
    Test if the running version of SmartHomeNG is in the range of supported versions for this addon (module/plugin)

    The range is taken from the global keys 'sh_minversion' and 'sh_maxversion'. A limit
    that is not defined is not tested. An error is logged, if the test fails.

    :return: True if the SmartHomeNG version is in the supported range
    :rtype: bool
    """
    # only the part of the version before the first '-' is compared
    running_version = Version.format(self._sh.version.split('-')[0])
    lowest_version = Version.format(str(self.get_string('sh_minversion')))
    highest_version = Version.format(str(self.get_string('sh_maxversion')))
    addon_version = Version.format(self.get_string('version'))

    if lowest_version != '' and Version.compare(lowest_version, running_version, '>'):
        logger.error(f"{self._addon_type} '{self._addon_name}' {addon_version}: SmartHomeNG {running_version} is too old for this {self._addon_type}. It requires at least version {Version.format(lowest_version)}. The {self._addon_type} was not loaded.")
        return False

    if highest_version != '' and Version.compare(highest_version, running_version, '<'):
        logger.error(f"{self._addon_type} '{self._addon_name}' {addon_version}: SmartHomeNG {running_version} is too new for this {self._addon_type}. It requires a version up to {Version.format(highest_version)}. The {self._addon_type} was not loaded.")
        return False

    return True
def test_pythoncompatibility(self):
    """
    Test if the running version of Python is in the range of supported versions for this addon (module/plugin)

    The range is taken from the global keys 'py_minversion' and 'py_maxversion'. A limit
    that is not defined is not tested. An error is logged, if the test fails.

    :return: True if the Python version is in the supported range
    :rtype: bool
    """
    info = sys.version_info
    py_version = Version.format(f"{info[0]}.{info[1]}.{info[2]}")
    min_pyversion = Version.format(str(self.get_string('py_minversion')))

    max_spec = str(self.get_string('py_maxversion'))
    if max_spec.count('.') == 1:
        # if given max version has only two parts, make it the max for that version: 3.8 -> 3.8.999
        max_spec += '.999'
    max_pyversion = Version.format(str(max_spec))
    addon_version = Version.format(self.get_string('version'))

    if min_pyversion != '' and Version.compare(min_pyversion, py_version, '>'):
        logger.error(f"{self._addon_type} '{self._addon_name}' {addon_version}: The Python version {py_version} is too old for this {self._addon_type}. It requires at least version {min_pyversion}. The {self._addon_type} was not loaded.")
        return False

    if max_pyversion != '' and Version.compare(max_pyversion, py_version, '<'):
        logger.error(f"{self._addon_type} '{self._addon_name}' {addon_version}: The Python version {py_version} is too new for this {self._addon_type}. It requires a version up to {max_pyversion}. The {self._addon_type} was not loaded.")
        return False

    return True
def test_sdpcompatibility(self):
    """
    Test if the version of SmartDevicePlugin (SDP) is in the range of supported versions for this addon

    The range is taken from the global keys 'sdp_minversion' and 'sdp_maxversion'. An
    addon that defines neither of them passes the test. An error is logged, if the
    test fails.

    :return: True if no SDP version is required or the SDP version is in the supported range
    :rtype: bool
    """
    installed_version = SDP_VERSION
    lowest_version = Version.format(str(self.get_string('sdp_minversion')))
    highest_version = Version.format(str(self.get_string('sdp_maxversion')))
    addon_version = Version.format(self.get_string('version'))

    if lowest_version != '' and Version.compare(lowest_version, installed_version, '>'):
        logger.error(f"{self._addon_type} '{self._addon_name}' {addon_version}: SmartDevicePlugin {installed_version} is too old for this {self._addon_type}. It requires at least version {Version.format(lowest_version)}. The {self._addon_type} was not loaded.")
        return False

    if highest_version != '' and Version.compare(highest_version, installed_version, '<'):
        logger.error(f"{self._addon_type} '{self._addon_name}' {addon_version}: SmartDevicePlugin {installed_version} is too new for this {self._addon_type}. It requires a version up to {Version.format(highest_version)}. The {self._addon_type} was not loaded.")
        return False

    return True
[Doku] def get_version(self):
"""
Returns the version of the addon
If test_version has been called before, the code_version is taken into account,
otherwise the version of the metadata-file is returned
:return: version
:rtype: str
"""
if self._version == '?':
self._version = self.get_string('version')
return self._version
[Doku] def test_version(self, code_version):
"""
Tests if the loaded Python code has a version set and compares it to the metadata version.
:param code_version: version of the python code
:type code_version: str
:return: True: version numbers match, or Python code has no version
:rtype: bool
"""
self._version = self.get_string('version')
if code_version is None:
logger.info("{} '{}' version not defined in Python code, metadata version is {}".format(self._addon_type, self._addon_name, self._version))
return True
else:
if 2 > code_version.count('.') > 4:
logger.warning(
"{} '{}' code version not compliant to plugin version schemas x.x.x or x.x.x.x ".format(
self._addon_type, self._addon_name))
if self._version == '':
logger.info("{} '{}' metadata contains no version number".format(self._addon_type, self._addon_name))
self._version = code_version
else:
if 2 > str(self._version).count('.') < 4:
logger.warning(
"{} '{}' metadata version not compliant to plugin version schemas x.x.x or x.x.x.x ".format(
self._addon_type, self._addon_name))
if str(code_version) != str(self._version):
logger.error("{} '{}' version differs between Python code ({}) and metadata ({})".format(self._addon_type, self._addon_name, str(code_version), self._version))
return False
return True
# ------------------------------------------------------------------------
# Methods for parameter/attribute checking
#
def _test_valuetype(self, typ, subtype, value):
"""
Returns True, if the value can be converted to the specified type
"""
# logger.warning(self._log_premsg + "_test_valuetype-list: typ={}, subtype={}, value={}".format(typ, subtype, value))
if typ == 'bool':
return Utils.to_bool(value, default='?') != '?'
elif typ == 'int':
return Utils.is_int(value)
elif typ in ['float','num']:
return Utils.is_float(value)
elif typ == 'scene':
if Utils.is_int(value):
return (int(value) >= 0) and (int(value) < 256)
else:
return False
elif typ in ['str','password']:
return True # Everything can be converted to a string
elif typ == 'list':
if subtype != '' and subtype != FOO:
result = True
if isinstance(value, list):
for i in range(0, len(value)):
if i < len(subtype):
sub = subtype[i]
else:
sub = subtype[len(subtype)-1]
if not self._test_valuetype(sub, '', value[i]):
result = False
# logger.warning("_test_valuetype: value[{}] = {}, sub = {}, result = False".format(i, value[i], sub))
# logger.warning("_test_valuetype: value = {}, type(value) = {}, typ = {}, subtype = {}".format(value, type(value), typ, subtype))
return result
return (type(value) is list)
elif typ == 'dict':
try:
d = dict(value)
except:
import ast
try:
d = ast.literal_eval(value)
except:
return False
return (isinstance(d,dict))
elif typ == 'ip':
if Utils.is_ipv4(value):
return True
if Utils.is_ipv6(value):
return True
return Utils.is_hostname(value)
elif typ == 'ipv4':
return Utils.is_ipv4(value)
elif typ == 'ipv6':
return Utils.is_ipv6(value)
elif typ == 'mac':
return Utils.is_mac(value)
elif typ == 'knx_ga':
return Utils.is_knx_groupaddress(value)
elif typ == FOO:
return True
def _test_value(self, value, definition):
"""
Returns True, if the value can be converted to specified type
:param value: value to be checked
:param definition: definition dict of parameter/attribute
:type value: str
:type definition: dict
:return: True, if value can be converted
:rtype: bool
"""
if definition is not None:
typ = definition.get('type', 'foo')
subtype = ''
if typ == 'list':
subtype = definition.get('listtype', ['?'])
# if subtype != '':
# logger.warning("_test_value '{}': before _test_valuetype(): typ = {}, subtype = {}, value = {}".format(param, typ, subtype, value))
return self._test_valuetype(typ, subtype, value)
return False
def _expand_listvalues(self, value, definition):
"""
If a parameter is defined as a list, but the value is of a basic datatype,
value is expanded to a list. In all other cases, the value is returned nuchanged
:param value: value to be expanded
:param definition: definition dict of parameter/attribute
:type value: str
:type definition: dict
:return: expanded value
"""
result = value
if definition is not None:
typ = definition.get('type', 'foo')
if (typ == 'list') and (not isinstance(value, list)):
result = Utils.string_to_list(value)
# if (typ == 'list'):
# logger.warning(self._log_premsg + "_expand_listvalues: value = >{}<, type(value) = >{}<, result = >{}<, type(result) = >{}<".format(value, type(value), result, type(result)))
return result
def _convert_valuetotype(self, typ, value):
"""
Returns the value converted to the parameters type
"""
if typ == 'bool':
result = Utils.to_bool(value)
elif typ in ['int','scene']:
result = int(value)
elif typ in ['float','num']:
result = float(value)
elif typ in ['str','password']:
result = str(value)
elif typ == 'list':
if isinstance(value, list):
result = value
else:
result = [value]
elif typ == 'dict':
try:
result = dict(value)
except:
import ast
try:
result = ast.literal_eval(value)
except:
result = {}
elif typ in ['ip', 'ipv4', 'ipv6', 'mac']:
result = str(value)
elif typ in ['knx_ga']:
result = str(value)
elif typ == FOO:
result = value
else:
logger.error(self._log_premsg + "unhandled type {}".format(typ))
return result
def _convert_value(self, value, definition, is_default=False):
"""
Returns the value converted to the parameters type
"""
result = False
if definition is not None:
typ = definition.get('type', 'foo')
result = self._convert_valuetotype(typ, value)
orig = result
if 'valid_list_ci' in definition.keys():
orig = str(orig).lower()
result = self._test_validity('', result, definition, is_default)
if result != orig:
# Für non-default Prüfung nur Warning
if is_default:
logger.error(self._log_premsg+f"Invalid default '{orig}' in metadata file '{self.relative_filename}' for {definition['_type']} '{definition['_name']}' -> using '{result}' instead")
else:
logger.warning(self._log_premsg+f"Invalid value '{orig}' for {definition['_type']} '{definition['_name']}' -> using '{result}' instead {definition.get('_def_in', '')}")
return result
def _test_against_valid_list(self, definition, value):
"""
Test if value is in the valid list(s) of the metadata definition
:param definition:
:param value:
:return:
"""
# test against list of valid entries
result = value
valid_list_ci = definition.get('valid_list_ci', None)
if (valid_list_ci is None) or (len(valid_list_ci) == 0):
# test case sensitive
valid_list = definition.get('valid_list', None)
if (valid_list is None) or (len(valid_list) == 0):
pass
else:
if result in valid_list:
pass
else:
result = valid_list[0]
else:
if isinstance(result, str):
# test case in-sensitive, return result in lower case
if result.lower() in (entry.lower() for entry in valid_list_ci):
result = result.lower()
else:
result = str(valid_list_ci[0]).lower()
return result
def _test_validity(self, param, value, definition=None, is_default=False):
"""
Checks the value against a list of valid values.
If valid, it returns the value.
Otherwise it returns the first entry of the list of valid values.
"""
result = value
if definition is not None:
if definition.get('type', 'foo') in ['int', 'float', 'num', 'scene']:
valid_min = definition.get('valid_min')
if valid_min != None:
if self._test_value(valid_min, definition):
if result < self._convert_valuetotype(definition.get('type', 'foo'), valid_min):
if is_default == False:
result = valid_min
else:
result = valid_min
valid_max = definition.get('valid_max')
if valid_max != None:
if self._test_value(valid_max, definition):
if result > self._convert_valuetotype(definition.get('type', 'foo'), valid_max):
if is_default == False:
result = valid_max
else:
result = valid_max
elif definition.get('type', 'foo') in ['list']:
if definition['listlen'] > 0:
if definition['listlen'] != len(value):
logger.warning(self._log_premsg + "Invalid value '{}' in plugin configuration file for parameter '{}' -> length of list is not {}".format(value, param, self.parameters[param]['listlen']))
while len(value) < definition['listlen']:
value.append('')
result = value
elif definition.get('type', 'foo') in ['dict']:
# No real testing for dicts
result = value
# test against list of valid entries
result = self._test_against_valid_list(definition, result)
return result
elif self.parameters[param] != None:
logger.warning("_test_validity: old version for param={}, value={}".format(param, value))
if self.parameters[param].get('type') in ['int', 'float', 'num', 'scene']:
valid_min = self.parameters[param].get('valid_min')
if valid_min != None:
if self._test_value(valid_min, self.parameters[param]):
if result < self._convert_valuetotype(self.get_parameter_type(param), valid_min):
if is_default == False:
result = valid_min
else:
result = valid_min
valid_max = self.parameters[param].get('valid_max')
if valid_max != None:
if self._test_value(valid_max, self.parameters[param]):
if result > self._convert_valuetotype(self.get_parameter_type(param), valid_max):
if is_default == False:
result = valid_max
else:
result = valid_max
elif self.parameters[param].get('type') in ['list']:
if self.parameters[param]['listlen'] > 0:
if self.parameters[param]['listlen'] != len(value):
logger.warning(self._log_premsg + "Invalid value '{}' in plugin configuration file for parameter '{}' -> length of list is not {}".format(value, param, self.parameters[param]['listlen']))
while len(value) < self.parameters[param]['listlen']:
value.append('')
result = value
if self.parameters[param] is None:
logger.warning(self._log_premsg + "_test_validity: param {}".format(param))
else:
# test against list of valid entries
result = self._test_against_valid_list(self.parameters[param], result)
return result
def _get_default_if_none(self, typ):
    """
    Look up the fallback default value for a datatype.

    Used when a parameter definition does not specify a default value of its own.

    :param typ: Name of the datatype
    :type typ: str

    :return: Entry of META_DATA_DEFAULTS for the datatype, or None if there is no entry
    """
    fallback = META_DATA_DEFAULTS.get(typ)
    return fallback
# ------------------------------------------------------------------------
# Methods for accessing parameter definitions / item attribute definitions
#
def get_parameterlist(self):
    """
    Return the names of all parameters known to this metadata object.

    The list stored in ``self._paramlist`` is handed out as is (no copy is made).

    :return: List of strings with parameter names
    :rtype: list of str
    """
    names = self._paramlist
    return names
def get_itemdefinitionlist(self):
    """
    Return the names of all item attribute definitions known to this metadata object.

    The list stored in ``self._itemdeflist`` is handed out as is (no copy is made).

    :return: List of strings with item attribute names
    :rtype: list of str
    """
    names = self._itemdeflist
    return names
def _get_definition_type(self, definition, definitions):
    """
    Determine the datatype of a single definition.

    :param definition: Name of the definition (e.g. the name of a parameter)
    :param definitions: Mapping of definition names to definition dicts (may be None)
    :type definition: str

    :return: Lower-cased 'type' entry of the definition. FOO is returned if there are no
             definitions or the definition is None; the lower-cased FOO if no type is given.
    :rtype: str
    """
    entry = None if definitions is None else definitions[definition]
    if entry is None:
        return FOO
    return str(entry.get('type', FOO)).lower()
def get_parameter_type(self, param):
    """
    Return the datatype of a parameter.

    :param param: Name of the parameter
    :type param: str

    :return: Result of ``_get_definition_type`` for the parameter definitions
    """
    parameter_definitions = self.parameters
    return self._get_definition_type(param, parameter_definitions)
def get_itemdefinition_type(self, definition):
    """
    Return the datatype of an item attribute definition.

    :param definition: Name of the item attribute definition
    :type definition: str

    :return: Result of ``_get_definition_type`` for the item attribute definitions
    """
    item_definitions = self.itemdefinitions
    return self._get_definition_type(definition, item_definitions)
def _get_definition_subtype(self, definition, definitions):
    """
    Determine the subtype of a single definition.

    Only definitions of type 'list' have a subtype (their 'listtype' entry).

    :param definition: Name of the definition (e.g. the name of a parameter)
    :param definitions: Mapping of definition names to definition dicts (may be None)
    :type definition: str

    :return: FOO if there are no definitions or the definition is None,
             an empty string if the type is not 'list',
             otherwise the 'listtype' entry (``['?']`` if none is given)
    """
    entry = None if definitions is None else definitions[definition]
    if entry is None:
        return FOO
    if str(entry.get('type', FOO)).lower() != 'list':
        return ''
    return entry.get('listtype', ['?'])
def get_parameter_subtype(self, param):
    """
    Return the subtype of a parameter.

    :param param: Name of the parameter
    :type param: str

    :return: Result of ``_get_definition_subtype`` for the parameter definitions
    """
    parameter_definitions = self.parameters
    return self._get_definition_subtype(param, parameter_definitions)
def get_itemdefinition_subtype(self, definition):
    """
    Return the subtype of an item attribute definition.

    :param definition: Name of the item attribute definition
    :type definition: str

    :return: Result of ``_get_definition_subtype`` for the item attribute definitions
    """
    item_definitions = self.itemdefinitions
    return self._get_definition_subtype(definition, item_definitions)
def _get_definition_listlen(self, definition, definitions):
    """
    Return the configured list length of a definition of type 'list'.

    :param definition: Name of the definition (e.g. the name of a parameter)
    :param definitions: Mapping of definition names to definition dicts (may be None)
    :type definition: str

    :return: FOO if there are no definitions or the definition is None,
             0 if the type is not 'list' or no 'listlen' is given,
             otherwise the 'listlen' entry of the definition
    """
    if definitions is None:
        return FOO
    entry = definitions[definition]
    if entry is None:
        return FOO
    # 'type' and 'listlen' are keys of the individual definition, not of the
    # mapping that holds all definitions
    if str(entry.get('type', '')).lower() != 'list':
        return 0
    # fall back to 0 (= no fixed length) so callers can compare the result numerically
    return entry.get('listlen', 0)
def get_parameter_listlen(self, param):
    """
    Return the list length of a parameter of type list.

    :param param: Name of the parameter
    :type param: str

    :return: Result of ``_get_definition_listlen`` for the parameter definitions
    """
    # the helper is a private method; there is no 'get_definition_listlen'
    return self._get_definition_listlen(param, self.parameters)
def get_itemdefinition_listlen(self, definition):
    """
    Return the list length of an item attribute definition of type list.

    :param definition: Name of the item attribute definition
    :type definition: str

    :return: Result of ``_get_definition_listlen`` for the item attribute definitions
    """
    # the helper is a private method; there is no 'get_definition_listlen'
    return self._get_definition_listlen(definition, self.itemdefinitions)
def _get_definition_type_with_subtype(self, definition, definitions):
    """
    Build the datatype string of a definition, including the subtype if one exists.

    For a definition with a subtype the result has the form ``type(sub1,sub2)`` or,
    if a list length greater than 0 is reported, ``type(len,sub1,sub2)``.

    :param definition: Name of the definition (e.g. the name of a parameter)
    :param definitions: Mapping of definition names to definition dicts (may be None)
    :type definition: str

    :return: Datatype with subtype; FOO if there are no definitions or the definition is None
    :rtype: str
    """
    if definitions is None or definitions[definition] is None:
        return FOO

    typename = self._get_definition_type(definition, definitions)
    subtypes = self._get_definition_subtype(definition, definitions)
    if subtypes == '':
        # no subtype -> plain datatype name
        return typename

    count = self._get_definition_listlen(definition, definitions)
    prefix = str(count) + ',' if count > 0 else ''
    return typename + '(' + prefix + ','.join(subtypes) + ')'
def get_parameter_type_with_subtype(self, param):
    """
    Return the datatype of a parameter including its subtype (if a subtype exists).

    :param param: Name of the parameter
    :type param: str

    :return: Result of ``_get_definition_type_with_subtype`` for the parameter definitions
    """
    parameter_definitions = self.parameters
    return self._get_definition_type_with_subtype(param, parameter_definitions)
def get_itemdefinition_type_with_subtype(self, definition):
    """
    Return the datatype of an item attribute definition including its subtype (if a subtype exists).

    :param definition: Name of the item attribute definition
    :type definition: str

    :return: Result of ``_get_definition_type_with_subtype`` for the item attribute definitions
    """
    item_definitions = self.itemdefinitions
    return self._get_definition_type_with_subtype(definition, item_definitions)
def _get_definition_defaultvalue(self, definition, definitions, definitionlist):
    """
    Returns the default value for a definition (e.g. a parameter)

    If no default value is specified, the default value for the datatype of the
    definition is used. A default of 'None*' in the metadata explicitly requests
    None as the default value. The resulting value is expanded, tested, converted
    and validated with the same helpers that are used for configured values.

    :param definition: Name of the definition
    :param definitions: Mapping of definition names to definition dicts
    :param definitionlist: List of the known definition names
    :type definition: str

    :return: Default value; None if the definition is unknown or its definition dict is None
    """
    if definition not in definitionlist:
        return None
    # work on the definitions that were passed in, not on self.parameters, so the
    # method gives correct results for any set of definitions
    entry = definitions[definition]
    if entry is None:
        return None

    typ = self._get_definition_type(definition, definitions)
    value = None
    if typ == 'dict':
        if entry.get('default') is not None:
            value = dict(entry.get('default'))
    else:
        value = entry.get('default')

    if value == 'None*':
        logger.info("_get_definition_defaultvalue: default value is 'None*' -> None")
        return None

    if value is None:
        value = self._get_default_if_none(typ)

    value = self._expand_listvalues(value, entry)
    if not self._test_value(value, entry):
        # an invalid default in the metadata is an error (configured values only cause a warning)
        logger.error(self._log_premsg + "Invalid data for type '{}' in metadata file '{}': default '{}' for parameter '{}' -> using '{}' instead".format(entry.get('type'), self.relative_filename, value, definition, self._get_default_if_none(typ)))
        value = None
    if value is None:
        value = self._get_default_if_none(typ)

    value = self._convert_value(value, entry, is_default=True)

    orig_value = value
    value = self._test_validity('', value, entry, is_default=True)
    if value != orig_value:
        # an invalid default in the metadata is an error (configured values only cause a warning)
        logger.error(self._log_premsg + "Invalid default '{}' in metadata file '{}' for parameter '{}' -> using '{}' instead".format(orig_value, self.relative_filename, definition, value))
    return value
def get_parameter_defaultvalue(self, param):
    """
    Return the default value for a parameter.

    :param param: Name of the parameter
    :type param: str

    :return: Result of ``_get_definition_defaultvalue`` for the parameter definitions
    """
    parameter_definitions = self.parameters
    known_names = self._paramlist
    return self._get_definition_defaultvalue(param, parameter_definitions, known_names)
# def get_itemdefinition_defaultvalue(self, definition):
# """
# Returns the default value for an item attribute definition
# """
# return self._get_definition_defaultvalue(definition, self.itemdefinitions, self._itemdeflist)
def _get_definitioninfo(self, definition, key, definitions):
    """
    Returns the value stored under a key of a definition

    :param definition: Name of the definition (e.g. a parameter) to get the info from
    :param key: Key of the definition info
    :param definitions: Mapping of definition names to definition dicts
    :type definition: str
    :type key: str

    :return: Value stored under the key; None if the definitions, the definition or the key do not exist
    """
    try:
        # look up the requested key (not the literal string 'key')
        return definitions[definition].get(key)
    except (LookupError, TypeError, AttributeError):
        # no definitions at all, unknown definition name or a definition that is None
        return None
def get_parameterdefinition(self, param, key):
    """
    Returns the value stored under a key of a parameter definition

    :param param: Name of the parameter
    :param key: Key of the definition info
    :type param: str
    :type key: str

    :return: Result of ``_get_definitioninfo`` for the parameter definitions
    """
    # single leading underscore: a double underscore would be name-mangled inside
    # the class and would not resolve to the helper
    return self._get_definitioninfo(param, key, self.parameters)
def get_itemdefinition(self, definition, key):
    """
    Returns the value stored under a key of an item attribute definition

    :param definition: Name of the item attribute definition
    :param key: Key of the definition info
    :type definition: str
    :type key: str

    :return: Result of ``_get_definitioninfo`` for the item attribute definitions
    """
    # single leading underscore: a double underscore would be name-mangled inside
    # the class and would not resolve to the helper
    return self._get_definitioninfo(definition, key, self.itemdefinitions)
def check_parameters(self, args):
    """
    Checks the values of a dict of configured parameters.

    Every parameter listed in ``self._paramlist`` is looked up in ``args``. Valid
    values are converted to the datatype of the parameter definition. For parameters
    that are not configured or have an invalid value, the default value is used,
    unless the parameter is mandatory: in that case an error is logged, the parameter
    is left out of the result and the returned flag is set to False.

    :param args: Configured parameters with the values
    :type args: dict of parameter-values (values as string)

    :return: Tuple of three entries: the parameters with their values, a flag that is True if
             no mandatory parameter is missing or invalid, and the 'hide' setting for each
             returned parameter
    :rtype: (OrderedDict, bool, OrderedDict)
    """
    addon_params = collections.OrderedDict()
    hide_params = collections.OrderedDict()
    # without metadata or parameter definitions there is nothing to check
    if self.meta is None:
        logger.info(self._log_premsg + "No metadata found")
        return (addon_params, True, hide_params)
    if self.parameters is None:
        logger.info(self._log_premsg + "No parameter definitions found in metadata")
        return (addon_params, True, hide_params)

    allparams_ok = True
    if self._paramlist != []:
        for param in self._paramlist:
            value = Utils.strip_quotes(args.get(param))
            if value is None:
                # parameter is not configured
                if self.parameters[param] is not None:
                    if self.parameters[param].get('mandatory'):
                        logger.error(self._log_premsg + "'{}' is mandatory, but was not found in /etc/{}".format(param, self._addon_type + YAML_FILE))
                        allparams_ok = False
                    else:
                        addon_params[param] = self.get_parameter_defaultvalue(param)
                        hide_params[param] = Utils.to_bool(self.parameters[param].get('hide'), default=False)
                        logger.info(self._log_premsg + "value not found in plugin configuration file for parameter '{}' -> using default value '{}' instead".format(param, addon_params[param]))
                        # logger.warning(self._log_premsg + "'{}' not found in /etc/{}, using default value '{}'".format(param, self._addon_type + YAML_FILE, addon_params[param]))
            else:
                # parameter is configured: a basic value for a list parameter is expanded to a list first
                value = self._expand_listvalues(value, self.parameters[param])
                if self._test_value(value, self.parameters[param]):
                    addon_params[param] = self._convert_value(value, self.parameters[param])
                    if self.parameters[param] is None:
                        hide_params[param] = None
                    else:
                        hide_params[param] = Utils.to_bool(self.parameters[param].get('hide'), default=False)
                    logger.debug(self._log_premsg + "Found '{}' with value '{}' in /etc/{}".format(param, value, self._addon_type + YAML_FILE))
                else:
                    # configured value did not pass the test for the defined datatype
                    if self.parameters.get(param) is not None:
                        if bool(self.parameters[param].get('mandatory', False)) is True:
                            logger.error(self._log_premsg + "'{}' is mandatory, but no valid value was found in /etc/{}".format(param, self._addon_type + YAML_FILE))
                            allparams_ok = False
                        else:
                            addon_params[param] = self.get_parameter_defaultvalue(param)
                            hide_params[param] = Utils.to_bool(self.parameters[param].get('hide'), default=False)
                            logger.error(self._log_premsg + "Found invalid value '{}' for parameter '{}' (type {}) in /etc/{}, using default value '{}' instead".format(value, param, self.parameters[param]['type'], self._addon_type + YAML_FILE, str(addon_params[param])))

    return (addon_params, allparams_ok, hide_params)
def check_itemattribute(self, item, attribute, value, defined_in_file=None):
    """
    Checks the value of a plugin-specific item attribute
    and returns the checked value or the default value if needed

    The attribute name is checked

      - against the item attribute definitions in ``all_itemdefinitions``
        (presumably filled from section 'item_attributes' of configured plugins)
      - against the attribute-name prefixes in ``all_itemprefixdefinitions``
        (presumably filled from section 'item_attribute_prefixes' of configured plugins)

    The value of an attribute that matches neither is returned unchanged.

    :param item: item object (its ``property.path`` is used in log messages)
    :param attribute: name of the item attribute
    :param value: configured value of the attribute
    :param defined_in_file: optional name of the file the attribute is defined in (only used in log messages)

    :return: converted value; the attribute's default (or an empty string if there is no default)
             if the value cannot be converted to the defined type
    """
    global all_prefixes_tuple

    self._log_premsg = "Item '{}', attribute '{}': ".format(item.property.path, attribute)
    if all_prefixes_tuple is None:
        # Generate tuple on first call to this method
        all_prefixes_tuple = tuple(all_itemprefixdefinitions.keys())

    if defined_in_file is None:
        def_in = ''
    else:
        def_in = '(defined in ' + defined_in_file + ')'

    attr_definition = all_itemdefinitions.get(attribute, None)
    if attr_definition is None:
        # no definition for the exact name -> look for a matching attribute-name prefix
        for prefix in all_itemprefixdefinitions.keys():
            if attribute.startswith(prefix):
                # work on a copy of the prefix definition
                attr_definition = dict(all_itemprefixdefinitions[prefix])
                attr_definition['_prefix'] = True
                break
        if not(attribute.startswith(all_prefixes_tuple)):
            # undefined attribute: no notice is logged for items below 'env.core.' and 'env.system.'
            if not (item.property.path.startswith('env.core.') or item.property.path.startswith('env.system.')):
                logger.notice(f"Item '{item.property.path}', attribute '{attribute}': Attribute is undefined and has value '{value}' {def_in}")
            return value

    # NOTE(review): for an exact-name match this writes into the dict obtained from all_itemdefinitions - confirm this is intended
    attr_definition['_def_in'] = def_in
    attr_type = attr_definition.get('type', 'foo')
    # If a parameter is defined as a list, but the value is of a basic datatype, value is expanded to a list.
    value = self._expand_listvalues(value, attr_definition)

    # test if value can be converted into defined type
    if self._test_value(value, attr_definition):
        value = self._convert_value(value, attr_definition)
    else:
        # handle invalid value that cannot be converted to defined type
        additional_text = ''
        default_value = attr_definition.get('default', None)
        if default_value is not None:
            additional_text = ", using default value '" + str(default_value) + "' instead"
        logger.warning("Item '{}', attribute '{}': value '{}' can not be converted to type '{}'{} {}".format(item.property.path, attribute, value, attr_type, additional_text, def_in))
        if default_value is None:
            value = ''
        else:
            value = default_value
    return value
def _compare_versions(self, vers1, vers2, operator, res_old=None):
    """
    Check whether two version numbers satisfy a comparison operator

    Both version strings are converted with ``_version_to_list`` and the
    resulting lists are compared.

    :param vers1: left hand version number
    :param vers2: right hand version number
    :param operator: one of '==', '<', '<=', '>', '>=' (any other operator yields False)
    :param res_old: not used
    :type vers1: str
    :type vers2: str
    :type operator: str

    :return: True if the condition is met
    :rtype: bool
    """
    v1 = self._version_to_list(vers1)
    v2 = self._version_to_list(vers2)

    # select the operators that hold for the relation between the two versions
    if v1 == v2:
        accepted = ['>=', '==', '<=']
    elif v1 < v2:
        accepted = ['<', '<=']
    elif v1 > v2:
        accepted = ['>', '>=']
    else:
        accepted = []
    result = operator in accepted

    logger.debug("_compare_versions: - - - vers1 = {}, vers2 = {}, v1 = {}, v2 = {}, operator = '{}', result = {}".format(vers1, vers2, v1, v2, operator, result))
    return result
def _version_to_list(self, vers):
    """
    Split version number to list and get rid of non-numeric parts

    The version is split at the dots and padded with zeros to at least four
    parts [major, minor, revision, build]. Of every part only the first run
    of digits is used; a part without any digit counts as 0.

    :param vers: version number, e.g. '1.9.3' or '1.7a.2'
    :type vers: str

    :return: version as list of integers
    :rtype: list
    """
    import re

    parts = vers.split('.')
    # pad to [major, minor, revision, build]; a negative count adds nothing
    parts.extend(['0'] * (4 - len(parts)))

    vlist = []
    for part in parts:
        # a part without digits (e.g. 'beta') must not raise, it counts as 0
        digits = re.search(r'\d+', part)
        vlist.append(int(digits.group()) if digits else 0)
    return vlist