python机器人编程——无人机python联动控制实现(VREP仿真)1——手搓一个类ROS机器人消息订阅发布模块

news2025/1/11 11:58:53

目录

  • 一、前言
  • 二、总体设想
  • 三、系统的组成
  • 四、python代码构建
    • 构建一个MessageBroker消息代理类
      • 以下这个是常规的MessageBroker类:
      • 以下这个是引入协程的MessageBroker类:
      • 下面是使用MessageBroker消息代理类
    • 构建一个DataProcessor消息预处理类
    • 构建一个DataProcessors平行协程处理类
    • 综合应用示例
  • 五、总结

一、前言

我们知道ROS是一个开源的机器人系统,有人叫他是操作系统,里面有丰富的生态,如导航等,很多大学搞研究都利用它。但是,使用ROS很多是基于linux系统,并且需要按照,学习,同时,很多用ROS可能只是为了使用它的消息发布订阅机制,由于其基于的是进程间的通信,可能对通信的实时性也有一些影响。本章,我们来手搓一个纯python构建的类似ROS系统的消息订阅发布系统,非常轻量级,可以在开发机器人时候,想轻量化不想装ROS系统的朋友们。并且我们把这个消息模块用在了无人机仿真控制环境如下:
在这里插入图片描述

接下来开搞…

二、总体设想

开发这个分布订阅系统的目的,我们想是作为一个机器人系统的“中枢神经”系统,用于跟硬件打交道,并为上层运动控制软件提供消息输入输出的服务,并且需要保持一定的实时性,满足实时控制,整个框架设想如下:
在这里插入图片描述

三、系统的组成

如上图所示,我们的一个消息订阅发布系统可以负责跟硬件打交道,通过如串口、以太网口通信的方式去和采集硬件如网关进行通信,这个我们在这里命名为“第一级原始信号通讯”,然后我们会进入一个预处理模块,对原始信号进行一些预处理,这个预处理模块根据需要我们可以自定义修改,预处理的型号变成了上层模块想要的格式和频率的TOPIC(主题)后,可以将这个TOCPIC发布出来,供多个用户订阅后使用,这个过程我们命名为“第二级信号预处理”。经过这个两级处理,从底层传感或者部件采集的不同频率、不同格式数据就变成了可以供上层应用的有固定周期和格式的TOPIC。接下来就可以做各种应用模块了。

四、python代码构建

构建一个MessageBroker消息代理类

python实现的消息代理类其实也非常常见了,网上到处都是,这里我们结合了一下协程的概念,对普通的消息代理类进行了一些优化,利用协程的高性能特性,势必可以在大量的消息来源服务下保持一个好的性能,这部分有待压力测试,发布本博文前并未经过性能测试。源代码如下:

以下这个是常规的MessageBroker类:

class MessageBroker:
    #同步消息订阅分发处理
    def __init__(self):
        self.message_queue = queue.Queue()
        self.subscribers = {}
        self.running=True

    def publish(self, topic, message):
        if self.running:
            self.message_queue.put((topic, message))
        else:
            print("MessageBroker is already stopped")
            

    def subscribe(self, topic, callback):
        if self.running:
            if topic not in self.subscribers:
                self.subscribers[topic] = []
            self.subscribers[topic].append(callback)
        else:
            print("MessageBroker is already stopped")
  
    def unsubscribe(self, topic, callback):
        if topic in self.subscribers:
            if callback in self.subscribers[topic]:
                self.subscribers[topic].remove(callback)

    def start(self):
        self.running=True
        def worker():
            while True:                
                topic, message = self.message_queue.get()
                #print("worker..")
                if message=="close":
                    print("MessageBroker thread stoped")
                    break
                if topic in self.subscribers:
                    for callback in self.subscribers[topic]:
                        callback(message)
                self.message_queue.task_done()
        thread = threading.Thread(target=worker)
        thread.daemon = True
        thread.start()
        print("broker started at:",thread)
    def close(self):
        self.running = False
        self.subscribers.clear()
        self.message_queue.put(('stop', "close"))
        del self.message_queue
        self.message_queue=queue.Queue()

以下这个是引入协程的MessageBroker类:

class MessageBrokerAsy:
    #高性能异步步消息订阅分发处理
    def __init__(self):
        self.message_queue = queue.Queue()
        self.subscribers = {}        
        self.thread=None
        self.running=True
        #self.condition = asyncio.Condition()

    def publish(self, topic, message):
        if self.running:
            self.message_queue.put((topic, message))
            return True
        else:
            print("MessageBroker is already stopped")
            return 

    def subscribe(self, topic, callback):
        if self.running:
            if topic not in self.subscribers:
                self.subscribers[topic] = []
            #asy_callback=self.make_async(callback)
            self.subscribers[topic].append(callback)
            return True
        else:
            print("MessageBroker is already stopped")
            return 
         
    def unsubscribe(self, topic, callback):
        if topic in self.subscribers:
            if callback in self.subscribers[topic]:
                self.subscribers[topic].remove(callback)
   
    async def main(self):        
        while True:  
            if self.running==False:
                print("MessageBroker thread stoped")
                break
            topic, message = self.message_queue.get()
            if topic in self.subscribers:
                callbacks = self.subscribers[topic]
                for callback in callbacks:
                    if inspect.iscoroutinefunction(callback):
                        await callback(message) 
                    else:
                        callback(message)                         
                    #asyncio.run(dotasks())                    
            self.message_queue.task_done() 
    def start(self):            
        def worker():
            self.running=True
            asyncio.run(self.main())             
 
        thread = threading.Thread(target=worker)
        thread.daemon = True
        thread.start()
        self.thread=thread
        print("broker started at:",thread)
    def close(self):
        self.running = False
        time.sleep(1)
        self.subscribers.clear()       
        del self.message_queue
        self.message_queue=queue.Queue()

如上所述,我们可以实例化这个类之后,通过start()启动这个消息服务,并且是在新线程中运行,不会影响主线程,也保持了一定的实时性,然后在main()这个消息分发处理方法中,采用了协程处理,这样理论上会提高处理时间。
此外,可以通过subscribeunsubscribe 来订阅相关的主题,用publish 发布主题:

下面是使用MessageBroker消息代理类

if __name__ == '__main__':	
	MB=MessageBrokerAsy()#创建一个消息系统
	MB.start()#启动一个消息系统
	 # 订阅者1的回调处理函数
    async def subscriber1(message):
        print("Subscriber 1 received:", message,time.perf_counter())        
   # 订阅者2的回调函数
    def subscriber2(message):
        print("Subscriber 2 received:", message)	
	# 订阅主题为"topic1"的消息,可以支持多个处理函数(用户)
    broker.subscribe("topic1", subscriber1)
    broker.subscribe("topic1", subscriber2)
	for i in range(20):
        # 模拟采集数据,发布消息到主题"topic1"
        MB.publish("topic1", np.array([11,1.5]))        
        time.sleep(0.5)
    MB.close()   

运行如下:
在这里插入图片描述

构建一个DataProcessor消息预处理类

根据我们的设想,是要建设一个消息预处理系统,也就是对原始数据进行加工处理,将原始数据按照一定的频率发布出去供上层订阅使用。这个DataProcessor是这么构建的:

class DataProcessor:
    def __init__(self, interval,broker=None,topic=None,processer=None):
        self.interval = interval        
        self.data_queue1 = []
        self.data_queue2 = []
        self.queue_shift=1
        if processer==None:#默认为均值过滤器
            self.processer=self.Everage_Filter
        else:
            self.processer=processer
        #输出逻辑
        if broker==None:
            self.broker=self
        else:
            self.broker=broker
        #发布的主题
        if topic==None:
            self.topic="Notopic"
        else:
            self.topic=topic
        
    def publish(self, topic, message):
        print(topic, message)        

    def Everage_Filter(self,data_queue):
        # 均值过滤,处理队列内的数据的逻辑,数据指定为np格式        
        if len(data_queue)>0:
            average = sum(data_queue) / len(data_queue)
            data_queue.clear()      
            #print("done:",average)
        else:
            print("empty done:",average,time.perf_counter())
        return average

    async def process_data(self):
        while True:
            processed_data=None
            if self.queue_shift==1:
                data_queue=self.data_queue1
                if len(data_queue) > 0:
                    self.queue_shift=2
                    # 处理队列内的数据
                    processed_data = self.processer(data_queue)
                    print("Processed data1:")
                    self.broker.publish(self.topic,processed_data)
            if self.queue_shift==2:
                data_queue=self.data_queue2                
                if len(data_queue) > 0:
                    self.queue_shift=1
                    # 处理队列内的数据
                    processed_data = self.processer(data_queue)
                    print("Processed data2:")            
                    self.broker.publish(self.topic,processed_data)
            await asyncio.sleep(self.interval)   

    def add_data(self, data):
        # 向队列中添加数据,多个容器,避免冲突
        if self.queue_shift==1:
            self.data_queue1.append(data)
        if self.queue_shift==2:
            self.data_queue2.append(data)
    async def start(self):
        # 启动主循环
        #loop = asyncio.get_event_loop()
        #loop.create_task(self.process_data())
        #loop.run_forever()       
        return asyncio.create_task(self.process_data())

如上所述,我们在DataProcessor初始化时定义了interval 消息更新的周期,broker 指定的分发模块,topic 预处理完后的消息名称如“机械臂末端坐标”,processer 预处理的自定义函数,预处理默认我们内置了一个在周期内取平均的过滤器。这样就可以把各路原始数据来自一级系统进行预处理后在二级系统broker中进行发布了。
此外,我们知道原始数据路数非常多,如果使用串行进行预处理肯定是不合理的,特别是用到某些耗时的预处理函数时,那么我们还是引入了协程处理的方式,通过再开启一个线程,并集中创建预处理任务的方式,让各路预处理函数平行运行,这样就提高了效率。于是我们再要构建一个类DataProcessors

构建一个DataProcessors平行协程处理类

这个类的代码如下:

class DataProcessors:
    def __init__(self):
        self.DataPros=[]
        self.tasks=[]
        self.stop=False
        self.T=None
        self.main_queue= queue.Queue()
    def add(self,DataPro):
        self.DataPros.append(DataPro)       
    
    def close(self):
        self.stop=True
        self.DataPros.clear()
        
    def start(self):
        #创建新线程,启动所有周期处理
        async def main():
            # 启动主循环
            # 在主线程中创建并行的协程定时任务
            """
            async def datasource(processor):
                while True:
                    #print("Running task 2...")
                    # 在这里编写您的定时任务逻辑
                    processor.add_data(101)
                    processor.add_data(111)
                    processor.add_data(130)
                    await asyncio.sleep(0.5)  # 设置定时任务的时间间隔为10秒
            """    
            #task2=datasource            
            for DataPro in self.DataPros:
                self.tasks.append(await DataPro.start()) 
                #await asyncio.gather(task1(), task2(processor))        
                #task2_obj = asyncio.create_task(task2(DataPro))
            
            while True:
                print("DataProcessors main runing")
                await asyncio.sleep(5) 
                if self.stop:
                    break
            #task1_obj.cancel()            
            #task2_obj.cancel()
            print("DataProcessors  stoped")
        
        def run():
            print("Starting main program...")
            asyncio.run(main())
        thread = threading.Thread(target=run)
        thread.daemon = True
        thread.start()
        print("DataProcessors started at:",thread)
        self.T=thread
        return thread

综合应用示例

结合以上三个模块,我们就完成了本标题的一个类ROS机器人消息订阅发布模块,以下是简易使用:

if __name__ == '__main__':
    # 示例用法
    broker = MessageBrokerAsy()
    # 启动消息分发系统
    broker.start()
    
    # 创建一个DataProcessor对象
    processor = DataProcessor(interval=1,broker=broker,topic="topic2")  # 设置时间周期为5秒
    DataProcessors1=DataProcessors()
    DataProcessors1.add(processor)
    T=DataProcessors1.start() 
    
    # 订阅者1的回调函数
    async def subscriber1(message):
        #print("Subscriber 1 received:", message,time.perf_counter())
        processor.add_data(message)
        
    # 订阅者1的回调函数
    async def subscriber3(message):
        print("Subscriber 3 received:", message,time.perf_counter())
        
        
    # 订阅者2的回调函数
    def subscriber2(message):
        print("Subscriber 2 received:", message)
    # 订阅者2的回调函数
    def subscriber4(message):
        print("Subscriber 4 received:", message)
 
    # 订阅主题为"topic1"的消息
    broker.subscribe("topic1", subscriber1)
    #broker.subscribe("topic1", subscriber3)
    
    broker.subscribe("topic1", subscriber2)
    broker.subscribe("topic2", subscriber4)

    # 取消订阅主题"topic1"的消息
    #broker.unsubscribe("topic1", subscriber2)
    
    for i in range(20):
        # 发布消息到主题"topic1"
        broker.publish("topic1", np.array([11,1.5]))
        broker.publish("topic1", np.array([1.1,1.5]))
        broker.publish("topic1", np.array([12,1.5]))
        broker.publish("topic1", np.array([1.1,1.5]))
        time.sleep(0.5)
    
    DataProcessors1.close()
    time.sleep(6)
    broker.close()

如上所述,我们对一级原始数据通过broker进行发布"topic1"给预处理器processor,processor在processors的线程里周期处理原始数据,处理完后按照周期为1s定时进行发布"topic2","topic2"即为成品数据,给subscriber4使用。
在这里插入图片描述
如上图,红框为原始数据,红线为预处理后的成品数据(这里是平均值)。

五、总结

好了到此,我们构建了一个非常轻量的类ROS机器人消息订阅发布模块,当然跟ROS其实没有半毛钱关系,也没法比较,只是一个说头。源码已经上传至本站资源库点击连接。需要请下载获取或者关注公众号回复获取。
下一篇我们将这个系统用于无人机的控制,尽情期待…
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/953335.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

加入‘First Sowing’的公开部署马拉松(Deploy-a-Thon),共享巨额奖励

自从“First Sowing”启动以来已经有一段时间了,但等待终于结束!“First Sowing”传奇的精彩结局以“部署马拉松(Deploy-a-Thon)”的形式到来,我们非常高兴能与大家分享这一消息! 准备好与“First Sowing”…

【斗罗大陆】幽冥白虎再现,朱竹清后人继承衣钵,剧情逐渐高燃

Hello,小伙伴们,我是小郑继续为大家深度解析斗罗大陆! 绝世唐门最新的剧情已经更新,新生大赛再次出现了新的角色,也就是霍雨浩的哥哥戴华斌,他们也是霍雨浩冠军之路上最后的拦路虎,同样也拥有着当年叱咤风云…

MMCV安装指南

MMCV是一个开源的计算机视觉库,广泛用于基于Pytorch的深度学习项目中。本教程主要介绍MMCV的安装方法。 首先需要确认已安装Pytorch,CUDA及驱动版本。然后pip安装mmcv或从源码编译安装。需要注意 PyTorch、CUDA和MMCV版本匹配问题。 安装验证可以import mmcv测试是否成功。常…

优思学院|六西格玛中的概率分布有哪些?

为什么概率分布重要? 概率分布是统计学中一个重要的概念,它帮助我们理解随机变量的分布情况以及与之相关的概率。在面对具体问题时,了解概率分布可以帮助我们选择适当的检验或分析策略,以解决问题并做出合理的决策。 常见的概率…

Kafka系列四生产者

文章首发于个人博客,欢迎访问关注:https://www.lin2j.tech 一条消息从生产到发送成功,需要经过多个步骤,多个组件。首先要经过拦截器、序列化器、分区器对消息进行预处理,然后将消息按批次放入缓冲区,之后…

uniapp移动端h5设计稿还原

思路 动态设置html的font-size大小 实现步骤 先创建一个public.css文件,设置初始的font-size大小 /* 注意这样写 只能使用css文件, scss 是不支持的, setProperty 只适用于原生css上 */ html {--gobal-font-size: 0.45px; } .gobal-font-size {font-size: var(--g…

CRM通过哪四个特点赢得不同类型的客户

1.设置正确的目标 首先,在CRM系统中设置正确的目标是非常重要的。不同类型的客户有不同的需求和预期,需要使用不同的方法去处理。如果企业想吸引新客户,那么企业需要更加侧重于建立品牌形象和提供相关的信息。如果企业想留住老客户&#xff…

matplotlib绘图常见设置总结

绘图 官方API 头文件、画布初始化 首先要导入头文件,初始化画布 from matplotlib import pyplot as plt from matplotlib.pyplot import MultipleLocator # 从pyplot导入MultipleLocator类,这个类用于设置刻度间隔 import numpy as np # 常用的数据…

SQL求解用户连续登录天数

数据分析面试过程中,一般都逃不掉对SQL的考察,可能是笔试的形式,也可能是面试过程中面试官当场提问,当场在纸上写出,或者简单说一下逻辑。 今天,就来分享一道面试中常常被问到的一类SQL问题:连…

Vue3响应式原理 私

响应式的本质:当数据变化后会自动执行某个函数映射到组件,自动触发组件的重新渲染。 响应式的实现方式就是劫持数据,Vue3的reactive就是通过Proxy劫持数据,由于劫持的是整个对象,所以可以检测到任何对象的修改&#xf…

【传输层】网络基础 -- UDP协议 | TCP协议

再谈端口号端口号范围划分netstatpidof UDPUDP的特点面向数据报UDP的缓冲区 基于UDP的应用层协议 TCP认识TCP协议的报头理解封装解包理解可靠性TCP工作模式16位窗口大小6位标志位URGACKPSHRSTSYNFIN 再谈端口号 端口号(Port)标识了一个主机上进行通信的不同的应用程序 在TCP/I…

积分游戏小程序模板源码

积分游戏小程序模板源码是一款可以帮助用户快速开发小程序的工具,此模板源码包含五个静态页面,分别是首页、任务列表、大转盘、猜拳等五个页面,非常适合进行积分游戏等相关开发。 此模板源码的前端部分非常简单易用,用户可以根据…

KiCad 封装原件类型与封装焊盘不匹配 预期SMD 实际通孔

KiCad 7.0.6 PCB ERC 检查时弹出不匹配错误,提示: 封装原件类型与封装焊盘不匹配 预期SMD 实际通孔: 但实际的封装已经是 SMD 了呀。为啥,因为自己绘制的封装中属性不对。将自绘封装中的 原件类型由 通孔 改为 贴片即可&#xff1…

Python钢筋混凝土结构计算.pdf-T001-混凝土强度设计值

以下是使用Python求解上述问题的完整代码: # 输入参数 f_ck 35 # 混凝土的特征抗压强度(单位:MPa) f_cd 25 # 混凝土的强度设计值(单位:MPa) # 求解安全系数 gamma_c f_ck / f_cd # …

提高工作效率,轻松实现IP地址批量ping

在实际操作中,我们经常需要对一系列已分配的IP进行ping检测,以确认其是否正在运行。然而,我们的表格仅有一个标签页,且仅包含一个ip地址列。 iP192.168.196.106192.168.196.107192.168.196.108192.168.196.109 实现思路 我们的…

面对银行分支机构,UPS监控该如何应对?

UPS系统确保在电力中断或故障时,银行的关键系统和设备能够继续正常运行,从而防止因电力波动而可能导致的数据丢失和业务中断。 为了实现有效的UPS监控,银行需要应用监控系统。银行可以实时监测UPS系统的状态,及时发现潜在问题并采…

智能感测型静电中和设备由哪些部分构成

智能感测型静电中和设备是一种利用先进的传感技术和自动控制系统,以及适应性算法来实现静电电荷的中和和消除的设备。它主要用于消除静电带来的问题,比如电子元件的损坏、电磁干扰、火灾等。 智能感测型静电中和设备通常包括以下几个主要部分&#xff1…

年轻人的新社交密码:高质量小众社交app皮雀,到底怎么玩?

新一代年轻人被各种生活、工作和强社交关系充斥,面临着巨大的社交压力,因此他们在社交的选择方向上,逐渐远离线下社交,去选择线上社交,不同于有心理负担的线下社交,线上社交具有更多的选择性。基于能为年轻…

地下管线三维自动建模软件MagicPipe3D V3.0发布

2023年9月1日经纬管网建模系统MagicPipe3D V3.0正式发布,该版本经过众多用户应用和反馈,在三维地下管网建模效果、效率、适配性方面均有显著提升!MagicPipe3D本地离线参数化构建地下管网模型(包括管道、接头、附属设施等&#xff…