8.7 TypeScript 고급 비동기 패턴과 데이터 스트림 제어
초당 1만 건의 데이터가 쏟아지는 자율주행 차량의 라이다(LiDAR) 스트림을 브라우저나 Node.js로 스트리밍 받을 때, 가장 먼저 터지는 곳은 무엇일까? 놀랍게도 네트워크나 Zenoh 로직이 아니라, 순수 파이썬/자바스크립트의 이벤트 루프(Event Loop) 다.
V8 엔진은 싱글 스레드다. Zenoh Rust 코어가 초당 1만 개의 Sample 객체를 콜백 큐(Callback Queue)에 무자비하게 쑤셔 넣으면, 메인 스레드는 콜백 함수를 실행하느라 CPU 점유율이 100%를 찍으며 브라우저 화면은 하얗게 굳어버린다.
이 장에서는 TypeScript가 지닌 최고의 무기 AsyncIterator 와 생태계 최강의 스트림 통제기 RxJS 를 동원하여, 이 거대한 데이터 폭포수를 얌전하게 깎아내고 모아서(Throttling, Backpressure) V8 엔진이 소화할 수 있는 안전한 크기로 잘게 썰어 전달하는 ‘마이크로초 단위의 스트림 제어 전술’ 을 파헤친다. 단순히 “콜백으로 데이터를 받는다“에서 벗어나, 진정한 스트림 마스터로 거듭나는 과정이다.
1. Promise 체이닝과 async/await을 활용한 Zenoh 흐름 제어
복잡한 시동 절차(부팅 -> 구독 생성 -> 쿼리 퍼블리셔 등록 -> Liveliness 발송)를 콜백으로 짜면 악명 높은 콜백 지옥(Callback Hell) 이 열린다. Zenoh TypeScript 바인딩은 모든 C++ 호출에 대해 Promise 래퍼(Wrapper)를 제공하므로, 동기식(Synchronous) 코드처럼 보이게 짜는 것이 가능하다.
1.0.1 [Runbook] V8 루프 동결(Freeze) 방어 선언 전술
Zenoh 코드를 짤 때 가장 많이 실수하는 패턴이 async 함수 안에서 await 를 빼먹는 것이다.
import * as zenoh from "@eclipse-zenoh/zenoh";
async function bootZenohChain() {
try {
console.log("1. 세션 부팅 시도...");
// await를 빼먹으면 session 객체가 아니라 Promise<Session> 껍데기가 반환되어 이후 로직 붕괴
const session = await zenoh.open();
console.log("2. 토폴로지 구독 선언...");
const sub = await session.declareSubscriber("local/test", (s) => {});
console.log("3. 마이크로 응답기 선언...");
const q = await session.declareQueryable("my/status", (q) => {});
console.log(">> 체인 부팅 100% 완료!");
} catch (err) {
// 이 try-catch 블록 하나로, Zenoh Rust 엔진에서 터진 모든 C 에러
// (네트워크 끊김, 포트 중복 점유 등)를 V8 스택 트레이스로 안전하게 잡아낸다.
console.error("Zenoh 부팅 체인 붕괴:", err);
throw err;
}
}
이 “리니어(Linear) 프로그래밍 스타일“은 초기화 체인에서는 유효하다. 하지만 영원히 지속되어야 하는 무한 스트림 데이터(구독 콜백)를 처리할 때는 이 패턴만으로는 역부족이다.
2. 비동기 이터레이터(AsyncIterator) 기반의 무한 이벤트 스트림 처리
8.5장에서 분산 쿼리(session.get)를 처리할 때 잠깐 맛보았던 for await 패턴이다.
이 패턴이 진정으로 빛을 발하는 곳은 단순한 1회성 쿼리가 아니라, 무한히 밀려오는 구독 스트림(Subscriber Stream)을 핸들링할 때다.
2.0.1 [Runbook] 무한 대기 큐 루핑 전술
일반적인 declareSubscriber 를 쓰면 데이터가 올 때마다 콜백 함수가 강제로 발사된다. 메인 스레드는 속수무책으로 당한다. 하지만 만약 “내가 필요할 때마다 큐(Queue)에서 하나씩 꺼내먹을게” 형태로 통달한다면?
주의: 이 코드는 Zenoh TS Native Queueing 구현체가 포함된 최신 버전에서 동작하거나, JS Array 기반의 비동기 버퍼를 직접 구현하여 우회 응용해야 할 수 있다. (표준 가이드라인)
import * as zenoh from "@eclipse-zenoh/zenoh";
// 비동기 큐를 임의로 구현한 래퍼 (실무 필수품)
class AsyncBuffer<T> {
private queue: T[] = [];
private resolvers: ((value: T) => void)[] = [];
push(item: T) {
if (this.resolvers.length > 0) {
const resolve = this.resolvers.shift();
resolve!(item);
} else {
this.queue.push(item);
}
}
// 큐가 비어있으면 데이터가 들어올 때까지 Promise 를 세워두고 '대기(Block)' 한다!
async pop(): Promise<T> {
if (this.queue.length > 0) return this.queue.shift()!;
return new Promise<T>(resolve => this.resolvers.push(resolve));
}
}
async function streamConsumer(session: zenoh.Session) {
const buffer = new AsyncBuffer<zenoh.Sample>();
// 콜백 함수 지옥(Hell) 지우기: 콜백은 그저 큐에 보관만 한다.
await session.declareSubscriber("sensor/ultra/fast", (sample) => {
buffer.push(sample);
});
console.log("스트림 수신 시작...");
// 내가 원하는 속도, 내가 원하는 타이밍에 데이터를 하나씩 빼서 먹는다!
while (true) {
// 데이터가 안 오면 CPU 점유율 0% 로 여기서 영원히 잠든다.
const sample = await buffer.pop();
// 1건 처리 수행
console.log(">> 데이터 처리:", sample.keyExpr);
// 데이터 핑(Hz)이 너무 빠르면, 여기서 강제로 시간을 주어 메인 스레드 혹사를 막는다.
await new Promise(r => setTimeout(r, 10)); // 10ms 쉬어가기
}
}
이 패턴은 메인 스레드 루프의 통제권을 Zenoh 콜백에서 “내 애플리케이션의 루프“로 완전히 되찾아오는 아키텍처적 극의(極意)다. Node.js 백엔드 로직을 방어할 때 무조건 투입해야 하는 기법이다.
3. RxJS와의 통합: Zenoh 이벤트를 Observable로 변환 및 조작
프론트엔드/백엔드 생태계에서 스트림(Stream)을 다루는 궁극의 표준 라이브러리인 RxJS 에 Zenoh를 물려보자.
Zenoh의 미친 데이터 속도는 RxJS의 강력한 오퍼레이터(filter, map, debounce) 파이프라인과 만나는 순간 가장 얌전한 시냇물로 변한다.
3.0.1 [Runbook] “스트림 - 옵저버” 용접 전술
RxJS의 Observable 래퍼로 덮어버리는 간단한 클래스 구현이다.
import * as zenoh from "@eclipse-zenoh/zenoh";
import { Observable } from "rxjs";
import { map, filter, bufferTime } from "rxjs/operators";
// 스트림을 뿜어내는 공장 구축
function createZenohObservable(
session: zenoh.Session,
keyExpr: string
): Observable<zenoh.Sample> {
return new Observable<zenoh.Sample>((subscriber) => {
let subObj: zenoh.Subscriber;
// 콜백 함수 안에서 RxJS의 next() 를 호출하여 물길을 터준다.
session.declareSubscriber(keyExpr, (sample) => {
subscriber.next(sample);
}).then(sub => {
subObj = sub;
});
// 사용자가 구독을 해지(Unsubscribe)하면 Zenoh 라우터 망에서도 발을 뺀다.
return () => {
if (subObj) subObj.undeclare();
};
});
}
async function runRxJS(session: zenoh.Session) {
const rawStream$ = createZenohObservable(session, "robot/*/telemetry");
// 파이프라인 전개:
// 로봇 100대가 1초에 60번 쏘는 데이터를 필터링하고 걸러낸다.
const cleanStream$ = rawStream$.pipe(
// 1. ArrayBuffer -> 쓸만한 문자열 텍스트로 치환 (map)
map(sample => new TextDecoder().decode(sample.payload)),
// 2. 내용이 "WARN" 글자가 포함된 것만 살려둠 (filter)
filter(text => text.includes("WARN")),
// 3. 1초(1000ms) 동안 발생한 에러 메시지를 배열(Array)에 한번에 모아서 뱉음
// 1초에 1번씩만 화면을 렌더링하기 위한 방어 (Backpressure)
bufferTime(1000)
);
// 이제 프론트엔드는 이 깔끔하고 속도가 버퍼링된 데이터만 받는다.
cleanStream$.subscribe((warnArray) => {
if (warnArray.length > 0) {
console.log(`[UI 업데이트] 지난 1초간 발생한 경고들: ${warnArray.length}건`);
// 여기서 React setState 호출
}
});
}
Angular 환경이나 대규모 기업용 대시보드에서는 사실상 이 RxJS 파이프가 표준 규격이다. Zenoh를 하부에, RxJS를 그 위에 얹는 것만으로 프론트엔드 성능 이슈의 99%가 소멸한다.
4. 대량 데이터 수신 시 백프레셔(Backpressure) 및 스로틀링(Throttling) 기법
1초에 1천 장씩 날아오는 이미지 프레임을 React 상태 훅(setState)에 밀어 넣으면 브라우저는 하얗게 비전(Out of Memory) 상태가 되며 크롬 자체가 죽어버린다. 디스플레이 화면 주사율은 1초에 60번(60Hz)일 뿐이다. 화면이 소화하지 못할 데이터를 V8 엔진이 렌더링 큐에 넘기는 것을 가차 없이 막아야 한다.
4.0.1 [Runbook] 브라우저 지살(목숨) 보호 전술
이 테크닉은 프론트엔드 생존 확률을 비약적으로 올린다. 시간 기반 폐기(Debounce/Throttle) 로직이다.
무자비한 시간 컷오프(Throttle) 방식
만약 앞선 시점부터 50ms 이내에 새로운 데이터가 들어온다면, 무시하고 파기(Drop)해버린다. (어차피 사용자 눈으로는 구분을 못 하는 카메라 프레임의 경우 유효)
let lastRenderTime = 0;
const RENDER_INTERVAL_MS = 16; // 60 FPS 달성을 위한 약 16.6ms 마진
const safeCallback = (sample: zenoh.Sample) => {
const now = Date.now();
// 마지막 렌더링 시간으로부터 아직 16ms가 안 지났으면?
// 이 패킷은 과감하게 하늘로 날려버린다. (메모리 파괴)
if (now - lastRenderTime < RENDER_INTERVAL_MS) {
return;
}
// 16ms 가 넘었을 때만 화면 갱신에 데이터를 태운다.
lastRenderTime = now;
// (예시) 캔버스에 그리기
drawToCanvas(sample.payload);
};
// 구독기 설치
session.declareSubscriber("camera/600fps/raw", safeCallback);
이 코드는 겨우 10줄 남짓이지만, 백엔드의 C++ 서버가 미쳐서 10,000 FPS로 이미지를 폭격하더라도 브라우저는 평온하게 60 FPS를 유지하며 사용자의 마우스 클릭 이벤트에 100% 끊김 없이 반응하게 만드는 기적을 선사한다.
이것이 진정한 “데이터 드롭 백프레셔(Backpressure via data dropping)” 건축술이다.