6.4 질의응답(Query / Reply) 모델과 RPC(Remote Procedure Call)
MQTT처럼 맹목적으로 데이터를 뿌리기만 하는 브로드캐스트형 프로토콜의 시대는 지나갔다.
로보틱스 시스템에서는 “내 서버 온도를 초당 10번씩 뿌려줘“라는 Pub/Sub 모델도 필요하지만, “지금 당장 창고 B구역에 있는 로봇들의 배터리 잔량을 취합해서 나에게 보고해“처럼 내가 지시할 때만 동작하는 동기화 메커니즘도 필수적이다.
이것이 Zenoh의 Query(질의) 시스템이다. C++ 환경에서는 이를 통해 기존 gRPC 구성을 완전히 대체할 수 있는 폭발적인 분산 RPC(Remote Procedure Call) 파이프라인을 구축할 수 있다. gRPC처럼 복잡한 .proto 파일을 컴파일하고 Stub 코드를 만들 필요조차 없다.
이 챕터에서는 서버가 Queryable로 응답을 대기하고, 클라이언트가 std::future를 활용해 응답을 비동기로 낚아채는 아키텍처 레벨의 통신 런북을 파고든다.
1. Queryable 객체 생성을 통한 데이터 제공자(Provider) 구현
당신의 C++ 서버가 백엔드 DB와 연동되어 있다고 치자. 클라이언트가 “최신 로그 내놔“라고 외치면, 서버는 즉시 DB를 긁어(Read) 결과를 전송해 주어야 한다.
Queryable은 이 요청(Query)을 라우터로부터 받아채는 일종의 “콜백 서버“다.
1.0.1 [Runbook] 데이터 허브 응답기(Queryable) 스캐폴딩
1. Queryable 등록 및 람다 콜백 바인딩
#include "zenoh.hxx"
#include <iostream>
void run_database_provider() {
auto config = zenoh::Config::create_default();
auto session = zenoh::Session::open(std::move(config));
// "db/sensor/*" 로 들어오는 모든 질문에 내가 대답하겠다고 선언!
auto queryable = session.declare_queryable(
"db/sensor/*",
[](const%20zenoh::Query&%20query) {
auto key = query.get_keyexpr().as_string_view();
std::cout << "질의 접수: " << key << std::endl;
// 2. 파라미터(Selector) 추출
// 예: "db/sensor/temp?room=A" 의 파라미터 'room=A' 를 로깅
std::cout << "파라미터: " << query.get_parameters().as_string_view() << std::endl;
// 3. 내부 비즈니스 로직(DB 검색 등)을 거친 가상의 결과물
std::string result = "25.4 Celsius";
// 4. 요청이 들어온 그 파이프(Query)를 통해 답장(Reply)을 꽂아 넣는다!
query.reply(key, result);
}
);
std::cout << "데이터 소스 프로바이더 가동 완료. (Enter 종료)" << std::endl;
std::cin.get();
}
[아키텍처 인스펙션]
gRPC는 서버 주소(IP:Port)를 명시적으로 알아야 연결이 가능하다. 그러나 Zenoh의 Queryable은 서버 IP를 알 필요가 없다! 그저 “내 이름은 db/sensor야“라고 라우터에 외칠 뿐이다.
심지어 이 Queryable 객체가 죽어버리면 라우터는 알아서 트래픽을 다른 백업 Queryable로 돌려버리는 이중화(Fail-over) 라우팅을 C++ 코드 단 한 줄의 추가 없이 공짜로 제공한다.
2. 데이터 조회를 위한 질의(Query) 전송 메커니즘
이제 클라이언트 쪽(요청자)의 런북을 살펴본다.
Subscriber가 수동적으로 데이터가 오기만을 멍하니 기다리는 녀석이라면, Query를 던지는 클라이언트는 능동적으로 “지금 내놔라!“고 외치는 공격수다.
2.0.1 [Runbook] 데이터 강제 징발(Data Fetching) 전술
Zenoh C++ 환경에서 Query를 칠 때는 콜백(Callback)을 통해 응답(Reply)을 비동기로 받는다.
#include "zenoh.hxx"
#include <iostream>
int main() {
auto config = zenoh::Config::create_default();
auto session = zenoh::Session::open(std::move(config));
std::cout << "센서 데이터 질의 시작..." << std::endl;
// session.get() 을 호출하는 순간, 라우터 망 전체에 파동학적 검색이 뻗어나간다.
session.get(
"db/sensor/temp", // 질문 대상 토픽
"", // 페이로드 혹은 파라미터 (예: "room=A")
// 각 대답(Reply)이 도착할 때마다 호출되는 람다
[](const%20zenoh::Reply&%20reply) {
if (reply.is_ok()) {
auto sample = reply.get_ok();
std::cout << "성공 수신 [" << sample.get_keyexpr().as_string_view() << "] : "
<< sample.get_payload().as_string() << std::endl;
} else {
std::cerr << "에러 통보 수신: " << reply.get_err().get_payload().as_string() << std::endl;
}
},
// 모든 응답이 전송되었음을 알리는 종료 람다
[]() {
std::cout << "데이터 수집 완료 (All replies received)" << std::endl;
}
);
// 백그라운드 스레드가 응답을 받을 때까지 메인 스레드 유지
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
이 구조는 REST API 의 GET 요청과 본질적으로 같다. 하지만 C++ libcURL 을 써서 HTTP 통신을 짜 본 자라면, 이게 얼마나 미친 듯이 아름답고 짧은 코드인지 직감할 것이다. TCP 포트 설정이나 HTTP 헤더 조립 따위가 일절 존재하지 않는다!
3. C++ std::future와 std::promise를 활용한 비동기(Asynchronous) 응답 처리
앞서 살펴본 콜백(Callback) 방식은 수신 로직이 깊어질수록 이른바 “콜백 지옥(Callback Hell)“에 빠지는 전형적인 옛날 방식이다.
모던 C++ 개발자들은 스레드를 동기화할 때 콜백을 짜지 않는다. std::future 와 std::promise 를 구사하여 메인 스레드의 통제권을 확보하고, 우아하게 블로킹(Blocking)을 건다.
3.0.1 [Runbook] 프로덕션 비동기/동기 제어 브릿지 패턴
네트워크 백그라운드의 콜백을 C++ 프론트 쓰레드로 안전하게 끌어올리는 전술이다.
#include "zenoh.hxx"
#include <iostream>
#include <future>
#include <string>
int main() {
auto session = zenoh::Session::open(zenoh::Config::create_default());
// 1. 미래의 결과값을 담아줄 텅 빈 약속 상자(Promise) 준비
std::promise<std::string> temp_promise;
// 이 약속 상자에 미래(Future)가 연결되어 있다.
std::future<std::string> temp_future = temp_promise.get_future();
// 2. 비동기 질의(Query) 발사
session.get(
"db/sensor/temp", "",
// 응답 람다 캡처로 promise 패스를 넘긴다!
[&temp_promise](const%20zenoh::Reply&%20reply) {
if (reply.is_ok()) {
// [약속 이행] 데이터를 받는 순간, promise 공간에 값을 밀어넣어 버린다!
temp_promise.set_value(reply.get_ok().get_payload().as_string());
}
},
[]() { /* 종료 처리 생략 */ }
);
std::cout << "네트워크 응답 대기중..." << std::endl;
// 3. [핵심 방어막] 메인 스레드는 future 가 값을 쥘 때까지 멈춰선다. (Blocking)
// 영원히 멈추는 걸 방지하기 위해 3초 타임아웃을 건다!
if (temp_future.wait_for(std::chrono::seconds(3)) == std::future_status::ready) {
std::string result = temp_future.get();
std::cout << "성공! 블로킹 해제: " << result << std::endl;
} else {
std::cerr << "타임아웃! 3초간 응답이 없어 강제 중단합니다." << std::endl;
}
return 0;
}
당신은 이 아키텍처를 도입함으로써 악명 높은 멀티스레드 std::mutex 나 std::condition_variable 을 한 줄도 쓰지 않고 완벽하게 스레드 안전(Thread-Safe)한 C++ 동기화 통신 모듈을 구축해 낸 것이다.
4. 다중 응답(Multi-reply) 수신 및 데이터 병합(Aggregation) 로직 작성
Zenoh Query 패러다임이 압도적인 파괴력을 지니는 지점이다.
내가 robot/*/temp 라는 수식(Wildcard)으로 Query를 날리면 한 놈이 대답하는 게 아니라 물리적으로 서로 다른 C++ Queryable 프로그램 100개가 동시에 대답(Multi-Reply) 을 쏟아내기 시작한다. 이 다중 응답 폭격을 어떻게 맞이할 것인가?
4.0.1 [Runbook] 분산 빅데이터 콜라주(Aggregation) 전술
도착하는 응답들의 출처(Key)가 제각각이므로, 이들을 하나의 C++ 자료구조(std::map이나 std::vector)에 긁어모아야(Aggregated) 한다.
#include "zenoh.hxx"
#include <iostream>
#include <map>
#include <mutex>
int main() {
auto session = zenoh::Session::open(zenoh::Config::create_default());
// 데이터 병합을 위한 저장소
std::map<std::string, std::string> aggregated_data;
std::mutex data_mutex;
// 와일드카드를 포함한 광역 질의(Query) 타격!
session.get(
"robot/*/temp", "",
// [중요] 이 람다는 대답하는 로봇 개수(N) 만큼 여러 번 호출된다!
[&aggregated_data, &data_mutex](const%20zenoh::Reply&%20reply) {
if (reply.is_ok()) {
auto sample = reply.get_ok();
auto key = sample.get_keyexpr().as_string();
auto val = sample.get_payload().as_string();
// 멀티 쓰레드 안전 보장을 위한 락 확보 (매우 필수)
std::lock_guard<std::mutex> lock(data_mutex);
aggregated_data[key] = val; // 지도(Map)에 퍼즐 조각 끼워넣기
std::cout << "[개별 수신] " << key << " -> " << val << std::endl;
}
},
// [중료 콜백] 망 내의 모든 응답이 끝났을 때 1번 호출되는 최종 함수
[&aggregated_data, &data_mutex]() {
std::lock_guard<std::mutex> lock(data_mutex);
std::cout << "\n======= [병합 통계 보고서] =======" << std::endl;
std::cout << "총 응답 로봇 대수: " << aggregated_data.size() << std::endl;
for(const auto& [k, v] : aggregated_data) {
std::cout << "로봇 " << k << " 의 온도는 " << v << " 입니다." << std::endl;
}
}
);
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
이 코드는 중앙집중식 서버가 100대의 로봇에게 일일이 TCP 루프를 돌며 데이터를 폴링(Polling)하던 구시대의 병목 코드를 완전히 박살 낸다. 클라이언트가 외치는 즉시, 라우팅 망이 모든 로봇에게 파동을 전달하고 수집된 패킷만을 즉각적인 콜백으로 내려주기 때문이다.
5. 동기식(Synchronous) API와 비동기식(Asynchronous) API의 성능 비교
마무리를 짓자면, C++ 세상에서는 항상 ’무엇이 더 빠른가’에 대한 종교 전쟁이 일어난다.
앞서 std::future로 메인 스레드를 멈춰 세우는 방식(Synchronous)과, 백그라운드 람다 콜백에 전부 맡겨버리고 잊어버리는(Fire and Forget) 비동기 빙식(Asynchronous) 중 어떤 전술을 택해야 하는가?
5.0.1 [Runbook] 아키텍트의 아키텍처 선택 가이드라인
1. 동기(Synchronous) 블로킹 락 패턴 (std::future/mutex)
- 특징: 호출자가 대답이 올 때까지 CPU 스레드를 붙잡고 잔다.
- 언제 써야 하나요? REST API를 대체할 때. 사용자가 “조회” 버튼을 누르고 화면이 정지된 상태로 데이터를 즉각 요청해서 받아야 하는 UI/GUI 프론트엔드 연동 상황.
- 단점: 대답이 늦게 오는 노드가 있으면 스레드가 점유된 채로 병목 창구가 된다.
QPS(Query Per Second)가 바닥을 긴다.
2. 극한의 완전 비동기(Asynchronous) 콜백 패턴
- 특징: 질의(Query) 루프 수백 개를 던져놓고 즉각 반환한다. 도착하는 결과는 백그라운드 스레드풀(ThreadPool)이 병렬 타공으로 소화한다.
- 언제 써야 하나요? 초당 10만 건 이상의 HFT 트레이딩, 분산 DB의 데이터 스크래핑 파이프라인. 이벤트 구동형(Event-Driven) 백엔드 코어.
- 성능 차이 분석: 비동기 방식 도입 시 컨텍스트 스위칭 대기 시간(Wait State)이 사라지므로, 동기 방식 대비 네트워크 I/O 처리량이 최대 5배~10배(Throughput) 극단적으로 솟구친다.
[경고] C++에서 완전 비동기 성능 펌핑을 맛보려면, 당신의 람다 콜백 안에 절대 sleep이나 I/O 파일 쓰기 등 블로킹 함수를 집어넣어서는 안 된다. Zenoh 내부 라우터 스레드를 막아버리면, 전체 망의 레이턴시(Latency)가 연쇄 폭발하는 대참사가 일어난다!