6.6 고급 통신 제어 및 분산 시스템 토폴로지 관리

6.6 고급 통신 제어 및 분산 시스템 토폴로지 관리

데이터를 무사히 주고받는 데 성공했다면, 당신의 C++ 애플리케이션은 이제 겨우 걸음마를 뗀 것이다.
수백 대의 서버와 수만 개의 엣지(Edge) 센서가 뒤엉킨 진정한 의미의 분산 군집(Distributed Swarm) 환경에서는 데이터 그 자체보다 “이 데이터가 어디서 왔고, 보낸 놈이 지금 살아 있는지” 파악하는 감시망(Topology Management)이 시스템의 생사를 가른다.

이 챕터에서는 Zenoh 네트워크가 마치 살아있는 생명체처럼 호흡하는 과정을 코드 레벨로 해부한다. 다른 C++ 노드의 죽음을 즉각 감지하고 대체 서버를 띄우는 Liveliness 방어망, 그리고 본문(Payload)을 더럽히지 않고 센서의 설정값(해상도, 타임스탬프 등)을 비밀스럽게 끼워 파는 Attachment 기술을 살펴본다. 진정한 C++ 백엔드 시니어로 진입하기 위한 아키텍처 관문이다.

1. Liveliness 토큰을 활용한 원격 C++ 노드 상태(Presence) 감지

어제까지만 해도 잘 돌아가던 로봇 하나가 터널에 진입하더니 통신이 끊겼다. 메인 관제 서버의 대시보드에는 이 로봇이 언제 죽었는지(Disconnected) 표시조차 되지 않는다. C++로 무식한 타이머가 달린 자작(Custom) Heartbeat 핑 루프를 돌리고 싶은가?

Zenoh는 이 바보 같은 짓을 막고자, 코어 단에서 관리해 주는 생명줄인 Liveliness Token을 제공한다.

1.0.1 [Runbook] 심장 박동(Heartbeat) 토큰 발급 전술

자신의 생존을 망(Network) 전체에 맹세하는 클라이언트

#include "zenoh.hxx"
#include <iostream>
#include <thread>

int main() {
    auto config = zenoh::Config::create_default();
    auto session = zenoh::Session::open(std::move(config));

    std::cout << "센서 노드 부팅 완료. 라우터 통신 개시." << std::endl;

    // [핵심] 1. 내 목숨줄(Token)을 "presence/robot/1" 이라는 이름표를 붙여 라우터에 제출한다.
    auto liveliness = session.declare_liveliness("presence/robot/1");

    // 2. 이 liveliness 객체가 살아있는 동안, 라우터는 나를 'Alive' 상태로 전파한다.
    // 만약 라우터와 내 C++ 프로그램 간의 연결(TCP 등)이 끊어지면, 
    // 라우터는 즉시 'Dropped(죽음)' 상태를 망 전체로 뿌린다!
    
    // 강제로 살려두는 무한 루프. 
    // 여기서 C++ 프로그램이 강제 종료명령(Ctrl+C, Segfault)을 맞으면 소멸자가 호출되거나 세션이 물리적으로 끊긴다.
    while(true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    return 0;
}

이 코드를 박아 넣는 순간, 당신의 인프라는 개별 C++ 유닛의 연결 상태를 일일이 루프 돌면서 감시하지 않아도 된다. 토큰의 발급과 만료 관리는 전적으로 뛰어난 최적화를 자랑하는 Zenoh 하부 라우팅 엔진이 위임받는다.

2. Liveliness 상태 변경 이벤트 수신 및 장애 조치(Failover) 구현

앞서 엣지 단말기가 자신의 Liveliness 토큰을 내걸었다. 이제 클라우드나 관제 센터의 중앙 C++ 백엔드는 이 토큰들의 탄생과 소멸을 실시간으로 도청하여, 죽은 놈의 흔적을 재빨리 치우는 청소 로직을 가동해야 한다.

2.0.1 [Runbook] 전역 관제망(Monitoring) 이벤트 콜백 구축

LivelinessSubscriber를 열어 망 내의 상태 전이(State Transition) 이벤트를 감지한다.

#include "zenoh.hxx"
#include <iostream>

void run_failover_coordinator(zenoh::Session& session) {
    // [중요 망각 방지]
    // 와일드카드(**)를 써서 'presence' 밑으로 떨어지는 전 세계 모든 기기의 탄생과 죽음을 훑는다!
    auto presence_sub = session.declare_liveliness_subscriber(
        "presence/**",
        [](const%20zenoh::Sample&%20sample) {
            auto client_id = sample.get_keyexpr().as_string_view();
            
            // Liveliness의 상태(종류)는 Z_LIVELINESS_ALIVE 인지 DROPPED 인지로 나뉜다.
            switch (sample.get_kind()) {
                case zenoh::SampleKind::Z_SAMPLE_KIND_PUT: 
                    // 누군가 살아있다고 Token을 발급한 상태 (ALIVE)
                    std::cout << "[시스템 관제] " << client_id << " 단말기가 통신망에 새롭게 입장했습니다!" << std::endl;
                    break;
                    
                case zenoh::SampleKind::Z_SAMPLE_KIND_DELETE:
                    // 누군가의 Token이 소멸된 상태 (DROPPED / DEAD)
                    std::cerr << "[긴급 알람 🚨] " << client_id << " 통신 두절! 데이터 플로우를 대체 서버로 우회(Failover)합니다." << std::endl;
                    
                    // TODO: 데이터베이스에 '오프라인' 마킹, 혹은 다른 스레드를 깨워 알람 문자 전송
                    break;
                    
                default:
                    break;
            }
        }
    );
}

int main() {
    auto session = zenoh::Session::open(zenoh::Config::create_default());
    std::cout << "글로벌 재난 감시탑 구동 완료..." << std::endl;
    run_failover_coordinator(session);
    
    // 관제탑 영구 구동
    while(true) { std::this_thread::sleep_for(std::chrono::seconds(10)); }
    return 0;
}

이러한 Liveliness 기반의 옵저버(Observer) 패턴은, Kubernetes의 파드(Pod) 모니터링이나 미들웨어 클러스터의 무중단 이중화(High Availability) 아키텍처를 완벽에 가깝게 구현해 내는 가장 값진 전술이다.

3. Key Expression 매칭 리스너(Matching Listener)를 통한 동적 구독 관리

백엔드 라우터 단에서 “지금 video/1080p 를 구독하고 있는 사용자가 1명이라도 있는가?” 를 알고 싶을 때가 있다.
클라이언트 1명도 없는데 비싼 GPU 자원을 돌려가며 비디오 프레임을 1080p로 인코딩해 봤자 망 트래픽만 잡아먹는 미친 짓이다(Compute Waste).
아무도 보지 않으면 카메라 센서를 끄고(Zz), 누군가 구독(Subscribe)을 누르는 순간 카메라를 켜는(Wake) 런북.

3.0.1 [Runbook] 수요 지향형(Demand-Driven) 자원 활성화 전술

Zenoh는 네트워크 상의 누군가가 특정 Topic을 구독(Subscribe)하거나 취소(Drop)하는 행위마저도 콜백으로 받아칠 수 있는 Matching Listener 기능을 갖췄다.

#include "zenoh.hxx"
#include <iostream>
#include <atomic>

// 누군가 데이터를 보고 싶어하는지 상태 트리거
std::atomic<int> g_viewer_count{0};

void start_resource_manager(zenoh::Session& session) {
    // "이 KeyExpr 에 관심이 있는 놈이 생기거나 사라지면 나한테 알려라!"
    auto matching_sub = session.declare_subscriber(
        "@/router/local/subscriber/+/camera/feed", // 시스템 메타 채널 도청
        [](const%20zenoh::Sample&%20sample) {
            if (sample.get_kind() == zenoh::SampleKind::Z_SAMPLE_KIND_PUT) {
                g_viewer_count++;
                std::cout << "새로운 뷰어 입장. 현재 시청자 수: " << g_viewer_count << std::endl;
                
                if (g_viewer_count == 1) {
                    std::cout << "[시스템] GPU 인코더를 기동합니다! (Cold Start)" << std::endl;
                }
            } else if (sample.get_kind() == zenoh::SampleKind::Z_SAMPLE_KIND_DELETE) {
                g_viewer_count--;
                std::cout << "뷰어 퇴장. 현재 시청자 수: " << g_viewer_count << std::endl;

                if (g_viewer_count == 0) {
                    std::cout << "[시스템] 시청자가 없습니다. GPU와 카메라 렌즈를 끕니다." << std::endl;
                    // TODO: 하드웨어 전력 차단 로직
                }
            }
        }
    );
}

(참고: Zenoh 0.10 이상 버전부터는 MatchingListener API 스펙이나 시스템 토픽(@/) 규격이 지속적으로 고도화되고 있다. 핵심 아키텍처는 “클라이언트의 구독 여부를 서버가 인지할 수 있다“는 데 있다.)
이 기법을 응용하면, 아무도 듣지 않는 텔레메트리 데이터를 보내느라 낭비되는 AWS 엣지 클라우드의 수백만 원짜리 아웃바운드 트래픽 요금을 완벽하게 방어할 수 있다.

4. Attachment를 활용한 메타데이터 전송 및 수신

C++ 로 카메라 프레임을 직렬화해 보낼 때, 영상의 크기(1920x1080)나 엔코딩 방식(H.264), 송신 시각(Timestamp)을 어떻게 전달할 것인가?
비디오 버퍼 앞에 억지로 C struct 헤더를 붙이는(Prefix) 개발자가 제일 한심하다. 이렇게 본문을 오염시면, 라우터가 “H.264 코덱만 필터링“하고 싶어도 무거운 비디오 본문을 일일이 뜯어봐야 하는 파싱 지옥이 열린다.

Zenoh는 이진(Binary) 배열 본문을 1비트도 건드리지 않고, 봉투 겉면에 메모지를 붙이는 Attachment(부착물) 시스템을 제공한다.

4.0.1 [Runbook] 비파괴(Non-Destructive) 메타데이터 태깅 전술

#include "zenoh.hxx"
#include <iostream>
#include <vector>

// ----------------- [송신부 런북] -----------------
void send_with_attachment(zenoh::Publisher& pub) {
    // 거대한 영상 원본
    std::vector<uint8_t> huge_raw_image(5 * 1024 * 1024, 128);

    // 보내기 위한 옵션 객체 생성
    zenoh::PutOptions opts;

    // 겉봉투에 HTTP 헤더처럼 Key-Value 메모를 작성한다!
    opts.attachment().insert("resolution", "1920x1080");
    opts.attachment().insert("codec", "raw");
    opts.attachment().insert("camera_id", "cam-A1");

    // 본문 오염 없이 전송
    pub.put(std::move(huge_raw_image), opts);
    std::cout << "메타데이터가 우표처럼 첨부되어 발사 성공." << std::endl;
}

// ----------------- [수신부 런북] -----------------
void receive_and_filter(zenoh::Session& session) {
    auto sub = session.declare_subscriber("camera/feed", [](const%20zenoh::Sample&%20sample) {
        
        // 1. 그 무거운 Payload(5MB)를 병합하기 전에, 겉봉투부터 검열한다!
        if (sample.has_attachment()) {
            auto attach = sample.get_attachment();
            
            // C++ 맵(map)처럼 접근하여 속성 체크
            if (attach.get("codec").as_string() != "raw") {
                std::cout << "[Drop] 지원하지 않는 코덱입니다. 버퍼를 무시합니다." << std::endl;
                // return 해버리면 5MB Payload 는 복사도 안된채 패스(Drop) 되어 CPU '0' 소모!
                return;
            }
            std::cout << "[수용] 해당 프레임을 해석합니다. 해상도: " 
                      << attach.get("resolution").as_string() << std::endl;
        }

        // 2. 검열을 통과한 프레임에 대해서만 딥 카피(Deep 복원) 또는 접근 시도
        auto raw_video = sample.get_payload().as_vector();
        // ...
    });
}

메타데이터 분리 전술은 클라우드 네이티브의 기본 소양이다. 이 Attachment 시스템은 Zenoh-Flow나 데이터베이스(Backend) 플러그인과 결합될 때 데이터 필터링(Filtering/Where clause) 조건을 적용하기 위한 핵심 인덱스(Index) 용도로 눈부시게 활약한다.