184 lines
9.6 KiB
Python
184 lines
9.6 KiB
Python
#
|
|
# Copyright (C) 2025 Extrafu <extrafu@gmail.com>
|
|
#
|
|
# This file is part of BerryBMS.
|
|
#
|
|
# BerryBMS is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the
|
|
# Free Software Foundation; either version 3, or (at your option) any
|
|
# later version.
|
|
#
|
|
import pymodbus.client as ModbusClient
|
|
from pymodbus.client.mixin import ModbusClientMixin
|
|
import pymodbus.exceptions
|
|
import json
|
|
|
|
from ModbusDevice import ModbusDevice
|
|
from Register import Register
|
|
|
|
class JKBMS(ModbusDevice):
|
|
|
|
def __init__(self,
|
|
name,
|
|
id,
|
|
port):
|
|
super().__init__(id)
|
|
self.name = name
|
|
self.port = port
|
|
self.registers = [
|
|
Register(self.id,'CellUnderVoltProtectionValue', 0x1004, ModbusClientMixin.DATATYPE.UINT32, .001),
|
|
Register(self.id,'CellOverVoltProtectionValue', 0x100C, ModbusClientMixin.DATATYPE.UINT32, .001),
|
|
Register(self.id,'100PercentSoCVoltageValue', 0x1018, ModbusClientMixin.DATATYPE.UINT32, .001),
|
|
Register(self.id,'ZeroPercentSoCVoltageValue', 0x101C, ModbusClientMixin.DATATYPE.UINT32, .001),
|
|
Register(self.id,'CellChargeOverCurrentProtection', 0x102C, ModbusClientMixin.DATATYPE.UINT32, .001),
|
|
Register(self.id,'CellDischargeOverCurrentProtection', 0x1038, ModbusClientMixin.DATATYPE.UINT32, .001),
|
|
Register(self.id, "CellCount", 0x106C, ModbusClientMixin.DATATYPE.UINT32),
|
|
Register(self.id, "BatChargeEN", 0x1070, ModbusClientMixin.DATATYPE.UINT32),
|
|
Register(self.id, "BatDisChargeEN", 0x1074, ModbusClientMixin.DATATYPE.UINT32),
|
|
Register(self.id, "BatBalanceEN", 0x1078, ModbusClientMixin.DATATYPE.UINT32),
|
|
Register(self.id, "StartBalanceVol", 0x1078, ModbusClientMixin.DATATYPE.UINT32, .001),
|
|
Register(self.id, "DeviceOptions", 0x1114, ModbusClientMixin.DATATYPE.UINT16, length=1),
|
|
Register(self.id, "CellVolAve", 0x1244, ModbusClientMixin.DATATYPE.UINT16, .001),
|
|
Register(self.id, "BatCurrent", 0x1298, ModbusClientMixin.DATATYPE.INT32, .001),
|
|
Register(self.id, "Alarms", 0x12A0, ModbusClientMixin.DATATYPE.UINT32, length=4),
|
|
Register(self.id, "SOCStateOfcharge", 0x12A6, ModbusClientMixin.DATATYPE.UINT16),
|
|
Register(self.id, "SOCCapRemain", 0x12A8, ModbusClientMixin.DATATYPE.INT32, 0.001),
|
|
Register(self.id, "SOCFullChargeCap", 0x12AC, ModbusClientMixin.DATATYPE.UINT32, 0.001),
|
|
Register(self.id, "SOCCycleCount", 0x12B0, ModbusClientMixin.DATATYPE.UINT32),
|
|
Register(self.id, "SOCCycleCap", 0x12B4, ModbusClientMixin.DATATYPE.UINT32, .001),
|
|
Register(self.id, "BatVol", 0x12E4, ModbusClientMixin.DATATYPE.UINT16, .01),
|
|
Register(self.id, "ChargingStatus", 0x12C0, ModbusClientMixin.DATATYPE.UINT16),
|
|
Register(self.id, "ManufacturerDeviceID", 0x1400, ModbusClientMixin.DATATYPE.STRING, None, 8),
|
|
Register(self.id, "HardwareVersion", 0x1410, ModbusClientMixin.DATATYPE.STRING, None, 4),
|
|
Register(self.id, "SoftwareVersion", 0x1418, ModbusClientMixin.DATATYPE.STRING, None, 4),
|
|
Register(self.id, "BatTempMos", 0x128A, ModbusClientMixin.DATATYPE.UINT16, .1),
|
|
Register(self.id, "BatTemp1", 0x129C, ModbusClientMixin.DATATYPE.UINT16, .1),
|
|
Register(self.id, "BatTemp2", 0x129E, ModbusClientMixin.DATATYPE.UINT16, .1),
|
|
Register(self.id, "BatTemp3", 0x12F8, ModbusClientMixin.DATATYPE.UINT16, .1),
|
|
Register(self.id, "BatTemp4", 0x12FA, ModbusClientMixin.DATATYPE.UINT16, .1),
|
|
Register(self.id, "BatTemp5", 0x12FC, ModbusClientMixin.DATATYPE.UINT16, .1)
|
|
]
|
|
self.cellVoltages = None
|
|
self.binary_sensors = ['BatChargeEN', 'BatDisChargeEN', 'BatBalanceEN']
|
|
self.device_options = ['HeaterEN', 'DisableTempSensor', 'GpsHeartBeat', 'PortSwitch', 'LcdAlwaysOn', 'SpecialCharger', 'SmartSleep', 'DisablePclModule', 'TimedStoredData', 'ChargingFloatEN',"UnknownOption",'DryArmIntermittent','DischargeOCP2','DischargeOCP3']
|
|
self.device_alarms = ['AlarmWireRes','MosOtp', 'CellQuantity', 'CurSensorErr', 'CellOVP', 'BatOVP', 'ChargeOVP', 'ChargeSCP', 'ChargeOTP', 'ChargeUTP',
|
|
'CpuAuxCommErr', 'CellUVP', 'BatUVP', 'DischargeOCP', 'DischargeSCP', 'DischargeOTP', 'ChargeMos', 'DischargeMos', 'GpsDisconnected', 'ModifyPasswordTime',
|
|
'DischargeOnFail', 'BatteryOverTemp','TemperatureSensorAbnormality',"PclModuleAbnormality"]
|
|
|
|
def connect(self):
|
|
if self.connection == None:
|
|
self.connection = ModbusClient.ModbusSerialClient(port=self.port, stopbits=1, bytesize=8, parity='N', baudrate=115200, timeout=1)
|
|
if self.connection.connect():
|
|
print("Successfully connected to BMS id %d" % self.id)
|
|
else:
|
|
print("Cannot connect to BMS id %d" % self.id)
|
|
self.connection = None
|
|
|
|
return self.connection
|
|
|
|
def setChargeMode(self, mode):
|
|
register = self.getRegister("BatChargeEN")
|
|
register.setValue(self.connection, mode)
|
|
|
|
def setDischargeMode(self, mode):
|
|
register = self.getRegister("BatDisChargeEN")
|
|
register.setValue(self.connection, mode)
|
|
|
|
def getCellVoltages(self, reload=False):
|
|
if self.cellVoltages == None or reload == True:
|
|
count = self.getRegister('CellCount').value
|
|
values = []
|
|
|
|
recv = self.connection.read_holding_registers(address=0x1200, count=count, slave=self.id)
|
|
if not isinstance(recv, pymodbus.pdu.register_message.ReadHoldingRegistersResponse):
|
|
return None
|
|
|
|
values = self.connection.convert_from_registers(recv.registers, data_type=self.connection.DATATYPE.UINT16)
|
|
|
|
for i in range(0, count):
|
|
r = Register(self.id, f'CellVol{i}', 0x1200+i*2, ModbusClient.ModbusSerialClient.DATATYPE.UINT16, 0.001)
|
|
r.value = round(values[i]*0.001,3)
|
|
values[i] = r.value
|
|
self.registers.append(r)
|
|
|
|
self.cellVoltages = values
|
|
|
|
return self.cellVoltages
|
|
|
|
def dump(self, dic = {}):
|
|
#self.getCellVoltages()
|
|
d = super().dump()
|
|
topic_soc = "bms-%d" % self.id
|
|
d["name"] = self.name
|
|
dic[topic_soc] = d
|
|
self.normalize_values(dic[topic_soc])
|
|
return dic
|
|
|
|
def __str__(self):
|
|
self.getCellVoltages()
|
|
return super().__str__()
|
|
|
|
def formattedOutput(self):
|
|
bms_model = self.getRegister('ManufacturerDeviceID').value
|
|
bms_hw_version = self.getRegister('HardwareVersion').value
|
|
bms_sw_version = self.getRegister('SoftwareVersion').value
|
|
alarms = self.getRegister('Alarms').value
|
|
|
|
# We skip the first byte, as it can be 0, 1 and 2
|
|
# A 100% SOC will give us a value of 100 with first byte set to 0, 356 when set to 1 and 612 when set to 2
|
|
soc = self.getRegister('SOCStateOfcharge').value & 0x0FF
|
|
cycle_count = self.getRegister('SOCCycleCount').value
|
|
pack_voltage = self.getRegister('BatVol').value
|
|
cell_count = self.getRegister('CellCount').value
|
|
cell_avg_voltage = self.getRegister('CellVolAve').value
|
|
battery_current = self.getRegister('BatCurrent').value
|
|
discharge_enabled = self.getRegister('BatDisChargeEN').value
|
|
pack_capacity = self.getRegister('SOCFullChargeCap').value
|
|
pack_remaining = self.getRegister('SOCCapRemain').value
|
|
status = "charging" if battery_current > 0 else "discharging"
|
|
options = self.getRegister("DeviceOptions").value
|
|
|
|
s = f"== {self.name} (id {self.id}) - {bms_model} (v{bms_hw_version}) - sw v{bms_sw_version}) ==\n"
|
|
s += f"Alarms?\t\t\t{alarms}\n"
|
|
s += f"SOC:\t\t\t{soc} ({cycle_count} cycle(s))\n"
|
|
s += f"Voltage:\t\t{pack_voltage:.2f}v\n"
|
|
s += f"Cell Voltages:\t\t{str(self.getCellVoltages())}\n"
|
|
s += f"Cell Average Voltage:\t{cell_avg_voltage:.3f} ({cell_count} cells)\n"
|
|
s += f"Battery Current:\t{battery_current:.3f}A ({status} {pack_voltage*battery_current:.2f}W)\n"
|
|
s += f"Discharge Enabled?\t{discharge_enabled}\n"
|
|
s += f"Remaining Capacity:\t{pack_remaining:.2f}Ah ({pack_capacity}Ah capacity)"
|
|
|
|
return s
|
|
|
|
# def update_alarms(self, alarms_value):
|
|
# alarms_bits = list(bin(options_value)[2:])
|
|
# res = {}
|
|
# for i in range(len(self.device_options)):
|
|
# res[self.device_alarms[i]] = "ON" if alarms_value[i] else "OFF"
|
|
# return res
|
|
|
|
# def update_options(self, options_value):
|
|
# options_bits = list(bin(options_value)[2:])
|
|
# res = {}
|
|
# for i in range(len(self.device_options)):
|
|
# res[self.device_options[i]] = "ON" if options_value[i] == 1 else "OFF"
|
|
# return res
|
|
def parse_bits(self, key_names, bytes_object):
|
|
bits = bin(bytes_object)[2:].zfill(32)
|
|
bit_flags = list(reversed(bits))
|
|
res = {}
|
|
|
|
for pos in range(len(key_names)):
|
|
res.update({key_names[pos]: "ON" if bit_flags[pos] == "1" else "OFF"})
|
|
return res
|
|
|
|
def normalize_values(self, dic):
|
|
for topic in self.binary_sensors:
|
|
dic[topic] = True if dic[topic] == 1 else False
|
|
dic['DeviceOptions'] = self.parse_bits(self.device_options,dic['DeviceOptions'])
|
|
dic["Alarms"] = self.parse_bits(self.device_alarms, dic["Alarms"])
|
|
# dic.update(self.update_options(dic['DeviceOptions']))
|
|
# dic.pop("DeviceOptions")
|
|
|
|
# dic.update(self.update_alarms(dic['Alarms']))
|
|
# dic.pop('Alarms') |