7.6 분산 질의와 응답 (Query / Reply)

7.6 분산 질의와 응답 (Query / Reply)

HTTP 통신, gRPC, 혹은 단순한 TCP 소켓 서버. 이들의 공통점은 클라이언트가 서버의 IP 주소를 “정확히 알고” 요청을 때려야 한다는 것이다.
하지만 클라우드 컨테이너가 1시간마다 죽었다 살아나고, 로봇들이 끊임없이 IP를 바꾸는 거대한 엣지(Edge) 네트워크 환경에서 IP 타겟팅은 낡은 방식이다.

Zenoh의 Query / Reply 시스템은 “주소“가 아닌 “데이터의 이름(Name)“을 부르면 망 내에 존재하는 누군가가 대답하는 거대한 분산 해시 테이블(DHT) 혹은 글로벌 DB 쿼리망처럼 동작한다.
이 장에서는 파이썬으로 가벼운 Queryable 콜백을 띄워 백엔드 DB 서버를 대체하는 기법과, 파이썬 동기식 시스템을 멈추게(Blocking) 하지 않고 멀티 응답 패킷을 병합 수집(Aggregation)하는 극강의 전술을 서술한다.

1. 분산 쿼리 아키텍처 및 라우팅 원리

Zenoh 쿼리의 동작 원리는 단순한 1:1 RPC가 아니다. 파이썬 클라이언트가 하늘을 향해 “지금 서울에 있는 모든 1번 로봇 온도 센서 응답하라!” (seoul/robot/*/temp) 고 외치는 거대한 멀티캐스트 파동(Wave) 이다.

1.0.1 [인스펙션] 질의응답 라우팅의 3단계 위상

  1. 지향성 투사 (Routing Out):
  • 클라이언트 파이썬 스크립트가 session.get("seoul/robot/*") 을 치면, 가장 가까운 Zenoh 라우터가 이 Wildcard를 해석한다.
  • 라우터는 자신이 가진 B-Tree 테이블을 뒤져 “현재 seoul/robot/1 과 2의 Queryable 이 저쪽 일본 센터 라우터에 매달려 있다“는 걸 깨닫고 그쪽으로만 질문(Query) 패킷을 보낸다.
  1. 개별 응답 생산 (Processing):
  • 질문이 도달하면 각 엣지 디바이스의 C++ 이나 파이썬 Queryable 콜백이 트리거된다. 콜백은 자기 파이썬 메모리나 로컬 DB를 스캔한 후 개별적인 답장(Reply)을 쓴다.
  1. 역추적 수집 (Routing In & Aggregation):
  • 각자의 답장은 출발했던 라우팅 경로를 정확히 역추적하여 질문자에게 되돌아온다.
  • 10대가 대답했다면 파이썬의 session.get() 제너레이터(Generator)에는 순차적으로 10개의 Reply 객체가 쏟아져 나온다.

이 원리 덕분에, 중간 라우터 수십 개가 거미줄처럼 얽혀 있어도 파이썬 개발자는 단 한 줄의 인프라 코드 없이 거대한 분산 클러스터를 마치 로컬 딕셔너리를 조회하듯 탐색할 수 있다.

2. 쿼리어블(Queryable) 등록 및 요청 처리 콜백 구현

“누군가 물어보면 내가 대답하겠다.”
이 역할을 맡은 노드를 Zenoh에서는 Queryable이라고 부른다. Flask 나 Django 로 GET /sensor/1 엔드포인트를 뚫는 것과 본질적으로 같지만, HTTP 웹 서버의 무거운 오버헤드가 전혀 없는 마이크로 서비스다.

2.0.1 [Runbook] 데이터 허브 사령부 구축 전술

import zenoh
import time
import json

## 가상의 내부 파이썬 인메모리 DB
MOCK_DB = {
    "factoryA/room1/temp": 24.5,
    "factoryA/room2/temp": 28.0
}

def on_query(query: zenoh.Query):
    """
    [핵심 전술] 누군가 get() 을 칠 때마다 이 함수가 백그라운드 스레드에서 깨어난다.
    """
    key = str(query.key_expr)
    print(f">> [수신] 질의 도착: {key}")
    
    # 1. 필터링 및 DB 조회
    if key in MOCK_DB:
        val = MOCK_DB[key]
        response_payload = json.dumps({"status": "ok", "value": val})
        
        # 2. 질문자에게 응답을 포장해서 쏜다! (Reply)
        query.reply(query.key_expr, response_payload)
        print(f"<< [송신] 정상 응답 전송 완료 ({val})")
    else:
        # 데이터가 없을 때 무시(return) 하거나, 에러를 명시적으로 돌려줄 수 있다.
        # 에러 통보는 클라이언트 로직을 분기시키는 데 매우 유용하다.
        query.reply_err(f"Error: {key} 를 DB에서 찾을 수 없습니다.")
        print(f"<< [송신] 에러 통보 전송")

if __name__ == "__main__":
    with zenoh.open() as session:
        # "factoryA" 로 시작하는 모든 질문에 내가 대답하겠다고 망(Network)에 맹세한다.
        queryable = session.declare_queryable("factoryA/**/temp", on_query)
        print("쿼리 응답 서버(Queryable) 가동 시작...")
        
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass

파이썬 백엔드 엔지니어라면 이 코드의 우아함에 전율해야 한다. 단 20줄짜리 코드로, 전 세계 어디서든 session.get("factoryA/room1/temp") 를 쳤을 때 0.1초 만에 응답하는 무중단 분산 API 서버를 구축해 낸 것이다.

3. 쿼리(Query) 전송 및 다중 응답(Replies) 수집

앞서 서버(Queryable)를 세웠으니, 이제는 데이터를 요구하는 클라이언트 전술이다.
Zenoh 파이썬 바인딩에서 session.get() 은 하나의 값만 돌려주는 함수가 아니다. 질문(factoryA/**)에 응답하는 노드가 100개라면, 100개의 대답을 전부 수거해 오는 빗자루(Generator)다.

3.0.1 [Runbook] 다중 데이터 스크래핑 및 병합(Aggregation) 전술

import zenoh
import json

def fetch_all_temperatures():
    with zenoh.open() as session:
        print("온도 센서망 전수 조사 개시...")
        
        # 1. get() 을 때린다. 이는 이터레이터(제너레이터)를 즉시 반환한다. (Non-blocking 느낌)
        # 타임아웃은 혹시 답장을 안 하는 좀비 노드를 무한정 기다리는 걸 막는다.
        replies = session.get("factoryA/**/temp", timeout=2.0)
        
        result_map = {}
        
        # 2. 루프를 돌며 라우터망에서 도착하는 패킷들을 하나씩 건져 올린다.
        for reply in replies:
            if reply.is_ok:
                sample = reply.ok
                key = str(sample.key_expr)
                payload = json.loads(sample.payload.decode('utf-8'))
                
                # 병합(Aggregation) 보관함에 끼워넣기
                result_map[key] = payload['value']
                print(f"[성공] 수신: {key} -> {payload['value']}")
            else:
                err_msg = reply.err.payload.decode('utf-8')
                print(f"[에러 수신] 노드가 불평함: {err_msg}")
                
        # 3. 모든(All) 응답이 끝났거나 타임아웃이 나면 For 루프가 비로소終了한다!
        print("\n=== 최종 수집 보고서 ===")
        for k, v in result_map.items():
            print(f"- {k} : {v} C")
            
if __name__ == "__main__":
    fetch_all_temperatures()

이 패턴은 클라우드의 대시보드 백엔드, 혹은 로컬 망의 파이썬 스크립트 모니터링 도구에서 가장 많이 쓰이는 무기다. * 기호 하나만으로 물리적으로 찢어져 있는 100개의 장비에 일제 명령을 내리고 결과를 회수하는 분산 오케스트레이션(Orchestration)이다.

4. 타겟 지정(Targeting) 전략과 선택적 응답 처리

공장에 1번 로봇부터 100번 로봇이 있다. 내가 robot/*/status 라고 질의했을 때 100대의 로봇 모두가 답장을 보내느라 네트워크 대역폭(Bandwidth)이 시뻘겋게 터져버리면 어떡할까?
현장에서는 모든 대답이 필요한 게 아니라, **“망에 존재하는 단 1대의 라우터 DB 캐시값”**만 가져오고 싶거나, “살아 있는 아무 한 놈의 대답” 만 필요할 때가 있다.

4.0.1 [Runbook] 데이터 검색 스코프(Scope) 커맨드 통제

Zenoh 쿼리의 타겟팅을 통제하는 zenoh.QueryTarget 열거형을 쓴다.

import zenoh

session = zenoh.open()

print("최고로 핏줄을 조이는 질의 타겟팅 실험")

## 전술 1. 가장 가까운 놈 한 명만 대답해! (기본값 캐시 서버 대용)
## 만약 가까운 메모리(Router Cache)에 최근 데이터가 있다면, 굳이 로봇까지 핑을 뚫지 않고 
## 라우터가 즉각 대답을 던지고 끝낸다.
replies_best = session.get("robot/*/status", 
                           zenoh.Queueing(), 
                           target=zenoh.QueryTarget.BEST_MATCHING)

## 전술 2. 전 세계 모든 놈 다 찾아서 대답 받아와라 (ALL)
## 백업 스토리지부터 엣지 단말 센서 100개까지 빠짐없이 다 뒤진다.
replies_all = session.get("robot/*/status", 
                          zenoh.Queueing(), 
                          target=zenoh.QueryTarget.ALL)

## 전술 3. [고급] 완전한 쿼리 옵션(QueryOptions) 제어
## 커스텀 파라미터(?req_id=123)까지 심어서 백엔드 파이썬 DB를 흔든다.
opts = zenoh.QueryOptions(
    target=zenoh.QueryTarget.ALL,
    timeout=5.0, # 5초 기다림
    parameters="req_id=999&auth=admin"
)

## 이 파라미터는 Queryable의 콜백 내부에서 query.parameters 변수로 파싱해 쓸 수 있다!
sub = session.get("secure/database/logs", options=opts)

이 고도의 타겟팅 옵션을 가지고 놀 줄 알아야, 단순한 HTTP GET 요청을 넘어 분산 데이터베이스(Storage Plugin)나 시스템 전체 서치(Search) 파이프라인을 자유자재로 설계하는 마스터 아키텍트로 인정받는다.

5. 어태치먼트(Attachment)를 활용한 메타데이터 전달

쿼리의 본문이 되는 페이로드(Payload)나 파라미터(Parameter) 말고, 좀 더 시스템틱하고 은밀한 메타데이터(예: 암호화 인증 키, 타임스탬프, 처리 컨텍스트)를 전달해야 할 때가 있다.
파이썬의 session.get() 이나 query.reply()를 때릴 때, 우리는 우표 뒤쪽에 메모를 써넣듯 **어태치먼트(Attachment)**를 던질 수 있다.

5.0.1 [Runbook] 비파괴(Non-Destructive) 헤더 편입 전술

어태치먼트를 쓰면 페이로드 배열(바이트 무결성)을 전혀 건드리지 않고, 별도의 딕셔너리를 패킷 겉면(Envelope layer)에 묻혀 보낼 수 있다.

1. 대답할 때 겉봉투에 포스트잇 붙이기 (Queryable)

import zenoh

session = zenoh.open()

def complex_query_handler(query: zenoh.Query):
    # 본문 데이터는 무거운 4K 고화질 압축 이미지(약 5MB)
    huge_image_binary = b"\xff\xd8\xff\xe0..." 
    
    opts = zenoh.ReplyOptions()
    
    # [핵심 공격] 어태치먼트 딕셔너리에 카메라 렌즈 설정값이나 타임스탬프를 심어 넣는다
    opts.attachment = {
        "lens_mode": b"macro",        # 바이너리 값 대응
        "generate_time_ms": b"123456" 
    }
    
    # 5MB 이미지는 훼손시키지 않고 투하!
    query.reply(query.key_expr, huge_image_binary, options=opts)

session.declare_queryable("robot/1/camera_snapshot", complex_query_handler)

2. 겉봉투를 읽고 지능적으로 역직렬화하기 (Getter)

## 클라이언트 쪽에서 GET
for reply in session.get("robot/1/camera_snapshot"):
    if reply.is_ok:
        sample = reply.ok
        
        # 1. 일단 겉봉투 메모지(Attachment)가 있는지 점검
        if sample.attachment is not None:
            # 2. 이 렌즈 모드 값을 바탕으로 백엔드 로직 스위칭.
            lens_mode_bytes = sample.attachment.get(b"lens_mode")
            if lens_mode_bytes == b"macro":
                print("매크로 렌즈 모드! 분석 알고리즘 A 가동")
        
        # 3. 비파괴된 오리지널 데이터 추출 (Zero-copy 뷰)
        image_view = sample.payload

이 패턴은 사실상 HTTP 통신의 Header 와 본문(Body)을 완벽하게 재현한 것이며, 시스템 수준의 보안 토큰 전송이나 데이터 종류(content-type)를 분류할 때 파이썬 개발자들이 본능적으로 가장 사랑하게 될 궁극의 마이크로 아키텍처다.