6.9 실전 C++ 미니 프로젝트: 고성능 다중 센서 융합 노드 구축
“이론은 끝났다. 코드를 돌려라(Show me the code).”
이 챕터는 지금까지 6장에서 배운 모든 C++ Zenoh 아키텍처—스마트 포인터 래핑, 멀티스레드 락 프리 큐, 비동기 송수신, 그리고 프로파일링—를 단 하나의 무결점 프로젝트로 통합하는 최종 관문이다.
우리가 구축할 시스템은 ‘자율주행 차량의 다중 센서 퓨전 코어’ 다.
1번 스레드는 100Hz로 라이다(LiDAR) 포인트 클라우드를 쏘아대고, 2번 스레드는 10Hz로 온도 센서를 난사한다. 이 무지막지한 이형(Heterogeneous) 데이터를 하나의 C++ Zenoh 파이프라인으로 병합하여 관제 서버로 날려 보내는 과정에서, 밀리초 단위의 레이턴시를 깎아내는 리얼타임 엔진의 진수를 맛보게 될 것이다.
1. 다중 스레드 기반 가상 센서 데이터 생성기(Generator) 구현
현장의 센서들은 자비를 베풀지 않는다. 서로 다른 주파수(Hz)로 메인 메모리에 인터럽트를 걸어대며 데이터를 쏟아낸다. 이 센서들을 대신할 Virtual Generator를 C++ 멀티스레드로 정밀하게 타공한다.
1.0.1 [Runbook] 이기종 주파수 멀티스레드 시뮬레이터
메인 통신 스레드를 건드리지 않고, 완전히 독립적으로 펌프질을 하는 백그라운드 워커 2개를 띄운다.
#include <thread>
#include <atomic>
#include <chrono>
#include <vector>
#include <iostream>
// 글로벌 종료 플래그
std::atomic<bool> g_running{true};
// 센서 1: 고주파 스캐너 (100Hz, 대용량 버퍼)
void lidar_generator_task() {
std::cout << "[LiDAR] 100Hz 가동 시작" << std::endl;
// 5MB 짜리 포인트 클라우드 모형 버퍼
std::vector<uint8_t> dummy_lidar(5 * 1024 * 1024, 0xFF);
auto next_wake = std::chrono::steady_clock::now();
while(g_running) {
// TODO: 여기서 Zenoh 통신 큐로 데이터를 Push 한다! (6.9.2 절에서 연동)
// 칼같은 10ms (100Hz) 루프 타이머
next_wake += std::chrono::milliseconds(10);
std::this_thread::sleep_until(next_wake);
}
}
// 센서 2: 저주파 메타 센서 (10Hz, 소용량 스트링)
void temp_generator_task() {
std::cout << "[Temp] 10Hz 가동 시작" << std::endl;
int temp_val = 20;
auto next_wake = std::chrono::steady_clock::now();
while(g_running) {
std::string payload = "Temp:" + std::to_string(temp_val++);
// TODO: Zenoh 통신 큐로 Push
next_wake += std::chrono::milliseconds(100);
std::this_thread::sleep_until(next_wake);
}
}
이 코드는 std::this_thread::sleep_until을 사용해 운영체제의 강제 스케줄링 오차를 뚫고 정시성(Real-time Jitter 극복) 을 확보하는 고전적이면서도 강력한 C++ 시뮬레이션 전술이다.
2. Zenoh 기반 비동기 데이터 퍼블리싱 파이프라인 설계
센서 스레드들이 쏟아내는 데이터를 메인 통신 엔진(Zenoh)이 어떻게 소화할 것인가?
각 스레드에서 직접 session.put() 을 남발하면 네트워크 소켓 단에 미친 듯한 병목 락(Lock) 경합이 발생한다. 생산자(Sensor)와 통신(Zenoh Transport)을 분리(Decoupling)하는 링 버퍼 배관 공사를 실시한다.
2.0.1 [Runbook] Lock-free Queue 기반의 무결성 배관 연결
MPSC (Multi-Producer Single-Consumer) 큐를 직접 짜기는 힘드니, C++ std::mutex 를 최소한의 찰나로 묶고 넘기는 전술을 채택한다.
#include "zenoh.hxx"
#include <mutex>
#include <queue>
// 전송을 대기하는 배달 물품 상자
struct TxPacket {
std::string topic;
std::vector<uint8_t> payload; // Zero 카피 이동용
};
std::mutex g_tx_mutex;
std::queue<TxPacket> g_tx_queue;
// 생산자: 센서 스레드 측 (예: Lidar)에서 호출
void push_to_zenoh_pipeline(std::string topic, std::vector<uint8_t>&& data) {
std::lock_guard<std::mutex> lock(g_tx_mutex);
g_tx_queue.push({std::move(topic), std::move(data)});
// memcpy 없이 포인터표만 큐어 밀어넣는다! (소요 시간 0.001ms 미만)
}
// 소비자: 메인 스레드의 Zenoh 퍼블리셔 전담 워커
void zenoh_tx_worker(zenoh::Session& session) {
while(g_running) {
TxPacket packet;
bool has_data = false;
{ // 락은 큐에서 데이터를 빼올 때 단 1μs 만 잡고 즉시 놓는다!
std::lock_guard<std::mutex> lock(g_tx_mutex);
if(!g_tx_queue.empty()) {
packet = std::move(g_tx_queue.front());
g_tx_queue.pop();
has_data = true;
}
}
if(has_data) {
// [Zenoh 폭격 개시] 락이 풀린 안전한 공간에서, 느려터진 네트워크 I/O 실행
session.put(packet.topic, std::move(packet.payload));
} else {
// 빈 루프를 막기 위한 양보(Yield)
std::this_thread::yield();
}
}
}
이 구조야말로 아무리 센서가 폭주해도, Zenoh 백엔드 네트워크망의 지연이 센서 스레드의 샘플링 주기(100Hz)를 방해하지 못하게 막는 최후의 블로킹 방어선이다.
3. 중앙 관제 노드에서의 데이터 수신 및 지연 시간(Latency) 측정
쏟아붓는 엣지 로봇이 있다면, 전부 주워 담는 관제 클라우드가 있어야 한다.
단순히 수신만 해선 이 책의 독자 수준에 맞지 않다. 패킷에 묻어온 생성 타임스탬프와 현재 리눅스 커널 도착 시간을 빼서 밀리초 단위의 통신 지연율(RTT/Latency)을 찍어내야 시스템의 건강검진이 끝난다.
3.0.1 [Runbook] 마이크로초 단위 통신 레이턴시 트래킹
송신부(6.9.2)에서 페이로드 앞부분 8바이트에 uint64_t로 송신 시각(Epoch)을 묻혀서 보냈다고 가정한다. (혹은 Attachment 로 붙여도 무방하다).
#include "zenoh.hxx"
#include <iostream>
#include <chrono>
// 현재 서버의 마이크로초 구하기
uint64_t get_current_micros() {
return std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
int main() {
auto session = zenoh::Session::open(zenoh::Config::create_default());
std::cout << "[관제탑] 모든 로봇의 센서망 수신 대기 (Wildcard **)" << std::endl;
auto sub = session.declare_subscriber(
"robot/**",
[](const%20zenoh::Sample&%20sample) {
uint64_t receive_time = get_current_micros();
// 패킷 버퍼 파편화 통합
auto bytes = sample.get_payload().as_vector();
if (bytes.size() >= sizeof(uint64_t)) {
// 발송자가 박아넣은 최초 생성 시점(8byte) 파싱
uint64_t send_time = *reinterpret_cast<const uint64_t*>(bytes.data());
// [포렌식] 이동 시간 계산
uint64_t latency = receive_time - send_time;
std::cout << "[모니터] " << sample.get_keyexpr().as_string_view()
<< " | 레이턴시: " << latency << " μs | 크기: "
<< bytes.size() << " bytes" << std::endl;
}
}
);
std::cin.get();
return 0;
}
이 코드를 AWS 인스턴스와 공장의 로봇 간에 돌리면 터미널에 레이턴시: 12400 μs 같이 무자비한 현실 수치가 찍힐 것이다. 여기서부터 당신의 튜닝(LTO 켜기, Batching 적용하기) 결과물이 어떻게 숫자를 깎아내려 가는지를 관찰하라.
4. 시스템 최적화 전/후의 Throughput 성능 지표 분석
6.9.1 ~ 6.9.3 장을 거쳐 완성한 시스템의 벤치마크 데이터를 추출했다면, 당신은 상사에게 보고할 아키텍처 리포트를 써야 한다. “C++ 코드에 약간의 튜닝을 가했더니 Zenoh가 C++ DDS를 발라버렸습니다.“라는 말을 숫자로 증명해 보자.
4.0.1 [Runbook] 최적화 3단계 페이즈 리포트
Stage 1: 순정 코드 (안티패턴 혼재)
- 행동: 배열을
std::string로 매번 복사하고, Lidar가 100번 루프를 돌 때마다 1번씩pub.put()을 즉시 때렸다. - 성능: Throughput(초당 전송량)
250 MB/s, CPU 점유율85%. - 문제점: TCP 커널 콜(Syscall)이 너무 많이 발생하여 OS 스케줄러가 죽어납니다.
Stage 2: 배치(Batching)와 제로 카피 도입
- 행동: 6.9.2의 큐잉 아키텍처를 도입하고, 100Hz 패킷을 10개씩 모아서 버퍼(
std::move)로 이동시켜pub.put()송출 빈도를 줄였다. - 성능: Throughput
800 MB/s, CPU 점유율40%. - 결과: 네트워크 I/O 호출이 감소하여 C++ 워커 스레드가 숨을 쉬기 시작함.
Stage 3: 궁극의 Shared Memory (로컬 실험 한정)
- 행동: 라우터와 클라이언트가 동일 리눅스 서버에 있다는 가정하에, 6.5.3장의
/dev/shm공유 메모리 포인터 전술을 이지스함의 방패처럼 씌웠다. - 성능: Throughput
14.5 GB/s (메모리 버스 한계), CPU 점유율5%. 레이턴시0.1 μs. - 결과: OS 커널의 복사 본능 자체를 거세해 버림으로써, 무거운 AI 자율주행 프로세스가 통신 때문에 병목 걸리는 참사 완전 해결.
이 파이프라인의 완성은 당신이 단순한 “C++ 코더“가 아니라 “분산 네트워크 아키텍트“로 진화했음을 증명한다. Zenoh의 Rust 기반 라우팅 코어와 당신의 정교한 C++ 프론트엔드가 결합된 이 엔진은 현존하는 그 어떤 미들웨어보다 가볍고, 미친 듯이 빠르다.