7.12 실전 프로젝트: Python 기반 지능형 에지(Edge) 노드 구축

7.12 실전 프로젝트: Python 기반 지능형 에지(Edge) 노드 구축

“1대의 데스크탑에서 AI를 돌리는 것과, 100대의 로봇 보드에서 AI를 돌려 클라우드로 통합하는 것은 완전히 다른 차원의 엔지니어링이다.”

이 장은 7장에서 배운 Python Zenoh 생태계 결합 기법(Numpy 변환, Multiprocessing, Asyncio 융합)을 통째로 쏟아부어, ‘자율 감시 드론(Drone)과 중앙 관제 서버(Command Center)’ 라는 완전한 IoT-AI 융합 아키텍처를 맨바닥부터 쌓아 올리는 야전 프로젝트다.

단순히 Hello World 를 넘어서, 비전 스레드와 AI 스레드를 어떻게 격리하고 Zenoh 라우터망에 그 결과를 어떻게 태워 보내는지, 그리고 단 한 대의 통제 센터 마스터 노드가 수십 대의 드론 워커들을 어떻게 get() 과 와일드카드로 조종하는지 그 완벽한 코드를 제공한다.

1. 요구사항 분석 및 시스템 아키텍처 설계

프로젝트를 시작하기 전, 스파게티 코드를 막기 위해 파이프라인을 3단계로 명확히 단절시키는 구조 설계(Topology Design)를 긋고 시작한다.

1.0.1 [Runbook] 지능형 드론 방범망 (UAV Security Cluster) 아키텍처

1. [엣지 단말 - Drone Node] (N대 존재)

  • 스레드 A (카메라 펌프): OS 의 /dev/video0 를 점유하고, 초당 30프레임으로 이미지를 빨아들인 뒤 로컬 큐에 집어넣는다. (OpenCV 활용)
  • 스레드 B (AI 인퍼런스): 큐에서 이미지를 꺼내 YOLO 모델에 넣고, “사람 식별 여부” 및 “바운딩 박스” 딕셔너리로 축소 시킨다.
  • 스레드 C (Zenoh 통신망): 축소된 데이터({"bbox": [...], "alert": true})를 drone/1/ai_result 경로로 끊임없이 쏘아댄다 (Publisher). 만약 중앙에서 제어 명령이 떨어지면 반응한다 (Queryable).

2. [클라우드 백본 - Zenoh Router]

  • 모든 드론은 IP를 하드코딩하지 않고 멀티캐스트로 주변의 zenohd (라우터) 를 수색하여 접속한다. 이를 통해 드론이 WiFi 영역과 5G 영역을 오가더라도 데이터를 중앙 클라우드까지 끊기지 않고 릴레이 해준다.

3. [관제 센터 - Master Node] (1대)

  • 파이썬 비동기(asyncio) 기반으로 작동하며, drone/*/ai_result 를 와일드카드로 몽땅 수신(Subscribe)한다.
  • 긴급 상황 시 특정 드론을 지정하여 session.get("drone/1/motor/stop") 으로 멈춤 쿼리를 꽂아버려 제어 권한을 획득한다.

이 파이프라인의 최고 장점은 엣지에서 무거운 비디오 스트림 120MB/s 를 무식하게 전송하지 않고, 엣지의 GPU(또는 NPU) 를 태워 나온 수 KB 짜리 AI 메타 데이터만 Zenoh로 빠르고 가볍게 공유한다는 점이다. 이것이 진정한 지능형(Intelligent) 엣지 아키텍처다.

2. 영상 및 센서 데이터 수집 파이프라인 구현

아키텍처의 맨 앞단, 가장 뜨겁고 더러운 물리 계층과 맞닿는 곳이다.
카메라 프레임 그랩(Grab) 로직이 Zenoh나 AI 로직과 같은 스레드에서 돌면 초당 5프레임도 버텨내지 못한다.

2.0.1 [Runbook] 독립 카메라 워커와 파이프 생성

우선 드론 스크립트(edge_drone.py) 안에 데이터를 퍼다 나를 컨베이어 벨트를 깐다.

import cv2
import queue
import threading
import time

## [전술] 카메라 버퍼 큐 (가장 최신의 5프레임만 유지하고 낡은 건 버린다)
g_frame_queue = queue.Queue(maxsize=5)

def camera_capture_worker():
    """오직 USB 카메라만 노려보는 전담 노동자"""
    cap = cv2.VideoCapture(0)
    # 카메라 제원 타협 (성능을 위해 640x480 강제)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    
    print("[DRONE] 센서 눈(Camera) 이식 완료. 펌핑 개시.")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            print("카메라 센서 단절!")
            time.sleep(1)
            continue
            
        try:
            # 큐가 꽉 차면 제일 늙은 프레임을 가차없이 뽑아 버림
            if g_frame_queue.full():
                g_frame_queue.get_nowait()
            
            # 신선한 프레임 투척
            g_frame_queue.put_nowait(frame)
        except queue.Empty:
            pass
        except queue.Full:
            pass

def init_sensors():
    # 카메라 스레드를 메인 스크립트 백그라운드로 떼어낸다 (데몬 스레드화)
    cam_thread = threading.Thread(target=camera_capture_worker, daemon=True)
    cam_thread.start()
    return cam_thread

if __name__ == "__main__":
    init_sensors()
    # 이 밑에서 AI 모델이 루프를 돌며 큐를 빼먹기만 하면 된다.
    while True: time.sleep(1)

이 카메라 워커는 Zenoh가 무엇인지 모른다. 시스템이 연결이 끊겨도 이 녀석은 맹목적으로 최신 5장의 프레임을 메모리에 올려놓는다. 철저한 “관심사의 분리(Separation of Concerns)“가 시스템 파괴를 막는다.

3. AI 객체 인식 모델 추론 및 결과 데이터 퍼블리싱

이제 g_frame_queue 에서 영상을 빼내 딥러닝 망(PyTorch/YOLO)에 집어넣어 “의미 있는 메타데이터“로 만든 후 Zenoh 통신망으로 쏜다.

3.0.1 [Runbook] 지능형 필터 및 쾌속 송출 스크립트

import zenoh
import json
import time

## (7.12.2 장의 g_frame_queue 임포트 가정)

def pseudo_yolo_inference(frame):
    """가상의 YOLO AI 추론 함수 (실제론 초당 N회 연산)"""
    # ... 복잡한 텐서 연산 ...
    # 사람이 잡혔다고 가정하고 더미 바운딩 박스를 뱉는다.
    return {"person_detected": True, "bbox": [100, 150, 300, 400], "confidence": 0.98}

def ai_and_network_worker(drone_id: int):
    # 1. AI 결과를 라우터로 쏠 총구 개통
    conf = zenoh.Config()
    conf.insert_json5("mode", '"client"') # 엣지이므로 클라이언트 모드 강제
    
    with zenoh.open(conf) as session:
        topic = f"drone/{drone_id}/ai_result"
        pub = session.declare_publisher(topic)
        print(f"[DRONE-{drone_id}] 무전 통신망 연결 완료. 출력 개시.")
        
        while True:
            # 2. 큐에서 갓 구운 프레임을 하나 가져옴 (없으면 기다림)
            frame = g_frame_queue.get(block=True)
            
            # 3. CPU를 혹사 시키는 AI 연산
            # GIL이 잡히지만, Zenoh는 백그라운드 스레드에서 돌아가므로 통신 회선은 무사하다.
            ai_data = pseudo_yolo_inference(frame)
            
            # 4. 사람이 발견되었을 때만! (지능형 필터링) 트래픽 전송
            if ai_data["person_detected"]:
                ai_data["drone_id"] = drone_id
                ai_data["timestamp"] = time.time()
                
                # 딕셔너리를 가벼운 JSON 바이트로 압축하여 발사
                payload = json.dumps(ai_data).encode('utf-8')
                pub.put(payload)
                print(f"[AI] 타겟 탐지 송출: {topic}")

if __name__ == "__main__":
    # 카메라 워커 스레드 부팅 (생략)
    # init_sensors() 
    
    # 1번 드론으로 기동 시작
    ai_and_network_worker(drone_id=1)

이로써 드론 단말(Edge)의 구축은 끝났다.
이 단말은 하루 종일 하늘을 날더라도 네트워크로 데이터를 보내지 않다가, “렌즈에 사람이 걸리는 순간에만” Zenoh 망으로 약 80 bytes 크기의 초경량 JSON 패킷을 관제탑으로 날려 보낸다. 이것이 딥러닝과 분산 라우팅이 결합된 진정한 전술적 엣지 컴퓨팅이다.

4. 마스터 노드에서의 데이터 취합 및 제어 명령(Query) 전송

이제 관제탑(서버)의 모니터를 켤 차례다.
서버 스크립트(master_hq.py)는 전 세계 수천 대의 지능형 드론들이 보고하는(Publishing) 침입자 정보를 와일드카드로 낚아채 모니터에 뿌린다. 나아가 위험한 드론에게 직접 “사살(또는 모터 정지)” 옵션을 Query-Reply 전술로 집행한다.

4.0.1 [Runbook] 비동기 사령탑 지휘소 (Event-Driven Dashboard)

가장 우아한 파이썬 asyncio 활용 아키텍처다.

import asyncio
import zenoh
import json

class DroneCommandCenter:
    def __init__(self, session):
        self.session = session
        
    async def listen_for_intrusions(self):
        """[수비] 모든 드론의 AI 탐지 리포트를 훔쳐듣는 비동기 리스너"""
        # 스레드 충돌을 막기 위한 비동기 큐
        alert_queue = asyncio.Queue()
        
        def sub_callback(sample: zenoh.Sample):
            # C 코어가 찔러주는 데이터를 파이썬 세계의 비동기 안전지대로 피신시킴
            alert_queue.put_nowait(sample)

        # "전 세계의 모든 드론(**)" 을 감시한다
        print("[HQ] AI 탐지기 글로벌 수색 작전 대기 중...")
        sub = self.session.declare_subscriber("drone/**/ai_result", sub_callback)
        
        while True:
            # 대기하다가 누군가 사람을 발견하면
            sample = await alert_queue.get()
            topic = str(sample.key_expr)
            data = json.loads(sample.payload.decode('utf-8'))
            
            drone_id = data["drone_id"]
            conf = data["confidence"]
            print(f"[HQ 긴급 경보] {topic} !! 신원 미상자 (확률 {conf*100}%) 발견됨.")
            
            # 위험도가 너무 높으면, 해당 드론에게만 모터 일시 정지 지시를 날림
            if conf > 0.95:
                await self.send_halt_command(drone_id)

    async def send_halt_command(self, drone_id: int):
        """[공격] 특정 드론의 Queryable 노드로 멈춤 명령(Action)을 하달"""
        target_uri = f"drone/{drone_id}/motor/cmd"
        print(f"  └─> [HQ 작전] {drone_id}번 기체 강제 제어 명령 송출: {target_uri} ...")
        
        # 쿼리를 쏘고(Blocking 없이) 그 대답(Reply)을 기다린다!
        # 여기서 aget() 을 사용해, 제어 명령 파라미터를 보냄
        opts = zenoh.QueryOptions(parameters="action=HALT")
        replies = self.session.aget(target_uri, timeout=3.0, options=opts)
        
        # 드론 내부의 Queryable 콜백이 살아서 대답을 보내왔는가?
        success = False
        async for reply in replies:
            if reply.is_ok:
                res = reply.ok.payload.decode('utf-8')
                print(f"  └─> [드론 응답] 작전 성공: {res}")
                success = True
                
        if not success:
            print(f"  └─> [작전 실패] {drone_id}번 기체가 응답을 끊었거나 격추당함!")

async def main():
    print("=== 관제 사령부 부팅 중 ===")
    async with zenoh.aopen(zenoh.Config()) as session:
        hq = DroneCommandCenter(session)
        
        # 멈추지 않는 글로벌 감시망 기동
        await hq.listen_for_intrusions()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("사령부 전원 분리")

드론 단말 쪽에서는 저 drone/1/motor/cmd 에 화답하기 위해 단 10줄짜리 session.declare_queryable() 만 띄워두면 된다.

이 최종 프로젝트 코드를 통해 당신은 복잡한 데이터 사이언스 생태계(OpenCV, PyTorch)와 초고속 마이크로서비스 엔진(Zenoh 쿼리/PubSub), 그리고 현대적인 파이썬 백엔드 기술상(asyncio)을 단일하고 완벽하게 통합해 냈다. 이것이 현대 Python 분산 아키텍트의 정수다.