13.7.5 파이썬 기반의 유효성 검사기(Validator) 클래스 구현 예제 코드

13.7.5 파이썬 기반의 유효성 검사기(Validator) 클래스 구현 예제 코드

지금까지 13.7절 전반에서 숨 막히게 설계해 온 추출(Extraction), 3중 오라클 코어 검증(Validation), 메타 인지 피드백 기반 자가 수정(Correction), 그리고 최종 심사망 라우팅(Approval / HITL)으로 이어지는 4단계 데이터 생명 주기(EVCA Lifecycle)를, 마우스 클릭 한 번 불필요한 풀 오토메이션(Full-automation)으로 통제하는 마스터 파이프라인(Master Pipeline) 클래스는 다음과 같이 비동기(Asynchronous) 파이썬 코드로 우아하게 구현해 낼 수 있다.

이 코드는 단순한 애플리케이션 데모가 아니다. 이 코드는 이 책의 13장 전체, 더 나아가 비정형 AI 데이터 처리론을 관통하는 철학적, 사상적 결정체이자, 세상에서 가장 지저분한 비정형 픽셀 데이터를 세상에서 가장 아름답고 무결한 엔터프라이즈의 데이터베이스 텐서(1급 시민 객체)로 변환해 내는 **‘최후정화의 거푸집’**이다.

1. 하이브리드 마스터 Validator 클래스(EVCA Pipeline) 구현부

import asyncio
import logging
from typing import Optional, Dict, Any
from pydantic import ValidationError

# -------------------------------------------------------------------
# [의존성 임포트] 책의 앞선 챕터들에서 조각조각 제련해 두었던 부품들
# -------------------------------------------------------------------
# 13.3~13.5절에서 정의한 궁극의 3단계 오라클 Pydantic 스키마 객체
from schemas import InvoiceMasterOracleStage 
# 13.7.3절에서 구현한 LLM 비동기 호출 및 instructor 패치 래퍼 (Pydantic 강제 포맷팅)
from llm_client import extract_json_with_instructor 
# 13.7.4절에서 구현한 SLM 배심원 앙상블 팩트체킹 평가 함수
from ensemble import run_slm_fact_checker 

logger = logging.getLogger("EVCA_Master_Pipeline")

class HybridDocumentValidatorPipeline:
    def __init__(self, max_retries: int = 3):
        # AI 모델 스스로 메타 인지 디버깅을 허용해 줄 최대 코인 개수 설정
        self.max_retries = max_retries

    async def process_document(
        self, document_image_path: str, ocr_layout_text: str
    ) -> Optional[InvoiceMasterOracleStage]:
        """
        단 1장의 비정형 이미지와 전처리 텍스트를 받아 트랜잭션의 생사(Life/Death)를 
        결정짓는 시스템의 거대한 메인 제어 루프(State Machine).
        """
        attempt = 0
        error_memory = [] # LLM의 자가 수정(Self-Correction)을 돕기 위해 오답 노트를 누적할 배열

        while attempt < self.max_retries:
            attempt += 1
            logger.info(f"[Extraction State] 비전 추론 시도 {attempt}/{self.max_retries} ...")

            try:
                # -------------------------------------------------------------
                # 1. Extraction: 비전 LLM에 OCR 텍스트와 누적된 에러 히스토리를 세트로 묶어 비동기 추론 요구
                # -------------------------------------------------------------
                extracted_pydantic_obj = await extract_json_with_instructor(
                    image_path=document_image_path,
                    ocr_context=ocr_layout_text,            # 업스트림 방어: Bounding Box 공간 지표 파괴 방어 (13.6.1절)
                    error_feedback="\n".join(error_memory), # 루프백 피드백: 네가 아까 이런 오라클 에러를 냈었다고 주입 (13.7.3절)
                    target_schema=InvoiceMasterOracleStage  # 타겟 거푸집: 1,2,3단계 오라클이 내부망에 탑재된 무시무시한 객체
                )

                # [중요 아키텍처 포인트]: 
                # 위 extract_json_with_instructor 라인이 실행되고 모델이 JSON을 뱉는 동일한 순간,
                # InvoiceMasterOracleStage 스키마에 정의된 Pydantic 1단계(타입), 2단계(산술), 3단계(DB지식) 
                # 오라클 @model_validator 검증 로직 머신건이 런타임에 동기적으로 무자비하게 차례대로 격발됨. 
                # 여기서 AI가 생성한 값이 이 3중 방어막을 뚫지 못하면 즉시 아래의 except 블록으로 에러 파편이 튕겨 나감.
                # 이 라인 바로 턱밑에 살아서 아래 코드로 도달했다는 것은, 저 3가지 지옥주를 모두 무사 통과했음을 수학적으로 증명함.

                # -------------------------------------------------------------
                # 2. Validation (Ensemble): SLM 교차 검증 배심원 개입 (13.7.4절)
                # -------------------------------------------------------------
                logger.info("[Ensemble State] 거대 LLM의 1차 생존을 확인. SLM 보조 검증기에 비토권(Veto) 행사 여부를 묻는 중...")
                consensus = await run_slm_fact_checker(
                    raw_text=ocr_layout_text, 
                    parsed_json=extracted_pydantic_obj.model_dump()
                )
                
                if not consensus.is_valid:
                    # 메인 모델의 독주를 견제. 싼 값의 SLM이 환각을 지적하면 무조건 중단
                    raise ValueError(f"[하이브리드 앙상블 붕괴 에러] SLM 스파이 비토 격발: {consensus.reason}")

                # -------------------------------------------------------------
                # 3. Approval: 모든 지옥의 오라클을 극적으로 통과한 0.1%의 무결점 객체
                # -------------------------------------------------------------
                logger.info("[Approval State] 위대함 증명 완료. 모든 오라클 및 앙상블 합의(Consensus) 통과. DB 영속화 절차 이관 허가.")
                
                # ERP 데이터베이스에 이 객체를 즉시 INSERT 쳐도 완벽히 안전함이 수학적, 리걸하게 증명됨!
                return extracted_pydantic_obj

            except ValidationError as ve:
                # Pydantic 스키마가 분노하여 잡아낸 1단계(구문 오타), 2단계(산술 연산 모순) 에러 캐치 (Type 1, 2)
                error_msg = f"[모델 논리 에러] 구조/산술 오라클 예외: {ve.errors()[0]['msg']}"
                logger.warning(error_msg)
                
                # 에러 메시지를 컨텍스트 창에 버리지 않고, 자가 수정 루프를 위해 메모리에 차곡차곡 누적
                error_memory.append(error_msg) 

            except ValueError as val_e:
                # 커스텀 오라클 망이 잡아낸 3단계 외부 지식망 붕괴 등 (Type 3)
                error_msg = str(val_e)
                logger.error(f"[결정적 메탈 에러] {error_msg}")
                
                if "HITL" in error_msg or "[리걸(Legal) 무결성 파괴]" in error_msg:
                    # 국세청 파산 벤더이거나 PO 넘버가 없는 등, 모델 지능이 아니라 데이터 자체가 쓰레기인 경우.
                    # 무의미한 루프 재시도를 즉각 찢어버리고, 대기 중인 인간 전문가에게 트랜잭션을 혐오스럽게 토스함.
                    self._route_to_human_in_the_loop_queue(document_image_path, error_msg)
                    return None
                    
                error_memory.append(error_msg)
            
            except Exception as system_e:
                # 관공서 API TimeOut 등 LLM도 오라클도 잘못이 없는 순수 외부 인프라 타임아웃
                logger.critical(f"[플랫폼 인프라 레벨망 붕괴] 트래픽 혼잡 또는 서버 다운: {system_e}")
                self._route_to_delayed_retry_queue(document_image_path) 
                return None

        # -------------------------------------------------------------
        # 4. Correction State (Failure 종료): 자가 수정 코인을 Max까지 다 소진했으나 회생 실패 
        # -------------------------------------------------------------
        logger.error(f"[루프 강제 종료] 프롬프트 튜닝 피드백을 제공했음에도, LLM 최대 재시도({self.max_retries}) 초과. 모델 역량 한계 봉착.")
        final_errors = "\n".join(error_memory)
        
        # 마지막 조치: 파이프라인에서 데이터를 즉각 격리하고 수동 심판대(HITL)로 넘긴다
        self._route_to_human_in_the_loop_queue(document_image_path, f"재시도 횟수 초과. 누적 에러 로그:\n{final_errors}")
        return None

    # ============== 대규모 메시지 분산 큐(Message Queue) 라우팅 레이어 ==============
    
    def _route_to_human_in_the_loop_queue(self, image_path: str, reason: str):
        # RabbitMQ 나 AWS SQS 퍼블리싱 로직: [인간 수동 결재] HITL 대시보드 인박스로 라우팅
        logger.info(f"  -> [DEAD LETTER / HITL Queue 발송 완료] 이미지: {image_path} (추방 사유: {reason})")

    def _route_to_delayed_retry_queue(self, image_path: str):
        # RabbitMQ Delayed Exchange 로직: 네트워크 혼잡을 피해 5분 뒤 다시 파이프라인에 재투입 
        logger.info(f"  -> [Delay Exponential Backoff Queue 대피 완료] 이미지: {image_path}")