테스트 환경 설정
ROS2에서 파이썬을 활용하여 테스트를 수행하기 위해서는 pytest
와 같은 파이썬의 테스트 프레임워크를 사용할 수 있다. ROS2의 파이썬 패키지인 rclpy
를 기반으로 작성된 노드의 기능을 확인하기 위해 기본적으로 노드 단위 테스트를 설정하는 방법을 먼저 알아보자.
-
테스트 프레임워크 설치: ROS2에서
pytest
를 사용하기 위해서는 다음과 같이 의존성을 설치해야 한다.bash sudo apt install python3-pytest
-
테스트 구조 구성: ROS2의 워크스페이스 내에서 패키지를 생성할 때,
tests
디렉터리를 구성하여 테스트 파일을 저장한다. 예를 들어my_ros_package
라는 패키지가 있다면, 테스트 파일의 위치는 다음과 같다.my_ros_package/ ├── src/ ├── include/ ├── tests/ │ └── test_my_node.py └── setup.py
-
단위 테스트 작성:
pytest
를 사용하여 파이썬 노드의 단위 테스트를 작성할 수 있다. 기본적인 테스트 함수는 다음과 같은 구조를 가진다. ```python import pytest import rclpy from rclpy.node import Node
def test_example(): rclpy.init(args=None) node = Node('test_node') assert node.get_name() == 'test_node' rclpy.shutdown() ```
이 코드에서 rclpy.init()
을 통해 ROS2 시스템을 초기화하고, Node
객체를 생성하여 해당 노드의 이름을 확인하는 간단한 테스트를 작성하였다. 이와 같이 pytest
를 활용하여 노드의 여러 가지 동작을 검증할 수 있다.
퍼블리셔 및 서브스크라이버 테스트
ROS2에서 퍼블리셔와 서브스크라이버의 기능을 테스트하는 것은 매우 중요하다. 이를 통해 노드 간의 통신이 원활하게 이루어지는지 확인할 수 있다.
- 퍼블리셔 테스트: 퍼블리셔 테스트는 노드가 특정 토픽에 올바르게 메시지를 발행하는지 검증하는 과정을 포함한다. 이를 위해서는
pytest
와 함께 ROS2의 토픽 관련 명령어를 사용할 수 있다.
예시 코드: ```python from std_msgs.msg import String
def test_publisher(): rclpy.init(args=None) node = Node('test_publisher') publisher = node.create_publisher(String, 'test_topic', 10) msg = String() msg.data = 'Hello, ROS2' publisher.publish(msg) rclpy.spin_once(node) assert msg.data == 'Hello, ROS2' rclpy.shutdown() ```
이 테스트는 퍼블리셔가 test_topic
이라는 토픽에 Hello, ROS2
메시지를 발행하는지를 확인한다.
- 서브스크라이버 테스트: 서브스크라이버 테스트는 노드가 특정 토픽에서 발행된 메시지를 정확하게 수신하고 처리하는지 확인하는 테스트이다.
예시 코드: ```python def test_subscriber(): rclpy.init(args=None) node = Node('test_subscriber') messages_received = []
def callback(msg):
messages_received.append(msg.data)
node.create_subscription(String, 'test_topic', callback, 10)
rclpy.spin_once(node)
assert len(messages_received) > 0
assert messages_received[0] == 'Hello, ROS2'
rclpy.shutdown()
```
이 테스트는 서브스크라이버가 test_topic
에서 발행된 메시지를 수신하고 이를 처리하는지 확인한다.
디버깅 방법
ROS2에서 노드를 디버깅하는 과정은 rclpy
모듈의 기능을 활용할 수 있다. 특히 launch
파일을 사용하여 여러 노드를 동시에 실행하거나, rqt_console
를 통해 로그를 시각적으로 확인하는 방법을 사용할 수 있다.
-
노드에서 로그 출력: 파이썬 노드 내에서 로그 메시지를 출력하기 위해서는
get_logger()
메서드를 활용하여 로그를 남길 수 있다.python node.get_logger().info('This is a log message')
-
rqt_console 사용:
rqt_console
는 ROS2 노드에서 출력되는 로그 메시지를 실시간으로 확인할 수 있는 도구이다. 이를 활용하면 노드의 실행 상태를 모니터링하면서 문제를 쉽게 추적할 수 있다.
bash
rqt_console
- launch 파일을 통한 다중 노드 디버깅: 복잡한 시스템에서 여러 노드를 동시에 실행하는 경우,
launch
파일을 활용하여 디버깅할 수 있다. 예를 들어, 두 개의 노드를 동시에 실행하는launch
파일은 다음과 같이 작성할 수 있다.
```python from launch import LaunchDescription from launch_ros.actions import Node
def generate_launch_description(): return LaunchDescription([ Node( package='my_package', executable='my_node', name='node_1' ), Node( package='my_package', executable='my_node', name='node_2' ), ]) ```
- QoS 설정 디버깅: 메시지 손실 또는 지연이 발생하는 경우,
QoS
(Quality of Service) 설정을 변경하여 디버깅할 수 있다. 예를 들어, 퍼블리셔와 서브스크라이버 사이에 호환되지 않는 QoS 설정으로 인해 문제가 발생할 수 있다.
QoS 설정 최적화
ROS2에서 퍼블리셔와 서브스크라이버 간의 메시지 통신을 최적화하기 위해서는 QoS 정책을 조정해야 한다. 다음은 주요 QoS 정책이다.
- 신뢰성 (reliability): 퍼블리셔와 서브스크라이버 간의 메시지 신뢰성을 설정한다.
reliable
또는best_effort
옵션이 있으며, 신뢰성이 중요한 경우reliable
을 선택한다. - 역사 (history): 퍼블리셔가 발행한 메시지가 서브스크라이버에게 전달되기 전에 얼마나 오랫동안 메시지를 보관할지를 설정한다.
- 지속성 (durability): 퍼블리셔가 메시지를 발행한 이후에도 서브스크라이버가 메시지를 받을 수 있도록 설정할 수 있다. 이를 통해 네트워크 장애 후에도 메시지를 받을 수 있도록 할 수 있다.
이러한 QoS 설정은 퍼블리셔와 서브스크라이버 모두에서 조정할 수 있으며, 이를 통해 통신 성능을 최적화할 수 있다.
ROS2 테스트 프레임워크 활용
ROS2에서 제공하는 테스트 프레임워크를 통해 노드의 통합 테스트를 진행할 수 있다. 특히 launch_testing
패키지를 사용하여 여러 노드를 동시에 실행하고, 그 상태를 모니터링하는 테스트를 구성할 수 있다.
launch_testing 설정
launch_testing
은 ROS2에서 제공하는 통합 테스트 프레임워크로, 여러 노드를 동시에 실행하고 해당 노드들이 정상적으로 작동하는지 확인할 수 있다.
-
launch_testing 설치: ROS2에서
launch_testing
패키지를 사용하기 위해서는 해당 패키지가 설치되어 있어야 한다. 일반적으로 ROS2 Humble에서는 기본적으로 포함되어 있지만, 필요시 다음 명령어로 설치 가능하다.bash sudo apt install ros-humble-launch-testing
-
launch_testing 구조: 테스트를 진행하기 위해서는
launch
파일과pytest
를 결합하여 사용할 수 있다. 예를 들어, 노드 두 개를 동시에 실행하고 해당 노드들의 상호작용을 테스트하는 코드는 다음과 같다.
```python import launch import launch_ros.actions import launch_testing import pytest
def generate_test_description(): node_1 = launch_ros.actions.Node( package='my_package', executable='my_node', name='node_1' )
node_2 = launch_ros.actions.Node(
package='my_package',
executable='my_node',
name='node_2'
)
return launch.LaunchDescription([
node_1,
node_2,
launch_testing.actions.ReadyToTest(),
])
@pytest.mark.launch_test def test_nodes_running(): assert True ```
이 예시에서는 node_1
과 node_2
라는 두 개의 노드를 동시에 실행하고, launch_testing.actions.ReadyToTest()
를 통해 노드가 정상적으로 시작되었는지 확인한다.
퍼블리셔와 서브스크라이버 통합 테스트
퍼블리셔와 서브스크라이버 간의 통신을 통합적으로 테스트하는 경우, 각 노드가 정상적으로 메시지를 주고받는지 확인해야 한다. 이를 위해 다음과 같은 절차를 따른다.
- 퍼블리셔와 서브스크라이버 설정: 퍼블리셔 노드와 서브스크라이버 노드를 각각 생성한 후, 메시지가 올바르게 전달되는지 확인할 수 있다.
예시 코드: ```python def test_pub_sub(): rclpy.init(args=None) publisher_node = Node('test_publisher') subscriber_node = Node('test_subscriber')
publisher = publisher_node.create_publisher(String, 'test_topic', 10)
messages_received = []
def callback(msg):
messages_received.append(msg.data)
subscriber_node.create_subscription(String, 'test_topic', callback, 10)
msg = String()
msg.data = 'Test Message'
publisher.publish(msg)
rclpy.spin_once(publisher_node)
rclpy.spin_once(subscriber_node)
assert len(messages_received) > 0
assert messages_received[0] == 'Test Message'
rclpy.shutdown()
```
이 테스트에서는 퍼블리셔가 Test Message
라는 메시지를 발행하고, 서브스크라이버가 이를 정상적으로 수신하는지를 확인한다.
rqt_console와 rqt_logger_level 활용
ROS2 노드의 상태를 실시간으로 모니터링하고 디버깅하는 데 유용한 도구로 rqt_console
과 rqt_logger_level
이 있다. 이 도구들은 노드의 로그 메시지를 실시간으로 확인하고, 특정 로그 레벨을 조정할 수 있게 해준다.
rqt_console
rqt_console
는 ROS2 노드에서 출력되는 로그 메시지를 실시간으로 확인할 수 있는 GUI 툴이다. 이를 통해 노드의 상태를 모니터링하면서 문제를 추적할 수 있다.
-
rqt_console 실행: 터미널에서 다음 명령어를 통해
rqt_console
을 실행할 수 있다.bash rqt_console
-
로그 필터링: 로그 레벨별로 메시지를 필터링하거나 특정 노드에서 발생한 로그만을 확인할 수 있다. 예를 들어,
DEBUG
레벨의 메시지만 필터링하고 싶다면, 상단에서 필터 옵션을 설정할 수 있다.
rqt_logger_level
rqt_logger_level
은 특정 노드의 로그 레벨을 실시간으로 조정할 수 있는 도구이다. 이를 활용하면, 디버깅 중에 필요에 따라 로그 레벨을 조정하여 더 많은 정보를 얻을 수 있다.
-
rqt_logger_level 실행: 터미널에서 다음 명령어로 실행할 수 있다.
bash rqt_logger_level
-
로그 레벨 설정: 노드의 로그 레벨을
INFO
,DEBUG
,WARN
,ERROR
등으로 조정할 수 있다. 이를 통해 개발자는 필요한 정보만을 실시간으로 확인할 수 있으며, 디버깅 시 더욱 유용하게 활용할 수 있다.
메시지 직렬화와 역직렬화
ROS2에서는 노드 간의 메시지 전송 시, 데이터를 직렬화(serialize)하여 전송하고, 수신된 메시지는 역직렬화(deserialize)하여 처리한다. 이 과정은 네트워크 성능과 메시지 전달의 효율성에 중요한 영향을 미친다.
-
직렬화 과정: 퍼블리셔가 메시지를 발행할 때, 메시지 데이터를 직렬화하여 바이트 스트림으로 변환한다. 이를 통해 데이터를 네트워크 상에서 전송할 수 있게 된다. 직렬화 과정에서 데이터가 손실되지 않도록 주의해야 하며, 메시지 타입에 맞는 직렬화 방식을 사용해야 한다.
-
역직렬화 과정: 서브스크라이버가 수신한 바이트 스트림 데이터를 다시 원래의 메시지 형식으로 변환하는 과정을 역직렬화라 한다. 역직렬화된 메시지는 서브스크라이버에서 처리되고, 노드 간의 통신이 완료된다.
이 직렬화와 역직렬화 과정에서 발생할 수 있는 문제는 다음과 같다.
- 데이터 손실: 잘못된 직렬화 또는 역직렬화 방식으로 인해 메시지 데이터가 손실될 수 있다.
- 지연 문제: 네트워크 환경에 따라 직렬화된 데이터가 제때 전달되지 않아 메시지 수신에 지연이 발생할 수 있다.
토픽 퍼포먼스 최적화
ROS2에서 토픽 기반의 통신 성능을 최적화하는 것은 시스템의 실시간 성능 및 전체적인 안정성에 매우 중요하다. 이를 위해 다양한 QoS 설정과 노드 구조, 네트워크 환경에 따른 최적화 전략을 고려할 필요가 있다.
QoS 정책을 통한 최적화
QoS(Quality of Service) 설정은 퍼블리셔와 서브스크라이버 간의 통신에 있어서 중요한 역할을 한다. 잘못된 QoS 설정은 메시지 손실이나 지연을 유발할 수 있으므로, 각 설정을 이해하고 환경에 맞게 최적화해야 한다.
신뢰성(reliability)
QoS 설정에서 가장 중요한 요소 중 하나는 신뢰성(reliability)이다. ROS2에서는 두 가지 신뢰성 옵션을 제공한다:
RELIABLE
: 퍼블리셔는 모든 메시지가 서브스크라이버에게 정확하게 전달되도록 보장한다. 그러나 네트워크 환경에 따라 성능이 저하될 수 있다.BEST_EFFORT
: 퍼블리셔는 가능한 한 많은 메시지를 서브스크라이버에게 전달하려고 시도하지만, 일부 메시지가 손실될 수 있다. 성능은RELIABLE
보다 우수하지만 신뢰성이 낮다.
네트워크 상황에 따라 이 두 설정을 적절히 선택하여 메시지 전달의 우선순위와 성능 간의 균형을 맞출 수 있다.
히스토리(history)
history
QoS는 퍼블리셔가 보유하고 있는 메시지의 기록을 얼마나 오래 유지할지를 결정한다. 두 가지 주요 설정이 있다:
KEEP_ALL
: 모든 메시지를 유지하여, 네트워크가 복구되었을 때 서브스크라이버가 메시지 전체를 수신할 수 있게 한다. 메모리 사용량이 높아질 수 있다.KEEP_LAST(n)
: 가장 최근에 발행된 메시지 중 n개만 유지한다. 메모리 사용량을 절약할 수 있지만, 서브스크라이버가 메시지를 놓칠 수 있다.
지속성(durability)
durability
QoS는 퍼블리셔가 발행한 메시지를 서브스크라이버가 나중에 수신할 수 있는지를 결정한다. ROS2에서 두 가지 지속성 설정을 제공한다:
TRANSIENT_LOCAL
: 퍼블리셔가 종료된 이후에도 서브스크라이버가 메시지를 수신할 수 있다.VOLATILE
: 퍼블리셔가 종료되면 메시지가 사라지고, 서브스크라이버는 더 이상 해당 메시지를 수신할 수 없다.
지속성 설정은 특히 실시간성을 요구하는 시스템에서는 중요하지 않지만, 시스템의 복구 시 메시지를 계속 받을 필요가 있는 경우 유용하다.
네트워크 최적화
네트워크 성능에 따라 ROS2 노드 간 통신의 성능이 크게 달라질 수 있다. 특히, 대규모의 시스템에서 여러 노드가 다양한 주파수로 메시지를 발행하고 구독하는 경우, 네트워크 부하가 생길 수 있다.
다중 네트워크 인터페이스 사용
ROS2에서는 다중 네트워크 인터페이스를 사용할 수 있다. 예를 들어, 로컬 네트워크와 외부 네트워크 간의 메시지 통신을 분리하여, 중요한 노드 간 통신은 로컬 네트워크에서만 이루어지고 외부 네트워크는 비실시간 데이터에만 사용하도록 설정할 수 있다.
토픽 필터링
특정 상황에서는 모든 메시지가 필요한 것이 아니라 일부만 필요할 수 있다. 이때 토픽 필터링을 통해 불필요한 메시지를 구독하지 않도록 설정함으로써 네트워크 부하를 줄일 수 있다.
시스템 최적화를 위한 구조 설계
ROS2에서는 시스템을 설계할 때 노드의 개수, 메시지 주기, 네임스페이스 구조 등을 신중하게 설정해야 한다. 성능을 최적화하기 위해 다음 사항을 고려할 수 있다:
-
노드 분리: 단일 노드가 너무 많은 작업을 수행하면 성능이 저하될 수 있다. 이를 방지하기 위해 각 기능별로 노드를 분리하여 병렬로 처리할 수 있도록 설계한다.
-
메시지 주기 조정: 각 노드의 메시지 발행 주기를 적절히 조정하여 불필요한 메시지 전송을 최소화한다. 예를 들어, 센서 데이터의 경우 중요한 데이터만 빠르게 전송하고, 덜 중요한 데이터는 주기를 늘릴 수 있다.
-
네임스페이스 구조 설계: 네임스페이스를 활용하여 토픽 및 노드의 구성을 체계적으로 관리하면 메시지 필터링과 네트워크 트래픽 제어에 도움이 된다.
퍼포먼스 최적화 코드 예시
다음은 QoS 설정을 활용하여 퍼블리셔와 서브스크라이버의 통신 성능을 최적화하는 예시 코드이다.
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
def create_qos():
qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
history=QoSProfile.KeepLast(10)
)
return qos
이 코드에서는 RELIABLE
신뢰성, TRANSIENT_LOCAL
지속성, 그리고 마지막 10개의 메시지만 유지하는 히스토리 설정을 적용하였다.