659.31 TransformListener의 rclpy 초기화
1. 개요
tf2_ros.TransformListener를 Python 클라이언트 라이브러리 rclpy에서 초기화하기 위해서는 tf2_ros.Buffer 객체를 먼저 생성하고, 이를 TransformListener에 전달하여야 한다. Python에서의 초기화 과정은 C++ 구현에 비하여 간결하나, 시뮬레이션 시간 지원, 전용 스레드 설정, Executor 연계 등의 옵션을 적절히 구성하여야 한다. 본 절에서는 TransformListener의 rclpy 기반 초기화 과정을 다양한 시나리오별로 상세히 기술한다.
2. 필수 모듈 및 의존성
2.1 Python 임포트
import rclpy
from rclpy.node import Node
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
2.2 package.xml
<exec_depend>rclpy</exec_depend>
<exec_depend>tf2_ros_py</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
2.3 setup.py
entry_points={
'console_scripts': [
'tf_listener = my_robot_tf.tf_listener:main',
],
},
3. Buffer 초기화
3.1 기본 초기화
self.tf_buffer = Buffer()
Python에서 Buffer()는 인자 없이 생성할 수 있다. 이 경우 기본 캐시 기간(10초)이 적용된다.
3.2 캐시 기간 지정
from rclpy.duration import Duration
# 캐시 기간 30초
self.tf_buffer = Buffer(cache_time=Duration(seconds=30))
rclpy.duration.Duration 객체를 통하여 캐시 기간을 명시적으로 지정할 수 있다.
3.3 노드 시계 연결
시뮬레이션 시간(use_sim_time)을 올바르게 반영하려면, Buffer에 노드의 시계를 연결하여야 한다.
self.tf_buffer = Buffer(node=self)
또는 명시적으로 시계를 전달할 수 있다.
self.tf_buffer = Buffer()
# TransformListener 생성 시 노드를 전달하면 자동으로 시계가 연결됨
4. TransformListener 초기화 패턴
4.1 패턴 1: 기본 초기화
가장 간단하고 일반적인 초기화 패턴이다.
class TFListenerNode(Node):
def __init__(self):
super().__init__('tf_listener_node')
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
self.timer = self.create_timer(0.5, self.on_timer)
def on_timer(self):
try:
transform = self.tf_buffer.lookup_transform(
'map', 'base_link', rclpy.time.Time())
self.get_logger().info(
f'Position: ({transform.transform.translation.x:.2f}, '
f'{transform.transform.translation.y:.2f})')
except TransformException as ex:
self.get_logger().debug(f'{ex}')
TransformListener(self.tf_buffer, self)에서 첫 번째 인자는 Buffer 객체이고, 두 번째 인자는 Node 객체이다. 생성 즉시 /tf와 /tf_static 토픽의 구독을 시작한다.
4.2 패턴 2: spin_thread 옵션
TransformListener는 spin_thread 옵션을 통하여 전용 콜백 스레드의 사용 여부를 제어한다.
# 전용 스레드 사용 (기본값: True)
self.tf_listener = TransformListener(
self.tf_buffer, self, spin_thread=True)
# Executor에 의존
self.tf_listener = TransformListener(
self.tf_buffer, self, spin_thread=False)
spin_thread=True(기본값)인 경우:
- 내부적으로 별도의 Executor 스레드가 생성되어 변환 메시지를 수신한다.
- 사용자의
rclpy.spin()호출과 독립적으로 동작한다. - 별도의 스레드 자원이 소모된다.
spin_thread=False인 경우:
TransformListener의 구독 콜백이 사용자의 Executor에 의하여 처리된다.- 반드시
rclpy.spin()또는executor.spin()이 호출되어야 변환이 수신된다. - 스레드 자원을 절약할 수 있다.
4.3 패턴 3: QoS 프로파일 지정
구독자의 QoS 프로파일을 명시적으로 지정할 수 있다.
from rclpy.qos import QoSProfile, DurabilityPolicy, ReliabilityPolicy
qos = QoSProfile(depth=100)
self.tf_listener = TransformListener(
self.tf_buffer, self,
qos=qos,
static_qos=QoSProfile(
depth=100,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
reliability=ReliabilityPolicy.RELIABLE))
일반적으로 기본 QoS 설정을 변경할 필요는 없으나, 네트워크 환경에 따라 조정이 필요한 경우가 있다.
5. 전체 예제: 로봇 위치 추적
import rclpy
from rclpy.node import Node
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
from tf_transformations import euler_from_quaternion
import math
class RobotPositionTracker(Node):
def __init__(self):
super().__init__('robot_position_tracker')
# Buffer 초기화
self.tf_buffer = Buffer()
# TransformListener 초기화
self.tf_listener = TransformListener(self.tf_buffer, self)
# 위치 조회 타이머 (10Hz)
self.timer = self.create_timer(0.1, self.track_position)
self.get_logger().info('Position tracker initialized.')
def track_position(self):
try:
transform = self.tf_buffer.lookup_transform(
'map', 'base_link', rclpy.time.Time())
x = transform.transform.translation.x
y = transform.transform.translation.y
z = transform.transform.translation.z
q = transform.transform.rotation
_, _, yaw = euler_from_quaternion([q.x, q.y, q.z, q.w])
self.get_logger().info(
f'Position: ({x:.2f}, {y:.2f}, {z:.2f}), '
f'Yaw: {math.degrees(yaw):.1f}°',
throttle_duration_sec=2.0)
except TransformException as ex:
self.get_logger().debug(f'{ex}',
throttle_duration_sec=5.0)
def main(args=None):
rclpy.init(args=args)
node = RobotPositionTracker()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
6. 센서 데이터 변환 예제
from sensor_msgs.msg import LaserScan
from tf2_ros import TransformException
class SensorTransformNode(Node):
def __init__(self):
super().__init__('sensor_transform_node')
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
self.laser_sub = self.create_subscription(
LaserScan, 'scan', self.laser_callback, 10)
def laser_callback(self, msg):
try:
transform = self.tf_buffer.lookup_transform(
'base_link',
msg.header.frame_id,
rclpy.time.Time.from_msg(msg.header.stamp))
# 변환을 사용하여 센서 데이터 처리
self.get_logger().debug(
f'Transform from {msg.header.frame_id} to base_link available')
except TransformException as ex:
self.get_logger().warn(
f'Could not transform: {ex}',
throttle_duration_sec=1.0)
7. MultiThreadedExecutor와의 연계
MultiThreadedExecutor를 사용하는 경우, spin_thread=False으로 설정하여 Executor 스레드를 공유하는 것이 리소스 관리 측면에서 유리하다.
import rclpy
from rclpy.executors import MultiThreadedExecutor
def main(args=None):
rclpy.init(args=args)
node = TFListenerNode() # spin_thread=False 설정
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
8. 비동기 변환 조회
Python의 asyncio와 연계하여 비동기적으로 변환을 조회할 수 있다.
import asyncio
class AsyncTFListener(Node):
def __init__(self):
super().__init__('async_tf_listener')
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
async def wait_for_transform(self, target_frame, source_frame,
timeout_sec=5.0):
start_time = self.get_clock().now()
while rclpy.ok():
try:
transform = self.tf_buffer.lookup_transform(
target_frame, source_frame, rclpy.time.Time())
return transform
except TransformException:
elapsed = (self.get_clock().now() - start_time).nanoseconds / 1e9
if elapsed > timeout_sec:
raise TimeoutError(
f'Transform {source_frame} -> {target_frame} '
f'not available after {timeout_sec}s')
await asyncio.sleep(0.1)
9. rclcpp 구현과의 주요 차이점
| 항목 | rclcpp (C++) | rclpy (Python) |
|---|---|---|
| Buffer 생성 | Buffer(clock) (시계 필수) | Buffer() (시계 선택) |
| Listener 인자 순서 | (buffer, node, spin_thread) | (buffer, node, spin_thread) |
| 시간 지정 (최신) | tf2::TimePointZero | rclpy.time.Time() |
| 예외 클래스 | tf2::TransformException | tf2_ros.TransformException |
| Throttle 로깅 | RCLCPP_WARN_THROTTLE | throttle_duration_sec 인자 |
| 기본 spin_thread | true | True |
10. 초기화 시 흔한 오류
10.1 오류 1: Buffer를 지역 변수로 생성
# 잘못된 사용
def __init__(self):
buffer = Buffer() # 지역 변수!
self.tf_listener = TransformListener(buffer, self)
# buffer가 스코프를 벗어나면 가비지 컬렉션될 수 있음
Buffer는 반드시 self.tf_buffer와 같이 인스턴스 변수로 유지하여야 한다.
10.2 오류 2: spin 호출 누락 (spin_thread=False)
# 잘못된 사용
self.tf_listener = TransformListener(
self.tf_buffer, self, spin_thread=False)
# rclpy.spin()을 호출하지 않으면 변환이 수신되지 않음
spin_thread=False를 사용하는 경우 반드시 rclpy.spin()을 호출하여야 한다.
11. 참고 자료
- ROS2 공식 문서, “Writing a tf2 listener (Python)”, https://docs.ros.org/en/humble/Tutorials/Intermediate/Tf2/Writing-A-Tf2-Listener-Py.html
- tf2_ros API 문서, https://docs.ros2.org/latest/api/tf2_ros/
- rclpy API 문서, https://docs.ros2.org/latest/api/rclpy/
- ROS2 Humble Hawksbill