5.5 분산 쿼리(Distributed Queries)와 RPC 구현
Pub/Sub은 훌륭한 패턴이지만, 클라이언트가 원할 때 콕 집어 “야, 로봇 3번! 니 배터리 상태 지금 당장 뱉어내!“라고 명령을 내릴 수 없다.
REST API 방식의 Request/Response 모델이 필요할 때, Zenoh는 HTTP 서버를 무겁게 띄우는 대신 빠르고 기민한 Queryable / Querier 아키텍처를 제공한다.
이것은 단순한 RPC(Remote Procedure Call)를 넘어선다. 라우터가 “현재 접속된 모든 로봇의 상태를 가져와라(robot/*/status)“라는 쿼리를 한 번에 뿌려주고, 돌아오는 수백 개의 대답을 병합(Consolidation)해서 클라우드에 쏴 주는 기적의 분산 데이터베이스 인터페이스다.
이 런북에서는 이 매혹적인 기능을 어떻게 Rust 백엔드 스레드에서 안전하고 폭발적으로 구현하는지 그 극비 노하우를 방출한다.
1. Queryable 등록 - 네트워크로부터의 데이터 요청 수신 처리
“누군가 내 데이터를 원하면 그때 그 순간의 값을 계산해서 던져주겠다.”
이 역할을 수행하는 엔티티를 Queryable(질문을 받을 수 있는 자)이라고 부른다.
1.0.1 [Runbook] 데이터 제공 서버(RPC Server) 구현 전술
express.js에서 라우팅 핸들러를 짜는 것과 동일한 감각으로 Zenoh Queryable 스트림을 구축한다.
1. RPC 응답 대기 루프 띄우기
로봇의 메인 CPU(Rust) 안에서 관제 클라우드의 질문에 답변하는 데몬 스레드를 하나 선언하자.
use futures::prelude::*; // 비동기 스트림 활용을 위한 traits
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).res().await.unwrap();
// 내가 "로봇 1번의 배터리"에 대한 질문(Query)을 처리하겠다고 네트워크에 신고!
let mut queryable = session
.declare_queryable("robot/1/battery")
.res()
.await
.unwrap();
println!("배터리 질의(Query) 요청을 무한 대기합니다...");
// 누군가(Querier)가 REST API get 치듯이 질의를 던지면 이 루프가 돈다.
while let Ok(query) = queryable.recv_async().await {
println!("[Query 수신] 누군가 배터리를 묻습니다: {:?}", query.selector());
// 1. 센서에서 진짜 물리적인 배터리 잔량을 즉석으로 읽어온다 (Mocked)
let current_battery = "78%";
// 2. 대답(Reply)을 구성해서 패킷을 쏜 당사자에게 되돌려준다!
query.reply(query.selector(), current_battery).res().await.unwrap();
}
}
이 백그라운드 스레드는 로봇이 꺼질 때까지 살아서 수만 번의 조회 요청(GET)을 블로킹(Blocking) 없이 비동기로 처리해 낸다. 이것이 Zenoh의 제로 오버헤드 마이크로서비스(Microservice)다.
2. Querier를 통한 분산 데이터 검색 및 읽기(get)
데이터(Queryable)를 기다리게 만들었으니, 이번엔 클라우드 관제 대시보드 쪽에서 당당하게 질문(Query)을 던져 데이터를 가져올 차례다.
2.0.1 [Runbook] 마이크로서비스(RPC Client) 호출 기법
Zenoh의 조회(get) 함수는 단일 노드의 결괏값을 가져오는 것이 아니라, 망 전체에서 일치하는 모든 대답(Replies) 을 다 쓸어 담(Pull)아 오는 거대한 비동기 수확기다.
[코드 레벨 데이터 일발 장전]
use futures::prelude::*;
#[tokio::main]
async fn main() {
let session = zenoh::open(Config::default()).res().await.unwrap();
println!("전 세계의 로봇 1번에게 배터리 상태를 물어봅니다...");
// session.get() 을 호출하는 순간, 이 질문이 네트워크 트리에 브로드캐스트된다!
let mut replies = session.get("robot/1/battery").res().await.unwrap();
// 답변자가 1명일수도, 10초 타임아웃 안에서 10명일수도 있으므로 스트림으로 쭉 뽑는다.
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => {
let rcv_payload = String::from_utf8_lossy(&sample.value.payload.contiguous());
println!("[수신 응답] 타겟: {}, 값: {}", sample.key_expr.as_str(), rcv_payload);
}
Err(err) => {
eprintln!("[네트워크 에러] Queryable 쪽에서 라우팅 오류 발생: {}", err);
}
}
}
}
HTTP/REST API였다면 타겟 로봇의 IP 주소를 일일이 알아내서 curl을 돌려야 했겠지만, Zenoh에서는 robot/1/battery라는 키 이름만으로 해시맵(DHT)을 뒤져서 데이터를 가져다 바친다. IP 주소 체계의 완벽한 붕괴이자, 데이터 중심(Data-centric) 아키텍처의 승리다.
3. 쿼리 파라미터(Selector) 구성 및 페이로드 전달 기법
단순히 “배터리 내놔“가 전부가 아니다. “ID가 50 이하인 로봇 중에 온도가 40도 이상인 놈들만 찾아라” 같은 복잡한 조건식(Parameters)을 담아 던져야 진짜 마이크로서비스다.
Zenoh의 쿼리 셀렉터(Selector) 문법은 웹의 URI?QueryString 구조와 완벽히 치환된다.
3.0.1 [Runbook] Selector를 활용한 동적 동기화 쿼리 전술
1. 파라미터를 담아 총알 수 쏘기 (Querier 입장)
클라우드 대시보드에서 ?format=json&unit=celsius 같이 덕지덕지 조건을 붙여 쏜다.
// '?' 뒤에 REST 파라미터와 동일한 스펙문법 부여 (Selector)
let selector = "robot/1/temp?format=json&unit=celsius";
// 페이로드가 필요하면 .with_value() 안에 덩어리를 더 태워 보낼 수도 있다.
let mut replies = session.get(selector)
.with_value("명령어 바디(Body)")
.res()
.await
.unwrap();
2. 깡통에서 조건 분기하기 (Queryable 입장)
질문을 받은 로봇은 이 파라미터(Query String)를 파싱해서 똑똑하게 분기 처리한다.
while let Ok(query) = queryable.recv_async().await {
// 1. 누가 어떤 덕지덕지 붙은 조건으로 질의했는지 파싱
let params = query.parameters();
// 2. 비즈니스 로직 분기
if params.contains("unit=celsius") {
query.reply(query.selector(), "36.5").res().await.unwrap();
} else if params.contains("unit=fahrenheit") {
query.reply(query.selector(), "97.7").res().await.unwrap();
} else {
// [예외 처리] 멍청한 질문에는 에러를 반환
query.reply_err("지원하지 않는 포맷입니다.").res().await.unwrap();
}
}
이 파라미터 전송 기법(Selector Parameters)은 로봇과 클라우드 간의 동기식 프로시저 콜(gRPC 대용)로 쓰이는 완벽한 통신 마법진이다.
4. 비동기식 쿼리와 동기식(블로킹) 쿼리의 성능 비교
“Rust의 비동기(async/await)는 최고다“라는 맹신에 빠져 모든 것을 Tokio 스레드 풀에 던지는 경우가 있다. 하지만 극단적으로 폐쇄된 임베디드 코어나 1ms 레이턴시가 보장되어야 하는 수학 연산 스레드에서는 비동기 컨텍스트 스위칭(Context Switching)조차 독이 될 수 있다.
4.0.1 [Runbook] 워커 스레드 점유 전술 (Async vs Sync)
전술 1. 비동기(recv_async()) 쿼리대기 - I/O 대역폭 최우선 (웹서버형)
- 사용처: 클라우드 인스턴스, 스레드 병렬 처리가 중요한 엣지 게이트웨이 코어.
- 특징: 기다리는 동안 다른 함수(IO 작업)에 CPU 양보.
// 수만 대의 로봇 응답을 기다리면서 CPU를 블로킹하지 않고 다른 Task 를 돌린다.
while let Ok(reply) = replies.recv_async().await {
// ...
}
전술 2. 동기/블로킹(recv()) 쿼리대기 - 하드 리얼타임(OS 레벨) 최우선
- 사용처: 초소형 ARM 코어(라즈베리 파이 단일 스레드), 로봇 팔 제어의 실시간 모션 동기화 루프.
- 특징:
await구문이 빠진다. 즉시 그 자리에서 스레드를 확고하게 멈추고 네트워크 응답이 올 때까지 캐시를 지키며 대기한다. Tokio 풀에 의존하지 않고 OS 커널 스케줄러 영역에서futex블로킹을 건다.
// .await 가 없다! 그냥 멈춰서 기다린다. CPU 레이턴시 변동폭이 0에 수렴.
while let Ok(reply) = replies.recv() {
// 로봇 모션 제어 변수 조작
}
[아키텍처 인스펙션]
하나의 단일 코어에서 CPU 사용률이 100%를 치는 복잡한 수학 연산(자율주행 비전 등) 중이라면 무조건 비동기로 스케줄을 회피해야 한다. 반면 “이 데이터를 받지 않으면 다음 모터를 돌릴 수 없다“는 직렬 파이프라인이라면 C 언어처럼 정직한 recv() 블로킹 전술이 문맥 교환 오버헤드를 줄여 속도를 11% 가량 끌어올린다(Criterion 벤치마크 기준).
5. 여러 Queryable의 응답 병합(Consolidation) 및 충돌 해결 전략
와일드카드 쿼리의 가장 큰 함정이 여기에 있다.
session.get("robot/*/status") 라고 클라우드가 물었을 때, 전 세계의 1만 대의 로봇이 일제히 “나 살아있소” 라며 패킷을 던지기 시작하면 클라우드 서버의 소켓 버퍼가 터져버린다(DDoS 현상).
Zenoh의 발군 트릭은 바로 중재 라우터 단에서의 데이터 병합(Consolidation) 에 있다.
5.0.1 [Runbook] 라우터 트리(Tree) 응답 병합 설계
1. 클라우드 쿼리 발사 (Querier)
let replies = session.get("robot/*/status")
// [보호막] 응답을 취합하여 하나로 합치는 Consolidation 기능을 강제 활성화!
.consolidation(zenoh::query::ConsolidationMode::Monotonic)
.res()
.await
.unwrap();
2. 라우터(Router) 계층의 마법 (Split-Merge)
- 클라우드 라우터가 이 질문을 공장 A라우터, B라우터로 찢어서 보낸다(Split).
- 공장 A라우터 밑에 5,000대의 로봇(Queryable)이 자신들의 데이터를 던진다.
- 이때 클라우드로 패킷이 5,000개 날아가는 것이 아니다.
공장 A라우터가Consolidation지시를 읽고, 5,000개의 대답을 하나의 거대한 JSON 청크(혹은 바이트 배열)로 지퍼로 채우듯 병합(Merge) 한다. - 클라우드 서버는 최종적으로 공장 A병합본 1개, B병합본 1개 (총 2개의 패킷)만 받아 우아하게 역직렬화한다.
[충돌(Collision) 해결 전술]
“만약 두 로봇이 실수로 동일한 토픽(예: robot/1/status)으로 똑같은 위치에서 대답을 한다면 누굴 살려야 하는가?”
이것이 분산 시스템의 철학(Split-Brain) 문제다.
ConsolidationMode::Latest 옵션을 설정하면, Zenoh 해시 엔진 내부에 탑재된 HLC (Hybrid Logical Clock) 알고리즘이 마이크로초 단위의 타임스탬프를 비교하여 더 ‘최신’ 물리 시간에서 파생된 데이터 1개만 살리고 나머지는 네트워크 계층에서 도태(Drop) 시켜 버린다. 클라이언트 개발자는 데이터 충돌 처리에 단 한 줄의 코드도 쓸 필요가 없다.