659.31 TransformListener의 rclpy 초기화

1. 개요

tf2_ros.TransformListener를 Python 클라이언트 라이브러리 rclpy에서 초기화하기 위해서는 tf2_ros.Buffer 객체를 먼저 생성하고, 이를 TransformListener에 전달하여야 한다. Python에서의 초기화 과정은 C++ 구현에 비하여 간결하나, 시뮬레이션 시간 지원, 전용 스레드 설정, Executor 연계 등의 옵션을 적절히 구성하여야 한다. 본 절에서는 TransformListenerrclpy 기반 초기화 과정을 다양한 시나리오별로 상세히 기술한다.

2. 필수 모듈 및 의존성

2.1 Python 임포트

import rclpy
from rclpy.node import Node
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener

2.2 package.xml

<exec_depend>rclpy</exec_depend>
<exec_depend>tf2_ros_py</exec_depend>
<exec_depend>geometry_msgs</exec_depend>

2.3 setup.py

entry_points={
    'console_scripts': [
        'tf_listener = my_robot_tf.tf_listener:main',
    ],
},

3. Buffer 초기화

3.1 기본 초기화

self.tf_buffer = Buffer()

Python에서 Buffer()는 인자 없이 생성할 수 있다. 이 경우 기본 캐시 기간(10초)이 적용된다.

3.2 캐시 기간 지정

from rclpy.duration import Duration

# 캐시 기간 30초
self.tf_buffer = Buffer(cache_time=Duration(seconds=30))

rclpy.duration.Duration 객체를 통하여 캐시 기간을 명시적으로 지정할 수 있다.

3.3 노드 시계 연결

시뮬레이션 시간(use_sim_time)을 올바르게 반영하려면, Buffer에 노드의 시계를 연결하여야 한다.

self.tf_buffer = Buffer(node=self)

또는 명시적으로 시계를 전달할 수 있다.

self.tf_buffer = Buffer()
# TransformListener 생성 시 노드를 전달하면 자동으로 시계가 연결됨

4. TransformListener 초기화 패턴

4.1 패턴 1: 기본 초기화

가장 간단하고 일반적인 초기화 패턴이다.

class TFListenerNode(Node):
    def __init__(self):
        super().__init__('tf_listener_node')

        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)

        self.timer = self.create_timer(0.5, self.on_timer)

    def on_timer(self):
        try:
            transform = self.tf_buffer.lookup_transform(
                'map', 'base_link', rclpy.time.Time())
            self.get_logger().info(
                f'Position: ({transform.transform.translation.x:.2f}, '
                f'{transform.transform.translation.y:.2f})')
        except TransformException as ex:
            self.get_logger().debug(f'{ex}')

TransformListener(self.tf_buffer, self)에서 첫 번째 인자는 Buffer 객체이고, 두 번째 인자는 Node 객체이다. 생성 즉시 /tf/tf_static 토픽의 구독을 시작한다.

4.2 패턴 2: spin_thread 옵션

TransformListenerspin_thread 옵션을 통하여 전용 콜백 스레드의 사용 여부를 제어한다.

# 전용 스레드 사용 (기본값: True)
self.tf_listener = TransformListener(
    self.tf_buffer, self, spin_thread=True)

# Executor에 의존
self.tf_listener = TransformListener(
    self.tf_buffer, self, spin_thread=False)

spin_thread=True(기본값)인 경우:

  • 내부적으로 별도의 Executor 스레드가 생성되어 변환 메시지를 수신한다.
  • 사용자의 rclpy.spin() 호출과 독립적으로 동작한다.
  • 별도의 스레드 자원이 소모된다.

spin_thread=False인 경우:

  • TransformListener의 구독 콜백이 사용자의 Executor에 의하여 처리된다.
  • 반드시 rclpy.spin() 또는 executor.spin()이 호출되어야 변환이 수신된다.
  • 스레드 자원을 절약할 수 있다.

4.3 패턴 3: QoS 프로파일 지정

구독자의 QoS 프로파일을 명시적으로 지정할 수 있다.

from rclpy.qos import QoSProfile, DurabilityPolicy, ReliabilityPolicy

qos = QoSProfile(depth=100)

self.tf_listener = TransformListener(
    self.tf_buffer, self,
    qos=qos,
    static_qos=QoSProfile(
        depth=100,
        durability=DurabilityPolicy.TRANSIENT_LOCAL,
        reliability=ReliabilityPolicy.RELIABLE))

일반적으로 기본 QoS 설정을 변경할 필요는 없으나, 네트워크 환경에 따라 조정이 필요한 경우가 있다.

5. 전체 예제: 로봇 위치 추적

import rclpy
from rclpy.node import Node
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
from tf_transformations import euler_from_quaternion
import math


class RobotPositionTracker(Node):
    def __init__(self):
        super().__init__('robot_position_tracker')

        # Buffer 초기화
        self.tf_buffer = Buffer()

        # TransformListener 초기화
        self.tf_listener = TransformListener(self.tf_buffer, self)

        # 위치 조회 타이머 (10Hz)
        self.timer = self.create_timer(0.1, self.track_position)

        self.get_logger().info('Position tracker initialized.')

    def track_position(self):
        try:
            transform = self.tf_buffer.lookup_transform(
                'map', 'base_link', rclpy.time.Time())

            x = transform.transform.translation.x
            y = transform.transform.translation.y
            z = transform.transform.translation.z

            q = transform.transform.rotation
            _, _, yaw = euler_from_quaternion([q.x, q.y, q.z, q.w])

            self.get_logger().info(
                f'Position: ({x:.2f}, {y:.2f}, {z:.2f}), '
                f'Yaw: {math.degrees(yaw):.1f}°',
                throttle_duration_sec=2.0)

        except TransformException as ex:
            self.get_logger().debug(f'{ex}',
                throttle_duration_sec=5.0)


def main(args=None):
    rclpy.init(args=args)
    node = RobotPositionTracker()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

6. 센서 데이터 변환 예제

from sensor_msgs.msg import LaserScan
from tf2_ros import TransformException


class SensorTransformNode(Node):
    def __init__(self):
        super().__init__('sensor_transform_node')

        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)

        self.laser_sub = self.create_subscription(
            LaserScan, 'scan', self.laser_callback, 10)

    def laser_callback(self, msg):
        try:
            transform = self.tf_buffer.lookup_transform(
                'base_link',
                msg.header.frame_id,
                rclpy.time.Time.from_msg(msg.header.stamp))

            # 변환을 사용하여 센서 데이터 처리
            self.get_logger().debug(
                f'Transform from {msg.header.frame_id} to base_link available')

        except TransformException as ex:
            self.get_logger().warn(
                f'Could not transform: {ex}',
                throttle_duration_sec=1.0)

7. MultiThreadedExecutor와의 연계

MultiThreadedExecutor를 사용하는 경우, spin_thread=False으로 설정하여 Executor 스레드를 공유하는 것이 리소스 관리 측면에서 유리하다.

import rclpy
from rclpy.executors import MultiThreadedExecutor


def main(args=None):
    rclpy.init(args=args)

    node = TFListenerNode()  # spin_thread=False 설정

    executor = MultiThreadedExecutor(num_threads=4)
    executor.add_node(node)

    try:
        executor.spin()
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

8. 비동기 변환 조회

Python의 asyncio와 연계하여 비동기적으로 변환을 조회할 수 있다.

import asyncio


class AsyncTFListener(Node):
    def __init__(self):
        super().__init__('async_tf_listener')

        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)

    async def wait_for_transform(self, target_frame, source_frame,
                                   timeout_sec=5.0):
        start_time = self.get_clock().now()
        while rclpy.ok():
            try:
                transform = self.tf_buffer.lookup_transform(
                    target_frame, source_frame, rclpy.time.Time())
                return transform
            except TransformException:
                elapsed = (self.get_clock().now() - start_time).nanoseconds / 1e9
                if elapsed > timeout_sec:
                    raise TimeoutError(
                        f'Transform {source_frame} -> {target_frame} '
                        f'not available after {timeout_sec}s')
                await asyncio.sleep(0.1)

9. rclcpp 구현과의 주요 차이점

항목rclcpp (C++)rclpy (Python)
Buffer 생성Buffer(clock) (시계 필수)Buffer() (시계 선택)
Listener 인자 순서(buffer, node, spin_thread)(buffer, node, spin_thread)
시간 지정 (최신)tf2::TimePointZerorclpy.time.Time()
예외 클래스tf2::TransformExceptiontf2_ros.TransformException
Throttle 로깅RCLCPP_WARN_THROTTLEthrottle_duration_sec 인자
기본 spin_threadtrueTrue

10. 초기화 시 흔한 오류

10.1 오류 1: Buffer를 지역 변수로 생성

# 잘못된 사용
def __init__(self):
    buffer = Buffer()  # 지역 변수!
    self.tf_listener = TransformListener(buffer, self)
    # buffer가 스코프를 벗어나면 가비지 컬렉션될 수 있음

Buffer는 반드시 self.tf_buffer와 같이 인스턴스 변수로 유지하여야 한다.

10.2 오류 2: spin 호출 누락 (spin_thread=False)

# 잘못된 사용
self.tf_listener = TransformListener(
    self.tf_buffer, self, spin_thread=False)
# rclpy.spin()을 호출하지 않으면 변환이 수신되지 않음

spin_thread=False를 사용하는 경우 반드시 rclpy.spin()을 호출하여야 한다.

11. 참고 자료

  • ROS2 공식 문서, “Writing a tf2 listener (Python)”, https://docs.ros.org/en/humble/Tutorials/Intermediate/Tf2/Writing-A-Tf2-Listener-Py.html
  • tf2_ros API 문서, https://docs.ros2.org/latest/api/tf2_ros/
  • rclpy API 문서, https://docs.ros2.org/latest/api/rclpy/
  • ROS2 Humble Hawksbill