659.29 TransformListener를 이용한 변환 수신

1. 개요

TransformListenertf2_ros 패키지에서 제공하는 핵심 클래스로서, /tf/tf_static 토픽으로 발행되는 좌표 변환 메시지를 수신하여 tf2::Buffer에 저장하는 역할을 담당한다. 로봇 시스템에서 좌표 변환을 활용하는 모든 노드는 TransformListener를 통하여 변환 데이터를 수집하고, Buffer의 조회 메서드를 통하여 원하는 프레임 간의 변환을 취득한다. 본 절에서는 TransformListener의 아키텍처, 동작 원리, 그리고 실제 사용 방법을 체계적으로 기술한다.

2. TransformListener의 역할과 아키텍처

2.1 기본 역할

TransformListener는 다음의 세 가지 핵심 역할을 수행한다.

  1. 토픽 구독: /tf 토픽(동적 변환)과 /tf_static 토픽(정적 변환)을 자동으로 구독한다.
  2. 버퍼 최신화: 수신한 TransformStamped 메시지를 tf2::Buffer에 저장한다.
  3. 구독자 관리: 내부적으로 두 개의 구독자(subscriber)를 생성하고 관리한다.

2.2 아키텍처 다이어그램

[TransformBroadcaster] ──(/tf)──────→ [TransformListener] ──→ [tf2::Buffer]
[StaticTransformBroadcaster] ──(/tf_static)──→                  │
                                                                 ↓
                                                         lookupTransform()
                                                         canTransform()

TransformListener는 발행자(broadcaster)와 사용자(consumer) 사이의 중간 계층으로서, 변환 데이터의 수집과 버퍼링을 투명하게 처리한다.

2.3 Buffer와의 관계

TransformListener는 독립적으로 동작하지 않으며, 반드시 tf2::Buffer(C++) 또는 tf2_ros.Buffer(Python)와 함께 사용된다. Buffer는 변환 데이터의 저장소이며, TransformListener는 이 저장소를 최신 상태로 유지하는 역할을 한다.

TransformListener = 토픽 구독자 (데이터 수집)
Buffer = 변환 데이터 저장소 (데이터 조회)

이 분리된 설계는 관심 분리(Separation of Concerns) 원칙을 따르며, 테스트와 확장에 유리하다.

3. 내부 동작 메커니즘

3.1 구독 생성

TransformListener는 생성 시 다음의 두 구독자를 자동으로 생성한다.

구독 토픽QoS DurabilityQoS Reliability용도
/tfVOLATILERELIABLE동적 변환 수신
/tf_staticTRANSIENT_LOCALRELIABLE정적 변환 수신

/tf_static 구독자는 TRANSIENT_LOCAL 내구성을 사용하므로, 구독 시작 이전에 발행된 정적 변환도 수신할 수 있다.

3.2 콜백 처리

수신한 TFMessage 메시지는 내부 콜백 함수에 의하여 처리된다. 콜백은 메시지에 포함된 각 TransformStamped를 순회하며, Buffer::setTransform() 메서드를 통하여 버퍼에 저장한다.

수신: TFMessage {transforms: [T1, T2, T3]}
  → Buffer.setTransform(T1, authority, is_static)
  → Buffer.setTransform(T2, authority, is_static)
  → Buffer.setTransform(T3, authority, is_static)

authority 인자는 변환의 출처를 나타내며, 디버깅 시 어떤 노드가 특정 변환을 발행하였는지 추적하는 데 활용된다.

3.3 스레드 모델

TransformListener의 구독 콜백은 노드의 Executor에 의하여 실행된다. ROS2에서는 두 가지 Executor 모델이 제공된다.

  • SingleThreadedExecutor: 모든 콜백이 단일 스레드에서 순차적으로 실행된다.
  • MultiThreadedExecutor: 복수의 스레드에서 콜백이 병렬로 실행된다.

tf2::Buffer는 내부적으로 뮤텍스(mutex)를 사용하여 스레드 안전성을 보장하므로, MultiThreadedExecutor 환경에서도 안전하게 동작한다.

4. rclcpp에서의 사용

4.1 기본 초기화

#include <rclcpp/rclcpp.hpp>
#include <tf2_ros/transform_listener.h>
#include <tf2_ros/buffer.h>

class FrameListener : public rclcpp::Node
{
public:
  FrameListener()
  : Node("frame_listener")
  {
    tf_buffer_ = std::make_unique<tf2_ros::Buffer>(this->get_clock());
    tf_listener_ = std::make_shared<tf2_ros::TransformListener>(*tf_buffer_);

    timer_ = this->create_wall_timer(
      std::chrono::milliseconds(500),
      std::bind(&FrameListener::timer_callback, this));
  }

private:
  void timer_callback()
  {
    try {
      auto transform = tf_buffer_->lookupTransform(
        "map", "base_link", tf2::TimePointZero);

      RCLCPP_INFO(this->get_logger(),
        "Translation: [%.2f, %.2f, %.2f]",
        transform.transform.translation.x,
        transform.transform.translation.y,
        transform.transform.translation.z);
    } catch (const tf2::TransformException & ex) {
      RCLCPP_WARN(this->get_logger(),
        "Could not get transform: %s", ex.what());
    }
  }

  std::unique_ptr<tf2_ros::Buffer> tf_buffer_;
  std::shared_ptr<tf2_ros::TransformListener> tf_listener_;
  rclcpp::TimerBase::SharedPtr timer_;
};

Bufferthis->get_clock()을 인자로 받아 노드의 시계를 공유한다. 이를 통하여 use_sim_time 파라미터에 의한 시뮬레이션 시간을 자동으로 반영한다.

TransformListenerBuffer의 참조를 받아 생성된다. 생성 즉시 /tf/tf_static 토픽의 구독을 시작한다.

4.2 생성 순서

BufferTransformListener의 생성 순서는 반드시 다음과 같아야 한다.

  1. Buffer 생성
  2. TransformListener 생성 (Buffer 참조 전달)

TransformListenerBuffer보다 먼저 소멸하면 안 되므로, 멤버 변수의 선언 순서에도 주의하여야 한다.

5. rclpy에서의 사용

5.1 기본 초기화

import rclpy
from rclpy.node import Node
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener


class FrameListener(Node):
    def __init__(self):
        super().__init__('frame_listener')
        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)

        self.timer = self.create_timer(0.5, self.timer_callback)

    def timer_callback(self):
        try:
            transform = self.tf_buffer.lookup_transform(
                'map', 'base_link', rclpy.time.Time())

            self.get_logger().info(
                f'Translation: '
                f'[{transform.transform.translation.x:.2f}, '
                f'{transform.transform.translation.y:.2f}, '
                f'{transform.transform.translation.z:.2f}]')
        except TransformException as ex:
            self.get_logger().warn(
                f'Could not get transform: {ex}')


def main(args=None):
    rclpy.init(args=args)
    node = FrameListener()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

Python에서는 Buffer()를 인자 없이 생성할 수 있으며, TransformListener의 두 번째 인자로 노드를 전달한다.

6. 변환 조회 방법

6.1 lookupTransform

Buffer에 저장된 변환을 조회하는 주요 메서드이다.

// C++
auto transform = tf_buffer_->lookupTransform(
  "target_frame",   // 변환 결과의 기준 프레임
  "source_frame",   // 변환할 원본 프레임
  tf2::TimePointZero);  // 최신 가용 변환
# Python
transform = self.tf_buffer.lookup_transform(
    'target_frame',
    'source_frame',
    rclpy.time.Time())  # 최신 가용 변환

target_frame은 변환 결과가 표현될 좌표계이며, source_frame은 변환의 원점이 되는 좌표계이다.

6.2 canTransform

변환이 가능한지 사전에 확인하는 메서드이다.

// C++
bool available = tf_buffer_->canTransform(
  "target_frame", "source_frame",
  tf2::TimePointZero);
# Python
available = self.tf_buffer.can_transform(
    'target_frame', 'source_frame',
    rclpy.time.Time())

6.3 시간 지정 조회

특정 시점의 변환을 조회하려면 타임스탬프를 명시적으로 지정한다.

// C++
rclcpp::Time query_time = this->get_clock()->now();
auto transform = tf_buffer_->lookupTransform(
  "map", "base_link", query_time);
# Python
query_time = self.get_clock().now()
transform = self.tf_buffer.lookup_transform(
    'map', 'base_link', query_time)

7. 예외 처리

lookupTransform() 호출 시 다음의 예외가 발생할 수 있다.

예외 유형발생 조건
LookupException지정된 프레임이 변환 트리에 존재하지 않는 경우
ConnectivityException두 프레임 간에 연결 경로가 없는 경우
ExtrapolationException요청한 시간의 변환이 캐시 범위를 벗어나는 경우
InvalidArgumentException잘못된 인자가 전달된 경우

모든 예외는 tf2::TransformException(C++) 또는 tf2_ros.TransformException(Python)의 하위 클래스이므로, 상위 예외 클래스로 일괄 처리할 수 있다.

// C++
try {
  auto transform = tf_buffer_->lookupTransform(
    "map", "base_link", tf2::TimePointZero);
} catch (const tf2::TransformException & ex) {
  RCLCPP_WARN(this->get_logger(), "%s", ex.what());
}
# Python
try:
    transform = self.tf_buffer.lookup_transform(
        'map', 'base_link', rclpy.time.Time())
except TransformException as ex:
    self.get_logger().warn(f'{ex}')

8. TransformListener의 생성자 옵션

8.1 spin_thread 옵션 (rclcpp)

rclcpp에서 TransformListener는 선택적으로 전용 콜백 스레드를 생성할 수 있다.

// 전용 스레드 사용 (기본값: true)
tf_listener_ = std::make_shared<tf2_ros::TransformListener>(
  *tf_buffer_, this, true);

// Executor에 의존
tf_listener_ = std::make_shared<tf2_ros::TransformListener>(
  *tf_buffer_, this, false);

spin_thread=true(기본값)인 경우, TransformListener가 내부적으로 별도의 콜백 그룹(callback group)과 Executor 스레드를 생성하여 토픽 콜백을 처리한다. 이 모드에서는 사용자의 Executor 설정과 무관하게 변환 수신이 독립적으로 동작한다.

spin_thread=false인 경우, TransformListener의 구독 콜백은 사용자가 설정한 Executor에 의하여 처리된다. 이 경우 rclcpp::spin() 또는 executor.spin()이 호출되어야 변환이 수신된다.

8.2 spin_thread 옵션 (rclpy)

# spin_thread=True (기본값)
self.tf_listener = TransformListener(
    self.tf_buffer, self, spin_thread=True)

# spin_thread=False
self.tf_listener = TransformListener(
    self.tf_buffer, self, spin_thread=False)

Python에서도 동일한 spin_thread 옵션이 제공된다.

9. 실용적 사용 패턴

9.1 센서 데이터 변환

센서 데이터를 수신한 후, 해당 데이터의 좌표를 목표 프레임으로 변환하는 패턴이다.

void laser_callback(
  const sensor_msgs::msg::LaserScan::SharedPtr msg)
{
  try {
    auto transform = tf_buffer_->lookupTransform(
      "base_link",
      msg->header.frame_id,
      tf2::timeFromSec(rclcpp::Time(msg->header.stamp).seconds()));

    // 변환을 사용하여 레이저 스캔 데이터 처리
  } catch (const tf2::TransformException & ex) {
    RCLCPP_WARN(this->get_logger(), "%s", ex.what());
  }
}

9.2 로봇 위치 추적

주기적으로 로봇의 현재 위치를 Map 프레임 기준으로 조회하는 패턴이다.

def timer_callback(self):
    try:
        transform = self.tf_buffer.lookup_transform(
            'map', 'base_link', rclpy.time.Time())

        x = transform.transform.translation.x
        y = transform.transform.translation.y
        self.get_logger().info(
            f'Robot position: ({x:.2f}, {y:.2f})')
    except TransformException as ex:
        self.get_logger().debug(f'{ex}')

10. 성능 고려 사항

10.1 조회 빈도 최적화

lookupTransform()Buffer 내부의 캐시를 탐색하므로, 연산 비용이 매우 낮다. 그러나 예외 처리(exception handling)는 상대적으로 비용이 높으므로, 고주기 루프에서는 canTransform()을 먼저 호출하여 예외 발생을 최소화하는 것이 권장된다.

if (tf_buffer_->canTransform("map", "base_link", tf2::TimePointZero)) {
  auto transform = tf_buffer_->lookupTransform(
    "map", "base_link", tf2::TimePointZero);
  // 변환 사용
}

10.2 메모리 사용량

Buffer의 캐시 기간(cache duration)은 기본적으로 10초이다. 많은 수의 프레임이 고주기로 발행되는 환경에서는 캐시 기간을 적절히 조정하여 메모리 사용량을 관리하여야 한다.

11. 참고 자료

  • ROS2 공식 문서, “Writing a tf2 listener”, https://docs.ros.org/en/humble/Tutorials/Intermediate/Tf2/Writing-A-Tf2-Listener-Cpp.html
  • tf2_ros API 문서, https://docs.ros2.org/latest/api/tf2_ros/
  • REP 103 – Standard Units of Measure and Coordinate Conventions, https://www.ros.org/reps/rep-0103.html
  • ROS2 Humble Hawksbill