9.2.3 zenoh-c 기본 통신 패턴 구현

9.2.3 zenoh-c 기본 통신 패턴 구현

에지단 펌웨어(Firmware) 환경 내부에서 글로벌 클라우드 서버와 다이렉트 통신 파이프라인을 체결하는 기반 구현을 서술한다.
zenoh-c 통신 계층의 아키텍처는 고전적인 POSIX 소켓(Socket) 프로그래밍(예: AF_INET, SOCK_STREAM 할당) 스키마와 구조적으로 궤를 달리한다.

그 대신, “설정 속성(Config)의 세부 조율”, “연결 세션(Session)의 논리적 개방”, 그리고 “대여 객체(Loaned Object)의 콜백 주입” 등 Rust 고유의 선언적 비동기 통신 방식을 C 언어의 구조체 포인터 제어 문법으로 투사(Projection)하는 일련의 과정이다. 본 장에서는 분산 통신의 기초가 되는 퍼블리시/서브스크라이브(Pub/Sub) 스펙과 질의/응답(Query/Reply) 디자인 패턴을 C 환경에서 무결점으로 확보하는 코어 구현체를 설계한다.

1. 세션(Session) 생성, 구성 설정 및 종료

표준 Zenoh C 애플리케이션의 라이프사이클(Lifecycle)은 절대적으로 세션을 초기화하고 순차적으로 소멸시키는 과정 내에 바인딩된다.

1.0.1 클라이언트 통신 모드(Client Mode) 강제화 및 세션 오버라이딩 전술

저전력 마이크로컨트롤러(MCU) 배터리 기반의 로봇 펌웨어는 분산망 내에서 통신 중계 목적인 “라우터(Router)” 노드가 아닌, 종단 “클라이언트(Client)” 모드의 역할을 엄격히 제한받아야 한다. 에지 단말이 백본 오버레이 네트워크 중계기로 동적 승격될 경우 메인 CPU 클럭 및 가용 전력 버짓(Power Budget)이 치명적으로 고갈된다.

#include <zenoh.h>
#include <stdio.h>
#include <stdlib.h>

int boot_robot_comm() {
    // 1. 디폴트 메타 설정 확보 (소유권 Owner C로 이관)
    z_owned_config_t config = z_config_default();

    // 2. [아키텍처 강제] 라우팅 브로드캐스트 모드 비활성화 및 클라이언트 역할 고정 지정
    // zc_config_insert_json() 함수를 통해 Rust 내부 계층의 JSON 설정 트리를 오버라이딩한다.
    // 주의: 직렬화 문자열 주입 시 내부 JSON 파서가 작동하므로,
    // "client" 토큰 주위로 이스케이프 문자(\")가 중첩 감싸진 '"client"' 규격을 엄수해야 한다.
    zc_config_insert_json(z_loan(config), Z_CONFIG_MODE_KEY, "\"client\"");
    
    // (선택 사항) 특정 로컬 서브넷 라우터 IP로 정적 바인딩(Static Binding) 토폴로지 구성 시
    zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, "[\"tcp/192.168.0.5:7447\"]");

    printf("[System] Zenoh 데몬 인스턴스 시동 개시...\n");

    // 3. 통신 세션 점화 
    // z_move(config) 호출을 매개로, 구성 완료된 설정체 메모리의 소유권을 세션 프로세서 측으로 전면 양도한다.
    // 본 시점 횡단 이후 'config' 변수에 재접근 시 런타임 메모리 폴트(Memory Fault)가 발생함.
    z_owned_session_t session = z_open(z_move(config));

    if (!z_session_check(&session)) {
        printf("[Error] 세션 개시 실패. 네트워크 데몬을 강제 정지(Halt)합니다.\n");
        return -1;
    }

    printf("[System] 스웜망(Swarm Network) 접속 완료!\n");

    // 4. 로보틱스 도메인 비즈니스 루프(Business Loop) 수행
    // ... do something ...

    // 5. 메모리 세이프(Safe) 시스템 종료 통제 (네트워크 소켓의 우아한 파기 및 자원 반환)
    z_close(z_move(session));
    return 0;
}

이 패턴 설계 시 유의할 점은, z_config_t 구조체 딕셔너리에 JSON을 삽입하는 매크로 파생 함수들의 시그니처가 프로토콜 버전에 따라 zc_config_insert_json 등으로 미세하게 변경될 가능성이다. zenoh-c 코어 C-Header 파일(/include/zenoh.h)의 구조를 컴파일 타임에 주기적으로 열람 대조하는 개발 방법론이 병행되어야 한다.

2. Publisher 및 Subscriber 생성과 다중 스레드 콜백 처리 설계

논리적인 “발행자(Publisher)“와 “구독자(Subscriber)” 역할을 C 구조체 위에서 확립하는 로직이다.
C 스펙의 함수 포인터 콜백(Callback) 이벤트 구동 체제 특성상, “어느 출처의 소유권(Ownership)을 가진 코어 스레드에서 해당 핸들러가 트리거되는가?“에 대한 통찰이 결여되면, 운영 체제의 전역 메모리 공간을 무단 변조하는 레이스 컨디션(Race Condition)으로 직결되어 시스템 동기화가 완전히 파괴된다.

2.0.1 이벤트 드리븐(Event-driven) 데이터 버스 송수신 전술

스트리밍 수신부 (비동기 콜백 구역)

#include <zenoh.h>
#include <stdio.h>

// [경고] 본 구독 핸들러 함수는 프로세스의 메인 스레드가 아닌, 
// 백그라운드 "Zenoh Rust 엔진 I/O 멀티플렉싱 스레드" 풀 내부에서 비동기적(Asynchronously)으로 호출된다!
void motor_command_callback(const z_sample_t* sample, void* my_context) {
    // 1. 라우팅 Key 식별자 문자열 추출
    z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
    
    // 2. 바디(Payload) 데이터 메모리 참조 (제로 카피 지향)
    printf("[명령 수신] 타겟 노드: %s, 트래픽 길이: %zu Bytes\n", z_loan(keystr), sample->payload.len);
    
    // 3. (옵션) 클로저(Closure) 셋업 시 주입했던 Void 포인터(Context)를 사용자 정의 타입으로 캐스팅 제어
    // int* motor_speed = (int*)my_context;
    
    // 할당받은 일회성 문자열 객체 소유권 파기
    z_drop(z_move(keystr)); 
}

메인 데몬 구동부

int run_dual_node(z_owned_session_t* session) {
    // 1. 퍼블리셔(송신 인터페이스) 인스턴스 장착
    z_owned_keyexpr_t pub_key = z_keyexpr_new("robot/status");
    z_owned_publisher_t pub = z_declare_publisher(z_loan(*session), z_loan(pub_key), NULL);
    
    // 2. 서브스크라이버(수신 리스너) 장착
    z_owned_keyexpr_t sub_key = z_keyexpr_new("robot/motor/cmd");
    // 클로저(Closure) 구조체 스코프를 선언하고 타겟 함수 포인터(Callback)를 바인딩한다.
    z_owned_closure_sample_t callback;
    z_closure(&callback, motor_command_callback, NULL, NULL); // Context 파라미터는 NULL 허용
    
    z_owned_subscriber_t sub = z_declare_subscriber(z_loan(*session), z_loan(sub_key), z_move(callback), NULL);
    
    // 3. 메인 런 루프 (Main Run-loop) 폴링 타임
    printf("[System] 양방향 이벤트 노드 기동 완료. 1Hz 주기로 상태 텔레메트리 스트리밍 개시.\n");
    for (int i=0; i<100; i++) {
        char buf[64];
        int len = snprintf(buf, sizeof(buf), "Battery_Level: %d", 100 - i);
        
        // C 언어 특유의 Raw Byte Array 패킷 데이터 일괄 전송!
        z_publisher_put(z_loan(pub), (const uint8_t*)buf, len, NULL);
        
        // Blocking Wait (unistd.h 의존)
        sleep(1); 
    }
    
    // 4. 모놀리식 해제 (RAII 모델의 부재로 인한 강제 수동 스위핑 파이프라인 명시)
    z_undeclare_subscriber(z_move(sub));
    z_undeclare_publisher(z_move(pub));
    z_drop(z_move(pub_key));
    z_drop(z_move(sub_key));
    return 0;
}

이 이벤트 루프 아키텍처에서 경계해야 할 가장 은밀한 크래시 요인은, motor_command_callback 컨텍스트 블록 안에서 블로킹 연산(sleep, DB Lock)이나 고비용 O(N^2) 루프를 돌릴 경우, 하위 계층에 포진한 Zenoh 의 **이벤트 분배 스레드(Event Dispatcher Pipeline)**의 응답 스택 자체가 마비되어 시스템 전역의 네트워킹 타임아웃(Timeout) 결함을 유발한다는 사실이다. C 콜백 로직 내부에서는 오로지 전역 상태(State) 레지스터 변형이나 Lock-free 큐로 메모리 포인터만을 이관(Push)하고 즉각 단편적인 타임라인으로 함수를 반환(Return)하여야 한다.

3. Queryable 및 Querier 패턴을 위시한 분산 RPC 기반(요청-응답) 파이프라인

분산 네트워크 쿼리(Query)와 병렬 다중 응답 전술 체계를 C 런타임에 이식한다.
순수 C 애플리케이션 개발 패러다임에서, 단일한 송신 스트림 요청에 다수의 논리적 파편화(Fragmentation) 데이터가 응답(Multi-Reply)되는 비동기적(Asynchronous) 개념은 전통적인 HTTP 프로토콜의 1:1 선형 동기화 구조와 이질적이다. zenoh-c 규격에서 분산된 다중 응답 청크를 유실 없이 집계(Aggregation)하려면, 어떻게 포인터 레코드를 수거해야 할지 수거 모델(Collector Model)을 확립한다.

3.0.1 지연 분산 쿼리(Query) 격발 및 채널 트랜시버 큐(Transceiver Queue) 전술

이 시스템 아키텍처의 중추는 대기 채널 **응답 큐(FIFO Queue Channel, z_owned_reply_channel_t)**를 인스턴스화시켜, 메인 루드가 비동기적으로 몰려드는 수신 버퍼들을 병목 없이 적재(Push)할 수동 댐 공간을 건축하는 논리이다.

쿼리 핸들러 노드 측 구조 (Queryable Server)

#include <zenoh.h>

void my_query_handler(const z_query_t* query, void* context) {
    // 분산망 어딘가에서 특정 URI 규격에 대한 지연 질의를 송출하면 본 핸들러 훅이 인터셉트한다.
    printf("[트래픽 모니터] 원격 쿼리 이벤트 인터셉트 성공.\n");
    
    // 질의자가 설계한 오리지널 타겟 라우팅 패스를 반환 응답용 Key로 리플리케이션(Clone) 설정
    z_owned_keyexpr_t reply_key = z_keyexpr_clone(z_query_keyexpr(query));
    
    z_query_reply_options_t opts = z_query_reply_options_default();
    
    // 추출된 페이로드 청크를 원 질의 요청 클라이언트 측으로 단방향 회귀 발송!
    z_query_reply(query, z_loan(reply_key), (const uint8_t*)"My Battery is 75%", 17, &opts);
    
    z_drop(z_move(reply_key));
}

// 메인 데몬 루프 영역에서: 선언된 z_declare_queryable() 로 이 핸들러 포인터를 백그라운드와 바인드(Bind)한다.

쿼리 클라이언트 발동 노드 구조 (Querier & Collector)

void fetch_remote_data(z_owned_session_t* session) {
    z_owned_keyexpr_t target_key = z_keyexpr_new("robot/*/battery");
    z_get_options_t opts = z_get_options_default();
    
    // [중요 아키텍처] C 언어 컴파일러 환경에서는 비동기 Promise 풀링(for await) 구루가 성립하지 않는다. 
    // 이를 타개하기 위해 Zenoh 계층이 역분배하는 논리적 대답 패킷들을 선입선출적(FIFO)으로 격리 보관할 
    // 파이프라인 버퍼 채널(z_owned_reply_channel_t) 컴포넌트를 직접 선언 개방한다.
    z_owned_reply_channel_t channel = zc_reply_fifo_new(16); // Buffer Limit: 16 Depth Queue

    // 네트워크 쿼리 투사 개시! 리모트 엣지에서 생성된 응답들은 방금 개방한 무형의 channel 통로로 속속 버퍼링된다.
    z_get(z_loan(*session), z_loan(target_key), "", z_move(channel.send), &opts);

    // [이벤트 폴링 회수 루프 (Collector Loop)]
    // 응답 객체 메모리 프레임을 Null 상태체로 사전 초기화
    z_owned_reply_t reply = z_reply_null();
    
    // 채널 파이프 파킹 큐를 점유(z_call)한다. 큐 버퍼 채널이 완전히 Close 신호를 받거나, 
    // 데이터 유효성 판별 리턴(!z_reply_check)이 발생할 때까지 무한정 대기 소비(Consumer)를 루프한다.
    for (z_call(channel.recv, &reply); z_reply_check(&reply); z_call(channel.recv, &reply)) {
        
        if (z_reply_is_ok(&reply)) {
            // 유효 성공 튜플 확보
            z_sample_t sample = z_reply_ok(&reply);
            printf("[데이터 집계] 응답 바이트 렌더: %.*s\n", (int)sample.payload.len, sample.payload.start);
        } else {
            printf("[경고] 통신 파이프라인 에러 런타임 폴백 감지!\n");
        }
        
        // 메모리 루프 단절 방지를 위한 단건 응답 파괴 지시 (필수 요건)
        z_drop(z_move(reply)); 
    }

    // 채널 데몬 버퍼 및 잔존 핸들링 리소스의 완전 거세
    z_drop(z_move(channel));
    z_drop(z_move(target_key));
}

이 구현 코드는 C 애플리케이션 구조체 내에서 “복합 비동기 멀티캐스트의 흐름 제어(Flow Control & Synchronization)” 백엔드 로직을 하드코딩한 네트워크 무결성 패턴이다. 백그라운드 이벤트 컨텍스트 풀은 FIFO 큐 대기 채널(z_owned_reply_channel_t)을 임시 스토리지 버퍼로 간주하여 z_get 응답 데이터를 무작위 병렬 적재하고, 전위의 for 폴링(Polling) 루프는 이 버퍼의 스택을 싱글 스레드 제어권 기반으로 안전히 차감(Dequeue)함으로써, 시스템 콜 데드락(System Call Deadlock) 리스크를 원천적으로 차단하는 데 의의를 가진다.