9.3.3 zenoh-go 기본 통신 패턴 구현

9.3.3 zenoh-go 기본 통신 패턴 구현

이제 Go 언어의 강력한 Goroutine 을 등에 업고 실제 선언적 통신망을 구축할 차례다.
기존 HTTP나 gRPC 통신이 ’IP와 포트 번호’에 얽매여 있었다면, Zenoh는 ’주제(Key)’만 알면 망 전체를 뒤흔들 수 있다.

이 단원은 서버 개발자가 가장 익숙한 “Config 파싱”, “Pub/Sub 쏘기”, 그리고 분산 데이터베이스를 연상케 하는 “분산 쿼리 집계” 의 기초적인 전투 교범을 다룬다. 특히 Go 언어 특유의 chanselect 제어문이 어떻게 Zenoh 멀티캐스트의 불확실성을 단단하게 묶어(Bind) 내는지 똑똑히 지켜보라.

1. Config 객체를 활용한 세션 연결 및 네트워크 토폴로지 설정

Go 백엔드는 보통 Docker 안에서 뜬다. 따라서 로컬 브로드캐스팅(UDP)에 의존하기보다는, 명확한 진입점(Entrypoint) 라우터의 IP를 수동으로 지정해서 붙어주는 게 가장 안전하다.

1.0.1 [Runbook] 도커라이즈드(Dockerized) 백엔드 연결 전술

package main

import (
	"fmt"
	"github.com/eclipse-zenoh/zenoh-go"
	"os"
)

func bootBackendSession() *zenoh.Session {
	// 1. 기본 설정 뼈대(Config) 획득
	conf := zenoh.DefaultConfig()

	// 2. Go 백엔드용 강제 클라이언트 모드
	// (Go 서버가 다른 로봇들의 망 중계기로 쓰이는걸 방지함. CPU 낭비임)
	conf.Insert("mode", "client")

	// 3. 환경 변수로 주입된 중앙 라우터 TCP 주소 로드
	// 예: export ZENOH_ROUTER_IP="tcp/10.0.0.5:7447"
	routerAddr := os.Getenv("ZENOH_ROUTER_IP")
	if routerAddr == "" {
		fmt.Println("[경고] 라우터 주소 없음. 로컬 호스트망을 자동 탐색합니다.")
	} else {
		// 명시적 커넥션 핀 등록
		conf.Insert("connect/endpoints", `["`+routerAddr+`"]`)
		fmt.Println("[접속 지시] 타겟 ->", routerAddr)
	}

	// 4. 폭발 점화! (Open)
	// (만약 라우터가 안켜져있거나 뻗어있다면 여기서 err가 폭발한다)
	session, err := zenoh.Open(conf)
	if err != nil {
		fmt.Printf("[크리티컬] Zenoh 스웜 합류 실패: %s\n", err)
		os.Exit(1) // 쿠버네티스에게 컨테이너 재시작(CrashLoopBackOff)을 요구하라!
	}

	fmt.Println(">> Zenoh 백엔드 세션 확립 완료.")
	return session
}

이 패턴이 Go 진영의 핵심이다. zenoh.Open() 의 리턴 값이 error 를 뱉느냐 마느냐에 따라 Liveness Probe 를 통제해야 한다.

2. 데이터 발행(Publish) 및 비동기 스트림 기반 구독(Subscribe)

가장 순수한 Pub/Sub. Go 에서는 이걸 어떻게 짤까?
Subscriber 에서 callback 함수를 던지는 것은 사실 내부적으로 C 언어의 함수 포인터 제어를 래핑한 것이다.

2.0.1 [Runbook] 데이터 버스 폭격 및 방어 전술

송신 파트 (로봇 측이라 가정)

func startTelemetry(session *zenoh.Session, myId string) {
	// 특정 경로 퍼블리셔 등록
	key := fmt.Sprintf("robot/%s/battery", myId)
	
	// 명시적 닫기를 강제하는 Go의 꽃 "defer"
	pub, err := session.DeclarePublisher(key, nil)
	if err != nil {
		panic(err)
	}
	defer pub.Undeclare() 

	for i := 100; i > 0; i-- {
		// byte 배여로 변환해서 발사
		payload := []byte(fmt.Sprintf("%d", i))
		
		fmt.Println("발사:", string(payload))
		pub.Put(payload, nil) // Non-blocking
		
		time.Sleep(1 * time.Second)
	}
}

수신 파트 (클라우드 관제 데몬이라 가정)

func startDashboard(session *zenoh.Session) {

	// 와일드카드(*)를 써서 망 내의 모~든 로봇의 배터리를 도청한다!
	sub, err := session.DeclareSubscriber("robot/*/battery", func(sample zenoh.Sample) {
		
		// 이 콜백 함수는 Zenoh Rust CGO 스레드풀의 불특정 워커에서 실행된다!!
		// (동시성 동기화에 극도로 유의해야 함)
		sourceKey := sample.KeyExpr().String() // 예: robot/r-01/battery
		val := string(sample.Payload())
		
		// DB에 INSERT 하거나 Redis에 세팅한다.
		fmt.Printf("🚨 비상 상황! %s 의 배터리가 %s%% 로 떨어짐 \n", sourceKey, val)
	})
	
	if err != nil {
		panic(err)
	}
	
	// 프로그램이 메인스레드에서 안꺼지고 버티게 함
	defer sub.Undeclare()
	select {} 
}

이 구조의 뛰어난 점은, 1만 대의 로봇이 데이터를 퍼부어도 메모리를 점유하는 Subscriber 객체는 “단 1개” 뿐이라는 것이다. 이것이 MQTT의 세션당 할당 한계를 깨부수는 ‘경로 기반 멀티플렉싱’ 아키텍처다.

3. 분산 쿼리(Query)의 병렬 처리 및 응답 집계(Aggregation)

이 장의 알파이자 오메가. “전국의 모든 신호등에게 묻는다. 지금 빨간불인 녀석들만 대답해!” 라는 쿼리를 날릴 수 있다.
HTTP 였다면 신호등 1,000개의 IP 목록을 루프 돌려가며 GET 을 만 번 때려야겠지만, Zenoh 에서는 그냥 session.Get("light/*/status") 한 줄이면 끝난다.

Go의 채널(chan)은 여러 로봇들이 경쟁적으로 쏘아대는 대답들을 이쁘게 줄세워서 먹기(Consume)에 최고의 궁합을 자랑한다.

3.0.1 [Runbook] 분산 브로드캐스트-스위핑(Sweeping) 전술

package main

import (
	"fmt"
	"github.com/eclipse-zenoh/zenoh-go"
	"time"
)

func sweepAllRobots(session *zenoh.Session) {
	// 1. 타임아웃 2초 세팅
	// 이 2초 안에 안오면 죽은 로봇으로 간주하고 내버리고 간다!
	getOpts := zenoh.GetOptions{}
	// (실제 최신 Go API 버전에 따라 time.Duration 이나 Timeout 프로퍼티 설정)
	
	fmt.Println("🚀 글로벌 쿼리 발사! (2초 파티)")

	// 2. 발사 & 수신 파이프 즉각 개통
	// 리턴 받는 replies 채널은 비동기 파이프다. 망에 흩어졌던 대답들이 
	// 도달하는 족족 이 파이프 구멍으로 쏟아지기 시작한다.
	replies := session.Get("robot/*/status", zenoh.QueueingDefault(), &getOpts)

	robotCount := 0

	// 3. 채널 순회 (Go 언어의 특기)
	// replies 파이프가 닫힐 때까지(즉, 타임아웃 2초가 지나 Zenoh 엔진이 파이프를 박살낼때까지)
	// 이 루프는 블로킹(Blocking) 되며 대답을 주워 먹는다.
	for reply := range replies {
		if reply.IsOk() {
			sample := reply.GetOk()
			rId := sample.KeyExpr().String()
			val := string(sample.Payload())
			
			fmt.Printf(" [건져올림] %s : %s\n", rId, val)
			robotCount++
		} else {
			// 상대방 Queryable 이 명시적으로 에러를 뱉었을 경우
			fmt.Println(" [에러응답]", string(reply.GetErr().Payload()))
		}
	}

	// 4. 채널 루프가 깨졌다는 건, 수집 작전이 끝났다는 뜻이다.
	fmt.Printf("✅ 작전 종료. 총 %d 대의 로봇 상태 수집 완료.\n", robotCount)
}

이 패턴은 클라우드의 대시보드 API (예: React -> Go Backend -> Zenoh Swarm) 를 짤 때 완벽하게 동작한다. REST API 컨트롤러 안에서 이 함수를 부르면, Go 서버는 단 2초 만에 전 세계 10만 대의 로봇 중 살아서 대답한 로봇들의 데이터를 모아 JSON 배열로 React에게 예쁘게 반환하게 된다.