Cartographer的后端优化是借用SPA(Sparse Pose Adjustment)优化算法的思想. 其主要步骤如下:
- 确定两个节点在global坐标系下的相对位姿变换.
- 通过其它方式再次获取这两个节点的相对位姿变化
- 对这两个相对位姿变换的差的最小二乘问题进行求解
- 进行求解后得到一个增量
Δ
x
\Delta{x}
Δx, 将当前位姿加上这个增量就得到了优化后的位姿
Cartographer使用的是ceres进行位姿图优化. 值得注意的是, global坐标系到现在(提到后端部分)才出现, 前端都是在local坐标系下做的.
我理解在第一次后端前, local坐标系和global坐标系是重合的, 只有进入过后端之后, local和global坐标系才会有偏移. 经过测试, local坐标系在rviz中是map, global中是odom.
1. Cartographer后端约束类型
根据SPA论文, 第一个约束就是node位姿在global坐标系下的相对坐标变换, 也就是
T
10
T_{10}
T10
Cartographer中设置了其他的5种第二个约束和残差, 共同进行优化
- 第一种残差 将节点(tracking的位姿)与节点(子图原点位姿)在global坐标系下的相对位姿 与 约束(包含子图内约束与子图间约束) 的差值作为残差项.
- landmark数据 与 通过2个节点位姿插值出来的相对位姿 的差值作为残差项
- 节点与节点间在global坐标系下的相对坐标变换 与 通过里程计数据插值出的相对坐标变换 的差值作为残差项
- 节点与节点间在global坐标系下的相对坐标变换 与 相邻2个节点在local坐标系下的相对坐标变换的差值作为残差项
- 节点与gps坐标系原点在global坐标系下的相对坐标变换 与 通过gps数据进行插值得到的相对坐标变换 的差值作为残差项
接下来结合代码看看每种残差是怎么构建的
2. 后端数据的传递
在我之前写的博客<MapBuilder的声明与构造>中, 详细说明了前端后端如何被调用起来的. map_builder调用collated_trajectory_builder, collated_trajectory_builder再调用GlobalTrajectory和Sensor这两个类, GlobalTrajectory再调用LocalTrajectory和PoseGraph这两个类. 进入到global_trajectory_builder.cc中看看. 这里面的核心函数就是AddSensorData. AddSensorData重载了很多次, 每个传感器数据类型就是一个重载. 咱们还是以最重要的点云数据为例子:
/**
* @brief 点云数据的处理, 先进行扫描匹配, 然后将扫描匹配的结果当做节点插入到后端的位姿图中
*
* @param[in] sensor_id topic名字
* @param[in] timed_point_cloud_data 点云数据
*/
void AddSensorData(
const std::string& sensor_id,
const sensor::TimedPointCloudData& timed_point_cloud_data) override {
CHECK(local_trajectory_builder_)
<< "Cannot add TimedPointCloudData without a LocalTrajectoryBuilder.";
// 进行前端(扫描匹配), 返回匹配后的结果
std::unique_ptr<typename LocalTrajectoryBuilder::MatchingResult> //local_trajectory_builder_Xd.h中定义MatchingResult
matching_result = local_trajectory_builder_->AddRangeData(
sensor_id, timed_point_cloud_data);
if (matching_result == nullptr) {
// The range data has not been fully accumulated yet.
return;
}
kLocalSlamMatchingResults->Increment();
std::unique_ptr<InsertionResult> insertion_result; //TrajectoryBuilderInterface的InsertionResult,不是local_trajectory_builder的
// matching_result->insertion_result 的类型是 LocalTrajectoryBuilder2D::InsertionResult
// 如果雷达成功插入到地图中
if (matching_result->insertion_result != nullptr) {
kLocalSlamInsertionResults->Increment();
// 将前端扫描匹配后的结果 当做节点 加入到位姿图中
auto node_id = pose_graph_->AddNode(
matching_result->insertion_result->constant_data, trajectory_id_,
matching_result->insertion_result->insertion_submaps);
CHECK_EQ(node_id.trajectory_id, trajectory_id_);
// 这里的InsertionResult的类型是 TrajectoryBuilderInterface::InsertionResult
insertion_result = absl::make_unique<InsertionResult>(InsertionResult{
node_id,
matching_result->insertion_result->constant_data,
std::vector<std::shared_ptr<const Submap>>(
matching_result->insertion_result->insertion_submaps.begin(),
matching_result->insertion_result->insertion_submaps.end())});
}
// 将结果数据传入回调函数中, 进行保存
if (local_slam_result_callback_) {
local_slam_result_callback_(
trajectory_id_, matching_result->time, matching_result->local_pose,
std::move(matching_result->range_data_in_local),
std::move(insertion_result));
}
}
很容易看到, AddSensorData调用了前端的扫描匹配, 然后将前端扫描匹配后的结果当做节点加入到位姿图中(pose_graph的AddNode), 通过pose_graph的AddNode相关的操作去完成后端数据的处理.
对于上面的步骤, 各个传感器都是一样的: 某些传感器需要插入前端的先插入前端(比如IMU, 里程计和点云), 不需要插入前端的, 比如landmark和GPS还有前端定位结果, 就直接调用后端的AddXXXData.
还需要提一下的是, 对于点云数据, 还通过local_slam_result_callback_这个变量把前端匹配的结果作为回调函数与MapBuilderBridge::AddTrajectory进行联系, 调用了MapBuilderBridge的OnlocalSlamResult, 作用是保存前端local slam的结果, 而这个保存的前端结果将会被Node::PublishLocalTrajectoryData把位姿发出去用于可视化等.
上面提到了后端的实现都是在pose_graph中, 通过AddXXXData把数据加入到pose_graph中实现.
/**
* @brief 增加节点, 并计算跟这个节点相关的约束,在global_trajectory_builder中调用
*
* @param[in] constant_data 节点信息
* @param[in] trajectory_id 轨迹id
* @param[in] insertion_submaps 子地图 active_submaps
* @return NodeId 返回节点的ID
*/
NodeId PoseGraph2D::AddNode(
std::shared_ptr<const TrajectoryNode::Data> constant_data, //constant_data: 时间,点云和local下的pose
const int trajectory_id,
const std::vector<std::shared_ptr<const Submap2D>>& insertion_submaps) {
// 将节点在local坐标系下的坐标转成global坐标系下的坐标
const transform::Rigid3d optimized_pose(
GetLocalToGlobalTransform(trajectory_id) * constant_data->local_pose);
// 向节点列表加入节点,并得到节点的id
const NodeId node_id = AppendNode(constant_data, trajectory_id,
insertion_submaps, optimized_pose);
// We have to check this here, because it might have changed by the time we
// execute the lambda.
// 获取第一个submap是否是完成状态
const bool newly_finished_submap =
insertion_submaps.front()->insertion_finished();
// 把计算约束的工作放入workitem中等待执行
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
return ComputeConstraintsForNode(node_id, insertion_submaps,
newly_finished_submap);
});
return node_id;
}
第一步先得到节点在global坐标系下的坐标, 设为变量optimized_pose. 方法就是简单的坐标转换
第二步, 添加节点, 并保存新生成的Submap调用的是AppendNode. 看看AppendNode这个函数
/**
* @brief 向节点列表中添加一个新的节点, 并保存新生成的submap
*
* @param[in] constant_data 节点数据的指针
* @param[in] trajectory_id 轨迹id
* @param[in] insertion_submaps 子地图指针的vector
* @param[in] optimized_pose 当前节点在global坐标系下的坐标
* @return NodeId 返回新生成的节点id
*/
NodeId PoseGraph2D::AppendNode(
std::shared_ptr<const TrajectoryNode::Data> constant_data,
const int trajectory_id,
const std::vector<std::shared_ptr<const Submap2D>>& insertion_submaps,
const transform::Rigid3d& optimized_pose) {
absl::MutexLock locker(&mutex_);
// 如果轨迹不存在, 则将轨迹添加到连接状态里并添加采样器
AddTrajectoryIfNeeded(trajectory_id);
// 根据轨迹状态判断是否可以添加任务
if (!CanAddWorkItemModifying(trajectory_id)) {
LOG(WARNING) << "AddNode was called for finished or deleted trajectory.";
}
// 向节点列表中添加一个新的节点
const NodeId node_id = data_.trajectory_nodes.Append(
trajectory_id, TrajectoryNode{constant_data, optimized_pose});
// 节点总个数加1
++data_.num_trajectory_nodes;
// Test if the 'insertion_submap.back()' is one we never saw before.
// 如果是刚开始的轨迹, 或者insertion_submaps.back()是第一次看到, 就添加新的子图
if (data_.submap_data.SizeOfTrajectoryOrZero(trajectory_id) == 0 ||
std::prev(data_.submap_data.EndOfTrajectory(trajectory_id))
->data.submap != insertion_submaps.back()) {
// We grow 'data_.submap_data' as needed. This code assumes that the first
// time we see a new submap is as 'insertion_submaps.back()'.
// 如果insertion_submaps.back()是第一次看到, 也就是新生成的
// 在data_.submap_data中加入一个空的InternalSubmapData
const SubmapId submap_id =
data_.submap_data.Append(trajectory_id, InternalSubmapData());
// 保存后边的地图, 将后边的地图的指针赋值过去
// 地图是刚生成的, 但是地图会在前端部分通过插入点云数据进行更新, 这里只保存指针
data_.submap_data.at(submap_id).submap = insertion_submaps.back();
LOG(INFO) << "Inserted submap " << submap_id << ".";
kActiveSubmapsMetric->Increment();
}
return node_id;
}
这个函数的重点是添加新的节点, 新子图的生成与添加新的子图数据.
data_这个变量是PoseGraphData的实例化, 里面包含了submap_data(子图数据)与trajectory_nodes(节点数据). 具体的内容在pose_graph_data.h中定义. 通过调用trajectory_nodes.submap_data和submap_data.Append完成节点数据和子图数据的添加.
什么时候添加子图数据呢?
if (data_.submap_data.SizeOfTrajectoryOrZero(trajectory_id) == 0 ||
std::prev(data_.submap_data.EndOfTrajectory(trajectory_id))
->data.submap != insertion_submaps.back())
这块意思是, 如果该轨迹的子图个数为0, 或者需要维护的Submap的id与最新添加的Submap的id不一样, 就说明需要新加一个Submap.
子图到底是怎么添加的? 这个需要结合子图维护部分看, 在之前的博客中提到过: 子图添加node, 达到一定数量的node后, 新建新的子图, 同时往新的子图和旧的子图里添加node, 当新的子图的node数量达到一定数量后, 就删除旧的子图, 并新建新的子图, 一直重复下去…
完成的子图和未完成的子图的约束计算是不一样的, 所以用了newly_finished_submap作为标志位去区分.
再看看任务处理部分:
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
return ComputeConstraintsForNode(node_id, insertion_submaps,
newly_finished_submap);
AddWorkItem传入了一个函数, 用来放在线程池中进行执行, 这一块在后面线程处理部分再说. 咱们先看看ComputeConstraintsForNode这个函数
/**
* @brief 保存节点, 计算子图内约束, 查找回环
*
* @param[in] node_id 刚加入的节点ID
* @param[in] insertion_submaps active_submaps
* @param[in] newly_finished_submap 是否是新finished的submap
* @return WorkItem::Result 是否需要执行全局优化
*/
WorkItem::Result PoseGraph2D::ComputeConstraintsForNode(
const NodeId& node_id,
std::vector<std::shared_ptr<const Submap2D>> insertion_submaps,
const bool newly_finished_submap) {
std::vector<SubmapId> submap_ids; // 活跃状态下的子图的id
std::vector<SubmapId> finished_submap_ids; // 处于完成状态的子图id的集合
std::set<NodeId> newly_finished_submap_node_ids; // 刚刚完成的子图对应的节点id
// 保存节点与计算子图内约束
{
absl::MutexLock locker(&mutex_);
// 获取节点信息数据
const auto& constant_data =
data_.trajectory_nodes.at(node_id).constant_data;
// 获取 trajectory_id 下的正处于活跃状态下的子图的SubmapId
submap_ids = InitializeGlobalSubmapPoses(
node_id.trajectory_id, constant_data->time, insertion_submaps);
CHECK_EQ(submap_ids.size(), insertion_submaps.size());
// 获取这两个submap中前一个的id
const SubmapId matching_id = submap_ids.front();
// 计算该Node投影到平面后的位姿 gravity_alignment是机器人的姿态
const transform::Rigid2d local_pose_2d =
transform::Project2D(constant_data->local_pose * // 三维转平面
transform::Rigid3d::Rotation(
constant_data->gravity_alignment.inverse()));
// 计算该Node在global坐标系下的二维位姿
// global_pose * constraints::ComputeSubmapPose().inverse() = globla指向local的坐标变换
const transform::Rigid2d global_pose_2d =
optimization_problem_->submap_data().at(matching_id).global_pose *
constraints::ComputeSubmapPose(*insertion_submaps.front()).inverse() *
local_pose_2d;
// 把该节点的信息加入到OptimizationProblem中
optimization_problem_->AddTrajectoryNode(
matching_id.trajectory_id,
optimization::NodeSpec2D{constant_data->time, local_pose_2d,
global_pose_2d,
constant_data->gravity_alignment});
// 遍历2个子图, 将节点加入子图的节点列表中, 计算子图原点与及节点间的约束(子图内约束)
for (size_t i = 0; i < insertion_submaps.size(); ++i) {
const SubmapId submap_id = submap_ids[i];
// Even if this was the last node added to 'submap_id', the submap will
// only be marked as finished in 'data_.submap_data' further below.
CHECK(data_.submap_data.at(submap_id).state ==
SubmapState::kNoConstraintSearch);
// 将node_id放到子图保存的node_ids的set中
data_.submap_data.at(submap_id).node_ids.emplace(node_id);
// 计算 子图原点 指向 node坐标 间的坐标变换(子图内约束)
const transform::Rigid2d constraint_transform =
constraints::ComputeSubmapPose(*insertion_submaps[i]).inverse() *
local_pose_2d;
// 新生成的 子图内约束 放入容器中
data_.constraints.push_back(
Constraint{submap_id,
node_id,
{transform::Embed3D(constraint_transform),
options_.matcher_translation_weight(),
options_.matcher_rotation_weight()},
Constraint::INTRA_SUBMAP}); // 子图内约束
} // end for
// TODO(gaschler): Consider not searching for constraints against
// trajectories scheduled for deletion.
// TODO(danielsievers): Add a member variable and avoid having to copy
// them out here.
// 找到所有已经标记为kFinished状态的submap的id
for (const auto& submap_id_data : data_.submap_data) {
if (submap_id_data.data.state == SubmapState::kFinished) {
CHECK_EQ(submap_id_data.data.node_ids.count(node_id), 0);
finished_submap_ids.emplace_back(submap_id_data.id);
}
}
// 如果是刚刚finished的submap
if (newly_finished_submap) {
const SubmapId newly_finished_submap_id = submap_ids.front();
InternalSubmapData& finished_submap_data =
data_.submap_data.at(newly_finished_submap_id);
// 检查它还是不是kNoConstraintSearch
CHECK(finished_submap_data.state == SubmapState::kNoConstraintSearch);
// 把它设置成kFinished
finished_submap_data.state = SubmapState::kFinished;
// 刚结束的这个子图里包含的所有节点
newly_finished_submap_node_ids = finished_submap_data.node_ids;
}
} // end {}
// Step: 当前节点与所有已经完成的子图进行约束的计算---实际上就是回环检测
for (const auto& submap_id : finished_submap_ids) {
// 计算旧的submap和新的节点间的约束
ComputeConstraint(node_id, submap_id);
}
// Step: 计算所有节点与刚完成子图间的约束---实际上就是回环检测
if (newly_finished_submap) {
const SubmapId newly_finished_submap_id = submap_ids.front();
// We have a new completed submap, so we look into adding constraints for
// old nodes.
for (const auto& node_id_data : optimization_problem_->node_data()) {
const NodeId& node_id = node_id_data.id;
// 刚结束的子图内部的节点, 不再与这个子图进行约束的计算
if (newly_finished_submap_node_ids.count(node_id) == 0) {
// 计算新的submap和旧的节点间的约束
ComputeConstraint(node_id, newly_finished_submap_id);
}
}
}
// 结束构建约束
constraint_builder_.NotifyEndOfNode();
absl::MutexLock locker(&mutex_);
++num_nodes_since_last_loop_closure_;
// Step: 插入的节点数大于optimize_every_n_nodes时执行一次优化
// optimize_every_n_nodes = 0 时不进行优化, 这样就可以单独分析前端的效果
if (options_.optimize_every_n_nodes() > 0 && // param: optimize_every_n_nodes
num_nodes_since_last_loop_closure_ > options_.optimize_every_n_nodes()) {
// 正在建图时只有这一块会返回 执行优化
return WorkItem::Result::kRunOptimization;
}
return WorkItem::Result::kDoNotRunOptimization;
}
图优化问题是分为两个部分的, 一个是约束的构建, 第二个就是优化求最优解.
Cartographer是通过
- 添加新节点时, 当前节点和所有已完成子图进行约束构建
- 添加新子图时, 当前子图和所有节点进行约束构建
和图优化一样, 我们把待优化称为节点, 约束称为边. 通过optimization_problem_->AddTrajectoryNode把节点, 也就是每个node的位姿, 添加到优化问题. 然后添加子图内的约束(子图原点与node之间的位姿约束), 然后按照是不是新子图调用ComputeConstraint来计算约束.
再看看ComputeConstraint这个函数
/**
* @brief 进行子图间约束计算, 也可以说成是回环检测
*
* @param[in] node_id 节点的id
* @param[in] submap_id submap的id
*/
void PoseGraph2D::ComputeConstraint(const NodeId &node_id,
const SubmapId &submap_id) {
bool maybe_add_local_constraint = false;
bool maybe_add_global_constraint = false;
const TrajectoryNode::Data *constant_data;
const Submap2D *submap;
{
absl::MutexLock locker(&mutex_);
CHECK(data_.submap_data.at(submap_id).state == SubmapState::kFinished);
// 如果是未完成状态的地图不进行约束计算
if (!data_.submap_data.at(submap_id).submap->insertion_finished()) {
// Uplink server only receives grids when they are finished, so skip
// constraint search before that.
return;
}
// 获取该 node 和该 submap 中的 node 中较新的时间
const common::Time node_time = GetLatestNodeTime(node_id, submap_id);
// 两个轨迹的最后连接时间
const common::Time last_connection_time =
data_.trajectory_connectivity_state.LastConnectionTime(
node_id.trajectory_id, submap_id.trajectory_id);
// 如果节点和子图属于同一轨迹, 或者时间小于阈值
// 则只需进行 局部搜索窗口 的约束计算(对局部子图进行回环检测)
if (node_id.trajectory_id == submap_id.trajectory_id ||
node_time <
last_connection_time +
common::FromSeconds(
options_.global_constraint_search_after_n_seconds())) {
// If the node and the submap belong to the same trajectory or if
// there has been a recent global constraint that ties that node's
// trajectory to the submap's trajectory, it suffices to do a match
// constrained to a local search window.
maybe_add_local_constraint = true;
}
// 如果节点与子图不属于同一条轨迹 并且 间隔了一段时间, 同时采样器为true
// 才进行 全局搜索窗口 的约束计算(对整体子图进行回环检测)
else if (global_localization_samplers_[node_id.trajectory_id]
->Pulse()) {
maybe_add_global_constraint = true;
}
// 获取节点信息数据与地图数据
constant_data = data_.trajectory_nodes.at(node_id).constant_data.get();
submap = static_cast<const Submap2D *>(
data_.submap_data.at(submap_id).submap.get());
} // end {}
// 建图时只会执行这块, 通过局部搜索进行回环检测
if (maybe_add_local_constraint) {
// 计算约束的先验估计值
// submap原点在global坐标系下的坐标的逆 * 节点在global坐标系下的坐标 =
// submap原点指向节点的坐标变换
const transform::Rigid2d initial_relative_pose =
optimization_problem_->submap_data()
.at(submap_id)
.global_pose.inverse() *
optimization_problem_->node_data().at(node_id).global_pose_2d;
// 进行局部搜索窗口 的约束计算 (对局部子图进行回环检测)
constraint_builder_.MaybeAddConstraint(
submap_id, submap, node_id, constant_data, initial_relative_pose);
}
// 定位时才有可能执行这块
else if (maybe_add_global_constraint) {
// 全局搜索窗口 的约束计算 (对整体子图进行回环检测)
constraint_builder_.MaybeAddGlobalConstraint(submap_id, submap, node_id,
constant_data);
}
}
这个函数是在ComputeConstraintsForNode中被for循环调用, 至少执行当前子图个数的次数, 可以想象每次优化的时候被调用很多次的.
对于次要的数据, 比如Odometry的data, 判断轨迹状态可以添加数据后直接调用optimization_problem_->AddXXXData, 把数据添加到optimization_problem中的XXX_data_中, 作为优化约束. 返回值告诉任务队列不要优化
// 将 把里程计数据加入到优化问题中 这个任务放入到任务队列中
void PoseGraph2D::AddOdometryData(const int trajectory_id,
const sensor::OdometryData &odometry_data) {
AddWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddOdometryData(trajectory_id,
odometry_data);
}
return WorkItem::Result::kDoNotRunOptimization;
});
}
比较特殊的是landmark_data, 因为landmark包含的数据类型比较特殊, 不能像IMU和odom以及GPS一样直接得到位姿信息, 所以不是在optimization_problem_2d.cc中添加数据到相应的data_, 而是在pose_graph_2d.cc中直接添加数据, 然后在optimization_problem_2d.cc中执行优化
3. 总结
进入到后端, Cartographer在pose_graph中实现了约束与节点的添加, 在optimization_problem中实现优化. 约束的构建方法由于传感器类型不同, 也多种多样, 不同的构建方法实现了相互的约束, 所以可以实现整体的优化.