397.83 FlexBE 기반 행동 실행 엔진과의 통합

397.83 FlexBE 기반 행동 실행 엔진과의 통합

1. 개요

FlexBE(Flexible Behavior Engine)는 계층적 유한 상태 머신(Hierarchical Finite State Machine, HFSM) 패러다임을 기반으로 한 로봇 행동 실행 엔진으로서, 자율 로봇 시스템에서 고수준 임무 계획과 저수준 행동 실행 사이의 체계적 통합을 지원한다(Schillinger et al., 2016). FlexBE는 기존의 상태 머신 기반 실행 프레임워크의 한계를 극복하기 위해, 운영자의 실시간 감독(Operator Supervision), 런타임 행동 수정(Runtime Behavior Modification), 그리고 자율성 수준의 동적 조절(Dynamic Autonomy Level Adjustment)을 핵심 설계 원리로 채택하였다.

FlexBE는 ROS(Robot Operating System) 생태계에서 운용되며, 최근 ROS2로의 완전한 포팅이 이루어져 현대적 로봇 소프트웨어 아키텍처와의 호환성을 확보하고 있다. 특히, 인간-로봇 협업(Human-Robot Collaboration) 시나리오에서 운영자의 의사결정을 임무 실행 과정에 자연스럽게 통합할 수 있는 혼합 이니셔티브(Mixed-Initiative) 프레임워크로서의 역할이 두드러진다.

2. FlexBE의 아키텍처

2.1 시스템 구성 요소

FlexBE 시스템은 크게 세 가지 핵심 구성 요소로 이루어진다:

2.1.1 행동 엔진(Behavior Engine, Onboard)

행동 엔진은 로봇에 탑재되어 실행되는 핵심 실행 컴포넌트이다. 이 엔진은 정의된 행동(Behavior)을 계층적 상태 머신으로 해석하고, 상태 전이를 관리하며, 개별 상태에서 정의된 로봇 행위를 실행한다. 행동 엔진의 주요 책임은 다음과 같다:

  • 상태 머신 실행: 행동 정의에 따른 상태 전이 및 상태별 행위 실행을 관리한다.
  • 자율성 수준 관리: 현재 설정된 자율성 수준에 따라 운영자의 승인 없이 전이를 수행하거나, 운영자의 명시적 승인을 대기한다.
  • 상태 보고: 현재 실행 상태, 활성 상태 경로, 사용자 데이터 등의 정보를 운영자 인터페이스에 실시간으로 전송한다.
  • 런타임 수정 적용: 운영자로부터 수신된 전이 잠금(Transition Lock), 전이 강제(Transition Force), 상태 선점(State Preemption) 등의 명령을 처리한다.

2.1.2 운영자 제어 스테이션(Operator Control Station, OCS)

운영자 제어 스테이션은 FlexBE App이라 불리는 웹 기반 그래픽 사용자 인터페이스(GUI)로 구현된다. OCS는 다음의 기능을 제공한다:

  • 행동 설계: 드래그 앤 드롭(Drag and Drop) 방식의 시각적 편집기를 통해 계층적 상태 머신을 구성한다.
  • 실시간 모니터링: 실행 중인 행동의 현재 상태, 활성 상태 경로, 데이터 흐름을 시각적으로 표시한다.
  • 런타임 개입: 운영자가 실행 중인 행동에 대해 전이 승인, 전이 강제, 자율성 수준 변경 등의 개입을 수행할 수 있다.
  • 행동 미러링(Mirroring): 로봇 탑재 행동 엔진의 상태를 OCS에서 미러링하여, 네트워크 지연이나 통신 두절 상황에서도 행동 상태를 추적할 수 있다.

2.1.3 상태 라이브러리(State Library)

상태 라이브러리는 재사용 가능한 원자적 상태(Atomic State)와 컨테이너 상태(Container State)의 모음이다. FlexBE는 표준 상태 라이브러리와 함께, 사용자가 프로젝트별 상태를 정의하여 라이브러리에 추가할 수 있는 확장 메커니즘을 제공한다.

2.2 상태 정의 인터페이스

FlexBE에서의 상태 정의는 EventState 클래스를 상속하여 이루어진다:

from flexbe_core import EventState, Logger

class NavigateToGoal(EventState):
    """
    지정된 목표 위치로 로봇을 내비게이션하는 상태.

    -- target_frame    string    목표 좌표 프레임
    -- timeout         float     내비게이션 제한 시간 (초)

    ># goal_pose       PoseStamped    목표 위치 및 자세
    #> nav_result      string         내비게이션 결과 정보

    <= succeeded       목표 위치에 성공적으로 도달
    <= failed          내비게이션 실패
    <= timeout         제한 시간 초과
    <= preempted       외부 선점에 의한 중단
    """

    def __init__(self, target_frame='map', timeout=60.0):
        super().__init__(
            outcomes=['succeeded', 'failed', 'timeout', 'preempted'],
            input_keys=['goal_pose'],
            output_keys=['nav_result']
        )
        self._target_frame = target_frame
        self._timeout = timeout
        self._start_time = None
        self._nav_client = None

    def on_enter(self, userdata):
        """상태 진입 시 1회 호출."""
        self._start_time = self._node.get_clock().now()
        Logger.loginfo(f'Navigating to goal in frame {self._target_frame}')

        # 내비게이션 액션 목표 전송
        goal = NavigateToPose.Goal()
        goal.pose = userdata.goal_pose
        self._nav_client.send_goal_async(goal)

    def execute(self, userdata):
        """주기적으로 호출되는 실행 콜백."""
        # 제한 시간 확인
        elapsed = (self._node.get_clock().now() - self._start_time).nanoseconds / 1e9
        if elapsed > self._timeout:
            self._nav_client.cancel_goal_async()
            return 'timeout'

        # 내비게이션 결과 확인
        if self._nav_client.has_result():
            result = self._nav_client.get_result()
            if result.success:
                userdata.nav_result = 'navigation_successful'
                return 'succeeded'
            else:
                userdata.nav_result = f'navigation_failed: {result.error_code}'
                return 'failed'

        # 아직 진행 중
        return None

    def on_exit(self, userdata):
        """상태 종료 시 1회 호출."""
        if not self._nav_client.has_result():
            self._nav_client.cancel_goal_async()
            Logger.loginfo('Navigation cancelled on exit')

    def on_start(self):
        """행동 시작 시 1회 호출 (자원 초기화)."""
        self._nav_client = ActionClient(
            self._node, NavigateToPose, 'navigate_to_pose'
        )

    def on_stop(self):
        """행동 종료 시 1회 호출 (자원 해제)."""
        self._nav_client.destroy()

FlexBE 상태의 생명주기 콜백은 다음과 같이 구분된다:

콜백 메서드호출 시점용도
on_start()행동 시작 시 1회자원 초기화, 통신 채널 설정
on_enter(userdata)상태 진입 시 1회액션 목표 전송, 초기 설정
execute(userdata)상태 활성 중 주기적진행 상태 확인, 결과 판정
on_exit(userdata)상태 종료 시 1회진행 중인 작업 취소, 정리
on_stop()행동 종료 시 1회자원 해제, 통신 채널 종료
on_pause()상태 일시 정지 시일시 정지 처리
on_resume(userdata)상태 재개 시일시 정지 후 복구

3. 자율성 수준 관리

3.1 자율성 수준의 개념

FlexBE의 가장 차별화된 기능 중 하나는 자율성 수준(Autonomy Level)의 동적 관리이다. 자율성 수준은 로봇이 운영자의 명시적 승인 없이 수행할 수 있는 전이의 범위를 결정한다. FlexBE는 네 단계의 자율성 수준을 정의한다:

수준명칭설명
0Off모든 전이에 대해 운영자 승인 필요
1Low저수준 자율성 전이만 자동 수행
2High고수준 자율성 전이까지 자동 수행
3Full모든 전이를 자동 수행, 운영자 승인 불필요

각 상태 전이에는 해당 전이가 자동으로 수행되기 위해 필요한 최소 자율성 수준이 할당된다. 현재 자율성 수준이 전이에 할당된 수준 이상이면 전이가 자동으로 수행되고, 그렇지 않으면 운영자의 승인을 대기한다.

3.2 자율성 수준의 실시간 조절

운영자는 임무 실행 도중 자율성 수준을 실시간으로 변경할 수 있다. 이를 통해, 예측 가능한 환경에서는 높은 자율성을 허용하고, 불확실한 상황이나 위험한 작업 구간에서는 운영자의 직접적 감독을 강화할 수 있다. 자율성 수준의 동적 조절은 다음과 같은 시나리오에서 활용된다:

  • 안전 위험 구간 진입: 운영자가 자율성 수준을 낮추어 모든 전이에 대한 승인을 요구한다.
  • 반복적 정상 작업: 운영자가 자율성 수준을 높여 로봇의 자율 실행을 허용한다.
  • 통신 지연 환경: 높은 자율성 수준을 설정하여 통신 지연으로 인한 작업 중단을 방지한다.
  • 비상 상황: 운영자가 즉시 자율성을 Off로 전환하여 모든 행동을 수동으로 제어한다.

4. 런타임 행동 수정

4.1 전이 잠금과 강제

FlexBE는 운영자가 실행 중인 행동의 전이를 런타임에 제어할 수 있는 두 가지 핵심 메커니즘을 제공한다:

전이 잠금(Transition Lock): 특정 전이를 잠금하여, 해당 전이 조건이 충족되더라도 운영자의 명시적 승인 없이는 전이가 발생하지 않도록 한다. 이는 위험한 전이가 자동으로 수행되는 것을 방지하기 위해 사용된다.

전이 강제(Transition Force): 현재 상태의 자연적 종료 조건과 무관하게, 운영자가 특정 전이를 강제로 수행시킨다. 이는 현재 상태가 교착(Deadlock) 상태에 빠져 있거나, 운영자의 판단에 따라 현재 상태를 즉시 종료해야 할 때 활용된다.

4.2 행동 구조의 동적 수정

FlexBE의 고급 기능으로, 운영자는 실행 중인 행동의 구조 자체를 동적으로 수정할 수 있다. 이 기능은 다음의 작업을 포함한다:

  • 상태 추가/제거: 실행 중인 상태 머신에 새로운 상태를 추가하거나 기존 상태를 제거한다.
  • 전이 경로 변경: 상태 간 전이 매핑을 수정하여 임무 흐름을 재구성한다.
  • 서브 행동 교체: 계층적 상태 머신의 특정 서브 행동을 다른 서브 행동으로 교체한다.

이러한 동적 수정은 미리 예상하지 못한 상황에 대한 적응적 대응을 가능하게 하며, 완전한 임무 중단과 재시작 없이 임무 흐름을 조정할 수 있는 유연성을 제공한다.

5. 임무 계획과의 통합 아키텍처

5.1 계획기-실행기 분리 아키텍처

FlexBE를 임무 계획 시스템과 통합할 때, 가장 일반적인 아키텍처 패턴은 계획기-실행기 분리(Planner-Executor Separation) 아키텍처이다. 이 아키텍처에서 임무 계획기는 고수준 임무 계획을 생성하고, FlexBE는 생성된 계획을 행동으로 변환하여 실행한다.

┌─────────────────────────────────────────────┐
│              임무 관리 계층                    │
│  ┌──────────┐    ┌──────────────────────┐   │
│  │ 임무     │    │ 임무 모니터링 및      │   │
│  │ 계획기   │───▶│ 상태 평가             │   │
│  └────┬─────┘    └──────────┬───────────┘   │
│       │                     │               │
│       │ Plan                │ State Update   │
│       ▼                     │               │
│  ┌──────────────────────────┴───────────┐   │
│  │        계획-행동 변환 모듈             │   │
│  │   (Plan-to-Behavior Translator)       │   │
│  └────────────────┬─────────────────────┘   │
└───────────────────┼─────────────────────────┘
                    │ Behavior
                    ▼
┌───────────────────────────────────────────────┐
│            FlexBE 행동 실행 계층               │
│  ┌──────────────┐    ┌────────────────────┐  │
│  │ 행동 엔진    │◀──▶│ 운영자 제어 스테이션 │  │
│  │ (Onboard)    │    │ (OCS)              │  │
│  └──────┬───────┘    └────────────────────┘  │
│         │                                     │
│         ▼                                     │
│  ┌──────────────────────────────────────┐    │
│  │          로봇 기능 모듈               │    │
│  │  Navigation │ Manipulation │ Sensing  │    │
│  └──────────────────────────────────────┘    │
└───────────────────────────────────────────────┘

5.2 계획-행동 변환(Plan-to-Behavior Translation)

임무 계획기가 생성한 계획을 FlexBE 행동으로 변환하는 과정은 다음의 단계를 따른다:

  1. 계획 파싱(Plan Parsing): PDDL 플래너 또는 HTN 플래너가 생성한 계획을 파싱하여, 액션 시퀀스와 시간적 제약을 추출한다.
  2. 액션-상태 매핑(Action-State Mapping): 각 계획 액션을 FlexBE 상태 라이브러리의 대응 상태에 매핑한다. 매핑 테이블은 사전에 정의되며, 도메인별로 구성된다.
  3. 행동 구성(Behavior Composition): 매핑된 상태들을 FlexBE의 상태 머신 구조로 조립한다. 순차 액션은 시퀀스 구조로, 병렬 액션은 동시성 구조로 변환된다.
  4. 자율성 수준 할당: 각 전이에 적절한 자율성 수준을 할당한다. 위험도가 높은 전이에는 낮은 자율성 수준을 할당하여 운영자 감독을 강화한다.
  5. 행동 배포(Behavior Deployment): 구성된 행동을 행동 엔진에 배포하여 실행을 시작한다.
class PlanToBehaviorTranslator:
    def __init__(self, action_state_mapping):
        self._mapping = action_state_mapping

    def translate(self, plan):
        """PDDL 계획을 FlexBE 행동으로 변환."""
        behavior_sm = OperatableStateMachine(
            outcomes=['mission_complete', 'mission_failed']
        )

        with behavior_sm:
            prev_state_name = None
            for i, action in enumerate(plan.actions):
                state_class = self._mapping[action.name]
                state_name = f'{action.name}_{i}'

                # 상태 인스턴스 생성 및 파라미터 설정
                state = state_class(**self._extract_params(action))

                # 전이 구성
                transitions = {
                    'succeeded': self._next_state(plan, i),
                    'failed': 'mission_failed',
                    'preempted': 'mission_failed'
                }

                OperatableStateMachine.add(
                    state_name, state,
                    transitions=transitions,
                    autonomy={
                        'succeeded': Autonomy.Low,
                        'failed': Autonomy.Full,
                        'preempted': Autonomy.Full
                    }
                )

        return behavior_sm

5.3 재계획 트리거 메커니즘

FlexBE와 임무 계획기의 통합에서 핵심적인 요소는 재계획(Replanning) 트리거 메커니즘이다. 행동 실행 중 예상치 못한 상황이 발생하면, FlexBE 행동 엔진이 상위 임무 계획기에 재계획을 요청한다. 재계획 트리거의 유형은 다음과 같다:

  • 액션 실패: 특정 상태의 실행이 실패하고, 지역적 오류 복구가 불가능한 경우
  • 전제 조건 위반: 환경 변화로 인해 현재 계획의 전제 조건이 더 이상 충족되지 않는 경우
  • 새로운 목표 추가: 운영자가 새로운 임무 목표를 동적으로 추가한 경우
  • 자원 부족: 배터리, 연료, 통신 대역폭 등의 자원이 현재 계획의 완수에 불충분한 경우
  • 운영자 요청: 운영자가 명시적으로 재계획을 요청한 경우

6. FlexBE의 행동 설계 패턴

6.1 재사용 가능한 행동 패턴

FlexBE 기반의 임무 시스템에서 빈번하게 활용되는 행동 설계 패턴은 다음과 같다:

6.1.1 순차 실행 패턴(Sequential Execution Pattern)

순차적으로 수행되어야 하는 작업들을 연쇄적으로 구성하는 기본 패턴이다. 각 상태의 성공 결과가 다음 상태로의 전이를 트리거하며, 실패 시 오류 처리 상태로 전이한다.

6.1.2 감시-행동 패턴(Monitor-Action Pattern)

조건 모니터링과 행동 실행을 병렬로 수행하는 패턴이다. 동시성 컨테이너 내에서 모니터링 상태와 행동 상태를 병렬로 실행하며, 모니터링 상태가 비정상 조건을 감지하면 행동 상태를 선점하여 안전 조치를 수행한다.

6.1.3 시도-재시도 패턴(Try-Retry Pattern)

실패 가능성이 있는 작업에 대해 제한된 횟수의 재시도를 구현하는 패턴이다. 재시도 카운터를 사용자 데이터로 관리하며, 최대 재시도 횟수 초과 시 상위 오류 처리로 전이한다.

6.1.4 조건부 분기 패턴(Conditional Branching Pattern)

환경 조건이나 센서 데이터에 따라 서로 다른 행동 경로를 선택하는 패턴이다. 조건 평가 상태에서 현재 상황을 평가하고, 평가 결과에 따라 적절한 행동 분기로 전이한다.

6.2 복합 행동의 계층적 구성

FlexBE의 계층적 상태 머신 구조를 활용하여, 복합 임무를 재사용 가능한 서브 행동으로 분해하고 조합하는 것이 권장된다:

class SurveillanceMission(Behavior):
    def __init__(self, node):
        super().__init__()
        self.name = 'Surveillance Mission'

    def create(self):
        # 최상위 상태 머신
        _state_machine = OperatableStateMachine(
            outcomes=['finished', 'failed']
        )

        with _state_machine:
            # 사전 비행 점검 서브 행동
            OperatableStateMachine.add(
                'PRE_FLIGHT_CHECK',
                self.use_behavior(PreFlightCheckBehavior),
                transitions={
                    'passed': 'TAKEOFF',
                    'failed': 'failed'
                },
                autonomy={
                    'passed': Autonomy.Low,
                    'failed': Autonomy.Full
                }
            )

            # 이륙 서브 행동
            OperatableStateMachine.add(
                'TAKEOFF',
                self.use_behavior(TakeoffBehavior),
                transitions={
                    'airborne': 'PATROL_AREA',
                    'failed': 'EMERGENCY_LAND'
                },
                autonomy={
                    'airborne': Autonomy.High,
                    'failed': Autonomy.Full
                }
            )

            # 구역 순찰 서브 행동 (동시성 컨테이너)
            patrol_cc = ConcurrencyContainer(
                outcomes=['patrol_complete', 'anomaly_detected', 'low_battery'],
                conditions=[
                    ('patrol_complete', [
                        ('WAYPOINT_PATROL', 'completed')
                    ]),
                    ('anomaly_detected', [
                        ('ANOMALY_DETECTOR', 'detected')
                    ]),
                    ('low_battery', [
                        ('BATTERY_MONITOR', 'critical')
                    ])
                ]
            )

            with patrol_cc:
                ConcurrencyContainer.add(
                    'WAYPOINT_PATROL', WaypointPatrolState()
                )
                ConcurrencyContainer.add(
                    'ANOMALY_DETECTOR', AnomalyDetectionState()
                )
                ConcurrencyContainer.add(
                    'BATTERY_MONITOR', BatteryMonitorState()
                )

            OperatableStateMachine.add(
                'PATROL_AREA',
                patrol_cc,
                transitions={
                    'patrol_complete': 'RETURN_TO_BASE',
                    'anomaly_detected': 'INVESTIGATE_ANOMALY',
                    'low_battery': 'RETURN_TO_BASE'
                },
                autonomy={
                    'patrol_complete': Autonomy.High,
                    'anomaly_detected': Autonomy.Low,
                    'low_battery': Autonomy.Full
                }
            )

        return _state_machine

7. ROS2 환경에서의 FlexBE 통합

7.1 FlexBE ROS2 아키텍처

FlexBE의 ROS2 버전은 다음과 같은 ROS2 고유 기능을 활용한다:

  • 생명주기 노드(Lifecycle Node): 행동 엔진이 ROS2 생명주기 노드로 구현되어, 체계적인 상태 관리와 자원 생명주기 관리를 지원한다.
  • 컴포넌트 노드(Component Node): 상태 실행에 필요한 기능 모듈이 컴포넌트 노드로 구현되어, 단일 프로세스 내에서의 효율적 실행이 가능하다.
  • 액션 인터페이스: ROS2의 액션(Action) 인터페이스를 통해 장기 실행 작업의 비동기 관리를 지원한다.
  • QoS 프로파일: OCS와 행동 엔진 간의 통신에 적절한 QoS 프로파일을 적용하여, 네트워크 환경에 따른 통신 신뢰성을 보장한다.

7.2 Nav2 및 MoveIt2와의 통합

FlexBE는 ROS2의 주요 기능 스택과의 통합을 위한 표준 상태를 제공한다:

Nav2 통합: NavigateToPose, FollowPath, ComputePathToPose 등의 Nav2 액션을 래핑하는 FlexBE 상태가 제공된다. 이를 통해 내비게이션 작업을 FlexBE 행동의 일부로 자연스럽게 포함할 수 있다.

MoveIt2 통합: MoveGroup, GraspPlanning 등의 MoveIt2 기능을 FlexBE 상태로 통합하여, 매니퓰레이션 작업과 내비게이션 작업을 단일 행동 구조 내에서 조합할 수 있다.

8. 성능 특성 및 확장성

8.1 실행 성능

FlexBE의 상태 전이 오버헤드는 최소화되어 있으나, 운영자 승인 대기 시간은 자율성 수준과 네트워크 환경에 따라 가변적이다. 완전 자율(Full Autonomy) 모드에서의 상태 전이는 밀리초 수준의 지연을 보이며, 운영자 승인이 필요한 전이에서는 인간의 반응 시간과 네트워크 지연이 추가된다.

8.2 확장성 고려 사항

  • 상태 수의 증가: 계층적 구조를 활용하여 상태 수의 증가에 따른 복잡도를 관리할 수 있다. 각 계층의 상태 수를 제한하고, 서브 행동으로 분해하는 것이 권장된다.
  • 다중 로봇 지원: 각 로봇에 독립적인 행동 엔진을 배치하고, 상위 조율 계층에서 다중 로봇 간의 동기화를 관리하는 아키텍처가 가능하다.
  • 네트워크 격리: 행동 엔진이 로봇에 탑재되어 실행되므로, 통신 두절 시에도 현재 행동의 실행이 계속될 수 있다.

9. 한계 및 향후 발전 방향

FlexBE는 운영자 감독 기반의 행동 실행에 강점을 가지지만, 다음과 같은 한계가 존재한다:

  • 자동 계획 수립 기능의 부재: FlexBE 자체는 임무 계획을 자동으로 생성하지 않으며, 별도의 임무 계획기와의 통합이 필요하다.
  • 형식적 검증의 제한: 행동의 정합성(Correctness)이나 활성(Liveness) 속성에 대한 형식적 검증 도구가 부족하다.
  • 학습 기반 적응의 미지원: 실행 경험으로부터 행동 구조를 자동으로 개선하는 학습 메커니즘이 아직 통합되어 있지 않다.

향후 발전 방향으로는 형식적 검증 도구의 통합, 강화학습 기반 자율성 수준 자동 조절, 대규모 언어 모델을 활용한 자연어 기반 행동 명세, 그리고 분산 다중 로봇 행동 조율 프레임워크로의 확장이 기대된다.

10. 참고 문헌

  • Schillinger, P., Kohlbrecher, S., & von Stryk, O. (2016). “Human-Robot Collaborative High-Level Control with Application to Rescue Robotics.” IEEE International Conference on Robotics and Automation (ICRA), 2796-2802.
  • Schillinger, P. (2018). “An Approach for Runtime-Modifiable Behavior Execution in Robot Task Planning.” PhD Dissertation, TU Darmstadt.
  • Conner, D. C., & Willis, J. (2017). “Flexible Navigation: Finite State Machine Based Integrated Navigation and Control for ROS Enabled Robots.” SoutheastCon 2017, 1-8.
  • Colledanchise, M., & Ögren, P. (2018). “Behavior Trees in Robotics and AI: An Introduction.” CRC Press.
  • Macenski, S., Foote, T., Gerkey, B., Lalancette, C., & Woodall, W. (2022). “Robot Operating System 2: Design, Architecture, and Uses in the Wild.” Science Robotics, 7(66).