#要说的话#
前面把中盛科技的控制器组件写完了。稍稍熟悉了一些HA,现在准备写窗帘控制组件,构想的东西会比较多,估计有些难度,过程会比较长,边写边记录吧!
#设备和场景环境#
使用的是Novo的电机,淘宝链接在【前言】文章中有,轨道应该是佳丽斯的,买电机的时候一起配的。电机提供的是RS485接口,所以需要增加一个RS485的服务器,我选用的是“亿佰特”的网口转RS485的模块,型号是NA111-A(使用220V电源供电,就避免了再加一个模块)统一放置在机柜那里,把原来的网络拿两路出来(蓝色和棕色组)作为485的线路,保留100M的网线功能。远端使用的是多功能的面板,带2+3的电源插口和8+4的网络口,网络口刚好可以分成百兆网和485接口。
#组件思路#
流程:选择Novo组件-->选择485设备类型-->发现485设备-->(可选:配置485设备)
-->通过485连接Novo设备-->配置Novo设备地址(需要按SET按钮)-->设置电机转向
-->设置开合范围-->完成配置
思路:
- 1、选择组件后,提供界面选择使用的485设备类型,使用下拉框给用户选择;
- 2、依据选择的485设备类型触发对应的自发现流程,并列出发现的485设备;
- 3、用户点击选择485设备,组件建立与485设备的链接,并显示通讯正常;
- 4、组件通过485设备向Novo电机发送查询命令,有返回则说明链路建立成功;
- 5、由于是并联了多个Novo电机,所以会返回多个查询回复,所以只能轮循查询,需要提供界面配置的Novo电机地址和通道;
- 6、点击485设备条目进行配置,配置界面自动生成Novo电机地址和通道,点击提交,然后到Novo电机上按下相应配置按钮,完成Novo电机的地址配置(电机正反转两次),该条目移至已经配置好的列表;
- 7、在已经配置好地址的列表里点击Novo电机,进入到电机的配置:可以配置电机转向、窗帘开合范围,检查开、合是否正确,都正确无误后完成配置(后续版本再做)。
-
#代码实现#
- 通过命令:
-
python3 -m script.scaffold integration
初始化组件代码。添加discover方法:
-
def ebyte_discover(ip_address: str | None = None) -> dict[str, EbyteConfig]: """亿佰特-发现设备方法.""" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.bind(("", BROADCAST_SRC_PORT)) sock.settimeout(2) found_devices: dict[str, EbyteConfig] = {} if ip_address is None: addrs = enum_all_broadcast() else: addrs = [ip_address] for addr in addrs: try: sock.sendto(BROADCAST_MSG, (addr, BROADCAST_PORT)) except Exception: # noqa: BLE001 _LOGGER.warning(f"Can't access network {addr}") # noqa: G004 while True: try: data, addr = sock.recvfrom(512) # 返回:fd 06 54 14 a7 dc 74 16 03 01 # 返回:fd 06 54 14 a7 dc 72 83 03 01 # 分析:fd 06 为帧头 # 54 14 a7 dc 74 16 为MAC地址 # 03 01 为帧尾 mac = ebyte_get_mac(data) if len(mac[0]) > 6: found_devices[mac[0]] = EbyteConfig( mac[0], "Ebyte_RS485", mac[0], addr[0], 0 ) found_devices[mac[0]].set_mac_bytes(mac[1]) except TimeoutError: break except OSError as e: _LOGGER.error(f"Socket error: {e!r}") # noqa: G004 if len(found_devices) > 0: # 查询相关信息 for conf in found_devices.values(): bts: bytearray = QUERY_CMD_FORMAT.copy() for i in range(len(conf.mac_bytes)): bts[i + QUERY_CMD_FORMAT_MAC_IDX] = conf.mac_bytes[i] # 获取名称、版本号和序列号 bts[QUERY_CMD_FORMAT_CMD_IDX] = 0x05 sock.sendto(bts, (conf.base_ip, BROADCAST_PORT)) res = sock.recv(1024) mac = ebyte_get_mac(res)[0] if mac in found_devices: found_devices[mac].load_data(res) # 获取网络配置 bts[QUERY_CMD_FORMAT_CMD_IDX] = 0x00 sock.sendto(bts, (conf.base_ip, BROADCAST_PORT)) res = sock.recv(1024) mac = ebyte_get_mac(res)[0] if mac in found_devices: found_devices[mac].load_data(res) return found_devices
这里绑定UDP的源端口,是转为亿佰特的设备返回的数据是另外一个固定端口。不绑定源端口的话,返回数据收不到。
-
修改config_flow.py中的函数,显示485设备列表:
-
async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle the initial step.""" errors: dict[str, str] = {} ent_data: dict[str, Any] = {} if user_input is not None: try: # 获取用户选择的设备ID conf: EbyteConfig = self.get_ebyteconfig_by_id( user_input[CONF_USER_INPURT_ID] ) ent_data[CONF_HOST] = conf.base_ip ent_data[CONF_PORT] = conf.base_port ent_data[CONF_MAC] = conf.base_mac except Exception: # noqa: BLE001 _LOGGER.exception("Unexpected exception") errors["base"] = "unknown" dp: dict[str, str] = {} dp["MAC"] = conf.mac_bytes.hex(":") dp["IP"] = conf.base_ip dp["Port"] = conf.base_port ret: ConfigFlowResult = self.async_create_entry( # 生成集成条目 title=f"Ebyte Device[{conf.base_ip}]", description="This is a Ebyte RS485 communication device!", description_placeholders=dp, options=dp, data=ent_data, ) self._ebytelink = static_ebyte_manager.get_link_by_mac( conf.base_mac, conf.base_ip, conf.base_port ) # # 创建对应的RS485通讯链路设备 # 错误:在此处不能创建设备,只能在__init__.py里创建 else: # noqa: RET505 self._confs = ebyte_discover() options: dict[str, str] = {} if len(self._confs) < 1: _LOGGER.error("No ebyte communications!") else: for conf in self._confs.values(): options[conf.base_mac] = ( f"{conf.base_name}[{conf.base_ip}:{conf.base_port}]" ) ops = [] # 从系统中获取已经配置了的条目 clist: list[ConfigEntry] = self.hass.config_entries.async_entries(DOMAIN) cd: dict[str, str] = {} for cf in clist: cd[cf.data[CONF_MAC]] = cf.data[CONF_HOST] for k, v in options.items(): if k not in cd: ops.append(SelectOptionDict(value=k, label=v)) if len(ops) > 0: ebyteschema = vol.Schema( { vol.Required(CONF_USER_INPURT_ID): SelectSelector( SelectSelectorConfig( options=ops, mode=SelectSelectorMode.DROPDOWN, ) ), } ) else: ebyteschema = vol.Schema({"ERROR:": "No Ebyte RS485 device founded!"}) return self.async_show_form( step_id="user", data_schema=ebyteschema, errors=errors )
这里使用SelectSelector提供选择项。
-
生成的效果如下:
要显示集成条目右边的配置按钮,需要在Flow类中添加指定的方法:
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:
"""增加本函数后,会在集成条目位置增加“配置”按钮."""
return NovoOptionsFlow(config_entry)
完成RS485设备(集成条目)的添加,后续就是在点击“配置”的时候,弹出界面给Novo电机写地址和通道号,代码放在NovoOptionsFlow中,如下:
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Manage the options."""
errors: dict[str, str] = {}
if user_input is not None:
# 有输入信息
if not user_input[CONFIG_FLOW_TYPE]:
# 未知的步骤
_LOGGER.error("Unkown flow type!")
elif user_input[CONFIG_FLOW_TYPE] == OPTIONS_FLOW_TYPE_SET_ADDRESS:
# 当前为输入电机地址步骤,需要配置电机
idb = bytes.fromhex(user_input[CONFIG_ID])
did: int = int.from_bytes(idb, "big")
channel: int = user_input["channel"]
edata: dict[str, Any] = {}
res = await self._device.async_set_motor_addr(did, channel)
if res:
# 写入成功,则保存相关信息
edata[CONF_MAC] = self._entry.data[CONF_MAC]
edata[CONF_HOST] = self._entry.data[CONF_HOST]
edata[CONF_PORT] = self._entry.data[CONF_PORT]
edata[CONFIG_ID] = did
edata[CONFIG_CHANNEL] = channel
self._save_device_config(edata)
return self.async_create_entry(title="", data=edata)
# 自动生成电机ID
ida = np.random.random_integers(161, 254, size=(2))
idab: bytearray = bytearray(2)
idab[0] = ida[0]
idab[1] = ida[1]
ebyteschema = vol.Schema(
{
vol.Required(
CONFIG_FLOW_TYPE, default=OPTIONS_FLOW_TYPE_SET_ADDRESS
): vol.In(ADD_WAY),
vol.Required(
schema=CONFIG_ID,
description="Enetry RS485 address.",
default=idab.hex(" "),
): str,
vol.Required(
schema=CONFIG_CHANNEL, description="Entery Novo channel.", default=4
): int,
}
)
return self.async_show_form(
step_id="init", data_schema=ebyteschema, errors=errors
)
显示效果:
在这里做了选择器,最初是想把设置地址和配置旋转方向一起的,所以留在这里了。后续再完善。
这里配置完成后,我不知道应该怎么保存配置并使用,看了美的的组件,就直接用他的代码了,就是保存json文件到本地,使用的时候读取就是了。对应的代码:
def _save_device_config(self, data: dict):
os.makedirs(
self.hass.config.path(f"{STORAGE_PATH}/{data[CONF_MAC]}"), exist_ok=True
)
record_file = self.hass.config.path(
f"{STORAGE_PATH}/{data[CONF_MAC]}/{data[CONFIG_ID]}.json"
)
save_json(record_file, data)
添加并配置Novo电机就完成了,后而就是怎么加载这些实体,加载的工作必须放在__init__.py文件中,不能放在其他地方,回到__init__.py文件,修改对应的代码:
# Update entry annotation
async def async_setup_entry(hass: HomeAssistant, entry: NovoConfigEntry) -> bool:
"""Set up Novo from a config entry."""
# 说明:此方法,每个集成条目都会调用一次
# 1. Create API instance
# 2. Validate the API connection (and authentication)
# 3. Store an API object for your platforms to access
ip: str = entry.data[CONF_HOST]
port: int = entry.data[CONF_PORT]
mac: str = entry.data[CONF_MAC]
link: EbyteRS485Link = static_ebyte_manager.get_link_by_mac(mac, ip, port)
if link is None:
_LOGGER.error(f"Device[{mac},{ip}:{port}] lost!") # noqa: G004
# 注册通讯设备:当前是亿佰特的RS485设备
device_registry = dr.async_get(hass)
if link.is_connected:
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
configuration_url=f"http://{link.tcp_ip}:{link.tcp_port}/",
identifiers={("mac", mac)},
connections={("mac", mac), ("ip", link.tcp_ip), ("port", link.tcp_port)},
manufacturer="Ebyte Tech",
model=link.DOMAIN,
name=link.base_name,
serial_number=mac,
sw_version=link.version,
translation_key="Ebyte communication",
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
novo: NovoDevice = static_novo_device_manager.get_novo_device(mac, link)
# conf: NovoConfig
d_id = 0
c_id = 0
# 从保存的配置文件中获取实体
jsons = _load_device_config(hass, mac)
if len(jsons) > 0:
for js in jsons:
# _LOGGER.error(f'get json:{js}')
d_id = js[CONFIG_ID]
c_id = js[CONFIG_CHANNEL]
c_mac = js[CONF_MAC]
if c_mac == mac:
# 发送消息给实体类,创建实体
async_dispatcher_send(
hass,
DATA_DISCOVER_COMPONENT.format(NovoCoverEntry.DOMAIN),
novo,
entry,
d_id,
c_id,
)
return True
因为考虑到需要支持多RS485设备,不同的电机是挂在不同的RS485设备上的,加载的时候就需要一一对应上,在这里的思路就是使用RS485设备的MAC地址作为文件夹区分,并通过文件遍历的方式获取到配置的json文件(每个json文件对应一台电机),遍历的代码:
def _load_device_config(hass: HomeAssistant, device_id):
# 列表出文件夹下所有的文件
files = []
jsons = []
pypath = hass.config.path(f"{STORAGE_PATH}/{device_id}/")
if not os.path.isdir(pypath):
_LOGGER.error(f"file path :{pypath} not exists!") # noqa: G004
return jsons
for filename in os.listdir(pypath):
filepath = os.path.join(pypath, filename)
if os.path.isfile(filepath):
files.append(filename)
record_file = hass.config.path(f"{STORAGE_PATH}/{device_id}/{filename}")
jsons.append(load_json(record_file, default={}))
return jsons
在__init__.py中怎么把配置信息传到实体类,在我的上一篇文章中已经说了,不清楚的可以看看。这里使用了if c_mac == mac:进行限制是否生成实体,是因为在多个RS485(集成条目)的情况下,相应的代码就是执行多次,有冲突后,就不能正确给实体配置上对应的链路。
最后就是实体类Cover了,完整代码如下:
"""窗帘实体类."""
import logging
from typing import Any
from homeassistant.components.cover import (
ATTR_POSITION,
CoverEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DATA_DISCOVER_COMPONENT
from .core.NovoDevice import NovoDevice
_LOGGER = logging.getLogger("novocover")
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""配置窗帘实体."""
@callback
def async_discover(
device: NovoDevice, entry: ConfigEntry, id: int, ch: int
) -> None:
"""发现电机回调及添加方法."""
nentry = NovoCoverEntry(device, entry, id, ch)
async_add_entities([nentry])
hass.data[DATA_DISCOVER_COMPONENT.format(NovoCoverEntry.DOMAIN)] = (
async_dispatcher_connect(
hass,
DATA_DISCOVER_COMPONENT.format(NovoCoverEntry.DOMAIN),
async_discover,
)
)
class NovoCoverEntry(CoverEntity):
"""Novo电机窗帘实体类."""
DOMAIN = "cover"
_attr_has_entity_name = True
_id: int
_ch: int
_entry: ConfigEntry
_device: NovoDevice
_pos: int
_closed: bool
def __init__(
self, device: NovoDevice, entry: ConfigEntry, id: int, ch: int
) -> None:
"""初始化Novo电机窗帘实体."""
super().__init__()
self._id = id
self._entry = entry
self._unique_id = f"{self.DOMAIN}.novo_curtain_{id}"
self.entity_id = self._unique_id
self._closed = True
self._ch = ch
self._device = device
self._pos = 0
self._attr_unique_id = f"{self.DOMAIN}.novo_curtain_{id}"
self._attr_name = f"NovoCurtain_{id}"
@property
def unique_id(self) -> str | None:
"""设备标识符."""
return self._attr_unique_id
@property
def is_closed(self) -> bool:
"""窗帘是否已经关闭."""
return self._closed
def open_cover(self, **kwargs: Any) -> None:
"""打开窗帘."""
self._device.async_open_cover_by_id(self._id, self._ch, 0)
async def async_open_cover(self, **kwargs: Any) -> None:
"""异步打开窗帘."""
return await self._device.async_open_cover_by_id(self._id, self._ch, 0)
def close_cover(self, **kwargs: Any) -> None:
"""关闭窗帘."""
self._device.async_close_cover_by_id(self._id, self._ch, 100)
async def async_close_cover(self, **kwargs: Any) -> None:
"""异步关闭窗帘."""
return await self._device.async_close_cover_by_id(self._id, self._ch, 100)
def set_cover_position(self, **kwargs: Any) -> None:
"""设置窗帘位置."""
position = int(kwargs.get(ATTR_POSITION))
self._device.async_set_cover_position_by_id(self._id, self._ch, position)
async def async_update(self) -> None:
"""更新代码."""
await self._device.async_query_position_by_id(self._id, self._ch)
self._pos = self._device.get_position_by_id(self._id)
self._attr_current_cover_position = self._pos
if self._pos >= 99:
self._attr_is_closed = False
self._closed = False
else:
self._attr_is_closed = True
self._closed = True
最后运行结果:
学到的知识点:
1、使用界面获取用户输入:async_show_form方法的使用;
2、集成条目的使用:async_create_entry方法;
3、集成条目配置:async_get_options_flow方法
存在的问题:
1、Novo电机会主动发送信息,需要在链路代码里增加循环读取socket信息的功能,而不是现在一发一收的模式;
2、Novo电机的Update方法是直接调用链路模块发送命令的,有可能会出现冲突的情况,需要封装链路模块,在模块中处理好冲突问题;
3、使用集成条目的配置功能添加Novo电机时,不会自动删除原有的配置文件(json);
4、使用配置功能添加Novo电机后,不会自动刷新实体列表,需要手动“重新加载”集成条目。
增加了自动化:早上7点05分,开灯、开窗帘。早上自动打开了。
Novo窗帘的组件基本功能完成,家里的零冷水泵到了,后续就是把零冷水泵添加到HA中。还有人体传感器、空气质量传感器、电动水阀都有了,都得花时间把这些东西加进去……,又是得花时间折腾……
另外:最近想把项目放到HA里面去,因为需要用到厂家的LOGO,目前在跟厂家沟通,获得授权后,把LOGO加上,就可以放到HA里面了。