7.12 실전 프로젝트: Python 기반 지능형 에지(Edge) 노드 구축
“1대의 데스크탑에서 AI를 돌리는 것과, 100대의 로봇 보드에서 AI를 돌려 클라우드로 통합하는 것은 완전히 다른 차원의 엔지니어링이다.”
이 장은 7장에서 배운 Python Zenoh 생태계 결합 기법(Numpy 변환, Multiprocessing, Asyncio 융합)을 통째로 쏟아부어, ‘자율 감시 드론(Drone)과 중앙 관제 서버(Command Center)’ 라는 완전한 IoT-AI 융합 아키텍처를 맨바닥부터 쌓아 올리는 야전 프로젝트다.
단순히 Hello World 를 넘어서, 비전 스레드와 AI 스레드를 어떻게 격리하고 Zenoh 라우터망에 그 결과를 어떻게 태워 보내는지, 그리고 단 한 대의 통제 센터 마스터 노드가 수십 대의 드론 워커들을 어떻게 get() 과 와일드카드로 조종하는지 그 완벽한 코드를 제공한다.
1. 요구사항 분석 및 시스템 아키텍처 설계
프로젝트를 시작하기 전, 스파게티 코드를 막기 위해 파이프라인을 3단계로 명확히 단절시키는 구조 설계(Topology Design)를 긋고 시작한다.
1.0.1 [Runbook] 지능형 드론 방범망 (UAV Security Cluster) 아키텍처
1. [엣지 단말 - Drone Node] (N대 존재)
- 스레드 A (카메라 펌프): OS 의
/dev/video0를 점유하고, 초당 30프레임으로 이미지를 빨아들인 뒤 로컬 큐에 집어넣는다. (OpenCV 활용) - 스레드 B (AI 인퍼런스): 큐에서 이미지를 꺼내 YOLO 모델에 넣고, “사람 식별 여부” 및 “바운딩 박스” 딕셔너리로 축소 시킨다.
- 스레드 C (Zenoh 통신망): 축소된 데이터(
{"bbox": [...], "alert": true})를drone/1/ai_result경로로 끊임없이 쏘아댄다 (Publisher). 만약 중앙에서 제어 명령이 떨어지면 반응한다 (Queryable).
2. [클라우드 백본 - Zenoh Router]
- 모든 드론은 IP를 하드코딩하지 않고 멀티캐스트로 주변의
zenohd(라우터) 를 수색하여 접속한다. 이를 통해 드론이 WiFi 영역과 5G 영역을 오가더라도 데이터를 중앙 클라우드까지 끊기지 않고 릴레이 해준다.
3. [관제 센터 - Master Node] (1대)
- 파이썬 비동기(
asyncio) 기반으로 작동하며,drone/*/ai_result를 와일드카드로 몽땅 수신(Subscribe)한다. - 긴급 상황 시 특정 드론을 지정하여
session.get("drone/1/motor/stop")으로 멈춤 쿼리를 꽂아버려 제어 권한을 획득한다.
이 파이프라인의 최고 장점은 엣지에서 무거운 비디오 스트림 120MB/s 를 무식하게 전송하지 않고, 엣지의 GPU(또는 NPU) 를 태워 나온 수 KB 짜리 AI 메타 데이터만 Zenoh로 빠르고 가볍게 공유한다는 점이다. 이것이 진정한 지능형(Intelligent) 엣지 아키텍처다.
2. 영상 및 센서 데이터 수집 파이프라인 구현
아키텍처의 맨 앞단, 가장 뜨겁고 더러운 물리 계층과 맞닿는 곳이다.
카메라 프레임 그랩(Grab) 로직이 Zenoh나 AI 로직과 같은 스레드에서 돌면 초당 5프레임도 버텨내지 못한다.
2.0.1 [Runbook] 독립 카메라 워커와 파이프 생성
우선 드론 스크립트(edge_drone.py) 안에 데이터를 퍼다 나를 컨베이어 벨트를 깐다.
import cv2
import queue
import threading
import time
## [전술] 카메라 버퍼 큐 (가장 최신의 5프레임만 유지하고 낡은 건 버린다)
g_frame_queue = queue.Queue(maxsize=5)
def camera_capture_worker():
"""오직 USB 카메라만 노려보는 전담 노동자"""
cap = cv2.VideoCapture(0)
# 카메라 제원 타협 (성능을 위해 640x480 강제)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
print("[DRONE] 센서 눈(Camera) 이식 완료. 펌핑 개시.")
while True:
ret, frame = cap.read()
if not ret:
print("카메라 센서 단절!")
time.sleep(1)
continue
try:
# 큐가 꽉 차면 제일 늙은 프레임을 가차없이 뽑아 버림
if g_frame_queue.full():
g_frame_queue.get_nowait()
# 신선한 프레임 투척
g_frame_queue.put_nowait(frame)
except queue.Empty:
pass
except queue.Full:
pass
def init_sensors():
# 카메라 스레드를 메인 스크립트 백그라운드로 떼어낸다 (데몬 스레드화)
cam_thread = threading.Thread(target=camera_capture_worker, daemon=True)
cam_thread.start()
return cam_thread
if __name__ == "__main__":
init_sensors()
# 이 밑에서 AI 모델이 루프를 돌며 큐를 빼먹기만 하면 된다.
while True: time.sleep(1)
이 카메라 워커는 Zenoh가 무엇인지 모른다. 시스템이 연결이 끊겨도 이 녀석은 맹목적으로 최신 5장의 프레임을 메모리에 올려놓는다. 철저한 “관심사의 분리(Separation of Concerns)“가 시스템 파괴를 막는다.
3. AI 객체 인식 모델 추론 및 결과 데이터 퍼블리싱
이제 g_frame_queue 에서 영상을 빼내 딥러닝 망(PyTorch/YOLO)에 집어넣어 “의미 있는 메타데이터“로 만든 후 Zenoh 통신망으로 쏜다.
3.0.1 [Runbook] 지능형 필터 및 쾌속 송출 스크립트
import zenoh
import json
import time
## (7.12.2 장의 g_frame_queue 임포트 가정)
def pseudo_yolo_inference(frame):
"""가상의 YOLO AI 추론 함수 (실제론 초당 N회 연산)"""
# ... 복잡한 텐서 연산 ...
# 사람이 잡혔다고 가정하고 더미 바운딩 박스를 뱉는다.
return {"person_detected": True, "bbox": [100, 150, 300, 400], "confidence": 0.98}
def ai_and_network_worker(drone_id: int):
# 1. AI 결과를 라우터로 쏠 총구 개통
conf = zenoh.Config()
conf.insert_json5("mode", '"client"') # 엣지이므로 클라이언트 모드 강제
with zenoh.open(conf) as session:
topic = f"drone/{drone_id}/ai_result"
pub = session.declare_publisher(topic)
print(f"[DRONE-{drone_id}] 무전 통신망 연결 완료. 출력 개시.")
while True:
# 2. 큐에서 갓 구운 프레임을 하나 가져옴 (없으면 기다림)
frame = g_frame_queue.get(block=True)
# 3. CPU를 혹사 시키는 AI 연산
# GIL이 잡히지만, Zenoh는 백그라운드 스레드에서 돌아가므로 통신 회선은 무사하다.
ai_data = pseudo_yolo_inference(frame)
# 4. 사람이 발견되었을 때만! (지능형 필터링) 트래픽 전송
if ai_data["person_detected"]:
ai_data["drone_id"] = drone_id
ai_data["timestamp"] = time.time()
# 딕셔너리를 가벼운 JSON 바이트로 압축하여 발사
payload = json.dumps(ai_data).encode('utf-8')
pub.put(payload)
print(f"[AI] 타겟 탐지 송출: {topic}")
if __name__ == "__main__":
# 카메라 워커 스레드 부팅 (생략)
# init_sensors()
# 1번 드론으로 기동 시작
ai_and_network_worker(drone_id=1)
이로써 드론 단말(Edge)의 구축은 끝났다.
이 단말은 하루 종일 하늘을 날더라도 네트워크로 데이터를 보내지 않다가, “렌즈에 사람이 걸리는 순간에만” Zenoh 망으로 약 80 bytes 크기의 초경량 JSON 패킷을 관제탑으로 날려 보낸다. 이것이 딥러닝과 분산 라우팅이 결합된 진정한 전술적 엣지 컴퓨팅이다.
4. 마스터 노드에서의 데이터 취합 및 제어 명령(Query) 전송
이제 관제탑(서버)의 모니터를 켤 차례다.
서버 스크립트(master_hq.py)는 전 세계 수천 대의 지능형 드론들이 보고하는(Publishing) 침입자 정보를 와일드카드로 낚아채 모니터에 뿌린다. 나아가 위험한 드론에게 직접 “사살(또는 모터 정지)” 옵션을 Query-Reply 전술로 집행한다.
4.0.1 [Runbook] 비동기 사령탑 지휘소 (Event-Driven Dashboard)
가장 우아한 파이썬 asyncio 활용 아키텍처다.
import asyncio
import zenoh
import json
class DroneCommandCenter:
def __init__(self, session):
self.session = session
async def listen_for_intrusions(self):
"""[수비] 모든 드론의 AI 탐지 리포트를 훔쳐듣는 비동기 리스너"""
# 스레드 충돌을 막기 위한 비동기 큐
alert_queue = asyncio.Queue()
def sub_callback(sample: zenoh.Sample):
# C 코어가 찔러주는 데이터를 파이썬 세계의 비동기 안전지대로 피신시킴
alert_queue.put_nowait(sample)
# "전 세계의 모든 드론(**)" 을 감시한다
print("[HQ] AI 탐지기 글로벌 수색 작전 대기 중...")
sub = self.session.declare_subscriber("drone/**/ai_result", sub_callback)
while True:
# 대기하다가 누군가 사람을 발견하면
sample = await alert_queue.get()
topic = str(sample.key_expr)
data = json.loads(sample.payload.decode('utf-8'))
drone_id = data["drone_id"]
conf = data["confidence"]
print(f"[HQ 긴급 경보] {topic} !! 신원 미상자 (확률 {conf*100}%) 발견됨.")
# 위험도가 너무 높으면, 해당 드론에게만 모터 일시 정지 지시를 날림
if conf > 0.95:
await self.send_halt_command(drone_id)
async def send_halt_command(self, drone_id: int):
"""[공격] 특정 드론의 Queryable 노드로 멈춤 명령(Action)을 하달"""
target_uri = f"drone/{drone_id}/motor/cmd"
print(f" └─> [HQ 작전] {drone_id}번 기체 강제 제어 명령 송출: {target_uri} ...")
# 쿼리를 쏘고(Blocking 없이) 그 대답(Reply)을 기다린다!
# 여기서 aget() 을 사용해, 제어 명령 파라미터를 보냄
opts = zenoh.QueryOptions(parameters="action=HALT")
replies = self.session.aget(target_uri, timeout=3.0, options=opts)
# 드론 내부의 Queryable 콜백이 살아서 대답을 보내왔는가?
success = False
async for reply in replies:
if reply.is_ok:
res = reply.ok.payload.decode('utf-8')
print(f" └─> [드론 응답] 작전 성공: {res}")
success = True
if not success:
print(f" └─> [작전 실패] {drone_id}번 기체가 응답을 끊었거나 격추당함!")
async def main():
print("=== 관제 사령부 부팅 중 ===")
async with zenoh.aopen(zenoh.Config()) as session:
hq = DroneCommandCenter(session)
# 멈추지 않는 글로벌 감시망 기동
await hq.listen_for_intrusions()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("사령부 전원 분리")
드론 단말 쪽에서는 저 drone/1/motor/cmd 에 화답하기 위해 단 10줄짜리 session.declare_queryable() 만 띄워두면 된다.
이 최종 프로젝트 코드를 통해 당신은 복잡한 데이터 사이언스 생태계(OpenCV, PyTorch)와 초고속 마이크로서비스 엔진(Zenoh 쿼리/PubSub), 그리고 현대적인 파이썬 백엔드 기술상(asyncio)을 단일하고 완벽하게 통합해 냈다. 이것이 현대 Python 분산 아키텍트의 정수다.