659.79 tf2_ros의 메시지 필터 (MessageFilter) 활용

1. 개요

로봇 시스템에서 센서 데이터는 특정 좌표 프레임에서 측정된다. 이 센서 데이터를 다른 좌표 프레임으로 변환하여 활용하려면 해당 시점의 좌표 변환 정보가 반드시 필요하다. 그러나 센서 데이터와 좌표 변환 데이터는 서로 다른 토픽을 통하여 비동기적으로 전달되므로, 센서 메시지가 도착하는 시점에 해당 타임스탬프의 좌표 변환이 아직 이용 가능하지 않을 수 있다. **메시지 필터(MessageFilter)**는 이러한 시간적 동기화 문제를 해결하기 위한 tf2_ros 패키지의 핵심 구성 요소이다.

2. 메시지 필터의 필요성

2.1 비동기 데이터 도착 문제

ROS2에서 센서 데이터와 좌표 변환 데이터는 독립적인 퍼블리셔에 의하여 발행된다. 예를 들어, LiDAR 센서가 sensor_msgs/msg/PointCloud2 메시지를 발행하고, 로봇의 주행 제어기가 /tf 토픽으로 주행 측정(odometry) 기반 좌표 변환을 발행하는 경우를 고려하자. 두 데이터 스트림은 다음과 같은 이유로 시간적 불일치가 발생할 수 있다.

  • 네트워크 지연(Network Latency): DDS 미들웨어를 통한 메시지 전달 시 가변적인 지연이 발생한다.
  • 처리 지연(Processing Delay): 변환 데이터를 생성하는 노드와 센서 데이터를 생성하는 노드의 처리 속도 차이가 존재한다.
  • 발행 주파수 불일치: 센서 데이터의 발행 주파수와 변환 데이터의 발행 주파수가 상이할 수 있다.

2.2 직접 처리의 한계

메시지 필터 없이 센서 데이터 콜백 내에서 직접 lookupTransform()을 호출하면 다음과 같은 문제가 발생한다.

// 문제가 있는 접근 방식
void pointcloud_callback(const sensor_msgs::msg::PointCloud2::SharedPtr msg)
{
    try {
        // 센서 메시지 도착 시점에 해당 타임스탬프의 변환이 아직 없을 수 있음
        auto transform = tf_buffer_->lookupTransform(
            "map", msg->header.frame_id,
            tf2_ros::fromMsg(msg->header.stamp));
        // 변환 적용
    } catch (const tf2::ExtrapolationException& ex) {
        // 변환이 아직 이용 가능하지 않아 예외 발생
        RCLCPP_WARN(get_logger(), "변환 불가: %s", ex.what());
    }
}

위 코드에서 센서 메시지의 타임스탬프에 해당하는 좌표 변환이 tf2::Buffer에 아직 저장되지 않은 경우 ExtrapolationException이 발생하며, 해당 센서 데이터는 손실된다.

3. tf2_ros::MessageFilter의 동작 원리

3.1 아키텍처

tf2_ros::MessageFilter는 **입력 필터(Input Filter)**와 **출력 콜백(Output Callback)**으로 구성되는 메시지 파이프라인 구조를 갖는다.

센서 메시지 → [message_filters::Subscriber] → [tf2_ros::MessageFilter] → 사용자 콜백
                                                       ↑
                                                 [tf2::Buffer]
                                              (변환 가용성 확인)

동작 과정은 다음과 같다.

  1. message_filters::Subscriber가 센서 토픽으로부터 메시지를 수신한다.
  2. 수신된 메시지가 tf2_ros::MessageFilter에 전달된다.
  3. MessageFilter는 메시지의 header.stampheader.frame_id를 이용하여, 목표 프레임으로의 좌표 변환이 tf2::Buffer에서 이용 가능한지 확인한다.
  4. 변환이 이용 가능하면 메시지를 출력 콜백으로 즉시 전달한다.
  5. 변환이 이용 가능하지 않으면 메시지를 내부 큐에 보관하고, 변환이 이용 가능해질 때까지 대기한다.
  6. 큐 내 메시지가 허용 대기 시간을 초과하면 해당 메시지를 폐기한다.

3.2 핵심 매개변수

tf2_ros::MessageFilter의 동작을 제어하는 주요 매개변수는 다음과 같다.

매개변수설명기본값
target_frame변환의 목표 좌표 프레임필수 지정
queue_size내부 큐의 최대 크기10
tolerance변환 시간의 허용 오차 (Duration)0
buffer변환 데이터를 조회할 tf2::Buffer 참조필수 지정

4. C++에서의 MessageFilter 활용

4.1 기본 사용법

#include <rclcpp/rclcpp.hpp>
#include <message_filters/subscriber.h>
#include <tf2_ros/buffer.h>
#include <tf2_ros/transform_listener.h>
#include <tf2_ros/message_filter.h>
#include <sensor_msgs/msg/point_cloud2.hpp>
#include <tf2_sensor_msgs/tf2_sensor_msgs.hpp>

class PointCloudTransformNode : public rclcpp::Node
{
public:
    PointCloudTransformNode()
    : Node("pointcloud_transform_node")
    {
        // tf2 Buffer 및 Listener 초기화
        tf_buffer_ = std::make_shared<tf2_ros::Buffer>(this->get_clock());
        tf_listener_ = std::make_shared<tf2_ros::TransformListener>(*tf_buffer_, this);

        // message_filters::Subscriber를 이용한 토픽 구독
        point_cloud_sub_.subscribe(this, "/lidar/points",
            rmw_qos_profile_sensor_data);

        // tf2_ros::MessageFilter 설정
        tf_filter_ = std::make_shared<tf2_ros::MessageFilter<sensor_msgs::msg::PointCloud2>>(
            point_cloud_sub_,       // 입력 필터
            *tf_buffer_,            // tf2 Buffer
            "map",                  // 목표 프레임
            100,                    // 큐 크기
            this->get_node_logging_interface(),
            this->get_node_clock_interface(),
            std::chrono::milliseconds(100)  // 허용 대기 시간
        );

        // 출력 콜백 등록
        tf_filter_->registerCallback(
            std::bind(&PointCloudTransformNode::on_filtered_pointcloud,
                      this, std::placeholders::_1));
    }

private:
    void on_filtered_pointcloud(
        const sensor_msgs::msg::PointCloud2::ConstSharedPtr& msg)
    {
        // 이 콜백이 호출되는 시점에는 변환이 반드시 이용 가능함
        try {
            auto transform = tf_buffer_->lookupTransform(
                "map", msg->header.frame_id,
                tf2_ros::fromMsg(msg->header.stamp));

            sensor_msgs::msg::PointCloud2 transformed_cloud;
            tf2::doTransform(*msg, transformed_cloud, transform);

            RCLCPP_INFO(get_logger(),
                "변환 완료: %zu 포인트, 프레임: %s → map",
                transformed_cloud.width * transformed_cloud.height,
                msg->header.frame_id.c_str());

            // 변환된 포인트 클라우드 활용
            process_transformed_cloud(transformed_cloud);

        } catch (const tf2::TransformException& ex) {
            RCLCPP_ERROR(get_logger(), "변환 실패: %s", ex.what());
        }
    }

    void process_transformed_cloud(
        const sensor_msgs::msg::PointCloud2& cloud)
    {
        (void)cloud;
        // 변환된 데이터 처리 로직
    }

    std::shared_ptr<tf2_ros::Buffer> tf_buffer_;
    std::shared_ptr<tf2_ros::TransformListener> tf_listener_;
    message_filters::Subscriber<sensor_msgs::msg::PointCloud2> point_cloud_sub_;
    std::shared_ptr<tf2_ros::MessageFilter<sensor_msgs::msg::PointCloud2>> tf_filter_;
};

int main(int argc, char** argv)
{
    rclcpp::init(argc, argv);
    rclcpp::spin(std::make_shared<PointCloudTransformNode>());
    rclcpp::shutdown();
    return 0;
}

4.2 다중 목표 프레임 설정

MessageFilter는 복수의 목표 프레임을 지정할 수 있다. 모든 목표 프레임으로의 변환이 이용 가능해야 메시지가 출력 콜백으로 전달된다.

// 복수의 목표 프레임 설정
std::vector<std::string> target_frames = {"map", "odom", "base_link"};

tf_filter_ = std::make_shared<tf2_ros::MessageFilter<sensor_msgs::msg::PointCloud2>>(
    point_cloud_sub_,
    *tf_buffer_,
    target_frames[0],  // 초기 목표 프레임
    100,
    this->get_node_logging_interface(),
    this->get_node_clock_interface()
);

// 추가 목표 프레임 등록
tf_filter_->setTargetFrames(target_frames);

5. Python에서의 MessageFilter 활용

import rclpy
from rclpy.node import Node
from tf2_ros import Buffer, TransformListener
from tf2_ros.buffer import Buffer
from tf2_ros import TransformException
import message_filters
from tf2_ros.message_filter import MessageFilter
from sensor_msgs.msg import LaserScan
from rclpy.qos import QoSProfile, ReliabilityPolicy


class LaserTransformNode(Node):
    def __init__(self):
        super().__init__('laser_transform_node')

        # tf2 Buffer 및 Listener 초기화
        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)

        # QoS 프로파일 설정
        sensor_qos = QoSProfile(
            depth=10,
            reliability=ReliabilityPolicy.BEST_EFFORT
        )

        # message_filters.Subscriber를 이용한 토픽 구독
        self.laser_sub = message_filters.Subscriber(
            self, LaserScan, '/scan', qos_profile=sensor_qos)

        # tf2_ros MessageFilter 설정
        self.tf_filter = MessageFilter(
            self.laser_sub,
            self.tf_buffer,
            'map',          # 목표 프레임
            100,            # 큐 크기
            self             # 노드
        )

        # 출력 콜백 등록
        self.tf_filter.registerCallback(self.on_filtered_laser)

    def on_filtered_laser(self, msg):
        """변환이 이용 가능한 레이저 스캔 데이터 처리"""
        try:
            transform = self.tf_buffer.lookup_transform(
                'map',
                msg.header.frame_id,
                msg.header.stamp
            )

            self.get_logger().info(
                f'레이저 스캔 변환 완료: '
                f'{msg.header.frame_id} → map, '
                f'ranges: {len(msg.ranges)} 포인트'
            )

            # 변환된 데이터 활용
            self.process_scan(msg, transform)

        except TransformException as ex:
            self.get_logger().warning(f'변환 실패: {ex}')

    def process_scan(self, scan, transform):
        """변환된 레이저 스캔 데이터 처리"""
        pass


def main():
    rclpy.init()
    node = LaserTransformNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

6. MessageFilter의 고급 활용

6.1 message_filters와의 체이닝

tf2_ros::MessageFiltermessage_filters 프레임워크의 일부로서, 다른 필터와 체이닝(chaining)하여 복합적인 메시지 동기화 파이프라인을 구성할 수 있다.

#include <message_filters/subscriber.h>
#include <message_filters/time_synchronizer.h>
#include <message_filters/sync_policies/approximate_time.h>
#include <tf2_ros/message_filter.h>
#include <sensor_msgs/msg/image.hpp>
#include <sensor_msgs/msg/camera_info.hpp>

class StereoTransformNode : public rclcpp::Node
{
public:
    StereoTransformNode()
    : Node("stereo_transform_node")
    {
        tf_buffer_ = std::make_shared<tf2_ros::Buffer>(this->get_clock());
        tf_listener_ = std::make_shared<tf2_ros::TransformListener>(*tf_buffer_, this);

        // 좌측 영상 구독 → MessageFilter → 시간 동기화
        left_sub_.subscribe(this, "/stereo/left/image_raw");

        // tf2 MessageFilter 적용 (좌측 영상에 대해)
        left_tf_filter_ = std::make_shared<
            tf2_ros::MessageFilter<sensor_msgs::msg::Image>>(
            left_sub_, *tf_buffer_, "map", 50,
            this->get_node_logging_interface(),
            this->get_node_clock_interface());

        // 우측 영상 구독
        right_sub_.subscribe(this, "/stereo/right/image_raw");

        // 카메라 정보 구독
        camera_info_sub_.subscribe(this, "/stereo/left/camera_info");

        // ApproximateTime 정책을 이용한 시간 동기화
        using SyncPolicy = message_filters::sync_policies::ApproximateTime<
            sensor_msgs::msg::Image,
            sensor_msgs::msg::Image,
            sensor_msgs::msg::CameraInfo>;

        sync_ = std::make_shared<message_filters::Synchronizer<SyncPolicy>>(
            SyncPolicy(50),
            *left_tf_filter_,   // tf2 필터가 적용된 좌측 영상
            right_sub_,         // 우측 영상
            camera_info_sub_    // 카메라 정보
        );

        sync_->registerCallback(
            std::bind(&StereoTransformNode::on_synchronized,
                      this, std::placeholders::_1,
                      std::placeholders::_2, std::placeholders::_3));
    }

private:
    void on_synchronized(
        const sensor_msgs::msg::Image::ConstSharedPtr& left_img,
        const sensor_msgs::msg::Image::ConstSharedPtr& right_img,
        const sensor_msgs::msg::CameraInfo::ConstSharedPtr& cam_info)
    {
        RCLCPP_INFO(get_logger(),
            "동기화 완료: 좌측[%ux%u] 우측[%ux%u]",
            left_img->width, left_img->height,
            right_img->width, right_img->height);
        (void)cam_info;
    }

    std::shared_ptr<tf2_ros::Buffer> tf_buffer_;
    std::shared_ptr<tf2_ros::TransformListener> tf_listener_;
    message_filters::Subscriber<sensor_msgs::msg::Image> left_sub_;
    message_filters::Subscriber<sensor_msgs::msg::Image> right_sub_;
    message_filters::Subscriber<sensor_msgs::msg::CameraInfo> camera_info_sub_;
    std::shared_ptr<tf2_ros::MessageFilter<sensor_msgs::msg::Image>> left_tf_filter_;
    using SyncPolicy = message_filters::sync_policies::ApproximateTime<
        sensor_msgs::msg::Image, sensor_msgs::msg::Image,
        sensor_msgs::msg::CameraInfo>;
    std::shared_ptr<message_filters::Synchronizer<SyncPolicy>> sync_;
};

6.2 실패 콜백 등록

MessageFilter에서 변환 대기 시간을 초과하여 폐기되는 메시지에 대한 실패 콜백을 등록할 수 있다. 이를 통하여 데이터 손실의 원인을 진단할 수 있다.

// 실패 콜백 등록
tf_filter_->registerFailureCallback(
    std::bind(&MyNode::on_filter_failure, this,
              std::placeholders::_1, std::placeholders::_2));

void on_filter_failure(
    const sensor_msgs::msg::PointCloud2::ConstSharedPtr& msg,
    tf2_ros::filter_failure_reasons::FilterFailureReason reason)
{
    std::string reason_str;
    switch (reason) {
        case tf2_ros::filter_failure_reasons::OutTheBack:
            reason_str = "메시지가 Buffer 캐시 기간보다 오래됨";
            break;
        case tf2_ros::filter_failure_reasons::EmptyFrameID:
            reason_str = "메시지의 frame_id가 비어 있음";
            break;
        default:
            reason_str = "알 수 없는 사유";
            break;
    }
    RCLCPP_WARN(get_logger(),
        "메시지 필터 실패 [stamp=%.3f]: %s",
        rclcpp::Time(msg->header.stamp).seconds(),
        reason_str.c_str());
}

7. 큐 크기와 허용 오차 설정 지침

7.1 큐 크기(Queue Size) 결정

큐 크기는 변환 데이터 도착의 최대 지연 시간과 센서 데이터의 발행 주파수를 기반으로 결정한다.

\text{queue\_size} \geq f_{\text{sensor}} \times t_{\text{max\_delay}}

여기서 f_{\text{sensor}}는 센서 데이터 발행 주파수(Hz), t_{\text{max\_delay}}는 변환 데이터의 최대 도착 지연 시간(초)이다.

예를 들어, 10 Hz로 발행되는 LiDAR 데이터에 대하여 변환 데이터의 최대 지연이 0.5초인 경우, 큐 크기는 최소 5 이상으로 설정하여야 한다.

7.2 시간 허용 오차(Tolerance) 설정

시간 허용 오차는 센서 데이터의 타임스탬프와 변환 데이터의 타임스탬프 간 허용 가능한 최대 시간 차이를 지정한다. 이 값은 센서 데이터와 변환 데이터 사이의 시간 동기화 정밀도에 따라 결정한다.

  • 0초 (기본값): 정확한 타임스탬프 일치를 요구한다. 동일 발행 주파수에서 적합하다.
  • 양의 값: 지정된 시간 범위 이내의 변환을 허용한다. 보간(interpolation)을 통하여 요청 시점의 변환을 근사한다.
// 10ms 허용 오차 설정
tf_filter_ = std::make_shared<tf2_ros::MessageFilter<sensor_msgs::msg::PointCloud2>>(
    point_cloud_sub_, *tf_buffer_, "map", 100,
    this->get_node_logging_interface(),
    this->get_node_clock_interface(),
    std::chrono::milliseconds(10)  // tolerance
);

8. 성능 고려 사항

8.1 메모리 사용량

MessageFilter의 내부 큐는 대기 중인 센서 메시지를 보관하므로, 큐 크기와 메시지 크기에 비례하여 메모리를 사용한다. 대용량 메시지(예: PointCloud2)를 처리하는 경우 큐 크기를 적절히 제한하여야 한다.

메시지 유형개별 메시지 크기 (예시)큐 크기 100 기준 메모리 사용량
LaserScan (720 포인트)~5.8 KB~580 KB
Image (640×480 RGB)~921.6 KB~92.2 MB
PointCloud2 (65536 포인트)~1.6 MB~160 MB

8.2 Executor와의 상호 작용

MessageFilter의 콜백은 Executor에 의하여 실행된다. MultiThreadedExecutor를 사용하는 경우, MessageFilter의 출력 콜백과 tf2_ros의 구독 콜백이 서로 다른 스레드에서 동시에 실행될 수 있으므로, 공유 데이터에 대한 스레드 안전성을 보장하여야 한다.

8.3 QoS 호환성

MessageFilter의 입력 구독자에 설정된 QoS 정책이 퍼블리셔의 QoS 정책과 호환되어야 한다. 센서 데이터에 대하여는 일반적으로 BEST_EFFORT 신뢰성과 VOLATILE 내구성을 사용한다.

// 센서 데이터용 QoS 설정
rclcpp::QoS sensor_qos(10);
sensor_qos.best_effort();
sensor_qos.durability_volatile();

point_cloud_sub_.subscribe(this, "/lidar/points", sensor_qos.get_rmw_qos_profile());

9. 요약

tf2_ros::MessageFilter는 센서 데이터와 좌표 변환 데이터의 시간적 동기화 문제를 투명하게 해결하는 핵심 도구이다. 센서 메시지를 내부 큐에 보관하고, 해당 메시지의 타임스탬프에 대한 좌표 변환이 이용 가능해질 때까지 자동으로 대기한 후 사용자 콜백으로 전달함으로써, 안정적인 좌표 변환 처리를 보장한다. message_filters 프레임워크와의 체이닝을 통하여 복합적인 동기화 파이프라인을 구성할 수 있으며, 큐 크기와 시간 허용 오차의 적절한 설정이 시스템 성능의 핵심 요소이다.


참고 문헌 및 출처

  • ROS2 공식 문서, “Using Stamped Datatypes with tf2_ros::MessageFilter”, https://docs.ros.org/en/humble/Tutorials/Intermediate/Tf2/Using-Stamped-Datatypes-With-Tf2-Ros-MessageFilter.html (ROS2 Humble Hawksbill)
  • tf2_ros API 문서, tf2_ros::MessageFilter, https://docs.ros2.org/latest/api/tf2_ros/ (ROS2 Humble Hawksbill)
  • message_filters API 문서, https://docs.ros2.org/latest/api/message_filters/ (ROS2 Humble Hawksbill)
  • S. Macenski et al., “Robot Operating System 2: Design, Architecture, and Uses in the Wild,” Science Robotics, vol. 7, no. 66, 2022.