1295.88 ReactiveFallback 우선순위 전환 추적
1. 우선순위 전환 추적의 필요성
ReactiveFallback은 매 Tick마다 자식을 처음부터 재평가하여, 상위 우선순위 조건이 활성화되면 하위 분기를 Halt하고 상위 분기를 실행하는 선점(preemption) 메커니즘을 구현한다. 이 우선순위 전환은 로봇의 행동 모드를 결정하는 핵심 메커니즘이므로, 전환이 정확한 시점에 올바른 방향으로 발생하는지를 추적하고 검증하는 것이 필수적이다. 특히 비상 대응 시나리오에서 선점 지연은 안전 사고로 직결될 수 있다.
2. 우선순위 전환 이벤트 로거
ReactiveFallback의 활성 분기(active branch)가 변경되는 시점을 감지하고 기록하는 전용 로거를 구현한다.
class PriorityTransitionLogger : public BT::StatusChangeLogger
{
public:
struct TransitionEvent
{
int tick_number;
std::chrono::steady_clock::time_point timestamp;
std::string from_branch;
std::string to_branch;
std::string trigger_condition;
};
PriorityTransitionLogger(const BT::Tree& tree,
const std::string& fallback_name)
: StatusChangeLogger(tree.rootNode()),
fallback_name_(fallback_name) {}
void callback(BT::Duration timestamp,
const BT::TreeNode& node,
BT::NodeStatus prev_status,
BT::NodeStatus status) override
{
auto* parent = node.getParent();
if (!parent || parent->name() != fallback_name_)
return;
// 자식이 RUNNING으로 전이 → 이 분기가 활성화됨
if (status == BT::NodeStatus::RUNNING &&
prev_status != BT::NodeStatus::RUNNING)
{
std::string new_branch = node.name();
if (new_branch != active_branch_)
{
TransitionEvent event;
event.tick_number = current_tick_;
event.timestamp = std::chrono::steady_clock::now();
event.from_branch = active_branch_;
event.to_branch = new_branch;
events_.push_back(event);
active_branch_ = new_branch;
}
}
// 자식이 IDLE로 전이 → 이 분기가 Halt됨
if (status == BT::NodeStatus::IDLE &&
prev_status == BT::NodeStatus::RUNNING)
{
if (node.name() == active_branch_)
{
// Halt 이벤트 기록
}
}
}
void onNewTick() { current_tick_++; }
void printTransitions() const
{
std::cout << "=== Priority Transitions for "
<< fallback_name_ << " ===" << std::endl;
for (const auto& e : events_)
{
std::cout << "Tick " << e.tick_number << ": "
<< e.from_branch << " -> " << e.to_branch
<< std::endl;
}
}
const std::vector<TransitionEvent>& events() const
{ return events_; }
void flush() override {}
private:
std::string fallback_name_;
std::string active_branch_;
std::vector<TransitionEvent> events_;
int current_tick_ = 0;
};
3. 활성 분기 이력 기록
매 Tick에서 ReactiveFallback의 활성 분기를 기록하여 시계열 이력을 구성한다.
class ActiveBranchHistory
{
public:
struct TickEntry
{
int tick;
int active_child_index;
std::string active_child_name;
BT::NodeStatus result;
};
void record(int tick, const BT::ControlNode* fallback)
{
for (size_t i = 0; i < fallback->childrenCount(); i++)
{
auto status = fallback->child(i)->status();
if (status == BT::NodeStatus::RUNNING ||
status == BT::NodeStatus::SUCCESS)
{
TickEntry entry;
entry.tick = tick;
entry.active_child_index = static_cast<int>(i);
entry.active_child_name = fallback->child(i)->name();
entry.result = status;
history_.push_back(entry);
return;
}
}
// 모든 자식이 FAILURE인 경우
history_.push_back({tick, -1, "<none>",
BT::NodeStatus::FAILURE});
}
void printTimeline() const
{
std::cout << "Tick Priority Active Branch" << std::endl;
for (const auto& e : history_)
{
std::cout << std::setw(4) << e.tick
<< std::setw(10) << e.active_child_index
<< " " << e.active_child_name
<< " [" << BT::toStr(e.result) << "]"
<< std::endl;
}
}
// 전환 빈도 분석
int countTransitions() const
{
int transitions = 0;
for (size_t i = 1; i < history_.size(); i++)
{
if (history_[i].active_child_index !=
history_[i - 1].active_child_index)
{
transitions++;
}
}
return transitions;
}
// 특정 분기의 활성 비율 계산
double branchActiveRatio(int child_index) const
{
if (history_.empty()) return 0.0;
int active_count = 0;
for (const auto& e : history_)
{
if (e.active_child_index == child_index)
active_count++;
}
return static_cast<double>(active_count) / history_.size();
}
private:
std::vector<TickEntry> history_;
};
활성 분기 이력의 출력 예시이다.
Tick Priority Active Branch
0 2 ExecuteMission [RUNNING]
1 2 ExecuteMission [RUNNING]
2 1 ReturnToBase [RUNNING]
3 1 ReturnToBase [RUNNING]
4 0 EmergencyStop [RUNNING]
5 0 EmergencyStop [SUCCESS]
6 1 ReturnToBase [RUNNING]
7 2 ExecuteMission [RUNNING]
이 이력에서 Tick 2에서 배터리 부족으로 ReturnToBase가 활성화되고, Tick 4에서 비상 상황으로 EmergencyStop이 선점하며, Tick 6에서 비상 해제 후 다시 ReturnToBase로 복귀하는 과정을 추적할 수 있다.
4. 전환 지연 측정
상위 조건이 활성화된 시점부터 실제 전환이 완료된 시점까지의 지연(latency)을 측정한다. 안전 관련 시나리오에서는 이 지연이 Tick 주기 이내여야 한다.
class TransitionLatencyMeasurer
{
public:
void conditionActivated(const std::string& condition_name)
{
activation_time_ = std::chrono::steady_clock::now();
pending_condition_ = condition_name;
}
void transitionCompleted(const std::string& new_branch)
{
if (!pending_condition_.empty())
{
auto latency = std::chrono::duration_cast<
std::chrono::microseconds>(
std::chrono::steady_clock::now() - activation_time_);
latencies_.push_back({pending_condition_,
new_branch, latency});
pending_condition_.clear();
}
}
void printReport() const
{
std::cout << "=== Transition Latency Report ===" << std::endl;
for (const auto& entry : latencies_)
{
std::cout << entry.condition << " -> " << entry.branch
<< ": " << entry.latency.count() << " us"
<< std::endl;
}
if (!latencies_.empty())
{
auto max_it = std::max_element(
latencies_.begin(), latencies_.end(),
[](const%20auto&%20a,%20const%20auto&%20b) {
return a.latency < b.latency;
});
std::cout << "Max latency: "
<< max_it->latency.count() << " us"
<< std::endl;
}
}
private:
struct LatencyEntry
{
std::string condition;
std::string branch;
std::chrono::microseconds latency;
};
std::chrono::steady_clock::time_point activation_time_;
std::string pending_condition_;
std::vector<LatencyEntry> latencies_;
};
5. 전환 원인 추적
우선순위 전환이 발생한 원인, 즉 어떤 조건 노드의 상태 변화가 전환을 유발하였는지를 추적한다.
class TransitionCauseTracker : public BT::StatusChangeLogger
{
public:
struct CauseRecord
{
int tick;
std::string condition_name;
BT::NodeStatus old_status;
BT::NodeStatus new_status;
std::string resulting_transition;
};
TransitionCauseTracker(const BT::Tree& tree,
const std::set<std::string>& condition_names)
: StatusChangeLogger(tree.rootNode()),
tracked_conditions_(condition_names) {}
void callback(BT::Duration timestamp,
const BT::TreeNode& node,
BT::NodeStatus prev_status,
BT::NodeStatus status) override
{
if (tracked_conditions_.count(node.name()) > 0)
{
// 조건 노드의 상태 변화 기록
if (prev_status != status)
{
CauseRecord record;
record.tick = current_tick_;
record.condition_name = node.name();
record.old_status = prev_status;
record.new_status = status;
causes_.push_back(record);
}
}
}
void onNewTick() { current_tick_++; }
void printCauses() const
{
std::cout << "=== Transition Causes ===" << std::endl;
for (const auto& c : causes_)
{
std::cout << "Tick " << c.tick << ": "
<< c.condition_name << " "
<< BT::toStr(c.old_status) << " -> "
<< BT::toStr(c.new_status) << std::endl;
}
}
void flush() override {}
private:
std::set<std::string> tracked_conditions_;
std::vector<CauseRecord> causes_;
int current_tick_ = 0;
};
사용 예시이다.
BT::Tree tree = factory.createTreeFromText(xml_text);
std::set<std::string> conditions = {
"IsEmergency", "IsBatteryLow", "IsObstacleNear"
};
TransitionCauseTracker tracker(tree, conditions);
출력 예시이다.
=== Transition Causes ===
Tick 45: IsBatteryLow FAILURE -> SUCCESS
Tick 102: IsEmergency FAILURE -> SUCCESS
Tick 115: IsEmergency SUCCESS -> FAILURE
Tick 116: IsBatteryLow SUCCESS -> FAILURE
이 기록에서 Tick 45에서 배터리 부족이 감지되고, Tick 102에서 비상 상황이 발생하며, Tick 115에서 비상이 해제되었음을 확인할 수 있다.
6. 진동(Thrashing) 감지
우선순위 전환이 과도하게 빈번하게 발생하는 진동 현상을 감지한다. 이는 조건 노드의 히스테리시스(hysteresis) 부재 또는 센서 잡음에 의해 발생할 수 있다.
class ThrashingDetector
{
public:
ThrashingDetector(int window_size, int threshold)
: window_size_(window_size), threshold_(threshold) {}
bool check(const ActiveBranchHistory& history) const
{
const auto& entries = history.entries();
if (static_cast<int>(entries.size()) < window_size_)
return false;
// 최근 window_size Tick에서의 전환 횟수 계산
int transitions = 0;
size_t start = entries.size() - window_size_;
for (size_t i = start + 1; i < entries.size(); i++)
{
if (entries[i].active_child_index !=
entries[i - 1].active_child_index)
{
transitions++;
}
}
if (transitions >= threshold_)
{
std::cerr << "[WARNING] Thrashing detected: "
<< transitions << " transitions in last "
<< window_size_ << " ticks" << std::endl;
return true;
}
return false;
}
private:
int window_size_;
int threshold_;
};
예를 들어, 10 Tick 이내에 5회 이상의 전환이 발생하면 진동으로 간주한다.
ThrashingDetector detector(/*window=*/10, /*threshold=*/5);
if (detector.check(branch_history))
{
// 진동 경고 → 조건 노드에 히스테리시스 추가 필요
}
7. 통합 추적 시스템
위의 추적 도구들을 통합하여 ReactiveFallback의 동작을 종합적으로 분석하는 시스템을 구성한다.
class ReactiveFallbackAnalyzer
{
public:
ReactiveFallbackAnalyzer(BT::Tree& tree,
const std::string& fallback_name,
const std::set<std::string>& conditions)
: branch_history_(),
transition_logger_(tree, fallback_name),
cause_tracker_(tree, conditions),
thrashing_detector_(10, 5) {}
void onTick(int tick, const BT::ControlNode* fallback)
{
branch_history_.record(tick, fallback);
transition_logger_.onNewTick();
cause_tracker_.onNewTick();
if (thrashing_detector_.check(branch_history_))
{
std::cerr << "[Tick " << tick
<< "] Thrashing warning!" << std::endl;
}
}
void printFullReport() const
{
branch_history_.printTimeline();
std::cout << std::endl;
transition_logger_.printTransitions();
std::cout << std::endl;
cause_tracker_.printCauses();
std::cout << std::endl;
std::cout << "Total transitions: "
<< branch_history_.countTransitions() << std::endl;
}
private:
ActiveBranchHistory branch_history_;
PriorityTransitionLogger transition_logger_;
TransitionCauseTracker cause_tracker_;
ThrashingDetector thrashing_detector_;
};
이 통합 분석기는 활성 분기 이력, 전환 이벤트, 전환 원인, 진동 감지를 단일 인터페이스로 제공한다. 개발 및 테스트 단계에서 ReactiveFallback의 우선순위 전환 동작을 종합적으로 검증하고, 설계 의도대로 동작하는지를 확인하는 데 활용한다.