5.4 데이터 발행(Publish)과 구독(Subscribe) 메커니즘
Zenoh의 통신 패턴은 Pub/Sub, 쿼리, RPC 등 다양하지만, 결국 모든 데이터 분산 아키텍처의 알파이자 오메가는 “누가 데이터를 쏘고(Publish), 누가 그 데이터를 받아먹는가(Subscribe)” 구도로 귀결된다.
Rust에서 이 메커니즘을 구현하는 것은 극도의 스릴을 제공한다. C++처럼 메모리가 터질까 봐 콜백(Callback) 지옥에서 허우적댈 필요 없이, 우아한 비동기 스트림(Stream)과 컴파일러가 보장하는 소유권(Ownership) 통제하에서 10GB/s 급의 데이터를 마음껏 퍼부을 수 있기 때문이다. 이 장에서는 단순한 튜토리얼 수준을 넘어, 스레드 혼잡(Congestion)이 발생했을 때 백프레셔(Backpressure)를 어떻게 견뎌내는지, 실제 상용 로봇에서 쓰이는 QoS(Quality of Service) 튜닝 런북을 파고든다.
1. Key Expression의 이해와 Rust 매크로(keyexpr!) 활용법
스트링(String) 타입 변수에 단순히 "robot/1/battery" 라고 적어서 데이터를 쏘는 행위는, 라우터 내부에서 스트링 파싱(Parsing) 비용을 끝없이 발생시키는 죄악이다.
Zenoh는 경로 매칭(Routing)의 성능을 극대화하기 위해 토픽 이름을 단순 문자열이 아닌 KeyExpr (Key Expression) 이라는 고유 타입으로 추상화했다.
1.0.1 [Runbook] 런타임 검증 vs 컴파일 타임 검증
안티 패턴 (런타임 오버헤드 발생)
// 매번 put()을 호출할 때마다 문자열이 유효한 토픽 포맷인지 정규식 검사를 수행한다! (비효율)
session.put("robot/1/battery", "80%").res().await.unwrap();
[정석 1] 매크로를 이용한 컴파일 타임 0비용 파싱
use zenoh::key_expr::keyexpr;
// 컴파일러가 빌드할 때 토픽 문법(Syntax)을 검증해버린다.
// 런타임에는 스트링 검사 비용이 수학적으로 완벽한 '0'이다.
const BATTERY_TOPIC: &'static keyexpr = keyexpr!("robot/1/battery");
session.put(BATTERY_TOPIC, "80%").res().await.unwrap();
[정석 2] 동적 경로 조합 시 메모리 재할당 최소화
로봇 ID가 실행 시점에 정해진다면 포맷팅해야 하지만, 이 역시 String 대신 최적화된 빌더를 사용한다.
use zenoh::key_expr::OwnedKeyExpr;
let robot_id = 5;
// 내부적으로 청크(Chunk) 단편화를 막는 전용 파서 사용
let dynamic_topic = OwnedKeyExpr::try_from(format!("robot/{}/battery", robot_id)).unwrap();
session.put(&dynamic_topic, "80%").res().await.unwrap();
마이크로 파라미터 튜닝을 사랑하는 Rust 엔지니어라면 절대 순정 String을 네트워크 발사대에 올리지 마라. keyexpr! 매크로는 당신의 라우팅 I/O를 10% 이상 끌어올리는 숨은 치트키다.
2. Publisher 객체 생성 및 데이터 전송 API (put, delete)
session.put()은 캐주얼하게 데이터를 쏠 때 쓰는 임시 방편이다. 똑같은 토픽(robot/1/lidar)으로 1초에 100번씩 쏴야 한다면? 매번 라우터 경로를 검증하고 세션을 뒤적이는 건 심각한 오버헤드다.
2.0.1 [Runbook] 전용 포신(Publisher) 구축
한 토픽으로 미친 듯이 연사(Burst)를 해야 한다면, 라우터 쪽에 “나 전용 파이프 뚫어놔!“라고 선언(declare_publisher)을 먼저 때리고 시작해야 한다.
1. 초고속 퍼블리셔 전개
use zenoh::key_expr::keyexpr;
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).res().await.unwrap();
// [핵심] 해당 토픽에 대한 라우터 라우팅 테이블 트리 연산을
// 매번 하는 게 아니라 단 한 번(Declare)으로 끝내버린다!
let publisher = session
.declare_publisher(keyexpr!("robot/1/lidar"))
.res()
.await
.unwrap();
// 이제 쏘기만 하면 된다. 성능 10배 폭발.
publisher.put("raw_bytes...").res().await.unwrap();
}
2. State 삭제 통보 (delete)
로봇이 망가졌거나 주차 모드로 들어갔을 때, 라우터가 임시 저장해 둔(Storage) 과거 상태 데이터를 어떻게 지울 것인가?
REST API의 DELETE 메서드처럼, Zenoh 네트워크 망 자체에 삭제 신호를 쏠 수 있다.
// 캐싱된 스토리지 데이터베이스와 다른 구독자들에게 "이 데이터는 폐기하라"고 브로드캐스트!
publisher.delete().res().await.unwrap();
이 delete 명령 모델 덕분에 Zenoh는 단순 Pub/Sub을 넘어서는 “분산 데이터베이스(Distributed Data-fabric)“로서의 생명력을 얻게 된다.
3. Subscriber 설정 및 비동기 스트림(Stream) 처리 모델
MQTT 등 구시대의 미들웨어 라이브러리를 쓰면 콜백 함수(Callback Fn)를 포인터로 넘기고, 그 함수 안에서 어떻게 외부 변수(공유 자원)를 조작할지 뮤텍스 락과 싸워야 했다.
Rust와 Zenoh의 만남은 이 콜백 지옥을 비동기 스트림(Stream) 이라는 궁극의 예술로 승화시켰다.
3.0.1 [Runbook] 무한 비동기 루프 수신 전술
패킷 찢어 먹기 (While Let 루프)
구독자(Subscriber) 변수 자체를 큐(Queue)처럼 쓸 수 있다.
use futures::prelude::*; // StreamExt 트레이트 활성화 필요
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).res().await.unwrap();
// '*' 와일드카드를 통해 1번부터 10번 로봇 데이터 싹쓸이 선언
let mut subscriber = session
.declare_subscriber("robot/*/battery")
.res()
.await
.unwrap();
println!("배터리 관제 모니터 대기중...");
// 콜백 함수? 그딴 건 없다.
// 들어오는 패킷을 비동기 이벤트 루프가 순차적으로 밀어넣어(Pull) 준다.
while let Ok(sample) = subscriber.recv_async().await {
let topic = sample.key_expr().as_str();
let payload = String::from_utf8_lossy(&sample.value.payload.contiguous());
println!("[수신] {} -> {}", topic, payload);
}
}
[아키텍처 인스펙션]
이 루프 모델(recv_async)의 무서운 점은 메모리 안전성이다. 컴파일러가 루프 내부 스코프에 넘겨주는 sample 변수는 오직 당신만이 소유권(Ownership)을 가진다. 아무리 복잡한 비즈니스 로직을 태워도 메모리가 꼬일 일이 없다!
4. 신뢰성(Reliability)과 혼잡 제어(Congestion Control) QoS 옵션 적용
영상 스트리밍이 조금 끊긴다고 로봇이 죽진 않는다. 하지만 “비상정지(E-Stop)” 패킷이 라우터 병목에 밀려 유실(Drop)된다면 인명 사고로 직결된다.
기본적으로 Zenoh는 미친 속도를 뽑아내기 위해 가장 효율적인 경로와 큐를 쓰지만, 엔지니어가 각 데이터의 목숨값(QoS, Quality of Service)에 따라 통제 플래그를 세워줘야 한다.
4.0.1 [Runbook] 프로덕션 레벨 QoS 부여
Publisher를 선언할 때나, put을 날릴 때 데이터의 생로병사를 제어할 수 있다.
전술 1. 신뢰성(Reliability) 배정
- 비상정지 명령 (절대 보장): 무조건 100% 도착해야 한다.
use zenoh::qos::Reliability; publisher .put("robot/1/cmd") .reliability(Reliability::Reliable) // TCP 레벨의 재전송 확인을 끝까지 쫓아간다. .res() .await .unwrap();
- **초당 30프레임 카메라 데이터 (가성비):** 중간에 1프레임 잃어버리는 것보다 최신 데이터를 빨리 받는 게 생명이다.
```rust
publisher
.put("robot/1/cam")
.reliability(Reliability::BestEffort) // 막히면 미련 없이 드랍(Drop)하고 내달린다.
.res()
.await
.unwrap();
전술 2. 혼잡 제어(Congestion Control) 우선순위
네트워크 백본 파이프가 꽉 차서 정체(Traffic Jam)가 왔을 때 어떤 놈을 살려낼지 결정해야 한다.
use zenoh::qos::CongestionControl;
publisher
.put("robot/1/emergency")
.congestion_control(CongestionControl::Block)
// 큐가 다 차도 패킷을 버리지 않고, 메모리가 허락하는 한 전송 스레드를 잠시 블로킹(Block) 시키면서까지 우겨넣는다!
.res()
.await
.unwrap();
이 2가지 다이얼(Reliability, Congestion Control)을 데이터의 특성에 맞게 조향하는 것이 당신이 초보에서 시니어 분산 아키텍트로 넘어가는 척도가 된다.
5. 백프레셔(Backpressure) 상황에서의 Subscriber 버퍼 관리
Publisher는 초당 1만 개의 데이터를 로켓처럼 쏘아 올리는데, 정작 데이터베이스에 접근해야 하는 맹꽁이 같은 클라우드 Subscriber는 초당 1,000개밖에 처리하지 못한다면 어떤 지옥도가 펼쳐질까?
메모리는 한계치까지 차오르고(OOM, Out of Memory), 큐는 폭발한다. 이 기현상을 백프레셔(Backpressure) 라고 부르며, 현장의 분산 시스템을 붕괴시키는 원인 1순위다.
5.0.1 [Runbook] Subscriber 큐 다이어트 및 버퍼 튜닝
상대방이 너무 빨라서 목이 멕힌다면, 받는 쪽(Subscriber)이 목구멍의 크기(Buffer Size)를 조절하고 넘치는 패킷을 강제로 깎아내야(Drop) 살 수 있다.
1. 채널 버퍼 용량(FIFO) 강제 제어
구독 선언 시 백그라운드 스레드가 쥐고 있을 최대 메모리 허용량을 지정한다.
use zenoh::prelude::r#async::*;
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).res().await.unwrap();
// [우주 방어선 구축]
// 큐 사이즈를 '1024개' 패킷으로 막아버린다!
// 1025번째 패킷이 날아오면 큐 정책에 따라 가장 오래된 패킷을 자동 파쇄(Ring Buffer) 시킨다.
let subscriber = session
.declare_subscriber("robot/*/lidar")
.with(zenoh::publication::ChannelSize(1024))
.res()
.await
.unwrap();
// 천하태평한 처리 로직 (1개 처리하는데 1시간이 걸려도 프로그램이 뻗지 않음)
while let Ok(sample) = subscriber.recv_async().await {
heavy_ai_inference(sample).await;
}
}
[아키텍트 철학] OOM과의 전쟁
클라우드 인스턴스는 한 달에 수백만 원짜리 장비가 아니다.
이 ChannelSize 튜닝을 누락하면, 무거운 AI 처리 백엔드가 트래픽 홍수에 직방으로 노출되고 결국 프로세스가 리눅스 OOM 킬러(OOM-Killer)에 의해 모가지가 날아간다. 백프레셔를 당해 패킷이 듬성듬성 유실되는(Drop) 것이 시스템 전체가 뻗는 것보다 1억 배 나은 아키텍처라는 점을 명심하라.