#环境准备#
按官方的步骤准备就可以,我是在Windows下使用VS Code开发的,安装了WSL(使用模板创建组件需要在WSL环境下完成)
官方链接:https://developers.home-assistant.io/docs/development_environment
环境准备好后,在项目根目录下使用命令:
python3 -m script.scaffold inetgration
创建组件后会自动生成几个文件:__init__.py,const.py,config_flow.py和mainfest.json、string.json。
研究中盛官方给的工具软件测试,可以确定是可以自动发现设备的,所以组件整体思路就是,选择组件后,就自动发现网络上的设备,然后自动生成对应IO数量的灯和开关实体,不需要通过界面让用户输入其他数据,很简单的一个逻辑吧,但是实现硬是搞了我两个月(期间有一个月被搞得头大,没心思搞,玩了一个月吃鸡!),主要原因是不知道组件的执行流程,从哪个函数方法调用,配置的数据存储在哪里,怎么保存自己需要的数据,后面才发现,根本不需要考虑这些,直接干就完了。下面进入正题:
mainfest.json、string.json这两个Json文件暂时不需要去碰,因为不需要去做界面(估计做窗帘组件的时候需要去弄)。看__init__.py和config_flow.py两个文件:
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import PLATFORMS
# TODO Create ConfigEntry type alias with API object
# TODO Rename type alias and update all entry annotations
# type NovoConfigEntry = ConfigEntry[0xF843] # noqa: F821
# TODO Update entry annotation
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Novo from a config entry."""
# TODO 1. Create API instance
# TODO 2. Validate the API connection (and authentication)
# TODO 3. Store an API object for your platforms to access
# entry.runtime_data = MyAPI(...)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
# TODO Update entry annotation
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
以上是自动生成的__init__.py的文件内容,查看文档,说的是加载组件的时候,就会调用async_setup_entry方法,如果创建组件的时候选择了自动发现设备,就会调用config_flow.py文件中的:async def _async_has_devices(hass: HomeAssistant) -> bool:,那发现设备的代码放在这两个地方都是可以的。
PS:后续测试发现,代码调用是有顺序的,顺序是:
Config_flow.py->__init__.py->PLATFORMS中顺序调用async_setup_entry
决定把发现代码放在__init__.py中。设备发现代码是通过Wireshark抓网络包分析自动发现设备过程,使用UDP协议,向网络中的5002端口发送约定的字符串:
{"cmd":"search","vendor":"www.coltsmart.com","product":"MJ-ExSx","keyword":"FF010102"}
设备就会回复自己的信息,通过回复的信息中的IP地址和端口(返回的信息是json数据,直接使用json进行分析就可以了),我们就可以建立通讯了。完整代码:
# 扫描发现设备
def discover(ip_address=None) -> dict[int, ZhongshengConfig]:
"""扫描发现设备."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.settimeout(5)
found_devices: dict[int, ZhongshengConfig] = {}
if ip_address is None:
addrs = enum_all_broadcast()
else:
addrs = [ip_address]
for addr in addrs:
try:
sock.sendto(BROADCAST_MSG, (addr, BROADCAST_PORT))
except Exception: # noqa: BLE001
_LOGGER.warning(f"Can't access network {addr}") # noqa: G004
while True:
try:
data, addr = sock.recvfrom(512)
_LOGGER.debug(f"Received response from {addr}: {data.hex(' ')}") # noqa: G004
json_dic = json.loads(data)
if json_dic["id"] != "":
# 查询返回
dev: ZhongshengConfig
d_id: int = int(json_dic["id"])
if d_id in found_devices:
dev = found_devices[d_id]
else:
dev = ZhongshengConfig()
dev.load_data(data)
found_devices[d_id] = dev
except TimeoutError:
break
except OSError as e:
_LOGGER.error(f"Socket error: {e!r}") # noqa: G004
return found_devices
因为设备会返回好几组数据,所以这里需要对数据进行合并,即这段代码:
# 查询返回
dev: ZhongshengConfig
d_id: int = int(json_dic["id"])
# dev = found_devices.get(key= d_id, default= ZhongshengConfig())
if d_id in found_devices:
dev = found_devices[d_id]
else:
dev = ZhongshengConfig()
dev.load_data(data)
found_devices[d_id] = dev
这里代码只是获取到设备的配置信息(ZhongshengConfig),并不是设备。设备是需要完成与硬件通讯,发送命令控制灯开关动作。所以需要构建设备类,实现与硬件通讯,设备类代码如下:
"""Description:中盛多路IO控制系统设备类,负责与设备进行通讯,完成设备I状态的读取和O端口输出
version: 1.0.0.0
Author: Cubar
Date: 2024-09-14 12:50:44
LastEditors: hht
LastEditTime: 2024-09-14 12:51:39.
""" # noqa: D205
from datetime import datetime
import logging
import socket
from homeassistant.helpers.device_registry import DeviceInfo
from ..const import DOMAIN
from .device import ZhongshengConfig
_LOGGER = logging.getLogger(__name__)
LIGHT_COUNT_MAX = 64
DATA_RECIVE_MAX = 1024
QUERY_SWITCH_STATUS = bytearray(
[0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x04, 0x00, 0x00, 0x00, 0x10]
)
QUERY_LIGHT_STATUS = bytearray(
[0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x03, 0x00, 0x00, 0x00, 0x10]
)
CONTROL_LIGHT_CMD = bytearray(
[0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x06, 0x00, 0x09, 0x00, 0x00]
)
CONTROL_LIGHT_CMD_IDX_ID = 9
CONTROL_LIGHT_CMD_IDX_VALUE = 11
DEVICE_MANUFACTRUE = "Zhongsheng Tech"
class ZhongshengDevice:
"""中盛设备类."""
_config: ZhongshengConfig
_lightcount: int
_switchcount: int
_last_switch_query_time: datetime
_last_light_query_time: datetime
"""中盛设备类"""
def __init__(self, config: ZhongshengConfig) -> None:
"""初始化."""
assert config
self._lightcount = LIGHT_COUNT_MAX
self._lightstatus = {}
self._switchstatus = {}
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._ip = config.ip
self._port = config.port
self._config = config
self._mac = config._device_mac # noqa: SLF001
# 确保第一次发送命令不受限制
self._last_switch_query_time = datetime.min
self._last_light_query_time = datetime.min
self._switchcount = 0
self._lightcount = 0
for i in range(LIGHT_COUNT_MAX):
self._lightstatus[i] = False
self._switchstatus[i] = False
try:
self._socket.connect((self._ip, self._port))
self.query_light_status()
self.query_switch_status()
except Exception as ex: # noqa: BLE001
_LOGGER.error(f"Exception: {ex}") # noqa: G004
self._socket.close()
@property
def name(self):
"""获取设备名称."""
return "ZhongshengDevice"
@property
def mac(self) -> str:
"""获取设备MAC地址."""
return self._mac
@property
def getLightCount(self):
"""获取灯光数量."""
if self._lightcount == LIGHT_COUNT_MAX:
self.query_light_status()
return self._lightcount
@property
def getSwitchCount(self):
"""获取开关数量."""
if self._switchcount == LIGHT_COUNT_MAX:
self.query_switch_status()
return self._switchcount
@property
def device_info(self) -> DeviceInfo:
"""Return the device info."""
return DeviceInfo(
identifiers={
# Serial numbers are unique identifiers within a specific domain
(DOMAIN, self._config._device_id) # noqa: SLF001
},
name=self.name,
manufacturer=DEVICE_MANUFACTRUE,
model=DOMAIN,
sw_version=self._config._soft_version, # noqa: SLF001
via_device=(DOMAIN, self._config._device_id), # noqa: SLF001
)
def setLightCount(self, count):
"""设置灯光数量."""
if count > 0 and count < LIGHT_COUNT_MAX:
self._lightcount = count
def create_light_cmd(self, light_id: int = -1, turnon: bool = False) -> bytearray:
"""构建灯光控制命令."""
res: bytearray = CONTROL_LIGHT_CMD.copy()
res[CONTROL_LIGHT_CMD_IDX_ID] = light_id
res[CONTROL_LIGHT_CMD_IDX_VALUE] = 0x01 if turnon else 0x00
return res
def send_ctrl_cmd(self, light_id: int = -1, turnon: bool = False) -> bool:
"""发送控制指令."""
###
# 关、开9号灯,即id=8
# 10:31:48.422→发 00 01 00 00 00 06 01 06 00 08 00 00 (关灯)
# 10:31:48.479←收 00 01 00 00 00 06 01 06 00 08 00 00
# 10:32:08.088→发 00 01 00 00 00 06 01 06 00 08 00 01 (开灯)
# 10:32:08.128←收 00 01 00 00 00 06 01 06 00 08 00 01
###
if light_id is None:
return False
# _LOGGER.error(f'contrl light id={id} turnon:{turnon}')
# 构建控制命令
cmd = self.create_light_cmd(light_id, turnon)
if self._socket.send(cmd) == len(cmd):
# 发送成功
recv = self._socket.recv(DATA_RECIVE_MAX)
# 读取状态信息
if len(recv) > 10:
if turnon:
if recv[CONTROL_LIGHT_CMD_IDX_VALUE] > 0x00:
return True
elif recv[CONTROL_LIGHT_CMD_IDX_VALUE] == 0x00:
return True
return False
def query_light_status(self):
"""查询灯状态."""
# 比较最近一次发送查询指令时间,如果小于500ms,则不发送
timed = datetime.now() - self._last_light_query_time
if timed.seconds < 1 and timed.microseconds < 500000:
return
self._last_light_query_time = datetime.now()
slen = self._socket.send(QUERY_LIGHT_STATUS)
if slen == len(QUERY_LIGHT_STATUS):
# 发送成功
recv = self._socket.recv(DATA_RECIVE_MAX)
# 00 01 00 00 00 23 01 03 20
# 00 00 00 00 00 00 00 00 00 00
# 00 00 00 00 00 00 00 00 00 00
# 00 00 00 00 00 00 00 00 00 00
# 00 00
# 读取状态信息
if len(recv) > 10:
cnt = -1
for i in range(LIGHT_COUNT_MAX):
if (i * 2 + 10) < len(recv):
if recv[i * 2 + 10] > 0:
self._lightstatus[i] = True
else:
self._lightstatus[i] = False
else:
break
cnt = i
self._lightcount = cnt + 1
else:
_LOGGER.error(f'query light status recv :[{len(recv)}] {recv.hex(" ")}') # noqa: G004
else:
_LOGGER.error(
f"light status query, send recv [{slen}/{len(QUERY_LIGHT_STATUS)}]" # noqa: G004
)
def query_switch_status(self):
"""发送查询状态指令."""
# 比较最近一次发送查询指令时间,如果小于500ms,则不发送
timed = datetime.now() - self._last_switch_query_time
if timed.seconds < 1 and timed.microseconds < 500000:
return
self._last_switch_query_time = datetime.now()
slen = self._socket.send(QUERY_SWITCH_STATUS)
if slen == len(QUERY_SWITCH_STATUS):
# 发送成功
recv = self._socket.recv(DATA_RECIVE_MAX)
# [30011]recv [41]:
# 00 01 00 00 00 23 01 04 20
# 00 00 00 00 00 00 00 00 00 00
# 00 00 00 00 00 00 00 00 00 00
# 00 00 00 00 00 00 00 00 00 00
# 00 00
# 读取状态信息
if len(recv) > 10:
cnt = -1
for i in range(LIGHT_COUNT_MAX):
if (i * 2 + 10) < len(recv):
if recv[i * 2 + 10] > 0:
self._switchstatus[i] = True
else:
self._switchstatus[i] = False
else:
break
# _LOGGER.error(f'{i * 2 + 8}, {i} , [{self._switchstatus[i]}]')
cnt = i
self._switchcount = cnt + 1
else:
_LOGGER.error(f'query switch status recv:[{len(recv)}] {recv.hex(" ")}') # noqa: G004
else:
_LOGGER.error(
f"switch status query, recv [{slen}/{len(QUERY_SWITCH_STATUS)}]" # noqa: G004
)
def turn_on(self, light_id: int) -> bool:
"""打开指定ID的灯光."""
if light_id >= 0 and light_id < self._lightcount:
self._lightstatus[light_id] = self.send_ctrl_cmd(light_id, True)
return self._lightstatus[light_id]
return False
def turn_off(self, light_id: int) -> bool:
"""关闭指定ID的灯光."""
if light_id >= 0 and light_id < self._lightcount:
self._lightstatus[light_id] = self.send_ctrl_cmd(light_id, False)
return self._lightstatus[light_id]
return False
def get_light_status(self, light_id: int) -> bool:
"""获取指定ID的灯光状态."""
if light_id >= 0 and light_id < self._lightcount:
return self._lightstatus[light_id]
return False
def get_switch_status(self, switch_id: int) -> bool:
"""获取指定ID的开关状态."""
if switch_id >= 0 and switch_id < self._switchcount:
return self._switchstatus[switch_id]
return False
设备实例化后,需要注册到HA中,注册代码为:
for idx, dev in devices.items():
deviceentry = device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
configuration_url=f"http://{dev.ip}:{dev.port}/",
connections={("mac", mac), ("ip", ip), ("port", port)},
manufacturer="Zhongsheng Tech",
model=DOMAIN,
name=DEVICE_NAME,
sw_version=dev._soft_version, # noqa: SLF001
translation_key="Zhongsheng Switch",
)
这里使用for循环是兼容多设备的情况。
设备搞定了,后面就是把灯和开关的实体类实现。在开始生成代码时,有这么一段代码:
# TODO List the platforms that you want to support.
# For your initial PR, limit it to 1 platform.
PLATFORMS: list[Platform] = [Platform.LIGHT]
这里就是灯、开关之类的,本设备支持的实体。(很奇怪这样的命名,Platform直译过来是平台???不知道是我英语不好还是百度翻译不靠谱,我无法理解这里用平台来把灯、开关之类的归类),因为我要实现灯和开关,所以我的PLATFORMS是这么定义的:
# List the platforms that you want to support.
# For your initial PR, limit it to 1 platform.
PLATFORMS: list[Platform] = [
Platform.LIGHT,
# Platform.SWITCH,
Platform.SENSOR,
]
好吧,我这里其实是使用sensor,并没有使用Switch,因为Switch在HA里是可以操作的,但是实际场景中,Switch是实体开关,HA里面操作的话,并不反映到实体开关上来,即:在我的应用场景中,HA里面的开关只是反映实体开关的状态,只读的。所以最终版本就是使用了Sensor来做。(只是现在想来,这个Sensor也没多大意义)
这里有个约定,Platforms里的实体必须存在对应的python文件,即:在组件根目录里要存在light.py和sensor.py。通过下面的代码把这些实体“注册”到HA中(我觉得使用注册这个词更加好理解,函数翻译应该是发送):
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
这段代码放在注册设备后就可以了,这里就完成了设备的注册和把配置发送至实体,至此,HA就知道你的组件里有这些设备及设备支持的实体了。
后续完成light.py实体代码(觉得Sensor没必要,就不在这放了):
"""中盛IO控制器中的灯光类."""
from typing import Any # noqa: D100
from homeassistant.components.light import LightEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DATA_DISCOVER_COMPONENT, ZHONGSHENG_DISCOVERY_ENTITY_NEW
from .core.ZhongshengDevice import ZhongshengDevice
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up Zhongshen switches based on a config entry."""
@callback
def async_discover(
device: ZhongshengDevice, count: int, config_entry: ConfigEntry
) -> None:
"""Discover and add a Zhongsheng light."""
for i in range(count):
async_add_entities([ZhongshengLightEntity(device, i, config_entry)])
hass.data[DATA_DISCOVER_COMPONENT.format("light")] = async_dispatcher_connect(
hass,
ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("light"),
async_discover,
)
class ZhongshengLightEntity(LightEntity):
"""中盛-灯实体类."""
def __init__(
self,
device: ZhongshengDevice,
id: int = 0,
entry: ConfigEntry | None = None,
name: str = "",
) -> None:
"""初始化灯实体."""
self._attr_unique_id = f"{device.mac}_light_{id}"
self._id = id
self._name = name
self._entry = entry
self._device = device
if self._name is None:
self._name = f"{device.name}_light"
self._attr_name = self._name
self._attr_is_on = self._device.get_light_status(id)
super().__init__()
@property
def light_name(self) -> str:
"""获取灯名称."""
return self._name
def turn_on(self, **kwargs: Any) -> None:
"""打开灯光."""
self._attr_is_on = bool(self._device.turn_on(self._id))
async def async_turn_on(self, **kwargs: Any) -> None:
"""异步打开灯光."""
self._attr_is_on = bool(self._device.turn_on(self._id))
def turn_off(self, **kwargs: Any) -> None:
"""关闭灯光."""
self._attr_is_on = not bool(self._device.turn_off(self._id))
async def async_turn_off(self, **kwargs: Any) -> None:
"""异步关闭灯光."""
self._attr_is_on = not bool(self._device.turn_off(self._id))
def get_id(self) -> int:
"""获取灯ID."""
return self._id
async def async_update(self) -> None:
"""异步更新灯状态数据."""
self._device.query_light_status()
self._attr_is_on = self._device.get_light_status(self._id)
灯的实体类比较好理解,也就是继承LightEntity,实现turn_on和turn_off,也包括异步的方法。具体就是通过设备去发送控制命令。(最开始我想的是通过一个单例全局变量,把设备传到灯实体对象中,然后单例又需要配置信息,这里转来转去,然后我就去吃鸡了),看前面的代码:
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up Zhongshen switches based on a config entry."""
@callback
def async_discover(
device: ZhongshengDevice, count: int, config_entry: ConfigEntry
) -> None:
"""Discover and add a Zhongsheng light."""
for i in range(count):
async_add_entities([ZhongshengLightEntity(device, i, config_entry)])
hass.data[DATA_DISCOVER_COMPONENT.format("light")] = async_dispatcher_connect(
hass,
ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("light"),
async_discover,
)
async_setup_entry方法是HA系统规则调用的方法,不用去多想。应该是前面执行这段代码的时候,自动调用了。
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
async_update这个方法是给HA周期调用的,每个实体都会调用到,所以在设备对应的方法里我做了调用频率限制。测试调用频率大概是30秒调用一次(实测时,调用周期到了1分钟,分析是16次调用时间太快,500ms的限制导致在一次调用周期内满足不了,得在第二个周期内才能触发,可以优化成:把灯id传进去,只处理id=0的请求,这样就能保证每个周期都能执行。但是这个还是不理想,最终是要优化成0.5秒执行一次,查询灯状态,这样在开关动作时,及时把状态反馈到HA中,但是这个就是另外的内容了)。
这个@callback比较好玩,里面的参数全是自己定义的,意思我要实例化实体的时候,需要用到的参数,都可以在这里提出来。我这里实例化的时候,需要用到设备、灯数量和配置信息(这个最终没用上)。使用async_add_entities方法就能把实体添加到HA中了。
async_dispatcher_connect这个方法比较关键了,类似Windows里的钩子、消息,或者ROS里的话题:与hass约定一个字符串消息,触发callback。这里实体就告一段落了。
设备对象哪里有,在__init__.py里,在那里注册了设备对象,所以在那里添加这个消息的代码:
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
for idx, dev in devices.items(): # noqa: B007
zs_dev = ZhongshengDevice(dev)
dcount = zs_dev.getLightCount
scount = zs_dev.getSwitchCount
async_dispatcher_send(
hass,
ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("light"),
zs_dev,
dcount,
entry,
)
# async_dispatcher_send(
# hass,
# ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("switch"),
# zs_dev,
# scount,
# entry,
# )
async_dispatcher_send(
hass,
ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("sensor"),
zs_dev,
scount,
entry,
)
通过async_dispatcher_send方法把设备对象,数量和配置传回去。整个就完成了。最后附上结果:
设备页面
实体页面
可以把这些灯放到面板里: