1. rclpy 소개

rclpy는 ROS2에서 파이썬으로 노드를 작성할 수 있도록 해주는 클라이언트 라이브러리이다. ROS2에서 대부분의 상호작용은 노드를 통해 이루어지며, rclpy를 이용하여 파이썬 코드로 퍼블리셔, 서브스크라이버, 서비스, 액션 등의 기능을 구현할 수 있다.

2. 파이썬 노드 기본 구조

rclpy를 사용하여 노드를 생성할 때 기본적인 단계는 다음과 같다.

  1. rclpy 라이브러리를 초기화한다.
  2. 노드를 생성한다.
  3. 노드가 제공하는 기능(퍼블리셔, 서브스크라이버, 서비스, 액션 등)을 정의한다.
  4. 이벤트 루프를 실행하여 노드가 계속 동작하도록 한다.
  5. 작업이 완료되면 rclpy를 종료한다.

기본 코드 예시

import rclpy
from rclpy.node import Node

class SimpleNode(Node):
    def __init__(self):
        super().__init__('simple_node')
        self.get_logger().info('Node is running')

def main(args=None):
    rclpy.init(args=args)
    node = SimpleNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

이 코드에서는 rclpy.init() 함수로 ROS2를 초기화한 후 SimpleNode라는 노드를 생성한다. 이 노드는 rclpy.spin()으로 실행되며, ROS2 메시지 처리 루프에 들어가게 된다.

3. 퍼블리셔 구현

퍼블리셔는 데이터를 다른 노드에 전달하는 역할을 한다. ROS2에서는 토픽(topic)을 통해 데이터를 퍼블리싱할 수 있으며, rclpy에서 이를 쉽게 구현할 수 있다.

퍼블리셔의 기본 구조

퍼블리셔를 구현하는 기본 절차는 다음과 같다. 1. 퍼블리셔 객체를 생성하고 퍼블리시할 메시지 타입을 설정한다. 2. 주기적으로 데이터를 퍼블리시할 타이머를 설정한다. 3. 노드 실행 중에 퍼블리시를 수행한다.

퍼블리셔 코드 예시

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class PublisherNode(Node):
    def __init__(self):
        super().__init__('publisher_node')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        timer_period = 1.0  # 1초마다 퍼블리시
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = f'Hello, ROS2: {self.i}'
        self.publisher_.publish(msg)
        self.get_logger().info(f'Publishing: "{msg.data}"')
        self.i += 1

def main(args=None):
    rclpy.init(args=args)
    node = PublisherNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

위 코드에서 create_publisher 함수를 사용하여 퍼블리셔 객체를 생성하며, 'topic'이라는 이름의 토픽에 String 메시지를 퍼블리시한다. create_timer 함수로 주기적인 콜백 함수를 설정하여 1초마다 timer_callback 함수가 호출되어 메시지가 퍼블리시된다.

4. 서브스크라이버 구현

서브스크라이버는 퍼블리셔가 보낸 데이터를 수신하는 역할을 한다. rclpy에서는 서브스크라이버를 이용해 특정 토픽에서 메시지를 수신할 수 있다.

서브스크라이버의 기본 구조

서브스크라이버를 구현하는 기본 절차는 다음과 같다. 1. 서브스크라이버 객체를 생성하고 구독할 메시지 타입과 토픽을 설정한다. 2. 콜백 함수를 설정하여 메시지를 수신할 때마다 처리하도록 한다.

서브스크라이버 코드 예시

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class SubscriberNode(Node):
    def __init__(self):
        super().__init__('subscriber_node')
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info(f'I heard: "{msg.data}"')

def main(args=None):
    rclpy.init(args=args)
    node = SubscriberNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

위 코드에서 create_subscription 함수를 사용하여 서브스크라이버 객체를 생성한다. 'topic'이라는 이름의 토픽에서 String 메시지를 구독하며, 메시지가 수신될 때마다 listener_callback 함수가 호출된다.

5. 퍼블리셔와 서브스크라이버 동작 원리

퍼블리셔와 서브스크라이버는 토픽을 통해 데이터를 교환하며, 퍼블리셔가 특정 주기마다 메시지를 토픽에 퍼블리시하면, 해당 토픽을 구독하는 서브스크라이버가 그 메시지를 수신하게 된다.

6. 주기적 퍼블리싱과 서브스크라이빙을 위한 타이머 사용

타이머는 노드가 주기적인 작업을 수행할 수 있도록 도와준다. 퍼블리셔에서 주기적으로 메시지를 퍼블리시하거나, 서브스크라이버에서 주기적인 작업을 수행할 때 타이머를 활용할 수 있다.

타이머는 노드의 이벤트 루프 내에서 동작하며, 설정된 주기마다 지정된 콜백 함수가 호출된다. 이를 통해 반복 작업을 구현할 수 있다.

7. QoS 설정

QoS(품질 서비스, Quality of Service)는 퍼블리셔와 서브스크라이버가 통신할 때의 메시지 전달 신뢰도와 성능을 제어한다. rclpy에서는 퍼블리셔와 서브스크라이버를 생성할 때 QoS 정책을 설정할 수 있다.

QoS 설정 예시

from rclpy.qos import QoSProfile

qos_profile = QoSProfile(depth=10)
self.publisher_ = self.create_publisher(String, 'topic', qos_profile)
self.subscription = self.create_subscription(
    String,
    'topic',
    self.listener_callback,
    qos_profile)

QoSProfile 객체를 생성하여 depth와 같은 QoS 설정을 조정할 수 있다.

8. 동기화된 퍼블리셔와 서브스크라이버

ROS2에서 퍼블리셔와 서브스크라이버 간의 통신은 비동기적으로 이루어진다. 퍼블리셔가 메시지를 퍼블리시하면, 서브스크라이버는 이를 비동기적으로 수신한다. 즉, 퍼블리셔는 메시지를 퍼블리시한 후 바로 다른 작업을 수행할 수 있고, 서브스크라이버도 메시지를 수신한 후 이를 즉시 처리하는 대신 다른 작업과 병행할 수 있다.

동기화 작업

비동기적인 통신을 기반으로 퍼블리셔와 서브스크라이버의 작업을 동기화하려면, 특정 조건에서 메시지 송수신 타이밍을 맞추거나, 메시지를 저장하고 처리하는 방식을 고려해야 한다.

예를 들어, 퍼블리셔가 주기적으로 데이터를 퍼블리시하지만, 서브스크라이버가 그 데이터를 빠짐없이 처리해야 할 경우, 버퍼를 이용하여 메시지를 저장한 후 처리하는 방식이 적합할 수 있다.

from collections import deque

class SubscriberNode(Node):
    def __init__(self):
        super().__init__('subscriber_node')
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.message_queue = deque(maxlen=10)  # 메시지 버퍼

    def listener_callback(self, msg):
        self.message_queue.append(msg.data)
        self.get_logger().info(f'Message added to queue: "{msg.data}"')

위 코드에서는 deque를 사용하여 메시지를 큐에 저장하는 방법을 보여준다. 이렇게 하면 서브스크라이버가 들어오는 메시지를 즉시 처리하지 않더라도, 후속 작업에서 버퍼에 저장된 메시지를 순차적으로 처리할 수 있다.

9. 멀티스레딩을 이용한 동시 처리

멀티스레딩을 사용하여 퍼블리셔와 서브스크라이버의 작업을 병렬로 처리할 수 있다. rclpy는 파이썬의 표준 멀티스레딩 라이브러리를 사용하여 여러 작업을 동시에 수행할 수 있는 환경을 제공한다.

멀티스레딩의 기본 구조

  1. threading 라이브러리를 이용해 별도의 스레드를 생성한다.
  2. 각 스레드에서 퍼블리셔나 서브스크라이버 작업을 처리한다.

멀티스레딩 코드 예시

import threading

class MultiThreadNode(Node):
    def __init__(self):
        super().__init__('multi_thread_node')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.timer = self.create_timer(1.0, self.timer_callback)
        self.i = 0

        # 서브스크라이버를 별도의 스레드에서 실행
        self.subscriber_thread = threading.Thread(target=rclpy.spin, args=(self,))
        self.subscriber_thread.start()

    def timer_callback(self):
        msg = String()
        msg.data = f'Publishing from timer: {self.i}'
        self.publisher_.publish(msg)
        self.get_logger().info(f'Published: "{msg.data}"')
        self.i += 1

    def listener_callback(self, msg):
        self.get_logger().info(f'I heard: "{msg.data}"')

def main(args=None):
    rclpy.init(args=args)
    node = MultiThreadNode()
    rclpy.spin(node)  # 퍼블리셔는 메인 스레드에서 실행
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

이 예제에서는 퍼블리셔는 메인 스레드에서 동작하고, 서브스크라이버는 별도의 스레드에서 동작하도록 설정되어 있다. 이를 통해 두 작업이 동시에 병렬로 실행되며, 성능을 더욱 향상시킬 수 있다.

10. 퍼블리셔와 서브스크라이버의 성능 최적화

퍼블리셔와 서브스크라이버 간의 메시지 전달 성능을 최적화하려면 다음과 같은 사항을 고려할 수 있다.

메시지 크기 최적화

큰 메시지를 퍼블리시할 경우 네트워크 대역폭과 처리 성능에 영향을 미칠 수 있다. 가능하면 메시지 크기를 줄이거나, 필요한 데이터만을 퍼블리시하는 것이 좋다.

QoS 설정 최적화

QoS 설정을 통해 메시지 전달 신뢰도와 성능을 제어할 수 있다. 예를 들어, 데이터 유실을 감수하고 성능을 우선시할 경우 reliabilitybest_effort로 설정할 수 있으며, 실시간 데이터를 다룰 때는 depth 값을 적절히 조정하여 메시지 처리 대기열을 관리할 수 있다.

멀티스레드 및 멀티프로세싱

멀티스레딩과 멀티프로세싱을 통해 퍼블리셔와 서브스크라이버의 작업을 분리하여 병렬로 실행함으로써 성능을 향상시킬 수 있다. 특히 실시간 처리가 필요한 경우에는 멀티프로세싱이 더욱 적합할 수 있다.

import multiprocessing

def publisher_process():
    rclpy.init()
    node = PublisherNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

def subscriber_process():
    rclpy.init()
    node = SubscriberNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    pub_process = multiprocessing.Process(target=publisher_process)
    sub_process = multiprocessing.Process(target=subscriber_process)

    pub_process.start()
    sub_process.start()

    pub_process.join()
    sub_process.join()

위 예제는 퍼블리셔와 서브스크라이버를 각각 다른 프로세스에서 실행하여 병렬로 동작하게 만드는 코드이다. 이를 통해 CPU 자원을 효율적으로 사용할 수 있다.

11. 사용자 정의 메시지 생성

ROS2에서 제공하는 기본 메시지 타입 외에도, 응용 프로그램에 맞게 사용자 정의 메시지를 생성할 수 있다. 사용자 정의 메시지는 복잡한 데이터를 퍼블리시하고 서브스크라이브할 때 유용하게 사용된다.

사용자 정의 메시지 파일 생성

사용자 정의 메시지는 패키지 내의 msg 디렉터리에 .msg 파일로 정의된다. 예를 들어, 다음과 같이 Person.msg 파일을 정의할 수 있다.

string name
int32 age
float32 height

이 메시지 파일은 name, age, height의 세 가지 필드를 가진 Person 메시지를 정의한다.

CMakeLists.txt 및 package.xml 설정

사용자 정의 메시지를 생성하기 위해서는 패키지의 CMakeLists.txtpackage.xml 파일에 메시지 생성에 대한 설정을 추가해야 한다.

CMakeLists.txt 설정
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Person.msg"
)
package.xml 설정
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>

이 설정을 추가하면, 패키지를 빌드할 때 ROS2에서 사용자 정의 메시지를 자동으로 생성한다.

사용자 정의 메시지 사용

사용자 정의 메시지가 생성된 후, 이를 퍼블리셔나 서브스크라이버에서 사용할 수 있다.

from my_package.msg import Person  # 사용자 정의 메시지 임포트

class PublisherNode(Node):
    def __init__(self):
        super().__init__('publisher_node')
        self.publisher_ = self.create_publisher(Person, 'person_topic', 10)
        self.timer = self.create_timer(1.0, self.timer_callback)

    def timer_callback(self):
        msg = Person()
        msg.name = 'John Doe'
        msg.age = 30
        msg.height = 175.5
        self.publisher_.publish(msg)
        self.get_logger().info(f'Publishing: {msg.name}, {msg.age}, {msg.height}')

def main(args=None):
    rclpy.init(args=args)
    node = PublisherNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

위 코드에서는 사용자 정의 메시지 Person을 퍼블리셔에서 퍼블리시하는 예시를 보여준다. 메시지의 각 필드에 값을 할당한 후 이를 퍼블리시할 수 있다.

12. 동적 파라미터 설정 및 활용

ROS2에서는 노드에서 동적으로 파라미터를 설정하고, 이를 통해 노드의 동작을 제어할 수 있다. rclpy에서는 파라미터 서버를 이용해 노드 간 파라미터를 공유하거나, 노드가 실행 중일 때에도 파라미터를 변경할 수 있다.

파라미터 선언과 설정

노드에서 파라미터를 선언하고 초기 값을 설정할 수 있다. 파라미터는 노드 생성 시 선언되며, declare_parameter 함수를 사용한다.

self.declare_parameter('my_param', 'default_value')

이 함수는 my_param이라는 파라미터를 선언하고 초기 값을 'default_value'로 설정한다. 이후 파라미터 값을 가져오려면 get_parameter 함수를 사용한다.

my_param = self.get_parameter('my_param').get_parameter_value().string_value
self.get_logger().info(f'My param value: {my_param}')

동적 파라미터 재설정

ROS2에서는 노드가 실행 중일 때 파라미터를 동적으로 변경할 수 있다. 이를 위해 파라미터 변경 이벤트를 처리하는 콜백 함수를 설정할 수 있다.

def parameter_callback(self, params):
    for param in params:
        self.get_logger().info(f'Parameter {param.name} changed to {param.value}')
    return rclpy.parameter.ParameterEventDescriptors()

self.add_on_set_parameters_callback(self.parameter_callback)

이 코드는 파라미터가 변경될 때마다 parameter_callback 함수가 호출되며, 변경된 파라미터 값을 로깅한다.

파라미터의 동적 재설정 예시

import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter

class DynamicParamNode(Node):
    def __init__(self):
        super().__init__('dynamic_param_node')
        self.declare_parameter('my_param', 'default_value')
        self.add_on_set_parameters_callback(self.parameter_callback)

    def parameter_callback(self, params):
        for param in params:
            self.get_logger().info(f'Parameter {param.name} changed to {param.value}')
        return rclpy.parameter.ParameterEventDescriptors()

def main(args=None):
    rclpy.init(args=args)
    node = DynamicParamNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

이 예제에서는 my_param이라는 파라미터를 동적으로 설정하고, 값이 변경될 때마다 이를 콜백 함수에서 처리한다.

13. 파라미터 서버와 다중 노드 상호작용

ROS2의 파라미터 서버를 통해 다중 노드가 동일한 파라미터를 공유하거나, 특정 노드의 파라미터를 다른 노드가 가져오는 형태로 상호작용할 수 있다.