10.7 Zenoh-Pico 핵심 API 활용
기초 공사(포팅, OS 스케줄링)가 끝났다면, 이제 실제로 칩을 클라우드에 연결하여 대화를 나눌 차례다.
zenoh-c 와 비슷해 보이지만, 메모리 제한 때문에 모든 API 가 포인터 참조 형태로 미묘하게 비틀려 있다.
이 장에서는 “연결”, “발행(Publish)”, “구독(Subscribe)”, 그리고 로봇 원격 제어의 백미인 “응답(Queryable)” 체계를 8KB RAM 위에서 어떻게 우아하게 풀어내는지 실전 코드로 검증한다.
1. 네트워크 설정 및 세션(Session) 초기화 과정
모든 통신의 시작 지점이다. Pico에서 세션을 열 때 가장 주의할 점은, 연결이 “비동기(Asynchronous)” 로 이루어지는 경우가 많다는 것이다. z_open 이 성공했다고 해서 당장 클라우드로 패킷을 쏠 수 있는 게 아니다.
1.0.1 [Runbook] 연결 대기 및 세션 확립 커맨드
#include <zenoh-pico.h>
#include <stdio.h>
z_session_t session;
uint8_t buffer[1024];
void bootstrap_zenoh() {
// 1. 설정 구조체 선언
z_config_t config = z_config_default();
// 2. 강제 연결 IP 지정 (스카우팅을 쓰지 않을 경우 가장 최적화된 방법)
zp_config_insert_json(&config, ZP_PEER_KEY, "\"tcp/192.168.0.10:7447\"");
// Pico 필수 메모리 바인딩
zp_config_insert_json(&config, ZP_RX_BUF_ADDR_KEY, (char*)&buffer);
zp_config_insert_json(&config, ZP_RX_BUF_SIZE_KEY, "1024");
// 3. 세션 개통
printf("라우터(192.168.0.10)로 접속 시도...\n");
if (z_open(&session, &config, NULL) < 0) {
printf("치명적 에러: 네트워크 초기화 실패!\n");
// 시스템 하드 리셋(리부팅) 로직 필요
}
// 4. [인스펙션] 연결 확인
// z_open 이 끝났다고 즉시 통신이 되는게 아니다.
// z_check() 매크로를 통해 실제 서버와 세션 악수가 끝났는지 확인한다.
if (z_check(session)) {
printf("접속 성공! Zenoh 통신망에 합류 완료.\n");
} else {
printf("아직 라우터와 세션 수립 안됨... 대기 중!\n");
}
}
이 코드는 메인 루프 진입 전 오직 단 한 번만 실행되어야 한다. 만약 z_check 가 실패한다면, 메인 루프의 z_session_yield 를 계속 돌리면서 라우터가 응답(Ack)을 보내줄 때까지 기다려야 패킷 유실을 막을 수 있다.
2. 피어(Peer) 및 라우터 스카우팅(Scouting) 기법
“내 로봇이 지금 어떤 공유기에 붙을지 나도 모른다.”
동적 환경(DHCP)에서는 서버(라우터)의 IP를 하드코딩할 수 없다. 칩 스스로가 허공을 향해 UDP 브로드캐스트(또는 멀티캐스트) 메가폰을 불어 라우터를 찾아야 한다. 이를 Scouting(스카우팅) 이라 부른다.
2.0.1 [Runbook] 자동 경로 탐색 (Auto-Discovery) 전술
#include <zenoh-pico.h>
void auto_discover_and_connect() {
z_config_t config = z_config_default();
// 1. 특정 IP 주소를 박지 않고, Locator 만 지정한다.
// "이더넷 껍데기(IPv4)를 타고 허공에 던져볼게!"
zp_config_insert_json(&config, ZP_LOCATOR_KEY, "\"udp/224.0.0.225:7447\"");
// Pico 는 자체적으로 멀티캐스트를 발송하여 라우터의 응답 메아리를 수집한다.
// 2. 세션 오픈
if (z_open(&session, &config, NULL) == 0) {
printf("스카우팅 패킷 발사 완료. 메달리온(Session) 활성화.\n");
// 3. Pico 의 한계:
// 응답이 올 때까지 메인 루프에서 yield 를 미친 듯이 돌려줘야
// 칩이 라우터의 IP 주소를 낚아채어 진짜 연결(Connection)로 승격시킨다.
}
}
주의할 점: 배터리로 돌아가는 IoT 기기(LoRa, BLE 센서) 에서는 이 스카우팅 기능을 절대 쓰면 안 된다. 랜선을 다 뒤지고 멀티캐스트를 쏘는 행위 자체가 배터리를 미친 듯이 갉아먹는다(Energy Drain). 배터리 기기는 무조건 라우터의 고정 IP 를 심어서(Fixed Peer) 단발에 록온(Lock-on) 해야 한다.
3. 리소스 키(Key Identifier) 선언 및 매핑
우리가 robot/sensor/temp 라는 키워드로 1바이트 온도를 보낼 때, 저 영문자 길이만 17바이트다. 센서 페이로드보다 이름표(Key)가 더 무거운 기형적인 상황이다.
이 통신 낭비를 박살 내기 위해 Zenoh 가 자랑하는 “정수 ID 매핑(Numberic ID Mapping)” 을 활용한다. 라우터와 사전에 “이 긴 이름표는 앞으로 1번(ID) 이라고 부를게!” 라고 약속해버리는 기법이다.
3.0.1 [Runbook] 스트링 압축 키 매핑 전술
#include <zenoh-pico.h>
// 메인 루프를 돌고 있는 전역 세션
extern z_session_t session;
// 이름표 보관함 (전역 변수)
z_owned_keyexpr_t my_temp_key;
void register_key_id() {
// 1. 긴 문자열 선언
z_owned_str_t key_str = z_str("robot/sensor/temp");
// 2. 라우터에게 키 등록(Declaration)을 요구한다 (비동기로 날아감)
my_temp_key = z_keyexpr_new(z_loan(key_str));
z_declare_keyexpr(z_loan(session), z_loan(my_temp_key), NULL);
// 3. 이제부터 z_publisher_put 에 이 my_temp_key 를 넣으면,
// Pico 엔진 내부적으로 문자열 대신 매핑된 숫자(ID)를 패킷에 구겨넣어
// 헤더 크기를 극적으로 줄여(Overhead Zeroing)버린다.
z_drop(z_move(key_str));
}
이 기법은 특히 LoRa 나 Bluetooth 환경에서 통신의 생사를 가를 정도로 중요한 필수 스킬이다.
4. Publisher 구현 및 주기적 데이터 전송
마이크로컨트롤러는 원래 ’읽고 쏘는 기계’다. 하지만 Zenoh-Pico 의 z_publisher_put() 은 너무도 단순해서, 당신이 호출하는 순간 칩의 TCP 스택을 거쳐 그대로 망으로 발사된다. (버퍼링이나 딜레이가 없다).
4.0.1 [Runbook] 고주파 텔레메트리 연사 전술
#include <zenoh-pico.h>
z_publisher_t pub;
void setup_publisher() {
// 1. 내가 쏠 토픽 선언
z_owned_keyexpr_t key = z_keyexpr_new("robot/01/sensor/battery");
// 2. Publisher 객체 생성
pub = z_declare_publisher(z_loan(session), z_loan(key), NULL);
z_drop(z_move(key));
}
// 이 함수는 MCU 메인 루프에서 타이머(예: 매 100ms 마다)에 의해 불린다.
void loop_publish(int battery_level) {
if (z_check(pub)) {
// [제로-히프(Zero-Heap) 데이터 직렬화]
// malloc 따위는 쓰지 않고 그냥 스택(Stack) 메모리의 배열을 쓴다.
char payload[10];
int len = snprintf(payload, sizeof(payload), "%d", battery_level);
// 3. 발사!
// 여기서 막히지 않고(Non-blocking) 1마이크로초 만에 리턴된다.
z_publisher_put(
z_loan(pub),
(const uint8_t*)payload, len,
NULL
);
}
}
만약 이 loop_publish 함수를 타이머 제약 없이 while(1) 안에서 무한궤도로 돌리면 어떻게 될까?
칩의 네트워크 송신 버퍼(Tx Buffer)가 0.1초 만에 가득 차고 OOM (Out Of Memory) 내지는 TX Stall 에러를 터트리며 칩이 기절하게 된다. (제한된 보폭 제어의 중요성).
5. Subscriber 구현 및 콜백(Callback) 함수 처리
마이크로컨트롤러가 데이터를 수신한다는 의미는, 외부의 명령(전등 켜기, 드론 폭파 등)을 받아들이겠다는 엄청난 결단이다.
Zenoh-Pico 의 수신은 인터럽트가 아니다. 오로지 메인 함수 짬짬이 돌리는 z_session_yield() 함수 내부 깊숙한 곳에서, 완성된 패킷이 조립되었을 때 딱 한 번 당신이 선언해 둔 함수(포인터)를 호출해줄 뿐이다.
5.0.1 [Runbook] 디스패처 콜백 전술 (Command Execution)
#include <zenoh-pico.h>
#include <string.h>
z_subscriber_t sub;
// [경고] 이 콜백 함수 안에서는 절대 무거운 연산(예: Flash Write, 통신 대기) 금지!
// 이 함수가 리턴되기 전까지 칩의 모든 Zenoh 통신 기능은 마비(Blocking)된다!
void cmd_callback(const z_sample_t *sample, void *ctx) {
// 넘어온 페이로드를 읽는다 (Zero-copy 로 날아온 버퍼의 일부분 포인터다)
z_str_t val = z_string_make((char*)sample->payload.start, sample->payload.len);
printf("명령 수신: %s\n", val.start);
// 하드웨어 핀(Pin) 당기기 분기문
if (strncmp(val.start, "LED_ON", 6) == 0) {
HW_LED_TURN_ON();
} else if (strncmp(val.start, "LED_OFF", 7) == 0) {
HW_LED_TURN_OFF();
}
}
void setup_subscriber() {
z_owned_keyexpr_t key = z_keyexpr_new("robot/01/cmd");
// 콜백 함수를 클로저(Closure) 구조체로 싼다.
z_owned_closure_sample_t callback;
z_closure(&callback, cmd_callback, NULL, NULL);
// 구독 시작 (라우터에게 '나한테 이 토픽 던져줘' 라고 알림)
sub = z_declare_subscriber(z_loan(session), z_loan(key), z_move(callback), NULL);
z_drop(z_move(key));
}
이 코드는 단순해 보이지만 끔찍한 함정이 숨어있다.
콜백 함수로 주어지는 sample->payload.start 버퍼는 이 콜백 함수가 끝나는 순간 (중괄호가 닫히는 순간) Pico 엔진에 의해 휴지통으로 파기(Overwritten) 된다. 당신이 저 주소(Pointer)를 전역 변수에 저장해 놓고 나중에 메인 루프에서 읽으려 한다면, 거기엔 이미 다른 쓰레기 패킷이 덮어씌워진 상태일 것이다. (반드시 필요한 데이터는 즉각 복사본(Deep Copy)을 만들어 대피시켜라).
6. Queryable 구현 및 분산 쿼리(Get)에 대한 응답 처리
가장 어려운 아키텍처다. 클라우드의 Node.js 서버가 칩(MCU) 에게 질문을 던진다. (“현재 내부 보일러 온도 몇 도 파악 보고하라!”).
칩은 그 질문을 받고, 데이터를 모아 즉각적으로 대답(Reply)을 던져야 한다. Publisher 가 일방적 브로드캐스트 라면, 이 녀석은 RPC(원격 수속 호출) 에 가깝다.
6.0.1 [Runbook] 원격 지연 질의(Queryable) 방어 전술
#include <zenoh-pico.h>
z_queryable_t qable;
// [질문이 도착했을 때 불리는 콜백]
void query_callback(const z_query_t *query, void *ctx) {
printf("클라우드 통제실로부터의 질의 도착 (타겟 키: %s)\n", z_query_keyexpr(query).start);
// 1. 하드웨어 상태 긴급 스캐닝 (블로킹 하면 안 됨)
int current_temp = HW_Read_Temperature();
char reply_buf[16];
int len = snprintf(reply_buf, sizeof(reply_buf), "TEMP=%d", current_temp);
// 2. 대답할 목적지 키 (원래 질문자가 쓴 경로 그대로 복사)
z_owned_keyexpr_t reply_key = z_keyexpr_clone(z_query_keyexpr(query));
// 3. 대답 규격 설정
z_query_reply_options_t options = z_query_reply_options_default();
// 4. 진짜 대답(Reply) 발사!
z_query_reply(
query,
z_loan(reply_key),
(const uint8_t*)reply_buf, len,
&options
);
z_drop(z_move(reply_key));
}
void setup_queryable() {
z_owned_keyexpr_t my_key = z_keyexpr_new("boiler/99/status");
z_owned_closure_query_t callback;
z_closure(&callback, query_callback, NULL, NULL);
// 나를 우주망에 '질문을 받을 수 있는 개체' 로 등록
qable = z_declare_queryable(z_loan(session), z_loan(my_key), z_move(callback), NULL);
z_drop(z_move(my_key));
}
이 패턴이 무서운 점은 단어 그대로 분산형(Distributed) 이라는 점이다. 당신의 공장에 이 코드가 박힌 칩이 100만 대가 있어도 상관없다. 클라우드에서 zenoh.Get("boiler/*/status") 한 줄을 치는 순간, Zenoh 글로벌 망이 저 100만 대의 칩에게 질문을 분배시켜주고, MCU들은 동시에 각각 100만 개의 TCP 패킷 대답(Reply)을 라우터에 쏟아내어 클라우드로 가져다 바친다. (HTTP / REST API 로는 수 초가 걸릴 거대한 분산 병렬 연산을 단결시킨 것이다.)