6.3 데이터 발행(Publish)과 구독(Subscribe)의 C++ 구현
네트워크의 고속도로를 깔았으니, 이제 그 위로 물류(Payload) 트럭을 쏘아 보낼 차례다.
과거의 C/C++ 미들웨어(DDS 등)에서는 데이터를 한 번 보내기 위해 IDL(Interface Definition Language) 코드를 짜고 컴파일러를 3번 돌려야 했다. 그러나 Zenoh C++ API는 개발자에게 그 어떤 사전 타입 선언도 강요하지 않는다. 바이트(Byte) 배열이건, 문자열이건, 커스텀 구조체이건 던지면(Put) 날아가고, 콜백(Callback)으로 떨어진다.
이 장에서는 C++11 이후의 꽃인 std::function 과 람다(Lambda) 익명 함수를 활용하여 구독부(Subscriber)를 얼마나 미니멀하게 설계할 수 있는지, 그리고 거대한 커스텀 C 구조체를 직렬화(Serialization)하여 QoS(신뢰성 보장) 딱지를 붙여 네트워크로 날려 보내는 하드코어 전술을 런북으로 기술한다.
1. Publisher 객체 생성과 Key Expression 설계
데이터를 쏠 때는 과녁(Topic)이 명확해야 한다. Zenoh에서는 이 과녁을 Key Expression(경로 표현식) 이라고 부른다.
1.0.1 [Runbook] 타겟팅 발사(Publisher) 전술
단일 스레드에서 캐주얼하게 쏠 때와, 화력 중심의 멀티 스레드에서 연사할 때의 무기 선택이 다르다.
전술 1. 임시 포격 (Session::put)
가끔 시스템 상태를 보고하거나, 1분에 한 번씩 쏘는 수준이라면 굳이 발사대(Publisher)를 거창하게 만들 필요가 없다.
auto session = zenoh::Session::open(std::move(config));
// 경로(Key)를 문자열로 적기만 하면 알아서 라우터로 날아간다!
session.put("robot/1/battery", "98%");
주의: 내부적으로 이 방식은 매번 경로 문자열을 파싱하므로, 초당 1만 번씩 불리면 오버헤드가 발생한다.
전술 2. 거치형 기관총 (Publisher 선언)
초당 60프레임의 영상이나 1,000Hz 센서 데이터를 쏴야 한다면 반드시 이 전술을 써야 한다.
// 1. 라우터에게 "내가 이 경로(Key)로 데이터를 집중 포격할 테니 통로 뚫어놔!" 통보
auto publisher = session.declare_publisher("robot/1/lidar");
// 2. 이후로는 문자열 검증 오버헤드 없이, 뚫린 파이프에 비트(Byte)만 쏟아붓는다.
for (int i = 0; i < 10000; i++) {
publisher.put("raw_sensor_data_chunk...");
}
// 루프가 끝나면 publisher 객체가 RAII에 의해 소멸되며 통로가 자동으로 폐쇄된다!
이 declare_publisher 아키텍처는 C++ 애플리케이션의 CPU 파싱 타임을 줄이고, 네트워크 계층의 라우팅 효율을 2배 이상 끌어올리는 분산 시스템의 기본 수칙이다.
2. 데이터 구독을 위한 Subscriber 객체 및 람다(Lambda) 콜백의 활용
구독(Subscribe)은 누군가 던진 데이터를 낚아채는 그물이다.
과거 C 개발자들은 스레드를 띄우고 함수 포인터를 넘기느라 코드가 스파게티가 되었지만, 모던 C++의 람다(Lambda) 와 결합된 zenoh-cpp 는 불과 3줄짜리 인라인 코드 블록으로 이 비동기 흐름을 완벽히 흡수한다.
2.0.1 [Runbook] 비동기 데이터 낚아채기 (Lambda Callback)
1. 와일드카드 구독망 전개
#include "zenoh.hxx"
#include <iostream>
void run_control_tower() {
auto config = zenoh::Config::create_default();
auto session = zenoh::Session::open(std::move(config));
std::cout << "수신 대기 중..." << std::endl;
// [핵심] '*' 와일드카드를 써서 1번 로봇이든 10번 로봇이든 배터리 상태는 다 긁어모은다!
// 람다 캡처 [=] 를 통해 로컬 변수를 콜백 내부로 무사히 끌고 들어갈 수 있다.
auto subscriber = session.declare_subscriber(
"robot/*/battery",
[](const%20zenoh::Sample&%20sample) {
// 패킷이 도착할 때마다 백그라운드 워커 스레드가 이 람다 블록을 때려(Trigger)준다.
std::cout << "[수신] 출처: " << sample.get_keyexpr().as_string_view()
<< " 베터리: " << sample.get_payload().as_string()
<< std::endl;
}
);
// 메인 스레드는 살려둬야 백그라운드 워커가 죽지 않음!
std::cin.get();
}
// 블록을 빠져나가면 subscriber 객체가 RAII로 소멸하며 라우터에 "구독 취소"를 통보함.
[메모리 방어 아키텍처]
초급 C++ 개발자들은 이 람다 안에서 [&](const%20zenoh::Sample&) 캡처(참조)를 걸어버렸다가, 메인 스코프의 변수가 소멸된 후 백그라운드 람다 콜백이 그 쓰레기 메모리(Dangling Pointer)를 건드려 세그폴트(Segfault)를 띄운다.
비동기 통신망에서 콜백을 선언할 때는 지역 변수 참조(&)를 극한으로 피하고, 스마트 포인터(std::shared_ptr)를 값으로 복사([=])하여 넘기는 것이 데드락과 덤프(Crash)를 막는 생존 본능이다.
3. 기본 데이터 타입(String, Integer 등)의 전송
Zenoh는 뼛속까지 고성능 네트워크 엔진이라서, 데이터를 전송할 때 “JSON으로 말아주세요” 같은 고급 식당 주문을 받지 않는다. int 건 string 이건 무조건 바이트 메모리 블록(uint8_t*)으로 내려쳐서 와이어(Wire)에 태운다.
zenoh-cpp 는 C++ 표준 라이브러리(std::string, std::vector)와 완벽한 형 변환(Casting)을 지원한다.
3.0.1 [Runbook] 스칼라 원시 타입(Primitive type) 발사 전술
전술 1. 문자열 (String API)
std::string danger_msg = "WARNING: Temperature Exceeded";
// std::string을 전달하면 알아서 내부 c_str() 의 바이트 길이만큼을 버퍼로 포장해서 보냄
publisher.put(danger_msg);
전술 2. 원시 데이터 (Integer, Float, Struct Pointer)
온도 센서(int) 하나를 던지는데 문자열 전환(to_string)을 하는 건 CPU를 시궁창에 버리는 행위다.
메모리 주소를 들이대고 사이즈만큼 긁어(Cast) 버리면 그만이다!
int cpu_temp = 85;
// int 타입 변수의 메모리 주소(&cpu_temp)를 들이밀고, 4바이트(sizeof)를 통째로 읽어라!
publisher.put(zenoh::Bytes(&cpu_temp, sizeof(int)));
수신 측 강제 형변환(Re-casting)
이렇게 바이트로 날아온 int를 수신부에서 어떻게 원복시킬 것인가?
auto sub = session.declare_subscriber("sensor/temp",
[](const%20zenoh::Sample&%20sample) {
// payload 의 일련된 일자 버퍼 주소를 가져온다.
const uint8_t* raw_bytes = sample.get_payload().as_slice().data();
// 메모리를 강제 캐스팅 (엔디안 Endianness 문제가 없다면 완벽한 Zero-Copy)
int received_temp = *reinterpret_cast<const int*>(raw_bytes);
std::cout << "온도 수신: " << received_temp << std::endl;
}
);
이렇게 포인터 캐스팅을 하는 순간, 직렬화/역직렬화에 들어가는 CPU Cycle은 사실상 기계어 파이프라인에서 ’0’이 된다. 이것이 HFT 트레이딩이나 로봇 모터 제어 루프에서 C++이 영원히 멸망하지 않는 이유다.
4. 사용자 정의 C 구조체의 직렬화(Serialization) 및 전송
단일 int 변수 1개라면 메모리 포인터 캐스팅으로 끝났겠지만, 문자열과 배열이 혼합된 100줄짜리 거대 로봇 상태 C 구조체(struct)를 네트워크로 던져야 한다면 이야기가 다르다. 구조체의 뻥튀기(Padding) 문제, 빅-리틀 엔디안 교차 문제 때문에 구조체를 생으로 포인터 우겨넣기(Raw Dump) 하면 타겟 시스템에서 값이 쓰레기로 변한다.
4.0.1 [Runbook] C++ 범용 직렬화 파이프라인 우회 침투
Zenoh-cpp 자체는 역직렬화 도구를 강제하지 않는다. 로보틱스 도메인에서는 보통 가벼우면서도 구조가 잡힌 msgpack-c 혹은 flatbuffers 를 많이 쓴다. 여기서는 개념적으로 C++ 코끼리를 직렬화하여 바이트로 구겨 넣는 런북을 전개한다.
1. [송신] 직렬화(Serialization) 후 바이트 버퍼 투하
어떤 라이브러리를 쓰건 결과물은 결국 순수한 std::vector<uint8_t> 다.
#include <vector>
struct RobotState {
int id;
double battery;
bool is_moving;
};
void send_state(zenoh::Publisher& pub, const RobotState& state) {
// 1. (가정) 커스텀 직렬기나 memcpy 로 연속된 바이트 버퍼를 만든다.
std::vector<uint8_t> buffer;
buffer.reserve(sizeof(RobotState));
// 구조체 배열 통째로 복사 (패딩 문제가 없는 통일 아키텍처의 경우)
uint8_t* head = reinterpret_cast<uint8_t*>(const_cast<RobotState*>(&state));
buffer.insert(buffer.end(), head, head + sizeof(RobotState));
// 2. [Zenoh] 이 vector 덩어리를 소유권 이전(Move)하여 통신망에 올려버림!
pub.put(std::move(buffer));
}
2. [수신] 조각난 ZBytes 봉합 및 구조체 부활
Zenoh는 큰 페이로드를 조각내어(Fragmented) 전달할 수도 있다. 이걸 다시 끄집어내는 작업.
auto sub = session.declare_subscriber("sys/state",
[](const%20zenoh::Sample&%20sample) {
// 데이터가 조각(Chunks) 나있을 수 있으므로 일자 배열로 쫙 합쳐(contiguous) 가져온다!
auto payload_vector = sample.get_payload().as_vector();
// 사이즈 무결성 검증
if(payload_vector.size() == sizeof(RobotState)) {
RobotState* state = reinterpret_cast<RobotState*>(payload_vector.data());
std::cout << "상태 복원 성공! 로봇 ID: " << state->id << std::endl;
} else {
std::cout << "공격 또는 깨진 패킷 유입!" << std::endl;
}
}
);
이처럼 응용 개발자가 페이로드 파라미터 규격을 마음대로 주무를 수 있는 유연함이 바로 Zenoh 구조체 바인딩의 미학(Art)이다.
5. 멀티스레드 환경에서의 콜백 함수 스레드 안전성(Thread-safety) 확보
이 장은 C++ 엔지니어가 읽을 때 가장 등골이 서늘해지는 부분이다.
Zenoh의 Subscriber 람다는 “내 메인 스레드“에서 실행되는 게 아니라, Zenoh 하부 코어의 백그라운드 I/O 스레드 내부에서 불쑥불쑥 튀어나온다. 즉, 이 콜백 안에서 바깥에 있는 글로벌 큐(std::queue)나 std::vector 변수에 접근하면 즉각 데이터 레이스(Data Race) 가 발생하며 랜덤 크래시가 일어난다.
5.0.1 [Runbook] 비동기 데이터 컨텍스트 격리 (Locking)
네트워크 이벤트 워커를 살리면서, 내 앱으로 데이터를 안전하게 통과시키는 동기화 배관(Pipeline) 작업이다.
[정석] Mutex & 락 큐(Locking Queue) 패턴 적용
도착한(Push) 메시지를 락을 쥐고 로컬 큐에 집어넣어, 백그라운드 라우팅 스레드는 지체 없이 패킷을 버리고 제갈길을 가게(Non-blocking) 해방시켜야 한다.
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include "zenoh.hxx"
// 앱 단에서 소비할 공유 메모리 자산들
std::mutex g_queue_mutex;
std::queue<std::string> g_msg_queue;
void initiate_subscriber(zenoh::Session& session) {
// 백그라운드 스레드가 치고 들어올 과녁
auto sub = session.declare_subscriber("camera/frame",
[](const%20zenoh::Sample&%20sample) {
std::string data = sample.get_payload().as_string();
// [방어막 1] 락(Lock) 획득! 이걸 놓치면 10분 뒤에 프로그램이 터진다.
std::lock_guard<std::mutex> lock(g_queue_mutex);
g_msg_queue.push(std::move(data));
// 블록 탈출 시 lock_guard 파괴 -> 락 자동 해제
}
);
}
void main_logic_thread() {
while(true) {
std::string my_data;
{
// [전방 초소] 메인 스레드도 데이터를 꺼내갈 때 똑같이 락을 먼저 잡는다.
std::lock_guard<std::mutex> lock(g_queue_mutex);
if (!g_msg_queue.empty()) {
my_data = g_msg_queue.front();
g_msg_queue.pop();
}
}
// 무거운 AI 비전 처리 함수는 락을 '풀고 나서' 단독으로 진행해야
// 네트워크 수신부가 병목(Blocked)에 시달리지 않는다!
if(!my_data.empty()) {
process_ai_vision(my_data);
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
[아키텍처 인스펙션]
고수들은 이 락 획득(Lock Guard) 시간조차 아까워서, C++11 규격의 std::atomic 인덱스나 Lock-free 링 버퍼(SPSC Queue) 를 박아 넣어 백엔드 통신 간의 병목을 진공 상태로 만들어 버린다. Zenoh는 그 어떤 고속 터널이든 맞물려 돌아갈 준비가 되어있다.
6. QoS(Quality of Service) 정책 설정 - 신뢰성(Reliability) 및 혼잡 제어(Congestion Control)
C++ 분산 로직이 들어가는 곳은 장난감 구역이 아니다. “네트워크 버퍼가 막혔으니 비상 정지(Brake) 명령 패킷을 버리겠습니다“라는 미친 행동을 하는 라이브러리를 산업에 쓸 수는 없다.
데이터의 목숨 값과 중요도를 라우터 커널 단에 통보하는 QoS (통신 품질 제어) 정책 세팅은 필연이다.
6.0.1 [Runbook] 생사결단 백프레셔(Backpressure) 통제 전술
zenoh-cpp 환경에서 퍼블리싱을 걸 땐 PutOptions에 이 무시무시한 통제력을 섞을 수 있다.
전술 1. 신뢰도 강제 (100% 도달 보장)
비상 정지, 펌웨어 델타 패치 등은 단 1비트도 증발해선 안 된다.
zenoh::PutOptions opts;
// [무결성 보장] TCP/QUIC 의 재전송 큐를 끝까지 물고 늘어진다. (Reliable)
opts.set_reliability(zenoh::Reliability::Z_RELIABILITY_RELIABLE);
publisher.put("robot/1/cmd_brake", "STOP", opts);
전술 2. 혼잡 제어 (Traffic Jam Policy)
메인 라우터의 큐(Queue)가 꽉 찼다고 치자. 다음 프레임을 어떻게 할 것인가?
zenoh::PutOptions opts_sensor;
// [드랍(Drop) 정책] 카메라 데이터다. 1장 잃어버리는 게 지연되는 것보다 낫다.
// 큐가 꽉 차있으면 내 패킷을 미련 없이 버려라.
opts_sensor.set_congestion_control(zenoh::CongestionControl::Z_CONGESTION_CONTROL_DROP);
publisher.put("robot/1/cam_feed", byte_vector, opts_sensor);
특수방어기제: 블로킹 메커니즘
네트워크가 도저히 패킷을 소화하지 못하면, Z_CONGESTION_CONTROL_BLOCK 을 써보라. 이 설정이 묻은 put 함수는 성공할 때까지 영원히 리턴하지(Return) 않는다! 당신의 C++ 스레드는 그 자리에 멈춰 서버가 죽지 않도록 방어하는 자연 백프레셔(Backpressure) 효과를 내며 전체 붕괴를 지연시킨다.
현역 C++ 데브옵스 아키텍트는 ಈ 옵션들을 데이터 페이로드 종류별로 클래스화 시켜 절대로 휴먼 에러(통신 품질 설정 누락)가 일어나지 않도록 하드코딩(Hardcoding)해 둔다.