#!/usr/bin/env python3
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
#########################################################################
# Parts Copyright 2016 C. Strassburg (lib.utils) c.strassburg@gmx.de
# Copyright 2017- Serge Wagener serge@wagener.family
# Copyright 2020- Sebastian Helms morg @ knx-user-forum
#########################################################################
# This file is part of SmartHomeNG
#
# SmartHomeNG is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SmartHomeNG is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SmartHomeNG If not, see <http://www.gnu.org/licenses/>.
#########################################################################
"""
This library contains the network classes for SmartHomeNG.
New network functions and utilities are going to be implemented in this library.
These classes, functions and methods are mainly meant to be used by plugin developers
- class Network provides utility methods for network-related tasks
- class Html provides methods for communication with resp. requests to a HTTP server
- class Tcp_client provides a two-way TCP client implementation
- class Tcp_server provides a TCP listener with connection / data callbacks
- class Udp_server provides a UDP listener with data callbacks
"""
from lib.utils import Utils
import sys
import traceback
from inspect import signature
import re
import asyncio
import logging
import requests
from iowait import IOWait
import socket
import struct
import subprocess
import threading
import time
from contextlib import suppress
from . import aioudp
# Turn off ssl warnings from urllib
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
logging.getLogger('urllib3').setLevel(logging.WARNING)
[Doku]class Network(object):
"""
Provide useful static methods that you can use in your projects.
NOTE: Some format check routines were duplicate with lib.utils. As these primarily check string formats and are used for metadata parsing, they were removed here to prevent duplicates.
"""
[Doku] @staticmethod
def ip_port_to_socket(ip, port):
"""
Return an ip address plus port to a socket string.
Format is 'ip:port' for IPv4 or '[ip]:port' for IPv6
:return: Socket address / IP endpoint as string
:rtype: string
"""
if Utils.is_ipv6(ip):
ip = f'[{ip}]'
return f'{ip}:{port}'
[Doku] @staticmethod
def family_to_string(family):
"""
Convert a socket address family to an ip version string 'IPv4' or 'IPv6'.
:param family: Socket family
:type family: socket.AF_INET or socket.AF_INET6
:return: 'IPv4' or 'IPv6'
:rtype: string
"""
return 'IPv6' if family == socket.AF_INET6 else 'IPv4'
[Doku] @staticmethod
def ping(ip):
"""
Try to ICMP ping a host using external OS utilities. IPv4 only.
:param ip: IPv4 address as a string
:type ip: string
:return: True if a reachable, false otherwise.
:rtype: bool
"""
logger = logging.getLogger(__name__)
if subprocess.call(f'ping -c 1 {ip}', shell=True, stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT) == 0:
logger.debug(f'Ping: {ip} is online')
return True
else:
logger.debug(f'Ping: {ip} is offline')
return False
[Doku] @staticmethod
def ping_port(ip, port=80):
"""
Try to reach a given TCP port. IPv4 only.
:param ip: IPv4 address
:param port: Port number
:type ip: string
:type port: int
:return: True if reachable, false otherwise.
:rtype: bool
"""
logger = logging.getLogger(__name__)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
if sock.connect_ex((ip, int(port))) == 0:
logger.debug(f'Ping: port {port} on {ip} is reachable')
sock.close()
return True
else:
logger.debug(f'Ping: port {port} on {ip} is offline or not reachable')
sock.close()
return False
[Doku] @staticmethod
def send_wol(mac, ip='255.255.255.255'):
"""
Send a wake on lan packet to the given mac address using ipv4 broadcast (or directed to specific ip).
:param mac: Mac address to wake up (pure numbers or with any separator)
:type mac: string
"""
logger = logging.getLogger(__name__)
if len(mac) == 12:
pass
elif len(mac) == 12 + 5:
mac = mac.replace(mac[2], '')
else:
logger.error('Incorrect MAC address format')
return
data = ''.join(['FFFFFFFFFFFF', mac * 16])
send_data = b''
for i in range(0, len(data), 2):
send_data = b''.join([send_data, struct.pack('B', int(data[i: i + 2], 16))])
for _ in range(15):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(send_data, (ip, 9))
logger.debug(f'Sent WOL packet to {mac}')
[Doku] @staticmethod
def validate_inet_addr(addr, port):
"""
Validate that addr:port resolve properly and return resolved IP address and port.
:param addr: hostname or ip address under test
:type addr: str
:param port: port number under test
:type port: num
:return: (ip_address, port, family) or (None, 0, None) if error occurs
:rtype: tuple
"""
logger = logging.getLogger(__name__)
# Test if host is empty
if addr == '':
return ('', port, socket.AF_INET)
else:
# try to resolve addr to get more info
logger.debug(f'trying to resolve addr {addr} with port {port}')
try:
family, sockettype, proto, canonname, socketaddr = socket.getaddrinfo(addr, None)[0]
# Check if resolved address is IPv4 or IPv6
if family == socket.AF_INET:
ip, _ = socketaddr
elif family == socket.AF_INET6:
ip, _, flow_info, scope_id = socketaddr
else:
# might be AF_UNIX or something esoteric?
logger.error(f'Unsupported address family {family}')
ip = None
if ip is not None:
logger.info(f'Resolved {addr} to {Network.family_to_string(family)} address {ip}')
except socket.gaierror as e:
# Unable to resolve hostname
logger.error(f'Cannot resolve {addr} to a valid ip address (v4 or v6): {e}')
ip = None
port = 0
family = None
return (ip, port, family)
[Doku] @staticmethod
def clean_uri(uri, mode='show'):
"""
Check URIs for embedded http/https login data (http://user:pass@domain.tld...) and clean it.
Possible modes are:
- 'show': don't change URI (default) -> ``http://user:pass@domain.tld...``
- 'mask': replace login data with ``***`` -> ``http://***:***@domain.tld...``
- 'strip': remove login data part -> ``http://domain.tld...``
:param uri: full URI to check and process
:param mode: handling mode, one of 'show', 'strip', 'mask'
:return: resulting URI string
:type uri: str
:type mode: str
:rtype: str
"""
# find login data
pattern = re.compile('http([s]?)://([^:]+:[^@]+@)')
# possible replacement modes
replacement = {
'strip': 'http\\g<1>://',
'mask': 'http\\g<1>://***:***@'
}
# if no change requested or no login data found, return unchanged
if mode not in replacement or not pattern.match(uri):
return uri
# return appropriately changed URI
return pattern.sub(replacement[mode], uri)
[Doku]class Connections(object):
"""
Within SmartHome.py there is one instance of this class
The monitoring feature enables autoconnecting and auto-
reconnecting by checking <plugin>.connected and calling
<plugin>.connect()
"""
_monitor = []
def __init__(self):
self._name = self.__class__.__name__
[Doku] def monitor(self, obj):
if obj not in self._monitor:
self._monitor.append(obj)
[Doku] def unmonitor(self, obj):
if obj in self._monitor:
self._monitor.remove(obj)
[Doku] def check(self):
for obj in self._monitor:
if obj.alive and not obj.connected:
obj.connect()
[Doku]class Http(object):
"""
Provide methods to simplify HTTP connections, especially to talk to HTTP servers.
:param baseurl: base URL used everywhere in this instance (example: http://www.myserver.tld)
:param timeout: Set a maximum amount of seconds the class should try to establish a connection
:param hide_login: Hide or mask login data in logged http(s) requests (see ``Network.clean_uri()``)
:type baseurl: str
:type timeout: int
:type hide_login: str
"""
def __init__(self, baseurl='', timeout=10, hide_login='show', name=None):
self.logger = logging.getLogger(__name__)
self.baseurl = baseurl
self._response = None
self.timeout = timeout
self._session = requests.Session()
self._hide_login = hide_login
self._id = f'({name if name else "HTTP"}_{self.baseurl})'
[Doku] def HTTPDigestAuth(self, user=None, password=None):
"""
Create a HTTPDigestAuth instance and returns it to the caller.
:param user: Username
:param password: Password
:type user: str
:type password: str
:return: HTTPDigestAuth object
:rtype: HTTPDigestAuth
"""
return requests.auth.HTTPDigestAuth(user, password)
[Doku] def post_json(self, url=None, params=None, verify=True, auth=None, json=None, files={}):
"""
Launch a POST request and return JSON answer as a dict or None on error.
:param url: Optional URL to fetch from. If None (default) use baseurl given on init.
:param params: Optional dict of parameters to add to URL query string.
:param verify: Set to false to ignore SSL certificate verification errors (for self-signed for example)
:param auth: Optional authentication object
:type url: str
:type params: dict
:type verify: bool
:type auth: HTTPBasicAuth | HTTPDigestAuth | ...
:return: JSON answer decoded into a dict or None on whatever error occured
:rtype: dict | None
"""
if self.__post(url=url, params=params, verify=verify, auth=auth, json=json, files=files):
json = None
try:
json = self._response.json()
except Exception:
self.logger.warning(f'{self._id} invalid JSON received from {Network.clean_uri(url, self._hide_login) if url else self.baseurl}')
return json
return None
[Doku] def get_json(self, url=None, params=None, verify=True, auth=None):
"""
Launch a GET request and return JSON answer as a dict or None on error.
:param url: Optional URL to fetch from. If None (default) use baseurl given on init.
:param params: Optional dict of parameters to add to URL query string.
:param verify: Set to false to ignore SSL certificate verification errors (for self-signed for example)
:param auth: Optional authentication object
:type url: str
:type params: dict
:type verify: bool
:type auth: HTTPBasicAuth | HTTPDigestAuth | ...
:return: JSON answer decoded into a dict or None on whatever error occured
:rtype: dict | None
"""
if self.__get(url=url, params=params, verify=verify, auth=auth):
json = None
try:
json = self._response.json()
except Exception:
self.logger.warning(f'{self._id} invalid JSON received from {Network.clean_uri(url if url else self.baseurl, self._hide_login) }')
return json
return None
[Doku] def get_text(self, url=None, params=None, encoding=None, timeout=None):
"""
Launch a GET request and return answer as string or None on error.
:param url: Optional URL to fetch from. Default is to use baseurl given to constructor.
:param params: Optional dict of parameters to add to URL query string.
:param encoding: Optional encoding of the received text. Default is to let the lib try to figure out the right encoding.
:type url: str
:type params: dict
:type encoding: str
:return: Answer decoded into a string or None on whatever error occured
:rtype: str | None
"""
_text = None
if self.__get(url=url, params=params, timeout=timeout):
try:
if encoding:
self._response.encoding = encoding
_text = self._response.text
except Exception as e:
self.logger.error(f'{self._id} successful GET, but decoding response failed. This should never happen...error was: {e}')
return _text
[Doku] def download(self, url=None, local=None, params=None, verify=True, auth=None):
"""
Download a binary file to a local path.
:param url: Remote file to download. Attention: Must be full url. 'baseurl' is NOT prefixed here.
:param local: Local file to save
:param params: Optional dict of parameters to add to URL query string.
:param verify: Set to false to ignore SSL certificate verification errors (for self-signed for example)
:param auth: Optional authentication object
:type url: str
:type local: str
:type params: dict
:type verify: bool
:type auth: HTTPBasicAuth | HTTPDigestAuth | ...
:return: Returns true on success, else false
:rtype: bool
"""
if self.__get(url=url, params=params, verify=verify, auth=auth, stream=True):
self.logger.debug(f'{self._id} download of {Network.clean_uri(url, self._hide_login)} successfully completed, saving to {local}')
with open(str(local), 'wb') as f:
for chunk in self._response:
f.write(chunk)
return True
else:
self.logger.warning(f'{self._id} download error: {Network.clean_uri(url, self._hide_login)}')
return False
[Doku] def get_binary(self, url=None, params=None):
"""
Launch a GET request and return answer as raw binary data or None on error.
This is useful for downloading binary objects / files.
:param url: Optional URL to fetch from. Default is to use baseurl given to constructor.
:param params: Optional dict of parameters to add to URL query string.
:type url: str
:type params: dict
:return: Answer as raw binary objector None on whatever error occured
:rtype: bytes | None
"""
self.__get(url=url, params=params)
return self._response.content
[Doku] def response_status(self):
"""
Return the status code (200, 404, ...) of the last executed request.
If GET request was not possible and thus no HTTP statuscode is available,
the returned status code is 0.
:return: Status code and text of last request
:rtype: tuple(int, str)
"""
try:
(code, reason) = (self._response.status_code, self._response.reason)
except Exception:
code = 0
reason = 'Unable to complete GET request'
return (code, reason)
[Doku] def response_cookies(self):
"""
Return a dictionary with the cookies the server may have sent on the last executed request.
:return: Cookies returned by server
:rtype: dict
"""
return self._response.cookies
[Doku] def response_object(self):
"""
Return the raw response object for advanced ussage.
:return: Reponse object as returned by underlying requests library
:rtype: `requests.Response <http://docs.python-requests.org/en/master/user/quickstart/#response-content>`_
"""
return self._response
def __post(self, url=None, params=None, timeout=None, verify=True, auth=None, json=None, data=None, files={}):
"""
Send POST request. Non-documented arguments are passed on to requests.request().
:param url: URL to which to POST
:type url: str
:param data: data to submit to POST
:type data: dict or bytes or file
:return: True if POST was successful
:rtype: bool
"""
url = self.baseurl + url if url else self.baseurl
timeout = timeout if timeout else self.timeout
data = json if json else data
self.logger.info(f'{self._id} sending POST request {json} to {Network.clean_uri(url, self._hide_login)}')
try:
self._response = self._session.post(url, params=params, timeout=timeout, verify=verify, auth=auth, data=data, files=files)
self.logger.debug(f'{self.response_status()} Posted to URL {Network.clean_uri(self._response.url, self._hide_login)}')
except Exception as e:
self.logger.warning(f'{self._id} error sending POST request to {Network.clean_uri(url, self._hide_login)}: {e}')
return False
return True
def __get(self, url=None, params=None, timeout=None, verify=True, auth=None, stream=False):
"""
Send POST request. Non-documented arguments are passed on to requests.request().
:param url: URL to which to GET
:type url: str
:return: True if GET was successful
:rtype: bool
"""
url = self.baseurl + url if url else self.baseurl
timeout = timeout if timeout else self.timeout
self.logger.info(f'{self._id} sending GET request to {Network.clean_uri(url, self._hide_login)}')
try:
self._response = self._session.get(url, params=params, timeout=timeout, verify=verify, auth=auth, stream=stream)
self.logger.debug(f'{self._id} {self.response_status()} fetched URL {Network.clean_uri(self._response.url, self._hide_login)}')
except Exception as e:
self.logger.warning(f'{self._id} error sending GET request to {Network.clean_uri(url, self._hide_login)}: {e}')
self._response = None
return False
return True
[Doku]class Tcp_client(object):
"""
Structured class to handle locally initiated TCP connections with two-way communication.
The callbacks need to be defined as follows:
def connected_callback(Tcp_client_instance)
def receiving_callback(Tcp_client_instance)
def disconnected_callback(Tcp_client_instance)
def data_received_callback(Tcp_client_instance, message)
(Class members need the additional first `self` parameter)
:param host: Remote host name or ip address (v4 or v6)
:param port: Remote host port to connect to
:param name: Name of this connection (mainly for logging purposes). Try to keep the name short.
:param autoreconnect: Should the socket try to reconnect on lost connection (or finished connect cycle)
:param autoconnect: automatically connect on send. Copies autoreconnect if None
:param connect_retries: Number of connect retries per connect round
:param connect_cycle: Time between retries inside a connect round
:param retry_cycle: Time between connect rounds if autoreconnect is True
:param retry_abort: abort connecting after this many failed connect rounds and call abort_callback, no action if set to 0 or callback not set
:param abort_callback: callback function to be run on connection abort
:param binary: Switch between binary and text mode. Text will be encoded / decoded using encoding parameter.
:param terminator: Terminator to use to split received data into chunks (split lines <cr> for example). If integer then split into n bytes. Default is None means process chunks as received.
:param timeout: Timeout to set for connected socket. Don't change without reason
:type host: str
:type port: int
:type name: str
:type autoreconnect: bool
:type autoconnect: bool
:type connect_retries: int
:type connect_cycle: int
:type retry_cycle: int
:type retry_abort: int
:type abort_callback: function
:type binary: bool
:type terminator: int | bytes | str
:type timeout: int
"""
def __init__(self, host, port, name=None,
autoreconnect=True, autoconnect=None, connect_retries=5,
connect_cycle=5, retry_cycle=30, retry_abort=0,
abort_callback=None, binary=False, terminator=False, timeout=1,
rate_limit=1, max_rate_connects=10):
self.logger = logging.getLogger(__name__)
# public properties
self.name = name
self.terminator = terminator
# protected properties
self._host = host
self._port = port
self._autoreconnect = autoreconnect
self._autoconnect = autoconnect
if self._autoconnect is None:
self._autoconnect = self._autoreconnect
self._is_connected = False
self._is_receiving = False
self._connect_retries = connect_retries
self._connect_cycle = connect_cycle
self._retry_cycle = int(retry_cycle)
self._retry_abort = retry_abort
self._abort_callback = abort_callback
self._timeout = timeout
self._ratelimit = rate_limit
self._max_rate_connects = max_rate_connects
self._last_connect = 0
self._num_connects = 0
self._hostip = None
self._family = socket.AF_INET
self._socket = None
self._connect_counter = 0
self._retry_round_counter = 0
self._binary = binary
self._connected_callback = None
self._receiving_callback = None
self._disconnected_callback = None
self._data_received_callback = None
# private properties
self.__connect_thread = None
self.__connect_threadlock = threading.Lock()
self.__receive_thread = None
self.__running = False
# self.logger.setLevel(logging.DEBUG) # Das sollte hier NICHT gesetzt werden, sondern in etc/logging.yaml im Logger lib.network konfiguriert werden!
self._host = host
self._port = port
self._id = f'({self.name if self.name else "TCP_Client"}_{self._host}:{self._port})'
(self._hostip, self._port, self._family) = Network.validate_inet_addr(host, port)
if self._hostip is not None:
self._id = f'({self.name if self.name else "TCP_Client"}_{self._hostip}:{self._port})'
self.logger.info(f'{self._id} Initializing a connection to {self._host} on TCP port {self._port} {"with" if self._autoreconnect else "without"} autoreconnect')
else:
self.logger.error(f'{self._id} Connection to {self._host} not possible, invalid address')
[Doku] def set_callbacks(self, connected=None, receiving=None, data_received=None, disconnected=None):
"""
Set callbacks to caller for different socket events.
:param connected: Called whenever a connection is established successfully
:param data_received: Called when data is received
:param disconnected: Called when a connection has been dropped for whatever reason
:type connected: function
:type data_received: function
:type disconnected: function
"""
if connected:
params = len(signature(connected).parameters)
self.logger.debug(f"connected_callback for {self._id} is {connected.__qualname__} and it expects {params} arguments")
self._connected_callback = connected
if receiving:
params = len(signature(receiving).parameters)
self.logger.debug(f"connected_callback for {self._id} is {receiving.__qualname__} and it expects {params} arguments")
self._receiving_callback = receiving
if disconnected:
params = len(signature(disconnected).parameters)
self.logger.debug(f"connected_callback for {self._id} is {disconnected.__qualname__} and it expects {params} arguments")
self._disconnected_callback = disconnected
if data_received:
params = len(signature(data_received).parameters)
self.logger.debug(f"connected_callback for {self._id} is {data_received.__qualname__} and it expects {params} arguments")
self._data_received_callback = data_received
[Doku] def open(self):
self.connect()
[Doku] def connect(self):
"""
Connect the socket.
:return: False if an error prevented us from launching a connection thread. True if a connection thread has been started.
:rtype: bool
"""
if self._is_connected: # return false if already connected
self.logger.debug(f'{self._id} already connected, ignoring new request')
return False
if self._hostip is None: # return False if no valid ip to connect to
self.logger.error(f'{self._id} no valid IP address to connect')
self._is_connected = False
return False
# prevent starting connect thread twice
with self.__connect_threadlock:
self.logger.debug(f'Starting connect to {self._host}:{self._port}')
if not self.__connect_thread or not self.__connect_thread.is_alive():
# limit connection rates
if time.time() < self._last_connect + (1.0 / self._ratelimit):
self.logger.debug(f'connect: rate limit active, minimum delay is {1.0 / self._ratelimit}, current delay is {time.time() - self._last_connect}')
self._num_connects += 1
if self._num_connects >= self._max_rate_connects:
# too many rate limits reached
self.logger.debug(f'connect: max number of rate limits hit {self._max_rate_connects}, aborting connect')
if self._abort_callback:
self._abort_callback()
self._num_connects = 0
return False
# wait till we may connect again
while time.time() < self._last_connect + (1.0 / self._ratelimit):
time.sleep(.1)
self.logger.dbglow(f'connect() creating connect thread "TCP_Connect {self._id}')
self.__connect_thread = threading.Thread(target=self._connect_thread_worker, name=f'TCP_Connect {self._id}')
self.__connect_thread.daemon = True
self.logger.dbglow(f'connect() to {self._host}:{self._port}: self.__running={self.__running}, self.__connect_thread.is_alive()={self.__connect_thread.is_alive()}')
if not self.__running or not self.__connect_thread.is_alive():
self.logger.dbglow(f'connect() to {self._host}:{self._port}: calling __connect_thread.start()')
try:
self.__connect_thread.start()
except RuntimeError as e:
self.logger.dbglow(f'connect() starting thread failed, error was {e}, thread is {self.__connect_thread}, running={self.__running}, is_alive()={self.__connect_thread.is_alive()}')
return False
self.logger.dbglow(f'leaving connect() to {self._host}:{self._port}')
return True
[Doku] def connected(self):
"""
Return the current connection state.
:return: True if an active connection exists,else False.
:rtype: bool
"""
return self._is_connected
[Doku] def send(self, message):
"""
Send a message to the server. Can be a string, bytes or a bytes array.
:return: True if message has been successfully sent, else False.
:rtype: bool
"""
if not isinstance(message, (bytes, bytearray)):
try:
message = message.encode('utf-8')
except Exception:
self.logger.warning(f'{self._id} error encoding message for client')
return False
# automatically (re)connect on send attempt
if not self._is_connected:
if self._autoconnect:
self.logger.debug(f'{self._id} autoconnecting on send attempt, message is {message}')
self.connect()
else:
self.logger.warning(f'{self._id} trying to send {message}, but not connected and autoconnect not active. Aborting.')
return False
try:
if self._is_connected:
bytes_sent = self._socket.send(message)
if bytes_sent != len(message):
self.logger.warning(f'{self._id} error sending message {message}: message truncated, sent {bytes_sent} of {len(message)} bytes')
else:
return False
except (BrokenPipeError, TimeoutError, ConnectionResetError) as e:
if e.errno == 60:
# timeout
self.logger.warning(f'{self._id} detected timeout, disconnecting, send failed.')
else:
self.logger.warning(f'{self._id} detected disconnect, send failed.')
self._is_connected = False
try:
self._socket.shutdown()
except Exception:
pass
if self._disconnected_callback:
self._disconnected_callback(self)
if self._autoreconnect:
self.logger.debug(f'{self._id} autoreconnect enabled')
self.connect()
return False
except Exception as e: # log errors we are not prepared to handle and raise exception for further debugging
self.logger.warning(f'{self._id} unhandleded error on sending, cannot send data {message}. Error: {e}')
raise
return True
def _connect_thread_worker(self):
"""
Thread worker to handle connection.
"""
if self._is_connected:
self.logger.info(f'{self._id} already connected, ignoring new request')
return
self.logger.debug(f'{self._id} starting connection cycle')
self._connect_counter = 0
self._retry_round_counter = 0
self.__running = True
while self.__running and not self._is_connected:
# Try a full connect round
while not self._is_connected and self._connect_counter < self._connect_retries and (self._retry_round_counter < self._retry_abort or not self._retry_abort) and self.__running:
self._connect()
if self._is_connected:
try:
self._last_connect = time.time()
if self._connected_callback:
self._connected_callback(self)
name = f'TCP_Client {self._id}'
self.__receive_thread = threading.Thread(target=self.__receive_thread_worker, name=name)
self.__receive_thread.daemon = True
self.__receive_thread.start()
except Exception:
self.logger.error(f"could not start __receive_thread_worker for {self.name}")
raise
return True
else:
self.logger.warning(f"self._connect() for {self.name} did not work")
if self.__running and self._connect_counter < self._connect_retries:
self._sleep(self._connect_cycle)
if self._autoreconnect and self.__running:
self._retry_round_counter += 1
if self._retry_abort and self._retry_round_counter == self._retry_abort and self._abort_callback:
self._abort_callback()
break
self.logger.debug(f'waiting {self._retry_cycle} seconds before next connection attempt')
self._sleep(self._retry_cycle)
self._connect_counter = 0
else:
break
def _connect(self):
"""
Initiate connection.
"""
self.logger.debug(f'{self._id} connecting using TCP/{"IPv6" if self._family == socket.AF_INET6 else "IPv4"} {"with" if self._autoreconnect else "without"} autoreconnect')
# Try to connect to remote host using ip (v4 or v6)
try:
self._socket = socket.socket(self._family, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self._socket.settimeout(5)
self._socket.connect((f'{self._hostip}', int(self._port)))
self._socket.settimeout(self._timeout)
self._is_connected = True
self.logger.info(f'{self._id} connected')
# Connection error
except Exception as err:
self._is_connected = False
self._connect_counter += 1
self.logger.warning(f'{self._id} TCP connection failed {self._connect_counter}/{int(self._connect_retries)} times, last error was: {err}')
def __receive_thread_worker(self):
"""
Thread worker to handle receiving.
"""
self.logger.debug(f'{self._id} started receive thread')
waitobj = IOWait()
waitobj.watch(self._socket, read=True)
__buffer = b''
self._is_receiving = True
if self._receiving_callback:
self._receiving_callback(self)
# try to find possible "hidden" errors
try:
while self._is_connected and self.__running:
events = waitobj.wait(1000) # BMX
for fileno, read, write in events: # BMX
if read:
timeout = False
try:
msg = self._socket.recv(4096)
except (TimeoutError, OSError) as e:
if isinstance(e, OSError) and e.errno not in (60, 65):
raise
msg = None
timeout = True
# Check if incoming message is not empty
if msg:
# TODO: doing this breaks line separation if multiple lines
# are read at a time, the next loop can't split it
# because line endings are missing
# find out reason for this operation...
# # If we transfer in text mode decode message to string
# # if not self._binary:
# # msg = str.rstrip(str(msg, 'utf-8')).encode('utf-8')
# If we work in line mode (with a terminator) slice buffer into single chunks based on terminator
if self.terminator:
__buffer += msg
while True:
# terminator = int means fixed size chunks
if isinstance(self.terminator, int):
i = self.terminator
if i > len(__buffer):
break
# terminator is str or bytes means search for it
else:
i = __buffer.find(self.terminator)
if i == -1:
break
i += len(self.terminator)
line = __buffer[:i]
__buffer = __buffer[i:]
if self._data_received_callback is not None:
try:
self._data_received_callback(self, line if self._binary else str(line, 'utf-8').strip())
except Exception as iex:
self._log_exception(iex, f'lib.network {self._id} receive in terminator mode calling data_received_callback {self._data_received_callback} failed: {iex}')
# If not in terminator mode just forward what we received
else:
if self._data_received_callback is not None:
try:
self._data_received_callback(self, msg)
except Exception as iex:
self._log_exception(iex, f'lib.network {self._id} calling data_received_callback {self._data_received_callback} failed: {iex}')
# If empty peer has closed the connection
else:
if self.__running:
self._is_receiving = False
self._is_connected = False
try:
self._socket.shutdown()
except Exception:
pass
if timeout:
# TimeoutError exception caught
self.logger.warning(f'{self._id} connection timed out, disconnecting.')
else:
# default state, peer closed connection
self.logger.warning(f'{self._id} connection closed by peer')
waitobj.unwatch(self._socket)
if self._disconnected_callback is not None:
try:
self._disconnected_callback(self)
except Exception as iex:
self._log_exception(iex, f'lib.network {self._id} calling disconnected_callback {self._disconnected_callback} failed: {iex}')
if self._autoreconnect:
self.logger.debug(f'{self._id} autoreconnect enabled')
self.connect()
if self._is_connected:
self.logger.debug(f'{self._id} set read watch on socket again')
waitobj.watch(self._socket, read=True)
else:
# socket shut down by self.close, no error
self.logger.debug(f'{self._id} connection shut down by call to close method')
self._is_receiving = False
self._is_connected = False
return
except Exception as ex:
if not self.__running:
self.logger.debug(f'{self._id} receive thread shutting down')
else:
self._log_exception(ex, f'lib.network {self._id} receive thread died with unexpected error: {ex}. Go tell...')
self._is_receiving = False
def _log_exception(self, ex, msg):
self.logger.error(msg + ' -- If stack trace is necessary, enable/check debug log')
if self.logger.isEnabledFor(logging.DEBUG):
# Get current system exception
ex_type, ex_value, ex_traceback = sys.exc_info()
# Extract unformatter stack traces as tuples
trace_back = traceback.extract_tb(ex_traceback)
# Format stacktrace
stack_trace = list()
for trace in trace_back:
stack_trace.append("File : %s , Line : %d, Func.Name : %s, Message : %s" % (trace[0], trace[1], trace[2], trace[3]))
self.logger.debug("Exception type : %s " % ex_type.__name__)
self.logger.debug("Exception message : %s" % ex_value)
self.logger.debug("Stack trace : %s" % stack_trace)
def _sleep(self, time_lapse):
"""
Sleep (at least) <time_lapse> seconds, but abort if self.__running changes to False.
:param time_lapse: wait time in seconds
:type time: int
"""
time_start = time.time()
time_end = (time_start + time_lapse)
while self.__running and time_end > time.time():
# modified from 'pass' - this way intervals of 1 second are given up to other threads
# but the abort loop stays intact with a maximum of 1 second delay
time.sleep(1)
[Doku] def close(self):
"""
Close the current client socket.
"""
self.__running = False
self.logger.info(f'{self._id} closing connection')
if self._is_connected:
try:
self._socket.shutdown(socket.SHUT_RD)
except Exception as e:
self.logger.info(f"socket no longer connected on disconnect, exception is {e}")
if self.__connect_thread is not None and self.__connect_thread.is_alive():
try:
self.__connect_thread.join()
except Exception:
pass
if self.__receive_thread is not None and self.__receive_thread.is_alive():
try:
self.__receive_thread.join()
except Exception:
pass
self.__connect_thread = None
self.__receive_thread = None
self.__connect_threadlock = threading.Lock()
self.__receive_threadlock = threading.Lock()
self._is_connected = False
def __str__(self):
if self.name:
return self.name
else:
return super().__str__()
[Doku]class ConnectionClient(object):
"""
Client object that represents a connected client returned by a Tcp_server instance on incoming connection.
This class should normally **not be instantiated manually**, but is provided by the Tcp_server via the callbacks
:param server: The tcp_server passes a reference to itself to access parent methods
:param socket: socket.Socket class used by the Client object
:param fd: File descriptor of socket used by the Client object
:type server: tcp_server
:type socket: function
:type fd: int
"""
def __init__(self, server=None, socket=None, ip=None, port=None, name=None):
self.logger = logging.getLogger(__name__)
self.name = name
self.ip = ip
self.port = port
self.family = None
self.writer = None
self.process_iac = True
self._data_received_callback = None
self._will_close_callback = None
self.__server = server
self.__socket = socket
self._id = f'({self.name if self.name else "Connection"}_{self.ip}:{self.port})'
@property
def socket(self):
"""
Socket getter.
"""
return self.__socket
[Doku] def set_callbacks(self, data_received=None, will_close=None):
"""
Set callbacks for different socket events (client based).
:param data_received: Called when data is received
:type data_received: function
"""
self._data_received_callback = data_received
self._will_close_callback = will_close
async def __drain_writer(self):
"""
Ensure drain() is called.
"""
with suppress(ConnectionResetError):
await self.writer.drain()
[Doku] def send(self, message):
"""
Send a string to connected client.
:param msg: Message to send
:type msg: string | bytes | bytearray
:return: True if message has been queued successfully.
:rtype: bool
"""
if not isinstance(message, (bytes, bytearray)):
try:
message = message.encode('utf-8')
except Exception:
self.logger.warning(f'{self._id} error encoding data')
return False
try:
self.writer.write(message)
asyncio.ensure_future(self.__drain_writer())
except Exception as e:
self.logger.warning(f'{self._id} error sending data: {e}')
return False
return True
[Doku] def send_echo_off(self):
"""
Send an IAC telnet command to ask client to turn its echo off.
"""
command = bytearray([0xFF, 0xFB, 0x01])
string = self._iac_to_string(command)
self.logger.debug(f'{self._id} sending IAC telnet command: {string}')
self.send(command)
[Doku] def send_echo_on(self):
"""
Send an IAC telnet command to ask client to turn its echo on again.
"""
command = bytearray([0xFF, 0xFC, 0x01])
string = self._iac_to_string(command)
self.logger.debug(f'{self._id} sending IAC telnet command: {string}')
self.send(command)
def _process_IAC(self, msg):
"""
Process incomming IAC messages.
NOTE: Does nothing for now except logging them in clear text
"""
if len(msg) >= 3:
string = self._iac_to_string(msg[:3])
self.logger.debug(f'{self._id} received IAC telnet command: {string}')
msg = msg[3:]
return msg
[Doku] def close(self):
"""
Close client socket.
"""
if self._will_close_callback:
self._will_close_callback(self)
self.set_callbacks(data_received=None, will_close=None)
self.writer.close()
return True
def _iac_to_string(self, msg):
iac = {1: 'ECHO', 251: 'WILL', 252: 'WON\'T', 253: 'DO', 254: 'DON\'T', 255: 'IAC'}
string = ''
for char in msg:
if char in iac:
string += iac[char] + ' '
else:
string += chr(char)
return string.rstrip()
def __str__(self):
if self.name:
return self.name
else:
return super().__str__()
[Doku]class Tcp_server(object):
"""
Threaded TCP listener which dispatches connections (and possibly received data) via callbacks.
NOTE: The callbacks need to expect the following arguments:
- ``incoming_connection(server, client)`` where ``server`` ist the ``Tcp_server`` instance and ``client`` is a ``ConnectionClient`` for the current connection
- ``data_received(server, client, data)`` where ``server`` ist the ``Tcp_server`` instance, ``client`` is a ``ConnectionClient`` for the current connection, and ``data`` is a string containing received data
- ``disconnected(server, client)`` where ``server`` ist the ``Tcp_server`` instance and ``client`` is a ``ConnectionClient`` for the closed connection
:param host: Local host name or ip address (v4 or v6). Default is '::' which listens on all IPv4 and all IPv6 addresses available.
:param port: Local port to connect to
:param name: Name of this connection (mainly for logging purposes)
:type host: str
:type port: int
:type name: str
"""
MODE_TEXT = 1
MODE_TEXT_LINE = 2
MODE_BINARY = 3
MODE_FIXED_LENGTH = 4
def __init__(self, port, host='', name=None, mode=MODE_BINARY, terminator=None):
self.logger = logging.getLogger(__name__)
# public properties
self.name = name
self.mode = mode
self.terminator = terminator
# private properties
self._host = host
self._port = port
self._is_listening = False
self._timeout = 1
self._ipaddr = None
self._family = socket.AF_INET
self._socket = None
self._incoming_connection_callback = None
self._data_received_callback = None
# protected properties
self.__loop = None
self.__coroutine = None
self.__server = None
self.__listening_thread = None
self.__running = True
# Test if host is an ip address or a host name
self._id = f'({self.name if self.name else "TCP_Server"}_{self._host}:{self._port})'
(self._ipaddr, self._port, self._family) = Network.validate_inet_addr(host, port)
if self._ipaddr is not None:
self._id = f'({self.name if self.name else "TCP_Server"}_{self._ipaddr}:{self._port})'
self.__our_socket = Network.ip_port_to_socket(self._ipaddr, self._port)
if not self.name:
self.name = self.__our_socket
[Doku] def set_callbacks(self, incoming_connection=None, disconnected=None, data_received=None):
"""
Set callbacks to caller for different socket events.
:param connected: Called whenever a connection is established successfully
:param data_received: Called when data is received
:param disconnected: Called when a connection has been dropped for whatever reason
:type connected: function
:type data_received: function
:type disconnected: function
"""
self._incoming_connection_callback = incoming_connection
self._data_received_callback = data_received
self._disconnected_callback = disconnected
[Doku] def start(self):
"""
Start the server socket.
:return: False if an error prevented us from launching a connection thread. True if a connection thread has been started.
:rtype: bool
"""
if self._is_listening:
return False
try:
self.logger.info(f'{self._id} starting up TCP server socket')
self.__loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.__loop)
self.__coroutine = asyncio.start_server(self.__handle_connection, self._ipaddr, self._port)
self.__server = self.__loop.run_until_complete(self.__coroutine)
self.__listening_thread = threading.Thread(target=self.__listening_thread_worker, name=f'TCPServer {self._id}')
self.__listening_thread.daemon = True
self.__listening_thread.start()
except Exception as e:
self.logger.error(f'{self._id} error starting server: {e}')
return False
return True
def __listening_thread_worker(self):
"""
Run the asyncio loop in a separate thread to not block the Tcp_server.start() method.
"""
asyncio.set_event_loop(self.__loop)
self._is_listening = True
try:
self.__loop.run_forever()
except Exception:
self.logger.debug(f'{self._id} error in loop.run_forever()')
finally:
for task in asyncio.all_tasks(self.__loop):
task.cancel()
self.__server.close()
self.__loop.run_until_complete(self.__server.wait_closed())
try:
self.__loop.close()
except Exception:
pass
self._is_listening = False
async def __handle_connection(self, reader, writer):
"""
Handle incoming connection.
Each client gets its own handler.
"""
peer = writer.get_extra_info('peername')
socket_object = writer.get_extra_info('socket')
peer_socket = Network.ip_port_to_socket(peer[0], peer[1])
client = ConnectionClient(server=self, socket=socket_object, ip=peer[0], port=peer[1])
client.family = socket.AF_INET6 if Utils.is_ipv6(client.ip) else socket.AF_INET
client.name = Network.ip_port_to_socket(client.ip, client.port)
client.writer = writer
self.logger.info(f'{self._id} incoming connection from {peer_socket}')
if self._incoming_connection_callback:
self._incoming_connection_callback(self, client)
while True:
try:
if self.mode == self.MODE_TEXT_LINE:
# self.logger.debug("***")
data = await reader.readline()
else:
data = await reader.read(4096)
except Exception:
data = None
if data and data[0] == 0xFF and client.process_iac:
data = client._process_IAC(data)
if data:
try:
string = str.rstrip(str(data, 'utf-8'))
self.logger.debug(f'{self._id} received "{string}" from {client.name}')
if self._data_received_callback:
self._data_received_callback(self, client, string)
if client._data_received_callback:
client._data_received_callback(self, client, string)
except Exception as e:
self.logger.debug(f'{self._id} received undecodable bytes from {client.name}: {data}, resulting in error: {e}')
else:
try:
self.__close_client(client)
pass
finally:
del client
return
def __close_client(self, client):
"""
Close client connection.
:param client: client object
:type client: lib.network.ConnectionClient
"""
self.logger.info(f'{self._id} connection to client {client.name} closed')
if self._disconnected_callback:
self._disconnected_callback(self, client)
client.writer.close()
[Doku] def listening(self):
"""
Return the current listening state.
:return: True if the server socket is actually listening, else False.
:rtype: bool
"""
return self._is_listening
[Doku] def send(self, client, msg):
"""
Send a string to connected client.
:param client: Client Object to send message to
:param msg: Message to send
:type client: lib.network.ConnectionClient
:type msg: string | bytes | bytearray
:return: True if message has been queued successfully.
:rtype: bool
"""
client.send(msg)
return True
[Doku] def disconnect(self, client):
"""
Disconnect a specific client.
:param client: Client Object to disconnect
:type client: lib.network.ConnectionClient
"""
client.close()
return True
[Doku] def close(self):
"""
Close running listening socket.
"""
self.logger.info(f'{self._id} shutting down listening socket')
asyncio.set_event_loop(self.__loop)
try:
active_connections = len([task for task in asyncio.all_tasks(self.__loop) if not task.done()])
except Exception:
active_connections = 0
if active_connections > 0:
self.logger.info(f'{self._id} still has {active_connections} active connection(s), cleaning up')
self.__running = False
self.__loop.call_soon_threadsafe(self.__loop.stop)
while self.__loop.is_running():
pass
with suppress(AttributeError): # thread can disappear between first and second condition test
if self.__listening_thread and self.__listening_thread.is_alive():
self.__listening_thread.join()
self.__listening_thread = None
self.__loop.close()
def __str__(self):
if self.name:
return self.name
else:
return super().__str__()
[Doku]class Udp_server(object):
"""
Threaded UDP listener which dispatches received data via callbacks.
NOTE: The callbacks need to expect the following arguments:
- ``data_received(addr, data)`` where ``addr`` is a tuple with ``('<remote_ip>', remote_port)`` and ``data`` is the received data as string
:param host: Local hostname or ip address (v4 or v6). Default is '' which listens on all IPv4 addresses available.
:param port: Local port to connect to
:param name: Name of this connection (mainly for logging purposes)
:type host: str
:type port: int
:type name: str
"""
def __init__(self, port, host='', name=None):
self.logger = logging.getLogger(__name__)
# Public properties
self.name = name
# protected properties
self._host = host
self._port = port
self._is_listening = False
self._ipaddr = None
self._family = socket.AF_INET
self._socket = None
self._data_received_callback = None
# provide a shutdown timeout for the server loop. emergency fallback only
self._close_timeout = 2
# private properties
self.__coroutine = None
self.__loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.__loop)
self.__server = aioudp.aioUDPServer()
self.__listening_thread = None
self.__running = True
self._id = f'({self.name if self.name else "UDP_Server"}_{self._host}:{self._port})'
# create sensible ipaddr (resolve host, handle protocol family)
(self._ipaddr, self._port, self._family) = Network.validate_inet_addr(host, port)
if self._ipaddr is not None:
self._id = f'({self.name if self.name else "UDP_Server"}_{self._ipaddr}:{self._port})'
self.__our_socket = Network.ip_port_to_socket(self._ipaddr, self._port)
if not self.name:
self.name = self.__our_socket
else:
self.__running = False
[Doku] def start(self):
"""
Start the server socket.
:return: False if an error prevented us from launching a connection thread. True if a connection thread has been started.
:rtype: bool
"""
if not self.__running:
self.logger.error(f'{self._id} UDP server not initialized, can not start.')
return False
if self._is_listening:
self.logger.warning(f'{self._id} already listening, not starting again')
return False
try:
self.logger.info(f'{self._id} starting up UDP server socket')
self.__coroutine = self.__start_server()
self.__loop.run_until_complete(self.__coroutine)
self.__listening_thread = threading.Thread(target=self.__listening_thread_worker, name=f'UDP_Server {self._id}')
self.__listening_thread.daemon = True
self.__listening_thread.start()
except Exception as e:
self.logger.error(f'{self._id} error {e} setting up udp server')
return False
return True
[Doku] def set_callbacks(self, data_received=None):
"""
Set callbacks to caller for different socket events.
:param data_received: Called when data is received
:type data_received: function
"""
self._data_received_callback = data_received
[Doku] def listening(self):
"""
Return the current listening state.
:return: True if the server socket is actually listening, else False.
:rtype: bool
"""
return self._is_listening
[Doku] def close(self):
"""
Close running listening socket.
"""
self.logger.info(f'{self._id} shutting down listening socket')
asyncio.set_event_loop(self.__loop)
self.__running = False
self.__server.stop()
# cancel pending tasks
tasks = [t for t in asyncio.all_tasks(self.__loop) if t is not asyncio.current_task(self.__loop)]
[task.cancel() for task in tasks]
# close loop gracefully
self.__loop.call_soon_threadsafe(self.__loop.stop)
# this code shouldn't be needed, but include it with timeout just to be sure...
starttime = time.time()
while self.__loop.is_running() and time.time() < starttime + self._close_timeout:
pass
if self.__loop.is_running():
self.__loop.stop()
time.sleep(0.5)
with suppress(AttributeError): # thread can disappear between first and second condition test
if self.__listening_thread and self.__listening_thread.is_alive():
self.__listening_thread.join()
self.__listening_thread = None
self.__loop.close()
async def __start_server(self):
"""
Start the actual server class.
"""
self.__server.run(self._ipaddr, self._port, self.__loop)
self.__server.subscribe(self.__handle_connection)
def __listening_thread_worker(self):
"""
Run the asyncio loop in a separate thread to not block the Udp_server.start() method.
"""
self._is_listening = True
self.logger.debug('{self._id} listening thread set is_listening to True')
asyncio.set_event_loop(self.__loop)
try:
self.__loop.run_forever()
except Exception as e:
self.logger.debug(f'{self._id} error in loop.run_forever(): {e}')
finally:
self.__server.stop()
self.__loop.close()
self._is_listening = False
return True
async def __handle_connection(self, data, addr):
"""
Handle incoming connection.
As UDP is stateless, each datagram creates a new handler.
:param data: data received from socket
:type data: bytes
:param addr: address info ('addr', port)
:type addr: tuple
"""
if addr:
host, port = addr
else:
self.logger.debug(f'{self._id} address info {addr} not in format "(host, port)"')
host = '0.0.0.0'
port = 0
self.logger.info(f'{self._id} incoming datagram from {host}:{port}')
if data:
try:
string = str.rstrip(str(data, 'utf-8'))
self.logger.debug(f'{self._id} received "{string}" from {host}:{port}')
if self._data_received_callback:
self._data_received_callback(addr, string)
except UnicodeError:
self.logger.debug(f'{self._id} received undecodable bytes from {host}:{port}')
else:
self.logger.debug(f'{self._id} received empty datagram from {host}:{port}')
def __str__(self):
if self.name:
return self.name
else:
return super().__str__()