노드의 성능 최적화 개요

ROS2에서 파이썬 노드를 실행할 때, 특히 많은 퍼블리셔나 서브스크라이버를 다루는 경우, 성능을 최적화하는 것이 중요하다. 파이썬은 인터프리터 언어로, 기본적으로 단일 스레드에서 작동하는 전역 인터프리터 락(GIL)이 존재한다. 따라서 멀티코어 CPU 환경을 최대한 활용하기 위해서는 멀티스레딩 혹은 멀티프로세싱을 적절히 활용해야 한다.

GIL의 이해와 멀티스레딩의 필요성

파이썬에서 GIL(Global Interpreter Lock)은 한 번에 하나의 스레드만이 파이썬 바이트코드를 실행할 수 있도록 제약한다. 이로 인해 CPU 바운드 작업에서는 멀티스레딩의 효과가 크게 감소할 수 있다. 그러나 I/O 바운드 작업(예: 네트워크 통신, 파일 입출력)에서는 멀티스레딩이 효과적일 수 있다. ROS2에서는 통신이 빈번하게 발생하므로, 적절한 멀티스레딩 처리가 요구된다.

파이썬의 멀티스레딩

파이썬에서 threading 모듈을 사용하여 멀티스레딩을 구현할 수 있다. 하지만, CPU 바운드 작업을 멀티스레딩으로 처리하려면 GIL의 영향을 받기 때문에 multiprocessing 모듈을 사용하는 것이 더 나은 선택일 수 있다. 아래는 기본적인 멀티스레딩 예제이다.

import threading

def my_task():
    while True:
        print("스레드에서 실행 중")

thread = threading.Thread(target=my_task)
thread.start()

위 코드에서 스레드가 병렬로 실행되지만, CPU 바운드 작업에서는 GIL 때문에 성능이 제한될 수 있다. 반면, ROS2에서의 I/O 바운드 작업은 멀티스레딩으로 최적화할 수 있다.

멀티스레딩 처리에 적합한 상황

ROS2 노드에서 멀티스레딩 처리가 필요한 상황은 주로 아래와 같다: - 여러 퍼블리셔 또는 서브스크라이버를 동시에 처리해야 할 때 - 서비스 요청을 처리하는 도중 다른 서비스액션을 동시에 처리할 때 - 실시간 데이터 처리비동기 작업을 동시에 수행할 때

I/O 바운드 작업이 많은 경우, 멀티스레딩을 통해 동시성을 높여 효율적인 자원 사용이 가능한다.

ROS2 파이썬 노드에서 멀티스레딩 적용

ROS2 파이썬 노드에서 멀티스레딩을 적용하기 위해서는 ROS2의 rclpy에서 제공하는 비동기 기능을 사용해야 한다. ROS2는 기본적으로 비동기 작업을 처리할 수 있도록 설계되어 있기 때문에, spin()을 멀티스레딩으로 분리하여 여러 노드를 동시에 처리할 수 있다.

Executor와 스레드풀

ROS2에서는 비동기 작업을 처리하기 위한 MultiThreadedExecutor가 제공된다. 이를 통해 하나의 노드에서 여러 스레드를 사용하여 각기 다른 작업을 동시에 처리할 수 있다. 예를 들어, 여러 퍼블리셔와 서브스크라이버를 동시에 처리할 때 유용하다.

import rclpy
from rclpy.executors import MultiThreadedExecutor

def main():
    rclpy.init()
    node = rclpy.create_node('my_node')

    executor = MultiThreadedExecutor(num_threads=4)
    executor.add_node(node)

    try:
        executor.spin()
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

위 코드에서 MultiThreadedExecutor는 4개의 스레드를 사용하여 노드의 작업을 병렬로 처리한다. num_threads 파라미터는 스레드의 수를 조정할 수 있다.

데이터 병렬성 처리

벡터 및 행렬 연산에서 멀티스레딩을 통해 성능을 최적화할 수 있다. 예를 들어, 퍼블리셔가 동시에 다수의 데이터를 전송하거나 서브스크라이버가 데이터를 병렬로 수신하는 상황에서 이러한 최적화가 필요하다.

예를 들어, 데이터 처리에 사용할 수 있는 벡터 수식을 정의해 보자:

\mathbf{v} = \begin{bmatrix} v_1 \\ v_2 \\ v_3 \\ \vdots \\ v_n \end{bmatrix}

이를 처리할 때 멀티스레딩을 활용하면 각 스레드가 벡터의 일부분을 병렬로 처리할 수 있다. 즉, 각각의 스레드가 벡터의 일부 요소를 담당하여 병렬로 처리하게 된다.

ROS2 파이썬 노드의 멀티스레딩 예시

다음은 ROS2 파이썬 노드에서 멀티스레딩을 사용한 퍼블리셔와 서브스크라이버의 예시이다.

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import threading

class MultiThreadedNode(Node):
    def __init__(self):
        super().__init__('multi_threaded_node')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning
        self.thread = threading.Thread(target=self.publish_message)
        self.thread.start()

    def publish_message(self):
        msg = String()
        while rclpy.ok():
            msg.data = 'Hello, ROS2'
            self.publisher_.publish(msg)
            self.get_logger().info('Publishing: "%s"' % msg.data)

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)

def main(args=None):
    rclpy.init(args=args)

    multi_threaded_node = MultiThreadedNode()

    rclpy.spin(multi_threaded_node)

    multi_threaded_node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

이 예제에서는 threading.Thread를 통해 퍼블리셔가 별도의 스레드에서 동작하도록 설정하고 있다. 이를 통해 서브스크라이버와 퍼블리셔가 병렬로 실행될 수 있으며, 데이터를 보다 효율적으로 처리할 수 있다.

멀티스레딩과 ROS2 이벤트 처리

ROS2에서 이벤트 기반 처리를 멀티스레딩과 함께 사용할 수 있다. 예를 들어, 특정 토픽에서 메시지를 수신하면 이를 처리하는 콜백 함수를 별도의 스레드에서 실행하여 시스템의 응답성을 높일 수 있다. 이 경우, 각 콜백 함수는 별도의 스레드에서 실행되므로 데이터 경쟁 상황이 발생하지 않도록 주의해야 한다.

멀티스레딩을 사용하여 퍼블리셔와 서브스크라이버를 처리할 때, 공통 자원(예: 공유 변수)에 접근하는 경우, 적절한 동기화 기법을 사용해야 한다. 그렇지 않으면 경쟁 상태(race condition)나 교착 상태(deadlock)가 발생할 수 있다.

공유 자원과 동기화

공유 자원에 접근할 때는 파이썬의 threading.Lock 또는 threading.RLock을 사용하여 자원의 동시 접근을 제어해야 한다. 아래는 공유 자원에 접근하는 간단한 예이다.

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    with lock:
        counter += 1
        print("Counter:", counter)

threads = []
for i in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

위 코드에서 lock을 사용하여 여러 스레드가 동시에 counter 변수를 수정하지 못하도록 보호하고 있다. 이와 같은 방식으로 ROS2 노드에서도 공유 자원에 접근할 때 동기화 기법을 적용해야 한다.

메시지 큐의 활용

멀티스레딩 처리에서 또 다른 중요한 요소는 메시지 큐이다. 여러 스레드가 동일한 자원에 접근해야 할 때, 메시지 큐를 활용하여 데이터를 안전하게 교환할 수 있다. queue.Queue는 파이썬에서 멀티스레드 환경에서 사용할 수 있는 스레드 안전 큐이다. 아래는 큐를 사용하는 예시이다.

import queue
import threading

q = queue.Queue()

def producer():
    for i in range(10):
        q.put(i)
        print(f"Produced: {i}")

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        q.task_done()

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
q.put(None)
consumer_thread.join()

큐를 사용하면 데이터를 안전하게 생산(퍼블리싱)하고 소비(서브스크라이빙)할 수 있으며, 이는 ROS2에서 멀티스레딩을 적용할 때 자주 사용되는 패턴이다.

멀티스레딩 처리 성능 고려 사항

멀티스레딩을 적용할 때, CPU 사용률, 메모리 사용량, 그리고 I/O 성능을 신중하게 고려해야 한다. 스레드가 너무 많으면 오히려 성능이 저하될 수 있으며, 스레드 간 컨텍스트 스위칭 비용이 발생한다. 따라서 적절한 스레드 수를 설정하는 것이 중요하다.

\text{적절한 스레드 수} = \frac{n_{\text{작업}}}{n_{\text{코어}}}

여기서 n_{\text{작업}}은 작업의 수, n_{\text{코어}}는 사용 가능한 CPU 코어의 수를 의미한다.

멀티스레딩 작업에서는 작업이 적절히 분배되고, 스레드의 사용이 효율적으로 관리되도록 설계하는 것이 핵심이다.

멀티스레딩을 위한 파이썬의 비동기 처리

파이썬에서는 멀티스레딩 외에도 비동기 I/O 처리를 위해 asyncio를 사용할 수 있다. asyncio를 사용하면 비동기 코드를 작성하여 I/O 작업을 처리할 때 스레드 대신 이벤트 루프를 사용할 수 있다. asyncio는 특히 네트워크 작업에서 효과적이며, ROS2에서도 이를 활용할 수 있다.

import asyncio

async def my_task():
    print("작업 시작")
    await asyncio.sleep(1)
    print("작업 완료")

async def main():
    await asyncio.gather(my_task(), my_task())

asyncio.run(main())

asyncio는 ROS2와 함께 사용할 때도 적절한 타이밍에 이벤트를 처리하고 통신 지연을 최소화하는 데 유용하다.

멀티스레딩과 멀티프로세싱 비교

멀티스레딩 외에도 파이썬의 multiprocessing 모듈을 사용하여 멀티코어 환경에서 성능을 극대화할 수 있다. 멀티스레딩은 스레드 간 메모리를 공유하지만, 멀티프로세싱은 별도의 프로세스를 생성하여 메모리를 독립적으로 할당한다. 따라서 GIL의 제약을 받지 않으며, CPU 바운드 작업에서는 멀티프로세싱이 더 나은 성능을 발휘할 수 있다.

멀티프로세싱을 사용한 예시는 다음과 같다:

import multiprocessing

def my_task(i):
    print(f"프로세스 {i} 실행 중")

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=my_task, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

이와 같은 방식으로 멀티코어를 활용한 파이썬 노드의 성능을 극대화할 수 있다.