7.8 고급 API 기능 활용
단순히 데이터를 “쏘고(Put)” “받는(Sub)” 수준에 머물러 있다면 Zenoh의 성능을 절반도 쓰지 못한 것이다.
데이터가 수신되지 않을 때 이것이 ’송신자가 값이 없어서 안 보내는 것’인지, 아니면 ’라우터가 죽어서 끊긴 것’인지 판별할 수 있는가?
1GB짜리 데이터를 복제(Copy) 없이 로컬 워커 스레드 10개에 뿌릴 수 있는가?
이 챕터는 엔터프라이즈 데브옵스, 혹은 자율주행 차량의 아키텍트들이 파이썬을 메인 제어(Control) 언어로 채택할 수 있게 만들어주는 무기(Weapon)의 서(書) 다.
Liveliness 토큰을 응용한 헬스 체크 시스템부터, 리눅스 커널의 공유 메모리(/dev/shm)를 직접 찌르는 영거리(Point-blank) 데이터 타격 전술까지 Zenoh Python 래퍼의 가장 깊은 곳을 파헤친다.
1. Liveliness 토큰 발행 및 원격 노드 상태 모니터링
“노드 A가 죽었다!“를 알기 위해 5초에 한 번씩 파이썬으로 핑(Ping)을 쏘고 Heartbeat 리시버를 만드는 건 학부생이나 하는 짓이다.
Zenoh의 Liveliness Token은 당신의 세션이 라우터와 맺어져 있는 한, Zenoh 네트워크 망이 자동으로 헬스 체크를 대행해 주는 궁극의 생존 증명 시스템이다.
1.0.1 [Runbook] 생명 유지 장치 및 사망 선고 전술
1. [클라이언트 측] 생명 토큰(Liveliness) 선언
import zenoh
import time
with zenoh.open() as session:
print("센서 #1 가동")
# "망(Network)아, 내가 살아있는 동안 이 경로(robot/1/alive)를 활성 상태로 띄워놔!"
# 파이썬 워커가 비정상 종료(Segfault, 킬 당함)되거나 랜선이 뽑히면
# 라우터가 즉각 참수(Drop) 판정을 내린다.
token = session.declare_liveliness("robot/1/alive")
# 밥먹듯 평범한 작업 수행
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
# 정상 종료 시 토큰은 스크립트 종결과 함께 회수된다.
pass
2. [관제탑 측] 데스 노트(Death Note) 리스너 구축
import zenoh
import time
def on_liveliness_event(sample: zenoh.Sample):
# Liveliness 이벤트는 "상태(kind)"를 반환한다.
kind = sample.kind
node_name = sample.key_expr
if kind == zenoh.SampleKind.PUT:
# 누군가 토큰을 막 발행했다! (새 생명의 탄생)
print(f"[탄생] {node_name} 이망에 접속했습니다.")
elif kind == zenoh.SampleKind.DELETE:
# 누군가 죽었다! (연결 단절, 프로세스 폭발 등)
print(f">> [경보! 경보!] {node_name} 노드가 사망했습니다! 긴급 페일오버 가동!")
with zenoh.open() as session:
# "robot/*" 영역에서 발생하는 생명(Liveliness) 상태 변화를 전부 모니터링한다.
# 일반 데이터 채널과는 완전히 독립적인 '시스템 메타데이터 채널'을 타게 된다.
sub = session.declare_liveliness_subscriber("robot/*/alive", on_liveliness_event)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
Docker Swarm 이나 Kubernetes 환경 파드(Pod) 모니터링에 이 Liveliness 시스템을 결합하면, 파이썬 기반으로 완벽한 분산형 오토 스케일링/오토 리스타트 사령탑을 구축할 수 있다.
2. Shared Memory(SHM) 활용을 통한 대용량 데이터 고속 전송
자율주행 엣지 컴퓨터 안에는 C++(LiDAR 센싱), Python(AI 추론), Rust(제어) 프로세스가 섞여 돈다.
만약 C++이 10MB짜리 3D 포인트 클라우드를 파이썬에게 줄 때 TCP 소켓을 통과하면 어떻게 될까? OS 커널 메모리, 커널 버퍼, 파이썬 소켓 버퍼를 복사하며 수십 밀리초(ms)가 낭비된다.
Zenoh 는 동일 머신 내(Local IPC) 통신일 경우 바이트 배열의 전송로를 TCP 망에서 “운영체제 공유 메모리(Shared Memory)“로 자동(Bypass) 우회시켜버리는 악마적인 최적화를 지원한다. (단, Linux/Unix 계열 운영체제 전용 전술이다)
2.0.1 [Runbook] 제로 카피(Zero-copy) 로컬 고속도로 전개
이걸 쓰려면 Zenoh Config 객체에 마취 주사를 놔야 한다.
1. 공유 메모리(SHM) 활성화 설정
import zenoh
conf = zenoh.Config()
## [핵심] 리눅스의 /dev/shm 공간에 100MB 짜리 공유 거울을 만든다.
## 이 설정을 켜고 실행하는 순간, 파이썬 앱은 TCP 루프백(127.0.0.1) 연결을 버리고
## C/Rust 프로세스와 직접 메모리 번지(Pointer)를 주고받는다!
conf.insert_json5("transport/shared_memory/enabled", "true")
session = zenoh.open(conf)
2. SHM 전송 송/수신 (투명성)
코드 레벨에서는 당신이 기존에 짜둔 Pub/Sub 코드를 1바이트도 바꿀 필요가 없다.
Zenoh가 알아서 “아, 저 구독자가 나와 같은 리눅스 OS 안에 물리적으로 존재하는 놈이군? 그럼 TCP 소켓에 페이로드를 태우지 말고 공유 메모리 주소(ID)만 8바이트로 던질게!” 라고 행동한다.
## 기존 코드와 100% 동일. 하지만 내부 속도는 수백 배(약 14GB/s) 차이가 난다.
pub = session.declare_publisher("video/raw_frame")
pub.put(huge_numpy_array.tobytes())
만약 Docker 컨테이너 여러 개를 띄웠다면, 컨테이너 생성 시 --ipc=host 옵션을 반드시 주어야만 이 파이썬 스크립트들이 서로의 /dev/shm 영역을 쳐다보며 진정한 제로 카피 연산을 치를 수 있다.
3. 네트워크 대역폭 최적화를 위한 배치(Batching) 처리
초당 10바이트짜리 센서 1,000개가 파이썬 put() 을 초당 10번 호출하면, 데이터 크기는 총 100KB/s 불과하지만 TCP 패킷 헤더와 암호화 오버헤드, 그리고 커널 Syscall 때문에 OS 전체가 심정지를 일으킨다.
이때 필요한 것은 “작은 소포 1,000개를 개별 택배로 보내지 말고, 컨테이너 화물 1개에 담아서 한 방에 보내는(Batching)” 전술이다.
3.0.1 [Runbook] TCP 프레이밍 강제 유예 전술
Zenoh는 이 Batching 처리를 파이썬 개발자가 직접 배열을 묶어가며 짤 필요 없이, 라이브러리 레벨의 옵션 조작만으로 완벽하게 지원한다. 아무 옵션을 주지 않은 Zenoh의 기본값 역시 백그라운드 큐를 이용한 Auto-batching을 어느 정도 하지만, 극한 상황에서는 명시적 컨트롤이 필수다.
import zenoh
import time
## 튜닝용 뼈대 설정
conf = zenoh.Config()
## [핵심] TCP 통신의 Nagle 알고리즘을 강제 재활성화하거나 대기 시간을 준다.
## 파이썬 워커가 너무 빨리(1ms 단위) 슛()을 때려도 즉시 네트워크 카드로 내보내지 말고,
## 버퍼가 어느 정도 찰 때까지 최대 10ms 까지 묶어 뒀다(Batch) 쏘라는 지시.
conf.insert_json5("transport/tcp/nodelay", "false") # 묶어쏘기 모드 활성화
session = zenoh.open(conf)
pub = session.declare_publisher("sensor/fast_loop")
print("배치 발사 모드 가동. (10,000 연사)")
start = time.time()
for i in range(10000):
# 아무리 미친 듯이 파이썬 루프를 돌아도,
# 내부 Rust 엔진이 이 패킷들을 차곡차곡 모아 커다란 한 덩어리로 묶어 버린다.
pub.put(b"L")
print(f"소요 시간: {time.time() - start:.3f} 초")
만약 이 조각난 데이터를 쏘는데 LAN 스위치가 터질 것 같다면(PPS: Packets Per Second 한계점 도달), 위와 같이 Batching을 활성화하는 것만으로 단말(Node)의 CPU I/O 인터럽트를 90% 이상 깎아내 버리는 기염을 토해낸다.
4. 사용자 정의 데이터 인코딩(Custom Encoding) 명세 및 구현
Zenoh의 Sample.payload 는 본질적으로 멍청한 바이트(Byte) 덩어리일 뿐이다. 따라서 파이썬 스크립트가 수신한 데이터가 JSON인지, C++ 객체 메모리 덤프인지, 암호화된 ZIP 파일인지 수신자는 미리 파악하기 어렵다.
HTTP 통신에서 이 문제를 풀기 위해 Content-Type: application/json 이라는 헤더 구조를 만들었듯, Zenoh에는 Encoding 파라미터 영역이 네이티브로 준비되어 있다.
4.0.1 [Runbook] 데이터 타입-세이프(Type-Safe) 파이프라인
1. [송신부] 커스텀 인코딩 마킹 전술
단순한 텍스트가 아니라, “이 패킷은 zlib 으로 압축된 나의 커스텀 프로토콜 데이터다!” 라며 봉인(Seal)을 붙인다.
import zenoh
import json
import zlib
session = zenoh.open()
pub = session.declare_publisher("server/metrics")
## 압축할 거대한 더미 딕셔너리
data = {"status": "ok", "heavy_logs": "A" * 10000}
compressed_bytes = zlib.compress(json.dumps(data).encode('utf-8'))
## [핵심] 그냥 쏘지 않고, 인코딩 아이디를 부여한다.
## AppCustom 플래그를 쓰고, 설명란(suffix)에 "zlib/json" 이라는 내장 규격을 합의한다.
enc = zenoh.Encoding(zenoh.KnownEncoding.APP_CUSTOM, "zlib/json")
pub.put(compressed_bytes, encoding=enc)
print("커스텀 인코딩 송출 완료.")
2. [수신부] 인코딩 선별 및 스위칭 전술
수신부(파이썬 콜백)에서는 바이트 1개를 열어보기 전에 겉면의 인코딩 마크를 보고 어떤 디코더(Decoder) 함수를 돌릴지 결정한다.
def smart_listener(sample: zenoh.Sample):
# 인코딩이 명시되어 있는지 확인
enc = sample.encoding
if enc is not None:
# 내가 약속한 커스텀 타입(APP_CUSTOM) 이며, Suffix 가 "zlib/json" 인가?
if enc.prefix == zenoh.KnownEncoding.APP_CUSTOM and enc.suffix == "zlib/json":
try:
# 안전하게 압축을 풀고 JSON을 푼다.
decompressed = zlib.decompress(sample.payload)
parsed = json.loads(decompressed.decode('utf-8'))
print("[성공] 나만의 100% 호환 데이터 획득!")
except Exception:
print("[에러] 인코딩 서명은 맞는데 압축이 깨졌음!")
else:
print(f"내가 해독할 수 없는 데이터 포맷 (타입: {enc.prefix}) 이므로 버림(Drop)")
else:
print("경고: 인코딩 명세(Content-Type)가 없는 원시 데이터가 날아옴.")
sub = session.declare_subscriber("server/metrics", smart_listener)
이 패턴은 현업에서 수십 개 팀의 데이터 엔지니어들이 단일 Zenoh 라우터 망을 공유할 때, 엉뚱한 팀의 패킷을 받아 파이썬 역직렬화 도중 “UnicodeDecodeError” 나 “Dump Crash” 가 발생하는 대재앙을 원천적으로 틀어막는 가장 견고한 분산 방어벽이다.