통신 연결 상태 확인 조건 노드 (Communication Connection State Check Condition Node)
1. 개요
통신 연결 상태 확인 조건 노드는 로봇과 외부 시스템(관제 센터, 원격 운용자, 클라우드 서버 등) 간의 통신 연결이 정상적으로 유지되고 있는지를 판정하는 조건 노드이다. 통신 두절은 원격 감시 및 제어의 불가, 비상 정지 명령 수신 불가, 데이터 동기화 실패 등 심각한 운용 문제를 초래하므로, 통신 상태를 실시간으로 감시하여 적시에 대응하는 것이 필수적이다.
2. 통신 상태 감지 방법
2.1 하트비트(Heartbeat) 기반 감지
가장 일반적인 통신 상태 감지 방법은 하트비트 메커니즘이다. 원격 시스템이 주기적으로 하트비트 메시지를 발행하고, 로봇 측에서 이를 수신하여 통신 상태를 판정한다. 일정 시간 이상 하트비트가 수신되지 않으면 통신 두절로 판정한다.
2.2 토픽 수신 빈도 기반 감지
특정 토픽의 메시지 수신 빈도를 감시하여, 수신이 중단되면 통신 장애로 판정한다.
2.3 ROS2 노드 가용성 기반 감지
ROS2 DDS의 노드 발견(discovery) 메커니즘을 활용하여, 원격 노드의 존재를 확인한다.
3. 구현
3.1 하트비트 기반 통신 확인
class IsCommsConnected
: public BT::RosTopicSubNode<std_msgs::msg::Header>
{
public:
IsCommsConnected(const std::string& name,
const BT::NodeConfiguration& config,
const BT::RosNodeParams& params)
: RosTopicSubNode(name, config, params),
node_(params.nh),
ever_received_(false)
{}
static BT::PortsList providedPorts()
{
return providedBasicPorts({
BT::InputPort<double>("timeout_sec", 3.0,
"하트비트 타임아웃 (초)")
});
}
BT::NodeStatus onTick(
const std_msgs::msg::Header::SharedPtr& msg) override
{
double timeout;
getInput("timeout_sec", timeout);
if (msg)
{
last_received_ = node_->get_clock()->now();
ever_received_ = true;
return BT::NodeStatus::SUCCESS;
}
if (!ever_received_)
{
return BT::NodeStatus::FAILURE;
}
auto elapsed =
(node_->get_clock()->now() - last_received_).seconds();
if (elapsed > timeout)
{
return BT::NodeStatus::FAILURE;
}
return BT::NodeStatus::SUCCESS;
}
private:
rclcpp::Node::SharedPtr node_;
rclcpp::Time last_received_;
bool ever_received_;
};
3.2 다중 통신 채널 확인
class AreAllCommsConnected : public BT::ConditionNode
{
public:
AreAllCommsConnected(const std::string& name,
const BT::NodeConfiguration& config,
rclcpp::Node::SharedPtr node)
: BT::ConditionNode(name, config), node_(node)
{
std::string topics_str;
getInput("heartbeat_topics", topics_str);
double timeout;
getInput("timeout_sec", timeout);
std::istringstream stream(topics_str);
std::string topic;
while (std::getline(stream, topic, ';'))
{
auto sub = node_->create_subscription<std_msgs::msg::Header>(
topic, rclcpp::QoS(1),
[this, topic](const%20std_msgs::msg::Header::SharedPtr) {
std::lock_guard<std::mutex> lock(mutex_);
last_times_[topic] = node_->get_clock()->now();
});
subscriptions_.push_back(sub);
}
timeout_ = timeout;
}
static BT::PortsList providedPorts()
{
return {
BT::InputPort<std::string>("heartbeat_topics",
"하트비트 토픽 목록 (세미콜론 구분)"),
BT::InputPort<double>("timeout_sec", 3.0,
"타임아웃 (초)")
};
}
BT::NodeStatus tick() override
{
std::lock_guard<std::mutex> lock(mutex_);
auto now = node_->get_clock()->now();
for (const auto& [topic, last_time] : last_times_)
{
if ((now - last_time).seconds() > timeout_)
{
return BT::NodeStatus::FAILURE;
}
}
if (last_times_.empty())
{
return BT::NodeStatus::FAILURE;
}
return BT::NodeStatus::SUCCESS;
}
private:
rclcpp::Node::SharedPtr node_;
std::vector<rclcpp::Subscription<std_msgs::msg::Header>::SharedPtr>
subscriptions_;
std::map<std::string, rclcpp::Time> last_times_;
std::mutex mutex_;
double timeout_;
};
4. 통신 품질 평가
단순 연결 여부뿐만 아니라 통신 품질(지연, 패킷 손실률)을 평가하는 조건 노드이다.
class IsCommsQualityAcceptable
: public BT::RosTopicSubNode<std_msgs::msg::Header>
{
public:
IsCommsQualityAcceptable(const std::string& name,
const BT::NodeConfiguration& config,
const BT::RosNodeParams& params)
: RosTopicSubNode(name, config, params),
node_(params.nh)
{}
static BT::PortsList providedPorts()
{
return providedBasicPorts({
BT::InputPort<double>("max_latency_sec", 0.5,
"최대 허용 지연 (초)")
});
}
BT::NodeStatus onTick(
const std_msgs::msg::Header::SharedPtr& msg) override
{
if (!msg)
{
return BT::NodeStatus::FAILURE;
}
double max_latency;
getInput("max_latency_sec", max_latency);
auto now = node_->get_clock()->now();
auto msg_time = rclcpp::Time(msg->stamp);
double latency = (now - msg_time).seconds();
if (latency >= 0.0 && latency <= max_latency)
{
return BT::NodeStatus::SUCCESS;
}
return BT::NodeStatus::FAILURE;
}
private:
rclcpp::Node::SharedPtr node_;
};
5. XML 행동 트리에서의 활용
5.1 통신 상태 기반 운용 모드 전환
<BehaviorTree ID="CommsAwareOperation">
<Fallback>
<Sequence>
<Condition ID="IsCommsConnected"
topic_name="/operator/heartbeat"
timeout_sec="3.0"/>
<Action ID="TeleoperatedMission"/>
</Sequence>
<Sequence>
<Action ID="AutonomousSafeReturn"/>
</Sequence>
</Fallback>
</BehaviorTree>
통신이 유지되면 원격 조종 임무를 수행하고, 통신이 두절되면 자율 귀환을 실행한다.
6. 설계 시 고려 사항
6.1 시계 동기화
통신 지연 측정은 송신 측과 수신 측의 시계가 동기화되어 있어야 정확하다. NTP 또는 PTP를 통해 시계를 동기화하여야 하며, 시계 동기화가 보장되지 않는 환경에서는 왕복 시간(round-trip time) 기반의 지연 측정을 사용하여야 한다.
6.2 네트워크 환경에 따른 타임아웃 설정
유선 네트워크(이더넷)에서는 1~2초의 타임아웃이 적합하며, 무선 네트워크(WiFi, LTE)에서는 네트워크 불안정성을 고려하여 3~5초의 타임아웃이 적절하다. 위성 통신 환경에서는 수 초 이상의 지연이 정상이므로, 더 긴 타임아웃을 설정하여야 한다.
6.3 통신 두절 시 자율 행동 정책
통신 두절 시 로봇이 취하여야 할 행동은 운용 시나리오에 따라 다르다. 호버링 유지(드론), 안전한 장소로 귀환(지상 로봇), 현재 위치에서 정지(산업 로봇) 등의 정책을 사전에 정의하고, 행동 트리에 반영하여야 한다.
7. 참고 문헌
- Colledanchise, M., & Ogren, P. (2018). Behavior Trees in Robotics and AI: An Introduction. CRC Press.
- BehaviorTree.CPP 공식 문서. https://www.behaviortree.dev/
- ROS2 공식 문서. https://docs.ros.org/en/humble/
| 버전 | 날짜 | 변경 사항 |
|---|---|---|
| v0.1 | 2026-04-04 | 초안 작성 |