7.7 Python 비동기 프로그래밍 (`Asyncio`) 연동

7.7 Python 비동기 프로그래밍 (Asyncio) 연동

현대의 파이썬 백엔드(FastAPI, Starlette)는 더 이상 동기식 스레드(Thread)를 낭비하지 않는다. 싱글 코어 안에서 이벤트 루프를 돌리며 asyncawait를 통해 수만 개의 I/O 연결을 스위칭하는 것이 스탠다드다.

Zenoh 파이썬 바인딩은 단순한 동기식(Sync) API뿐만 아니라 완벽한 100% 비동기(Async) API를 네이티브로 제공한다. 백그라운드에 있는 Rust 기반 Tokio 런타임과 파이썬의 asyncio 이벤트 루프를 정확하게 맞물리게 하는 것.
콜백 안에서 블로킹이 발생하여 FastAPI 서버 전체가 얼어붙는 대참사를 막기 위해, 이 장에서는 Zenoh를 비동기의 세계로 온전히 끌어들이는 아키텍처 런북을 서술한다.

1. Zenoh의 비동기 지원 현황 및 asyncio 이벤트 루프 통합

동기식 스크립트에서는 zenoh.open() 을 썼지만, 비동기 세계에 들어선 이상 당신은 이 함수를 영원히 머릿속에서 지워야 한다. I/O 블로킹(소켓 열기)이 메인 이벤트 루프를 멈추게(Freeze) 하기 때문이다.

1.0.1 [Runbook] 비동기 세션(Session) 개방 전술

Zenoh는 비동기 전용 패키지 객체와 비동기 전용 클래스를 철저히 분리해 두었다.

import asyncio
import zenoh

async def main():
    print("비동기(Async) Zenoh 라우터 접속 중...")
    
    # [핵심 1] open() 이 아니라 aopen() 을 써야 하며, 반드시 await 를 붙인다.
    # 연결되는 동안 다른 파이썬 태스크(FastAPI 등)가 멈추지 않고 자기 할 일을 한다.
    session = await zenoh.aopen(zenoh.Config())
    
    print(f"세션 획득 완료: {session.info.zid()}")
    
    # ... 비동기 작업 로직 (Pub/Sub) ...
    
    # [핵심 2] 닫을 때도 코루틴을 태워서 닫아야 커널이 데드락에 빠지지 않는다.
    await session.aclose()
    print("비동기 세션 완전 파괴")

if __name__ == "__main__":
    # 파이썬 메인 이벤트 루프 엔진 가동
    asyncio.run(main())

[아키텍처 인스펙션]
내부적으로 aopen 은 파이썬 스레드를 막지 않고, 내부의 Rust 워커 스레드에게 “네트워크 접속을 시도하라“고 명령한 뒤 파이썬의 Future 객체를 반환한다. Rust 코어가 TCP 핸드셰이크를 끝내면 이 Future를 때려서 파이썬 루프를 다시 깨운다. 이것이 컨텍스트 스위칭 낭비 제로(Zero)의 비결이다.

2. Async-Await를 이용한 넌블로킹 데이터 퍼블리싱

데이터를 put() 할 때도 네트워크 버퍼가 꽉 차 있으면 패킷은 병목(Blocking)을 겪는다. 동기식 코드에서는 여기서 스레드가 기절해버리지만, asyncio 세계에서는 “막혔으면 다른 루틴 먼저 처리하고, 소켓 뚫리면 다시 와서 쏴라” 가 가능하다.

2.0.1 [Runbook] 비동기 쾌속 연사 파이프라인

aput (Async Put) 의 활용

import asyncio
import zenoh

async def background_task():
    """Zenoh가 통신하는 동안 돌아가야 할 다른 비즈니스 로직"""
    while True:
        print("[백그라운드] FastAPI 웹 서버 라우팅 유지 중...")
        await asyncio.sleep(0.5)

async def async_publisher(session):
    # 선언 시에는 a- 접두어가 필요 없다. (단순 객체 생성이기 때문)
    pub = session.declare_publisher("async/test/data")
    
    for i in range(10):
        data = f"비동기 폭격 패킷 {i}"
        
        # [방어막] 만약 네트워크 속도가 느려 큐가 차더라도, await aput() 은 
        # 파이썬 전체를 멈추지 않고 자기 자신(이 함수)만 잠깐 양보(Yield)한다.
        await pub.aput(data)
        
        print(f"발송 완료: {data}")
        await asyncio.sleep(1)

async def main():
    conf = zenoh.Config()
    
    # aopen() 에 with 문을 결합하는 최신 파이썬 문법(Async Context Manager)
    # 크래시가 나더라도 알아서 aclose() 가 불린다.
    async with zenoh.aopen(conf) as session:
        # 2개의 작업을 묶어서 이벤트 루프에 동시 투척(Concurrency)
        task1 = asyncio.create_task(background_task())
        task2 = asyncio.create_task(async_publisher(session))
        
        await asyncio.gather(task1, task2)

if __name__ == "__main__":
    asyncio.run(main())

이 패턴이 적용되면 당신의 봇(Bot)이나 카메라 스트리밍 파이프라인은 그야말로 “숨을 쉬게” 된다. Zenoh 통신망이 잠시 불안정해져도 화면에 떠 있는 UI 나 타이머 태스크들(task1)은 단 한 번의 끊김(Stuttering) 없이 매끄럽게 돌아간다.

3. 비동기 서브스크라이버 및 코루틴(Coroutine) 콜백 처리

가장 뜨거운 감자다.
일반적인 콜백(def listener(sample):)은 Zenoh의 백그라운드 스레드에서 돌아갔다. 하지만 그 안에서 만약 데이터베이스 라이브러리인 asyncpg나 비동기 HTTP 요청인 aiohttp를 호출해야 한다면? 동기 스레드 위에서는 await 키워드를 쓸 수 없어서 코드가 완전히 박살 난다.

Zenoh는 콜백 함수가 코루틴(async def)으로 판명되면, 알아서 메인 asyncio 루프 위로 스케줄링(Post)을 넘겨버린다!

3.0.1 [Runbook] 풀-비동기(Full-Async) 옵저버 패턴

import asyncio
import zenoh
import time

async def async_slow_database_save(data):
    """가상의 느린 비동기 DB 인서트"""
    await asyncio.sleep(1) # 1초 블로킹(흉내)
    print(f"   [DB 저장 완료] {data}")

async def async_listener(sample: zenoh.Sample):
    """
    [마법의 영역] 
    async def 로 선언하는 순간, Zenoh는 이 함수를 자신의 백그라운드 C 코어가 직접 때리지 않고,
    파이썬의 asyncio 이벤트 루프의 태스크 큐에 스레드-세이프하게 '예약(Submit)' 해준다!
    따라서 내부에서 마음껏 지연 함수(await)를 써도 Zenoh 네트워크 코어는 1ms도 막히지 않는다.
    """
    val = sample.payload.decode('utf-8')
    print(f">> [수신 트리거] {sample.key_expr} : {val}")
    
    # DB 통신 같은 무거운 네트워크 I/O를 비동기로 깔끔하게 호출
    await async_slow_database_save(val)

async def main():
    async with zenoh.aopen() as session:
        # 구독 선언. 전달하는 콜백이 코루틴(async def) 임을 자체적으로 인식함
        sub = session.declare_subscriber("building/alarm", async_listener)
        print("풀(Full) 비동기 수신망 가동...")
        
        # 메인 루프 살려두기 (무한 슬립)
        while True:
            await asyncio.sleep(3600)

if __name__ == "__main__":
    asyncio.run(main())

[치명적 주의사항]
async_listener 내부에서 await 를 통해 1초를 대기(Yield)했다면, 그 1초 동안 다른 패킷이 도착했을 때 두 번째 async_listener가 메인 루프 위에서 병렬로 실행되기 시작한다!
즉 파이썬 안에서 50개의 콜백이 동시에 await 에 걸려있을 수 있다. 이로 인한 순서 역전(Race Condition)을 원치 않는다면, 콜백 안에서는 오직 비동기 큐(asyncio.Queue)에 쌓기만 하고, 순서를 통제하는 싱글 컨슈머 태스크를 따로 빼내야 한다.

4. 비동기 쿼리 처리와 타임아웃 관리

session.get(...) 은 100개의 노드가 답장하면 결과값이 100번 튀어나오는 제너레이터(Generator)였다.
비동기 통신(Asyncio) 세계에서는 이 제너레이터를 기다리는 동안에도 CPU를 멈춰 세워선(Block) 안 된다. 따라서 파이썬 최상위 문법인 비동기 이터레이터 (async for) 를 동원한다.

4.0.1 [Runbook] 데이터 스크래핑-애웨이트(Await) 서치

import asyncio
import zenoh

async def query_cluster(session):
    print("클러스터 대역 스캔 쿼리 발사!")
    
    # 1. 쿼리 사격 (블로킹 없음, 제너레이터 객체 즉각 반환)
    # timeout: 2초 동안 답장이 안 오면 파이썬 루프가 강제로 이 부분을 끊어낸다.
    replies = session.aget("factory/robot/*/temp", timeout=2.0)
    
    # 2. 비동기 루프 전개
    # 라우터에서 패킷 하나가 뚝 떨어질 때마다 메인 루프가 깨어나서 이 블록을 한 바퀴 실행한다!
    # (패킷이 안 올 때는 FastAPI 서버가 다른 리퀘스트를 처리하고 돌아온다)
    async for reply in replies:
        if reply.is_ok:
            sample = reply.ok
            print(f"[응답 도착] 로봇 {sample.key_expr} -> {sample.payload.decode('utf-8')}")
        else:
            print(f"[응답 실패] {reply.err.payload.decode('utf-8')}")
            
    print("전수 조사 스캔(Query) 종료.")

async def main():
    async with zenoh.aopen() as session:
        await query_cluster(session)

if __name__ == "__main__":
    asyncio.run(main())

aget()async for 의 조합은 거대한 MSA(Microservices Architecture) 백엔드 생태계에서 FastAPI 나 Quart 같은 파이썬 웹 프레임워크가 HTTP REST 요청을 받았을 때 밑단의 1,000대 로봇에게 질의를 날려 취합한 후 JSON으로 응답해 주는(API Gateway 패턴) 가장 강력하고 효율적인 연결 고리(Binding Factor)다. 당신의 웹 서버는 그 어떤 I/O 대기에서도 자유로워진다.