9.3.4 `zenoh-go` 고급 기능 및 활용

9.3.4 zenoh-go 고급 기능 및 활용

기본적인 Pub/Sub과 쿼리만으로는 “클라우드 서비스의 사령탑“을 선언할 수 없다. 완벽한 백엔드는 단순히 데이터를 받는 것을 넘어, 파이프라인의 병목(Bottleneck) 현상을 제어하고, 수만 가지의 주제(Key) 중 원하는 것만 정밀 타격(Filter)하여 수집하며, 연결이 끊긴 노드를 즉각적으로 감지하는 생사 확인(Liveliness) 을 수행해야 한다.

Go 언어와 Zenoh가 만나면 마이크로서비스 아키텍처(MSA)에서 브로커 서버(Kafka, Redis)를 통째로 빼버려도 시스템이 완벽하게 돌아기는 마법을 경험할 수 있다.
이 챕터는 이 강력한 기술들을 바탕으로 엔터프라이즈 레벨의 스트림 백엔드를 구축하는 심화 전술 런북이다.

1. 채널(Channel) 기반의 데이터 스트리밍 파이프라인 구축

초당 10만 건 이상의 압도적 트래픽이 쏟아질 때, Subscriber 콜백 안에서 데이터 마사지(JSON 파싱 등)를 수행하면 라우터단에서 큐 막힘(Backpressure) 현상이 발생한다. Go 언어의 Buffered Channel을 통해 수신단과 처리단을 완벽히 격리(Isolation)한다.

1.0.1 [Runbook] 데이터 버퍼링 및 파이프라인 전술

package main

import (
	"fmt"
	"github.com/eclipse-zenoh/zenoh-go"
	"time"
)

// 1. 순수 데이터 컨테이너
type ProcessJob struct {
	Key     string
	Payload []byte
}

func startPipeline(session *zenoh.Session) {
	// 2. 버퍼 크기 50,000 의 거대한 댐 건설
	jobQueue := make(chan ProcessJob, 50000)

	// 3. 수신조 (데이터를 퍼올리기만 함)
	// 콜백 함수는 연산을 전혀 하지 않고 채널에 쑤셔넣기만 하므로 0.001ms 만에 종료된다!
	sub, _ := session.DeclareSubscriber("sensor/**", func(s zenoh.Sample) {
		jobQueue <- ProcessJob{
			Key:     s.KeyExpr().String(),
			Payload: s.Payload(),
		}
	})
	defer sub.Undeclare()

	// 4. 가공조 (데이터 쓰레기 청소 및 DB 적재)
	// 10개의 고루틴(Worker)이 댐에 고인 물을 미친 듯이 퍼내서 처리한다.
	const WORKER_COUNT = 10
	for i := 0; i < WORKER_COUNT; i++ {
		go func(workerID int) {
			for job := range jobQueue {
				// 여기서 무거운 파싱(JSON Unmarshal, gRPC 전송 등)을 수행
				_ = processData(job)
			}
		}(i)
	}

	// 메인 스레드 블로킹
	select {}
}

func processData(job ProcessJob) error {
	// 무거운 작업 시뮬레이션
	time.Sleep(1 * time.Millisecond)
	return nil
}

이 파이프라인이 구축되면, 트래픽이 갑자기 스파이크(Spike) 치더라도 5만 개의 큐가 넉넉하게 충격을 흡수한다(Shock Absorber).
단, jobQueue 가 가득 찼을 때 chan 에 데이터를 넣으면 Goroutine 블로킹이 발생하므로, 극한 상황에서는 select + default 문으로 패킷을 고의 파기(Drop)하는 로직 추가가 필수다.

2. Key Expression과 Selector 문법의 고급 필터링 활용

Zenoh의 최강 무기는 MQTT 처럼 단순히 / 로 쪼개진 계층을 넘어, 사실상 “쿼리 문법” 에 가까운 동적 필터링을 지원한다는 점이다. 클라우드 백엔드는 이 셀렉터(Selector)를 조합해 필요한 데이터만 날카롭게 추출해야 한다.

2.0.1 [Runbook] 정밀 타격(Sniper) 셀렉터 전술

1. 와일드카드 조합 공격 (* vs **)

  • 단일 뎁스 (Single Depth): robot/*/status
  • 매칭: robot/r-01/status (O)
  • 미매칭: robot/seoul/r-01/status (X)
  • 다중 뎁스 (Multi Depth): robot/**/status
  • 매칭: robot/seoul/r-01/status (O), robot/status (O)

2. 구독시 Selector 필터링 스펙

// "온도가 100도가 넘고 압력이 높은 보일러의 상태" 만 필터링하기 위해
// URL Query Parameter 같은 방식을 쓴다.
// 실제 백엔드 Queryable이 이 파라미터를 읽어 분기 처리를 해준다.

filterKey := "boiler/**/status?temp>100&pressure=high"
sub, _ := session.DeclareSubscriber(filterKey, func(s zenoh.Sample) {
    fmt.Printf("🚨 폭발 위험 경보: %s\n", string(s.Payload()))
})

이 패턴이 무서운 점은 중앙 브로커(Broker) 서버가 이 필터링을 하는 것이 아니라, 망을 구성하는 모든 각자의 라우터와 클라이언트가 이 셀렉터 규칙을 인지하고 애초에 데이터를 쏘지도 않게 만든다는 점이다. 대역폭 낭비가 0%로 수렴한다.

3. Liveliness 토큰 생명주기 관리 및 네트워크 상태 모니터링

“드론 1번이 살아있는지 1초마다 Ping을 쳐봐!” 라는 고리타분한 관제(Monitoring) 시스템은 Zenoh 환경에서 완전히 폐기 처분된다.
대신 대상들이 Liveliness Token(생명 징표) 을 스스로 발급하게 놔두고, 백엔드 서버(Go)는 이 징표가 부러지는 것(Drop)만 가만히 지켜보면 된다.

3.0.1 [Runbook] 클라우드 묘지(Graveyard) 모니터링 전술

이 코드는 수만 대의 로봇들이 연결되었다가 끊기는 현상을 SampleKind(PUT/DELETE) 로 완벽히 추적하여 DB에 오프라인(Offline) 도장을 찍는 구조다.

package main

import (
	"fmt"
	"github.com/eclipse-zenoh/zenoh-go"
	"strings"
)

func runFleetLivelinessMonitor(session *zenoh.Session) {

	// 1. "liveliness/robot/**" 경로에서 꽂히는 생명 토큰을 감시한다.
	sub, err := session.DeclareLivelinessSubscriber("liveliness/robot/**", func(sample zenoh.Sample) {

		// 뽑아낸 "liveliness/robot/R-01" 같은 경로
		tokenPath := sample.KeyExpr().String()
		
		// ID 추출 
		parts := strings.Split(tokenPath, "/")
		robotID := parts[len(parts)-1] 

		// 2. Kind 분석 (PUT 이면 살았고, DELETE 면 죽었다!)
		switch sample.Kind() {
		
		case zenoh.SampleKindPut:
			// 새로운 로봇이 전원을 켰거나 터널을 벗어났다.
			fmt.Printf("🟢 [ONLINE] 로봇 %s 부팅 감지.\n", robotID)
			// Redis > SET robot:status:R-01 "ONLINE"
			
		case zenoh.SampleKindDelete:
			// 로봇 전원이 꺼졌거나, WiFi가 끊겨서 라우터가 사망 선고(Timeout)를 때렸다.
			fmt.Printf("🔴 [OFFLINE] 로봇 %s 통신 두절!\n", robotID)
			// Redis > SET robot:status:R-01 "OFFLINE"
			
		}
	})

	if err != nil {
		panic(err)
	}
	defer sub.Undeclare()

	select {} // 무한 감시 대기
}

이 패턴이 적용된 Go 서버는 CPU SetInterval 루프를 단 한 가닥도 돌리지 않기 때문에, 서버 유휴 자원 사용률이 거의 0%에 수렴한다. 진정한 Event-Driven 관제 시스템의 정수다.