659.29 TransformListener를 이용한 변환 수신
1. 개요
TransformListener는 tf2_ros 패키지에서 제공하는 핵심 클래스로서, /tf 및 /tf_static 토픽으로 발행되는 좌표 변환 메시지를 수신하여 tf2::Buffer에 저장하는 역할을 담당한다. 로봇 시스템에서 좌표 변환을 활용하는 모든 노드는 TransformListener를 통하여 변환 데이터를 수집하고, Buffer의 조회 메서드를 통하여 원하는 프레임 간의 변환을 취득한다. 본 절에서는 TransformListener의 아키텍처, 동작 원리, 그리고 실제 사용 방법을 체계적으로 기술한다.
2. TransformListener의 역할과 아키텍처
2.1 기본 역할
TransformListener는 다음의 세 가지 핵심 역할을 수행한다.
- 토픽 구독:
/tf토픽(동적 변환)과/tf_static토픽(정적 변환)을 자동으로 구독한다. - 버퍼 최신화: 수신한
TransformStamped메시지를tf2::Buffer에 저장한다. - 구독자 관리: 내부적으로 두 개의 구독자(subscriber)를 생성하고 관리한다.
2.2 아키텍처 다이어그램
[TransformBroadcaster] ──(/tf)──────→ [TransformListener] ──→ [tf2::Buffer]
[StaticTransformBroadcaster] ──(/tf_static)──→ │
↓
lookupTransform()
canTransform()
TransformListener는 발행자(broadcaster)와 사용자(consumer) 사이의 중간 계층으로서, 변환 데이터의 수집과 버퍼링을 투명하게 처리한다.
2.3 Buffer와의 관계
TransformListener는 독립적으로 동작하지 않으며, 반드시 tf2::Buffer(C++) 또는 tf2_ros.Buffer(Python)와 함께 사용된다. Buffer는 변환 데이터의 저장소이며, TransformListener는 이 저장소를 최신 상태로 유지하는 역할을 한다.
TransformListener = 토픽 구독자 (데이터 수집)
Buffer = 변환 데이터 저장소 (데이터 조회)
이 분리된 설계는 관심 분리(Separation of Concerns) 원칙을 따르며, 테스트와 확장에 유리하다.
3. 내부 동작 메커니즘
3.1 구독 생성
TransformListener는 생성 시 다음의 두 구독자를 자동으로 생성한다.
| 구독 토픽 | QoS Durability | QoS Reliability | 용도 |
|---|---|---|---|
/tf | VOLATILE | RELIABLE | 동적 변환 수신 |
/tf_static | TRANSIENT_LOCAL | RELIABLE | 정적 변환 수신 |
/tf_static 구독자는 TRANSIENT_LOCAL 내구성을 사용하므로, 구독 시작 이전에 발행된 정적 변환도 수신할 수 있다.
3.2 콜백 처리
수신한 TFMessage 메시지는 내부 콜백 함수에 의하여 처리된다. 콜백은 메시지에 포함된 각 TransformStamped를 순회하며, Buffer::setTransform() 메서드를 통하여 버퍼에 저장한다.
수신: TFMessage {transforms: [T1, T2, T3]}
→ Buffer.setTransform(T1, authority, is_static)
→ Buffer.setTransform(T2, authority, is_static)
→ Buffer.setTransform(T3, authority, is_static)
authority 인자는 변환의 출처를 나타내며, 디버깅 시 어떤 노드가 특정 변환을 발행하였는지 추적하는 데 활용된다.
3.3 스레드 모델
TransformListener의 구독 콜백은 노드의 Executor에 의하여 실행된다. ROS2에서는 두 가지 Executor 모델이 제공된다.
- SingleThreadedExecutor: 모든 콜백이 단일 스레드에서 순차적으로 실행된다.
- MultiThreadedExecutor: 복수의 스레드에서 콜백이 병렬로 실행된다.
tf2::Buffer는 내부적으로 뮤텍스(mutex)를 사용하여 스레드 안전성을 보장하므로, MultiThreadedExecutor 환경에서도 안전하게 동작한다.
4. rclcpp에서의 사용
4.1 기본 초기화
#include <rclcpp/rclcpp.hpp>
#include <tf2_ros/transform_listener.h>
#include <tf2_ros/buffer.h>
class FrameListener : public rclcpp::Node
{
public:
FrameListener()
: Node("frame_listener")
{
tf_buffer_ = std::make_unique<tf2_ros::Buffer>(this->get_clock());
tf_listener_ = std::make_shared<tf2_ros::TransformListener>(*tf_buffer_);
timer_ = this->create_wall_timer(
std::chrono::milliseconds(500),
std::bind(&FrameListener::timer_callback, this));
}
private:
void timer_callback()
{
try {
auto transform = tf_buffer_->lookupTransform(
"map", "base_link", tf2::TimePointZero);
RCLCPP_INFO(this->get_logger(),
"Translation: [%.2f, %.2f, %.2f]",
transform.transform.translation.x,
transform.transform.translation.y,
transform.transform.translation.z);
} catch (const tf2::TransformException & ex) {
RCLCPP_WARN(this->get_logger(),
"Could not get transform: %s", ex.what());
}
}
std::unique_ptr<tf2_ros::Buffer> tf_buffer_;
std::shared_ptr<tf2_ros::TransformListener> tf_listener_;
rclcpp::TimerBase::SharedPtr timer_;
};
Buffer는 this->get_clock()을 인자로 받아 노드의 시계를 공유한다. 이를 통하여 use_sim_time 파라미터에 의한 시뮬레이션 시간을 자동으로 반영한다.
TransformListener는 Buffer의 참조를 받아 생성된다. 생성 즉시 /tf와 /tf_static 토픽의 구독을 시작한다.
4.2 생성 순서
Buffer와 TransformListener의 생성 순서는 반드시 다음과 같아야 한다.
Buffer생성TransformListener생성 (Buffer 참조 전달)
TransformListener가 Buffer보다 먼저 소멸하면 안 되므로, 멤버 변수의 선언 순서에도 주의하여야 한다.
5. rclpy에서의 사용
5.1 기본 초기화
import rclpy
from rclpy.node import Node
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
class FrameListener(Node):
def __init__(self):
super().__init__('frame_listener')
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
self.timer = self.create_timer(0.5, self.timer_callback)
def timer_callback(self):
try:
transform = self.tf_buffer.lookup_transform(
'map', 'base_link', rclpy.time.Time())
self.get_logger().info(
f'Translation: '
f'[{transform.transform.translation.x:.2f}, '
f'{transform.transform.translation.y:.2f}, '
f'{transform.transform.translation.z:.2f}]')
except TransformException as ex:
self.get_logger().warn(
f'Could not get transform: {ex}')
def main(args=None):
rclpy.init(args=args)
node = FrameListener()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
Python에서는 Buffer()를 인자 없이 생성할 수 있으며, TransformListener의 두 번째 인자로 노드를 전달한다.
6. 변환 조회 방법
6.1 lookupTransform
Buffer에 저장된 변환을 조회하는 주요 메서드이다.
// C++
auto transform = tf_buffer_->lookupTransform(
"target_frame", // 변환 결과의 기준 프레임
"source_frame", // 변환할 원본 프레임
tf2::TimePointZero); // 최신 가용 변환
# Python
transform = self.tf_buffer.lookup_transform(
'target_frame',
'source_frame',
rclpy.time.Time()) # 최신 가용 변환
target_frame은 변환 결과가 표현될 좌표계이며, source_frame은 변환의 원점이 되는 좌표계이다.
6.2 canTransform
변환이 가능한지 사전에 확인하는 메서드이다.
// C++
bool available = tf_buffer_->canTransform(
"target_frame", "source_frame",
tf2::TimePointZero);
# Python
available = self.tf_buffer.can_transform(
'target_frame', 'source_frame',
rclpy.time.Time())
6.3 시간 지정 조회
특정 시점의 변환을 조회하려면 타임스탬프를 명시적으로 지정한다.
// C++
rclcpp::Time query_time = this->get_clock()->now();
auto transform = tf_buffer_->lookupTransform(
"map", "base_link", query_time);
# Python
query_time = self.get_clock().now()
transform = self.tf_buffer.lookup_transform(
'map', 'base_link', query_time)
7. 예외 처리
lookupTransform() 호출 시 다음의 예외가 발생할 수 있다.
| 예외 유형 | 발생 조건 |
|---|---|
LookupException | 지정된 프레임이 변환 트리에 존재하지 않는 경우 |
ConnectivityException | 두 프레임 간에 연결 경로가 없는 경우 |
ExtrapolationException | 요청한 시간의 변환이 캐시 범위를 벗어나는 경우 |
InvalidArgumentException | 잘못된 인자가 전달된 경우 |
모든 예외는 tf2::TransformException(C++) 또는 tf2_ros.TransformException(Python)의 하위 클래스이므로, 상위 예외 클래스로 일괄 처리할 수 있다.
// C++
try {
auto transform = tf_buffer_->lookupTransform(
"map", "base_link", tf2::TimePointZero);
} catch (const tf2::TransformException & ex) {
RCLCPP_WARN(this->get_logger(), "%s", ex.what());
}
# Python
try:
transform = self.tf_buffer.lookup_transform(
'map', 'base_link', rclpy.time.Time())
except TransformException as ex:
self.get_logger().warn(f'{ex}')
8. TransformListener의 생성자 옵션
8.1 spin_thread 옵션 (rclcpp)
rclcpp에서 TransformListener는 선택적으로 전용 콜백 스레드를 생성할 수 있다.
// 전용 스레드 사용 (기본값: true)
tf_listener_ = std::make_shared<tf2_ros::TransformListener>(
*tf_buffer_, this, true);
// Executor에 의존
tf_listener_ = std::make_shared<tf2_ros::TransformListener>(
*tf_buffer_, this, false);
spin_thread=true(기본값)인 경우, TransformListener가 내부적으로 별도의 콜백 그룹(callback group)과 Executor 스레드를 생성하여 토픽 콜백을 처리한다. 이 모드에서는 사용자의 Executor 설정과 무관하게 변환 수신이 독립적으로 동작한다.
spin_thread=false인 경우, TransformListener의 구독 콜백은 사용자가 설정한 Executor에 의하여 처리된다. 이 경우 rclcpp::spin() 또는 executor.spin()이 호출되어야 변환이 수신된다.
8.2 spin_thread 옵션 (rclpy)
# spin_thread=True (기본값)
self.tf_listener = TransformListener(
self.tf_buffer, self, spin_thread=True)
# spin_thread=False
self.tf_listener = TransformListener(
self.tf_buffer, self, spin_thread=False)
Python에서도 동일한 spin_thread 옵션이 제공된다.
9. 실용적 사용 패턴
9.1 센서 데이터 변환
센서 데이터를 수신한 후, 해당 데이터의 좌표를 목표 프레임으로 변환하는 패턴이다.
void laser_callback(
const sensor_msgs::msg::LaserScan::SharedPtr msg)
{
try {
auto transform = tf_buffer_->lookupTransform(
"base_link",
msg->header.frame_id,
tf2::timeFromSec(rclcpp::Time(msg->header.stamp).seconds()));
// 변환을 사용하여 레이저 스캔 데이터 처리
} catch (const tf2::TransformException & ex) {
RCLCPP_WARN(this->get_logger(), "%s", ex.what());
}
}
9.2 로봇 위치 추적
주기적으로 로봇의 현재 위치를 Map 프레임 기준으로 조회하는 패턴이다.
def timer_callback(self):
try:
transform = self.tf_buffer.lookup_transform(
'map', 'base_link', rclpy.time.Time())
x = transform.transform.translation.x
y = transform.transform.translation.y
self.get_logger().info(
f'Robot position: ({x:.2f}, {y:.2f})')
except TransformException as ex:
self.get_logger().debug(f'{ex}')
10. 성능 고려 사항
10.1 조회 빈도 최적화
lookupTransform()은 Buffer 내부의 캐시를 탐색하므로, 연산 비용이 매우 낮다. 그러나 예외 처리(exception handling)는 상대적으로 비용이 높으므로, 고주기 루프에서는 canTransform()을 먼저 호출하여 예외 발생을 최소화하는 것이 권장된다.
if (tf_buffer_->canTransform("map", "base_link", tf2::TimePointZero)) {
auto transform = tf_buffer_->lookupTransform(
"map", "base_link", tf2::TimePointZero);
// 변환 사용
}
10.2 메모리 사용량
Buffer의 캐시 기간(cache duration)은 기본적으로 10초이다. 많은 수의 프레임이 고주기로 발행되는 환경에서는 캐시 기간을 적절히 조정하여 메모리 사용량을 관리하여야 한다.
11. 참고 자료
- ROS2 공식 문서, “Writing a tf2 listener”, https://docs.ros.org/en/humble/Tutorials/Intermediate/Tf2/Writing-A-Tf2-Listener-Cpp.html
- tf2_ros API 문서, https://docs.ros2.org/latest/api/tf2_ros/
- REP 103 – Standard Units of Measure and Coordinate Conventions, https://www.ros.org/reps/rep-0103.html
- ROS2 Humble Hawksbill