-
在workspace/modules下新建包
buildtool create --template component modules/test_two
-
编译包
buildtool build -p modules/test_two/
-
增加自己的proto消息
在刚才自动生成的proto文件里面添加自己定义的消息,记得重新编译.
syntax = "proto2";

package apollo;

// message type of channel, just a placeholder for demo,
// you should use `--channel_message_type` option to specify the real message type
message TestTwoMsg {}

// Holds a single optional string field, `name`.
message TestTwoConfig {
  optional string name = 1;
}

// Newly added custom message with one optional string field.
message TestTwoMymessage {
  optional string twoInfo = 1;
}
也可以新建proto文件, 修改BUILD文件;记得重新编译.
proto文件内容如下
syntax = "proto2";

package apollo;

// Custom message kept in its own proto file; both fields are optional strings.
message TestTwoLwlgzy {
  optional string twoInfo = 1;
  optional string twoFault = 2;
}
修改对应的BUILD文件
load("//tools:apollo_package.bzl", "apollo_package")
load("//tools/proto:proto.bzl", "proto_library")
load("//tools:cpplint.bzl", "cpplint")

package(default_visibility = ["//visibility:public"])

# Proto target for test_two.proto.
proto_library(
    name = "test_two_proto",
    srcs = ["test_two.proto"],
)

# Proto target for the additional file test_two_lwlgzy.proto; every new
# .proto file needs its own proto_library entry like this one.
proto_library(
    name = "test_two_lwlgzy_proto",
    srcs = ["test_two_lwlgzy.proto"],
)

apollo_package()

cpplint()
- 生成的pb2文件路径在
/opt/apollo/neo/python/modules/test_two/proto
a). 新建test.py文件, 加入如下内容: from modules.test_two.proto.test_two_pb2 import TestTwoMymessage
b). 赋予运行权限: sudo chmod 755 *.py
运行没有报错即为正常.
本机没有编译或缺少需要引用的proto文件时, 也可以直接将其他机器生成的 *_pb2.py 文件拷贝到类似
/opt/apollo/neo/python/xxx/xxx/proto
的目录下, 这样import才不会报错.
- python 文件中import 路径为
from 包名路径.proto.xx_pb2 import xxx
例如: from modules.test_two.proto.test_two_lwlgzy_pb2 import TestTwoLwlgzy
modules/test_two/
|-- BUILD
|-- conf
| |-- test_two.conf
| `-- test_two.pb.txt
|-- cyberfile.xml
|-- dag
| `-- test_two.dag
|-- launch
| `-- test_two.launch
|-- proto
| |-- BUILD
| |-- test_two.proto
| `-- test_two_lwlgzy.proto
|-- test.py
|-- test_two_component.cc
`-- test_two_component.h
4 directories, 12 files
- 播包
cyber_recorder play -f modules/chassis.00000.20210622135417
订阅底盘话题并处理故障的示例
import time
import asyncio
from datetime import datetime
from cyber.python.cyber_py3 import cyber
from modules.canbus_vehicle.yt.proto.yt_pb2 import Yt
from od_health_test.proto.od_health_test_pb2 import OdErrorInfo, OdErrorMsgs
import logging
from modules.tools.common.logger import Logger
import os
# Base directory used to build the health-monitor log file path
# (joined with 'data/log/xx_health.log' in xxHealthMonitor.__init__).
APOLLO_ROOT = "/apollo"
class xxHealthMonitor:
    """Publish a periodic health report based on the chassis-detail channel.

    The monitor subscribes to ``/apollo/canbus/chassis_detail`` and records
    the arrival time of the latest message.  ``run`` then publishes one
    ``OdErrorMsgs`` message on ``/xx/health`` per cycle: a level ``'1'``
    error when no chassis message arrived within ``DATA_TIMEOUT_S`` seconds,
    otherwise whatever ``_error_checking`` added to the report.
    """

    # Seconds without a chassis message before the channel counts as silent.
    DATA_TIMEOUT_S = 1
    # Seconds to sleep between two published reports (1 Hz).
    PUBLISH_PERIOD_S = 1

    def __init__(self) -> None:
        """Create the Cyber node, logger, reader and writer."""
        self.health_node_ = cyber.Node("xx_health_monitor_node")
        Logger.config(
            log_file=os.path.join(APOLLO_ROOT, 'data/log/xx_health.log'),
            use_stdout=True,
            log_level=logging.DEBUG)
        self.logger = Logger.get_logger("HealthMonitor")
        self.logger.info("health_monitor_node is started. ")

        # Initialise all state BEFORE subscribing, so that a callback that
        # fires right after create_reader cannot have its timestamp
        # overwritten by the initial None below.
        # has_data is not read anywhere in this class; it is kept because
        # code outside this view may rely on it.
        self.has_data = False
        # Not updated by the visible code; presumably filled in by the
        # chassis callback once message parsing is added -- TODO confirm.
        self.bat_total_voltage = 0
        self.bat_total_current = 0
        # Start with a report holding one empty placeholder entry.
        self.od_ErrMessages = OdErrorMsgs()
        self._padding_fault_msgs('', '')
        self.last_data_time = None

        # The original author's note says this channel runs at 100 Hz.
        self.health_node_.create_reader(
            "/apollo/canbus/chassis_detail", Yt, self._chassis_cb_)
        self.health_writer = self.health_node_.create_writer(
            "/xx/health", OdErrorMsgs)

    def _chassis_cb_(self, msg):
        """Reader callback: remember when the latest chassis message arrived.

        Args:
            msg: The received chassis-detail message (currently unused).
        """
        self.last_data_time = time.time()
        # ... parse the message and add decision logic here.

    def _padding_fault_msgs(self, level, msg):
        """Append one error entry to the current report.

        Args:
            level: Value stored in the entry's ``err_level`` field.
            msg: Value stored in the entry's ``err_message`` field.
        """
        err_info = OdErrorInfo()
        err_info.err_level = level
        err_info.err_message = msg
        self.od_ErrMessages.error_msgs.append(err_info)

    def _error_checking(self):
        """Hook for fault detection; intentionally a no-op for now."""
        pass

    def run(self):
        """Publish and log a health report each cycle until Cyber shuts down."""
        while not cyber.is_shutdown():
            try:
                # Every cycle starts from an empty report.
                self.od_ErrMessages = OdErrorMsgs()
                last_seen = self.last_data_time
                silent = (last_seen is None
                          or (time.time() - last_seen) > self.DATA_TIMEOUT_S)
                if silent:
                    self._padding_fault_msgs(
                        '1', 'No data received from chassis_detail')
                else:
                    self._error_checking()
                self.health_writer.write(self.od_ErrMessages)
                self.dump2log()
            except Exception as e:
                # Keep the loop alive, but record the failure with its
                # traceback in the log instead of only printing it.
                # Assumes Logger.get_logger returns a standard
                # logging.Logger -- TODO confirm.
                self.logger.exception("Error: %s", e)
            # Sleep outside the try block so a failing iteration cannot turn
            # the loop into a busy spin.
            time.sleep(self.PUBLISH_PERIOD_S)
            self.has_data = False

    def dump2log(self):
        """Log the current report as comma-separated ``[level:message]`` items."""
        entries = (
            f"[{item.err_level}:{item.err_message}]"
            for item in self.od_ErrMessages.error_msgs
        )
        self.logger.info(",".join(entries))
if __name__ == "__main__":
    # Script entry point: initialise Cyber RT, run the monitor loop until
    # shutdown is requested, then shut Cyber RT down again.
    cyber.init()
    print("health monitor is starting!")
    try:
        # Instantiate the monitor class defined in this file.
        health_monitor = xxHealthMonitor()
        health_monitor.run()
    finally:
        # Shut Cyber RT down even when the monitor exits with an exception.
        print("health monitor is closing!")
        cyber.shutdown()