8.4 데이터 발행(Publish)과 구독(Subscribe) 아키텍처 구현

8.4 데이터 발행(Publish)과 구독(Subscribe) 아키텍처 구현

기존의 Node.js 개발자가 백엔드 통신을 구현할 때 가장 먼저 켜는 모듈은 axios 거나 Express 였다. 이는 ’명령’과 ’응답’이 단선적으로 묶여있는 구조다.
하지만 우주 단위로 흩어져 있는 IoT 로보틱스 생태계에서는 이런 1:1 대화 방식이 통하지 않는다. 데이터의 생산자(Publisher) 는 누가 내 데이터를 듣고 있는지 관심이 없고, 소비자(Subscriber) 도 이 데이터가 도쿄의 로봇에서 왔는지 서울의 위성에서 왔는지 관심이 없이 오로지 “경로(Key Expression)” 에만 집중하는 탈중앙화 철학.

이 챕터는 TypeScript 환경에서 가장 효율적이고 직관적인 논-블로킹(Non-blocking) Pub/Sub 아키텍처를 세우는 법을 다룬다. 백엔드의 V8 엔진 레벨에서 데이터를 어떻게 우아하게 메모리에 올리고, React 컴포넌트 안으로 어떻게 흘려보내는지 그 코드를 조립해 본다.

1. Publisher 인스턴스 생성 및 QoS(Quality of Service) 옵션

TypeScript 생태계에서 데이터를 쏠 때 두 가지 갈림길이 있다.
어쩌다 한 번 쏠 것이냐(session.put), 아니면 1초에 60번씩 기관총처럼 쏠 것이냐(session.declarePublisher). 후자가 아니면 V8 엔진의 파싱 오버헤드로 인해 프레임 다운이 발생한다.

1.0.1 [Runbook] V8 엔진 최적화된 퍼블리셔 전개 전술

퍼블리셔 인스턴스를 유지(Keep-alive)하여 경로 해석 오버헤드를 0으로 만든다.

1. 거치형 기관총 (Publisher) 패턴

import * as zenoh from "@eclipse-zenoh/zenoh";

async function runHighSpeedRadar(session: zenoh.Session) {
    // 1. 발사대 거치: Rust 코어가 미리 B-Tree 라우팅 테이블을 준비해둔다.
    // (매번 "radar/1/data" 경로를 문자열 파싱하지 않기 위한 튜닝)
    const pub = await session.declarePublisher("radar/1/data");
    console.log("🚀 레이더 퍼블리셔 가동 완료");

    // 2. 고속 루프 (1초에 60발 쏴보기)
    for (let i = 0; i < 60; i++) {
        // [주의] payload는 무조건 Node.js 의 Buffer나 브라우저의 Uint8Array 형태여야 한다!
        const payload = Buffer.from(JSON.stringify({ angle: i, distance: 10.5 }));
        
        // await pub.put() 을 쓰면 네트워크 Ack를 기다리느라 블로킹이 발생한다.
        // 그냥 던지기(Fire-and-forget)를 하려면 await를 뺀다!
        pub.put(payload); 
    }
    
    // 수명 종료 시 반드시 객체를 부숴야 WebAssembly/Rust 영역에 메모리 릭이 안 생긴다.
    pub.undeclare();
}

2. 통신 흐름 통제: QoS (Congestion Control) 옵션
네트워크가 막히면 데이터가 터져나가는 환경에서, 과거의 낡은 데이터를 버리고 최신 데이터로 밀어낼 것인지 방어막을 친다.

const opts = {
    // DROP: 현재 쏠 데이터가 네트워크 대기 큐에 막혔을 경우 과거 데이터를 던져버리고 끼워넣는다.
    // BLOCK: 큐가 막히면 해결될 때까지 V8 백그라운드 스레드가 멈춰 기다린다.
    congestionControl: zenoh.CongestionControl.DROP,
    
    // [보너스] 백그라운드의 패킷 포워딩 우선순위 (0이 가장 빠름)
    priority: zenoh.Priority.REAL_TIME 
};

// 퍼블리셔 객체를 선언할 때부터 이 규칙(QoS)을 각인시킨다.
const pub = await session.declarePublisher("camera/front", opts);

이 QoS 옵션 설정을 간과하고 비전 데이터(Raw Image)를 쏘다가 WiFi 가 끊기면, 1분 뒤 다시 연결될 때 1분 전 과거의 이미지가 밀려 들어오는 최악의 데드록 현상을 맛보게 될 것이다.

2. Subscriber 생성 및 비동기 이벤트 콜백(Callback) 매핑

수신자(Subscriber)는 백그라운드 엔진이 받아온 C 배열 데이터를 JavaScript 메모리 힙 위로 꺼내주는 유일한 통로다.

2.0.1 [Runbook] 데이터 방파제(Event Loop) 안착 전술

JS의 가장 큰 축복인 비동기 콜백 패턴을 이용하여 Zenoh 엔진과 V8 엔진을 매끄럽게 결속한다.

import * as zenoh from "@eclipse-zenoh/zenoh";

async function globalCommandListener(session: zenoh.Session) {
    console.log("📡 지휘소 전파 수신 대기 중...");

    // declareSubscriber는 콜백 함수 하나를 요구한다.
    // 이 콜백은 메인 스레드 이벤트 루프 큐에 스케줄링되어 차근차근 실행된다.
    const sub = await session.declareSubscriber(
        "fleet/**/command", 
        (sample: zenoh.Sample) => {
            // 1. 메타 데이터 확인: 누가 쐈는가?
            const fromTarget = sample.keyExpr;
            
            // 2. 페이로드 해독: Uint8Array 상태인 바이트를 문자열로 복원
            // (브라우저라면 TextDecoder 사용, Node 생태계면 Buffer 사용)
            const textDecoder = new TextDecoder();
            const msg = textDecoder.decode(sample.payload);
            
            console.log(`[수신 완료] 경로: ${fromTarget} | 내용: ${msg}`);
            
            // 콜백 안에서 무거운 CPU 연산(for 무한루프)을 돌리면 다른 패킷 수신이 멈춘다!
            // 무거운 연산은 여기서 await 로 백그라운드 웹 워커로 넘겨버리는 것을 권장한다.
        }
    );
    
    return sub;
}

[아키텍처 인스펙션]
이 콜백은 절대 “하나의 패킷이라도 놓치지 않고 100% 호출“되지 않는다! 만약 퍼블리셔 측에서 QoS를 DROP으로 설정하고 미친 듯이 연사했다면, 물리 네트워크 망이나 시스템 버퍼 상황에 의해 일부 패킷이 증발한 채 최긴 패킷들 위주로만 구독 콜백 창에 떨어지게 된다. 완벽한 도달 보장이 필요하다면 ZBuf 기반의 혼잡 제어 BLOCK 세팅이 동반되어야 한다.

3. Key Expression의 구조적 설계와 와일드카드(*, ``) 매칭 기법

REST API 설계 시 URL을 /api/v1/users/info 처럼 짜듯, 데이터 중심의 Zenoh 네트워크에서는 데이터의 이름표 (Key Expression) 설계가 아키텍처의 전부다.

3.0.1 [Runbook] 경로명 설계와 와일드카드 지배 전술

이름을 잘 지어두면 TypeScript 단연산 몇 번으로 지구 반대편의 장비들을 무더기로 조회할 수 있다.

와일드카드의 차이점

  • * (싱글 와일드카드): URL 계층(슬래시 사이) 딱 한 층(One-level) 만 모든 것을 허용한다.
  • ** (더블 와일드카드): 하위 계층이 몇 층이건 간에 그 밑에 있는 모든(All-level) 경로를 빨아들인다.

[실전 토폴로지 예시]

  • factory/seoul/robot/1/battery
  • factory/seoul/robot/1/cpu
  • factory/busan/robot/99/battery
// 1. 단일 노드 정밀 타격
await session.declareSubscriber("factory/seoul/robot/1/battery", callback);

// 2. (*) 활용: 서울에 있는 '모든 로봇'의 배터리만 모아라!
// -> seoul/robot/1/battery, seoul/robot/2/battery 등에 반응
await session.declareSubscriber("factory/seoul/robot/*/battery", callback);

// 3. (**) 활용: 전 세계 모든 도시의 로봇이 뿜어내는 '모든' 상태 데이터를 다 잡아내라!
// -> factory/... 밑에 글씨가 뭐가 있든 모조리 다 수신함.
await session.declareSubscriber("factory/**", callback);

TypeScript 생태계 측면에서, Key Expression 구조를 프론트엔드의 상태 관리(Redux, Zustand) 트리에 1:1로 매핑시키는 패턴(Mapping Pattern)이 자주 쓰인다. 구독한 factory/seoul/robot/*/battery 정보를 파싱할 때 sample.keyExpr.split('/') 함으로써 손쉽게 robot_id 값을 뽑아내고 Redux Store의 슬라이스를 업데이트하는 방식이 가장 우아하다.