Python实现代码,需要注意解算的yaw与车头的关系
import rospy
from sensor_msgs.msg import NavSatFix, Imu
from geometry_msgs.msg import PointStamped, QuaternionStamped
from nav_msgs.msg import Odometry
import numpy as np
from scipy.spatial.transform import Rotation
from threading import Lock
class GNSSIMUFusion:
def __init__(self):
rospy.init_node('gnss_imu_fusion', anonymous=True)
self.gnss_sub = rospy.Subscriber('/fix', NavSatFix, self.gnss_callback)
self.heading_sub = rospy.Subscriber("/heading", QuaternionStamped, self.gnss_heading_callback)
self.odom_pub = rospy.Publisher('/gnss_odom', Odometry, queue_size=10)
self.init_lla_pub = rospy.Publisher("/gnss_init_fix", PointStamped, queue_size=10)
self.frame_count = 0
self.gnss_init_fix_msg = None
self.initial_llat = np.array([0, 0, 0], dtype=np.float32)
self.initial_ecef_ = np.array([0, 0, 0], dtype=np.float32)
self.gnss_heading_lock = Lock()
self.last_heading_msg = None
self.last_gnss_msg = None
self.gnss_heading_msg_list = []
def gnss_callback(self, data):
assert type(data) is NavSatFix
if data.status.status != 2:
print("GNSS state:{}, Not Good ....".format(data.status))
return
average_num = 1
if self.frame_count < average_num:
self.frame_count += 1
self.initial_llat += np.array([data.latitude, data.longitude, data.altitude], dtype=np.float32)
print("累积平均值中...")
return None
if self.frame_count == average_num :
self.initial_llat /= self.frame_count
self.gnss_init_fix_msg = data
self.gnss_init_fix_msg.header.stamp = rospy.Time.now()
self.gnss_init_fix_msg.latitude, self.gnss_init_fix_msg.longitude, self.gnss_init_fix_msg.altitude = self.initial_llat
self.initial_ecef_ = np.asarray(self.lla_to_ecef(self.initial_llat[0], self.initial_llat[1], self.initial_llat[2]), dtype=np.float32)
self.frame_count += 1
current = [self.lla_to_enu(data.latitude, data.longitude, data.altitude)]
print("得到偏移量...: ",current)
return None
quat = np.array([self.last_heading_msg.quaternion.x, self.last_heading_msg.quaternion.y, self.last_heading_msg.quaternion.z, self.last_heading_msg.quaternion.w], dtype=np.float32)
yaw, roll, pitch = Rotation.from_quat(quat).as_euler("ZXY", degrees=False)
quat = Rotation.from_euler("ZXY", [-yaw + np.pi, 0, 0], degrees=False).as_quat()
quat_t = self.last_heading_msg.header.stamp.to_sec()
current_GNSS_XYZ = np.array(self.lla_to_enu(data.latitude, data.longitude, data.altitude), dtype=np.float32)
gnss_heading_quat = self.get_cur_gnss_observ_quat(data.header.stamp.to_sec())
if len(gnss_heading_quat) == 4:
x, y, z, w = gnss_heading_quat
current_GNSS_ORI = np.array([x, y, z, w], dtype=np.float32)
Vel = np.array([0, 0, 0], dtype=np.float32)
self.pub_imu_odom(current_GNSS_XYZ, Vel, current_GNSS_ORI, cur_IMU__msg=data)
def gnss_heading_callback(self, msg):
assert type(msg) is QuaternionStamped
self.gnss_heading_lock.acquire()
if len(self.gnss_heading_msg_list) > 100:
self.gnss_heading_msg_list.pop(0)
self.gnss_heading_msg_list.append(msg)
self.gnss_heading_lock.release()
self.last_heading_msg = msg
def get_cur_gnss_observ_quat(self, cur_time):
self.gnss_heading_lock.acquire()
if len(self.gnss_heading_msg_list) == 0:
return np.array([], dtype=np.float32)
time_bias = []
for i, msg in enumerate(self.gnss_heading_msg_list) :
assert type(msg) is QuaternionStamped
time_bias.append( abs(msg.header.stamp.to_sec() - cur_time) )
min_time_index = time_bias.index(min(time_bias))
print(min(time_bias))
min_time_msg = self.gnss_heading_msg_list[min_time_index]
for i in range(min_time_index):
self.gnss_heading_msg_list.pop(0)
self.gnss_heading_lock.release()
msg = min_time_msg
quat = np.array([msg.quaternion.x, msg.quaternion.y, msg.quaternion.z, msg.quaternion.w],dtype=np.float32)
yaw, roll, pitch = Rotation.from_quat(quat).as_euler("ZXY", degrees=False)
quat = Rotation.from_euler("ZXY", [-yaw + np.pi, 0, 0], degrees=False).as_quat()
return quat
def pub_imu_odom(self, P, V, Q, cur_IMU__msg):
msg = cur_IMU__msg
odom_msg = Odometry()
odom_msg.header.stamp = rospy.Time.now()
odom_msg.header.frame_id = "map"
odom_msg.child_frame_id = msg.header.frame_id
odom_msg.pose.pose.position.x = P[0]
odom_msg.pose.pose.position.y = P[1]
odom_msg.pose.pose.position.z = P[2]
odom_msg.pose.pose.orientation.x = Q[0]
odom_msg.pose.pose.orientation.y = Q[1]
odom_msg.pose.pose.orientation.z = Q[2]
odom_msg.pose.pose.orientation.w = Q[3]
odom_msg.twist.twist.linear.x = V[0]
odom_msg.twist.twist.linear.y = V[1]
odom_msg.twist.twist.linear.z = V[2]
odom_msg.twist.twist.angular.x = 0
odom_msg.twist.twist.angular.y = 0
odom_msg.twist.twist.angular.z = 0
self.odom_pub.publish(odom_msg)
def lla_to_ecef(self, lat, lon, alt):
equatorial_radius = 6378137.0
polar_radius = 6356752.31424518
square_ratio = np.power(polar_radius, 2) / np.power(equatorial_radius, 2)
lat = lat * np.pi / 180
lon = lon * np.pi / 180
alt = alt
N = equatorial_radius / np.sqrt(1 - (1 - square_ratio) * np.power(np.sin(lat), 2))
z = (square_ratio * N + alt) * np.sin(lat)
q = (N + alt) * np.cos(lat)
x = q * np.cos(lon)
y = q * np.sin(lon)
return x, y, z
def lla_to_enu(self, lat, lon, alt):
cur_ecef = self.lla_to_ecef(lat, lon, alt)
r_ecef = cur_ecef - self.initial_ecef_
phi = self.initial_llat[0] * np.pi / 180
lam = self.initial_llat[1] * np.pi / 180
R = np.asarray([ [-np.sin(lam), np.cos(lam), 0],
[-np.cos(lam) * np.sin(phi), -np.sin(lam) * np.sin(phi), np.cos(phi)],
[ np.cos(lam) * np.cos(phi), np.sin(lam) * np.cos(phi), np.sin(phi)]], dtype=np.float32)
return R @ r_ecef
if __name__ == '__main__':
try:
fusion_node = GNSSIMUFusion()
rospy.spin()
except rospy.ROSInterruptException:
pass