# # Copyright (C) 2025 Extrafu # # This file is part of BerryBMS. # # BerryBMS is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the # Free Software Foundation; either version 3, or (at your option) any # later version. # import pymodbus.client as ModbusClient from pymodbus.client.mixin import ModbusClientMixin import pymodbus.exceptions import json from ModbusDevice import ModbusDevice from Register import Register class JKBMS(ModbusDevice): def __init__(self, name, id, port): super().__init__(id) self.name = name self.port = port self.registers = [ Register(self.id,'CellUnderVoltProtectionValue', 0x1004, ModbusClientMixin.DATATYPE.UINT32, .001), Register(self.id,'CellOverVoltProtectionValue', 0x100C, ModbusClientMixin.DATATYPE.UINT32, .001), Register(self.id,'100PercentSoCVoltageValue', 0x1018, ModbusClientMixin.DATATYPE.UINT32, .001), Register(self.id,'ZeroPercentSoCVoltageValue', 0x101C, ModbusClientMixin.DATATYPE.UINT32, .001), Register(self.id,'CellChargeOverCurrentProtection', 0x102C, ModbusClientMixin.DATATYPE.UINT32, .001), Register(self.id,'CellDischargeOverCurrentProtection', 0x1038, ModbusClientMixin.DATATYPE.UINT32, .001), Register(self.id, "CellCount", 0x106C, ModbusClientMixin.DATATYPE.UINT32), Register(self.id, "BatChargeEN", 0x1070, ModbusClientMixin.DATATYPE.UINT32), Register(self.id, "BatDisChargeEN", 0x1074, ModbusClientMixin.DATATYPE.UINT32), Register(self.id, "BatBalanceEN", 0x1078, ModbusClientMixin.DATATYPE.UINT32), Register(self.id, "StartBalanceVol", 0x1078, ModbusClientMixin.DATATYPE.UINT32, .001), Register(self.id, "DeviceOptions", 0x1114, ModbusClientMixin.DATATYPE.UINT16, length=1), Register(self.id, "CellVolAve", 0x1244, ModbusClientMixin.DATATYPE.UINT16, .001), Register(self.id, "BatCurrent", 0x1298, ModbusClientMixin.DATATYPE.INT32, .001), Register(self.id, "Alarms", 0x12A0, ModbusClientMixin.DATATYPE.UINT32, length=4), Register(self.id, "SOCStateOfcharge", 0x12A6, ModbusClientMixin.DATATYPE.UINT16), Register(self.id, "SOCCapRemain", 0x12A8, ModbusClientMixin.DATATYPE.INT32, 0.001), Register(self.id, "SOCFullChargeCap", 0x12AC, ModbusClientMixin.DATATYPE.UINT32, 0.001), Register(self.id, "SOCCycleCount", 0x12B0, ModbusClientMixin.DATATYPE.UINT32), Register(self.id, "SOCCycleCap", 0x12B4, ModbusClientMixin.DATATYPE.UINT32, .001), Register(self.id, "BatVol", 0x12E4, ModbusClientMixin.DATATYPE.UINT16, .01), Register(self.id, "ChargingStatus", 0x12C0, ModbusClientMixin.DATATYPE.UINT16), Register(self.id, "ManufacturerDeviceID", 0x1400, ModbusClientMixin.DATATYPE.STRING, None, 8), Register(self.id, "HardwareVersion", 0x1410, ModbusClientMixin.DATATYPE.STRING, None, 4), Register(self.id, "SoftwareVersion", 0x1418, ModbusClientMixin.DATATYPE.STRING, None, 4), Register(self.id, "BatTempMos", 0x128A, ModbusClientMixin.DATATYPE.UINT16, .1), Register(self.id, "BatTemp1", 0x129C, ModbusClientMixin.DATATYPE.UINT16, .1), Register(self.id, "BatTemp2", 0x129E, ModbusClientMixin.DATATYPE.UINT16, .1), Register(self.id, "BatTemp3", 0x12F8, ModbusClientMixin.DATATYPE.UINT16, .1), Register(self.id, "BatTemp4", 0x12FA, ModbusClientMixin.DATATYPE.UINT16, .1), Register(self.id, "BatTemp5", 0x12FC, ModbusClientMixin.DATATYPE.UINT16, .1) ] self.cellVoltages = None self.binary_sensors = ['BatChargeEN', 'BatDisChargeEN', 'BatBalanceEN'] self.device_options = ['HeaterEN', 'DisableTempSensor', 'GpsHeartBeat', 'PortSwitch', 'LcdAlwaysOn', 'SpecialCharger', 'SmartSleep', 'DisablePclModule', 'TimedStoredData', 'ChargingFloatEN',"UnknownOption",'DryArmIntermittent','DischargeOCP2','DischargeOCP3'] self.device_alarms = ['AlarmWireRes','MosOtp', 'CellQuantity', 'CurSensorErr', 'CellOVP', 'BatOVP', 'ChargeOVP', 'ChargeSCP', 'ChargeOTP', 'ChargeUTP', 'CpuAuxCommErr', 'CellUVP', 'BatUVP', 'DischargeOCP', 'DischargeSCP', 'DischargeOTP', 'ChargeMos', 'DischargeMos', 'GpsDisconnected', 'ModifyPasswordTime', 'DischargeOnFail', 'BatteryOverTemp','TemperatureSensorAbnormality',"PclModuleAbnormality"] def connect(self): if self.connection == None: self.connection = ModbusClient.ModbusSerialClient(port=self.port, stopbits=1, bytesize=8, parity='N', baudrate=115200, timeout=1) if self.connection.connect(): print("Successfully connected to BMS id %d" % self.id) else: print("Cannot connect to BMS id %d" % self.id) self.connection = None return self.connection def setChargeMode(self, mode): register = self.getRegister("BatChargeEN") register.setValue(self.connection, mode) def setDischargeMode(self, mode): register = self.getRegister("BatDisChargeEN") register.setValue(self.connection, mode) def getCellVoltages(self, reload=False): if self.cellVoltages == None or reload == True: count = self.getRegister('CellCount').value values = [] recv = self.connection.read_holding_registers(address=0x1200, count=count, slave=self.id) if not isinstance(recv, pymodbus.pdu.register_message.ReadHoldingRegistersResponse): return None values = self.connection.convert_from_registers(recv.registers, data_type=self.connection.DATATYPE.UINT16) for i in range(0, count): r = Register(self.id, f'CellVol{i}', 0x1200+i*2, ModbusClient.ModbusSerialClient.DATATYPE.UINT16, 0.001) r.value = round(values[i]*0.001,3) values[i] = r.value self.registers.append(r) self.cellVoltages = values return self.cellVoltages def dump(self, dic = {}): #self.getCellVoltages() d = super().dump() topic_soc = "bms-%d" % self.id d["name"] = self.name dic[topic_soc] = d self.normalize_values(dic[topic_soc]) return dic def __str__(self): self.getCellVoltages() return super().__str__() def formattedOutput(self): bms_model = self.getRegister('ManufacturerDeviceID').value bms_hw_version = self.getRegister('HardwareVersion').value bms_sw_version = self.getRegister('SoftwareVersion').value alarms = self.getRegister('Alarms').value # We skip the first byte, as it can be 0, 1 and 2 # A 100% SOC will give us a value of 100 with first byte set to 0, 356 when set to 1 and 612 when set to 2 soc = self.getRegister('SOCStateOfcharge').value & 0x0FF cycle_count = self.getRegister('SOCCycleCount').value pack_voltage = self.getRegister('BatVol').value cell_count = self.getRegister('CellCount').value cell_avg_voltage = self.getRegister('CellVolAve').value battery_current = self.getRegister('BatCurrent').value discharge_enabled = self.getRegister('BatDisChargeEN').value pack_capacity = self.getRegister('SOCFullChargeCap').value pack_remaining = self.getRegister('SOCCapRemain').value status = "charging" if battery_current > 0 else "discharging" options = self.getRegister("DeviceOptions").value s = f"== {self.name} (id {self.id}) - {bms_model} (v{bms_hw_version}) - sw v{bms_sw_version}) ==\n" s += f"Alarms?\t\t\t{alarms}\n" s += f"SOC:\t\t\t{soc} ({cycle_count} cycle(s))\n" s += f"Voltage:\t\t{pack_voltage:.2f}v\n" s += f"Cell Voltages:\t\t{str(self.getCellVoltages())}\n" s += f"Cell Average Voltage:\t{cell_avg_voltage:.3f} ({cell_count} cells)\n" s += f"Battery Current:\t{battery_current:.3f}A ({status} {pack_voltage*battery_current:.2f}W)\n" s += f"Discharge Enabled?\t{discharge_enabled}\n" s += f"Remaining Capacity:\t{pack_remaining:.2f}Ah ({pack_capacity}Ah capacity)" return s # def update_alarms(self, alarms_value): # alarms_bits = list(bin(options_value)[2:]) # res = {} # for i in range(len(self.device_options)): # res[self.device_alarms[i]] = "ON" if alarms_value[i] else "OFF" # return res # def update_options(self, options_value): # options_bits = list(bin(options_value)[2:]) # res = {} # for i in range(len(self.device_options)): # res[self.device_options[i]] = "ON" if options_value[i] == 1 else "OFF" # return res def parse_bits(self, key_names, bytes_object): bits = bin(bytes_object)[2:].zfill(32) bit_flags = list(reversed(bits)) res = {} for pos in range(len(key_names)): res.update({key_names[pos]: "ON" if bit_flags[pos] == "1" else "OFF"}) return res def normalize_values(self, dic): for topic in self.binary_sensors: dic[topic] = True if dic[topic] == 1 else False dic['DeviceOptions'] = self.parse_bits(self.device_options,dic['DeviceOptions']) dic["Alarms"] = self.parse_bits(self.device_alarms, dic["Alarms"]) # dic.update(self.update_options(dic['DeviceOptions'])) # dic.pop("DeviceOptions") # dic.update(self.update_alarms(dic['Alarms'])) # dic.pop('Alarms')