Quellcode für lib.scheduler

#!/usr/bin/env python3
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
#########################################################################
# Copyright 2011-2014 Marcus Popp                          marcus@popp.mx
# Copyright 2016-2017 Christian Straßburg
# Copyright 2017-2023 Martin Sinn                           m.sinn@gmx.de
# Copyright 2017-2022 Bernd Meiners                 Bernd.Meiners@mail.de
#########################################################################
#  This file is part of SmartHomeNG
#
#  SmartHomeNG is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  SmartHomeNG is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with SmartHomeNG.  If not, see <http://www.gnu.org/licenses/>.
##########################################################################

import logging
import time
import datetime
import sys
import traceback
import threading
import random
import inspect
import copy

from lib.shtime import Shtime
import lib.env
from lib.item import Items
from lib.model.smartplugin import SmartPlugin

import dateutil.relativedelta
from dateutil.relativedelta import MO, TU, WE, TH, FR, SA, SU
from dateutil.tz import tzutc


# following modules) are imported to have those functions available during logic execution
import gc  # noqa
import os
import math
import lib.userfunctions as uf

import types
import subprocess

try:
    from lib.module import Modules
    _lib_modules_found = True
except:
    _lib_modules_found = False


logger = logging.getLogger(__name__)
tasks_logger = logging.getLogger(__name__ + '.tasks')

_scheduler_instance = None    # Pointer to the initialized instance of the scheduler class  (for use by static methods)

from lib.triggertimes import TriggerTimes

[Doku]class LeaveLogic(Exception): pass # declare a label for 'raise LeaveLogic'
class _PriorityQueue: """ Implements a queue which contain tuples of priority and data sorted by priority. Lowest priority given will be the first candidate for a get from the queue, data can be anything """ def __init__(self): self.queue = [] self.lock = threading.Lock() def insert(self, priority, data): """ Add a tuple with priority and data into the queue :param priority: a positive integer or a tuple where lowest indicates the highest priority :param data: anything to be associated with the given priority """ self.lock.acquire() lo = 0 hi = len(self.queue) while lo < hi: mid = (lo + hi) // 2 if priority < self.queue[mid][0]: hi = mid else: lo = mid + 1 self.queue.insert(lo, (priority, data)) self.lock.release() def get(self): """ Returns the first tuple of the queue :return: tuple with priority and data or None if no entry is available in the queue """ self.lock.acquire() try: return self.queue.pop(0) except IndexError: raise finally: self.lock.release() def qsize(self): """ Returns the actual size of the queue :return: Size of the queue """ return len(self.queue) def dump(self): """ Returns all entries of the queue as a list :return: list of all queue entries """ queue_list = [] self.lock.acquire() for entry in self.queue: queue_list.append(entry) self.lock.release() return queue_list
[Doku]class Scheduler(threading.Thread): _workers = [] _worker_num = 5 _worker_max = 20 _worker_delta = 60 # wait 60 seconds before adding another worker thread _scheduler = {} # holder schedulers, key is the scheduler name. Each scheduler is stored in a dict # (keys are 'obj', 'active', 'prio', 'next', 'value', 'cycle', 'cron') _runq = _PriorityQueue() # holds priority and a tuple of (name, obj, by, source, dest, value) for immediate execution _triggerq = _PriorityQueue() # holds tuples of (datetime, priority) and (name, obj, by, source, dest, value) # to be put in the run queue when time is due _pluginname_prefix = 'plugins.' # prefix for scheduler names def __init__(self, smarthome): threading.Thread.__init__(self, name='Scheduler') logger.info('Init Scheduler') self._sh = smarthome self._lock = threading.Lock() self._runc = threading.Condition() global _scheduler_instance if _scheduler_instance is not None: import inspect curframe = inspect.currentframe() calframe = inspect.getouterframes(curframe, 4) logger.critical("A second 'scheduler' object has been created. There should only be ONE instance of class 'Scheduler'!!! Called from: {} ({})".format(calframe[1][1], calframe[1][3])) _scheduler_instance = self self.shtime = Shtime.get_instance() self.items = Items.get_instance() self.crontabs = TriggerTimes.get_instance() self.mqtt = None # -------------------------------------------------------------------------------------------------- # Following (static) method of the class Scheduler implement the API for schedulers in SmartHomeNG # --------------------------------------------------------------------------------------------------
[Doku] @staticmethod def get_instance(): """ Returns the instance of the scheduler class, to be used to access the scheduler-api Use it the following way to access the api: .. code-block:: python from lib.scheduler import Scheduler scheduler = Scheduler.get_instance() # to access a method (eg. to trigger a logic): scheduler.trigger(...) :return: scheduler instance :rtype: object or None """ if _scheduler_instance == None: return None else: return _scheduler_instance
[Doku] def set_worker_warn_count(self, count): self._worker_max = count logger.info(f"Warn Level for maximum number of workers set to {self._worker_max}")
[Doku] def get_worker_count(self): """ Get number of worker threads initialized by scheduler :return: number of worker threads """ return len(self._workers)
[Doku] def get_idle_worker_count(self): """ Get number of idle worker threads :return: number of worker threads """ idle_count = 0 for w in self._workers: if w.name == 'idle': idle_count +=1 return idle_count
[Doku] def get_worker_names(self): """ Get names on non-idle worker threads :return: list with names of worker threads """ worker_names = [] for w in self._workers: if w.name != 'idle': worker_names.append(w.name) return worker_names
[Doku] def run(self): self.alive = True logger.debug(f"creating {self._worker_num} workers") for i in range(self._worker_num): self._add_worker() while self.alive: now = self.shtime.now() if self._runq.qsize() > len(self._workers): delta = now - self._last_worker if delta.seconds > self._worker_delta: if len(self._workers) < self._worker_max: self._add_worker() else: logger.error(f"Needing more worker threads than the specified maximum of {self._worker_max}! ({len(self._workers)} worker threads active)") tn = {} # for t in threading.enumerate(): for t in self._workers: tn[t.name] = tn.get(t.name, 0) + 1 logger.info('Worker-Threads: ' + ', '.join("{0}: {1}".format(k, v) for (k, v) in list(tn.items()))) if int(self._sh._restart_on_num_workers) < self._worker_max: # do no restart self._add_worker() else: if len(self._workers) < int(self._sh._restart_on_num_workers): self._add_worker() else: logger.warning('Worker-Threads: ' + ', '.join("{0}: {1}".format(k, v) for (k, v) in list(tn.items()))) self._sh.restart('SmartHomeNG (scheduler started too many worker threads ({}))'.format(len(self._workers))) while self._triggerq.qsize() > 0: try: (dt, prio), (name, obj, by, source, dest, value) = self._triggerq.get() except Exception as e: logger.warning(f"Trigger queue exception: {e}") break if dt < now: # run it self._runc.acquire() self._runq.insert(prio, (name, obj, by, source, dest, value)) self._runc.notify() self._runc.release() else: # put last entry back and break while loop self._triggerq.insert((dt, prio), (name, obj, by, source, dest, value)) break # For debugging # task_count = 0 # for name in self._scheduler: # task = self._scheduler[name] # if task['next'] is not None: # task_count += 1 # End for debugging if not self._lock.acquire(timeout=1): # logger.critical("Scheduler: Deadlock! - Task Count to enter run queue: {}".format(task_count)) logger.critical("Scheduler: Deadlock!") continue try: for name in self._scheduler: task = self._scheduler[name] if task['next'] is not None: if task['next'] <= now: self._runc.acquire() # insert priority and a tuple of (name, obj, by, source, dest, value) # ms self._runq.insert(task['prio'], (name, task['obj'], 'Scheduler', task.get('source', None), None, task['value'])) self._runc.notify() self._runc.release() task['next'] = None else: continue elif not task['active']: continue else: if task['cron'] is None and task['cycle'] is None: continue else: self._next_time(name) except Exception as e: tb_str = ''.join(traceback.format_exception(None, e, e.__traceback__)) logger.warning(f"Exception: {e} while searching scheduler for due tasks. Traceback: {tb_str}") finally: self._lock.release() time.sleep(0.5) if self._sh.shng_status['code'] > 20: logger.info("scheduler leaves run method") else: logger.warning("scheduler leaves run method") return
[Doku] def stop(self): self.alive = False logger.debug("scheduler leaves stop method")
[Doku] def trigger(self, name, obj=None, by='Logic', source=None, value=None, dest=None, prio=3, dt=None, from_smartplugin=False): """ triggers the execution of a logic optional at a certain datetime given with dt :param name: :param obj: :param by: :param source: :param value: :param dest: :param prio: :param dt: a certain datetime :return: always None """ name = self.check_caller(name, from_smartplugin) if obj is None: if name in self._scheduler: obj = self._scheduler[name]['obj'] else: logger.warning(f"Logic name not found: {name}") return if name in self._scheduler: if not self._scheduler[name]['active']: logger.debug(f"Logic '{name}' deactivated. Ignoring trigger from {by} {source}") return if dt is None: logger.debug(f"Triggering {name} - by: {by} source: {source} dest: {dest} value: {value}") self._runc.acquire() self._runq.insert(prio, (name, obj, by, source, dest, value)) self._runc.notify() self._runc.release() else: if not isinstance(dt, datetime.datetime): logger.warning(f"Trigger: Not a valid timezone aware datetime for {name}. Ignoring.") return if dt.tzinfo is None: logger.warning(f"Trigger: Not a valid timezone aware datetime for {name}. Ignoring.") return logger.debug(f"Triggering {name} - by: {by} source: {source} dest: {dest} value: {value} at: {dt}") self._triggerq.insert((dt, prio), (name, obj, by, source, dest, value))
[Doku] def remove(self, name, from_smartplugin=False): """ Remove a scheduler entry with given name. If a call is made from a SmartPlugin with an instance configuration the instance name is added to the name to be able to distinguish scheduler entries from different instances :param name: scheduler entry name to remove :param from_smartplugin: """ self._lock.acquire() try: name = self.check_caller(name, from_smartplugin) logger.debug(f"remove scheduler entry with name: {name}") if name in self._scheduler: del(self._scheduler[name]) except Exception as e: logger.error(f"Exception {e}: Could not remove scheduler entry for {name}") finally: self._lock.release()
[Doku] def check_caller(self, name, from_smartplugin=False): """ Checks the calling stack if the calling function (one of get, change, remove, trigger) itself was called by a smartplugin instance. If there is an instance name of the calling smartplugin then the instance name of that calling smartplugin is appended to the name :param name: the name of a scheduler entry :param from_smartplugin: :return: returns either the name or name combined with instance name """ try: stack = inspect.stack() except IndexError: return name except Exception as e: logger.exception(f"check_caller('{name}') *1: Exception in inspect.stack(): {e}") try: obj = stack[2][0].f_locals["self"] except KeyError: return name except Exception as e: pass logger.exception(f"check_caller('{name}') *2: Exception in inspect.stack(): {e}") try: if isinstance(obj, SmartPlugin): iname = obj.get_instance_name() if iname != '': # if not (iname).startswith(self._pluginname_prefix): if not from_smartplugin: if not str(name).endswith('_' + iname): name = name + '_' + obj.get_instance_name() except Exception as e: pass logger.exception(f"check_caller('{name}') *3: Exception in inspect.stack(): {e}") return name
[Doku] def return_next(self, name, from_smartplugin=False): # name = self.check_caller(name, from_smartplugin) # ms if name in self._scheduler: return self._scheduler[name]['next']
[Doku] def add(self, name, obj, prio=3, cron=None, cycle=None, value=None, offset=None, next=None, from_smartplugin=False): """ Adds an entry to the scheduler. :param name: Name of the scheduler :param obj: Method to call by the scheduler :param prio: a priority with default of 3 having 1 as most important and higher numbers less important :param cron: a crontab entry of type string or a list of entries :param cycle: a time given as integer in seconds or a string with a time given in seconds and a value after an equal sign :param value: Value that an item should be set to or to be handed to a logic, otherwise: None :param offset: an optional offset for cycle. If not given, cycle start point will be varied between 10..15 seconds to prevent too many scheduler entries with the same starting times :param next: :param from_smartplugin: Only to set to True, if called from the internal method in SmartPlugin class """ # set shtime and items if they were initialized to None in __init__ (potenital timing problem in init of shng) if self.shtime == None: self.shtime = Shtime.get_instance() if self.items == None: self.items = Items.get_instance() if self._lock.acquire(): try: source = '??' if isinstance(cron, str): cron = [cron, ] if isinstance(cron, list): details = None _cron = {} for entry in cron: desc, __, _value = entry.partition('=') desc = desc.strip() if _value == '': _value = None else: _value = _value.strip() if desc.lower().startswith('init'): details = desc offset = 5 # default init offset desc, op, seconds = desc.partition('+') if op: offset += int(seconds) else: desc, op, seconds = desc.partition('-') if op: offset -= int(seconds) value = _value next = self.shtime.now() + datetime.timedelta(seconds=offset) else: _cron[desc] = _value source = {'source': 'cron', 'details': details} if _cron == {}: cron = None else: cron = _cron if isinstance(cycle, int): source = {'source': 'cycle1', 'details': cycle} cycle = {cycle: cycle} elif isinstance(cycle, str): cycle, __, _value = cycle.partition('=') try: cycle = int(cycle.strip()) except Exception as e: logger.warning(f"Scheduler: Exception {e}: Invalid cycle entry for {name} {cycle}") return if _value != '': _value = _value.strip() else: _value = cycle cycle = {cycle: _value} source = {'source': 'cycle', 'details': _value} if cycle is not None and offset is None: # spread cycle jobs offset = random.randint(10, 15) # change name for multi instance plugins if obj.__class__.__name__ == 'method': if isinstance(obj.__self__, SmartPlugin): if obj.__self__.get_instance_name() != '': #if not (name).startswith(self._pluginname_prefix): if not from_smartplugin: name = name +'_'+ obj.__self__.get_instance_name() logger.debug("Scheduler: Name changed by adding plugin instance name to: " + name) self._scheduler[name] = {'prio': prio, 'obj': obj, 'source': source, 'cron': cron, 'cycle': cycle, 'value': value, 'next': next, 'active': True} if next is None: self._next_time(name, offset) except Exception as e: logger.error(f"Exception: {e} while trying to add a new entry to scheduler") finally: self._lock.release() else: logger.error(f"Could not aquire lock to add a new entry to scheduler")
[Doku] def get(self, name, from_smartplugin=False): """ takes a given name for a scheduler and returns either the matching scheduler or None """ name = self.check_caller(name, from_smartplugin) if name in self._scheduler: return self._scheduler[name] else: return None
[Doku] def change(self, name, from_smartplugin=False, **kwargs): """changes a scheduler entry for a given name to settings given in kwargs""" name = self.check_caller(name, from_smartplugin) if self._lock.acquire(): try: if name in self._scheduler: for key in kwargs: if key in self._scheduler[name]: if key == 'cron': if isinstance(kwargs[key], str): _cron = {} for entry in kwargs[key].split('|'): desc, __, _value = entry.partition('=') desc = desc.strip() if _value == '': _value = None else: _value = _value.strip() _cron[desc] = _value if _cron == {}: kwargs[key] = None else: kwargs[key] = _cron elif key == 'cycle': _cycle = kwargs[key] if isinstance(kwargs[key], dict): _cycle = kwargs[key] elif isinstance(kwargs[key], int): _cycle = {kwargs[key]: None} elif isinstance(kwargs[key], str): _param = kwargs[key].strip() if _param[0] == '{' and _param[-1] == '}': _param = _param[1:-1] _cycle, __, _value = _param.partition(':') try: _cycle = int(_cycle.strip()) except Exception: logger.warning("scheduler.change: Invalid cycle entry for {} {}".format(name, _cycle)) return if _value != '': _value = _value.strip() else: _value = None _cycle = {_cycle: _value} #logger.warning("scheduler.change: {}: {}, type = type(kwargs[key])={}".format(name, kwargs[key], type(kwargs[key]))) kwargs[key] = _cycle #logger.warning("scheduler.change: {}: cycle entry {}".format(name, _cycle)) elif key == 'active': if kwargs['active'] and not self._scheduler[name]['active']: logger.info("Activating logic: {0}".format(name)) elif not kwargs['active'] and self._scheduler[name]['active']: logger.info("Deactivating logic: {0}".format(name)) self._scheduler[name][key] = kwargs[key] else: logger.warning(f"Attribute {key} for {name} not specified. Could not change it.") if self._scheduler[name]['active'] is True: if 'cycle' in kwargs or 'cron' in kwargs: self._next_time(name) else: self._scheduler[name]['next'] = None else: logger.warning(f"Could not change {name}. No logic/method with this name found.") except Exception as e: logger.error(f"Exception: {e} while trying to change entry for {name}") finally: self._lock.release() else: logger.error(f"Could not aquire lock to change entry for {name}")
def _next_time(self, name, offset=None): """ Looks at the cycle and crontab attributes of job with name to find the next time for them and puts this and the value to the job. :param name: the name of the job :param offset: if a cycle attribute is present, then this value offsets the next execution time of a cycle """ job = self._scheduler[name] if None == job['cron'] == job['cycle']: self._scheduler[name]['next'] = None return next_time = None value = None now = self.shtime.now() #now = now.replace(microsecond=0) if job['cycle'] is not None: cycle = list(job['cycle'].keys())[0] #value = job['cycle'][cycle] # set value only, if it is an item scheduler if job['obj'].__class__.__name__ == 'Item': value = job['cycle'][cycle] if offset is None: offset = cycle next_time = now + datetime.timedelta(seconds=offset) #job['source'] = 'cycle' job['source'] = {'source': 'cycle', 'details': str(cycle)} if job['cron'] is not None: for entry in job['cron']: if entry == 'None': continue ct = self.crontabs.get_next(entry, now) if next_time is not None: if ct < next_time: next_time = ct #job['source'] = 'cron' # ms job['source'] = {'source': 'cron', 'details': str(entry)} value = job['cron'][entry] else: next_time = ct job['source'] = {'source': 'cron', 'details': str(entry)} value = job['cron'][entry] self._scheduler[name]['next'] = next_time if value is not None: self._scheduler[name]['value'] = value logger.debug(f"{name} next time: {next_time}") def __iter__(self): for job in self._scheduler: yield job def _add_worker(self): self._last_worker = self.shtime.now() t = threading.Thread(target=self._worker) t.start() self._workers.append(t) if len(self._workers) > self._worker_num: logger.info("Adding worker thread. Total: {0}".format(len(self._workers))) tn = {} for t in threading.enumerate(): tn[t.name] = tn.get(t.name, 0) + 1 logger.info('Threads: ' + ', '.join("{0}: {1}".format(k, v) for (k, v) in list(tn.items()))) def _worker(self): while self.alive: self._runc.acquire() self._runc.wait(timeout=1) try: prio, (name, obj, by, source, dest, value) = self._runq.get() except IndexError: continue finally: self._runc.release() self._task(name, obj, by, source, dest, value) def _task(self, name, obj, by, source, dest, value): threading.current_thread().name = name #logger = logging.getLogger('_task.' + name) if obj.__class__.__name__ == 'Logic': self._execute_logic_task(obj, by, source, dest, value) elif obj.__class__.__name__ == 'Item': try: if isinstance(source, str): scheduler_source = source else: scheduler_source = source.get('source', '') if scheduler_source != '': scheduler_source = ':'+scheduler_source+':'+source.get('details','') if value is not None: obj(value, caller=("Scheduler"+scheduler_source)) except Exception as e: tasks_logger.exception(f"Item {name} exception: {e}") else: # method try: if value is None: obj() else: if ('caller' in inspect.getfullargspec(obj).args) and isinstance(value, dict): caller = value.get('caller', None) if caller is None: value['caller'] = by obj(**value) except Exception as e: tasks_logger.exception(f"Method {name} exception: {e}") threading.current_thread().name = 'idle' def _execute_logic_task(self, logic, by, source, dest, value): """ Execute a logic from _task method :param logic: :return: """ # get logger for the logic name = 'logics.' + logic.name logger = logging.getLogger(name) source_details = None if isinstance(source, dict): source_details = source.get('details', '') src = source.get('item', '') if src == '': # get source ('cron' or 'cycle') src = source.get('source', '') source = src trigger = {'by': by, 'source': source, 'source_details': source_details, 'dest': dest, 'value': value} # noqa # following variables are assigned to be available during logic execution #sh = self._sh # noqa #shtime = self.shtime #items = self.items # set the logic environment here (for use within functions in logics): #logic = obj # noqa #logic.sh = self._sh # not needed (because of being set in logic_globals dict # logic.logger = logger # not needed (because of being set in logic_globals dict #logic.shtime = self.shtime # not needed (because of being set in logic_globals dict #logic.items = self.items # not needed (because of being set in logic_globals dict #logic.env = lib.env # not needed (because of being set in logic_globals dict #logic.trigger_dict = trigger # logic.trigger has naming conflict with method logic.trigger of lib.item # not needed (because of being set in logic_globals dict #logics = logic._logics if not self.mqtt: if _lib_modules_found: self.mqtt = Modules.get_instance().get_module('mqtt') #mqtt = self.mqtt logic.mqtt = self.mqtt try: if logic._enabled: if self._sh.shng_status['code'] < 20: logger.warning(f"Logik ignoriert, SmartHomeNG ist noch nicht vollständig initialisiert - Logik wurde getriggert durch {trigger}") else: # set up "globals" environment for the logic logic_globals = dict(globals()) logic_globals['sh'] = self._sh logic_globals['logger'] = logger logic_globals['mqtt'] = self.mqtt logic_globals['shtime'] = self.shtime logic_globals['env'] = lib.env logic_globals['items'] = self.items logic_globals['trigger'] = trigger # logic.trigger_dict logic_globals['logic'] = logic logic_globals['logics'] = logic._logics # execute logic logger.debug(f"Getriggert durch: {trigger}") exec(logic._bytecode, logic_globals) # store timestamp of last run logic.set_last_run() for method in logic.get_method_triggers(): try: method(logic, by, source, dest) except Exception as e: logger.exception(f"Logic: Trigger {method} for {logic.name} failed: {e}") except LeaveLogic as e: # 'LeaveLogic' is no error if str(e) != '': logger.info(f"Die Logik '{logic.name}' wurde verlassen. Grund: {e}") except SystemExit: # ignore exit() call from logic. pass except Exception as e: tb = sys.exc_info()[2] tb = traceback.extract_tb(tb)[-1] if tb[2] == '<module>': logic_method = 'Hauptroutine der Logik' else: logic_method = 'function ' + tb[2] + '()' logger.error(f"In der Logik ist ein Fehler aufgetreten:\n Logik '{logic.name}', Datei '{tb[0]}', Zeile {tb[1]}\n {logic_method}, Exception: {e}") #logger.exception(f"In der Logik ist ein Fehler aufgetreten:\n Logik '{logic.name}', Datei '{tb[0]}', Zeile {tb[1]}\n {logic_method}, Exception: '{e}'\n ") return