7.5 데이터 발행과 구독 (Publish / Subscribe)

7.5 데이터 발행과 구독 (Publish / Subscribe)

Zenoh 통신의 척추이자 심장인 Pub/Sub 모델을 파이썬으로 다룰 차례다.
파이썬의 가장 큰 축복은 C++처럼 메모리 얼로케이터(Allocator)나 포인터 라이프사이클을 고민할 필요 없이, 가장 직관적이고 아름다운 형태의 비동기 코드를 짜낼 수 있다는 점이다.

하지만 그 편리함의 이면에는 파이썬 생태계 최악의 병목인 GIL (Global Interpreter Lock) 이라는 괴물이 도사리고 있다. 무거운 데이터를 쏘아대고 콜백으로 받아내는 과정에서, 딕셔너리를 압축(Serialization)하고 해제하는 코드를 잘못 짜면 초당 수천 개의 패킷이 병목에 걸려 서버 전체가 마비되는 참사가 일어난다.
이 챕터에서는 파이썬 고유의 동적 직렬화 기법(JSON, Struct)부터 GIL의 영향을 피하는 람다 함수 설계 규칙까지, 진짜 실무에서 망을 태울 수 있는 하드코어 런북을 전개한다.

1. 퍼블리셔(Publisher) 인스턴스 생성 및 데이터 전송

데이터를 공중으로 뿌릴 때, Zenoh 파이썬 바인딩은 가장 파이썬다운 인터페이스를 제공한다.
하지만 “매번 경로를 치고 전송하는 방식“과 “전담 대포(Publisher)를 세워두고 전송하는 방식“의 CPU 소모량 차이는 극심하다.

1.0.1 [Runbook] 데이터 발사대 구축 전술

단 한 번 쏠 것인가, 연사(Burst)할 것인가에 따라 무기를 다르게 선택하라.

전술 1. 게릴라 전술 (session.put)
1분에 한 번씩 가끔 로그를 남길 때는 이 전술이 파이썬 코드를 가장 깔끔하게 유지한다.

import zenoh

with zenoh.open() as session:
    # 파이썬 문자열이나 바이트 객체를 즉시 전송
    session.put("system/status", "Booting...")

전술 2. 거치형 기관총 전술 (Publisher 객체 활용)
센서처럼 초당 100회(100Hz) 이상의 데이터를 쏴야 한다면, 무조건 declare_publisher 를 써라.
그래야 문법 검사 트리(Trie)를 우회하여 파이썬과 Rust 사이의 컨텍스트 스위칭 비용을 극한으로 줄인다.

import zenoh
import time
import random

def start_sensor(session):
    # 1. 쏘려는 타겟 경로를 고정하고 파이프를 확장한다.
    # 옵션: congested_control, reliability 등 네트워크 QoS 세팅은 여기서 묶임
    pub = session.declare_publisher("robot/1/battery")

    print("[센서] 발사대 거치 완료. 초당 10발씩 쏜다.")
    for _ in range(30):
        # 2. 경로 재입력 없이, 바로 본문(Payload)만 밀어버린다.
        # 파이썬 내부 인코딩(.encode) 오버헤드를 줄이기 위해 byte_string(b"") 을 선호하라.
        val = random.randint(80, 100)
        pub.put(f"Battery: {val} %".encode('utf-8'))
        time.sleep(0.1)

if __name__ == "__main__":
    with zenoh.open() as session:
        start_sensor(session)

[아키텍처 인스펙션]
퍼블리셔 인스턴스 객체(pub)는 파이썬 바깥쪽, 즉 Rust 시스템 메모리 영역의 C 포인터에 결합되어 있다. 만약 저 함수 블록을 벗어나 파이썬 가비지 컬렉터(GC)가 pub 변수를 지워버리면, 백그라운드의 라우팅 파이프도 즉시 닫힌다. 이 객체의 생명 주기(Lifespan)를 파이썬의 로컬/전역 변수로 똑똑하게 컨트롤해야 한다.

2. 서브스크라이버(Subscriber) 생성 및 동기식 콜백 함수 구현

누군가 던진 데이터를 파이썬 프로그램이 받아내는 과정이다.
가장 중요한 점은, 당신이 등록한 콜백 함수는 파이썬 메인 스레드가 아니라, Zenoh가 몰래 띄워놓은 백그라운드 이벤트 워커(Worker) 스레드에서 실행된다는 것이다.

2.0.1 [Runbook] 비동기 데이터 캡처망 전술

파이썬의 동시성(Concurrency)과 asyncio 지식 없이도 순수 동기식 콜백을 완벽하게 래핑(Wrapping)하는 법이다.

import zenoh
import time

## 글로벌 데이터 수조
g_received_data = []

def robust_listener(sample: zenoh.Sample):
    """
    [경고] 이 콜백은 Zenoh 백그라운드 스레드에 의해 초당 수십~수백 번씩 폭행당한다!
    여기서 time.sleep()이나 무거운 머신러닝 연산을 넣으면 전체 망이 마비(Blocked)된다.
    """
    try:
        # 1. Payload 추출. sample.payload는 메모리 뷰어다. decode('utf-8')로 꺼낸다.
        topic = sample.key_expr
        payload_str = sample.payload.decode('utf-8')

        # 2. 안전하고 가벼운 작업만 수행 후 리스트에 밀어넣기
        g_received_data.append((topic, payload_str))
        print(f"[수신] {topic}: {payload_str}")
        
    except Exception as e:
        # [방어막] 콜백 안에서 난 에러는 파이썬 메인 스크립트를 죽이지 않고 허공으로 증발한다.
        # 따라서 반드시 독자적인 예외 처리를 달아 디버깅 로그를 찍어야 한다.
        print(f"콜백 처리 중 크래시: {e}")

if __name__ == "__main__":
    with zenoh.open() as session:
        print("구독망 전개 완료 (Wildcard ** 사용)")
        
        # 선언과 동시에 백그라운드 스레드는 'robust_listener'를 쏠 준비를 마친다.
        sub = session.declare_subscriber("building/**/metrics", robust_listener)
        
        # 메인 스레드는 살려둔 채, 데이터가 쌓이기를 기다린다.
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            # 사용자가 Ctrl+C 를 누르면 구독을 취소하고 빠져나온다.
            # (Session 과 Subscriber 가 순서대로 파괴됨)
            print("수신 셧다운")

[메모리 최적화 팁]
sample.payload 객체를 type() 로 찍어보면 순수 bytes가 아니라 Zenoh의 버퍼링 객체다. 이걸 그대로 글로벌 큐 배열에 삽입하면 파이썬 가비지 콜렉터가 C 확장의 참조 카운트를 제대로 계산하지 못해 크래시가 날 수 있다. 파이썬 메모리로 꺼내올 때는 반드시 .tobytes().decode() 를 써서 완벽한 Deep Copy본으로 분리한 뒤 저장하라.

3. 데이터 페이로드 직렬화 및 역직렬화 기법 (JSON, Pickle, Struct, Protobuf)

파이썬의 가장 큰 무기 중 하나는 딕셔너리(dict) 형태의 계층 데이터를 다채롭게 요리하는 능력이다. 하지만 Zenoh 네트워크 위에 이 딕셔너리 덩어리를 생으로 던질 순 없다. 통신망에 태우기 전과 후, CPU 사이클을 어떻게 깎아낼 것인가?

3.0.1 [Runbook] 데이터 직렬화 아키텍처 선택 가이드라인

전술 1. JSON (프론트엔드 연동 단기전)
사람이 읽을 수 있고, Node.js나 브라우저 앱과 연동할 때 필수적이지만 파이썬의 내장 json.dumps는 제법 느리다.

import json

pub.put(json.dumps({"robot": 1, "battery": 97}))

## 수신부 (json.loads 로 역직렬화)
def rx_json(s):
    data = json.loads(s.payload.decode('utf-8'))

전술 2. 파이썬 struct 모듈 (이기종 C++ 브릿지용 바이너리)
로봇 코드는 C++이고 모니터링만 파이썬(Zenoh)이라면, 가장 강력한 카드는 파이썬에 내장된 struct 패키지를 이용한 C/C++ Raw Byte 캐스팅이다! (엄청나게 빠르다)

import struct

## C++ 구조체: struct { int id; float bat; } (4바이트 + 4바이트 = 8바이트)
## 포맷 문자열 "i f" : i(정수형 4 byte), f(실수형 4 byte)
binary_payload = struct.pack("i f", 1, 97.5)

pub.put(binary_payload) # 단 8바이트로 쏘아버림! (JSON 대비 10배 작음)

## 수신부 복원
def rx_struct(s):
    robot_id, battery = struct.unpack("i f", s.payload)

전술 3. Python 내부 전용 융단폭격 Pickle (사내 통신용)
양쪽이 모두 Python 버전이 같다면, 파이썬 객체의 살점(클래스 인스턴스 상태)까지 100% 보존해서 던지는 궁극기다.

import pickle
import dataclasses

@dataclasses.dataclass
class JobOrder:
    command: str
    target_fps: int

my_job = JobOrder(command="START_AI", target_fps=30)
pub.put(pickle.dumps(my_job))

## 수신단에서 클래스 포맷까지 그대로 부활
def rx_pickle(s):
    job = pickle.loads(s.payload)
    print(f"명령: {job.command}")

[아키텍처 인스펙션]
만약 초당 1만 건의 배열이나 텐서를 주고받는다면 위 3가지 모두 실패한다. C++이나 Rust 생태계와 1억 단위로 통신할 때는 Protocol Buffers (Protobuf) 확장 라이브러리(from google.protobuf import message)를 물려놓고 .SerializeToString()을 호출하여 바이트 버퍼만 Zenoh로 넘겨주는 방식이 글로벌 스탠다드다.

4. 혼잡 제어(Congestion Control) 및 QoS 우선순위 설정

비전 카메라에서 나온 초당 60프레임(약 120MB/s)의 이미지를 파이썬 session.put()으로 난사하고 있다 치자. 이때 와이파이(Wi-Fi) 연결망이 5초쯤 끊겼다.
라우터나 소켓의 전송 대기 큐(Queue)는 순식간에 썩은 과거의 프레임들로 가득 찬다. 통신이 재개되는 순간 파이썬 앱은 의미 없는 5초 전 과거 프레임부터 꾸역꾸역 전송하기 시작하는 최악의 트래픽 잼(Traffic Jam)에 걸린다. 데이터 우선순위를 제어해야 한다.

4.0.1 [Runbook] 데이터의 목숨값 통제하기 (QoS)

putzenoh.Queueingzenoh.PutOptions 락을 더한다.

전술 1. 드롭 체제 (최신 데이터 지상주의)
동영상 프레임이나 라이다 좌표는 0.1초 전 데이터는 쓸모가 없다. 큐가 막히면 이전 것을 즉시 폐기(Drop)하고 무조건 새 데이터로 라우터의 배를 채워라.

import zenoh

session = zenoh.open()

## 막히면 무자비하게 옛날 짐을 바다에 던져버리는 혼잡 제어 세팅
opts = zenoh.PutOptions(
    congestion_control=zenoh.CongestionControl.DROP
)

## 옵션을 같이 얹어서 전송
session.put("robot/1/cam_frame", huge_binary_payload, options=opts)

전술 2. 자물쇠 체제 (강제 블로킹)
은행 로그 데이터, 결제 정보 등 1바이트도 유실되면 안 되고 순서가 100% 보장되어야 할 때 쓴다.
네트워크가 막히면 session.put() 라인이 얼어붙고(Blocked) 다음 줄(Next Line) 코드로 영원히 넘어가지 않게 방패막이(Backpressure)를 친다.

opts = zenoh.PutOptions(
    congestion_control=zenoh.CongestionControl.BLOCK
)
## 이 라인에서 네트워크 큐가 터질 것 같으면 파이썬 스레드가 파업하고 대기한다!
session.put("important/transaction/101", json_payload, options=opts)

어떤 혼잡 제어를 선택하든, Python 이 직접 메모리를 관리하며 터져나가는 것을 하부 Rust 코어가 방어해 준다. 파이썬 단에서는 그저 DROP인지 BLOCK인지만 결정해서 주입해 주면, C++ 이나 Rust와 완전히 똑같은 아키텍처 수준의 방어력을 갖추게 된다.

5. 백그라운드 스레드 활용 및 GIL(Global Interpreter Lock) 고려사항

대망의 고통스러운 주제다.
파이썬 스크립트 메인 스레드에서 AI 머신러닝 연산(예: YOLO 박스 치기)을 하느라 CPU 점유율이 99%인 상태라고 치자.
이때 백그라운드의 Zenoh Rust 라우터가 “패킷 왔어! 파이썬, 이 람다 콜백 좀 실행해줘!” 라고 외쳐도, 파이썬 인터프리터는 GIL 때문에 단 하나의 스레드만 돌아갈 수 있어 콜백을 즉각 실행해주지 못한다. 결국 통신 레이턴시(Ping)가 심하게 튀게(Jitter) 된다.

5.0.1 [Runbook] GIL 감옥을 부수는 파이프라인 아키텍처

콜백 지옥과 GIL을 회피하기 위해 가장 전통적이고 우아한 Producer-Consumer 큐(Queue) 토폴로지를 짠다.

import zenoh
import time
import queue
import threading

## 가장 가볍고 안전한 스레드 공유 바구니 (파이썬 내장 thread-safe 큐)
data_queue = queue.Queue(maxsize=1000)

def zenoh_fast_callback(sample: zenoh.Sample):
    """
    [핵심 방벽 1] 
    이 안에서는 데이터를 파싱하거나 모델 돌리는 등 GIL을 잡아먹는 짓거리를 '절대' 하면 안 된다!
    그저 큐에 던져넣기만 하고 0.001초 만에 빨리 퇴근(Return)해야 
    Zenoh 백그라운드 C 코어가 병목 없는 극한의 패킷 수리를 계속한다.
    """
    try:
        # payload를 통째로 딥카피(가장 빨리 꺼내는 방법)해서 던지기
        data_queue.put_nowait((sample.key_expr, sample.payload))
    except queue.Full:
        # 혹시 메인 스레드 연산이 너무 느려 큐가 가득 차면, 과감히 최신 패킷 버림.
        pass

def ai_inference_worker():
    """
    [핵심 방벽 2] 
    GIL의 축복을 받은 메일 스레드나, 전담 워커가 이 큐를 들여다보면서
    자신의 템포에 맞게 무거운 작업을 치러나간다.
    """
    while True:
        try:
            # 큐에서 패킷 뽑아오기 안 들어오면 1초 대기 후 재도전.
            key, raw_bytes = data_queue.get(timeout=1.0)
            
            # [헤비 듀티] 스트링 변환이나 탠서 행렬 연산 등...
            text = raw_bytes.decode('utf-8')
            # 50ms 짜리 무거운 연산 가정
            # time.sleep(0.05) 
            
            print(f"무거운 처리 완료: {key} -> {text}")

        except queue.Empty:
            continue

if __name__ == "__main__":
    with zenoh.open() as session:
        sub = session.declare_subscriber("drone/**/vision", zenoh_fast_callback)
        
        # 메인 스레드는 영원히 돌면서 큐를 빼먹는 소비자가 된다.
        ai_inference_worker()

이 패턴은 Python + Zenoh 구조에서 겪는 “레이턴시 폭증”, “원인 모를 메모리 크래시”, 그리고 “PyO3 바인딩 락(Lock) 에러” 3대장을 한 줄에 지워버리는 가장 모범적인 프로덕션 스탠다드 설계다. Zenoh에 들어오는 그 어떤 초고주파 데이터라도 당신의 파이썬 인터프리터를 얼리지 못할 것이다.