W X

在上一篇文章中,对 src/cartographer/cartographer/sensor/internal/ordered_multi_queue.cc 中的 class OrderedMultiQueue 还有好些函数没有进行讲解。回顾一下 ordered_multi_queue.h 这个头文件,可以看到之前提及到的结构体

  struct Queue {
    common::BlockingQueue<std::unique_ptr<Data>> queue;   // 存储数据的队列
    Callback callback;                                    // 本数据队列对应的回调函数
    bool finished = false;                                // 这个queue是否finished

那么先来看看 BlockingQueue 的实现,位于 src/cartographer/cartographer/common/internal/blocking_queue.h 文件中。在 src 文件中存在 src/cartographer 与 src/cartographer_ros 两个文件夹,其实 src/cartographer 个独立的工程, src/cartographer_ros 是数据格式进行转换,然后加载到 src/cartographer 的数据队列中即可。也就是说 src/cartographer_ros 类似一个中转站,把传感器数据整理好之后交给 src/cartographer,src/cartographer 就可以正常运转了。

BlockingQueue 其就是一个阻塞队列,也就是生产者与消费者的红线,通过他把 生产者(src/cartographer_ros) 与 消费者( src/cartographer) 联系到了一起,下面简单描述一下:

// A thread-safe blocking queue that is useful for producer/consumer patterns.
// 'T' must be movable.
// 一个线程安全的阻塞队列, 对 生产者/消费者模式 很有用.
  为什么要使用生产者消费者模式, 顺序执行不就可以了吗?生产者消费者到底有什么意义?
  生产者和消费者之间不直接依赖, 通过缓冲区通讯, 将两个类之间的耦合度降到最低。
  并发 (异步)
  生产者直接调用消费者, 两者是同步(阻塞)的, 如果消费者吞吐数据很慢, 这时候生产者白白浪费大好时光。
  而使用这种模式之后, 生产者将数据丢到缓冲区, 继续生产, 完全不依赖消费者, 程序执行效率会大大提高。
  复用:通过将生产者类和消费者类独立开来, 可以对生产者类和消费者类进行独立的复用与扩展

二、class BlockingQueue 函数分析

(5) BlockingQueue::PopWithTimeout 其功能与 BlockingQueue::Pop() 比较类似,只是他的阻塞是有时间限定的,如果超过了限定的时间,队列中依旧没有数据可以取出,则放回一个 nullptr 指针。
 

三、class BlockingQueue 代码注释

代码于 src/cartographer/cartographer/common/internal/blocking_queue.h 中实现

template <typename T>
class BlockingQueue {
 public:
  static constexpr size_t kInfiniteQueueSize = 0;
  // Constructs a blocking queue with infinite queue size.
  // 构造一个具有无限队列大小的阻塞队列
  //注意这里的默认构造函数调用了带一个参数的构造函数
  BlockingQueue() : BlockingQueue(kInfiniteQueueSize) {}
  BlockingQueue(const BlockingQueue&) = delete; //禁用拷贝构造函数
  BlockingQueue& operator=(const BlockingQueue&) = delete; //禁用赋值符重载函数
  // Constructs a blocking queue with a size of 'queue_size'.
  // 构造一个大小为 queue_size 的阻塞队列,
  explicit BlockingQueue(const size_t queue_size) : queue_size_(queue_size) {}
  // Pushes a value onto the queue. Blocks if the queue is full.
  // 将值压入队列. 如果队列已满, 则阻塞
  void Push(T t) {
    // 首先定义判断函数
    // 因为QueueNotFullCondition()函数会判断队列大小
    //所以执行该函数时需要上锁,使用 EXCLUSIVE_LOCKS_REQUIRED 声明
    //如果调用该函数时没有上锁,则会报错
    const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
      return QueueNotFullCondition();
    // absl::Mutex的更多信息可看: https://www.jianshu.com/p/d2834abd6796
    // absl官网: https://abseil.io/about/
    // 如果数据满了, 就进行等待
    absl::MutexLock lock(&mutex_); //上锁
    mutex_.Await(absl::Condition(&predicate)); //阻塞等待
    // 将数据加入队列, 移动而非拷贝
    deque_.push_back(std::move(t));
  // Like push, but returns false if 'timeout' is reached.
  // 与Push()类似, 但是超时后返回false
  bool PushWithTimeout(T t, const common::Duration timeout) {
    const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
      return QueueNotFullCondition();
    absl::MutexLock lock(&mutex_);
    if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
                                 absl::FromChrono(timeout))) {
      return false;
    deque_.push_back(std::move(t));
    return true;
  // Pops the next value from the queue. Blocks until a value is available.
  // 取出数据, 如果数据队列为空则进行等待
  T Pop() {
    const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
      return !QueueEmptyCondition();
    // 等待直到数据队列中至少有一个数据
    absl::MutexLock lock(&mutex_);
    mutex_.Await(absl::Condition(&predicate));
    T t = std::move(deque_.front());
    deque_.pop_front();
    return t;
  // Like Pop, but can timeout. Returns nullptr in this case.
  // 与Pop()类似, 但是超时后返回nullptr
  T PopWithTimeout(const common::Duration timeout) {
    const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
      return !QueueEmptyCondition();
    absl::MutexLock lock(&mutex_);
    if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
                                 absl::FromChrono(timeout))) {
      return nullptr;
    T t = std::move(deque_.front());
    deque_.pop_front();
    return t;
  // Like Peek, but can timeout. Returns nullptr in this case.
  // 与Peek()类似, 但是超时后返回nullptr
  template <typename R>
  R* PeekWithTimeout(const common::Duration timeout) {
    const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
      return !QueueEmptyCondition();
    absl::MutexLock lock(&mutex_);
    if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
                                 absl::FromChrono(timeout))) {
      return nullptr;
    return deque_.front().get();
  // Returns the next value in the queue or nullptr if the queue is empty.
  // Maintains ownership. This assumes a member function get() that returns
  // a pointer to the given type R.
  // 返回第一个数据的指针, 如果队列为空则返回nullptr
  template <typename R>
  const R* Peek() {
    absl::MutexLock lock(&mutex_);
    if (deque_.empty()) {
      return nullptr;
    return deque_.front().get();
  // Returns the number of items currently in the queue.
  // 返回当前队列中的项目数
  size_t Size() {
    absl::MutexLock lock(&mutex_);
    return deque_.size();
  // Blocks until the queue is empty.
  // 等待直到队列为空
  void WaitUntilEmpty() {
    const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
      return QueueEmptyCondition();
    absl::MutexLock lock(&mutex_);
    mutex_.Await(absl::Condition(&predicate));
 private:
  // Returns true iff the queue is empty.
  // 如果队列为空, 则返回true
  bool QueueEmptyCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
    return deque_.empty();
  // Returns true iff the queue is not full.
  // 如果队列未满, 则返回true
  bool QueueNotFullCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
    return queue_size_ == kInfiniteQueueSize || deque_.size() < queue_size_;
  absl::Mutex mutex_;
  const size_t queue_size_ GUARDED_BY(mutex_);
  std::deque<T> deque_ GUARDED_BY(mutex_);

四、OrderedMultiQueue 函数分析

了解了 BlockingQueue() 之后,在继续上一篇的博客,对 OrderedMultiQueue 进行分析。在 OrderedMultiQueue::Add() 函数中,其会调用 it->second.queue.Push(std::move(data)) ,把数据 data 添加到与之对应的 trajectory_id topic 队列之中,然后调用 OrderedMultiQueue::Dispatch() 进行数据的分发。在讲解该函数之前,先来了解如下几个函数:

1、CannotMakeProgress()
// 标记queue_key为阻塞者,并按条件发布log,等等这个数据
void OrderedMultiQueue::CannotMakeProgress(const QueueKey& queue_key) {
  // 标记queue_key为阻塞者
  blocker_ = queue_key;
  for (auto& entry : queues_) {
    // queue_key对应的数据队列为空,而某一个传感器数据队列的数据已经大于kMaxQueueSize了
    // 有问题, 进行报错
    if (entry.second.queue.Size() > kMaxQueueSize) {
      // 在该语句第1、61、121……次被执行的时候, 记录日志信息
      LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key;
      // [ WARN] [1628516438.493835120, 1606808659.273453929]: W0809 21:40:38.000000 10662 ordered_multi_queue.cc:230] Queue waiting for data: (0, points2)
      // [ WARN] [1628516439.089736487, 1606808659.869309184]: W0809 21:40:39.000000 10662 ordered_multi_queue.cc:230] Queue waiting for data: (0, points2)
      return;

该函数一般是在队列为空的时候对用,假若队列 queue_key 为空,则会对其他所有队列进行遍历,查看一下有没有是否存在数据超过 kMaxQueueSize=500,如果有,则会信息打印进行警告,类似如下:

[ WARN] [1628516438.493835120, 1606808659.273453929]: W0809 21:40:38.000000 10662 ordered_multi_queue.cc:230] Queue waiting for data: (0, points2)

一般情况下,如果订阅的话题错了,或者话题发布者没有正常发送消息,则会出现该类打印。

2、CannotMakeProgress()
// 返回阻塞的队列的QueueKey
QueueKey OrderedMultiQueue::GetBlocker() const {
  CHECK(!queues_.empty());
  return blocker_;

如果目前有数据队列阻塞了,则该队列的 QueueKey 就会被保存在 OrderedMultiQueue::blocker_ 之中,调用该函数即可查看。

通过该篇博客,了解到了 BlockingQueue 与 OrderedMultiQueue 的成员函数,但是他们是怎么串联起来的,如何被调用的,或许不是很清楚。不过,没有关系,下一篇博客就会进行详细讲解,其主要设计的函数就是 OrderedMultiQueue::Dispatch()。

cartographer_learn_5_test续接前文OrderedMultiQueue相关的定义OrderedMultiQueue的类成员OrderedMultiQueue的类方法 在具体的讨论前后端前,觉得有必要讨论一下数据的处理和传输,即 unique_ptr< sensor::CollatorInterface > MapBuilder::sensor_collator_。看起来它的类型是sensor::CollatorInterface,但是F12进去后发现它仅仅是接口,
    参考定义见:backpack_3d.lua     Local map frame是一次slam过程中的原点。但是现在cartographer支持Incremental mapping。global map是整个地图的原点,local map是每一次建图的原点。     map_frame = “map”:cartograp... const Data* next_data = nullptr; Queue* next_queue = nullptr; QueueKey next_queue_key; // check data is coming or not //LOG(INFO) << "okagv enter forloop"; // must s
本次阅读的源码为 release-1.0 版本的代码 https://github.com/googlecartographer/cartographer_ros/tree/release-1.0 https://github.com/googlecartographer/cartographer/tree/release-1.0 也可以下载我上传的 全套工作空间的代码,包括 protobu...
我们知道了,GlobalTrajectoryBuilder 是 slam系统的顶层设计,该类中包含了两个成员变量:localTrajectoryBuilder 负责前端localslam ; PoseGraph 负责后端 (是从MapBuilder类中的posegraph传递过来的)。那么,传感器数据是如何流到GlobalTrajectoryBuilder从而被处理的呢? 一、基础类 OrderedMultiQueue 下面介绍 sensor::TrajectoryCollator 类 absl::f
0. 简介 在设计复杂的运行程序时,我们经常需要创建一定数量的线程,然而很多时候线程不都是一直执行的,会存在一些线程处于空闲状态。所以通过线程池的方式,可以有效的对线程进行分配。若线程池中有空闲线程,则从线程池中取出一个空闲的线程处理该任务,任务处理完后,该线程被放到线程池中;若线程池中无空闲线程,则将任务放入任务队列等待线程池中有线程空闲,这样的处理方式可以避免线程在建立与销毁时存在的开销。 1. 基础知识 多线程 https://www.cnblogs.com/heimazaifei/p/12176
Queue waiting for data:(0, points2) Queue waiting for data:(0, odom) 说明在提供给cartographter的传感器数据出现了问题,没有被成功解析。 小编是使用自己编写的odom话题发布时,cartographer爆出了该错误。 于是设想传感器数据相关的tf或lua配置或数据本身存在错误。 于是逐个排查,最终发现了错误原因。 排查可能原因:tf错误 小编之前已处理过tf与lua配置的问题
(02)Cartographer源码死角解析中提到了Cartographer的核心算法和数据结构,以及代码架构和工程实现细节。在掌握了这些内容之后,我们就可以使用Cartographer来实现SLAM(同时定位与地图构建)功能了。 (03)新数据运行与地图保存、加载地图启动仅定位,是指在使用Cartographer进行实时定位和地图构建时,如果我们获取到了新的传感器数据,就可以通过传入这些新数据来进行实时定位和地图更新。同时,当我们需要保存地图或者加载已有地图来进行定位时,Cartographer也提供了相应的API和工具。 具体来说,对于新数据的运行和实时定位,我们可以通过cartographer_ros包中的rosnode来实现。这个rosnode订阅传感器数据,然后使用Cartographer的算法和数据结构来进行实时定位和地图构建。当定位结束后,我们可以通过发布ROS消息的方式将建好的地图发送给其他ROS节点使用。 而对于地图的保存和加载,Cartographer提供了两个工具:cartographer_rosbag和cartographer_pbstream。前者可以将Cartographer处理过的数据保存成rosbag文件,以便于后续回放和分析;后者则可以将Cartographer的状态和数据保存成pbstream文件,以便于后续的加载和定位。当我们需要加载已有地图时,只需要使用cartographer_ros包中的另一个rosnode来加载pbstream文件,并订阅传感器数据即可。 需要注意的是,在地图保存和加载时,我们只能进行启动仅定位,也就是只进行实时定位和不进行地图更新。这是因为保存和加载的地图可能已经过时,而且相对于我们实时获取的传感器数据,地图的信息是静态的,无法提供新的信息。因此,Cartographer只提供了仅定位的功能。当我们需要重新建立地图时,需要重新运行实时定位的节点并进行地图更新。