1293.90 이벤트 기반 Tick 트리거
1. 이벤트 기반 Tick 트리거의 개념
전통적인 행동 트리 실행 모델은 고정 주기의 타이머에 의해 일정 간격으로 Tick을 발생시키는 주기적(periodic) 실행 방식을 사용한다. 이벤트 기반 Tick 트리거(event-driven tick trigger)란, 외부 이벤트의 발생을 감지하여 Tick을 실행하는 방식이다. 이벤트가 발생하지 않는 동안에는 Tick이 실행되지 않으므로, 불필요한 CPU 소비를 줄이고 이벤트에 대한 응답 지연을 최소화할 수 있다(Colledanchise & Ogren, 2018).
2. 주기적 Tick과 이벤트 기반 Tick의 비교
주기적 Tick에서는 환경에 변화가 없더라도 설정된 주기마다 트리 전체를 순회한다. 이는 구현이 단순하고 동작이 예측 가능하지만, 이벤트가 없는 기간에도 동일한 연산을 반복한다는 비효율이 존재한다.
주기적 Tick (10 Hz):
t=0.0 Tick → 변화 없음
t=0.1 Tick → 변화 없음
t=0.2 Tick → 변화 없음
t=0.3 Tick → 이벤트 발생, 처리 ← 최대 100ms 지연
t=0.4 Tick → 변화 없음
...
이벤트 기반 Tick:
t=0.3 이벤트 발생 → 즉시 Tick ← 지연 최소
(이벤트가 없으면 Tick 없음)
| 특성 | 주기적 Tick | 이벤트 기반 Tick |
|---|---|---|
| CPU 사용 | 일정 (상시) | 가변 (이벤트 시만) |
| 응답 지연 | 최대 1 주기 | 이벤트 처리 시간 |
| 구현 복잡도 | 낮음 | 중간~높음 |
| 예측 가능성 | 높음 | 중간 |
| 무변화 시 동작 | 동일 연산 반복 | 유휴 상태 |
3. 이벤트 소스의 유형
3.1 ROS2 토픽 메시지 수신
센서 데이터나 상태 정보의 토픽 메시지 수신을 Tick 트리거로 사용한다.
class EventDrivenExecutor {
public:
EventDrivenExecutor(rclcpp::Node::SharedPtr node, BT::Tree& tree)
: node_(node), tree_(tree) {
// 센서 메시지 수신 시 Tick 트리거
scan_sub_ = node_->create_subscription<sensor_msgs::msg::LaserScan>(
"scan", rclcpp::SensorDataQoS(),
[this](sensor_msgs::msg::LaserScan::SharedPtr%20msg) {
latest_scan_ = msg;
triggerTick();
});
// 목표 지점 수신 시 Tick 트리거
goal_sub_ = node_->create_subscription<geometry_msgs::msg::PoseStamped>(
"goal_pose", 10,
[this](geometry_msgs::msg::PoseStamped::SharedPtr%20msg) {
tree_.rootBlackboard()->set("goal_pose", *msg);
triggerTick();
});
}
void triggerTick() {
tick_requested_ = true;
}
void spin() {
while (rclcpp::ok()) {
rclcpp::spin_some(node_);
if (tick_requested_) {
tick_requested_ = false;
tree_.tickOnce();
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
private:
rclcpp::Node::SharedPtr node_;
BT::Tree& tree_;
std::atomic<bool> tick_requested_{false};
rclcpp::Subscription<sensor_msgs::msg::LaserScan>::SharedPtr scan_sub_;
rclcpp::Subscription<geometry_msgs::msg::PoseStamped>::SharedPtr goal_sub_;
sensor_msgs::msg::LaserScan::SharedPtr latest_scan_;
};
3.2 ROS2 서비스 호출 완료
외부 서비스의 응답이 도착했을 때 Tick을 트리거한다.
3.3 블랙보드 값 변경
블랙보드의 특정 키 값이 변경될 때 Tick을 트리거한다. BehaviorTree.CPP v4에서는 블랙보드 항목의 변경 감지를 위한 콜백 메커니즘이 제한적이므로, 별도의 감시(watch) 레이어를 구현해야 한다.
class BlackboardWatcher {
public:
void watch(const std::string& key,
std::function<void()> on_change) {
watchers_[key] = on_change;
}
void checkChanges(const BT::Blackboard::Ptr& bb) {
for (const auto& [key, callback] : watchers_) {
auto entry = bb->getEntry(key);
if (entry && entry->sequence_id != last_seq_[key]) {
last_seq_[key] = entry->sequence_id;
callback();
}
}
}
private:
std::map<std::string, std::function<void()>> watchers_;
std::map<std::string, uint64_t> last_seq_;
};
3.4 타이머 만료
특정 시간 조건(타임아웃, 주기적 점검 등)이 충족되었을 때 Tick을 트리거한다.
3.5 운영 체제 신호
SIGINT, SIGUSR 등의 운영 체제 신호에 의해 Tick을 트리거할 수 있다. 이는 주로 외부 시스템과의 동기화에 사용된다.
4. 이벤트 큐 기반 구현
다수의 이벤트 소스를 관리하기 위해 이벤트 큐를 사용하는 구현 패턴이다.
class EventQueue {
public:
enum class EventType {
SENSOR_UPDATE,
GOAL_RECEIVED,
ACTION_COMPLETE,
TIMER_EXPIRED,
BLACKBOARD_CHANGED
};
void push(EventType event) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(event);
cv_.notify_one();
}
EventType waitForEvent() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return !queue_.empty(); });
auto event = queue_.front();
queue_.pop();
return event;
}
bool hasEvent() const {
std::lock_guard<std::mutex> lock(mutex_);
return !queue_.empty();
}
private:
std::queue<EventType> queue_;
mutable std::mutex mutex_;
std::condition_variable cv_;
};
이벤트 큐 기반 실행 루프:
void eventDrivenLoop(BT::Tree& tree, EventQueue& events) {
while (rclcpp::ok()) {
// 이벤트 대기 (블로킹)
auto event = events.waitForEvent();
// 대기 중 누적된 이벤트를 모두 소비 (Tick은 1회만)
while (events.hasEvent()) {
events.waitForEvent(); // 큐 비우기
}
// 단일 Tick 실행
tree.tickOnce();
}
}
5. 이벤트 기반 Tick의 설계 고려 사항
5.1 이벤트 폭주 방지
짧은 시간 내에 다수의 이벤트가 연속 발생하면, 이벤트마다 Tick을 실행하는 것은 CPU 과부하를 유발한다. 이를 방지하기 위해, 이벤트를 일정 간격으로 통합(coalescing)하여 단일 Tick으로 처리하거나, 최소 Tick 간격을 설정한다.
void throttledTrigger() {
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_tick_time_).count();
if (elapsed >= min_tick_interval_ms_) {
tree_.tickOnce();
last_tick_time_ = now;
} else {
pending_tick_ = true; // 지연 Tick 예약
}
}
5.2 RUNNING 노드의 처리
이벤트 기반 모델에서 RUNNING 상태의 비동기 액션 노드는 주기적인 재방문을 필요로 한다. 이벤트가 발생하지 않으면 RUNNING 노드가 재방문되지 않아 작업 완료를 감지할 수 없다. 이를 해결하기 위해, RUNNING 노드가 존재하는 동안에는 보조 타이머에 의한 주기적 Tick을 병행한다.
5.3 이벤트 우선순위
안전 관련 이벤트(비상 정지, 장애물 감지)는 일반 이벤트보다 높은 우선순위로 처리되어야 한다. 우선순위 큐를 사용하여 긴급 이벤트가 즉시 Tick을 트리거하도록 구현한다.
참고 문헌
- Colledanchise, M., & Ogren, P. (2018). Behavior Trees in Robotics and AI: An Introduction. CRC Press.
- Faconti, D. (2022). BehaviorTree.CPP documentation and API reference. https://www.behaviortree.dev/