6.3 데이터 발행(Publish)과 구독(Subscribe)의 C++ 구현

6.3 데이터 발행(Publish)과 구독(Subscribe)의 C++ 구현

네트워크의 고속도로를 깔았으니, 이제 그 위로 물류(Payload) 트럭을 쏘아 보낼 차례다.
과거의 C/C++ 미들웨어(DDS 등)에서는 데이터를 한 번 보내기 위해 IDL(Interface Definition Language) 코드를 짜고 컴파일러를 3번 돌려야 했다. 그러나 Zenoh C++ API는 개발자에게 그 어떤 사전 타입 선언도 강요하지 않는다. 바이트(Byte) 배열이건, 문자열이건, 커스텀 구조체이건 던지면(Put) 날아가고, 콜백(Callback)으로 떨어진다.

이 장에서는 C++11 이후의 꽃인 std::function 과 람다(Lambda) 익명 함수를 활용하여 구독부(Subscriber)를 얼마나 미니멀하게 설계할 수 있는지, 그리고 거대한 커스텀 C 구조체를 직렬화(Serialization)하여 QoS(신뢰성 보장) 딱지를 붙여 네트워크로 날려 보내는 하드코어 전술을 런북으로 기술한다.

1. Publisher 객체 생성과 Key Expression 설계

데이터를 쏠 때는 과녁(Topic)이 명확해야 한다. Zenoh에서는 이 과녁을 Key Expression(경로 표현식) 이라고 부른다.

1.0.1 [Runbook] 타겟팅 발사(Publisher) 전술

단일 스레드에서 캐주얼하게 쏠 때와, 화력 중심의 멀티 스레드에서 연사할 때의 무기 선택이 다르다.

전술 1. 임시 포격 (Session::put)
가끔 시스템 상태를 보고하거나, 1분에 한 번씩 쏘는 수준이라면 굳이 발사대(Publisher)를 거창하게 만들 필요가 없다.

auto session = zenoh::Session::open(std::move(config));

// 경로(Key)를 문자열로 적기만 하면 알아서 라우터로 날아간다!
session.put("robot/1/battery", "98%");

주의: 내부적으로 이 방식은 매번 경로 문자열을 파싱하므로, 초당 1만 번씩 불리면 오버헤드가 발생한다.

전술 2. 거치형 기관총 (Publisher 선언)
초당 60프레임의 영상이나 1,000Hz 센서 데이터를 쏴야 한다면 반드시 이 전술을 써야 한다.

// 1. 라우터에게 "내가 이 경로(Key)로 데이터를 집중 포격할 테니 통로 뚫어놔!" 통보
auto publisher = session.declare_publisher("robot/1/lidar");

// 2. 이후로는 문자열 검증 오버헤드 없이, 뚫린 파이프에 비트(Byte)만 쏟아붓는다.
for (int i = 0; i < 10000; i++) {
    publisher.put("raw_sensor_data_chunk..."); 
}
// 루프가 끝나면 publisher 객체가 RAII에 의해 소멸되며 통로가 자동으로 폐쇄된다!

declare_publisher 아키텍처는 C++ 애플리케이션의 CPU 파싱 타임을 줄이고, 네트워크 계층의 라우팅 효율을 2배 이상 끌어올리는 분산 시스템의 기본 수칙이다.

2. 데이터 구독을 위한 Subscriber 객체 및 람다(Lambda) 콜백의 활용

구독(Subscribe)은 누군가 던진 데이터를 낚아채는 그물이다.
과거 C 개발자들은 스레드를 띄우고 함수 포인터를 넘기느라 코드가 스파게티가 되었지만, 모던 C++의 람다(Lambda) 와 결합된 zenoh-cpp 는 불과 3줄짜리 인라인 코드 블록으로 이 비동기 흐름을 완벽히 흡수한다.

2.0.1 [Runbook] 비동기 데이터 낚아채기 (Lambda Callback)

1. 와일드카드 구독망 전개

#include "zenoh.hxx"
#include <iostream>

void run_control_tower() {
    auto config = zenoh::Config::create_default();
    auto session = zenoh::Session::open(std::move(config));

    std::cout << "수신 대기 중..." << std::endl;

    // [핵심] '*' 와일드카드를 써서 1번 로봇이든 10번 로봇이든 배터리 상태는 다 긁어모은다!
    // 람다 캡처 [=] 를 통해 로컬 변수를 콜백 내부로 무사히 끌고 들어갈 수 있다.
    auto subscriber = session.declare_subscriber(
        "robot/*/battery", 
        [](const%20zenoh::Sample&%20sample) {
            // 패킷이 도착할 때마다 백그라운드 워커 스레드가 이 람다 블록을 때려(Trigger)준다.
            std::cout << "[수신] 출처: " << sample.get_keyexpr().as_string_view()
                      << " 베터리: " << sample.get_payload().as_string() 
                      << std::endl;
        }
    );

    // 메인 스레드는 살려둬야 백그라운드 워커가 죽지 않음!
    std::cin.get(); 
}
// 블록을 빠져나가면 subscriber 객체가 RAII로 소멸하며 라우터에 "구독 취소"를 통보함.

[메모리 방어 아키텍처]
초급 C++ 개발자들은 이 람다 안에서 [&](const%20zenoh::Sample&) 캡처(참조)를 걸어버렸다가, 메인 스코프의 변수가 소멸된 후 백그라운드 람다 콜백이 그 쓰레기 메모리(Dangling Pointer)를 건드려 세그폴트(Segfault)를 띄운다.
비동기 통신망에서 콜백을 선언할 때는 지역 변수 참조(&)를 극한으로 피하고, 스마트 포인터(std::shared_ptr)를 값으로 복사([=])하여 넘기는 것이 데드락과 덤프(Crash)를 막는 생존 본능이다.

3. 기본 데이터 타입(String, Integer 등)의 전송

Zenoh는 뼛속까지 고성능 네트워크 엔진이라서, 데이터를 전송할 때 “JSON으로 말아주세요” 같은 고급 식당 주문을 받지 않는다. intstring 이건 무조건 바이트 메모리 블록(uint8_t*)으로 내려쳐서 와이어(Wire)에 태운다.

zenoh-cpp 는 C++ 표준 라이브러리(std::string, std::vector)와 완벽한 형 변환(Casting)을 지원한다.

3.0.1 [Runbook] 스칼라 원시 타입(Primitive type) 발사 전술

전술 1. 문자열 (String API)

std::string danger_msg = "WARNING: Temperature Exceeded";

// std::string을 전달하면 알아서 내부 c_str() 의 바이트 길이만큼을 버퍼로 포장해서 보냄
publisher.put(danger_msg);

전술 2. 원시 데이터 (Integer, Float, Struct Pointer)
온도 센서(int) 하나를 던지는데 문자열 전환(to_string)을 하는 건 CPU를 시궁창에 버리는 행위다.
메모리 주소를 들이대고 사이즈만큼 긁어(Cast) 버리면 그만이다!

int cpu_temp = 85;

// int 타입 변수의 메모리 주소(&cpu_temp)를 들이밀고, 4바이트(sizeof)를 통째로 읽어라!
publisher.put(zenoh::Bytes(&cpu_temp, sizeof(int)));

수신 측 강제 형변환(Re-casting)
이렇게 바이트로 날아온 int를 수신부에서 어떻게 원복시킬 것인가?

auto sub = session.declare_subscriber("sensor/temp", 
    [](const%20zenoh::Sample&%20sample) {
        // payload 의 일련된 일자 버퍼 주소를 가져온다.
        const uint8_t* raw_bytes = sample.get_payload().as_slice().data();
        
        // 메모리를 강제 캐스팅 (엔디안 Endianness 문제가 없다면 완벽한 Zero-Copy)
        int received_temp = *reinterpret_cast<const int*>(raw_bytes);
        
        std::cout << "온도 수신: " << received_temp << std::endl;
    }
);

이렇게 포인터 캐스팅을 하는 순간, 직렬화/역직렬화에 들어가는 CPU Cycle은 사실상 기계어 파이프라인에서 ’0’이 된다. 이것이 HFT 트레이딩이나 로봇 모터 제어 루프에서 C++이 영원히 멸망하지 않는 이유다.

4. 사용자 정의 C 구조체의 직렬화(Serialization) 및 전송

단일 int 변수 1개라면 메모리 포인터 캐스팅으로 끝났겠지만, 문자열과 배열이 혼합된 100줄짜리 거대 로봇 상태 C 구조체(struct)를 네트워크로 던져야 한다면 이야기가 다르다. 구조체의 뻥튀기(Padding) 문제, 빅-리틀 엔디안 교차 문제 때문에 구조체를 생으로 포인터 우겨넣기(Raw Dump) 하면 타겟 시스템에서 값이 쓰레기로 변한다.

4.0.1 [Runbook] C++ 범용 직렬화 파이프라인 우회 침투

Zenoh-cpp 자체는 역직렬화 도구를 강제하지 않는다. 로보틱스 도메인에서는 보통 가벼우면서도 구조가 잡힌 msgpack-c 혹은 flatbuffers 를 많이 쓴다. 여기서는 개념적으로 C++ 코끼리를 직렬화하여 바이트로 구겨 넣는 런북을 전개한다.

1. [송신] 직렬화(Serialization) 후 바이트 버퍼 투하
어떤 라이브러리를 쓰건 결과물은 결국 순수한 std::vector<uint8_t> 다.

#include <vector>

struct RobotState {
    int id;
    double battery;
    bool is_moving;
};

void send_state(zenoh::Publisher& pub, const RobotState& state) {
    // 1. (가정) 커스텀 직렬기나 memcpy 로 연속된 바이트 버퍼를 만든다.
    std::vector<uint8_t> buffer;
    buffer.reserve(sizeof(RobotState));
    
    // 구조체 배열 통째로 복사 (패딩 문제가 없는 통일 아키텍처의 경우)
    uint8_t* head = reinterpret_cast<uint8_t*>(const_cast<RobotState*>(&state));
    buffer.insert(buffer.end(), head, head + sizeof(RobotState));

    // 2. [Zenoh] 이 vector 덩어리를 소유권 이전(Move)하여 통신망에 올려버림!
    pub.put(std::move(buffer));
}

2. [수신] 조각난 ZBytes 봉합 및 구조체 부활
Zenoh는 큰 페이로드를 조각내어(Fragmented) 전달할 수도 있다. 이걸 다시 끄집어내는 작업.

auto sub = session.declare_subscriber("sys/state", 
    [](const%20zenoh::Sample&%20sample) {
        
        // 데이터가 조각(Chunks) 나있을 수 있으므로 일자 배열로 쫙 합쳐(contiguous) 가져온다!
        auto payload_vector = sample.get_payload().as_vector();
        
        // 사이즈 무결성 검증
        if(payload_vector.size() == sizeof(RobotState)) {
             RobotState* state = reinterpret_cast<RobotState*>(payload_vector.data());
             std::cout << "상태 복원 성공! 로봇 ID: " << state->id << std::endl;
        } else {
             std::cout << "공격 또는 깨진 패킷 유입!" << std::endl;
        }
    }
);

이처럼 응용 개발자가 페이로드 파라미터 규격을 마음대로 주무를 수 있는 유연함이 바로 Zenoh 구조체 바인딩의 미학(Art)이다.

5. 멀티스레드 환경에서의 콜백 함수 스레드 안전성(Thread-safety) 확보

이 장은 C++ 엔지니어가 읽을 때 가장 등골이 서늘해지는 부분이다.
Zenoh의 Subscriber 람다는 “내 메인 스레드“에서 실행되는 게 아니라, Zenoh 하부 코어의 백그라운드 I/O 스레드 내부에서 불쑥불쑥 튀어나온다. 즉, 이 콜백 안에서 바깥에 있는 글로벌 큐(std::queue)나 std::vector 변수에 접근하면 즉각 데이터 레이스(Data Race) 가 발생하며 랜덤 크래시가 일어난다.

5.0.1 [Runbook] 비동기 데이터 컨텍스트 격리 (Locking)

네트워크 이벤트 워커를 살리면서, 내 앱으로 데이터를 안전하게 통과시키는 동기화 배관(Pipeline) 작업이다.

[정석] Mutex & 락 큐(Locking Queue) 패턴 적용
도착한(Push) 메시지를 락을 쥐고 로컬 큐에 집어넣어, 백그라운드 라우팅 스레드는 지체 없이 패킷을 버리고 제갈길을 가게(Non-blocking) 해방시켜야 한다.

#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include "zenoh.hxx"

// 앱 단에서 소비할 공유 메모리 자산들
std::mutex g_queue_mutex;
std::queue<std::string> g_msg_queue;

void initiate_subscriber(zenoh::Session& session) {
    // 백그라운드 스레드가 치고 들어올 과녁
    auto sub = session.declare_subscriber("camera/frame", 
        [](const%20zenoh::Sample&%20sample) {
            std::string data = sample.get_payload().as_string();
            
            // [방어막 1] 락(Lock) 획득! 이걸 놓치면 10분 뒤에 프로그램이 터진다.
            std::lock_guard<std::mutex> lock(g_queue_mutex);
            g_msg_queue.push(std::move(data));
            // 블록 탈출 시 lock_guard 파괴 -> 락 자동 해제
        }
    );
}

void main_logic_thread() {
    while(true) {
        std::string my_data;
        {
            // [전방 초소] 메인 스레드도 데이터를 꺼내갈 때 똑같이 락을 먼저 잡는다.
            std::lock_guard<std::mutex> lock(g_queue_mutex);
            if (!g_msg_queue.empty()) {
                my_data = g_msg_queue.front();
                g_msg_queue.pop();
            }
        }
        
        // 무거운 AI 비전 처리 함수는 락을 '풀고 나서' 단독으로 진행해야 
        // 네트워크 수신부가 병목(Blocked)에 시달리지 않는다!
        if(!my_data.empty()) {
            process_ai_vision(my_data);
        }
        
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

[아키텍처 인스펙션]
고수들은 이 락 획득(Lock Guard) 시간조차 아까워서, C++11 규격의 std::atomic 인덱스나 Lock-free 링 버퍼(SPSC Queue) 를 박아 넣어 백엔드 통신 간의 병목을 진공 상태로 만들어 버린다. Zenoh는 그 어떤 고속 터널이든 맞물려 돌아갈 준비가 되어있다.

6. QoS(Quality of Service) 정책 설정 - 신뢰성(Reliability) 및 혼잡 제어(Congestion Control)

C++ 분산 로직이 들어가는 곳은 장난감 구역이 아니다. “네트워크 버퍼가 막혔으니 비상 정지(Brake) 명령 패킷을 버리겠습니다“라는 미친 행동을 하는 라이브러리를 산업에 쓸 수는 없다.
데이터의 목숨 값과 중요도를 라우터 커널 단에 통보하는 QoS (통신 품질 제어) 정책 세팅은 필연이다.

6.0.1 [Runbook] 생사결단 백프레셔(Backpressure) 통제 전술

zenoh-cpp 환경에서 퍼블리싱을 걸 땐 PutOptions에 이 무시무시한 통제력을 섞을 수 있다.

전술 1. 신뢰도 강제 (100% 도달 보장)
비상 정지, 펌웨어 델타 패치 등은 단 1비트도 증발해선 안 된다.

zenoh::PutOptions opts;

// [무결성 보장] TCP/QUIC 의 재전송 큐를 끝까지 물고 늘어진다. (Reliable)
opts.set_reliability(zenoh::Reliability::Z_RELIABILITY_RELIABLE);

publisher.put("robot/1/cmd_brake", "STOP", opts);

전술 2. 혼잡 제어 (Traffic Jam Policy)
메인 라우터의 큐(Queue)가 꽉 찼다고 치자. 다음 프레임을 어떻게 할 것인가?

zenoh::PutOptions opts_sensor;

// [드랍(Drop) 정책] 카메라 데이터다. 1장 잃어버리는 게 지연되는 것보다 낫다.
// 큐가 꽉 차있으면 내 패킷을 미련 없이 버려라.
opts_sensor.set_congestion_control(zenoh::CongestionControl::Z_CONGESTION_CONTROL_DROP);

publisher.put("robot/1/cam_feed", byte_vector, opts_sensor);

특수방어기제: 블로킹 메커니즘
네트워크가 도저히 패킷을 소화하지 못하면, Z_CONGESTION_CONTROL_BLOCK 을 써보라. 이 설정이 묻은 put 함수는 성공할 때까지 영원히 리턴하지(Return) 않는다! 당신의 C++ 스레드는 그 자리에 멈춰 서버가 죽지 않도록 방어하는 자연 백프레셔(Backpressure) 효과를 내며 전체 붕괴를 지연시킨다.

현역 C++ 데브옵스 아키텍트는 ಈ 옵션들을 데이터 페이로드 종류별로 클래스화 시켜 절대로 휴먼 에러(통신 품질 설정 누락)가 일어나지 않도록 하드코딩(Hardcoding)해 둔다.