Handsfree_ros_imu:ROS机器人IMU模块的hfi_a9.py文件学习记录

news2024/10/2 6:35:18

之前的博客写了关于Handsfree_ros_imu:ROS机器人IMU模块ARHS姿态传感器(A9)Liunx系统Ubuntu20.04学习启动和运行教程:

https://blog.csdn.net/qq_54900679/article/details/135539176?spm=1001.2014.3001.5502

与Handsfree_ros_imu:ROS机器人IMU模块的get_imu_rpy.py文件学习记录:

https://blog.csdn.net/qq_54900679/article/details/135550752?spm=1001.2014.3001.5502

 

这次带来hfi_a9.py文件的学习与数据记录改进:

hfi_a9.py文件位置如下:

 对应的代码如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import serial
import struct
import platform
import serial.tools.list_ports
import math


# 查找 ttyUSB* 设备
def find_ttyUSB():
    print('imu 默认串口为 /dev/ttyUSB0, 若识别多个串口设备, 请在 launch 文件中修改 imu 对应的串口')
    posts = [port.device for port in serial.tools.list_ports.comports() if 'USB' in port.device]
    print('当前电脑所连接的 {} 串口设备共 {} 个: {}'.format('USB', len(posts), posts))


# crc 校验
def checkSum(list_data, check_data):
    data = bytearray(list_data)
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for i in range(8):
            if (crc & 1) != 0:
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return hex(((crc & 0xff) << 8) + (crc >> 8)) == hex(check_data[0] << 8 | check_data[1])


# 16 进制转 ieee 浮点数
def hex_to_ieee(raw_data):
    ieee_data = []
    raw_data.reverse()
    for i in range(0, len(raw_data), 4):
        data2str =hex(raw_data[i] | 0xff00)[4:6] + hex(raw_data[i + 1] | 0xff00)[4:6] + hex(raw_data[i + 2] | 0xff00)[4:6] + hex(raw_data[i + 3] | 0xff00)[4:6]
        if python_version == '2':
            ieee_data.append(struct.unpack('>f', data2str.decode('hex'))[0])
        if python_version == '3':
            ieee_data.append(struct.unpack('>f', bytes.fromhex(data2str))[0])
    ieee_data.reverse()
    return ieee_data


# 处理串口数据
def handleSerialData(raw_data):
    global buff, key, angle_degree, magnetometer, acceleration, angularVelocity, pub_flag
    if python_version == '2':
        buff[key] = ord(raw_data)
    if python_version == '3':
        buff[key] = raw_data

    key += 1
    if buff[0] != 0xaa:
        key = 0
        return
    if key < 3:
        return
    if buff[1] != 0x55:
        key = 0
        return
    if key < buff[2] + 5:  # 根据数据长度位的判断, 来获取对应长度数据
        return

    else:
        data_buff = list(buff.values())  # 获取字典所以 value

        if buff[2] == 0x2c and pub_flag[0]:
            if checkSum(data_buff[2:47], data_buff[47:49]):
                data = hex_to_ieee(data_buff[7:47])
                angularVelocity = data[1:4]
                acceleration = data[4:7]
                magnetometer = data[7:10]
            else:
                print('校验失败')
            pub_flag[0] = False
        elif buff[2] == 0x14 and pub_flag[1]:
            if checkSum(data_buff[2:23], data_buff[23:25]):
                data = hex_to_ieee(data_buff[7:23])
                angle_degree = data[1:4]
            else:
                print('校验失败')
            pub_flag[1] = False
        else:
            print("该数据处理类没有提供该 " + str(buff[2]) + " 的解析")
            print("或数据错误")
            buff = {}
            key = 0

        buff = {}
        key = 0
        if pub_flag[0] == True or pub_flag[1] == True:
            return
        pub_flag[0] = pub_flag[1] = True
        acc_k = math.sqrt(acceleration[0] ** 2 + acceleration[1] ** 2 + acceleration[2] ** 2)

        print('''
加速度(m/s²):
    x轴:%.2f
    y轴:%.2f
    z轴:%.2f

角速度(rad/s):
    x轴:%.2f
    y轴:%.2f
    z轴:%.2f

欧拉角(°):
    x轴:%.2f
    y轴:%.2f
    z轴:%.2f

磁场:
    x轴:%.2f
    y轴:%.2f
    z轴:%.2f
''' % (acceleration[0] * -9.8 / acc_k, acceleration[1] * -9.8 / acc_k, acceleration[2] * -9.8 / acc_k,
       angularVelocity[0], angularVelocity[1], angularVelocity[2],
       angle_degree[0], angle_degree[1], angle_degree[2],
       magnetometer[0], magnetometer[1], magnetometer[2]
      ))
       


key = 0
flag = 0
buff = {}
angularVelocity = [0, 0, 0]
acceleration = [0, 0, 0]
magnetometer = [0, 0, 0]
angle_degree = [0, 0, 0]
pub_flag = [True, True]


if __name__ == "__main__":
    python_version = platform.python_version()[0]

    
    find_ttyUSB()
    port = "/dev/ttyUSB0"
    baudrate = 921600

    try:
        hf_imu = serial.Serial(port=port, baudrate=baudrate, timeout=0.5)
        if hf_imu.isOpen():
            print("\033[32m串口打开成功...\033[0m")
        else:
            hf_imu.open()
            print("\033[32m打开串口成功...\033[0m")
    except Exception as e:
        print(e)
        print("\033[31m串口打开失败\033[0m")
        exit(0)
    else:
        while True:
            try:
                buff_count = hf_imu.inWaiting()
            except Exception as e:
                print("exception:" + str(e))
                print("imu 失去连接,接触不良,或断线")
                exit(0)
            else:
                if buff_count > 0:
                    buff_data = hf_imu.read(buff_count)
                    for i in range(0, buff_count):
                        handleSerialData(buff_data[i])

这段Python代码主要用于处理与IMU(惯性测量单元)相关的串口通信。以下是对代码的主要功能和组件的详细分析:

  1. 导入库

    • serial:用于串口通信。
    • struct:解析打包的二进制数据。
    • platform:检测操作系统信息,用于判断Python版本。
    • serial.tools.list_ports:列出计算机的串口设备。
    • math:提供数学函数,例如开方。
  2. 查找ttyUSB设备(find_ttyUSB函数)

    • 打印默认IMU串口设备(/dev/ttyUSB0),提示用户在识别多个设备时需要在配置文件中修改。
    • 列出连接到计算机的所有USB串口设备。
  3. CRC校验(checkSum函数)

    • 使用CRC16校验算法来验证数据的完整性。
  4. 16进制转IEEE浮点数(hex_to_ieee函数)

    • 将16进制数据转换为IEEE标准的浮点数。
    • 对Python 2和3使用不同的方法处理数据。
  5. 处理串口数据(handleSerialData函数)

    • 解析串口接收到的原始数据。
    • 检查数据的起始位和数据长度。
    • 根据数据类型(角速度、加速度、磁场、欧拉角)进行处理。
    • 使用CRC校验判断数据是否有效。
    • 计算并打印加速度、角速度、欧拉角和磁场的值。
  6. 全局变量

    • 定义用于存储数据和标记的全局变量。
  7. 主函数(if __name__ == "__main__"

    • 检测Python版本。
    • 查找并设置串口。
    • 循环读取串口数据并调用handleSerialData函数处理。

这段代码主要用于从IMU设备接收数据,并通过串口将其转换为可用的浮点数,用于进一步的数据分析或应用。代码在兼容性方面对Python 2和3都进行了考虑,并且具有较强的错误处理和数据校验功能。

改进部分

        要将角速度、加速度和磁场数据保存到文本文件或CSV文件中,可以在上述代码的基础上进行一些修改。除了保存到对应路径下的文件中,还要考虑实时地保存数据这个要求,从开始保存数据到结束保存数据的时间是可以自定义的,比如要实现保存3秒时间段内实时的数据。

改进后的代码(imu_data_a9_record.py)如下:

import serial
import struct
import time
import platform
import serial.tools.list_ports
import math

# 其他必要的导入模块和全局变量

def find_ttyUSB():
    print('imu 默认串口为 /dev/ttyUSB0, 若识别多个串口设备, 请在 launch 文件中修改 imu 对应的串口')
    posts = [port.device for port in serial.tools.list_ports.comports() if 'USB' in port.device]
    print('当前电脑所连接的 {} 串口设备共 {} 个: {}'.format('USB', len(posts), posts))

# crc 校验
def checkSum(list_data, check_data):
    data = bytearray(list_data)
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for i in range(8):
            if (crc & 1) != 0:
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return hex(((crc & 0xff) << 8) + (crc >> 8)) == hex(check_data[0] << 8 | check_data[1])

# 16 进制转 ieee 浮点数
def hex_to_ieee(raw_data):
    ieee_data = []
    raw_data.reverse()
    for i in range(0, len(raw_data), 4):
        data2str = hex(raw_data[i] | 0xff00)[4:6] + hex(raw_data[i + 1] | 0xff00)[4:6] + hex(
            raw_data[i + 2] | 0xff00)[4:6] + hex(raw_data[i + 3] | 0xff00)[4:6]
        if python_version == '2':
            ieee_data.append(struct.unpack('>f', data2str.decode('hex'))[0])
        if python_version == '3':
            ieee_data.append(struct.unpack('>f', bytes.fromhex(data2str))[0])
    ieee_data.reverse()
    return ieee_data

# 处理串口数据
def handleSerialData(raw_data):
    global buff, key, angle_degree, magnetometer, acceleration, angularVelocity, pub_flag
    if python_version == '2':
        buff[key] = ord(raw_data)
    if python_version == '3':
        buff[key] = raw_data

    key += 1
    if buff[0] != 0xaa:
        key = 0
        return
    if key < 3:
        return
    if buff[1] != 0x55:
        key = 0
        return
    if key < buff[2] + 5:  # 根据数据长度位的判断, 来获取对应长度数据
        return
    else:
        data_buff = list(buff.values())  # 获取字典所有 value

        if buff[2] == 0x2c and pub_flag[0]:
            if checkSum(data_buff[2:47], data_buff[47:49]):
                data = hex_to_ieee(data_buff[7:47])
                angularVelocity = data[1:4]
                acceleration = data[4:7]
                magnetometer = data[7:10]
            else:
                print('校验失败')
            pub_flag[0] = False
        elif buff[2] == 0x14 and pub_flag[1]:
            if checkSum(data_buff[2:23], data_buff[23:25]):
                data = hex_to_ieee(data_buff[7:23])
                angle_degree = data[1:4]
            else:
                print('校验失败')
            pub_flag[1] = False
        else:
            print("该数据处理类没有提供该 " + str(buff[2]) + " 的解析")
            print("或数据错误")
            buff = {}
            key = 0

        buff = {}
        key = 0
        if pub_flag[0] == True or pub_flag[1] == True:
            return
        pub_flag[0] = pub_flag[1] = True
        acc_k = math.sqrt(acceleration[0] ** 2 + acceleration[1] ** 2 + acceleration[2] ** 2)

#         print('''
# 加速度(m/s²):
#     x轴:%.2f
#     y轴:%.2f
#     z轴:%.2f
#
# 角速度(rad/s):
#     x轴:%.2f
#     y轴:%.2f
#     z轴:%.2f
#
# 欧拉角(°):
#     x轴:%.2f
#     y轴:%.2f
#     z轴:%.2f
#
# 磁场:
#     x轴:%.2f
#     y轴:%.2f
#     z轴:%.2f
# ''' % (acceleration[0] * -9.8 / acc_k, acceleration[1] * -9.8 / acc_k, acceleration[2] * -9.8 / acc_k,
#        angularVelocity[0], angularVelocity[1], angularVelocity[2],
#        angle_degree[0], angle_degree[1], angle_degree[2],
#        magnetometer[0], magnetometer[1], magnetometer[2]
#       ))

key = 0
flag = 0
buff = {}
angularVelocity = [0, 0, 0]
acceleration = [0, 0, 0]
magnetometer = [0, 0, 0]
angle_degree = [0, 0, 0]
pub_flag = [True, True]

if __name__ == "__main__":
    python_version = platform.python_version()[0]

    find_ttyUSB()
    port = "/dev/ttyUSB0"
    baudrate = 921600

    try:
        hf_imu = serial.Serial(port=port, baudrate=baudrate, timeout=0.5)
        if hf_imu.isOpen():
            print("\033[32m串口打开成功...\033[0m")
        else:
            hf_imu.open()
            print("\033[32m打开串口成功...\033[0m")
    except Exception as e:
        print(e)
        print("\033[31m串口打开失败\033[0m")
        exit(0)
    else:
        # 定义保存数据的文件名和持续时间
        file_name = '/home/hjx/handsfree/imu_data_record/hfi_a9_timer/imu_data_a9_timer.csv'  # 可以根据需要更改文件名和路径
        duration = 3  # 保存3秒数据,可根据需要调整

        start_time = time.time()
        end_time = start_time + duration

        # 在保存数据之前,添加一个标题行
        with open(file_name, 'w') as file:
            file.write(
                'Timestamp,X_Acceleration,Y_Acceleration,Z_Acceleration,X_AngularVelocity,Y_AngularVelocity,Z_AngularVelocity,Euler_X,Euler_Y,Euler_Z,X_Magnetometer,Y_Magnetometer,Z_Magnetometer\n')

        while True:
            try:
                buff_count = hf_imu.inWaiting()
            except Exception as e:
                print("exception:" + str(e))
                print("imu 失去连接,接触不良,或断线")
                exit(0)
            else:
                if buff_count > 0:
                    buff_data = hf_imu.read(buff_count)
                    for i in range(0, buff_count):
                        handleSerialData(buff_data[i])

                current_time = time.time()
                if current_time >= end_time:
                    # 保存数据并退出循环
                    with open(file_name, 'a') as file:
                        file.write(','.join(map(str, [current_time, acceleration[0], acceleration[1], acceleration[2],
                                                      angularVelocity[0], angularVelocity[1], angularVelocity[2],
                                                      angle_degree[0], angle_degree[1], angle_degree[2],
                                                      magnetometer[0], magnetometer[1], magnetometer[2]])) + '\n')
                    break
                else:
                    # 保存数据并继续接收和处理
                    with open(file_name, 'a') as file:
                        file.write(','.join(map(str, [current_time, acceleration[0], acceleration[1], acceleration[2],
                                                      angularVelocity[0], angularVelocity[1], angularVelocity[2],
                                                      angle_degree[0], angle_degree[1], angle_degree[2],
                                                      magnetometer[0], magnetometer[1], magnetometer[2]])) + '\n')

 下面给出以上修改后的代码解析:

  1. 导入模块

    • serial:用于处理串口通信。
    • struct:用于处理二进制数据。
    • time:用于处理时间相关的功能。
    • platform:用于获取操作系统信息。
    • serial.tools.list_ports:用于列出系统中的串口设备。
    • math:用于数学运算。
  2. 函数定义

    • find_ttyUSB:检测连接到电脑的USB串口设备,并打印设备列表。
    • checkSum:执行CRC校验,验证数据的完整性。
    • hex_to_ieee:将16进制数据转换为IEEE浮点数。
    • handleSerialData:处理从串口接收到的原始数据。
  3. 主要逻辑

    • 初始化一些全局变量。
    • __main__部分,脚本首先检测Python版本,然后搜索串口设备,并尝试打开特定的串口(默认为/dev/ttyUSB0)。
    • 如果串口打开成功,脚本会记录IMU数据到指定文件中(默认路径为/home/hjx/handsfree/imu_data_record/hfi_a9_timer/imu_data_a9_timer.csv)。
    • 数据记录包括加速度、角速度、欧拉角和磁场信息。
    • 脚本会在指定时间(默认3秒)后停止记录数据。
  4. 数据处理

    • handleSerialData函数接收串口数据,根据特定的格式解析数据,包括加速度、角速度、欧拉角和磁场等信息。
    • 进行CRC校验确保数据完整性,然后将数据转换为浮点数进行记录。
  5. 异常处理

    • 在尝试打开串口或处理数据时,脚本包含异常处理逻辑,以确保在出现错误时能够优雅地处理。

        总之,这个脚本主要用于通过串口从IMU设备收集数据,并将其转换为可用的格式进行分析和记录。

配置好conda的环境和ros包的路径后,开始在pycharm中运行:

其中  duration = 3  # 保存3秒数据,可根据需要调整

查看经过3秒后保存好的csv文件:

    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1378178.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python基础知识:整理11 模块的导入、自定义模块和安装第三方包

1 模块的导入 1.1 使用import 导入time模块&#xff0c;使用sleep功能&#xff08;函数&#xff09; import time print("start") time.sleep(3) print("end")1.2 使用from 导入time的sleep功能 from time import sleep print("start") slee…

外汇天眼:塞浦路斯证券交易委员会(CySEC)确认了四家投资公司退出投资者赔偿基金(ICF)会员资格

塞浦路斯证券交易委员会&#xff08;CySEC&#xff09;今天确认了四家投资公司已被取消其在投资者赔偿基金&#xff08;ICF&#xff09;的会员资格。 以下公司不再是ICF的会员&#xff1a; 1.Stone Edge Capital Ltd&#xff08;LEI 213800PZFB9VV8FNWB30&#xff09;&#xf…

Redis的优化

1 Redis的高可用 1.1 高可用的定义 在web服务器中&#xff0c;高可用是指服务器可以正常访问的时间&#xff0c;衡量的标准是在多长时间内可以提供正常服务&#xff08;99.9%、99.99%、99.999%等等&#xff09;。 但是在Redis语境中&#xff0c;高可用的含义似乎要宽泛一些&…

边缘数据采集网关无法上传数据是什么原因?如何解决?

边缘数据采集网关是物联网系统中的常见设备&#xff0c;主要用途包括数据采集、协议转换、边缘数据处理、数据传输分发等&#xff0c;实现多设备和多系统的互联互通和数据协同应用&#xff0c;对于提高物联网感知和响应效率、加强物联网联动协同能力、提升数据安全性等方面都具…

Docker安装Atlassian全家桶

文章目录 省流&#xff1a;1.docker-compose文件2.其他服务都正常启动&#xff0c;唯独Bitbucket不行。日志错误刚启动时候重启后查询分析原因再针对第一点排查看样子是安装的bitbucket和系统环境有冲突问题&#xff1f; 结论&#xff1a; 省流&#xff1a; bitbucket 只能安装…

查看SQL Server的表字段类型、长度、描述以及是否可为null

文章目录 初步理解小步测试组合一下参考文章有更详细评述 继续理解得到大部分信息 本文参考&#xff1a;https://blog.csdn.net/josjiang1/article/details/80558068。 也可以直接点击这里文章链接&#xff1a; sql server查询表结构&#xff08;字段名&#xff0c;数据类型&a…

基于博弈树的开源五子棋AI教程[3 极大极小搜索]

基于博弈树的开源五子棋AI教程[3 极大极小搜索] 引子极大极小搜索原理alpha-beta剪枝负极大搜索尾记 引子 极大极小搜索是博弈树搜索中最常用的算法&#xff0c;广泛应用于各类零和游戏中&#xff0c;例如象棋&#xff0c;围棋等棋类游戏。算法思想也是合乎人类的思考逻辑的&a…

Tomcat性能优化学习

Tomcat 服务器是一个开源的轻量级Web应用服务器&#xff0c;在中小型系统和并发量小的场合下被普遍使用&#xff0c;是开发和调试Servlet、JSP 程序的首选。相信大家对于 Tomcat 已经是非常熟悉了&#xff0c;本篇将介绍tomcat的常见优化。那么为什么要对tomcat进行优化呢。因为…

Linux上如何一键安装软件?yum源是什么?Linux如何配置yum源?

这几个问题是Linux操作的入门问题&#xff0c;但是确实也会让刚上手Linux小伙伴头疼一阵&#xff0c;故特有此文&#xff0c;希望能对刚入门的小伙伴有一些帮助~ 众所周知 在linux上在线安装软件需要用到yum命令&#xff0c;经常下述命令来安装 yum install [-y] 包名 #-y的…

自动化测试数据校验神器!

在做接口自动化测试时&#xff0c;经常需要从接口响应返回体中提取指定数据进行断言校验。 今天给大家推荐一款json数据提取神器: jsonpath jsonpath和常规的json有哪些区别呢&#xff1f;在Python中&#xff0c;json是用于处理JSON数据的内置模块&#xff0c;而jsonpath是用…

Python基础知识:整理12 JSON数据格式的转换

首先导入python中的内置包json import json 1 准备一个列表&#xff0c;列表内每个元素都是字典&#xff0c;将其转换为JSON 使用json.dumps()方法 data [{"name": "John", "age": 30}, {"name": "Jane", "age":…

Wpf 使用 Prism 实战开发Day11

仓储&#xff08;Repository&#xff09;/工作单元&#xff08;Unit Of Work&#xff09;模式 仓储&#xff08;rep&#xff09;:仓储接口定义了对实体类访问数据库及操作的方法。它统一管理数据访问的逻辑&#xff0c;并与业务逻辑层进行解耦。 简单的理解就是对访问数据库的一…

百家大吉·夕阳关爱——昌岗街微型养老博览会

居民热情参与博览会 为让长者了解及选择适合自己的养老服务&#xff0c;昌岗街在2023年12月27日开展以“百家大吉夕阳关爱”为主题的昌岗街微型养老服务公益博览会活动&#xff0c;通过搭建养老服务机构供需服务平台&#xff0c;拓宽社区长者了解正规养老服务机构的渠道&#…

视频转码:掌握mp4视频格式转FLV视频的技巧,视频批量剪辑方法

在多媒体时代&#xff0c;视频格式的转换成为一种常见的需求。把MP4格式转换为FLV格式&#xff0c;FLV格式的视频文件通常具有较小的文件大小&#xff0c;同时保持了较好的视频质量。批量剪辑视频的方法能大大提高工作效率。下面来看云炫AI智剪如何进行MP4到FLV的转码&#xff…

写在学习webkit过程的前面

webkit起源于KHTML&#xff0c;是KDE开源项目的KHTML和KJS引擎的一部分。在它的诞生和发展过程中&#xff0c;由两家著名的公司参与开发过程中&#xff0c;造成两次裂变。诞生两个内核webkit和blink&#xff0c;并发展和产生了两个主流的浏览器&#xff0c;分别为safari和chrom…

Unity网络通讯学习

---部分截图来自 siki学院Unity网络通讯课程 Socket 网络上的两个程序通过一个双向的通信连接实现数据交换&#xff0c;这个连接的一端称为一个 Socket &#xff0c;Socket 包含了网络通信必须的五种信息 Socket 例子{ 协议&#xff1a; TCP 本地&#xff1a; IP &#xff…

【Linux Shell】9. 流程控制

文章目录 【 1. if else 判断 】1.1 if1.2 if else1.3 if elif else1.4 实例 【 2. case 匹配 】【 3. 循环 】3.1 for 循环3.2 while 循环3.3 until 循环3.4 无限循环3.5 跳出循环3.5.1 break 跳出所有循环3.5.2 continue 仅跳出当前循环 【 1. if else 判断 】 1.1 if fi 是…

【双指针】001移动零_C++

题目链接&#xff1a;移动零 目录 题目解析 代码书写 知识补充 题目解析 题目让我们求必须在不复制数组的情况下,编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 这题我们可以用双指针的方法来写&#xff1a; 我们这里将用两个数组下标来…

Linux tail命令详解和高级用法举例

目 录 一、概述 二、tail命令解释 1&#xff0e;命令格式; 2&#xff0e;功能 3&#xff0e;选项 4&#xff0e;选项的基本用法 &#xff08;1&#xff09; 显示行号 &#xff08;2&#xff09;忽略指定字符数 &#xff08;3&#xff09; 不显示文件名 三…

GVM垃圾收集算法

分代收集理论 目前主流JVM虚拟机中的垃圾收集器&#xff0c;都遵循分代收集理论&#xff1a; 弱分代&#xff1a;绝大多数对象都是朝生夕灭强分带&#xff1a;经历越多次垃圾收集过程的对象&#xff0c;越难以回收&#xff0c;难以消亡 按照分代收集理论设计的“分代垃圾收集…