토픽 메시지 타임스탬프 기반 조건 노드 (Topic Message Timestamp-Based Condition Node)

토픽 메시지 타임스탬프 기반 조건 노드 (Topic Message Timestamp-Based Condition Node)

1. 개요

토픽 메시지 타임스탬프 기반 조건 노드는 ROS2 토픽 메시지의 헤더에 포함된 타임스탬프(header.stamp)를 기준으로 메시지의 시간적 유효성을 평가하는 조건 노드이다. 로봇 시스템에서 센서 데이터나 상태 정보는 시간 경과에 따라 신뢰도가 감소하므로, 오래된 데이터를 기반으로 의사 결정을 수행하는 것은 안전상 중대한 위험을 초래할 수 있다. 타임스탬프 기반 조건 노드는 메시지의 경과 시간(age), 메시지 간 시간 간격, 시간 일관성 등을 평가하여 데이터의 시간적 신뢰성을 보장한다.

2. ROS2 타임스탬프의 구조

2.1 std_msgs::msg::Header의 stamp 필드

ROS2의 std_msgs::msg::Header 메시지는 stamp 필드를 포함하며, 이는 builtin_interfaces::msg::Time 타입으로 정의된다.

builtin_interfaces/Time stamp
    int32 sec      # 초 단위 (에포크 이후 경과 시간)
    uint32 nanosec # 나노초 단위

타임스탬프는 일반적으로 데이터가 생성된 시점을 나타내며, 센서 드라이버가 측정 시점의 시각을 기록한다. 이 값은 rclcpp::Time 객체로 변환하여 시간 연산에 활용할 수 있다.

rclcpp::Time msg_time(msg->header.stamp);

2.2 시계 유형

ROS2는 세 가지 시계(clock) 유형을 지원한다.

시계 유형설명용도
RCL_SYSTEM_TIME시스템 시계 (UTC)실시간 운용 환경
RCL_STEADY_TIME단조 증가 시계경과 시간 측정
RCL_ROS_TIMEROS 시간 (시뮬레이션 시간 지원)시뮬레이션 호환

조건 노드에서 시간을 비교할 때에는 메시지의 타임스탬프와 현재 시각이 동일한 시계 소스를 사용하도록 하여야 한다. rclcpp::Node::get_clock() 메서드가 반환하는 시계는 use_sim_time 파라미터 설정에 따라 자동으로 시뮬레이션 시간과 실제 시간 사이를 전환한다.

3. 메시지 경과 시간 확인 조건 노드

3.1 기본 구현

메시지의 경과 시간(age)을 계산하여 허용 한계와 비교하는 가장 기본적인 타임스탬프 기반 조건 노드이다.

template <typename MessageT>
class IsMessageFresh : public BT::ConditionNode
{
public:
    IsMessageFresh(const std::string& name,
                   const BT::NodeConfiguration& config,
                   rclcpp::Node::SharedPtr node)
        : BT::ConditionNode(name, config),
          node_(node),
          message_received_(false)
    {
        std::string topic_name;
        getInput("topic_name", topic_name);

        subscription_ = node_->create_subscription<MessageT>(
            topic_name,
            rclcpp::SensorDataQoS(),
            [this](const%20typename%20MessageT::SharedPtr%20msg) {
                std::lock_guard<std::mutex> lock(mutex_);
                last_message_ = msg;
                message_received_ = true;
            });
    }

    static BT::PortsList providedPorts()
    {
        return {
            BT::InputPort<std::string>("topic_name", "구독할 토픽 이름"),
            BT::InputPort<double>("max_age_sec", 1.0,
                                  "최대 허용 경과 시간 (초)")
        };
    }

    BT::NodeStatus tick() override
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!message_received_ || !last_message_)
        {
            return BT::NodeStatus::FAILURE;
        }

        double max_age_sec;
        getInput("max_age_sec", max_age_sec);

        rclcpp::Time now = node_->get_clock()->now();
        rclcpp::Time msg_time(last_message_->header.stamp);
        rclcpp::Duration age = now - msg_time;

        if (age.seconds() >= 0.0 && age.seconds() <= max_age_sec)
        {
            return BT::NodeStatus::SUCCESS;
        }
        return BT::NodeStatus::FAILURE;
    }

private:
    rclcpp::Node::SharedPtr node_;
    typename rclcpp::Subscription<MessageT>::SharedPtr subscription_;
    typename MessageT::SharedPtr last_message_;
    std::mutex mutex_;
    bool message_received_;
};

경과 시간 \Delta t는 다음과 같이 정의된다.

\Delta t = t_{\text{now}} - t_{\text{msg}}

여기서 t_{\text{now}}는 현재 시각, t_{\text{msg}}는 메시지의 타임스탬프이다. \Delta t > t_{\max}이면 메시지가 시간적으로 유효하지 않은 것으로 판정하여 FAILURE를 반환한다.

음의 경과 시간 처리

\Delta t < 0인 경우, 즉 메시지의 타임스탬프가 현재 시각보다 미래인 경우가 발생할 수 있다. 이는 다음과 같은 원인에 기인한다.

  1. 시계 동기화 오류: 센서 노드와 행동 트리 노드 간의 시계가 동기화되지 않은 경우
  2. 시뮬레이션 시간 전환: use_sim_time 설정이 노드 간에 불일치하는 경우
  3. 시뮬레이션 리셋: 시뮬레이션이 재시작되어 시간이 초기화된 경우

음의 경과 시간을 가진 메시지는 시간 일관성이 보장되지 않으므로 FAILURE로 처리하는 것이 안전하다. 위 구현에서 age.seconds() >= 0.0 조건은 이 상황을 처리한다.

RosTopicSubNode 기반 구현

BehaviorTree.ROS2의 RosTopicSubNode를 활용한 간결한 구현이다.

class IsScanFresh
    : public BT::RosTopicSubNode<sensor_msgs::msg::LaserScan>
{
public:
    IsScanFresh(const std::string& name,
                const BT::NodeConfiguration& config,
                const BT::RosNodeParams& params)
        : RosTopicSubNode(name, config, params),
          node_(params.nh)
    {}

    static BT::PortsList providedPorts()
    {
        return providedBasicPorts({
            BT::InputPort<double>("max_age_sec", 0.5,
                                  "최대 허용 경과 시간 (초)")
        });
    }

    BT::NodeStatus onTick(
        const sensor_msgs::msg::LaserScan::SharedPtr& msg) override
    {
        if (!msg)
        {
            return BT::NodeStatus::FAILURE;
        }

        double max_age_sec;
        getInput("max_age_sec", max_age_sec);

        rclcpp::Time now = node_->get_clock()->now();
        rclcpp::Time msg_time(msg->header.stamp);
        double age = (now - msg_time).seconds();

        if (age >= 0.0 && age <= max_age_sec)
        {
            return BT::NodeStatus::SUCCESS;
        }
        return BT::NodeStatus::FAILURE;
    }

private:
    rclcpp::Node::SharedPtr node_;
};

메시지 간 시간 간격 확인 조건 노드

연속적으로 수신되는 메시지 간의 시간 간격이 허용 범위 이내인지를 평가하는 조건 노드이다. 이는 센서 발행 주기의 안정성을 모니터링하는 데 활용된다.

template <typename MessageT>
class IsPublishRateStable : public BT::ConditionNode
{
public:
    IsPublishRateStable(const std::string& name,
                        const BT::NodeConfiguration& config,
                        rclcpp::Node::SharedPtr node)
        : BT::ConditionNode(name, config),
          node_(node),
          has_previous_(false)
    {
        std::string topic_name;
        getInput("topic_name", topic_name);

        subscription_ = node_->create_subscription<MessageT>(
            topic_name,
            rclcpp::SensorDataQoS(),
            [this](const%20typename%20MessageT::SharedPtr%20msg) {
                std::lock_guard<std::mutex> lock(mutex_);
                rclcpp::Time current_stamp(msg->header.stamp);
                if (has_previous_)
                {
                    last_interval_ = current_stamp - previous_stamp_;
                }
                previous_stamp_ = current_stamp;
                has_previous_ = true;
            });
    }

    static BT::PortsList providedPorts()
    {
        return {
            BT::InputPort<std::string>("topic_name", "구독할 토픽 이름"),
            BT::InputPort<double>("max_interval_sec", 0.2,
                                  "최대 허용 메시지 간격 (초)")
        };
    }

    BT::NodeStatus tick() override
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!has_previous_ ||
            last_interval_.seconds() == 0.0)
        {
            return BT::NodeStatus::FAILURE;
        }

        double max_interval;
        getInput("max_interval_sec", max_interval);

        if (last_interval_.seconds() > 0.0 &&
            last_interval_.seconds() <= max_interval)
        {
            return BT::NodeStatus::SUCCESS;
        }
        return BT::NodeStatus::FAILURE;
    }

private:
    rclcpp::Node::SharedPtr node_;
    typename rclcpp::Subscription<MessageT>::SharedPtr subscription_;
    std::mutex mutex_;
    bool has_previous_;
    rclcpp::Time previous_stamp_;
    rclcpp::Duration last_interval_{0, 0};
};

메시지 간 시간 간격 \Delta t_{\text{interval}}은 다음과 같이 정의된다.

\Delta t_{\text{interval}} = t_{\text{msg}}^{(k)} - t_{\text{msg}}^{(k-1)}

여기서 t_{\text{msg}}^{(k)}k번째 메시지의 타임스탬프이다. 이 값이 허용 상한 t_{\text{max\_interval}}을 초과하면 발행 주기가 불안정한 것으로 판정한다.

4. 다중 토픽 시간 동기성 확인

두 개 이상의 토픽 메시지가 시간적으로 동기화되어 있는지를 확인하는 조건 노드이다. 센서 융합(sensor fusion)에서 서로 다른 센서의 데이터가 동일한 시점을 기준으로 하는지를 검증하는 데 활용된다.

class AreTopicsSynchronized : public BT::ConditionNode
{
public:
    AreTopicsSynchronized(const std::string& name,
                          const BT::NodeConfiguration& config,
                          rclcpp::Node::SharedPtr node)
        : BT::ConditionNode(name, config), node_(node)
    {
        std::string topic_a, topic_b;
        getInput("topic_a", topic_a);
        getInput("topic_b", topic_b);

        sub_a_ = node_->create_subscription<sensor_msgs::msg::PointCloud2>(
            topic_a, rclcpp::SensorDataQoS(),
            [this](const%20sensor_msgs::msg::PointCloud2::SharedPtr%20msg) {
                std::lock_guard<std::mutex> lock(mutex_);
                stamp_a_ = rclcpp::Time(msg->header.stamp);
                received_a_ = true;
            });

        sub_b_ = node_->create_subscription<sensor_msgs::msg::Image>(
            topic_b, rclcpp::SensorDataQoS(),
            [this](const%20sensor_msgs::msg::Image::SharedPtr%20msg) {
                std::lock_guard<std::mutex> lock(mutex_);
                stamp_b_ = rclcpp::Time(msg->header.stamp);
                received_b_ = true;
            });
    }

    static BT::PortsList providedPorts()
    {
        return {
            BT::InputPort<std::string>("topic_a", "첫 번째 토픽 이름"),
            BT::InputPort<std::string>("topic_b", "두 번째 토픽 이름"),
            BT::InputPort<double>("max_diff_sec", 0.05,
                                  "최대 허용 시간 차이 (초)")
        };
    }

    BT::NodeStatus tick() override
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!received_a_ || !received_b_)
        {
            return BT::NodeStatus::FAILURE;
        }

        double max_diff;
        getInput("max_diff_sec", max_diff);

        double diff = std::abs((stamp_a_ - stamp_b_).seconds());
        if (diff <= max_diff)
        {
            return BT::NodeStatus::SUCCESS;
        }
        return BT::NodeStatus::FAILURE;
    }

private:
    rclcpp::Node::SharedPtr node_;
    rclcpp::Subscription<sensor_msgs::msg::PointCloud2>::SharedPtr sub_a_;
    rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr sub_b_;
    std::mutex mutex_;
    rclcpp::Time stamp_a_, stamp_b_;
    bool received_a_{false}, received_b_{false};
};

두 메시지 간의 시간 차이 \lvert \Delta t \rvert는 다음과 같다.

\lvert \Delta t \rvert = \lvert t_A - t_B \rvert

이 값이 허용 한계 t_{\text{max\_diff}}를 초과하면, 두 센서의 데이터가 시간적으로 일치하지 않는 것으로 판정한다.

XML 행동 트리에서의 활용

센서 데이터 신선도 보장

<BehaviorTree ID="FreshDataNavigation">
    <ReactiveSequence>
        <Condition ID="IsScanFresh"
                   topic_name="/scan"
                   max_age_sec="0.5"/>
        <Condition ID="IsOdomFresh"
                   topic_name="/odom"
                   max_age_sec="0.2"/>
        <Action ID="NavigateToGoal"/>
    </ReactiveSequence>
</BehaviorTree>

위 구조는 라이다 데이터가 0.5초 이내, 오도메트리 데이터가 0.2초 이내에 생성된 경우에만 내비게이션을 수행한다.

센서 융합 전제 조건

<BehaviorTree ID="SensorFusion">
    <Sequence>
        <Condition ID="AreTopicsSynchronized"
                   topic_a="/lidar/points"
                   topic_b="/camera/image"
                   max_diff_sec="0.05"/>
        <Action ID="FuseSensorData"/>
    </Sequence>
</BehaviorTree>

설계 시 고려 사항

시뮬레이션 시간 호환성

시뮬레이션 환경에서는 use_sim_time 파라미터를 true로 설정하여 시뮬레이션 시간을 사용한다. 조건 노드가 rclcpp::Node::get_clock()->now()를 호출하면, 이 설정에 따라 시뮬레이션 시간 또는 실제 시간이 반환된다. 메시지의 타임스탬프와 현재 시각이 동일한 시간 소스를 사용하도록 하여야 정확한 경과 시간 계산이 보장된다.

헤더가 없는 메시지 처리

일부 ROS2 메시지 타입(예: std_msgs::msg::Float64, std_msgs::msg::Bool)은 header 필드를 포함하지 않는다. 이러한 메시지에 대해서는 타임스탬프 기반 조건 평가를 직접 적용할 수 없으므로, 수신 시점의 시각을 콜백 내부에서 기록하여 대리 타임스탬프(surrogate timestamp)로 사용하여야 한다.

[this](const%20std_msgs::msg::Float64::SharedPtr%20msg) {
    std::lock_guard<std::mutex> lock(mutex_);
    last_message_ = msg;
    receive_time_ = node_->get_clock()->now();  // 수신 시점 기록
};

이 경우 기록된 시각은 메시지 생성 시점이 아닌 수신 시점이므로, 통신 지연(latency)이 포함되어 있음을 유의하여야 한다.

시계 점프 대응

NTP(Network Time Protocol) 동기화나 시뮬레이션 리셋으로 인해 시계가 비연속적으로 변화(clock jump)하는 경우, 경과 시간 계산이 비정상적으로 큰 값이나 음수를 반환할 수 있다. 이러한 시계 점프를 감지하기 위해 rclcpp::Clock의 점프 콜백(jump callback)을 활용하거나, 경과 시간의 절대값에 상한을 설정하는 방어적 전략을 적용할 수 있다.

참고 문헌

  • Colledanchise, M., & Ogren, P. (2018). Behavior Trees in Robotics and AI: An Introduction. CRC Press.
  • Macenski, S., et al. (2020). “The Marathon 2: A Navigation System.” arXiv preprint arXiv:2003.00368.
  • ROS2 공식 문서. https://docs.ros.org/en/humble/
  • BehaviorTree.CPP 공식 문서. https://www.behaviortree.dev/

버전날짜변경 사항
v0.12026-04-04초안 작성