目录
- 一、前言
- 二、总体设想
- 三、系统的组成
- 四、python代码构建
- 构建一个MessageBroker消息代理类
- 以下这个是常规的MessageBroker类:
- 以下这个是引入协程的MessageBroker类:
- 下面是使用MessageBroker消息代理类
- 构建一个DataProcessor消息预处理类
- 构建一个DataProcessors平行协程处理类
- 综合应用示例
- 五、总结
一、前言
我们知道ROS是一个开源的机器人系统,有人叫他是操作系统,里面有丰富的生态,如导航等,很多大学搞研究都利用它。但是,使用ROS很多是基于linux系统,并且需要按照,学习,同时,很多用ROS可能只是为了使用它的消息发布订阅机制,由于其基于的是进程间的通信,可能对通信的实时性也有一些影响。本章,我们来手搓一个纯python构建的类似ROS系统的消息订阅发布系统,非常轻量级,可以在开发机器人时候,想轻量化不想装ROS系统的朋友们。并且我们把这个消息模块用在了无人机仿真控制环境如下:
接下来开搞…
二、总体设想
开发这个分布订阅系统的目的,我们想是作为一个机器人系统的“中枢神经”系统,用于跟硬件打交道,并为上层运动控制软件提供消息输入输出的服务,并且需要保持一定的实时性,满足实时控制,整个框架设想如下:
三、系统的组成
如上图所示,我们的一个消息订阅发布系统可以负责跟硬件打交道,通过如串口、以太网口通信的方式去和采集硬件如网关进行通信,这个我们在这里命名为“第一级原始信号通讯”,然后我们会进入一个预处理模块,对原始信号进行一些预处理,这个预处理模块根据需要我们可以自定义修改,预处理的型号变成了上层模块想要的格式和频率的TOPIC(主题)后,可以将这个TOCPIC发布出来,供多个用户订阅后使用,这个过程我们命名为“第二级信号预处理”。经过这个两级处理,从底层传感或者部件采集的不同频率、不同格式数据就变成了可以供上层应用的有固定周期和格式的TOPIC。接下来就可以做各种应用模块了。
四、python代码构建
构建一个MessageBroker消息代理类
python实现的消息代理类其实也非常常见了,网上到处都是,这里我们结合了一下协程的概念,对普通的消息代理类进行了一些优化,利用协程的高性能特性,势必可以在大量的消息来源服务下保持一个好的性能,这部分有待压力测试,发布本博文前并未经过性能测试。源代码如下:
以下这个是常规的MessageBroker类:
class MessageBroker:
#同步消息订阅分发处理
def __init__(self):
self.message_queue = queue.Queue()
self.subscribers = {}
self.running=True
def publish(self, topic, message):
if self.running:
self.message_queue.put((topic, message))
else:
print("MessageBroker is already stopped")
def subscribe(self, topic, callback):
if self.running:
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(callback)
else:
print("MessageBroker is already stopped")
def unsubscribe(self, topic, callback):
if topic in self.subscribers:
if callback in self.subscribers[topic]:
self.subscribers[topic].remove(callback)
def start(self):
self.running=True
def worker():
while True:
topic, message = self.message_queue.get()
#print("worker..")
if message=="close":
print("MessageBroker thread stoped")
break
if topic in self.subscribers:
for callback in self.subscribers[topic]:
callback(message)
self.message_queue.task_done()
thread = threading.Thread(target=worker)
thread.daemon = True
thread.start()
print("broker started at:",thread)
def close(self):
self.running = False
self.subscribers.clear()
self.message_queue.put(('stop', "close"))
del self.message_queue
self.message_queue=queue.Queue()
以下这个是引入协程的MessageBroker类:
class MessageBrokerAsy:
#高性能异步步消息订阅分发处理
def __init__(self):
self.message_queue = queue.Queue()
self.subscribers = {}
self.thread=None
self.running=True
#self.condition = asyncio.Condition()
def publish(self, topic, message):
if self.running:
self.message_queue.put((topic, message))
return True
else:
print("MessageBroker is already stopped")
return
def subscribe(self, topic, callback):
if self.running:
if topic not in self.subscribers:
self.subscribers[topic] = []
#asy_callback=self.make_async(callback)
self.subscribers[topic].append(callback)
return True
else:
print("MessageBroker is already stopped")
return
def unsubscribe(self, topic, callback):
if topic in self.subscribers:
if callback in self.subscribers[topic]:
self.subscribers[topic].remove(callback)
async def main(self):
while True:
if self.running==False:
print("MessageBroker thread stoped")
break
topic, message = self.message_queue.get()
if topic in self.subscribers:
callbacks = self.subscribers[topic]
for callback in callbacks:
if inspect.iscoroutinefunction(callback):
await callback(message)
else:
callback(message)
#asyncio.run(dotasks())
self.message_queue.task_done()
def start(self):
def worker():
self.running=True
asyncio.run(self.main())
thread = threading.Thread(target=worker)
thread.daemon = True
thread.start()
self.thread=thread
print("broker started at:",thread)
def close(self):
self.running = False
time.sleep(1)
self.subscribers.clear()
del self.message_queue
self.message_queue=queue.Queue()
如上所述,我们可以实例化这个类之后,通过start()启动这个消息服务,并且是在新线程中运行,不会影响主线程,也保持了一定的实时性,然后在main()这个消息分发处理方法中,采用了协程处理,这样理论上会提高处理时间。
此外,可以通过subscribe
和unsubscribe
来订阅相关的主题,用publish
发布主题:
下面是使用MessageBroker消息代理类
if __name__ == '__main__':
MB=MessageBrokerAsy()#创建一个消息系统
MB.start()#启动一个消息系统
# 订阅者1的回调处理函数
async def subscriber1(message):
print("Subscriber 1 received:", message,time.perf_counter())
# 订阅者2的回调函数
def subscriber2(message):
print("Subscriber 2 received:", message)
# 订阅主题为"topic1"的消息,可以支持多个处理函数(用户)
broker.subscribe("topic1", subscriber1)
broker.subscribe("topic1", subscriber2)
for i in range(20):
# 模拟采集数据,发布消息到主题"topic1"
MB.publish("topic1", np.array([11,1.5]))
time.sleep(0.5)
MB.close()
运行如下:
构建一个DataProcessor消息预处理类
根据我们的设想,是要建设一个消息预处理系统,也就是对原始数据进行加工处理,将原始数据按照一定的频率发布出去供上层订阅使用。这个DataProcessor是这么构建的:
class DataProcessor:
def __init__(self, interval,broker=None,topic=None,processer=None):
self.interval = interval
self.data_queue1 = []
self.data_queue2 = []
self.queue_shift=1
if processer==None:#默认为均值过滤器
self.processer=self.Everage_Filter
else:
self.processer=processer
#输出逻辑
if broker==None:
self.broker=self
else:
self.broker=broker
#发布的主题
if topic==None:
self.topic="Notopic"
else:
self.topic=topic
def publish(self, topic, message):
print(topic, message)
def Everage_Filter(self,data_queue):
# 均值过滤,处理队列内的数据的逻辑,数据指定为np格式
if len(data_queue)>0:
average = sum(data_queue) / len(data_queue)
data_queue.clear()
#print("done:",average)
else:
print("empty done:",average,time.perf_counter())
return average
async def process_data(self):
while True:
processed_data=None
if self.queue_shift==1:
data_queue=self.data_queue1
if len(data_queue) > 0:
self.queue_shift=2
# 处理队列内的数据
processed_data = self.processer(data_queue)
print("Processed data1:")
self.broker.publish(self.topic,processed_data)
if self.queue_shift==2:
data_queue=self.data_queue2
if len(data_queue) > 0:
self.queue_shift=1
# 处理队列内的数据
processed_data = self.processer(data_queue)
print("Processed data2:")
self.broker.publish(self.topic,processed_data)
await asyncio.sleep(self.interval)
def add_data(self, data):
# 向队列中添加数据,多个容器,避免冲突
if self.queue_shift==1:
self.data_queue1.append(data)
if self.queue_shift==2:
self.data_queue2.append(data)
async def start(self):
# 启动主循环
#loop = asyncio.get_event_loop()
#loop.create_task(self.process_data())
#loop.run_forever()
return asyncio.create_task(self.process_data())
如上所述,我们在DataProcessor
初始化时定义了interval
消息更新的周期,broker
指定的分发模块,topic
预处理完后的消息名称如“机械臂末端坐标”,processer
预处理的自定义函数,预处理默认我们内置了一个在周期内取平均的过滤器。这样就可以把各路原始数据来自一级系统
进行预处理后在二级系统
的broker
中进行发布了。
此外,我们知道原始数据路数非常多,如果使用串行进行预处理肯定是不合理的,特别是用到某些耗时的预处理函数时,那么我们还是引入了协程处理的方式,通过再开启一个线程,并集中创建预处理任务的方式,让各路预处理函数平行运行,这样就提高了效率。于是我们再要构建一个类DataProcessors
构建一个DataProcessors平行协程处理类
这个类的代码如下:
class DataProcessors:
def __init__(self):
self.DataPros=[]
self.tasks=[]
self.stop=False
self.T=None
self.main_queue= queue.Queue()
def add(self,DataPro):
self.DataPros.append(DataPro)
def close(self):
self.stop=True
self.DataPros.clear()
def start(self):
#创建新线程,启动所有周期处理
async def main():
# 启动主循环
# 在主线程中创建并行的协程定时任务
"""
async def datasource(processor):
while True:
#print("Running task 2...")
# 在这里编写您的定时任务逻辑
processor.add_data(101)
processor.add_data(111)
processor.add_data(130)
await asyncio.sleep(0.5) # 设置定时任务的时间间隔为10秒
"""
#task2=datasource
for DataPro in self.DataPros:
self.tasks.append(await DataPro.start())
#await asyncio.gather(task1(), task2(processor))
#task2_obj = asyncio.create_task(task2(DataPro))
while True:
print("DataProcessors main runing")
await asyncio.sleep(5)
if self.stop:
break
#task1_obj.cancel()
#task2_obj.cancel()
print("DataProcessors stoped")
def run():
print("Starting main program...")
asyncio.run(main())
thread = threading.Thread(target=run)
thread.daemon = True
thread.start()
print("DataProcessors started at:",thread)
self.T=thread
return thread
综合应用示例
结合以上三个模块,我们就完成了本标题的一个类ROS机器人消息订阅发布模块,以下是简易使用:
if __name__ == '__main__':
# 示例用法
broker = MessageBrokerAsy()
# 启动消息分发系统
broker.start()
# 创建一个DataProcessor对象
processor = DataProcessor(interval=1,broker=broker,topic="topic2") # 设置时间周期为5秒
DataProcessors1=DataProcessors()
DataProcessors1.add(processor)
T=DataProcessors1.start()
# 订阅者1的回调函数
async def subscriber1(message):
#print("Subscriber 1 received:", message,time.perf_counter())
processor.add_data(message)
# 订阅者1的回调函数
async def subscriber3(message):
print("Subscriber 3 received:", message,time.perf_counter())
# 订阅者2的回调函数
def subscriber2(message):
print("Subscriber 2 received:", message)
# 订阅者2的回调函数
def subscriber4(message):
print("Subscriber 4 received:", message)
# 订阅主题为"topic1"的消息
broker.subscribe("topic1", subscriber1)
#broker.subscribe("topic1", subscriber3)
broker.subscribe("topic1", subscriber2)
broker.subscribe("topic2", subscriber4)
# 取消订阅主题"topic1"的消息
#broker.unsubscribe("topic1", subscriber2)
for i in range(20):
# 发布消息到主题"topic1"
broker.publish("topic1", np.array([11,1.5]))
broker.publish("topic1", np.array([1.1,1.5]))
broker.publish("topic1", np.array([12,1.5]))
broker.publish("topic1", np.array([1.1,1.5]))
time.sleep(0.5)
DataProcessors1.close()
time.sleep(6)
broker.close()
如上所述,我们对一级原始数据通过broker进行发布"topic1"给预处理器processor,processor在processors的线程里周期处理原始数据,处理完后按照周期为1s定时进行发布"topic2","topic2"即为成品数据,给subscriber4使用。
如上图,红框为原始数据,红线为预处理后的成品数据(这里是平均值)。
五、总结
好了到此,我们构建了一个非常轻量的类ROS机器人消息订阅发布模块,当然跟ROS其实没有半毛钱关系,也没法比较,只是一个说头。源码已经上传至本站资源库点击连接。需要请下载获取或者关注公众号回复获取。
下一篇我们将这个系统用于无人机的控制,尽情期待…