5.8 고급 API 기능 및 확장 활용

5.8 고급 API 기능 및 확장 활용

지금까지 살펴본 Pub/Sub과 Query가 “뼈대“라면, 이 장에서 다룰 Liveliness(생존 감지), Attachment(메타데이터 첨부), Shared Memory(공유 메모리)는 분산 시스템의 아키텍처를 엔터프라이즈 레벨로 격상시키는 정밀한 근육이다.

네트워크가 아예 끊어졌을 때 관제 대시보드에 즉각적으로 경고 알람을 울려야 하는가? 영상 스트림에 촬영한 카메라 렌즈의 스펙(메타데이터)을 끼워 넣고 싶은가? 혹은 코어 간 데이터 복사 비용을 수학적 ’0’으로 만들고 싶은가?
초보자들은 이 기능을 응용단(App Layer)에서 조잡한 타이머와 쓰레드를 돌리며 구현하려다 자멸한다. Zenoh는 이 모든 복잡성을 네트워크 계층 내부로 흡수하여 우아한 몇 줄의 Rust API로 제공한다. 진정한 백엔드 마스터 도메인으로 들어가 보자.

1. Liveliness 토큰 - 분산 노드의 생존 여부 감지 체계 구현

수천 대의 로봇을 모니터링할 때 “데이터가 들어오고 있다“는 사실만으로는 부족하다. 로봇이 터널 밑으로 들어가서 통신이 끊어졌을 때(Silent Drop), 그 침묵을 즉각적으로 인지하고 관제 화면의 아이콘을 빨간색으로 바꿔야 한다.

1.0.1 [Runbook] 심장 박동(Heartbeat) 감시 체계 자동화

Zenoh의 Liveliness Token은 응용 프로그램에서 while(ping)을 무한으로 날릴 필요 없이, 라우터 망 자체가 각 노드의 심장 박동을 모니터링해 주는 하드코어 기능이다.

1. 로봇(에지) 측: 생존 토큰 발급
로봇은 켜지자마자 자신의 목숨줄(Token)을 쥐고 있어야 한다.

use zenoh::key_expr::keyexpr;

#[tokio::main]
async fn main() {
    let session = zenoh::open(Config::default()).res().await.unwrap();

    // 내가 살아있음을 "robot/1/alive" 라는 공간에 등록 선언한다.
    let _liveliness = session
        .declare_liveliness(keyexpr!("robot/1/alive"))
        .res()
        .await
        .unwrap();

    // 로봇이 정상 작동하는 동안 이 변수(_liveliness)는 스코프 안에 살아있다.
    // 프로그램이 크러시 나거나 랜선이 뽑히면 라우터는 이 토큰을 '소멸' 처리한다.
    loop {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}

2. 브레인(클라우드 관제) 측: 부고 및 탄생 알림 감지
관제 시스템은 LivelinessSubscriber를 띄워, 전 세계 로봇들의 토큰이 살아서 켜지고(Alive), 죽어서 꺼지는(Dropped) 즉각(Interrupt) 이벤트만 통보받는다.

let mut lively_sub = session
    .declare_liveliness_subscriber("robot/*/alive")
    .res()
    .await
    .unwrap();

// 토큰 변화 감지 무한 루프
while let Ok(event) = lively_sub.recv_async().await {
    use zenoh::liveliness::LivelinessEvent::*;
    match event {
        Alive(token) => println!("[로그인] {} 로봇이 통신망에 접속했습니다.", token.key_expr.as_str()),
        Dropped(token) => eprintln!("[비상] {} 로봇 통신 두절!", token.key_expr.as_str()),
    }
}

개발자는 타임아웃 계산, Ping/Pong 스레드를 단 한 줄도 짤 필요가 없다.

2. Attachment - 메시지에 메타데이터를 첨부하는 방법

페이로드 버퍼(ZBytes) 안에 직렬화해서 넣기엔 데이터 구조가 지저분해지지만, 수신자가 이 데이터를 처리할 때 반드시 참고해야 하는 태그, 해상도, 타임스탬프 등 메타데이터(Metadata) 들이 존재한다.
이때 본문(Body)을 오염시키지 않고 우표(Attachment)를 붙여 편지를 보내는 기술이 있다.

2.0.1 [Runbook] HTTP 헤더(Header) 모방 전술

Zenoh의 Attachment 기능은 바이트 스트림 위에 키-값(Key-Value) 형태의 부가 정보를 올리는, 아키텍처 레벨의 ‘메모지’ 기능이다.

1. 송신자 측: 메타데이터 봉인

use std::collections::HashMap;

// 10MB짜리 원본 영상
let raw_image_bytes = vec![0u8; 10_000_000];

let mut attachment = HashMap::new();
attachment.insert("resolution".to_string(), "1920x1080".into());
attachment.insert("lens".to_string(), "wide-angle".into());
// 로봇의 자체 측정 응답 시간
attachment.insert("timestamp_ms".to_string(), "1691234567890".into());

publisher.put("robot/1/cam")
    .with_value(raw_image_bytes)
    // 본문을 뜯지 않고도 읽을 수 있는 봉투 겉면 메모 추가
    .attachment(attachment) 
    .res()
    .await
    .unwrap();

2. 수신자 측: 봉투 검열 후 파싱
수신부(Subscriber) 스레드에서는 무거운 10MB 본문을 다루기 전에 겉봉투를 읽고 처리 로직을 동적으로 바꿀 수 있다.

while let Ok(sample) = subscriber.recv_async().await {
    if let Some(attach) = sample.attachment {
        // [핵심 전술] 해상도가 다르면 무거운 백엔드 GPU 연산을 태우지 않고 즉시 반송(Drop)
        if attach.get("resolution").unwrap() != "1920x1080" {
            println!("해상도 규격 불일치. 해당 프레임을 폐기합니다.");
            continue;
        }
    }
    
    // 메타데이터 검열을 통과한 순수 비트맵 덩어리만 본문에 접근하여 조작
    process_image_on_gpu(sample.value.payload.contiguous());
}

이러한 Attachment 분리 설계는 데이터 처리 파이프라인(ETL)에서 무거운 디코딩 작업을 늦추고, 라우터 단에서 특정 태그를 검증 필터에 태우는 세련된 대공 방어망으로 응용된다.

3. Shared Memory(공유 메모리) 전송 모드 활성화 및 노드 간 IPC 최적화

ROS2 사용자나 임베디드 AI 자율주행팀이 Zenoh에 환장하는 유일무이한 이유가 바로 이 챕터다.

한 리눅스 장비 안에서 카메라 노드(C++)가 객체 인식 노드(Rust)로 초당 60프레임의 4K 이미지를 넘겨야 한다면?
TCP 루프백(127.0.0.1)으로 던지면 OS 커널로 데이터를 직렬화해서 내렸다가 다시 퍼올리는 미친 짓(Double Copy)이 일어난다. 우리는 이 CPU 바운드를 부수기 위해 공유 메모리(SHM) 의 마법을 쓴다.

3.0.1 [Runbook] 극단적 Zero-Copy 공유 영토 구축

Cargo.toml 의존성에 shared-memory가 켜져 있어야만 컴파일된다.

1. 공유 메모리 티켓 발행 (송신자)
메모리 한 짝에 영토를 만들고, 데이터 대신 “그 데이터가 있는 지도 좌푯값” 만 전송한다.

use zenoh::shm::ShmProvider;

let mut shm = ShmProvider::new(session.clone()).res().await.unwrap();

// 20MB 짜리 공터 사전 매입
let mut buffer = shm.alloc(20 * 1024 * 1024).res().await.unwrap();

// 버퍼에 4K 카메라 센서값을 다이렉트로 집어넣는다.
fill_image_to_buffer(&mut buffer);

// 데이터 자체가 아니라 공유 메모리 참조 티켓(Ticket)을 보낸다. (복사 비용 0)
publisher.put(buffer).res().await.unwrap();

2. 환희의 지도 수령 (수신자)
수신자는 받은 패킷이 인터넷 밖에서 날아온 데이터인지, 공유 메모리 포인터인지 고민할 필요가 없다. Zenoh 엔진이 패킷을 뜯어보고, 참조 포인터라면 즉시 리눅스 /dev/shm 영역의 메모리 주소를 매핑(mmap)해서 유저 스페이스 변수로 반환해 준다.

while let Ok(sample) = subscriber.recv_async().await {
    // 만약 sample 이 로컬 공유 메모리 티켓이라면,
    // 여기서 생성되는 bytes는 데이터 복제본이 아니라 메모리 원본의 그림자(Reference)다.
    let bytes = sample.value.payload.contiguous();
    
    // CPU 연산 '0'으로 20MB 데이터 획득 성공!
    run_ai_model(bytes);
}

오직 이 Shared Memory 아키텍처만이 단일 SoC 칩(NVIDIA Jetson, NXP 등) 위에서 복수의 마이크로서비스 노드들이 서로의 숨통을 조이지 않고 VRAM을 극한으로 태우게 허락한다.

4. 공간 데이터(Spatial Data) 처리를 위한 커스텀 플러그인 작성 기초

드론 수천 대가 하늘을 날며 자신들의 GPS 좌표(위도, 경도)를 클라우드 라우터로 무차별 살포한다고 상상해보라.
클라우드 대시보드에서는 “강남구 반경 5km 안에 있는 드론 데이터만 줘!” 라고 쿼리(Query)를 때리고 싶을 것이다.

기존 라우터 엔진으로는 “특정 텍스트로 시작하는 경로(Prefix)“는 찾을 수 있지만, 좌표평면 상의 거리 계산(Geometry)은 불가능하다. 이를 극복하려면 라우터 내부의 뇌(CorePlugin) 를 Rust로 직접 개조해 꽂아 넣어야 한다.

4.0.1 [Runbook] GIS 인메모리 반경 탐색 플러그인 스캐폴딩

Zenoh 라우터에 올라갈 .so 동적 라이브러리를 만들자.

1. Cargo.toml 컴파일 타입 선언

[lib]
crate-type = ["cdylib"] # 실행 파일이 아닌 동적 라이브러리로 떨군다.

[dependencies]
zenoh-plugin-trait = "0.10.1"
geo = "0.26" # 2D/3D 기하학 연산 라이브러리

2. 공간 필터 인터셉터(Interceptor) 코드 작성
로봇이 던지는 drone/1/location 데이터에 쿼리가 질의될 때 중간에 끼어든다.

use zenoh_plugin_trait::{Configuration, Plugin, PluginControl};

zenoh_plugin_trait::declare_plugin!(SpatialRouterPlugin);

pub struct SpatialRouterPlugin;

impl Plugin for SpatialRouterPlugin {
    type StartArgs = ();

    fn start(_name: &str, _conf: &Configuration, _args: ()) -> Result<PluginControl, ()> {
        println!("[GIS 가동] 2D 반경 탐색 플러그인이 코어 메모리에 적재되었습니다.");
        
        // 1. 여기서 라우터 로컬 세션(zenoh::open)을 연다.
        // 2. declare_queryable() 로 공간 질의(?lat=37&lon=127&radius=5km)를 무한 수신하는 백그라운드 스레드를 띄운다.
        // 3. 질의가 들어오면, 내부 캐시된 드론들의 좌표들과 삼각함수 거리를 계산(geo 패키지)하여
        // 4. 반경 안에 있는 드론들 데이터만 추려서(Filter) 응답(Reply)한다!
        
        Ok(PluginControl {})
    }
}

이 코드를 빌드하여 libspatial_router_plugin.so 로 빼낸 뒤 $LD_LIBRARY_PATH에 얹으면, 당신의 평범했던 라우터가 순식간에 PostGIS 급의 초고속 인메모리(In-memory) 공간 필터 엔진으로 돌변하게 된다.