1296.51 ROS2 토픽 구독 기반 액션 노드 구현

1296.51 ROS2 토픽 구독 기반 액션 노드 구현

1. 토픽 구독 액션 노드의 개념

ROS2 토픽 구독 기반 액션 노드는 행동 트리의 리프 노드로서, 특정 ROS2 토픽으로부터 메시지를 수신하고 그 내용을 블랙보드에 기록하거나 수신 조건을 평가하여 노드의 성공 또는 실패를 결정하는 역할을 수행한다. 이 유형의 노드는 센서 데이터 획득, 외부 명령 수신, 시스템 상태 갱신 등 외부로부터의 데이터 유입이 필요한 행동 트리 분기에서 활용된다.

토픽 구독은 ROS2의 발행-구독(publish-subscribe) 통신 패턴에서 수신 측에 해당하며, 발행자가 메시지를 전송할 때까지 대기하거나 이미 수신된 최신 메시지를 즉시 참조하는 두 가지 운용 모드가 존재한다. 이러한 운용 모드의 차이에 따라 기반 클래스의 선택과 노드의 실행 모델이 결정된다.

2. 설계 구조와 기반 클래스 선택

토픽 구독 액션 노드의 기반 클래스 선택은 메시지 수신 시점과 Tick 실행 시점 간의 관계에 의해 결정된다.

2.1 SyncActionNode 기반: 최신 메시지 참조

이미 수신되어 캐싱된 최신 메시지를 Tick 시점에 즉시 참조하는 경우에는 BT::SyncActionNode를 기반으로 구현한다. 이 방식에서는 구독 콜백이 메시지를 멤버 변수에 저장하고, tick() 메서드는 저장된 메시지를 읽어 블랙보드에 기록한다. 메시지 수신 여부와 무관하게 tick()은 즉시 반환된다.

2.2 StatefulActionNode 기반: 메시지 대기

특정 조건을 만족하는 메시지가 수신될 때까지 RUNNING 상태를 유지하며 대기하는 경우에는 BT::StatefulActionNode를 기반으로 구현한다. 이 방식은 외부 이벤트의 발생을 행동 트리에서 동기화하는 데 사용된다.

기반 클래스운용 모드반환 상태적용 사례
SyncActionNode최신 메시지 즉시 참조SUCCESS / FAILURE센서 데이터 갱신, 상태 폴링
StatefulActionNode메시지 도착 대기RUNNING / SUCCESS / FAILURE이벤트 대기, 명령 수신 대기

3. 구독자 초기화와 콜백 등록

토픽 구독 액션 노드의 생성자에서 ROS2 구독자(Subscriber)를 초기화하는 과정을 기술한다.

3.1 공유 노드 핸들 획득

행동 트리의 블랙보드에 저장된 ROS2 노드 핸들을 획득한다.

node_ = config().blackboard->get<rclcpp::Node::SharedPtr>("node");

3.2 콜백 그룹 설정

구독 콜백이 행동 트리의 Tick 스레드와 독립적으로 실행될 수 있도록 별도의 콜백 그룹을 설정한다. MutuallyExclusiveCallbackGroup을 사용하면 동일 그룹 내의 콜백이 동시에 실행되지 않도록 보장한다.

callback_group_ = node_->create_callback_group(
  rclcpp::CallbackGroupType::MutuallyExclusive);

rclcpp::SubscriptionOptions sub_options;
sub_options.callback_group = callback_group_;

3.3 구독자 생성과 콜백 등록

토픽 이름, QoS 프로파일, 콜백 함수를 지정하여 구독자를 생성한다. 콜백 함수는 메시지를 수신할 때마다 호출되어 최신 메시지를 멤버 변수에 저장한다.

std::string topic_name;
getInput("topic_name", topic_name);

rclcpp::QoS qos(10);
subscription_ = node_->create_subscription<MessageT>(
  topic_name, qos,
  std::bind(&TopicSubscriberNode::callback, this, std::placeholders::_1),
  sub_options);

4. SyncActionNode 기반 구현: 최신 메시지 참조

최신 메시지를 캐싱하여 Tick 시점에 즉시 참조하는 구독 액션 노드의 구현을 제시한다.

#include "behaviortree_cpp/action_node.h"
#include "rclcpp/rclcpp.hpp"
#include "sensor_msgs/msg/laser_scan.hpp"
#include <mutex>

class GetLatestLaserScan : public BT::SyncActionNode
{
public:
  GetLatestLaserScan(
    const std::string & xml_tag_name,
    const BT::NodeConfiguration & conf)
  : BT::SyncActionNode(xml_tag_name, conf)
  {
    node_ = config().blackboard->get<rclcpp::Node::SharedPtr>("node");
    
    std::string topic_name;
    getInput("topic_name", topic_name);
    
    callback_group_ = node_->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);
    rclcpp::SubscriptionOptions sub_options;
    sub_options.callback_group = callback_group_;
    
    subscription_ = node_->create_subscription<sensor_msgs::msg::LaserScan>(
      topic_name,
      rclcpp::SensorDataQoS(),
      std::bind(&GetLatestLaserScan::callback, this, std::placeholders::_1),
      sub_options);
  }

  static BT::PortsList providedPorts()
  {
    return {
      BT::InputPort<std::string>("topic_name", "/scan", "laser scan topic"),
      BT::OutputPort<sensor_msgs::msg::LaserScan>("scan", "latest laser scan")
    };
  }

  BT::NodeStatus tick() override
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!last_msg_) {
      return BT::NodeStatus::FAILURE;
    }
    setOutput("scan", *last_msg_);
    return BT::NodeStatus::SUCCESS;
  }

private:
  void callback(const sensor_msgs::msg::LaserScan::SharedPtr msg)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    last_msg_ = msg;
  }

  rclcpp::Node::SharedPtr node_;
  rclcpp::CallbackGroup::SharedPtr callback_group_;
  rclcpp::Subscription<sensor_msgs::msg::LaserScan>::SharedPtr subscription_;
  sensor_msgs::msg::LaserScan::SharedPtr last_msg_;
  std::mutex mutex_;
};

이 구현에서 mutex_는 구독 콜백 스레드와 Tick 스레드 간의 데이터 경합(data race)을 방지한다. 구독 콜백은 ROS2 실행기(Executor)의 스레드에서 호출되고, tick() 메서드는 행동 트리의 Tick 스레드에서 호출되므로, 공유 자원인 last_msg_에 대한 상호 배제가 필수적이다.

5. StatefulActionNode 기반 구현: 메시지 대기

특정 메시지의 도착을 대기하는 구독 액션 노드의 구현을 제시한다.

#include "behaviortree_cpp/action_node.h"
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include <mutex>
#include <atomic>

class WaitForMessage : public BT::StatefulActionNode
{
public:
  WaitForMessage(
    const std::string & xml_tag_name,
    const BT::NodeConfiguration & conf)
  : BT::StatefulActionNode(xml_tag_name, conf),
    message_received_(false)
  {
    node_ = config().blackboard->get<rclcpp::Node::SharedPtr>("node");
    
    std::string topic_name;
    getInput("topic_name", topic_name);
    
    callback_group_ = node_->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);
    rclcpp::SubscriptionOptions sub_options;
    sub_options.callback_group = callback_group_;
    
    subscription_ = node_->create_subscription<std_msgs::msg::String>(
      topic_name,
      rclcpp::QoS(10),
      std::bind(&WaitForMessage::callback, this, std::placeholders::_1),
      sub_options);
  }

  static BT::PortsList providedPorts()
  {
    return {
      BT::InputPort<std::string>("topic_name", "topic to subscribe"),
      BT::InputPort<double>("timeout", 10.0, "timeout in seconds"),
      BT::OutputPort<std::string>("received_message", "received message data")
    };
  }

  BT::NodeStatus onStart() override
  {
    message_received_ = false;
    getInput("timeout", timeout_);
    start_time_ = node_->now();
    return BT::NodeStatus::RUNNING;
  }

  BT::NodeStatus onRunning() override
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (message_received_) {
      setOutput("received_message", received_data_);
      return BT::NodeStatus::SUCCESS;
    }
    
    auto elapsed = (node_->now() - start_time_).seconds();
    if (elapsed > timeout_) {
      return BT::NodeStatus::FAILURE;
    }
    
    return BT::NodeStatus::RUNNING;
  }

  void onHalted() override
  {
    message_received_ = false;
  }

private:
  void callback(const std_msgs::msg::String::SharedPtr msg)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    received_data_ = msg->data;
    message_received_ = true;
  }

  rclcpp::Node::SharedPtr node_;
  rclcpp::CallbackGroup::SharedPtr callback_group_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
  std::mutex mutex_;
  std::atomic<bool> message_received_;
  std::string received_data_;
  rclcpp::Time start_time_;
  double timeout_;
};

이 구현에서 onStart()는 수신 플래그를 초기화하고 타임아웃 측정을 개시한다. onRunning()은 매 Tick마다 메시지 수신 여부와 타임아웃 초과 여부를 검사한다. 메시지가 수신되면 SUCCESS를 반환하고, 타임아웃이 초과되면 FAILURE를 반환한다.

6. 조건부 메시지 필터링

수신된 메시지 중 특정 조건을 만족하는 메시지만을 유효한 수신으로 처리하는 필터링 메커니즘의 구현을 제시한다.

class WaitForSpecificMessage : public BT::StatefulActionNode
{
public:
  // 생성자 생략 (WaitForMessage와 동일한 구조)

  static BT::PortsList providedPorts()
  {
    return {
      BT::InputPort<std::string>("topic_name", "topic to subscribe"),
      BT::InputPort<std::string>("expected_value", "expected message content"),
      BT::InputPort<double>("timeout", 10.0, "timeout in seconds"),
      BT::OutputPort<std::string>("received_message", "matched message")
    };
  }

  BT::NodeStatus onRunning() override
  {
    std::lock_guard<std::mutex> lock(mutex_);
    
    std::string expected;
    getInput("expected_value", expected);
    
    if (message_received_ && received_data_ == expected) {
      setOutput("received_message", received_data_);
      return BT::NodeStatus::SUCCESS;
    }
    
    auto elapsed = (node_->now() - start_time_).seconds();
    if (elapsed > timeout_) {
      return BT::NodeStatus::FAILURE;
    }
    
    return BT::NodeStatus::RUNNING;
  }

private:
  void callback(const std_msgs::msg::String::SharedPtr msg)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    received_data_ = msg->data;
    message_received_ = true;
  }
  // 멤버 변수 생략
};

이 패턴은 특정 상태 전환 신호, 명령 코드, 이벤트 식별자 등 사전에 정의된 값과의 일치를 검증하여야 하는 경우에 활용된다. 조건 함수를 콜백 내부에 배치하는 대신 onRunning()에서 평가함으로써, 조건 로직의 변경이 구독 메커니즘에 영향을 미치지 않도록 관심사를 분리한다.

7. 구체적 구현 사례: 오도메트리 데이터 획득

nav_msgs::msg::Odometry 메시지를 구독하여 로봇의 현재 위치와 속도 정보를 블랙보드에 기록하는 액션 노드의 구현을 제시한다.

#include "behaviortree_cpp/action_node.h"
#include "rclcpp/rclcpp.hpp"
#include "nav_msgs/msg/odometry.hpp"
#include "geometry_msgs/msg/pose_stamped.hpp"
#include <mutex>

class GetOdometry : public BT::SyncActionNode
{
public:
  GetOdometry(
    const std::string & xml_tag_name,
    const BT::NodeConfiguration & conf)
  : BT::SyncActionNode(xml_tag_name, conf)
  {
    node_ = config().blackboard->get<rclcpp::Node::SharedPtr>("node");
    
    callback_group_ = node_->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);
    rclcpp::SubscriptionOptions sub_options;
    sub_options.callback_group = callback_group_;
    
    subscription_ = node_->create_subscription<nav_msgs::msg::Odometry>(
      "/odom",
      rclcpp::QoS(10),
      std::bind(&GetOdometry::callback, this, std::placeholders::_1),
      sub_options);
  }

  static BT::PortsList providedPorts()
  {
    return {
      BT::OutputPort<geometry_msgs::msg::PoseStamped>("current_pose",
        "current robot pose"),
      BT::OutputPort<geometry_msgs::msg::Twist>("current_velocity",
        "current robot velocity")
    };
  }

  BT::NodeStatus tick() override
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!last_odom_) {
      return BT::NodeStatus::FAILURE;
    }
    
    geometry_msgs::msg::PoseStamped pose;
    pose.header = last_odom_->header;
    pose.pose = last_odom_->pose.pose;
    setOutput("current_pose", pose);
    setOutput("current_velocity", last_odom_->twist.twist);
    
    return BT::NodeStatus::SUCCESS;
  }

private:
  void callback(const nav_msgs::msg::Odometry::SharedPtr msg)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    last_odom_ = msg;
  }

  rclcpp::Node::SharedPtr node_;
  rclcpp::CallbackGroup::SharedPtr callback_group_;
  rclcpp::Subscription<nav_msgs::msg::Odometry>::SharedPtr subscription_;
  nav_msgs::msg::Odometry::SharedPtr last_odom_;
  std::mutex mutex_;
};

8. 구체적 구현 사례: 배터리 상태 모니터링

sensor_msgs::msg::BatteryState 메시지를 구독하여 배터리 잔량을 블랙보드에 기록하는 액션 노드의 구현을 제시한다.

#include "behaviortree_cpp/action_node.h"
#include "rclcpp/rclcpp.hpp"
#include "sensor_msgs/msg/battery_state.hpp"
#include <mutex>

class GetBatteryState : public BT::SyncActionNode
{
public:
  GetBatteryState(
    const std::string & xml_tag_name,
    const BT::NodeConfiguration & conf)
  : BT::SyncActionNode(xml_tag_name, conf)
  {
    node_ = config().blackboard->get<rclcpp::Node::SharedPtr>("node");
    
    callback_group_ = node_->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);
    rclcpp::SubscriptionOptions sub_options;
    sub_options.callback_group = callback_group_;
    
    subscription_ = node_->create_subscription<sensor_msgs::msg::BatteryState>(
      "/battery_state",
      rclcpp::SensorDataQoS(),
      std::bind(&GetBatteryState::callback, this, std::placeholders::_1),
      sub_options);
  }

  static BT::PortsList providedPorts()
  {
    return {
      BT::OutputPort<float>("battery_percentage", "battery level [0.0-1.0]"),
      BT::OutputPort<float>("voltage", "battery voltage [V]")
    };
  }

  BT::NodeStatus tick() override
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!last_battery_) {
      return BT::NodeStatus::FAILURE;
    }
    
    setOutput("battery_percentage", last_battery_->percentage);
    setOutput("voltage", last_battery_->voltage);
    return BT::NodeStatus::SUCCESS;
  }

private:
  void callback(const sensor_msgs::msg::BatteryState::SharedPtr msg)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    last_battery_ = msg;
  }

  rclcpp::Node::SharedPtr node_;
  rclcpp::CallbackGroup::SharedPtr callback_group_;
  rclcpp::Subscription<sensor_msgs::msg::BatteryState>::SharedPtr subscription_;
  sensor_msgs::msg::BatteryState::SharedPtr last_battery_;
  std::mutex mutex_;
};

9. 스레드 안전성과 동기화

토픽 구독 액션 노드에서 스레드 안전성은 핵심적인 설계 고려 사항이다. 구독 콜백은 ROS2 실행기의 스레드에서 비동기적으로 호출되고, tick() 메서드는 행동 트리의 Tick 스레드에서 호출되므로, 두 스레드가 공유 데이터에 동시에 접근하는 데이터 경합이 발생할 수 있다.

9.1 뮤텍스 기반 동기화

std::mutexstd::lock_guard를 사용하여 공유 데이터에 대한 상호 배제를 구현하는 방식이다. 콜백 함수와 tick() 메서드 모두에서 동일한 뮤텍스를 잠금(lock)한 후 공유 데이터에 접근한다.

// 콜백에서의 뮤텍스 사용
void callback(const MessageT::SharedPtr msg)
{
  std::lock_guard<std::mutex> lock(mutex_);
  last_msg_ = msg;
}

// tick()에서의 뮤텍스 사용
BT::NodeStatus tick() override
{
  std::lock_guard<std::mutex> lock(mutex_);
  if (!last_msg_) {
    return BT::NodeStatus::FAILURE;
  }
  // 메시지 처리
}

9.2 원자적 변수 활용

단순한 플래그 변수의 경우 std::atomic을 사용하여 뮤텍스 없이 스레드 안전한 접근을 보장할 수 있다. 그러나 복합 데이터 구조에 대해서는 std::atomic이 적용되지 않으므로, 메시지 객체의 동기화에는 뮤텍스를 사용하여야 한다.

std::atomic<bool> message_received_{false};

10. 구독자 수명 주기 관리

토픽 구독 액션 노드에서 구독자의 수명 주기는 노드의 전체 생존 기간과 일치하도록 설계한다. 행동 트리 노드는 트리가 존재하는 동안 파괴되지 않으므로, 생성자에서 구독을 시작하면 트리의 전체 실행 기간에 걸쳐 메시지를 수신한다.

이러한 설계는 구독자의 반복적인 생성과 파괴에 따른 오버헤드를 제거하며, ROS2 디스커버리(Discovery) 프로토콜에 의한 발행자-구독자 매칭 지연을 방지한다. 노드가 IDLE 상태에서도 구독 콜백은 계속 호출되어 최신 메시지를 유지하므로, 다음 Tick에서 최신 데이터에 즉시 접근할 수 있다.

11. QoS 프로파일 선택 기준

토픽 구독 액션 노드에서 QoS 프로파일은 데이터의 특성과 활용 목적에 따라 선택한다.

데이터 유형권장 QoS근거
센서 데이터 (LiDAR, 카메라)SensorDataQoS()최신 데이터만 필요하며 지연이 발생하면 무의미
상태 정보 (오도메트리, TF)rclcpp::QoS(10)일정 수의 최근 메시지를 버퍼링
이벤트/명령rclcpp::QoS(10).reliable()메시지 손실 방지가 중요
래치 토픽 (맵, 초기 자세)rclcpp::QoS(1).transient_local()최초 연결 시 최신 메시지를 수신

발행자와 구독자의 QoS 프로파일이 호환되지 않으면 통신이 성립하지 않으므로, 대상 토픽의 발행자 QoS를 사전에 확인하고 호환 가능한 프로파일을 설정하여야 한다.

12. 플러그인 등록과 XML 통합

구현한 토픽 구독 액션 노드를 BehaviorTree.CPP의 플러그인 시스템에 등록하는 방법을 제시한다.

#include "behaviortree_cpp/bt_factory.h"

BT_REGISTER_NODES(factory)
{
  factory.registerNodeType<GetLatestLaserScan>("GetLatestLaserScan");
  factory.registerNodeType<GetOdometry>("GetOdometry");
  factory.registerNodeType<GetBatteryState>("GetBatteryState");
  factory.registerNodeType<WaitForMessage>("WaitForMessage");
}

XML 행동 트리에서의 활용 사례를 제시한다.

<BehaviorTree>
  <Sequence>
    <!-- 현재 위치 획득 -->
    <GetOdometry current_pose="{robot_pose}"
                 current_velocity="{robot_vel}"/>
    
    <!-- 배터리 상태 확인 -->
    <GetBatteryState battery_percentage="{battery}"
                     voltage="{battery_voltage}"/>
    
    <!-- 배터리 잔량에 따른 분기 -->
    <Fallback>
      <Condition ID="BatteryAboveThreshold"
                 battery="{battery}" threshold="0.2"/>
      <Action ID="ReturnToChargingStation"
              pose="{charging_station_pose}"/>
    </Fallback>
    
    <!-- 외부 명령 대기 -->
    <WaitForMessage topic_name="/mission_command"
                    timeout="30.0"
                    received_message="{command}"/>
  </Sequence>
</BehaviorTree>

참고 문헌 및 출처

  • Macenski, S., Martín, F., White, R., & Clavero, J. G. (2020). “The Marathon 2: A Navigation System.” arXiv preprint arXiv:2003.00368.
  • Colledanchise, M., & Ogren, P. (2018). Behavior Trees in Robotics and AI: An Introduction. CRC Press.
  • ROS2 공식 문서: https://docs.ros.org/en/humble/
  • BehaviorTree.CPP 공식 문서: https://www.behaviortree.dev/

버전 정보: ROS2 Humble Hawksbill 기준, BehaviorTree.CPP v3.8 이상.