首先说明一下,rosbag的解压方式取决于之前的压缩方式,数据的格式和工具使用的方法有很大关系。当前数据类型是 sensor_msgs/CompressedImage 。 处理数据的topic为 /cam_front_center/csi_cam/image_raw/compressed 当前提取是前置摄像头图像数据。
可以通过安装,查看rosbag信息。
sudo apt install python3-rosbag
使用命令:
rosbag info 2022-08-10-17-00-54.bag
提取脚本:
# -*- encoding: utf-8 -*-
"""
@File : ros2Image.py
@Contact : kequan
@License : (C)Copyright 2023
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
10/23/23 5:05 PM kequan 1.0 None
"""
# -*- encoding: utf-8 -*-
import roslib;
import rosbag
import rospy
import cv2
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from cv_bridge import CvBridgeError
import numpy as np
MJPEG_VIDEO = 1
RAWIMAGE_VIDEO = 2
VIDEO_CONVERTER_TO_USE = "ffmpeg" # or you may want to use "avconv"
path = '' # 存放图片的位置
class ImageCreator():
def __init__(self):
self.bridge = CvBridge()
self.currentTopic = "/cam_front_center/csi_cam/image_raw/compressed"
self.opt_display_images = True
self.opt_verbose = False
self.p_avconv = {}
self.filename = '/2022-08-10-17-00-54.bag'
self.cv_image = []
with rosbag.Bag(self.filename, 'r') as bag: # 要读取的bag文件;
for topic, msg, t in bag.read_messages(connection_filter=self.filter_image_msgs, topics=self.currentTopic):
try:
cv_image = self.bridge.compressed_imgmsg_to_cv2(msg, "bgr8")
except CvBridgeError as e:
print(e)
# timestr = "%.6f" % msg.header.stamp.to_sec()
timestr = str(msg.header.stamp)
# %.6f表示小数点后带有6位,可根据精确度需要修改;
image_name = timestr + ".jpg" # 图像命名:时间戳.jpg
cv2.imwrite(path + image_name, cv_image) # 保存;
bag.close()
# filter messages using type or only the opic we whant from the 'topic' argument
def filter_image_msgs(self, topic, datatype, md5sum, msg_def, header):
if (datatype == "sensor_msgs/CompressedImage"):
if (self.currentTopic != "" and self.currentTopic == topic) or self.currentTopic == "":
print("############# COMPRESSED IMAGE ######################")
print(topic, ' with datatype:', str(datatype))
print()
return True
if (datatype == "theora_image_transport/Packet"):
if (self.currentTopic != "" and self.currentTopic == topic) or self.currentTopic == "":
print(topic, ' with datatype:', str(datatype))
print('!!! theora is not supported, sorry !!!')
return False
if (datatype == "sensor_msgs/Image"):
if (self.currentTopic != "" and self.currentTopic == topic) or self.currentTopic == "":
print("############# UNCOMPRESSED IMAGE ######################")
print(topic, ' with datatype:', str(datatype))
print()
return True
return False
if __name__ == '__main__':
# rospy.init_node(PKG)
try:
image_creator = ImageCreator()
except rospy.ROSInterruptException:
pass
提取数据如图:
通常做视觉slam需要对提取图像数据 和 其它数据左对齐。
当前是进行车道线场景重建,和车道线时间戳进行对齐。对齐脚本如下,
# -*- encoding: utf-8 -*-
"""
@File : alignmentPictureAndTime.py
@Contact : kequan
@License : (C)Copyright 2023
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
10/23/23 5:20 PM kequan 1.0 None
"""
import cv2
import os
import json
import shutil
"""
获取到某个文件中所有bag文件,并返回其路径列表
"""
def find_bag_files(folder):
bag_files = []
for root, dirs, files in os.walk(folder):
for file in files:
if file.endswith(".jpg"):
# 获取.bag文件路径
bag_file_path = os.path.join(root, file)
# 将.bag文件路径添加到列表
bag_files.append(bag_file_path)
return bag_files
def renamePictureNameAndAlign(timeJson,pictureFile,targer_path):
#做一个文件路经和时间戳映射
mapping = []
pic_origin_path = os.path.dirname(os.path.abspath(pictureFile[0]))
for pic in pictureFile:
key = pic.split('/')[-1].split('.jpg')[0]
mapping.append(int(key))
# picIndex = 1
#开始查找和复制
for frame in timeJson:
timeStamp = int(frame['timestamp'])
picIndex = 1
while picIndex < len(mapping):
#在图像中找到对应帧
if timeStamp >= mapping[picIndex-1] and timeStamp <= mapping[picIndex]:
#当时间戳在 前一帧 和 后一帧 之间时,使用前一帧图像数据作为目标图像,命名,并保存
#原图像Path
sourcePath = pic_origin_path +f'/{mapping[picIndex-1]}.jpg'
#目标Path
targetPath = targer_path + f'{timeStamp}.jpg'
shutil.copy(sourcePath,targetPath)
break
picIndex = picIndex + 1
# if timeStamp < mapping[picIndex-1]:
# picIndex = picIndex + 1
#在pictureFile 找最近的一张图像,重命名,并将数据复制到新文件夹
if __name__ == "__main__":
origin_path = ""
targer_path = ''
#获取到车道线时间戳
gnssJson = '/gnssJson.json'
with open(gnssJson,'r+') as fp :
gnssTime = json.load(fp)
#获取到所有图片
picture = find_bag_files(origin_path)
renamePictureNameAndAlign(gnssTime['gnssFrames'],picture,targer_path)