9.3.2 Go API의 특성과 동시성 모델
C 언어가 “포인터로 메모리를 직접 만지는 외과 수술” 이라면, Go 언어의 선언적 통신은 “컨베이어 벨트에 물건을 올리고 내리는 거대한 물류 센터” 기법이다.
Zenoh 는 이미 그 자체로 비동기적으로(Asynchronous) 이벤트가 쏟아져 들어오는 무한한 스트림 구조를 띠고 있다. 이 구조를 100% 엮어낼 수 있는 지구상 최고의 백엔드 언어는 단연코 Go 의 고루틴(Goroutine)과 채널(Channel)이다.
이 장에서는 수만 개의 데이터 패킷을 어떻게 channel 로 빨아들이고, 사용자 연결이 끊겼을 때 어떻게 context 로 고루틴을 학살(Cancellation)하여 메모리를 방어하는지, 마지막으로 악명 높은 CGO 경계면(C-Go Boundary)에서의 가비지 컬렉터(GC) 폭발을 무시하고 살아남는 어빌리티(Ability) 런북을 시작한다.
1. 고루틴(Goroutine)과 채널(Channel)을 활용한 이벤트 처리 통합
단 한 대의 Go 백엔드 서버(API Gateway)가 1만 대의 로봇이 쏘아대는 텔레메트리 데이터를 에러 없이 동시에 처리해야 한다.
C 였다면 pthread 의 mutex lock 레이스 컨디션으로 서버가 뻗었겠지만, Go는 우아하게 “채널(Channel)” 이라는 댐을 건설한다.
1.0.1 [Runbook] 데이터 댐(Channel Dam) 및 다중 워커 전술
1. 채널 개통 및 구독 선언
package main
import (
"fmt"
"github.com/eclipse-zenoh/zenoh-go"
)
func runSwarmListener(session *zenoh.Session) {
// 1만 개의 로봇 데이터가 들어갈 거대한 댐(Buffer) 건설
dataDam := make(chan zenoh.Sample, 10000)
// Zenoh 수신기를 개통하고, 들어오는 모든 물줄기를 채널로 꺾어버린다!
// 여기서 콜백 지옥이나 락(Lock) 같은 건 존재하지 않는다.
sub, err := session.DeclareSubscriber("robot/*/status", func(sample zenoh.Sample) {
dataDam <- sample // 데이터를 채널로 밀어넣고 즉시 퇴근 (Non-blocking)
})
if err != nil {
panic(err)
}
defer sub.Undeclare()
// 2. 5명의 노가다 인부(Goroutine Worker)를 고용하여 댐에서 데이터를 퍼낸다!
for i := 0; i < 5; i++ {
go robotDataProcessor(i, dataDam)
}
// 메인 스레드는 여기서 무한 대기
select {}
}
2. 워커(Worker) 프로세스
func robotDataProcessor(workerId int, ch <-chan zenoh.Sample) {
// 채널 파이프가 닫히지 않는 한 영원히 돌며 데이터를 퍼다 나른다.
for sample := range ch {
// Key 파싱 및 페이로드 추출
robotPath := sample.KeyExpr().String()
payloadText := string(sample.Payload()) // []byte -> string 제로 카피 변환(꼼수 필요)에 가까움
fmt.Printf("[Worker %d] %s => %s 처리 완료\n", workerId, robotPath, payloadText)
// 이 곳에서 DB (MongoDB, PostgreSQL) 로 INSERT 하는 무거운 작업을 수행
}
}
이 “1개의 구독 채널, N개의 처리 워커” 패턴은 Go 진영의 영원한 베스트 프랙티스(Fan-out)다.
로봇의 수가 10만 대로 늘어나면, 당신은 그저 저 위의 for i := 0; i < 5 (워커 수)를 50 으로 올려버리기만 하면 즉각적인 스케일 아웃이 끝난다.
2. 컨텍스트(Context)를 활용한 타임아웃, 취소 제어 및 라이프사이클 관리
HTTP 요청을 던진 모바일 앱 사용자가 3초 만에 화가 나서 [새로고침]을 누르거나 앱을 껐다.
하지만 뒤쪽의 Go 엔진은 여전히 Zenoh 에 분산 쿼리를 쏘고 로봇들을 깨워대는 중이라면?
수만 명의 유저가 이 짓을 반복하면 라우터망(Network) 전체가 “유령 트래픽“에 깔려 질식사한다.
2.0.1 [Runbook] context.Context 기반 연쇄 학살(Cascading Cancel) 전술
클라이언트(REST/gRPC 등)에서 넘어온 타임아웃과 취소 신호를 Zenoh 통신망까지 그대로 전달해 “쓸모없는 통신 파이프“를 선제 차단한다.
package main
import (
"context"
"fmt"
"time"
"github.com/eclipse-zenoh/zenoh-go"
)
// HTTP 컨트롤러나 gRPC 핸들러에서 호출되는 함수 (ctx를 무조건 받는다)
func fetchRobotStatus(ctx context.Context, session *zenoh.Session, targetId string) error {
// 1. 만약 이 ctx가 [3초 타임아웃]이 걸린 컨텍스트라면?
// Zenoh 의 Get 옵션도 정확히 3초만 채우고 끊어지게끔 동기화시킨다!
getOpts := zenoh.GetOptions{}
// (Zenoh-Go의 최신 스펙에 맞춰 timeout 세팅)
// getOpts.Timeout = 3 * time.Second
// 실제론 context 가 취소되는 걸 훔쳐봐야 한다.
fmt.Println(">> 쿼리 전송 개시. 타겟:", targetId)
// 2. 채널(채널)을 통한 대답 수거 파이프라인 개통
replyChannel := session.Get("robot/" + targetId + "/status", zenoh.QueueingDefault(), &getOpts)
// 3. 다중 대기(Select) 루프 (이것이 진정한 Go 의 정수)
for {
select {
// 케이스 A: 모바일 유저가 뒤로가기를 눌렀거나, 타임아웃이 났다!
case <-ctx.Done():
fmt.Println("[중단 경보] 유저 이탈 감지! Zenoh 쿼리 파기 및 즉시 리턴.")
// 이 순간 아래의 채널 처리는 버려지며, 가비지 컬렉터가 알아서 청소한다.
return ctx.Err()
// 케이스 B: 로봇으로부터 정상적으로 대답이 돌아오고 있다.
case reply, ok := <-replyChannel:
if !ok {
// 채널이 완전히 닫혔다. (로봇들이 다 대답함)
fmt.Println(">> 쿼리 완료.")
return nil
}
// 응답 바디 확인
if reply.IsOk() {
sample := reply.GetOk()
fmt.Printf("응답 수신: %s\n", string(sample.Payload()))
} else {
fmt.Printf("로봇 측 에러 수신: %s\n", string(reply.GetErr().Payload()))
}
}
}
}
Go 의 Select - ctx.Done() 패턴과 Zenoh 의 Reply Channel 은 완벽하게 결합한다. “대답을 기다리는 채널” 과 “죽어버리라는 신호” 를 동시에 레이스(select) 시킴으로써, 좀비 쓰레드로 전락하는 고루틴 트래픽을 완벽하게 살균(Sanitize) 해낸다.
3. 가비지 컬렉션(GC) 동작 원리와 CGO 경계에서의 메모리 누수 회피
“Go 는 가비지 컬렉터가 있으니까 포인터 따윈 신경 안 써도 되잖아?”
절반만 맞다. zenoh-go 는 밑바닥에 C 로 짜여진 엔진을 깔고 있다.
Go 언어의 가비지 컬렉터는 “Go 세상 안에서 make 나 new 로 생성된 메모리” 만 추적하고 쓸어 담는다. 만약 Rust(C) 엔진 쪽에서 거대한 버퍼를 열어서 당신(Go)에게 전달했다면, Go의 GC는 그것의 존재조차 알지 못한다. 방치하면 서버 메모리가 가득 차서 OOM(Out of Memory)으로 서버가 불타버린다.
3.0.1 [Runbook] CGO 경계 초과 메모리 강제 반납 전술
최신 zenoh-go 모듈은 내부적으로 runtime.SetFinalizer 를 일부 활용하여 GC가 돌 때 C 메모리를 함께 죽여주는 꼼수를 쓰고 있다. 하지만, 1초에 10만 건씩 Sample 이 날아오는 극한 상황에서는 GC 가 돌기도 전에 런타임 힙이 꽉 차버린다.
가장 안전한 방법은 C 언어처럼 “내가 다 썼으면 즉각 손으로 파기(Defer Close)” 시켜버리는 것이다.
package main
import (
"fmt"
"github.com/eclipse-zenoh/zenoh-go"
)
func runHeavyTrafficNode(session *zenoh.Session) {
// [명심] Session, Publisher, Subscriber 같은 뼈대 객체는
// Go 의 GC 가 언제 치워줄지 기다리지 말고, 함수 끝날 때 무조건 박살내라.
pub, _ := session.DeclarePublisher("drone/*/video/h264", nil)
defer pub.Undeclare() // 이 선언 하나로 서버 크래시의 90%를 막는다!
// 초당 60 프레임 발사 무한 루프
for i := 0; i < 3600; i++ {
// Go 언어로 생성한 10MB 짜리 무스펙 슬라이스
megaBytes := make([]byte, 10*1024*1024)
// (가짜 비디오 데이터 채우기)
// 1. 발사!
// 이 때, Go 슬라이스의 메모리 포인터가 잠시 `zenoh-c` 엔진으로 복사/전이 된다.
pub.Put(megaBytes, nil)
// 2. 이 메가바이트 변수는 다음 루프 때 오직 Go GC 의 처분에 맡겨진다.
// (만약 극한의 튜닝이 필요하다면 여기서 sync.Pool 로 바이트 배열 자체를 재활용하라)
}
}
[심화 최적화] 바이트 풀링 sync.Pool 방어망
수십 메가바이트 단위의 슬라이스를 매 루프마다 생성(make)하면 Go 의 GC 가 버티질 못하고 Stop The World 현상을 일으킨다.
반드시 var framePool = sync.Pool{New: func() interface{} { return make([]byte, 1024*1024) }} 를 선언해서 빈 바이트 껍데기를 재활용(Get/Put) 하는 습관을 들이는 것이 Go-Zenoh 서버의 진정한 퍼포먼스 마스터리다.