From ab775c4ca8262e071a31374ea03a202dcd146b0e Mon Sep 17 00:00:00 2001 From: utdrmac <4413670+utdrmac@users.noreply.github.com> Date: Wed, 19 Mar 2025 01:43:46 +0000 Subject: [PATCH 1/3] add more rules --- classes/protocol_settings.py | 7 ++++++- classes/transports/canbus.py | 4 +++- classes/transports/modbus_base.py | 6 ++++-- classes/transports/mqtt.py | 2 +- ruff.toml | 17 +++++++++++------ test.py | 2 +- 6 files changed, 26 insertions(+), 12 deletions(-) diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py index 2644a92..434896e 100644 --- a/classes/protocol_settings.py +++ b/classes/protocol_settings.py @@ -237,13 +237,16 @@ def __hash__(self): class protocol_settings: + protocol : str transport : str settings_dir : str variable_mask : list[str] ''' list of variables to allow and exclude all others ''' + variable_screen : list[str] ''' list of variables to exclude ''' + registry_map : dict[Registry_Type, list[registry_map_entry]] = {} registry_map_size : dict[Registry_Type, int] = {} registry_map_ranges : dict[Registry_Type, list[tuple]] = {} @@ -801,6 +804,8 @@ def calculate_registry_ranges(self, map : list[registry_map_entry], max_register ranges.append((min(registers), max(registers)-min(registers)+1)) ## APPENDING A TUPLE! return ranges + + def find_protocol_file(self, file : str, base_dir : str = '' ) -> str: path = base_dir + '/' + file @@ -1230,7 +1235,7 @@ def replace_vars(match): tree = ast.parse(maths, mode='eval') # Evaluate the expression - end_value = eval(compile(tree, filename='', mode='eval')) + end_value = ast.literal_eval(compile(tree, filename='', mode='eval')) return str(end_value) except Exception: diff --git a/classes/transports/canbus.py b/classes/transports/canbus.py index f715817..5257dc0 100644 --- a/classes/transports/canbus.py +++ b/classes/transports/canbus.py @@ -107,6 +107,7 @@ def setup_socketcan(self): print("socketcan setup not implemented for windows") return + # ruff: noqa: S605, S607 self._log.info("restart and configure socketcan") os.system("ip link set can0 down") os.system("ip link set can0 type can restart-ms 100") @@ -120,9 +121,10 @@ def is_socketcan_up(self) -> bool: try: with open(f'/sys/class/net/{self.port}/operstate', 'r') as f: state = f.read().strip() - return state == 'up' except FileNotFoundError: return False + else: + return state == 'up' def start_loop(self): self.read_bus(self.bus) diff --git a/classes/transports/modbus_base.py b/classes/transports/modbus_base.py index 9b0839c..db0f6af 100644 --- a/classes/transports/modbus_base.py +++ b/classes/transports/modbus_base.py @@ -357,10 +357,12 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type current_value = current_registers[entry.register] if not self.protocolSettings.validate_registry_entry(entry, current_value): - raise ValueError(f"Invalid value in register '{current_value}'. Unsafe to write") + err = f"Invalid value in register '{current_value}'. Unsafe to write" + raise ValueError(err) if not self.protocolSettings.validate_registry_entry(entry, value): - raise ValueError(f"Invalid new value, '{value}'. Unsafe to write") + err = f"Invalid new value, '{value}'. Unsafe to write" + raise ValueError(err) #handle codes if entry.variable_name+"_codes" in self.protocolSettings.codes: diff --git a/classes/transports/mqtt.py b/classes/transports/mqtt.py index ae5bd17..f4187c5 100644 --- a/classes/transports/mqtt.py +++ b/classes/transports/mqtt.py @@ -119,7 +119,7 @@ def mqtt_reconnect(self): self.__reconnecting = time.time() try: self._log.warning("Attempting to reconnect("+str(attempt)+")...") - if random.randint(0,1): #alternate between methods because built in reconnect might be unreliable. + if random.randint(0,1): # noqa: S311 alternate between methods because built in reconnect might be unreliable. self.client.reconnect() else: self.client.loop_stop() diff --git a/ruff.toml b/ruff.toml index ddf2766..a73964f 100644 --- a/ruff.toml +++ b/ruff.toml @@ -7,14 +7,19 @@ target-version = "py311" # Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or # McCabe complexity (`C901`) by default. -select = ["E", "F", "W"] -ignore = ["E501"] +select = ["BLE", "C", "DTZ", "E", "EM", "F", "FA", "FBT", "G", "I", "S", "TRY", "W"] #select = ["ALL"] -#ignore = ["C901", "EM101", "TRY003", "E501", "D", "G004", "N", "S105", -# "DTZ007", "S301", "RUF013", "PTH123", "PLW0603", "TRY400", "BLE001", -# "PLR0912", "PLR0915", "FBT001", "FBT002", "FA100", "W191" -#] +ignore = [ +"BLE001", +"C901", +# "D", +"EM101", +"E501", +"FBT001", "FBT002", +"N", +"TRY003", +] [format] quote-style = "double" diff --git a/test.py b/test.py index c383d4c..235035a 100644 --- a/test.py +++ b/test.py @@ -127,7 +127,7 @@ def replace_vars(match): tree = ast.parse(maths, mode='eval') # Evaluate the expression - end_value = eval(compile(tree, filename='', mode='eval')) + end_value = ast.literal_eval(compile(tree, filename='', mode='eval')) return str(end_value) except Exception: From e4dd4fef17e5829248c1d2f3d7cd694f9f330823 Mon Sep 17 00:00:00 2001 From: utdrmac <4413670+utdrmac@users.noreply.github.com> Date: Wed, 19 Mar 2025 01:49:24 +0000 Subject: [PATCH 2/3] forgot to run against 'I' rules which formats imports sections --- classes/protocol_settings.py | 17 ++++++++--------- classes/transports/canbus.py | 18 +++++++++--------- classes/transports/modbus_base.py | 13 ++++++++++--- classes/transports/modbus_rtu.py | 8 +++++--- classes/transports/modbus_tcp.py | 3 ++- classes/transports/modbus_tls.py | 7 +++++-- classes/transports/modbus_udp.py | 7 +++++-- classes/transports/mqtt.py | 13 +++++++------ classes/transports/pace.py | 18 +++++++++--------- classes/transports/serial_frame_client.py | 5 +++-- classes/transports/serial_pylon.py | 11 +++++------ classes/transports/transport_base.py | 14 +++++++++----- defs/common.py | 4 +++- protocol_gateway.py | 6 ++---- pytests/test_example_config.py | 3 ++- pytests/test_protocol_settings.py | 6 +++--- test.py | 7 +++---- 17 files changed, 90 insertions(+), 70 deletions(-) diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py index 434896e..8d79338 100644 --- a/classes/protocol_settings.py +++ b/classes/protocol_settings.py @@ -1,18 +1,17 @@ +import ast import csv -from dataclasses import dataclass -from enum import Enum import glob -import logging -import time -from typing import Union -from defs.common import strtoint import itertools import json -import re +import logging import os -import ast +import re +import time +from dataclasses import dataclass +from enum import Enum +from typing import TYPE_CHECKING, Union -from typing import TYPE_CHECKING +from defs.common import strtoint if TYPE_CHECKING: from configparser import SectionProxy diff --git a/classes/transports/canbus.py b/classes/transports/canbus.py index 5257dc0..9ea21a2 100644 --- a/classes/transports/canbus.py +++ b/classes/transports/canbus.py @@ -1,19 +1,19 @@ -import re -import time -import can import asyncio -import threading -import platform import os +import platform +import re +import threading +import time +from collections import OrderedDict +from typing import TYPE_CHECKING +import can +from defs.common import strtoint +from ..protocol_settings import Registry_Type, protocol_settings, registry_map_entry from .transport_base import transport_base -from ..protocol_settings import Registry_Type, registry_map_entry, protocol_settings -from defs.common import strtoint -from collections import OrderedDict -from typing import TYPE_CHECKING if TYPE_CHECKING: from configparser import SectionProxy diff --git a/classes/transports/modbus_base.py b/classes/transports/modbus_base.py index db0f6af..f4a3639 100644 --- a/classes/transports/modbus_base.py +++ b/classes/transports/modbus_base.py @@ -3,13 +3,20 @@ import os import re import time +from typing import TYPE_CHECKING + from pymodbus.exceptions import ModbusIOException -from .transport_base import transport_base -from ..protocol_settings import Data_Type, Registry_Type, registry_map_entry, protocol_settings from defs.common import strtobool -from typing import TYPE_CHECKING +from ..protocol_settings import ( + Data_Type, + Registry_Type, + protocol_settings, + registry_map_entry, +) +from .transport_base import transport_base + if TYPE_CHECKING: from configparser import SectionProxy try: diff --git a/classes/transports/modbus_rtu.py b/classes/transports/modbus_rtu.py index 561658c..bf8fd1a 100644 --- a/classes/transports/modbus_rtu.py +++ b/classes/transports/modbus_rtu.py @@ -1,7 +1,6 @@ -from classes.protocol_settings import Registry_Type, protocol_settings - import inspect +from classes.protocol_settings import Registry_Type, protocol_settings try: from pymodbus.client.sync import ModbusSerialClient @@ -9,10 +8,13 @@ from pymodbus.client import ModbusSerialClient -from .modbus_base import modbus_base from configparser import SectionProxy + from defs.common import find_usb_serial_port, get_usb_serial_port_info, strtoint +from .modbus_base import modbus_base + + class modbus_rtu(modbus_base): port : str = "/dev/ttyUSB0" addresses : list[int] = [] diff --git a/classes/transports/modbus_tcp.py b/classes/transports/modbus_tcp.py index 22ce4af..984f146 100644 --- a/classes/transports/modbus_tcp.py +++ b/classes/transports/modbus_tcp.py @@ -8,9 +8,10 @@ except ImportError: from pymodbus.client import ModbusTcpClient -from .modbus_base import modbus_base from configparser import SectionProxy +from .modbus_base import modbus_base + class modbus_tcp(modbus_base): port : str = 502 diff --git a/classes/transports/modbus_tls.py b/classes/transports/modbus_tls.py index c6b464a..c7e1f3d 100644 --- a/classes/transports/modbus_tls.py +++ b/classes/transports/modbus_tls.py @@ -1,7 +1,10 @@ -from classes.protocol_settings import Registry_Type, protocol_settings +from configparser import SectionProxy + from pymodbus.client.sync import ModbusTlsClient + +from classes.protocol_settings import Registry_Type, protocol_settings + from .transport_base import transport_base -from configparser import SectionProxy class modbus_udp(transport_base): diff --git a/classes/transports/modbus_udp.py b/classes/transports/modbus_udp.py index a9f77b4..a3bfef6 100644 --- a/classes/transports/modbus_udp.py +++ b/classes/transports/modbus_udp.py @@ -1,7 +1,10 @@ -from classes.protocol_settings import Registry_Type, protocol_settings +from configparser import SectionProxy + from pymodbus.client.sync import ModbusUdpClient + +from classes.protocol_settings import Registry_Type, protocol_settings + from .transport_base import transport_base -from configparser import SectionProxy class modbus_udp(transport_base): diff --git a/classes/transports/mqtt.py b/classes/transports/mqtt.py index f4187c5..d6c33aa 100644 --- a/classes/transports/mqtt.py +++ b/classes/transports/mqtt.py @@ -1,19 +1,20 @@ import atexit +import json import random import time -import json import warnings +from configparser import SectionProxy import paho.mqtt.client -import paho.mqtt.properties import paho.mqtt.packettypes - -from paho.mqtt.client import Client as MQTTClient, MQTT_ERR_NO_CONN +import paho.mqtt.properties +from paho.mqtt.client import MQTT_ERR_NO_CONN +from paho.mqtt.client import Client as MQTTClient from defs.common import strtobool + +from ..protocol_settings import Registry_Type, WriteMode, registry_map_entry from .transport_base import transport_base -from configparser import SectionProxy -from ..protocol_settings import registry_map_entry, WriteMode, Registry_Type class mqtt(transport_base): diff --git a/classes/transports/pace.py b/classes/transports/pace.py index 6e28885..7dae56b 100644 --- a/classes/transports/pace.py +++ b/classes/transports/pace.py @@ -1,16 +1,16 @@ -import time -import struct import logging -from classes.protocol_settings import Registry_Type -from pymodbus.client.sync import ModbusSerialClient, BaseModbusClient -from pymodbus.transaction import ModbusRtuFramer +import struct +import time -from pymodbus.factory import ClientDecoder +from pymodbus.client.sync import BaseModbusClient, ModbusSerialClient +from pymodbus.compat import byte2int from pymodbus.constants import Defaults - +from pymodbus.factory import ClientDecoder +from pymodbus.framer import BYTE_ORDER, FRAME_HEADER +from pymodbus.transaction import ModbusRtuFramer from pymodbus.utilities import checkCRC, computeCRC -from pymodbus.compat import byte2int -from pymodbus.framer import FRAME_HEADER, BYTE_ORDER + +from classes.protocol_settings import Registry_Type _logger = logging.getLogger(__name__) diff --git a/classes/transports/serial_frame_client.py b/classes/transports/serial_frame_client.py index 6778075..eef81a9 100644 --- a/classes/transports/serial_frame_client.py +++ b/classes/transports/serial_frame_client.py @@ -1,7 +1,8 @@ -from typing import Callable -import serial import threading import time +from typing import Callable + +import serial class serial_frame_client(): diff --git a/classes/transports/serial_pylon.py b/classes/transports/serial_pylon.py index fcc309f..69e2f51 100644 --- a/classes/transports/serial_pylon.py +++ b/classes/transports/serial_pylon.py @@ -1,19 +1,18 @@ -from enum import Enum import struct +from enum import Enum +from typing import TYPE_CHECKING import serial -from ..Object import Object +from defs.common import find_usb_serial_port, get_usb_serial_port_info +from ..Object import Object from .serial_frame_client import serial_frame_client from .transport_base import transport_base -from defs.common import find_usb_serial_port, get_usb_serial_port_info - - -from typing import TYPE_CHECKING if TYPE_CHECKING: from configparser import SectionProxy + from classes.protocol_settings import protocol_settings, registry_map_entry diff --git a/classes/transports/transport_base.py b/classes/transports/transport_base.py index adeb282..cdff05e 100644 --- a/classes/transports/transport_base.py +++ b/classes/transports/transport_base.py @@ -1,13 +1,17 @@ - import logging -from classes.protocol_settings import Registry_Type,protocol_settings,registry_map_entry +from typing import TYPE_CHECKING, Callable + +from classes.protocol_settings import ( + Registry_Type, + protocol_settings, + registry_map_entry, +) -from typing import Callable -from typing import TYPE_CHECKING if TYPE_CHECKING: - from .transport_base import transport_base from configparser import SectionProxy + from .transport_base import transport_base + class transport_base: type : str = '' protocolSettings : 'protocol_settings' diff --git a/defs/common.py b/defs/common.py index a28c371..2be9517 100644 --- a/defs/common.py +++ b/defs/common.py @@ -1,6 +1,8 @@ -import serial.tools.list_ports import re +import serial.tools.list_ports + + def strtobool (val): """Convert a string representation of truth to true (1) or false (0). True values are 'y', 'yes', 't', 'true', 'on', and '1' diff --git a/protocol_gateway.py b/protocol_gateway.py index 1a4590a..485beb7 100644 --- a/protocol_gateway.py +++ b/protocol_gateway.py @@ -19,17 +19,15 @@ import argparse - -import os import logging +import os import sys import traceback from configparser import ConfigParser, NoOptionError -from classes.protocol_settings import protocol_settings,registry_map_entry +from classes.protocol_settings import protocol_settings, registry_map_entry from classes.transports.transport_base import transport_base - __logo = """ ██████╗ ██╗ ██╗████████╗██╗ ██╗ ██████╗ ███╗ ██╗ diff --git a/pytests/test_example_config.py b/pytests/test_example_config.py index 3f36f27..d3909c2 100644 --- a/pytests/test_example_config.py +++ b/pytests/test_example_config.py @@ -1,11 +1,12 @@ -import sys import os +import sys #move up a folder for tests sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from protocol_gateway import CustomConfigParser + def test_example_cfg(): parser = CustomConfigParser() parser.read("config.cfg.example") diff --git a/pytests/test_protocol_settings.py b/pytests/test_protocol_settings.py index c682432..bada009 100644 --- a/pytests/test_protocol_settings.py +++ b/pytests/test_protocol_settings.py @@ -1,8 +1,8 @@ -import sys -import os -import pytest import glob +import os +import sys +import pytest #move up a folder for tests sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) diff --git a/test.py b/test.py index 235035a..3272c8a 100644 --- a/test.py +++ b/test.py @@ -1,12 +1,11 @@ -import re import ast +import re #pip install "python-can[gs_usb]" - -import can #v4.2.0+ +import can #v4.2.0+ if False: - import usb #pyusb - requires https://github.com/mcuee/libusb-win32 + import usb #pyusb - requires https://github.com/mcuee/libusb-win32 From b017bbde3d217ca1e86582267bbb0f863862f3f9 Mon Sep 17 00:00:00 2001 From: utdrmac <4413670+utdrmac@users.noreply.github.com> Date: Wed, 19 Mar 2025 01:57:46 +0000 Subject: [PATCH 3/3] Rules for single/double quote (Q) --- classes/protocol_settings.py | 244 ++++++++++----------- classes/transports/canbus.py | 26 +-- classes/transports/modbus_base.py | 48 ++-- classes/transports/modbus_rtu.py | 30 +-- classes/transports/modbus_tcp.py | 14 +- classes/transports/mqtt.py | 68 +++--- classes/transports/pace.py | 20 +- classes/transports/serial_frame_client.py | 2 +- classes/transports/serial_pylon.py | 38 ++-- classes/transports/transport_base.py | 38 ++-- defs/common.py | 28 +-- documentation/.scripts/generate_indexes.py | 8 +- protocol_gateway.py | 36 +-- pytests/test_example_config.py | 2 +- pytests/test_protocol_settings.py | 4 +- ruff.toml | 5 +- test.py | 22 +- tools/apply_common_names_to_csv.py | 26 +-- tools/get_common_names_from_csv.py | 26 +-- tools/list_to_json.py | 6 +- 20 files changed, 346 insertions(+), 345 deletions(-) diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py index 8d79338..543c809 100644 --- a/classes/protocol_settings.py +++ b/classes/protocol_settings.py @@ -108,7 +108,7 @@ def fromString(cls, name : str): return getattr(cls, name) @classmethod - def getSize(cls, data_type : 'Data_Type'): + def getSize(cls, data_type : "Data_Type"): sizes = { Data_Type.BYTE : 8, Data_Type.USHORT : 16, @@ -254,14 +254,14 @@ class protocol_settings: settings : dict[str, str] ''' default settings provided by protocol json ''' - transport_settings : 'SectionProxy' = None + transport_settings : "SectionProxy" = None byteorder : str = "big" _log : logging.Logger = None - def __init__(self, protocol : str, transport_settings : 'SectionProxy' = None, settings_dir : str = 'protocols'): + def __init__(self, protocol : str, transport_settings : "SectionProxy" = None, settings_dir : str = "protocols"): #apply log level to logger self._log_level = getattr(logging, logging.getLevelName(logging.getLogger().getEffectiveLevel()), logging.INFO) @@ -274,20 +274,20 @@ def __init__(self, protocol : str, transport_settings : 'SectionProxy' = None, s #load variable mask self.variable_mask = [] - if os.path.isfile('variable_mask.txt'): - with open('variable_mask.txt') as f: + if os.path.isfile("variable_mask.txt"): + with open("variable_mask.txt") as f: for line in f: - if line[0] == '#': #skip comment + if line[0] == "#": #skip comment continue self.variable_mask.append(line.strip().lower()) #load variable screen self.variable_screen = [] - if os.path.isfile('variable_screen.txt'): - with open('variable_screen.txt') as f: + if os.path.isfile("variable_screen.txt"): + with open("variable_screen.txt") as f: for line in f: - if line[0] == '#': #skip comment + if line[0] == "#": #skip comment continue self.variable_screen.append(line.strip().lower()) @@ -324,19 +324,19 @@ def get_input_registry_entry(self, name : str): def get_registry_entry(self, name : str, registry_type : Registry_Type) -> registry_map_entry: - name = name.strip().lower().replace(' ', '_') #clean name + name = name.strip().lower().replace(" ", "_") #clean name for item in self.registry_map[registry_type]: if item.documented_name == name: return item return None - def load__json(self, file : str = '', settings_dir : str = ''): + def load__json(self, file : str = "", settings_dir : str = ""): if not settings_dir: settings_dir = self.settings_dir if not file: - file = self.protocol + '.json' + file = self.protocol + ".json" path = self.find_protocol_file(file, settings_dir) @@ -359,12 +359,12 @@ def load_registry_overrides(self, override_path, keys : list[str]): """Load overrides into a multidimensional dictionary keyed by each specified key.""" overrides = {key: {} for key in keys} - with open(override_path, newline='', encoding='latin-1') as csvfile: + with open(override_path, newline="", encoding="latin-1") as csvfile: reader = csv.DictReader(csvfile) for row in reader: for key in keys: if key in row: - row[key] = row[key].strip().lower().replace(' ', '_') + row[key] = row[key].strip().lower().replace(" ", "_") key_value = row[key] if key_value: overrides[key][key_value] = row @@ -373,16 +373,16 @@ def load_registry_overrides(self, override_path, keys : list[str]): def load__registry(self, path, registry_type : Registry_Type = Registry_Type.INPUT) -> list[registry_map_entry]: registry_map : list[registry_map_entry] = [] - register_regex = re.compile(r'(?P(?:0?x[\da-z]+|[\d]+))\.(b(?Px?\d{1,2})|(?Px?\d{1,2}))') + register_regex = re.compile(r"(?P(?:0?x[\da-z]+|[\d]+))\.(b(?Px?\d{1,2})|(?Px?\d{1,2}))") - read_interval_regex = re.compile(r'(?P[\.\d]+)(?P[xs]|ms)') + read_interval_regex = re.compile(r"(?P[\.\d]+)(?P[xs]|ms)") - data_type_regex = re.compile(r'(?P\w+)\.(?P\d+)') + data_type_regex = re.compile(r"(?P\w+)\.(?P\d+)") - range_regex = re.compile(r'(?Pr|)(?P(?:0?x[\da-z]+|[\d]+))[\-~](?P(?:0?x[\da-z]+|[\d]+))') - ascii_value_regex = re.compile(r'(?P^\[.+\]$)') - list_regex = re.compile(r'\s*(?:(?P(?:0?x[\da-z]+|[\d]+))-(?P(?:0?x[\da-z]+|[\d]+))|(?P[^,\s][^,]*?))\s*(?:,|$)') + range_regex = re.compile(r"(?Pr|)(?P(?:0?x[\da-z]+|[\d]+))[\-~](?P(?:0?x[\da-z]+|[\d]+))") + ascii_value_regex = re.compile(r"(?P^\[.+\]$)") + list_regex = re.compile(r"\s*(?:(?P(?:0?x[\da-z]+|[\d]+))-(?P(?:0?x[\da-z]+|[\d]+))|(?P[^,\s][^,]*?))\s*(?:,|$)") #load read_interval from transport settings, for #x per register read intervals @@ -396,12 +396,12 @@ def load__registry(self, path, registry_type : Registry_Type = Registry_Type.INP overrides : dict[str, dict] = None - override_keys = ['documented name', 'register'] + override_keys = ["documented name", "register"] overrided_keys = set() ''' list / set of keys that were used for overriding. to track unique entries''' #assuming path ends with .csv - override_path = path[:-4] + '.override.csv' + override_path = path[:-4] + ".override.csv" if os.path.exists(override_path): self._log.info("loading override file: " + override_path) @@ -409,44 +409,44 @@ def load__registry(self, path, registry_type : Registry_Type = Registry_Type.INP overrides = self.load_registry_overrides(override_path, override_keys) def determine_delimiter(first_row) -> str: - if first_row.count(';') > first_row.count(','): - return ';' + if first_row.count(";") > first_row.count(","): + return ";" else: - return ',' + return "," def process_row(row): # Initialize variables to hold numeric and character parts unit_multiplier : float = 1 - unit_symbol : str = '' + unit_symbol : str = "" read_interval : int = 0 ''' read interval in ms ''' #clean up doc name, for extra parsing - row['documented name'] = row['documented name'].strip().lower().replace(' ', '_') + row["documented name"] = row["documented name"].strip().lower().replace(" ", "_") #region read_interval - if 'read interval' in row: - row['read interval'] = row['read interval'].lower() #ensure is all lower case - match = read_interval_regex.search(row['read interval']) + if "read interval" in row: + row["read interval"] = row["read interval"].lower() #ensure is all lower case + match = read_interval_regex.search(row["read interval"]) if match: - unit = match.group('unit') - value = match.group('value') + unit = match.group("unit") + value = match.group("value") if value: value = float(value) - if unit == 'x': + if unit == "x": read_interval = int((transport_read_interval * 1000) * value) else: # seconds or ms read_interval = value - if unit != 'ms': + if unit != "ms": read_interval *= 1000 if read_interval == 0: read_interval = transport_read_interval * 1000 if "read_interval" in self.settings: try: - read_interval = int(self.settings['read_interval']) + read_interval = int(self.settings["read_interval"]) except ValueError: read_interval = transport_read_interval * 1000 @@ -474,12 +474,12 @@ def process_row(row): #region unit #if or is in the unit; ignore unit - if "or" in row['unit'].lower() or ":" in row['unit'].lower(): + if "or" in row["unit"].lower() or ":" in row["unit"].lower(): unit_multiplier = 1 - unit_symbol = row['unit'] + unit_symbol = row["unit"] else: # Use regular expressions to extract numeric and character parts - matches = re.findall(r'(\-?[0-9.]+)|(.*?)$', row['unit']) + matches = re.findall(r"(\-?[0-9.]+)|(.*?)$", row["unit"]) # Iterate over the matches and assign them to appropriate variables for match in matches: @@ -500,14 +500,14 @@ def process_row(row): #endregion unit - variable_name = row['variable name'] if row['variable name'] else row['documented name'] - variable_name = variable_name.strip().lower().replace(' ', '_').replace('__', '_') #clean name + variable_name = row["variable name"] if row["variable name"] else row["documented name"] + variable_name = variable_name.strip().lower().replace(" ", "_").replace("__", "_") #clean name if re.search(r"[^a-zA-Z0-9\_]", variable_name) : - self._log.warning("Invalid Name : " + str(variable_name) + " reg: " + str(row['register']) + " doc name: " + str(row['documented name']) + " path: " + str(path)) + self._log.warning("Invalid Name : " + str(variable_name) + " reg: " + str(row["register"]) + " doc name: " + str(row["documented name"]) + " path: " + str(path)) - if not variable_name and not row['documented name']: #skip empty entry / no name. todo add more invalidator checks. + if not variable_name and not row["documented name"]: #skip empty entry / no name. todo add more invalidator checks. return #region data type @@ -515,17 +515,17 @@ def process_row(row): data_type_len : int = -1 #optional row, only needed for non-default data types - if 'data type' in row and row['data type']: - matches = data_type_regex.search(row['data type']) + if "data type" in row and row["data type"]: + matches = data_type_regex.search(row["data type"]) if matches: - data_type_len = int(matches.group('length')) - data_type = Data_Type.fromString(matches.group('datatype')) + data_type_len = int(matches.group("length")) + data_type = Data_Type.fromString(matches.group("datatype")) else: - data_type = Data_Type.fromString(row['data type']) + data_type = Data_Type.fromString(row["data type"]) - if 'values' not in row: - row['values'] = "" + if "values" not in row: + row["values"] = "" self._log.warning("No Value Column : path: " + str(path)) #endregion data type @@ -539,12 +539,12 @@ def process_row(row): value_is_json : bool = False #test if value is json. - if "{" in row['values']: #to try and stop non-json values from parsing. the json parser is buggy for validation + if "{" in row["values"]: #to try and stop non-json values from parsing. the json parser is buggy for validation try: - codes_json = json.loads(row['values']) + codes_json = json.loads(row["values"]) value_is_json = True - name = row['documented name']+'_codes' + name = row["documented name"]+"_codes" if name not in self.codes: self.codes[name] = codes_json @@ -552,34 +552,34 @@ def process_row(row): value_is_json = False if not value_is_json: - if ',' in row['values']: - matches = list_regex.finditer(row['values']) + if "," in row["values"]: + matches = list_regex.finditer(row["values"]) for match in matches: groups = match.groupdict() - if groups['range_start'] and groups['range_end']: - start = strtoint(groups['range_start']) - end = strtoint(groups['range_end']) + if groups["range_start"] and groups["range_end"]: + start = strtoint(groups["range_start"]) + end = strtoint(groups["range_end"]) values.extend(range(start, end + 1)) else: - values.append(groups['element']) + values.append(groups["element"]) else: matched : bool = False - val_match = range_regex.search(row['values']) + val_match = range_regex.search(row["values"]) if val_match: - value_min = strtoint(val_match.group('start')) - value_max = strtoint(val_match.group('end')) + value_min = strtoint(val_match.group("start")) + value_max = strtoint(val_match.group("end")) matched = True if data_type == Data_Type.ASCII: #might need to apply too hex values as well? or min-max works for hex? #value_regex - val_match = ascii_value_regex.search(row['values']) + val_match = ascii_value_regex.search(row["values"]) if val_match: - value_regex = val_match.group('regex') + value_regex = val_match.group("regex") matched = True if not matched: #single value - values.append(row['values']) + values.append(row["values"]) #endregion values #region register @@ -589,18 +589,18 @@ def process_row(row): register : int = -1 register_bit : int = 0 register_byte : int = -1 - row['register'] = row['register'].lower() #ensure is all lower case - match = register_regex.search(row['register']) + row["register"] = row["register"].lower() #ensure is all lower case + match = register_regex.search(row["register"]) if match: - register = strtoint(match.group('register')) + register = strtoint(match.group("register")) - register_bit = match.group('bit') + register_bit = match.group("bit") if register_bit: register_bit = strtoint(register_bit) else: register_bit = 0 - register_byte = match.group('byte') + register_byte = match.group("byte") if register_byte: register_byte = strtoint(register_byte) else: @@ -608,13 +608,13 @@ def process_row(row): #print("register: " + str(register) + " bit : " + str(register_bit)) else: - range_match = range_regex.search(row['register']) + range_match = range_regex.search(row["register"]) if not range_match: - register = strtoint(row['register']) + register = strtoint(row["register"]) else: - reverse = range_match.group('reverse') - start = strtoint(range_match.group('start')) - end = strtoint(range_match.group('end')) + reverse = range_match.group("reverse") + start = strtoint(range_match.group("start")) + end = strtoint(range_match.group("end")) register = start if end > start: concatenate = True @@ -633,18 +633,18 @@ def process_row(row): #endregion register read_command = None - if "read command" in row and row['read command']: - if row['read command'][0] == 'x': - read_command = bytes.fromhex(row['read command'][1:]) + if "read command" in row and row["read command"]: + if row["read command"][0] == "x": + read_command = bytes.fromhex(row["read command"][1:]) else: - read_command = row['read command'].encode('utf-8') + read_command = row["read command"].encode("utf-8") writeMode : WriteMode = WriteMode.READ if "writable" in row: - writeMode = WriteMode.fromString(row['writable']) + writeMode = WriteMode.fromString(row["writable"]) if "write" in row: - writeMode = WriteMode.fromString(row['write']) + writeMode = WriteMode.fromString(row["write"]) for i in r: item = registry_map_entry( @@ -653,7 +653,7 @@ def process_row(row): register_bit=register_bit, register_byte= register_byte, variable_name= variable_name, - documented_name = row['documented name'], + documented_name = row["documented name"], unit= str(unit_symbol), unit_mod= unit_multiplier, data_type= data_type, @@ -673,14 +673,14 @@ def process_row(row): register = register + 1 - with open(path, newline='', encoding='latin-1') as csvfile: + with open(path, newline="", encoding="latin-1") as csvfile: #clean column names before passing to csv dict reader - delimeter = ';' - first_row = next(csvfile).lower().replace('_', ' ') - if first_row.count(';') < first_row.count(','): - delimeter = ',' + delimeter = ";" + first_row = next(csvfile).lower().replace("_", " ") + if first_row.count(";") < first_row.count(","): + delimeter = "," first_row = re.sub(r"\s+" + re.escape(delimeter) +"|" + re.escape(delimeter) +r"\s+", delimeter, first_row) #trim values @@ -719,8 +719,8 @@ def process_row(row): if index > 0: #if high/low, its a double if ( - item.documented_name.endswith('_l') - and registry_map[index-1].documented_name.replace('_h', '_l') == item.documented_name + item.documented_name.endswith("_l") + and registry_map[index-1].documented_name.replace("_h", "_l") == item.documented_name ): combined_item = registry_map[index-1] @@ -771,7 +771,7 @@ def calculate_registry_ranges(self, map : list[registry_map_entry], max_register if "batch_size" in self.settings: try: - max_batch_size = int(self.settings['batch_size']) + max_batch_size = int(self.settings["batch_size"]) except ValueError: pass @@ -805,33 +805,33 @@ def calculate_registry_ranges(self, map : list[registry_map_entry], max_register return ranges - def find_protocol_file(self, file : str, base_dir : str = '' ) -> str: + def find_protocol_file(self, file : str, base_dir : str = "" ) -> str: - path = base_dir + '/' + file + path = base_dir + "/" + file if os.path.exists(path): return path - suffix = file.split('_', 1)[0] + suffix = file.split("_", 1)[0] - path = base_dir + '/' + suffix +'/' + file + path = base_dir + "/" + suffix +"/" + file if os.path.exists(path): return path #find file by name, recurisvely. last resort - search_pattern = os.path.join(base_dir, '**', file) + search_pattern = os.path.join(base_dir, "**", file) matches = glob.glob(search_pattern, recursive=True) return matches[0] if matches else None - def load_registry_map(self, registry_type : Registry_Type, file : str = '', settings_dir : str = ''): + def load_registry_map(self, registry_type : Registry_Type, file : str = "", settings_dir : str = ""): if not settings_dir: settings_dir = self.settings_dir if not file: if registry_type == Registry_Type.ZERO: - file = self.protocol + '.registry_map.csv' + file = self.protocol + ".registry_map.csv" else: - file = self.protocol + '.'+registry_type.name.lower()+'_registry_map.csv' + file = self.protocol + "."+registry_type.name.lower()+"_registry_map.csv" path = self.find_protocol_file(file, settings_dir) @@ -885,8 +885,8 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma #handle custom sizes, less than 1 register end_bit = flag_size + start_bit - if entry.documented_name+'_codes' in self.codes: - code_key : str = entry.documented_name+'_codes' + if entry.documented_name+"_codes" in self.codes: + code_key : str = entry.documented_name+"_codes" flags : list[str] = [] flag_indexes : list[str] = [] for i in range(start_bit, end_bit): # Iterate over each bit position (0 to 15) @@ -901,12 +901,12 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma flags.append(self.codes[code_key][flag_index]) #check multibit flags - multibit_flags = [key for key in self.codes if '&' in key] + multibit_flags = [key for key in self.codes if "&" in key] if multibit_flags: #if multibit flags are found flag_indexes_set : set[str] = set(flag_indexes) for multibit_flag in multibit_flags: - bits = multibit_flag.split('&') # Split key into 'bits' + bits = multibit_flag.split("&") # Split key into 'bits' if all(bit in flag_indexes_set for bit in bits): # Check if all bits are present in the flag_indexes_set flags.append(self.codes[code_key][multibit_flag]) @@ -919,7 +919,7 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma flags.append("1") else: flags.append("0") - value = ''.join(flags) + value = "".join(flags) elif entry.data_type.value > 400: #signed-magnitude bit types ( sign bit is the last bit instead of front ) @@ -968,12 +968,12 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma #apply codes if (entry.data_type != Data_Type._16BIT_FLAGS and - entry.documented_name+'_codes' in self.codes): + entry.documented_name+"_codes" in self.codes): try: cleanval = str(int(value)) - if cleanval in self.codes[entry.documented_name+'_codes']: - value = self.codes[entry.documented_name+'_codes'][cleanval] + if cleanval in self.codes[entry.documented_name+"_codes"]: + value = self.codes[entry.documented_name+"_codes"][cleanval] except Exception: #do nothing; try is for intval value = value @@ -1037,7 +1037,7 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma val = registry[entry.register] - if entry.documented_name+'_codes' in self.codes: + if entry.documented_name+"_codes" in self.codes: flags : list[str] = [] offset : int = 0 @@ -1047,8 +1047,8 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma # Check if the i-th bit is set if (val >> i) & 1: flag_index = "b"+str(i+offset) - if flag_index in self.codes[entry.documented_name+'_codes']: - flags.append(self.codes[entry.documented_name+'_codes'][flag_index]) + if flag_index in self.codes[entry.documented_name+"_codes"]: + flags.append(self.codes[entry.documented_name+"_codes"][flag_index]) value = ",".join(flags) @@ -1063,7 +1063,7 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma else: flags.append("0") - value = ''.join(flags) + value = "".join(flags) elif entry.data_type.value > 200 or entry.data_type == Data_Type.BYTE: #bit types bit_size = Data_Type.getSize(entry.data_type) @@ -1091,12 +1091,12 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma # value = round(value, self.max_precision) if (entry.data_type != Data_Type._16BIT_FLAGS and - entry.documented_name+'_codes' in self.codes): + entry.documented_name+"_codes" in self.codes): try: cleanval = str(int(value)) - if cleanval in self.codes[entry.documented_name+'_codes']: - value = self.codes[entry.documented_name+'_codes'][cleanval] + if cleanval in self.codes[entry.documented_name+"_codes"]: + value = self.codes[entry.documented_name+"_codes"][cleanval] except Exception: #do nothing; try is for intval value = value @@ -1112,7 +1112,7 @@ def process_registery(self, registry : Union[dict[int, int], dict[int, bytes]] , if entry.register not in registry: continue - value = '' + value = "" if isinstance(registry[entry.register], bytes): value = self.process_register_bytes(registry, entry) @@ -1148,14 +1148,14 @@ def process_registery(self, registry : Union[dict[int, int], dict[int, bytes]] , def validate_registry_entry(self, entry : registry_map_entry, val) -> int: #if code, validate first. - if entry.documented_name+'_codes' in self.codes: - if val in self.codes[entry.documented_name+'_codes']: + if entry.documented_name+"_codes" in self.codes: + if val in self.codes[entry.documented_name+"_codes"]: return 1 else: return 0 if entry.data_type == Data_Type.ASCII: - if val and not re.match(r'[^a-zA-Z0-9\_\-]', val): #validate ascii + if val and not re.match(r"[^a-zA-Z0-9\_\-]", val): #validate ascii if entry.value_regex: #regex validation if re.match(entry.value_regex, val): if entry.concatenate: @@ -1179,7 +1179,7 @@ def evaluate_expressions(self, expression, variables : dict[str,str]): # Function to evaluate mathematical expressions def evaluate_variables(expression): # Define a regular expression pattern to match variables - var_pattern = re.compile(r'\[([^\[\]]+)\]') + var_pattern = re.compile(r"\[([^\[\]]+)\]") # Replace variables in the expression with their values def replace_vars(match): @@ -1194,7 +1194,7 @@ def replace_vars(match): def evaluate_ranges(expression): # Define a regular expression pattern to match ranges - range_pattern = re.compile(r'\[.*?((?P\d+)\s?\~\s?(?P\d+)).*?\]') + range_pattern = re.compile(r"\[.*?((?P\d+)\s?\~\s?(?P\d+)).*?\]") # Find all ranges in the expression ranges = range_pattern.findall(expression) @@ -1222,19 +1222,19 @@ def evaluate_ranges(expression): def evaluate_expression(expression): # Define a regular expression pattern to match "maths" - var_pattern = re.compile(r'\[(?P.*?)\]') + var_pattern = re.compile(r"\[(?P.*?)\]") # Replace variables in the expression with their values def replace_vars(match): try: maths = match.group("maths") - maths = re.sub(r'\s', '', maths) #remove spaces, because ast.parse doesnt like them + maths = re.sub(r"\s", "", maths) #remove spaces, because ast.parse doesnt like them # Parse the expression safely - tree = ast.parse(maths, mode='eval') + tree = ast.parse(maths, mode="eval") # Evaluate the expression - end_value = ast.literal_eval(compile(tree, filename='', mode='eval')) + end_value = ast.literal_eval(compile(tree, filename="", mode="eval")) return str(end_value) except Exception: diff --git a/classes/transports/canbus.py b/classes/transports/canbus.py index 9ea21a2..ca724d6 100644 --- a/classes/transports/canbus.py +++ b/classes/transports/canbus.py @@ -20,10 +20,10 @@ class canbus(transport_base): ''' canbus is a more passive protocol; todo to include active commands to trigger canbus responses ''' - interface : str = 'socketcan' + interface : str = "socketcan" ''' bustype / interface for canbus device ''' - port : str = '' + port : str = "" ''' 'can0' ''' baudrate : int = 500000 @@ -55,11 +55,11 @@ class canbus(transport_base): linux : bool = True - def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_settings' = None): + def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_settings" = None): super().__init__(settings, protocolSettings=protocolSettings) #check if running on windows or linux - self.linux = platform.system() != 'Windows' + self.linux = platform.system() != "Windows" self.port = settings.get(["port", "channel"], "") @@ -119,12 +119,12 @@ def is_socketcan_up(self) -> bool: return True try: - with open(f'/sys/class/net/{self.port}/operstate', 'r') as f: + with open(f"/sys/class/net/{self.port}/operstate", "r") as f: state = f.read().strip() except FileNotFoundError: return False else: - return state == 'up' + return state == "up" def start_loop(self): self.read_bus(self.bus) @@ -184,7 +184,7 @@ def init_after_connect(self): def read_serial_number(self) -> str: ''' not so simple in canbus''' - return '' + return "" serial_number = str(self.read_variable("Serial Number", Registry_Type.HOLDING)) print("read SN: " +serial_number) if serial_number: @@ -192,22 +192,22 @@ def read_serial_number(self) -> str: sn2 = "" sn3 = "" - fields = ['Serial No 1', 'Serial No 2', 'Serial No 3', 'Serial No 4', 'Serial No 5'] + fields = ["Serial No 1", "Serial No 2", "Serial No 3", "Serial No 4", "Serial No 5"] for field in fields: self._log.info("Reading " + field) registry_entry = self.protocolSettings.get_holding_registry_entry(field) if registry_entry is not None: self._log.info("Reading " + field + "("+str(registry_entry.register)+")") data = self.read_modbus_registers(registry_entry.register, registry_type=Registry_Type.HOLDING) - if not hasattr(data, 'registers') or data.registers is None: + if not hasattr(data, "registers") or data.registers is None: self._log.critical("Failed to get serial number register ("+field+") ; exiting") exit() serial_number = serial_number + str(data.registers[0]) - data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder='big') - sn2 = sn2 + str(data_bytes.decode('utf-8')) - sn3 = str(data_bytes.decode('utf-8')) + sn3 + data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder="big") + sn2 = sn2 + str(data_bytes.decode("utf-8")) + sn3 = str(data_bytes.decode("utf-8")) + sn3 time.sleep(self.modbus_delay*2) #sleep inbetween requests so modbus can rest @@ -259,7 +259,7 @@ def read_variable(self, variable_name : str, registry_type : Registry_Type, entr ''' read's variable from cache''' ##clean for convinecne if variable_name: - variable_name = variable_name.strip().lower().replace(' ', '_') + variable_name = variable_name.strip().lower().replace(" ", "_") registry_map = self.protocolSettings.get_registry_map(registry_type) diff --git a/classes/transports/modbus_base.py b/classes/transports/modbus_base.py index f4a3639..bc4941d 100644 --- a/classes/transports/modbus_base.py +++ b/classes/transports/modbus_base.py @@ -29,7 +29,7 @@ class modbus_base(transport_base): #this is specifically static - clients : dict[str, 'BaseModbusClient'] = {} + clients : dict[str, "BaseModbusClient"] = {} ''' str is identifier, dict of clients when multiple transports use the same ports ''' #non-static here for reference, type hinting, python bs ect... @@ -49,27 +49,27 @@ class modbus_base(transport_base): send_holding_register : bool = True send_input_register : bool = True - def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_settings' = None): + def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_settings" = None): super().__init__(settings) - self.analyze_protocol_enabled = settings.getboolean('analyze_protocol', fallback=self.analyze_protocol_enabled) - self.analyze_protocol_save_load = settings.getboolean('analyze_protocol_save_load', fallback=self.analyze_protocol_save_load) + self.analyze_protocol_enabled = settings.getboolean("analyze_protocol", fallback=self.analyze_protocol_enabled) + self.analyze_protocol_save_load = settings.getboolean("analyze_protocol_save_load", fallback=self.analyze_protocol_save_load) #get defaults from protocol settings - if 'send_input_register' in self.protocolSettings.settings: - self.send_input_register = strtobool(self.protocolSettings.settings['send_input_register']) + if "send_input_register" in self.protocolSettings.settings: + self.send_input_register = strtobool(self.protocolSettings.settings["send_input_register"]) - if 'send_holding_register' in self.protocolSettings.settings: - self.send_holding_register = strtobool(self.protocolSettings.settings['send_holding_register']) + if "send_holding_register" in self.protocolSettings.settings: + self.send_holding_register = strtobool(self.protocolSettings.settings["send_holding_register"]) - if 'batch_delay' in self.protocolSettings.settings: - self.modbus_delay = float(self.protocolSettings.settings['batch_delay']) + if "batch_delay" in self.protocolSettings.settings: + self.modbus_delay = float(self.protocolSettings.settings["batch_delay"]) #allow enable/disable of which registers to send - self.send_holding_register = settings.getboolean('send_holding_register', fallback=self.send_holding_register) - self.send_input_register = settings.getboolean('send_input_register', fallback=self.send_input_register) - self.modbus_delay = settings.getfloat(['batch_delay', 'modbus_delay'], fallback=self.modbus_delay) + self.send_holding_register = settings.getboolean("send_holding_register", fallback=self.send_holding_register) + self.send_input_register = settings.getboolean("send_input_register", fallback=self.send_input_register) + self.modbus_delay = settings.getfloat(["batch_delay", "modbus_delay"], fallback=self.modbus_delay) self.modbus_delay_setting = self.modbus_delay @@ -101,22 +101,22 @@ def read_serial_number(self) -> str: sn2 = "" sn3 = "" - fields = ['Serial No 1', 'Serial No 2', 'Serial No 3', 'Serial No 4', 'Serial No 5'] + fields = ["Serial No 1", "Serial No 2", "Serial No 3", "Serial No 4", "Serial No 5"] for field in fields: self._log.info("Reading " + field) registry_entry = self.protocolSettings.get_holding_registry_entry(field) if registry_entry is not None: self._log.info("Reading " + field + "("+str(registry_entry.register)+")") data = self.read_modbus_registers(registry_entry.register, registry_type=Registry_Type.HOLDING) - if not hasattr(data, 'registers') or data.registers is None: + if not hasattr(data, "registers") or data.registers is None: self._log.critical("Failed to get serial number register ("+field+") ; exiting") exit() serial_number = serial_number + str(data.registers[0]) - data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder='big') - sn2 = sn2 + str(data_bytes.decode('utf-8')) - sn3 = str(data_bytes.decode('utf-8')) + sn3 + data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder="big") + sn2 = sn2 + str(data_bytes.decode("utf-8")) + sn3 = str(data_bytes.decode("utf-8")) + sn3 time.sleep(self.modbus_delay*2) #sleep inbetween requests so modbus can rest @@ -178,7 +178,7 @@ def read_data(self) -> dict[str, str]: return info - def validate_protocol(self, protocolSettings : 'protocol_settings') -> float: + def validate_protocol(self, protocolSettings : "protocol_settings") -> float: score_percent = self.validate_registry(Registry_Type.HOLDING) return score_percent @@ -204,13 +204,13 @@ def validate_registry(self, registry_type : Registry_Type = Registry_Type.INPUT) self._log.info("validation score: " + str(score) + " of " + str(maxScore) + " : " + str(round(percent)) + "%") return percent - def analyze_protocol(self, settings_dir : str = 'protocols'): + def analyze_protocol(self, settings_dir : str = "protocols"): print("=== PROTOCOL ANALYZER ===") protocol_names : list[str] = [] protocols : dict[str,protocol_settings] = {} for file in glob.glob(settings_dir + "/*.json"): - file = file.lower().replace(settings_dir, '').replace('/', '').replace('\\', '').replace('\\', '').replace('.json', '') + file = file.lower().replace(settings_dir, "").replace("/", "").replace("\\", "").replace("\\", "").replace(".json", "") print(file) protocol_names.append(file) @@ -283,7 +283,7 @@ def analyze_protocol(self, settings_dir : str = 'protocols'): def evaluate_score(entry : registry_map_entry, val): score = 0 if entry.data_type == Data_Type.ASCII: - if val and not re.match('[^a-zA-Z0-9_-]', val): #validate ascii + if val and not re.match("[^a-zA-Z0-9_-]", val): #validate ascii mod = 1 if entry.concatenate: mod = len(entry.concatenate_registers) @@ -419,7 +419,7 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type def read_variable(self, variable_name : str, registry_type : Registry_Type, entry : registry_map_entry = None): ##clean for convinecne if variable_name: - variable_name = variable_name.strip().lower().replace(' ', '_') + variable_name = variable_name.strip().lower().replace(" ", "_") registry_map = self.protocolSettings.get_registry_map(registry_type) @@ -485,7 +485,7 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en if isinstance(register, bytes) or register.isError() or isError: #sometimes weird errors are handled incorrectly and response is a ascii error string if isinstance(register, bytes): - self._log.error(register.decode('utf-8')) + self._log.error(register.decode("utf-8")) else: self._log.error(register.__str__) self.modbus_delay += self.modbus_delay_increament #increase delay, error is likely due to modbus being busy diff --git a/classes/transports/modbus_rtu.py b/classes/transports/modbus_rtu.py index bf8fd1a..d4f9147 100644 --- a/classes/transports/modbus_rtu.py +++ b/classes/transports/modbus_rtu.py @@ -21,7 +21,7 @@ class modbus_rtu(modbus_base): baudrate : int = 9600 client : ModbusSerialClient - pymodbus_slave_arg = 'unit' + pymodbus_slave_arg = "unit" def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings = None): super().__init__(settings, protocolSettings=protocolSettings) @@ -45,8 +45,8 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings self.addresses = [address] # pymodbus compatability; unit was renamed to address - if 'slave' in inspect.signature(ModbusSerialClient.read_holding_registers).parameters: - self.pymodbus_slave_arg = 'slave' + if "slave" in inspect.signature(ModbusSerialClient.read_holding_registers).parameters: + self.pymodbus_slave_arg = "slave" # Get the signature of the __init__ method @@ -58,16 +58,16 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings self.client = modbus_base.clients[client_str] return - if 'method' in init_signature.parameters: - self.client = ModbusSerialClient(method='rtu', port=self.port, + if "method" in init_signature.parameters: + self.client = ModbusSerialClient(method="rtu", port=self.port, baudrate=int(self.baudrate), - stopbits=1, parity='N', bytesize=8, timeout=2 + stopbits=1, parity="N", bytesize=8, timeout=2 ) else: self.client = ModbusSerialClient( port=self.port, baudrate=int(self.baudrate), - stopbits=1, parity='N', bytesize=8, timeout=2 + stopbits=1, parity="N", bytesize=8, timeout=2 ) #add to clients @@ -75,12 +75,12 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): - if 'unit' not in kwargs: - kwargs = {'unit': int(self.addresses[0]), **kwargs} + if "unit" not in kwargs: + kwargs = {"unit": int(self.addresses[0]), **kwargs} #compatability - if self.pymodbus_slave_arg != 'unit': - kwargs['slave'] = kwargs.pop('unit') + if self.pymodbus_slave_arg != "unit": + kwargs["slave"] = kwargs.pop("unit") if registry_type == Registry_Type.INPUT: return self.client.read_input_registers(address=start, count=count, **kwargs) @@ -91,12 +91,12 @@ def write_register(self, register : int, value : int, **kwargs): if not self.write_enabled: return - if 'unit' not in kwargs: - kwargs = {'unit': self.addresses[0], **kwargs} + if "unit" not in kwargs: + kwargs = {"unit": self.addresses[0], **kwargs} #compatability - if self.pymodbus_slave_arg != 'unit': - kwargs['slave'] = kwargs.pop('unit') + if self.pymodbus_slave_arg != "unit": + kwargs["slave"] = kwargs.pop("unit") self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register diff --git a/classes/transports/modbus_tcp.py b/classes/transports/modbus_tcp.py index 984f146..3768f44 100644 --- a/classes/transports/modbus_tcp.py +++ b/classes/transports/modbus_tcp.py @@ -17,7 +17,7 @@ class modbus_tcp(modbus_base): port : str = 502 host : str = "" client : ModbusTcpClient - pymodbus_slave_arg = 'unit' + pymodbus_slave_arg = "unit" def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings = None): self.host = settings.get("host", "") @@ -27,8 +27,8 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings self.port = settings.getint("port", self.port) # pymodbus compatability; unit was renamed to address - if 'slave' in inspect.signature(ModbusTcpClient.read_holding_registers).parameters: - self.pymodbus_slave_arg = 'slave' + if "slave" in inspect.signature(ModbusTcpClient.read_holding_registers).parameters: + self.pymodbus_slave_arg = "slave" client_str = self.host+"("+str(self.port)+")" #check if client is already initialied @@ -45,12 +45,12 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): - if 'unit' not in kwargs: - kwargs = {'unit': 1, **kwargs} + if "unit" not in kwargs: + kwargs = {"unit": 1, **kwargs} #compatability - if self.pymodbus_slave_arg != 'unit': - kwargs['slave'] = kwargs.pop('unit') + if self.pymodbus_slave_arg != "unit": + kwargs["slave"] = kwargs.pop("unit") if registry_type == Registry_Type.INPUT: return self.client.read_input_registers(start, count, **kwargs ) diff --git a/classes/transports/mqtt.py b/classes/transports/mqtt.py index d6c33aa..4fecef8 100644 --- a/classes/transports/mqtt.py +++ b/classes/transports/mqtt.py @@ -43,31 +43,31 @@ class mqtt(transport_base): connected : bool = False def __init__(self, settings : SectionProxy): - self.host = settings.get('host', fallback="") + self.host = settings.get("host", fallback="") if not self.host: raise ValueError("Host is not set") - self.port = settings.getint('port', fallback=self.port) - self.base_topic = settings.get('base_topic', fallback=self.base_topic).rstrip('/') - self.error_topic = settings.get('error_topic', fallback=self.error_topic).rstrip('/') - self.discovery_topic = settings.get('discovery_topic', fallback=self.discovery_topic) - self.discovery_enabled = strtobool(settings.get('discovery_enabled', self.discovery_enabled)) - self.json = strtobool(settings.get('json', self.json)) - self.reconnect_delay = settings.getint('reconnect_delay', fallback=7) + self.port = settings.getint("port", fallback=self.port) + self.base_topic = settings.get("base_topic", fallback=self.base_topic).rstrip("/") + self.error_topic = settings.get("error_topic", fallback=self.error_topic).rstrip("/") + self.discovery_topic = settings.get("discovery_topic", fallback=self.discovery_topic) + self.discovery_enabled = strtobool(settings.get("discovery_enabled", self.discovery_enabled)) + self.json = strtobool(settings.get("json", self.json)) + self.reconnect_delay = settings.getint("reconnect_delay", fallback=7) #self.max_precision = settings.getint('max_precision', fallback=self.max_precision) if not isinstance( self.reconnect_delay , int) or self.reconnect_delay < 1: #minumum 1 second self.reconnect_delay = 1 - self.reconnect_attempts = settings.getint('reconnect_attempts', fallback=21) + self.reconnect_attempts = settings.getint("reconnect_attempts", fallback=21) if not isinstance( self.reconnect_attempts , int) or self.reconnect_attempts < 0: #minimum 0 self.reconnect_attempts = 0 self.holding_register_prefix = settings.get("holding_register_prefix", fallback="") self.input_register_prefix = settings.get("input_register_prefix", fallback="") - username = settings.get('user', fallback="") - password = settings.get('pass', fallback="") + username = settings.get("user", fallback="") + password = settings.get("pass", fallback="") if not username: raise ValueError("User is not set") @@ -108,7 +108,7 @@ def connect(self): def exit_handler(self): '''on exit handler''' self._log.warning("MQTT Exiting...") - self.client.publish( self.base_topic + '/' + self.device_identifier + "/availability","offline") + self.client.publish( self.base_topic + "/" + self.device_identifier + "/availability","offline") return def mqtt_reconnect(self): @@ -161,29 +161,29 @@ def write_data(self, data : dict[str, str], from_transport : transport_base): self._log.info(f"write data from [{from_transport.transport_name}] to mqtt transport") self._log.info(data) #have to send this every loop, because mqtt doesnt disconnect when HA restarts. HA bug. - info = self.client.publish(self.base_topic + '/' + from_transport.device_identifier + "/availability","online", qos=0,retain=True) + info = self.client.publish(self.base_topic + "/" + from_transport.device_identifier + "/availability","online", qos=0,retain=True) if info.rc == MQTT_ERR_NO_CONN: self.connected = False if(self.json): # Serializing json json_object = json.dumps(data, indent=4) - self.client.publish(self.base_topic+'/'+from_transport.device_identifier, json_object, 0, properties=self.mqtt_properties) + self.client.publish(self.base_topic+"/"+from_transport.device_identifier, json_object, 0, properties=self.mqtt_properties) else: for entry, val in data.items(): if isinstance(val, float) and self.max_precision >= 0: #apply max_precision on mqtt transport val = round(val, self.max_precision) - self.client.publish(str(self.base_topic+'/'+from_transport.device_identifier+'/'+entry).lower(), str(val)) + self.client.publish(str(self.base_topic+"/"+from_transport.device_identifier+"/"+entry).lower(), str(val)) def client_on_message(self, client, userdata, msg): """ The callback for when a PUBLISH message is received from the server. """ - self._log.info(msg.topic+" "+str(msg.payload.decode('utf-8'))) + self._log.info(msg.topic+" "+str(msg.payload.decode("utf-8"))) #self.protocolSettings.validate_registry_entry if msg.topic in self.__write_topics: entry = self.__write_topics[msg.topic] - self.on_message(self, entry, msg.payload.decode('utf-8')) + self.on_message(self, entry, msg.payload.decode("utf-8")) #self.write_variable(entry, value=str(msg.payload.decode('utf-8'))) def init_bridge(self, from_transport : transport_base): @@ -194,7 +194,7 @@ def init_bridge(self, from_transport : transport_base): for entry in from_transport.protocolSettings.get_registry_map(Registry_Type.HOLDING): if entry.write_mode == WriteMode.WRITE or entry.write_mode == WriteMode.WRITEONLY: #__write_topics - topic : str = self.base_topic + '/'+ from_transport.device_identifier + "/write/" + entry.variable_name.lower().replace(' ', '_') + topic : str = self.base_topic + "/"+ from_transport.device_identifier + "/write/" + entry.variable_name.lower().replace(" ", "_") self.__write_topics[topic] = entry self.client.subscribe(topic) @@ -205,13 +205,13 @@ def mqtt_discovery(self, from_transport : transport_base): self._log.info("Publishing HA Discovery Topics...") disc_payload = {} - disc_payload['availability_topic'] = self.base_topic + '/' + from_transport.device_identifier + "/availability" + disc_payload["availability_topic"] = self.base_topic + "/" + from_transport.device_identifier + "/availability" device = {} - device['manufacturer'] = from_transport.device_manufacturer - device['model'] = from_transport.device_model - device['identifiers'] = "hotnoob_" + from_transport.device_model + "_" + from_transport.device_serial_number - device['name'] = from_transport.device_name + device["manufacturer"] = from_transport.device_manufacturer + device["model"] = from_transport.device_model + device["identifiers"] = "hotnoob_" + from_transport.device_model + "_" + from_transport.device_serial_number + device["name"] = from_transport.device_name registry_map : list[registry_map_entry] = [] for entries in from_transport.protocolSettings.registry_map.values(): @@ -229,7 +229,7 @@ def mqtt_discovery(self, from_transport : transport_base): continue - clean_name = item.variable_name.lower().replace(' ', '_').strip() + clean_name = item.variable_name.lower().replace(" ", "_").strip() if not clean_name: #if name is empty, skip continue @@ -241,36 +241,36 @@ def mqtt_discovery(self, from_transport : transport_base): clean_name = self.__holding_register_prefix + clean_name - print(('#Publishing Topic '+str(count)+' of ' + str(length) + ' "'+str(clean_name)+'"').ljust(100)+"#", end='\r', flush=True) + print(("#Publishing Topic "+str(count)+" of " + str(length) + ' "'+str(clean_name)+'"').ljust(100)+"#", end="\r", flush=True) #device['sw_version'] = bms_version disc_payload = {} - disc_payload['availability_topic'] = self.base_topic + '/' + from_transport.device_identifier + "/availability" - disc_payload['device'] = device - disc_payload['name'] = clean_name - disc_payload['unique_id'] = "hotnoob_" + from_transport.device_serial_number + "_"+clean_name + disc_payload["availability_topic"] = self.base_topic + "/" + from_transport.device_identifier + "/availability" + disc_payload["device"] = device + disc_payload["name"] = clean_name + disc_payload["unique_id"] = "hotnoob_" + from_transport.device_serial_number + "_"+clean_name writePrefix = "" if from_transport.write_enabled and ( item.write_mode == WriteMode.WRITE or item.write_mode == WriteMode.WRITEONLY ): writePrefix = "" #home assistant doesnt like write prefix - disc_payload['state_topic'] = self.base_topic + '/' +from_transport.device_identifier + writePrefix+ "/"+clean_name + disc_payload["state_topic"] = self.base_topic + "/" +from_transport.device_identifier + writePrefix+ "/"+clean_name if item.unit: - disc_payload['unit_of_measurement'] = item.unit + disc_payload["unit_of_measurement"] = item.unit - discovery_topic = self.discovery_topic+"/sensor/HN-" + from_transport.device_serial_number + writePrefix + "/" + disc_payload['name'].replace(' ', '_') + "/config" + discovery_topic = self.discovery_topic+"/sensor/HN-" + from_transport.device_serial_number + writePrefix + "/" + disc_payload["name"].replace(" ", "_") + "/config" self.client.publish(discovery_topic, json.dumps(disc_payload),qos=1, retain=True) #send WO message to indicate topic is write only if item.write_mode == WriteMode.WRITEONLY: - self.client.publish(disc_payload['state_topic'], "WRITEONLY") + self.client.publish(disc_payload["state_topic"], "WRITEONLY") time.sleep(0.07) #slow down for better reliability - self.client.publish(disc_payload['availability_topic'],"online",qos=0, retain=True) + self.client.publish(disc_payload["availability_topic"],"online",qos=0, retain=True) print() self._log.info("Published HA "+str(count)+"x Discovery Topics") diff --git a/classes/transports/pace.py b/classes/transports/pace.py index 7dae56b..927a92c 100644 --- a/classes/transports/pace.py +++ b/classes/transports/pace.py @@ -222,7 +222,7 @@ def checkFrame(self): """ try: self.populateHeader() - frame_size = self._header['len'] + frame_size = self._header["len"] data = self._buffer[:frame_size - 2] crc = self._buffer[frame_size - 2:frame_size] crc_val = (byte2int(crc[0]) << 8) + byte2int(crc[1]) @@ -238,7 +238,7 @@ def checkFrame(self): class CustomModbusSerialClient(ModbusSerialClient): - def __init__(self, method='ascii', **kwargs): + def __init__(self, method="ascii", **kwargs): """ Initialize a serial client instance The methods to connect are:: @@ -262,12 +262,12 @@ def __init__(self, method='ascii', **kwargs): BaseModbusClient.__init__(self, CustomFramer(ClientDecoder(), self), **kwargs) - self.port = kwargs.get('port', 0) - self.stopbits = kwargs.get('stopbits', Defaults.Stopbits) - self.bytesize = kwargs.get('bytesize', Defaults.Bytesize) - self.parity = kwargs.get('parity', Defaults.Parity) - self.baudrate = kwargs.get('baudrate', Defaults.Baudrate) - self.timeout = kwargs.get('timeout', Defaults.Timeout) + self.port = kwargs.get("port", 0) + self.stopbits = kwargs.get("stopbits", Defaults.Stopbits) + self.bytesize = kwargs.get("bytesize", Defaults.Bytesize) + self.parity = kwargs.get("parity", Defaults.Parity) + self.baudrate = kwargs.get("baudrate", Defaults.Baudrate) + self.timeout = kwargs.get("timeout", Defaults.Timeout) self._strict = kwargs.get("strict", True) self.last_frame_end = None if self.method == "rtu": @@ -302,9 +302,9 @@ def __init__(self, settings : dict[str,str]): if "baudrate" in settings: self.baudrate = settings["baudrate"] - self.client = CustomModbusSerialClient(method='binary', port=self.port, + self.client = CustomModbusSerialClient(method="binary", port=self.port, baudrate=int(self.baudrate), - stopbits=1, parity='N', bytesize=8, timeout=2 + stopbits=1, parity="N", bytesize=8, timeout=2 ) def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): diff --git a/classes/transports/serial_frame_client.py b/classes/transports/serial_frame_client.py index eef81a9..5c65ec9 100644 --- a/classes/transports/serial_frame_client.py +++ b/classes/transports/serial_frame_client.py @@ -17,7 +17,7 @@ class serial_frame_client(): max_frame_size : int = 256 - port : str = '/dev/ttyUSB0' + port : str = "/dev/ttyUSB0" baud : int = 9600 timeout : float = 5 diff --git a/classes/transports/serial_pylon.py b/classes/transports/serial_pylon.py index 69e2f51..33cd011 100644 --- a/classes/transports/serial_pylon.py +++ b/classes/transports/serial_pylon.py @@ -45,8 +45,8 @@ class serial_pylon(transport_base): client : serial_frame_client #this format is pretty common; i need a name for it. - SOI : bytes = b'\x7e' # aka b"~" - VER : bytes = b'\x00' + SOI : bytes = b"\x7e" # aka b"~" + VER : bytes = b"\x00" ''' version has to be fetched first ''' ADR : bytes CID1 : bytes @@ -55,9 +55,9 @@ class serial_pylon(transport_base): ''' 2 bytes - include LENID & LCHKSUM''' INFO : bytes CHKSUM : bytes - EOI : bytes = b'\x0d' # aka b"\r" + EOI : bytes = b"\x0d" # aka b"\r" - def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_settings' = None): + def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_settings" = None): super().__init__(settings, protocolSettings=protocolSettings) '''address is required to be specified ''' self.port = settings.get("port", "") @@ -72,7 +72,7 @@ def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_setti address : int = settings.getint("address", 0) self.addresses = [address] - self.ADR = struct.pack('B', address) + self.ADR = struct.pack("B", address) #todo, multi address support later self.client = serial_frame_client(self.port, @@ -87,16 +87,16 @@ def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_setti def connect(self): self.client.connect() #3.1 Get protocol version - if self.VER == b'\x00': + if self.VER == b"\x00": #get VER for communicating #SOI VER ADR 46H 4FH LENGT INFO CHKSUM EOI - version = self.read_variable('version', attribute='ver') + version = self.read_variable("version", attribute="ver") if version: self.connected = True self._log.info("pylon protocol version is "+str(version)) self.VER = version - name = self.read_variable('battery_name') + name = self.read_variable("battery_name") self._log.info(name) pass @@ -113,7 +113,7 @@ def read_data(self): self.send_command(command) frame = self.client.read() if frame: #decode info to ascii: bytes.fromhex(name.decode("utf-8")).decode("ascii") - raw = getattr(self.decode_frame(frame), 'info') + raw = getattr(self.decode_frame(frame), "info") if raw: raw = bytes.fromhex(raw.decode("utf8")) #because protocol is in "ascii" data[entry.register] = raw @@ -125,10 +125,10 @@ def read_data(self): return info - def read_variable(self, variable_name : str, entry : 'registry_map_entry' = None, attribute : str = 'info'): + def read_variable(self, variable_name : str, entry : "registry_map_entry" = None, attribute : str = "info"): ##clean for convinecne if variable_name: - variable_name = variable_name.strip().lower().replace(' ', '_') + variable_name = variable_name.strip().lower().replace(" ", "_") registry_map = self.protocolSettings.get_registry_map() @@ -146,7 +146,7 @@ def read_variable(self, variable_name : str, entry : 'registry_map_entry' = None frame = self.client.read() if frame: #decode info to ascii: bytes.fromhex(name.decode("utf-8")).decode("ascii") raw = getattr(self.decode_frame(frame), attribute) - if raw and attribute == 'info': + if raw and attribute == "info": raw = bytes.fromhex(raw.decode("utf8")) #because protocol is in "ascii" raw = self.protocolSettings.process_registery({entry.register : raw}, map=registry_map) return raw @@ -173,7 +173,7 @@ def decode_frame(self, raw_frame: bytes) -> bytes: frame_data = raw_frame[0:-4] frame_checksum = raw_frame[-4:] - calc_checksum = struct.pack('>H', self.calculate_checksum(raw_frame[0:-4])).hex().upper().encode() + calc_checksum = struct.pack(">H", self.calculate_checksum(raw_frame[0:-4])).hex().upper().encode() if calc_checksum != frame_checksum: self._log.warning(f"Serial Pylon checksum error, got {calc_checksum}, expected {frame_checksum}") @@ -195,7 +195,7 @@ def decode_frame(self, raw_frame: bytes) -> bytes: return data - def build_frame(self, command : int, info: bytes = b''): + def build_frame(self, command : int, info: bytes = b""): ''' builds frame without soi and eoi; that is left for frame client''' info_length = 0 @@ -209,19 +209,19 @@ def build_frame(self, command : int, info: bytes = b''): info_length = (lenid_invert_plus_one << 12) + lenid - self.VER = b'\x20' + self.VER = b"\x20" #protocol is in ASCII hex. :facepalm: frame : str = self.VER.hex().upper() frame = frame + self.ADR.hex().upper() - frame = frame + struct.pack('>H', command).hex().upper() - frame = frame + struct.pack('>H', info_length).hex().upper() + frame = frame + struct.pack(">H", command).hex().upper() + frame = frame + struct.pack(">H", info_length).hex().upper() frame = frame + info.hex().upper() frame = frame.encode() frame_chksum = self.calculate_checksum(frame) - frame = frame + struct.pack('>H', frame_chksum).hex().upper().encode() + frame = frame + struct.pack(">H", frame_chksum).hex().upper().encode() #test frame #self.decode_frame(frame) @@ -229,7 +229,7 @@ def build_frame(self, command : int, info: bytes = b''): return frame - def send_command(self, cmd, info: bytes = b''): + def send_command(self, cmd, info: bytes = b""): data = self.build_frame(cmd, info) self.client.write(data) diff --git a/classes/transports/transport_base.py b/classes/transports/transport_base.py index cdff05e..fde3e47 100644 --- a/classes/transports/transport_base.py +++ b/classes/transports/transport_base.py @@ -13,16 +13,16 @@ from .transport_base import transport_base class transport_base: - type : str = '' - protocolSettings : 'protocol_settings' - protocol_version : str = '' - transport_name : str = '' - device_name : str = '' - device_serial_number : str = '' - device_manufacturer : str = 'hotnoob' - device_model : str = 'hotnoob' - device_identifier : str = 'hotnoob' - bridge : str = '' + type : str = "" + protocolSettings : "protocol_settings" + protocol_version : str = "" + transport_name : str = "" + device_name : str = "" + device_serial_number : str = "" + device_manufacturer : str = "hotnoob" + device_model : str = "hotnoob" + device_identifier : str = "hotnoob" + bridge : str = "" write_enabled : bool = False max_precision : int = 2 @@ -31,18 +31,18 @@ class transport_base: connected : bool = False - on_message : Callable[['transport_base', registry_map_entry, str], None] = None + on_message : Callable[["transport_base", registry_map_entry, str], None] = None ''' callback, on message recieved ''' _log : logging.Logger = None - def __init__(self, settings : 'SectionProxy') -> None: + def __init__(self, settings : "SectionProxy") -> None: self.transport_name = settings.name #section name #apply log level to logger - self._log_level = getattr(logging, settings.get('log_level', fallback='INFO'), logging.INFO) - short_name : str = __name__[__name__.rfind('.'): ] if '.' in __name__ else None + self._log_level = getattr(logging, settings.get("log_level", fallback="INFO"), logging.INFO) + short_name : str = __name__[__name__.rfind("."): ] if "." in __name__ else None self._log : logging.Logger = logging.getLogger(short_name + f"[{self.transport_name}]") self._log.setLevel(self._log_level) @@ -52,7 +52,7 @@ def __init__(self, settings : 'SectionProxy') -> None: if settings: self.device_serial_number = settings.get(["device_serial_number", "serial_number"], self.device_serial_number) self.device_manufacturer = settings.get(["device_manufacturer", "manufacturer"], self.device_manufacturer) - self.device_name = settings.get(['device_name', 'name'], fallback=self.device_manufacturer+"_"+self.device_serial_number) + self.device_name = settings.get(["device_name", "name"], fallback=self.device_manufacturer+"_"+self.device_serial_number) self.bridge = settings.get("bridge", self.bridge) self.read_interval = settings.getfloat("read_interval", self.read_interval) self.max_precision = settings.getint(["max_precision", "precision"], self.max_precision) @@ -63,7 +63,7 @@ def __init__(self, settings : 'SectionProxy') -> None: #load a protocol_settings class for every transport; required for adv features. ie, variable timing. #must load after settings - self.protocol_version = settings.get('protocol_version') + self.protocol_version = settings.get("protocol_version") if self.protocol_version: self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings) @@ -78,7 +78,7 @@ def __init__(self, settings : 'SectionProxy') -> None: def update_identifier(self): self.device_identifier = self.device_serial_number.strip().lower() - def init_bridge(self, from_transport : 'transport_base'): + def init_bridge(self, from_transport : "transport_base"): pass @classmethod @@ -91,7 +91,7 @@ def _get_top_class_name(cls, cls_obj): def connect(self): pass - def write_data(self, data : dict[str, registry_map_entry], from_transport : 'transport_base'): + def write_data(self, data : dict[str, registry_map_entry], from_transport : "transport_base"): ''' general purpose write function for between transports''' pass @@ -120,7 +120,7 @@ def write_register(self, register : int, value : int, **kwargs): def analyse_protocol(self): pass - def validate_protocol(self, protocolSettings : 'protocol_settings') -> float: + def validate_protocol(self, protocolSettings : "protocol_settings") -> float: ''' validates protocol''' pass #endregion diff --git a/defs/common.py b/defs/common.py index 2be9517..7c5a82f 100644 --- a/defs/common.py +++ b/defs/common.py @@ -11,7 +11,7 @@ def strtobool (val): return val val = val.lower() - if val in ('y', 'yes', 't', 'true', 'on', '1'): + if val in ("y", "yes", "t", "true", "on", "1"): return 1 return 0 @@ -24,46 +24,46 @@ def strtoint(val : str) -> int: val = val.lower().strip() - if val and val[0] == 'x': + if val and val[0] == "x": val = val[1:] # Pad the string with a leading zero if len(val) % 2 != 0: - val = '0' + val + val = "0" + val - return int.from_bytes(bytes.fromhex(val), byteorder='big') + return int.from_bytes(bytes.fromhex(val), byteorder="big") if val and val.startswith("0x"): val = val[2:] # Pad the string with a leading zero if len(val) % 2 != 0: - val = '0' + val + val = "0" + val - return int.from_bytes(bytes.fromhex(val), byteorder='big') + return int.from_bytes(bytes.fromhex(val), byteorder="big") if not val: #empty return 0 return int(val) -def get_usb_serial_port_info(port : str = '') -> str: +def get_usb_serial_port_info(port : str = "") -> str: for p in serial.tools.list_ports.comports(): if str(p.device).upper() == port.upper(): return "["+hex(p.vid)+":"+hex(p.pid)+":"+str(p.serial_number)+":"+str(p.location)+"]" return "" -def find_usb_serial_port(port : str = '', vendor_id : str = '', product_id : str = '', serial_number : str = '', location : str = '') -> str: - if not port.startswith('['): +def find_usb_serial_port(port : str = "", vendor_id : str = "", product_id : str = "", serial_number : str = "", location : str = "") -> str: + if not port.startswith("["): return port - port = port.replace('None', '') + port = port.replace("None", "") match = re.match(r"\[(?P[\da-zA-Z]+|):?(?P[\da-zA-Z]+|):?(?P[\da-zA-Z]+|):?(?P[\d\-]+|)\]", port) if match: - vendor_id = int(match.group("vendor"), 16) if match.group("vendor") else '' - product_id = int(match.group("product"), 16) if match.group("product") else '' - serial_number = match.group("serial") if match.group("serial") else '' - location = match.group("location") if match.group("location") else '' + vendor_id = int(match.group("vendor"), 16) if match.group("vendor") else "" + product_id = int(match.group("product"), 16) if match.group("product") else "" + serial_number = match.group("serial") if match.group("serial") else "" + location = match.group("location") if match.group("location") else "" for port in serial.tools.list_ports.comports(): if ((not vendor_id or port.vid == vendor_id) and diff --git a/documentation/.scripts/generate_indexes.py b/documentation/.scripts/generate_indexes.py index a9214a0..b6f4368 100644 --- a/documentation/.scripts/generate_indexes.py +++ b/documentation/.scripts/generate_indexes.py @@ -8,17 +8,17 @@ def extract_first_header(file_path): for line in file: line = line.strip() if line.startswith("#"): - return line.replace('#', '') + return line.replace("#", "") return None def generate_readme(directory : str, folder_order : str = [], output_file : str ="README.md"): - with open(directory+'/'+output_file, "w", encoding="utf-8") as readme: + with open(directory+"/"+output_file, "w", encoding="utf-8") as readme: readme.write("# README Index\n\n") readme.write("This README file contains an index of all files in the documentation directory.\n\n") readme.write("## File List\n\n") - note_file : str = directory+'/note.md' + note_file : str = directory+"/note.md" if os.path.exists(note_file): readme.write("\n## Additional Notes\n\n") with open(note_file, "r", encoding="utf-8") as note: @@ -33,7 +33,7 @@ def generate_readme(directory : str, folder_order : str = [], output_file : str relative_folder = os.path.relpath(root, directory).replace("\\", "/") #use linux path structure #exclude . folders - if relative_folder[0] == '.': + if relative_folder[0] == ".": continue if relative_folder != previous_folder: diff --git a/protocol_gateway.py b/protocol_gateway.py index 485beb7..2f8ea9b 100644 --- a/protocol_gateway.py +++ b/protocol_gateway.py @@ -52,9 +52,9 @@ def get(self, section, option, *args, **kwargs): if isinstance(option, list): fallback = None - if 'fallback' in kwargs: #override kwargs fallback, for manually handling here - fallback = kwargs['fallback'] - kwargs['fallback'] = None + if "fallback" in kwargs: #override kwargs fallback, for manually handling here + fallback = kwargs["fallback"] + kwargs["fallback"] = None for name in option: value = super().get(section, name, *args, **kwargs) @@ -80,7 +80,7 @@ class Protocol_Gateway: """ __log = None # log level, available log levels are CRITICAL, FATAL, ERROR, WARNING, INFO, DEBUG - __log_level = 'DEBUG' + __log_level = "DEBUG" __running : bool = False ''' controls main loop''' @@ -91,15 +91,15 @@ class Protocol_Gateway: config_file : str def __init__(self, config_file : str): - self.__log = logging.getLogger('invertermodbustomqqt_log') + self.__log = logging.getLogger("invertermodbustomqqt_log") handler = logging.StreamHandler(sys.stdout) #self.__log.setLevel(logging.DEBUG) - formatter = logging.Formatter('[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s') + formatter = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s") handler.setFormatter(formatter) self.__log.addHandler(handler) - self.config_file = os.path.dirname(os.path.realpath(__file__)) + '/growatt2mqtt.cfg' - newcfg = os.path.dirname(os.path.realpath(__file__)) + '/'+ config_file + self.config_file = os.path.dirname(os.path.realpath(__file__)) + "/growatt2mqtt.cfg" + newcfg = os.path.dirname(os.path.realpath(__file__)) + "/"+ config_file if os.path.isfile(newcfg): self.config_file = newcfg @@ -114,34 +114,34 @@ def __init__(self, config_file : str): self.__settings.read(self.config_file) ##[general] - self.__log_level = self.__settings.get('general','log_level', fallback='INFO') + self.__log_level = self.__settings.get("general","log_level", fallback="INFO") log_level = getattr(logging, self.__log_level, logging.INFO) self.__log.setLevel(log_level) logging.basicConfig(level=log_level) for section in self.__settings.sections(): - if section.startswith('transport'): + if section.startswith("transport"): transport_cfg = self.__settings[section] - transport_type = transport_cfg.get('transport', fallback="") - protocol_version = transport_cfg.get('protocol_version', fallback="") + transport_type = transport_cfg.get("transport", fallback="") + protocol_version = transport_cfg.get("protocol_version", fallback="") if not transport_type and not protocol_version: - raise ValueError('Missing Transport / Protocol Version') + raise ValueError("Missing Transport / Protocol Version") if not transport_type and protocol_version: #get transport from protocol settings... todo need to make a quick function instead of this protocolSettings : protocol_settings = protocol_settings(protocol_version) if not transport_type and not protocolSettings.transport: - raise ValueError('Missing Transport') + raise ValueError("Missing Transport") if not transport_type: transport_type = protocolSettings.transport # Import the module - module = importlib.import_module('classes.transports.'+transport_type) + module = importlib.import_module("classes.transports."+transport_type) # Get the class from the module cls = getattr(module, transport_type) transport : transport_base = cls(transport_cfg) @@ -227,13 +227,13 @@ def main(): if __name__ == "__main__": # Create ArgumentParser object - parser = argparse.ArgumentParser(description='Python Protocol Gateway') + parser = argparse.ArgumentParser(description="Python Protocol Gateway") # Add arguments - parser.add_argument('--config', '-c', type=str, help='Specify Config File') + parser.add_argument("--config", "-c", type=str, help="Specify Config File") # Add a positional argument with default - parser.add_argument('positional_config', type=str, help='Specify Config File', nargs='?', default='config.cfg') + parser.add_argument("positional_config", type=str, help="Specify Config File", nargs="?", default="config.cfg") # Parse arguments args = parser.parse_args() diff --git a/pytests/test_example_config.py b/pytests/test_example_config.py index d3909c2..f58de27 100644 --- a/pytests/test_example_config.py +++ b/pytests/test_example_config.py @@ -2,7 +2,7 @@ import sys #move up a folder for tests -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from protocol_gateway import CustomConfigParser diff --git a/pytests/test_protocol_settings.py b/pytests/test_protocol_settings.py index bada009..34394c8 100644 --- a/pytests/test_protocol_settings.py +++ b/pytests/test_protocol_settings.py @@ -5,13 +5,13 @@ import pytest #move up a folder for tests -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from classes.protocol_settings import protocol_settings # List of protocols to test # Create the search pattern to find .json files recursively -search_pattern = os.path.join("protocols", '**', '*.json') +search_pattern = os.path.join("protocols", "**", "*.json") # Use glob to find all files matching the pattern files = glob.glob(search_pattern, recursive=True) # Extract file names without extension diff --git a/ruff.toml b/ruff.toml index a73964f..e757077 100644 --- a/ruff.toml +++ b/ruff.toml @@ -7,17 +7,18 @@ target-version = "py311" # Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or # McCabe complexity (`C901`) by default. -select = ["BLE", "C", "DTZ", "E", "EM", "F", "FA", "FBT", "G", "I", "S", "TRY", "W"] +select = ["BLE", "C", "DTZ", "E", "EM", "F", "FA", "FBT", "G", "I", "LOG", "S", "TRY", "W"] #select = ["ALL"] ignore = [ "BLE001", "C901", -# "D", +"D", "EM101", "E501", "FBT001", "FBT002", "N", +"RUF", "TRY003", ] diff --git a/test.py b/test.py index 3272c8a..dd24198 100644 --- a/test.py +++ b/test.py @@ -10,7 +10,7 @@ # Candlelight firmware on Linux -bus = can.interface.Bus(interface='socketcan', channel='can0', bitrate=500000) +bus = can.interface.Bus(interface="socketcan", channel="can0", bitrate=500000) # Stock slcan firmware on Linux #bus = can.interface.Bus(bustype='slcan', channel='/dev/ttyACM0', bitrate=500000) @@ -40,21 +40,21 @@ if msg.arbitration_id == 0x0FFF: # The data is a 2-byte value (un16) soc_bytes = msg.data[:2] - soc = int.from_bytes(soc_bytes, byteorder='big', signed=False) / 100.0 + soc = int.from_bytes(soc_bytes, byteorder="big", signed=False) / 100.0 print(f"State of Charge: {soc:.2f}%") if msg.arbitration_id == 0x0355: # Extract and print SOC value (U16, 0.01%) - soc_value = int.from_bytes(msg.data[0:0 + 2], byteorder='little') + soc_value = int.from_bytes(msg.data[0:0 + 2], byteorder="little") print(f"State of Charge (SOC) Value: {soc_value / 100:.2f}%") # Extract and print SOH value (U16, 1%) - soh_value = int.from_bytes(msg.data[2:2 + 2], byteorder='little') + soh_value = int.from_bytes(msg.data[2:2 + 2], byteorder="little") print(f"State of Health (SOH) Value: {soh_value:.2f}%") # Extract and print HiRes SOC value (U16, 0.01%) - hires_soc_value = int.from_bytes(msg.data[4:4 + 2], byteorder='little') + hires_soc_value = int.from_bytes(msg.data[4:4 + 2], byteorder="little") print(f"High Resolution SOC Value: {hires_soc_value / 100:.2f}%") except KeyboardInterrupt: @@ -71,7 +71,7 @@ # Function to evaluate mathematical expressions def evaluate_variables(expression): # Define a regular expression pattern to match variables - var_pattern = re.compile(r'\[([^\[\]]+)\]') + var_pattern = re.compile(r"\[([^\[\]]+)\]") # Replace variables in the expression with their values def replace_vars(match): @@ -86,7 +86,7 @@ def replace_vars(match): def evaluate_ranges(expression): # Define a regular expression pattern to match ranges - range_pattern = re.compile(r'\[.*?((?P\d+)\s?\~\s?(?P\d+)).*?\]') + range_pattern = re.compile(r"\[.*?((?P\d+)\s?\~\s?(?P\d+)).*?\]") # Find all ranges in the expression ranges = range_pattern.findall(expression) @@ -114,19 +114,19 @@ def evaluate_ranges(expression): def evaluate_expression(expression): # Define a regular expression pattern to match "maths" - var_pattern = re.compile(r'\[(?P.*?)\]') + var_pattern = re.compile(r"\[(?P.*?)\]") # Replace variables in the expression with their values def replace_vars(match): try: maths = match.group("maths") - maths = re.sub(r'\s', '', maths) #remove spaces, because ast.parse doesnt like them + maths = re.sub(r"\s", "", maths) #remove spaces, because ast.parse doesnt like them # Parse the expression safely - tree = ast.parse(maths, mode='eval') + tree = ast.parse(maths, mode="eval") # Evaluate the expression - end_value = ast.literal_eval(compile(tree, filename='', mode='eval')) + end_value = ast.literal_eval(compile(tree, filename="", mode="eval")) return str(end_value) except Exception: diff --git a/tools/apply_common_names_to_csv.py b/tools/apply_common_names_to_csv.py index 17535a9..6c71d1c 100644 --- a/tools/apply_common_names_to_csv.py +++ b/tools/apply_common_names_to_csv.py @@ -1,41 +1,41 @@ import csv import json -path = 'protocols/eg4_v58.holding_registry_map.csv' -save_path = 'protocols/eg4_v58.holding_registry_map.csv' +path = "protocols/eg4_v58.holding_registry_map.csv" +save_path = "protocols/eg4_v58.holding_registry_map.csv" -common_path = 'tools/common_names.json' +common_path = "tools/common_names.json" common_names : dict[str,str] = {} # Open the file and load the JSON data -with open(common_path, 'r') as file: +with open(common_path, "r") as file: common_names = json.load(file) new_csv : list[dict[str,str]] = [] -with open(path, newline='') as csvfile: +with open(path, newline="") as csvfile: # Create a CSV reader object - reader = csv.DictReader(csvfile, delimiter=';') #compensate for openoffice + reader = csv.DictReader(csvfile, delimiter=";") #compensate for openoffice fieldnames = reader.fieldnames # Iterate over each row in the CSV file for row in reader: - if not row['variable name'] and row['documented name']: - if row['documented name'] not in common_names: - print('no friendly name : ' + row['documented name']) + if not row["variable name"] and row["documented name"]: + if row["documented name"] not in common_names: + print("no friendly name : " + row["documented name"]) new_csv.append(row) continue - if not row['variable name'].strip(): #if empty, apply name - row['variable name'] = common_names[row['documented name']] - print(row['documented name'] + ' -> ' + common_names[row['documented name']]) + if not row["variable name"].strip(): #if empty, apply name + row["variable name"] = common_names[row["documented name"]] + print(row["documented name"] + " -> " + common_names[row["documented name"]]) new_csv.append(row) continue new_csv.append(row) # Write data to the output CSV -with open(save_path, 'w', newline='') as outfile: +with open(save_path, "w", newline="") as outfile: writer = csv.DictWriter(outfile, fieldnames=fieldnames) writer.writeheader() writer.writerows(new_csv) diff --git a/tools/get_common_names_from_csv.py b/tools/get_common_names_from_csv.py index f2ba72e..74439c8 100644 --- a/tools/get_common_names_from_csv.py +++ b/tools/get_common_names_from_csv.py @@ -1,32 +1,32 @@ import csv import json -path = 'protocols/growatt_2020_v1.24.input_registry_map.csv' +path = "protocols/growatt_2020_v1.24.input_registry_map.csv" -save_path = 'tools/common_names.json' +save_path = "tools/common_names.json" common_names = {} -common_path = 'tools/common_names.json' #load existing names +common_path = "tools/common_names.json" #load existing names common_names : dict[str,str] = {} # Open the file and load the JSON data -with open(common_path, 'r') as file: +with open(common_path, "r") as file: common_names = json.load(file) -with open(path, newline='') as csvfile: +with open(path, newline="") as csvfile: # Create a CSV reader object - reader = csv.DictReader(csvfile, delimiter=';') #compensate for openoffice + reader = csv.DictReader(csvfile, delimiter=";") #compensate for openoffice # Iterate over each row in the CSV file for row in reader: - if row['variable name'] and row['documented name']: - if row['documented name'] in common_names and common_names[row['documented name']] != row['variable name']: - print('Warning, Naming Conflict') - print(row['documented name'] + ' -> ' + row['variable name']) - print(row['documented name'] + ' -> ' + common_names[row['documented name']]) + if row["variable name"] and row["documented name"]: + if row["documented name"] in common_names and common_names[row["documented name"]] != row["variable name"]: + print("Warning, Naming Conflict") + print(row["documented name"] + " -> " + row["variable name"]) + print(row["documented name"] + " -> " + common_names[row["documented name"]]) else: - common_names[row['documented name']] = row['variable name'] + common_names[row["documented name"]] = row["variable name"] json_str = json.dumps(common_names, indent=1) @@ -34,7 +34,7 @@ print(json_str) quit() -with open(save_path, 'w') as file: +with open(save_path, "w") as file: file.write(json_str) print("saved to "+save_path) diff --git a/tools/list_to_json.py b/tools/list_to_json.py index edd10e2..167b717 100644 --- a/tools/list_to_json.py +++ b/tools/list_to_json.py @@ -5,9 +5,9 @@ input_string = "1=Charger Only;2=Inverter Only;3=On;4=Off" while True: user_input = input("Enter Data: ") - user_input = re.sub(r'\s+', " ", user_input) - user_input = re.sub(r'\:\s+', ":", user_input) - user_input = re.sub(r'\s+\:', ":", user_input) + user_input = re.sub(r"\s+", " ", user_input) + user_input = re.sub(r"\:\s+", ":", user_input) + user_input = re.sub(r"\s+\:", ":", user_input) # Split the string into key-value pairs