#!/usr/bin/env python3
#########################################################################
# Copyright 2013 Marcus Popp marcus@popp.mx
# Copyright 2018 Bernd Meiners Bernd.Meiners@Mail.de
#########################################################################
# This file is part of SmartHomeNG. https://github.com/smarthomeNG//
#
# SmartHomeNG is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SmartHomeNG is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SmartHomeNG. If not, see <http://www.gnu.org/licenses/>.
#########################################################################
"""
This library is on its way out. Network classes for SmartHomeNG are provided by
lib.network. Creating lib.connection Server and Client class object will
create an appropriate WARNING log entry.
The following modules use an import lib.connection as of December 2021:
smarthome.py for an object of Connections()
Plugins:
visu_websocket
"""
import logging
import socket
import collections
import threading
import select
import time
import sys
from iowait import IOWait
logger = logging.getLogger(__name__)
# =====================================================================================
import lib.utils
import sys
dep_id_list = []
#####################################################################
# Diplay DEPRECATED warning
#####################################################################
def _deprecated_warning(n_func='', o_func=''):
"""
Display function deprecated warning
"""
# if hasattr(self, '_deprecated_warnings'):
# if lib.utils.Utils.to_bool(self._deprecated_warnings) == False:
# return
# else:
# return # if parameter is not defined
if o_func != '':
d_func = 'lib.connection'+' '+o_func
else:
d_func = 'lib.connection'+'.'+str(sys._getframe(1).f_code.co_name)+'()'
if n_func != '':
n_func = '- use the '+n_func+' instead'
try:
d_test = ' (' + str(sys._getframe(2).f_locals['self'].__module__) + ')'
except:
d_test = ''
called_by = str(sys._getframe(2).f_code.co_name)
in_class = ''
try:
in_class = 'class ' + str(sys._getframe(2).f_locals['self'].__class__.__name__) + d_test
except:
in_class = 'a logic?' + d_test
if called_by == '<module>':
called_by = str(sys._getframe(3).f_code.co_name)
level = 3
while True:
level += 1
try:
c_b = str(sys._getframe(level).f_code.co_name)
except ValueError:
c_b = ''
if c_b == '':
break
called_by += ' -> ' + c_b
# called_by = str(sys._getframe(3).f_code.co_name)
# if not hasattr(self, 'dep_id_list'):
# self.dep_id_list = []
id_str = d_func + '|' + in_class + '|' + called_by
if not id_str in dep_id_list:
if in_class.find('lib.smarthome') == -1 :
logger.warning(f"DEPRECATED: lib.connection is deprecated and will be removed in SmartHommeNG v1.10")
logger.warning(f"DEPRECATED: Used '{d_func}', called in '{in_class}' by '{called_by}' {n_func}")
dep_id_list.append(id_str)
return
# =====================================================================================
[Doku]class Base():
"""
provides same base class for class Connections(), class Server(),
class Stream() and thus also to class Client() which inherits from Stream()
some lookup dicts for protocol family like TCP or UDP flavours and the like for protocol type
"""
_poller = None
_family = {'UDP': socket.AF_INET, 'UDP6': socket.AF_INET6, 'TCP': socket.AF_INET, 'TCP6': socket.AF_INET6}
_type = {'UDP': socket.SOCK_DGRAM, 'UDP6': socket.SOCK_DGRAM, 'TCP': socket.SOCK_STREAM, 'TCP6': socket.SOCK_STREAM}
_monitor = []
_deprecated_wanings = True
def __init__(self, monitor=False):
self._name = self.__class__.__name__
if monitor:
self._monitor.append(self)
def _create_socket(self, flags=None):
family, type, proto, canonname, sockaddr = socket.getaddrinfo(self._host, self._port, family=self._family[self._proto], type=self._type[self._proto])[0]
self.socket = socket.socket(family, type, proto)
return sockaddr
def _deprecated_warning(self, n_func=''):
"""
Display function deprecated warning
"""
if hasattr(self, '_deprecated_warnings'):
if lib.utils.Utils.to_bool(self._deprecated_warnings) == False:
return
else:
return # if parameter is not defined
d_func = 'sh.'+str(sys._getframe(1).f_code.co_name)+'()'
if n_func != '':
n_func = '- use the '+n_func+' instead'
try:
d_test = ' (' + str(sys._getframe(2).f_locals['self'].__module__) + ')'
except:
d_test = ''
called_by = str(sys._getframe(2).f_code.co_name)
in_class = ''
try:
in_class = 'class ' + str(sys._getframe(2).f_locals['self'].__class__.__name__) + d_test
except:
in_class = 'a logic?' + d_test
if called_by == '<module>':
called_by = str(sys._getframe(3).f_code.co_name)
level = 3
while True:
level += 1
try:
c_b = str(sys._getframe(level).f_code.co_name)
except ValueError:
c_b = ''
if c_b == '':
break
called_by += ' -> ' + c_b
# called_by = str(sys._getframe(3).f_code.co_name)
if not hasattr(self, 'dep_id_list'):
self.dep_id_list = []
id_str = d_func + '|' + in_class + '|' + called_by
if not id_str in self.dep_id_list:
self.logger.warning("DEPRECATED: Used function '{}', called in '{}' by '{}' {}".format(d_func, in_class, called_by, n_func))
self.dep_id_list.append(id_str)
return
[Doku]class Connections(Base):
"""
Within SmartHome.py there is one instance of this class
The filenumber of a connection is the key to the contained dicts of
_connections and _servers
Additionally the filenumber is used for either epoll or kqueue depending
on the environment found for select.
A filenumber of value -1 is an error value.
"""
_connections = {}
_servers = {}
if hasattr(select, 'epoll'):
_ro = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR
_rw = _ro | select.EPOLLOUT
def __init__(self):
Base.__init__(self)
Base._poller = self
_deprecated_warning('', 'class Connections')
if hasattr(select, 'epoll'):
self._epoll = select.epoll()
elif hasattr(select, 'kqueue'):
self._kqueue = select.kqueue()
else:
logger.debug("Init connections using IOWait")
self._connections_found = 0
self._waitobj = IOWait()
[Doku] def register_server(self, fileno, obj):
if fileno == -1:
logger.error("{} tried to register a server with filenumber == -1".format(obj))
return
self._servers[fileno] = obj
self._connections[fileno] = obj
if hasattr(select, 'epoll'):
self._epoll.register(fileno, self._ro)
elif hasattr(select, 'kqueue'):
event = [
select.kevent(fileno,
filter=select.KQ_FILTER_READ,
flags=select.KQ_EV_ADD)
]
self._kqueue.control(event, 0, 0)
else:
logger.debug("register_server: put watch on fileno {}".format(fileno))
self._waitobj.watch(fileno, read=True)
[Doku] def register_connection(self, fileno, obj):
if fileno == -1:
logger.error("tried to register a connection with filenumber == -1")
return
self._connections[fileno] = obj
if hasattr(select, 'epoll'):
self._epoll.register(fileno, self._ro)
elif hasattr(select, 'kqueue'):
event = [
select.kevent(fileno,
filter=select.KQ_FILTER_READ,
flags=select.KQ_EV_ADD)
]
self._kqueue.control(event, 0, 0)
else:
logger.debug("register_connection: put watch on fileno {}".format(fileno))
self._waitobj.watch(fileno, read=True)
[Doku] def unregister_connection(self, fileno):
if fileno == -1:
logger.error("tried to unregister a connection with filenumber == -1")
return
try:
if hasattr(select, 'epoll'):
self._epoll.unregister(fileno)
elif hasattr(select, 'kqueue'):
pass
else:
logger.debug("unregister_connection: unwatch fileno {}".format(fileno))
self._waitobj.unwatch(fileno)
except Exception as e:
logger.error("unregister a connection with filenumber == {} for epoll failed".format(fileno))
try:
del(self._connections[fileno])
except:
pass
try:
del(self._servers[fileno])
except:
pass
[Doku] def monitor(self, obj):
self._monitor.append(obj)
[Doku] def check(self):
for obj in self._monitor:
if not obj.connected:
obj.connect()
[Doku] def trigger(self, fileno):
if fileno == -1:
logger.error("tried to trigger a connection with filenumber == -1")
return
if self._connections[fileno].outbuffer:
if hasattr(select, 'epoll'):
self._epoll.modify(fileno, self._rw)
elif hasattr(select, 'kqueue'):
event = [
select.kevent(fileno,
filter=select.KQ_FILTER_WRITE,
flags=select.KQ_EV_ADD | KQ_EV_ONESHOT)
]
self._kqueue.control(event, 0, 0)
else:
logger.error("trigger: Operating System without epoll or kqueue is currently not supported, please report this error")
[Doku] def poll(self):
time.sleep(0.0000000001) # give epoll.modify a chance
if not self._connections:
time.sleep(1)
return
if -1 in self._connections:
logger.error("fileno -1 was found, please report to SmartHomeNG team")
del( self._connections[-1])
if hasattr(select, 'epoll') or hasattr(select, 'kqueue'):
for fileno in self._connections:
# Fix for: "RuntimeError: dictionary changed size during iteration"
#connections_keys = self._connections.keys()
#for fileno in connections_keys:
if fileno not in self._servers:
if hasattr(select, 'epoll'):
if self._connections[fileno].outbuffer:
try:
self._epoll.modify(fileno, self._rw)
except OSError as e:
# as with python 3.6 an OSError will be raised when a socket is already closed like with a settimeout
# the socket will need to be recreated then
logger.error("OSError {} for epoll.modify(RW) with fileno {} for object {}, please report to SmartHomeNG team".format(e, fileno, self._connections[fileno]))
# here we could try to get rid of the connection that causes the headache
except PermissionError as e:
logger.error("PermissionError {} for epoll.modify(RW) with fileno {} for object {}, please report to SmartHomeNG team".format(e, fileno, self._connections[fileno]))
# here we could try to get rid of the connection that causes the headache
except FileNotFoundError as e:
logger.error("FileNotFoundError {} for epoll.modify(RW) with fileno {} for object {}, please report to SmartHomeNG team".format(e, fileno, self._connections[fileno]))
# here we could try to get rid of the connection that causes the headache
else:
try:
self._epoll.modify(fileno, self._ro)
except OSError as e:
# as with python 3.6 an OSError will be raised when a socket is already closed like with a settimeout
# the socket will need to be recreated then
logger.error("OSError {} for epoll.modify(RO) with fileno {} for object {}, please report to SmartHomeNG team".format(e, fileno, self._connections[fileno]))
# here we could try to get rid of the connection that causes the headache
except PermissionError as e:
logger.error("PermissionError {} for epoll.modify(RO) with fileno {} for object {}, please report to SmartHomeNG team".format(e, fileno, self._connections[fileno]))
# here we could try to get rid of the connection that causes the headache
except FileNotFoundError as e:
logger.error("FileNotFoundError {} for epoll.modify(RO) with fileno {} for object {}, please report to SmartHomeNG team".format(e, fileno, self._connections[fileno]))
# here we could try to get rid of the connection that causes the headache
elif hasattr(select, 'kqueue'):
event = []
if self._connections[fileno].outbuffer:
event.append(select.kevent(fileno,
filter=select.KQ_FILTER_WRITE,
flags=select.KQ_EV_ADD | KQ_EV_ONESHOT))
else:
event.append(select.kevent(fileno,
filter=select.KQ_FILTER_READ,
flags=select.KQ_EV_ADD))
self._kqueue.control(event, 0, 0)
if hasattr(select, 'epoll'):
for fileno, event in self._epoll.poll(timeout=1):
if fileno in self._servers:
server = self._servers[fileno]
server.handle_connection()
else:
if event & select.EPOLLIN:
try:
con = self._connections[fileno]
con._in()
except Exception as e:
con.close()
continue
if event & select.EPOLLOUT:
try:
con = self._connections[fileno]
con._out()
except Exception as e:
con.close()
continue
if event & (select.EPOLLHUP | select.EPOLLERR):
try:
con = self._connections[fileno]
con.close()
continue
except:
pass
elif hasattr(select, 'kqueue'):
for event in self._kqueue.control(None, 1):
fileno = event.ident
if fileno in self._servers:
server = self._servers[fileno]
server.handle_connection()
else:
if event.filter == select.KQ_FILTER_READ:
try:
con = self._connections[fileno]
con._in()
except Exception as e: # noqa
con.close()
continue
if event.filter == select.KQ_FILTER_WRITE:
try:
con = self._connections[fileno]
con._out()
except Exception as e: # noqa
con.close()
continue
if event.flags & select.KQ_EV_EOF:
try:
con = self._connections[fileno]
con.close()
continue
except:
pass
else:
# not using epoll or kqueue
n_connections = len(self._connections)
if self._connections_found != n_connections:
logger.debug("lib/connection.py poll() for len(self._connections)={}".format(n_connections))
self._connections_found = n_connections
watched = self._waitobj.get_watched()
nwatched = len(watched)
logger.debug("iowait in alpha status with {} watched connections".format(nwatched))
events = self._waitobj.wait()
nevents = len(events)
logger.debug("iowait reports {} events".format(nevents))
for fileno, read, write in events:
logger.debug("event for fileno={}, read={}, write={}".format(fileno, read, write))
if fileno in self._servers:
server = self._servers[fileno]
server.handle_connection()
else:
logger.debug("fileno {} not in self._servers".format(fileno))
if read:
try:
con = self._connections[fileno]
con._in()
except Exception as e: # noqa
con.close()
continue
if write:
try:
con = self._connections[fileno]
con._out()
except Exception as e: # noqa
con.close()
continue
[Doku] def close(self):
if -1 in self._connections:
logger.error("Connections.close() tried to close a filenumber == -1")
try:
for fileno in self._connections:
try:
self._connections[fileno].close()
except:
pass
except:
pass
[Doku]class Server(Base):
def __init__(self, host, port, proto='TCP'):
Base.__init__(self, monitor=True)
self._host = host
self._port = port
self._proto = proto
self.address = "{}:{}".format(host, port)
self.connected = False
#self._deprecated_warning('lib.network.Tcp_server() class')
_deprecated_warning('lib.network.Tcp_server() class', 'class Server')
[Doku] def connect(self):
try:
sockaddr = self._create_socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(sockaddr)
if self._proto.startswith('TCP'):
self.socket.listen(5)
self.socket.setblocking(0)
except Exception as e:
logger.error("{}: problem binding {} ({}): {}".format(self._name, self.address, self._proto, e))
self.close()
else:
self.connected = True
logger.debug("{}: binding to {} ({})".format(self._name, self.address, self._proto))
self._poller.register_server(self.socket.fileno(), self)
[Doku] def close(self):
self.connected = False
try:
self._poller.unregister_connection(self.socket.fileno())
except:
pass
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass
try:
self.socket.close()
except:
pass
try:
del(self.socket)
except:
pass
[Doku] def accept(self):
try:
sock, addr = self.socket.accept()
sock.setblocking(0)
addr = "{}:{}".format(addr[0], addr[1])
logger.debug("{}: incoming connection from {} to {}".format(self._name, addr, self.address))
return sock, addr
except:
return None, None
[Doku] def handle_connection(self):
pass
[Doku]class Stream(Base):
def __init__(self, sock=None, address=None, monitor=False):
Base.__init__(self, monitor=monitor)
self.connected = False
self.address = address
self.inbuffer = bytearray()
self.outbuffer = collections.deque()
self.__olock = threading.Lock()
self._frame_size_in = 4096
self._frame_size_out = 4096
self.terminator = b'\r\n'
self._balance_open = False
self._balance_close = False
self._close_after_send = False
if sock is not None:
self.socket = sock
self._connected()
def _connected(self):
self._poller.register_connection(self.socket.fileno(), self)
self.connected = True
self.handle_connect()
def _in(self):
max_size = self._frame_size_in
try:
data = self.socket.recv(max_size)
except Exception as e: # noqa
self.close()
return
if data == b'':
self.close()
return
self.inbuffer.extend(data)
while True:
terminator = self.terminator
buffer_len = len(self.inbuffer)
if not terminator:
if not self._balance_open:
break
index = self._is_balanced()
if index:
data = self.inbuffer[:index]
self.inbuffer = self.inbuffer[index:]
self.found_balance(data)
else:
break
elif isinstance(terminator, int):
if buffer_len < terminator:
break
else:
data = self.inbuffer[:terminator]
self.inbuffer = self.inbuffer[terminator:]
self.terminator = 0
self.found_terminator(data)
else:
if terminator not in self.inbuffer:
break
index = self.inbuffer.find(terminator)
data = self.inbuffer[:index]
cut = index + len(terminator)
self.inbuffer = self.inbuffer[cut:]
self.found_terminator(data)
def _is_balanced(self):
stack = []
for index, char in enumerate(self.inbuffer):
if char == self._balance_open:
stack.append(char)
elif char == self._balance_close:
stack.append(char)
if stack.count(self._balance_open) < stack.count(self._balance_close):
logger.warning("{}: unbalanced input!".format(self._name))
logger.close()
return False
if stack.count(self._balance_open) == stack.count(self._balance_close):
return index + 1
return False
def _out(self):
if not self.__olock.acquire(timeout=1):
return
try:
while self.connected:
frame = self.outbuffer.pop()
if not frame:
if frame is None:
self.close()
return
continue # ignore empty frames
sent = self.socket.send(frame)
if sent < len(frame):
self.outbuffer.append(frame[sent:])
except IndexError: # buffer empty
return
except socket.error:
self.outbuffer.append(frame)
except Exception as e: # noqa
logger.exception("{}: {}".format(self._name, e))
self.close()
finally:
if self._close_after_send:
logger.debug("close after send")
self.close()
self.__olock.release()
[Doku] def balance(self, bopen, bclose):
self._balance_open = ord(bopen)
self._balance_close = ord(bclose)
[Doku] def close(self):
if self.connected:
logger.debug("{}: closing socket {}".format(self._name, self.address))
self.connected = False
try:
self._poller.unregister_connection(self.socket.fileno())
except:
pass
try:
self.handle_close()
except:
pass
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass
try:
self.socket.close()
except:
pass
try:
del(self.socket)
except:
pass
[Doku] def discard_buffers(self):
self.inbuffer = bytearray()
self.outbuffer.clear()
[Doku] def found_terminator(self, data):
pass
[Doku] def found_balance(self, data):
pass
[Doku] def handle_close(self):
pass
[Doku] def handle_connect(self):
pass
[Doku] def send(self, data, close=False):
self._close_after_send = close
if not self.connected:
return False
frame_size = self._frame_size_out
if len(data) > frame_size:
for i in range(0, len(data), frame_size):
self.outbuffer.appendleft(data[i:i + frame_size])
else:
self.outbuffer.appendleft(data)
self._out()
return True
[Doku]class Client(Stream):
def __init__(self, host, port, proto='TCP', monitor=False):
Stream.__init__(self, monitor=monitor)
self._host = host
self._port = port
self._proto = proto
self.address = "{}:{}".format(host, port)
self._connection_attempts = 0
self._connection_errorlog = 60
self._connection_lock = threading.Lock()
#self._deprecated_warning('lib.network.Tcp_client() class')
_deprecated_warning('lib.network.Tcp_client() class', 'class Client')
[Doku] def connect(self):
self._connection_lock.acquire()
if self.connected:
self._connection_lock.release()
return
try:
sockaddr = self._create_socket()
self.socket.settimeout(2)
self.socket.connect(sockaddr)
self.socket.setblocking(0)
# self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except Exception as e:
self._connection_attempts -= 1
if self._connection_attempts <= 0:
logger.error("{}: could not connect to {} ({}): {}".format(self._name, self.address, self._proto, e))
self._connection_attempts = self._connection_errorlog
self.close()
else:
logger.debug("{}: connected to {}".format(self._name, self.address))
self._connected()
finally:
self._connection_lock.release()