7.2.4.2 멀티캐스트(Multicast) 피어 탐색 기반 서브스크라이버(Subscriber) 수신 콜백 연동

7.2.4.2 멀티캐스트(Multicast) 피어 탐색 기반 서브스크라이버(Subscriber) 수신 콜백 연동

퍼블리셔(Publisher)가 시공간의 제약 없이 네트워크 바다 위로 데이터를 흩뿌렸다면, 서브스크라이버(Subscriber)는 이 거대한 트래픽 소용돌이 속에서 자신에게 필요한 텔레메트리 파편만을 정교하게 건져 올리는 선망(Purse Seine)과 같은 역할을 수행한다.

Zenoh 아키텍처에서 구독(Subscription) 체계는 고정된 서버 IP나 통신 토폴로지에 억눌리지 않는다. UDP 멀티캐스트(Multicast) 기반으로 네트워크 영역 상의 피어(Peer) 노드들을 동적으로 탐색하며, 데이터 소스(Publisher)가 새롭게 깨어나 발포하는 순간 즉각적으로 수신 궤도로 편입되는 자가조직(Self-organizing) 네트워크 특성을 띈다. 본 절에서는 파이썬 환경에서 극단적으로 결합도가 낮은 멀티캐스트 탐색망을 설계하고 비동기 수신 콜백 시스템을 연동하는 구조를 전개한다.

1. 패턴 일치(Pattern Matching) 라우팅 표지판 구축

서브스크라이버는 단일 로봇의 개별 토픽뿐 아니라, 공장 내부 전 영역에서 발포되는 데이터를 손쉽게 통합 감청할 수 있다. Zenoh의 쾌도난마(快刀亂麻)와 같은 강점은 MQTT를 능가하는 우아한 패턴 일치 식별자 구조에 있다.

import zenoh
import json

# 공장 내 모든 층(Floor), 모든 로봇의 상태(status) 정보만을 필터링하는 구독 정규식
TARGET_EXPR = "factory/**/status"

def on_message_received(sample: zenoh.Sample):
    # 콜백 런타임 진입: C++ 백그라운드 스레드에서 Python 인터프리터 락을 일시 취득함
    topic_name = str(sample.key_expr)
    raw_bytes = sample.payload
    
    # 텔레메트리 원시 바이트스트림을 유니코드 JSON으로 디코딩
    try:
        data = json.loads(raw_bytes.decode('utf-8'))
        print(f"[{topic_name} 수신 완료] Battery: {data.get('battery_level')}%")
    except Exception as e:
        print(f"직렬화 파괴 패킷(Broken Packet) 무시 통과: {e}")

# ... (세션 선언 부로 이어짐)

위의 예제에서 사용된 ** 식별자는 디렉터리 구조 하위의 여러 슬래시(/) 뎁스(Depth)를 모두 포괄하여 수신망으로 편입시킨다. 이는 백엔드 서버에서 관제 대시보드(Dashboard)를 생성할 때 개별 로봇의 IP를 일일이 컨피그(Config) 파일에 적어둘 필요가 없도록 시스템 아키텍처의 중앙 의존성(Central Dependency)을 영구 무효화한다.

2. 멀티캐스트(UDP) 오토 디스커버리(Auto-Discovery)

zenoh.open()이 실행되면 시스템 내부적으로 멀티캐스트 네트워크(주로 224.0.0.225:7447)에 탐색 비콘(Scout Beacon)을 방사한다. 이 탐색 레이더에 잡힌 로봇 퍼블리셔들과는 즉각적으로 TCP 브릿지 계층 또는 QUIC 스트림이 맞물려 전이된다.

import time

def main():
    # 1. 제로-컨피그(Zero-Config) 기반 멀티캐스트 자동 탐색 세션 확립 
    config = zenoh.Config()
    
    print("네트워크 디스커버리 진행 중... (Zenoh 런타임 부팅)")
    session = zenoh.open(config)
    
    # 2. 서브스크라이버 객체 선언 (수신망 전개)
    # 선언 즉시 주변 라우터들에게 "나는 이 데이터를 받을 준비가 되었다"고 브로드캐스트 전파
    sub = session.declare_subscriber(TARGET_EXPR, on_message_received)
    
    print(f"서브스크라이버 궤도 안착 완료. 구독 대기 식별자: {TARGET_EXPR}")
    
    try:
        # 백그라운드의 네이티브 수신 스레드(Tokio/Rust)가 콜백을 발포할 수 있도록
        # 메인 파이썬 데몬의 생명주기(Lifecycle) 무한 유지
        while True:
            time.sleep(1)
            
    except KeyboardInterrupt:
        print("\n관제망 정지. 우아한 셧다운(Graceful Shutdown) 진입")
    finally:
        # 3. 구독 파괴 및 세션 단절 선언
        sub.undeclare()
        session.close()

if __name__ == "__main__":
    main()

3. GIL 프리(Free) 구조를 보존하는 비동기 분리(Decoupling) 전술

이 과정에서 튀어나오는 가장 중요한 건축적 함정(Architectural Pitfall)은 on_message_received 단일 함수 블록 안에서 어떤 파편도 무한루프나 소켓 블로킹에 빠져서는 안 된다는 대원칙이다.

만약 수신된 status 값이 비정상적이라 백엔드 데이터베이스 트랜잭션 수립(Insert) 명령을 저 콜백 함수 내부에서 직격으로 날리게 되면, 네트워크를 관장하는 호스트 스레드가 데드락(Deadlock)에 처하게 된다. Zenoh의 C-ABI 계층이 올려보낸 메세지 콜백은 즉시 메모리에 담고 메인 스레드로 반출(Decoupling)하라. 파이썬의 동시성 큐(queue.Queue)나 asyncio.Queue에 원시 데이터를 적재하고 반환하기만 해도 통신 스레드의 GIL 독점(Monopoly)으로 인한 로스(Packet Loss)율을 제로 단위까지 격추시킬 수 있다.