7.9 타 프레임워크 및 데이터 과학 라이브러리 통합
Python 생태계의 패왕은 NumPy, PyTorch 같은 C++ 기반의 수치 해석 라이브러리와 FastAPI 같은 비동기 웹 프레임워크다. Zenoh가 제아무리 우주에서 가장 빠른 프로토콜이라 한들, 저 거대한 코끼리들과 결합하지 못한다면 한낱 콘솔 스크립트 장난감에 불과하다.
이 챕터에서는 로봇의 카메라에서 뽑혀 나온 4차원 텐서(Tensor)가 아무런 메모리 병목 없이 PyTorch 추론 엔진으로 직행하는 법, 그리고 그 추론 결과가 FastAPI의 웹소켓(WebSocket)을 거쳐 클라우드 대시보드에 전시(Display)되기까지, Python의 양대 생태계(데이터 사이언스 & 웹 백엔드)와 Zenoh를 결합하는 마이크로아키텍처 인테그레이션(Integration) 기법을 다룬다.
1. NumPy 다차원 배열 및 Pandas 데이터프레임 전송 기법
CCTV나 라이다(LiDAR)에서 뽑아낸 수백 메가바이트짜리 포인트 클라우드를 NumPy 배열로 들고 있다고 가정하자. 이걸 파이썬 기본 list로 풀어서 JSON으로 쏘는 행위는 트래픽 폭파 행위다. NumPy가 갖고 있는 C언어 연속 메모리(Contiguous Memory View) 형태를 1바이트의 훼손 없이 그대로 마이크로 네트워크 버스에 밀어 넣어야 한다.
1.0.1 [Runbook] NumPy 영거리 사격 전술
1. [송신] Shape와 Byte 분리 전송
NumPy 행렬은 단순히 바이트가 아니라 “차원” 정보(예: 1920x1080x3)를 갖고 있다.
import zenoh
import numpy as np
import json
session = zenoh.open()
## 가상의 거대한 이미지 데이터 (1080p RGB)
image_array = np.random.randint(0, 255, (1080, 1920, 3), dtype=np.uint8)
## 1. 뼈대(Shape, Dtype) 정보는 직렬화가 쉬운 JSON 포스트잇(Attachment)에 적는다.
meta_info = {
"shape": image_array.shape,
"dtype": str(image_array.dtype)
}
opts = zenoh.PutOptions()
opts.attachment = json.dumps(meta_info).encode('utf-8')
## 2. 본문(Payload)은 NumPy의 메모리 뷰(.tobytes())를 생으로 꽂는다. (속도 극대화)
session.put("drone/1/camera_feed", image_array.tobytes(), options=opts)
2. [수신] Shape 복원 및 C-언어 레벨 리팩토링
def on_image_receive(sample: zenoh.Sample):
# 1. 뼈대 정보 해독
meta = json.loads(sample.attachment.decode('utf-8'))
# 2. 버퍼의 바이트들을 NumPy 그물망에 던져 넣어 C 배열을 즉각 복원한다.
# np.frombuffer 는 값을 복사하지 안고(No-copy) 참조 뷰만 만든다! 최강의 속도!
restored_array = np.frombuffer(sample.payload, dtype=meta['dtype'])
# 3. 1차원 뱀처럼 늘어진 배열을 1920x1080x3 박스 형태로 도로 접어버린다.
restored_array = restored_array.reshape(meta['shape'])
print(f"[수신 성공] Shape: {restored_array.shape}")
sub = session.declare_subscriber("drone/*/camera_feed", on_image_receive)
만약 파이낸셜 공학에서 Pandas DataFrame을 쓴다면, 내장된 df.to_parquet() 메서드를 호출하여 바이너리 압축을 한 뒤 .put() 으로 쏘는 것이 국룰(Best Practice)이다.
2. 머신러닝 파이프라인 통합 (PyTorch, TensorFlow 텐서 교환)
에지 컴퓨팅 생태계에서는 수십 대의 카메라 모듈이 VRAM 24GB짜리 한 대의 GPU 워커로 이미지 Tensor를 끊임없이 공급한다. 이 과정에서 파이썬 CPU 스레드 병목을 일으키면 초당 프레임(FPS) 처리가 절반 이하로 토막 난다.
2.0.1 [Runbook] Tensor 직거래 무역 전술
PyTorch 텐서는 NumPy와 비슷해 보이지만, CPU 램이 아닌 GPU(CUDA) 램에 상주할 수 있다. 네트워크로 쏘기 위해서는 텐서를 잠시 CPU 램(NumPy 형태)으로 끌어내리는 과정이 필수적이다.
PyTorch -> Zenoh 파이프라인 뼈대
import zenoh
import torch
import io
session = zenoh.open()
## GPU 머신러닝 결과물 행렬 (예: 100마리 사람 인식을 해낸 Bounding Box 좌표 텐서)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
bounding_boxes = torch.tensor([[10, 20, 50, 50], [90, 80, 200, 200]], device=device)
## [전술 1] GPU -> CPU 메모리로 하역 작업
boxes_cpu = bounding_boxes.cpu()
## [전술 2] 직렬화 (PyTorch 고유의 save() 사용)
## 파일이 아니라 메모리 I/O 버퍼에 직렬화 바이트를 구워버린다.
buffer = io.BytesIO()
torch.save(boxes_cpu, buffer)
## 바이트 덩어리 전송
session.put("ai/worker/1/results", buffer.getvalue())
수신(Sub)하는 측 시스템에서는 Python 콜백 함수 내에서 torch.load(io.BytesIO(sample.payload)) 한 방으로 텐서 객체가 순식간에 원래 모습으로 부활한다.
이 파이프라인 아키텍처는 거대한 GPT 언어 모델을 여러 대의 머신 서버가 번갈아 가며 계산하는 이른바 “파이프라인 병렬화(Pipeline Parallelism)” 구축 시, 노드 간 통신 미들웨어로 gRPC 대신 Zenoh를 끼워 넣기 위한 핵심 기술이다.
3. 웹 프레임워크 연동 (FastAPI, Flask를 이용한 Zenoh REST 게이트웨이 구축)
어떤 훌륭한 C++ 로봇 시스템을 짜더라도, 일반 사용자는 구글 크롬 브라우저를 통해 예쁜 대시보드로 데이터를 보고 싶어 한다. 브라우저는 Zenoh 프로토콜을 모른다. 따라서 중간에 이 언어를 통역해 줄 API 게이트웨이(Gateway) 가 필요하다.
여기서 비동기의 왕, FastAPI가 등판한다.
3.0.1 [Runbook] 비동기 WebSocket - Zenoh 융합 전술
FastAPI의 Uvicorn 이벤트 루프 안에서 Zenoh aopen() 을 띄워, 양방향 펌프 구조를 완성한다.
import asyncio
from fastapi import FastAPI, WebSocket
import zenoh
app = FastAPI()
z_session = None
## 서버 부팅 시 단 한 번 Zenoh 엔진 시동
@app.on_event("startup")
async def startup_event():
global z_session
# [핵심] uvicorn 서버 루프(asyncio)의 영토에 Zenoh 백그라운드를 물망초처럼 심는다.
z_session = await zenoh.aopen()
print("FastAPI <-> Zenoh 플러그인 결속 완료")
@app.on_event("shutdown")
async def shutdown_event():
await z_session.aclose()
## REST API 방식 쿼리 조회
@app.get("/api/robot/{robot_id}/temp")
async def get_robot_temp(robot_id: int):
# 클라이언트의 HTTP 요청을 받았을 때 Zenoh 분산망으로 get() 을 갈긴다.
replies = z_session.aget(f"factory/robot/{robot_id}/temp", timeout=1.0)
data = []
async for reply in replies:
if reply.is_ok:
data.append(reply.ok.payload.decode('utf-8'))
return {"status": "success", "data": data}
## [최종전술] 클라이언트 브라우저로 쏘아주는 초고속 WebSocket
@app.websocket("/ws/livestream")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# 큐를 하나 뚫어서 Zenoh 콜백과 FastAPI 루프를 분리
queue = asyncio.Queue()
async def z_callback(sample: zenoh.Sample):
# 파이썬 워커의 방해 없이, Zenoh 메시지가 도착하는 즉시 큐로 들어간다.
await queue.put(sample.payload.decode('utf-8'))
# 구독 가동
sub = z_session.declare_subscriber("factory/global/alerts", z_callback)
try:
while True:
# 큐에서 뽑아낸 패킷을 그대로 웹소켓 너머 브라우저로 쏴버린다
msg = await queue.get()
await websocket.send_text(msg)
except Exception:
pass
finally:
sub.undeclare()
이 코드는 실현 불가능해 보였던 ‘로봇 CAN 제어기(C++) -> Zenoh 미들웨어 -> 클라우드 파이썬(FastAPI) -> React 웹 브라우저’ 에 이르는 3,000마일 횡단 데이터 파이프라인을 지연(Jitter) 단 10ms 이내로 구축해 내는 마스터 아키텍처다.
4. IoT 디바이스 제어를 위한 하드웨어 제어 라이브러리(RPi.GPIO 등) 연동
지금까지 당신은 마이크로서비스라는 가상의 구름 위에 떠 있었다. 이제부터는 진짜 땅바닥, 즉 센서에 전압을 주고 모터를 굴리는 라즈베리 파이(Raspberry Pi)의 전기 배선(GPIO) 구역으로 내려간다.
Zenoh 는 무거운 DDS나 ROS2를 설치할 수 없는 작은 IoT 엣지(Edge) 보드에서 진정한 지배력을 발휘한다.
4.0.1 [Runbook] 실물 구동계 액체 융합 작전 (GPIO Binding)
원격 클라우드의 명령 하나가 어떻게 물리적 스위치(릴레이)를 ‘딸깍’ 하고 여닫는지, 가장 거친 펌웨어의 맛을 모사한다.
디바이스 제어 액추에이터(Actuator) 워커 구축
import zenoh
import time
import json
## 가상의 라즈베리 파이 GPIO 라이브러리 호출
try:
import RPi.GPIO as GPIO
HARDWARE_AVAILABLE = True
except ImportError:
print("[경고] x86 터미널로 감지됨! 모터 시뮬레이션 모드로 빙의합니다.")
HARDWARE_AVAILABLE = False
MOTOR_PIN = 18
if HARDWARE_AVAILABLE:
GPIO.setmode(GPIO.BCM)
GPIO.setup(MOTOR_PIN, GPIO.OUT)
## [명령 수신부] 클라우드(Control Tower)에서 쏜 명령 처리
def on_motor_command(sample: zenoh.Sample):
cmd = sample.payload.decode('utf-8')
print(f"[액추에이터] 명령 하달 받음: {cmd}")
if cmd == "ENGINE_ON":
if HARDWARE_AVAILABLE:
GPIO.output(MOTOR_PIN, GPIO.HIGH)
print(">> 모터 부팅 (물리계 개방)")
elif cmd == "ENGINE_OFF":
if HARDWARE_AVAILABLE:
GPIO.output(MOTOR_PIN, GPIO.LOW)
print(">> 모터 차단")
def start_edge_node():
with zenoh.open() as session:
# 1. 원격 제어용 안테나 전개
sub = session.declare_subscriber("drone/1/motor/cmd", on_motor_command)
# 2. 로컬 센서(온도 등) 계측 루프
pub = session.declare_publisher("drone/1/motor/status")
try:
while True:
# 3. 엣지의 건강 상태를 능동적으로 상부에 보고(Telemetry)
status = json.dumps({"rpm": 3000, "temp_c": 55.4})
pub.put(status)
time.sleep(1)
except KeyboardInterrupt:
if HARDWARE_AVAILABLE:
GPIO.cleanup() # 안전을 위한 릴레이 강제 해제
if __name__ == "__main__":
start_edge_node()
이 하드웨어 인터럽팅(Hardware Interrupting) 구성에 7.5.5장의 Lock-free Queue 를 결합하면, 그 어떤 극심한 트래픽 스파이크가 몰려와도 모터의 제어 속도가 요동치지 않는 극소형의 하드 리얼타임(Hard Real-time) 인버터 제어기를 설계할 수 있게 된다. “가장 가벼운 미들웨어“인 Zenoh가 가진 폭력적인 배포 유연성이다.