659.80 tf2_ros::MessageFilter를 이용한 동기화 변환

1. 개요

tf2_ros::MessageFilter는 센서 메시지와 좌표 변환 데이터 사이의 시간적 동기화를 자동으로 수행하는 클래스이다. 센서 메시지가 도착하면, 해당 메시지의 타임스탬프에 대응하는 좌표 변환이 tf2::Buffer에서 이용 가능한지 확인한 후, 이용 가능한 경우에만 사용자 콜백으로 메시지를 전달한다. 본 절에서는 tf2_ros::MessageFilter의 내부 동작 메커니즘, API 구조, 그리고 동기화 변환의 구체적인 구현 방법을 상세히 기술한다.

2. tf2_ros::MessageFilter의 클래스 구조

2.1 템플릿 정의

tf2_ros::MessageFilter는 C++ 템플릿 클래스로, 필터링 대상 메시지의 유형을 템플릿 매개변수로 지정한다. 필터링할 수 있는 메시지 유형은 std_msgs/msg/Header 필드를 포함하는 모든 ROS2 메시지 유형이다.

template<class MessageT>
class MessageFilter : public MessageFilterBase,
                      public message_filters::SimpleFilter<MessageT>
{
    // ...
};

2.2 생성자 시그니처

ROS2 Humble 기준으로 tf2_ros::MessageFilter는 다음과 같은 주요 생성자를 제공한다.

// 생성자 1: message_filters::SubscriberBase 입력
MessageFilter(
    message_filters::SubscriberBase<MessageT>& sub,
    tf2::BufferCore& buffer_core,
    const std::string& target_frame,
    uint32_t queue_size,
    const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr& node_logging,
    const rclcpp::node_interfaces::NodeClockInterface::SharedPtr& node_clock,
    tf2::Duration buffer_timeout = tf2::Duration(0)
);

// 생성자 2: 입력 필터 없이 생성 (수동 메시지 주입)
MessageFilter(
    tf2::BufferCore& buffer_core,
    const std::string& target_frame,
    uint32_t queue_size,
    const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr& node_logging,
    const rclcpp::node_interfaces::NodeClockInterface::SharedPtr& node_clock,
    tf2::Duration buffer_timeout = tf2::Duration(0)
);

각 매개변수의 역할은 다음과 같다.

매개변수유형설명
subSubscriberBase<MessageT>&메시지 입력원으로 사용할 구독자 필터
buffer_coretf2::BufferCore&변환 조회에 사용할 Buffer의 핵심 객체
target_frameconst std::string&변환의 목표 좌표 프레임 이름
queue_sizeuint32_t대기 큐의 최대 크기
node_loggingNodeLoggingInterface::SharedPtr로깅 인터페이스
node_clockNodeClockInterface::SharedPtr시계 인터페이스
buffer_timeouttf2::Duration변환 대기 허용 시간

3. 내부 동작 메커니즘

3.1 메시지 수신과 큐잉

MessageFilter가 입력 필터로부터 메시지를 수신하면, 다음의 판정 과정을 수행한다.

메시지 수신
    │
    ├─ frame_id가 비어 있는가? ─── 예 ──→ 실패 콜백 (EmptyFrameID)
    │                                       
    ├─ 목표 프레임으로의 변환이 가능한가?
    │   │
    │   ├─ canTransform() == true ──→ 출력 콜백으로 전달
    │   │
    │   └─ canTransform() == false
    │       │
    │       ├─ 큐 크기가 최대인가?
    │       │   │
    │       │   ├─ 예 ──→ 가장 오래된 메시지 폐기 후 현재 메시지 큐잉
    │       │   │
    │       │   └─ 아니오 ──→ 현재 메시지 큐잉
    │       │
    │       └─ Buffer 갱신 콜백에 재평가 등록
    │
    └─ Buffer 갱신 시 큐의 모든 메시지 재평가

3.2 변환 가용성 재평가

MessageFiltertf2::BufferCore에 새로운 변환이 추가될 때마다 호출되는 콜백을 등록한다. 이 콜백이 호출되면, 내부 큐에 보관된 모든 대기 메시지에 대하여 변환 가용성을 재평가한다. 변환이 이용 가능해진 메시지는 출력 콜백으로 즉시 전달된다.

// 내부 동작 (개념적 의사 코드)
void MessageFilter::transformsChanged()
{
    for (auto it = message_queue_.begin(); it != message_queue_.end(); ) {
        if (canTransformToTarget(it->message)) {
            // 변환 이용 가능: 출력 콜백 호출
            signalMessage(it->message);
            it = message_queue_.erase(it);
        } else if (isExpired(it->message)) {
            // 대기 시간 초과: 실패 콜백 호출
            signalFailure(it->message, OutTheBack);
            it = message_queue_.erase(it);
        } else {
            ++it;
        }
    }
}

4. 동기화 변환의 구현

4.1 C++ 구현: 레이저 스캔 동기화 변환

다음은 sensor_msgs/msg/LaserScan 메시지를 수신하고, 해당 데이터의 좌표 프레임에서 map 프레임으로의 변환이 이용 가능해진 후에만 데이터를 처리하는 완전한 예제이다.

#include <rclcpp/rclcpp.hpp>
#include <message_filters/subscriber.h>
#include <tf2_ros/buffer.h>
#include <tf2_ros/transform_listener.h>
#include <tf2_ros/message_filter.h>
#include <sensor_msgs/msg/laser_scan.hpp>
#include <geometry_msgs/msg/point_stamped.hpp>
#include <tf2_geometry_msgs/tf2_geometry_msgs.hpp>
#include <cmath>

class SynchronizedLaserTransformNode : public rclcpp::Node
{
public:
    SynchronizedLaserTransformNode()
    : Node("sync_laser_transform_node")
    {
        // tf2 인프라 초기화
        tf_buffer_ = std::make_shared<tf2_ros::Buffer>(this->get_clock());
        tf_listener_ = std::make_shared<tf2_ros::TransformListener>(*tf_buffer_, this);

        // message_filters::Subscriber 설정
        laser_sub_.subscribe(this, "/scan",
            rmw_qos_profile_sensor_data);

        // MessageFilter 설정
        tf_filter_ = std::make_shared<
            tf2_ros::MessageFilter<sensor_msgs::msg::LaserScan>>(
            laser_sub_,
            *tf_buffer_,
            "map",              // 목표 프레임
            50,                 // 큐 크기
            this->get_node_logging_interface(),
            this->get_node_clock_interface(),
            std::chrono::milliseconds(200)   // 허용 대기 시간
        );

        // 성공 콜백 등록
        tf_filter_->registerCallback(
            std::bind(&SynchronizedLaserTransformNode::on_laser_transformed,
                      this, std::placeholders::_1));

        // 실패 콜백 등록
        tf_filter_->registerFailureCallback(
            std::bind(&SynchronizedLaserTransformNode::on_filter_failure,
                      this, std::placeholders::_1, std::placeholders::_2));

        // 변환된 포인트 발행자
        point_pub_ = this->create_publisher<geometry_msgs::msg::PointStamped>(
            "/laser_point_in_map", 10);

        RCLCPP_INFO(get_logger(),
            "동기화 레이저 변환 노드가 초기화되었음");
    }

private:
    void on_laser_transformed(
        const sensor_msgs::msg::LaserScan::ConstSharedPtr& scan)
    {
        try {
            // MessageFilter가 보장하므로 이 시점에서 변환은 반드시 이용 가능함
            auto transform = tf_buffer_->lookupTransform(
                "map", scan->header.frame_id,
                tf2_ros::fromMsg(scan->header.stamp));

            // 최소 거리 포인트를 map 프레임으로 변환
            float min_range = std::numeric_limits<float>::max();
            float min_angle = 0.0f;

            for (size_t i = 0; i < scan->ranges.size(); ++i) {
                float range = scan->ranges[i];
                if (range >= scan->range_min && range <= scan->range_max
                    && range < min_range)
                {
                    min_range = range;
                    min_angle = scan->angle_min + i * scan->angle_increment;
                }
            }

            if (min_range < std::numeric_limits<float>::max()) {
                // 레이저 프레임에서의 포인트 좌표
                geometry_msgs::msg::PointStamped point_laser;
                point_laser.header = scan->header;
                point_laser.point.x = min_range * std::cos(min_angle);
                point_laser.point.y = min_range * std::sin(min_angle);
                point_laser.point.z = 0.0;

                // map 프레임으로 변환
                geometry_msgs::msg::PointStamped point_map;
                tf2::doTransform(point_laser, point_map, transform);

                point_pub_->publish(point_map);

                RCLCPP_DEBUG(get_logger(),
                    "최근접 장애물 (map 프레임): [%.3f, %.3f, %.3f]",
                    point_map.point.x,
                    point_map.point.y,
                    point_map.point.z);
            }
        } catch (const tf2::TransformException& ex) {
            RCLCPP_ERROR(get_logger(),
                "예상하지 못한 변환 오류: %s", ex.what());
        }
    }

    void on_filter_failure(
        const sensor_msgs::msg::LaserScan::ConstSharedPtr& scan,
        tf2_ros::filter_failure_reasons::FilterFailureReason reason)
    {
        RCLCPP_WARN_THROTTLE(get_logger(), *get_clock(), 5000,
            "레이저 스캔 메시지 필터링 실패 "
            "[stamp=%.3f, frame=%s, reason=%d]",
            rclcpp::Time(scan->header.stamp).seconds(),
            scan->header.frame_id.c_str(),
            static_cast<int>(reason));
    }

    std::shared_ptr<tf2_ros::Buffer> tf_buffer_;
    std::shared_ptr<tf2_ros::TransformListener> tf_listener_;
    message_filters::Subscriber<sensor_msgs::msg::LaserScan> laser_sub_;
    std::shared_ptr<tf2_ros::MessageFilter<sensor_msgs::msg::LaserScan>> tf_filter_;
    rclcpp::Publisher<geometry_msgs::msg::PointStamped>::SharedPtr point_pub_;
};

int main(int argc, char** argv)
{
    rclcpp::init(argc, argv);
    rclcpp::spin(std::make_shared<SynchronizedLaserTransformNode>());
    rclcpp::shutdown();
    return 0;
}

4.2 Python 구현: 포즈 동기화 변환

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
import message_filters
from tf2_ros import Buffer, TransformListener, TransformException
from tf2_ros.message_filter import MessageFilter
from geometry_msgs.msg import PoseStamped, PointStamped
import tf2_geometry_msgs  # doTransform 등록을 위한 임포트


class SynchronizedPoseTransformNode(Node):
    def __init__(self):
        super().__init__('sync_pose_transform_node')

        # tf2 인프라 초기화
        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)

        # QoS 설정
        qos = QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE)

        # message_filters.Subscriber 설정
        self.pose_sub = message_filters.Subscriber(
            self, PoseStamped, '/detected_object_pose', qos_profile=qos)

        # MessageFilter 설정
        self.tf_filter = MessageFilter(
            self.pose_sub,
            self.tf_buffer,
            'map',      # 목표 프레임
            50,         # 큐 크기
            self
        )

        # 콜백 등록
        self.tf_filter.registerCallback(self.on_pose_transformed)

        # 변환된 포즈 발행자
        self.transformed_pub = self.create_publisher(
            PoseStamped, '/object_pose_in_map', 10)

        self.get_logger().info('동기화 포즈 변환 노드 초기화 완료')

    def on_pose_transformed(self, msg):
        """변환이 보장된 포즈 데이터 처리"""
        try:
            # 소스 프레임에서 map 프레임으로의 변환 조회
            transform = self.tf_buffer.lookup_transform(
                'map',
                msg.header.frame_id,
                msg.header.stamp
            )

            # PoseStamped 변환
            transformed_pose = tf2_geometry_msgs.do_transform_pose_stamped(
                msg, transform)

            # 변환된 포즈 발행
            self.transformed_pub.publish(transformed_pose)

            self.get_logger().debug(
                f'포즈 변환 완료: '
                f'{msg.header.frame_id} → map, '
                f'위치=[{transformed_pose.pose.position.x:.3f}, '
                f'{transformed_pose.pose.position.y:.3f}, '
                f'{transformed_pose.pose.position.z:.3f}]'
            )

        except TransformException as ex:
            self.get_logger().error(f'변환 오류: {ex}')


def main():
    rclpy.init()
    node = SynchronizedPoseTransformNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

5. 다중 프레임 동기화 변환

5.1 setTargetFrames()를 이용한 복수 프레임 동기화

복수의 좌표 프레임으로의 변환이 모두 이용 가능해야 메시지를 전달하는 방식이다.

// 다중 목표 프레임 설정
std::vector<std::string> target_frames = {"map", "odom", "base_footprint"};
tf_filter_->setTargetFrames(target_frames);

이 설정을 적용하면 MessageFilter는 메시지의 소스 프레임에서 map, odom, base_footprint 세 프레임 모두로의 변환이 이용 가능한 경우에만 메시지를 출력 콜백으로 전달한다. 이는 단일 센서 데이터를 복수의 좌표 프레임에서 동시에 활용해야 하는 경우에 유용하다.

5.2 setTargetFrame()을 이용한 동적 프레임 변경

런타임에 목표 프레임을 변경할 수 있다. 프레임이 변경되면 내부 큐의 기존 메시지는 새로운 목표 프레임에 대하여 재평가된다.

// 런타임에 목표 프레임 변경
void switch_reference_frame(const std::string& new_frame)
{
    tf_filter_->setTargetFrame(new_frame);
    RCLCPP_INFO(get_logger(),
        "목표 프레임이 '%s'으로 변경되었음", new_frame.c_str());
}

6. 성능 분석과 튜닝

6.1 큐 크기의 영향

큐 크기가 너무 작으면 변환 데이터 도착 전에 메시지가 폐기되어 데이터 손실이 발생한다. 반대로 큐 크기가 너무 크면 메모리 사용량이 증가하고, 오래된 메시지의 처리로 인하여 시스템의 응답성이 저하될 수 있다.

큐 크기장점단점
작음 (1~10)메모리 사용량 최소, 최신 데이터 우선 처리변환 지연 시 데이터 손실 발생
중간 (10~50)적절한 균형일반적 환경에서 적합
큼 (50~200)데이터 손실 최소화메모리 부담 증가, 오래된 데이터 처리 가능성

6.2 buffer_timeout의 영향

buffer_timeout 매개변수는 canTransform() 호출 시 대기할 최대 시간을 지정한다. 이 값이 0이면 비동기적으로 큐잉만 수행하고, 양의 값이면 지정된 시간만큼 동기적으로 대기한다.

  • 0초 (권장): 비차단 동작으로 Executor 스레드를 차지하지 않는다. 변환이 도착하면 transformsChanged() 콜백을 통하여 재평가된다.
  • 양의 값: 변환 도착까지 블로킹하므로, SingleThreadedExecutor에서는 다른 콜백의 실행이 차단될 수 있다.

6.3 메시지 지연 분석

동기화 변환의 전체 지연 시간은 다음과 같이 분해된다.

t_{\text{total}} = t_{\text{sensor}} + t_{\text{network}} + t_{\text{queue}} + t_{\text{transform}} + t_{\text{processing}}

여기서:

  • t_{\text{sensor}}: 센서 데이터 생성 지연
  • t_{\text{network}}: DDS 네트워크 전송 지연
  • t_{\text{queue}}: MessageFilter 큐 대기 시간
  • t_{\text{transform}}: 변환 데이터 도착까지의 대기 시간
  • t_{\text{processing}}: 콜백 처리 시간

t_{\text{queue}}t_{\text{transform}}이 지배적인 지연 요인이며, 변환 발행 주파수를 높이거나 변환 발행 노드의 처리 우선순위를 조정하여 이를 감소시킬 수 있다.

7. 디버깅과 진단

7.1 큐 상태 모니터링

MessageFilter의 대기 큐 상태를 모니터링하여 동기화 성능을 진단할 수 있다.

// 주기적 큐 상태 보고
void report_filter_status()
{
    RCLCPP_INFO(get_logger(),
        "MessageFilter 상태: "
        "수신=%lu, 전달=%lu, 폐기=%lu, 큐 대기=%lu",
        tf_filter_->getIncomingMessageCount(),
        tf_filter_->getIncomingMessageCount()
            - tf_filter_->getDroppedMessageCount(),
        tf_filter_->getDroppedMessageCount(),
        tf_filter_->getQueueSize());
}

7.2 일반적인 문제와 해결 방안

문제 현상원인해결 방안
출력 콜백이 호출되지 않음변환이 발행되지 않음ros2 topic echo /tf로 변환 확인
출력 콜백이 호출되지 않음frame_id가 비어 있거나 잘못됨실패 콜백을 등록하여 원인 확인
메시지가 대부분 폐기됨큐 크기가 너무 작음큐 크기를 증가
메시지가 대부분 폐기됨변환 발행 주파수가 너무 낮음변환 발행 빈도를 센서 주파수 이상으로 설정
메모리 사용량이 과도함큐 크기가 너무 크고 메시지 크기가 큼큐 크기를 줄이거나 메시지 크기를 최적화

8. 요약

tf2_ros::MessageFilter를 이용한 동기화 변환은 센서 데이터와 좌표 변환 데이터 사이의 본질적인 비동기성 문제를 체계적으로 해결한다. 메시지의 수신, 큐잉, 변환 가용성 재평가, 출력 전달의 전 과정이 자동으로 관리되므로, 개발자는 변환이 보장된 데이터에 대한 처리 로직에만 집중할 수 있다. 큐 크기, buffer_timeout, 목표 프레임 설정 등의 매개변수를 시스템 요구 사항에 맞게 조정하고, 실패 콜백을 활용하여 데이터 손실의 원인을 지속적으로 모니터링하는 것이 안정적인 시스템 운용의 핵심이다.


참고 문헌 및 출처

  • ROS2 공식 문서, “Using Stamped Datatypes with tf2_ros::MessageFilter”, https://docs.ros.org/en/humble/Tutorials/Intermediate/Tf2/Using-Stamped-Datatypes-With-Tf2-Ros-MessageFilter.html (ROS2 Humble Hawksbill)
  • tf2_ros API 문서, tf2_ros::MessageFilter, https://docs.ros2.org/latest/api/tf2_ros/ (ROS2 Humble Hawksbill)
  • message_filters 패키지 소스 코드, https://github.com/ros2/message_filters (ROS2 Humble 브랜치)
  • S. Macenski et al., “Robot Operating System 2: Design, Architecture, and Uses in the Wild,” Science Robotics, vol. 7, no. 66, 2022.