13.7.5 파이썬 기반의 유효성 검사기(Validator) 클래스 구현 예제 코드
지금까지 13.7절 전반에서 숨 막히게 설계해 온 추출(Extraction), 3중 오라클 코어 검증(Validation), 메타 인지 피드백 기반 자가 수정(Correction), 그리고 최종 심사망 라우팅(Approval / HITL)으로 이어지는 4단계 데이터 생명 주기(EVCA Lifecycle)를, 마우스 클릭 한 번 불필요한 풀 오토메이션(Full-automation)으로 통제하는 마스터 파이프라인(Master Pipeline) 클래스는 다음과 같이 비동기(Asynchronous) 파이썬 코드로 우아하게 구현해 낼 수 있다.
이 코드는 단순한 애플리케이션 데모가 아니다. 이 코드는 이 책의 13장 전체, 더 나아가 비정형 AI 데이터 처리론을 관통하는 철학적, 사상적 결정체이자, 세상에서 가장 지저분한 비정형 픽셀 데이터를 세상에서 가장 아름답고 무결한 엔터프라이즈의 데이터베이스 텐서(1급 시민 객체)로 변환해 내는 **‘최후정화의 거푸집’**이다.
1. 하이브리드 마스터 Validator 클래스(EVCA Pipeline) 구현부
import asyncio
import logging
from typing import Optional, Dict, Any
from pydantic import ValidationError
# -------------------------------------------------------------------
# [의존성 임포트] 책의 앞선 챕터들에서 조각조각 제련해 두었던 부품들
# -------------------------------------------------------------------
# 13.3~13.5절에서 정의한 궁극의 3단계 오라클 Pydantic 스키마 객체
from schemas import InvoiceMasterOracleStage
# 13.7.3절에서 구현한 LLM 비동기 호출 및 instructor 패치 래퍼 (Pydantic 강제 포맷팅)
from llm_client import extract_json_with_instructor
# 13.7.4절에서 구현한 SLM 배심원 앙상블 팩트체킹 평가 함수
from ensemble import run_slm_fact_checker
logger = logging.getLogger("EVCA_Master_Pipeline")
class HybridDocumentValidatorPipeline:
def __init__(self, max_retries: int = 3):
# AI 모델 스스로 메타 인지 디버깅을 허용해 줄 최대 코인 개수 설정
self.max_retries = max_retries
async def process_document(
self, document_image_path: str, ocr_layout_text: str
) -> Optional[InvoiceMasterOracleStage]:
"""
단 1장의 비정형 이미지와 전처리 텍스트를 받아 트랜잭션의 생사(Life/Death)를
결정짓는 시스템의 거대한 메인 제어 루프(State Machine).
"""
attempt = 0
error_memory = [] # LLM의 자가 수정(Self-Correction)을 돕기 위해 오답 노트를 누적할 배열
while attempt < self.max_retries:
attempt += 1
logger.info(f"[Extraction State] 비전 추론 시도 {attempt}/{self.max_retries} ...")
try:
# -------------------------------------------------------------
# 1. Extraction: 비전 LLM에 OCR 텍스트와 누적된 에러 히스토리를 세트로 묶어 비동기 추론 요구
# -------------------------------------------------------------
extracted_pydantic_obj = await extract_json_with_instructor(
image_path=document_image_path,
ocr_context=ocr_layout_text, # 업스트림 방어: Bounding Box 공간 지표 파괴 방어 (13.6.1절)
error_feedback="\n".join(error_memory), # 루프백 피드백: 네가 아까 이런 오라클 에러를 냈었다고 주입 (13.7.3절)
target_schema=InvoiceMasterOracleStage # 타겟 거푸집: 1,2,3단계 오라클이 내부망에 탑재된 무시무시한 객체
)
# [중요 아키텍처 포인트]:
# 위 extract_json_with_instructor 라인이 실행되고 모델이 JSON을 뱉는 동일한 순간,
# InvoiceMasterOracleStage 스키마에 정의된 Pydantic 1단계(타입), 2단계(산술), 3단계(DB지식)
# 오라클 @model_validator 검증 로직 머신건이 런타임에 동기적으로 무자비하게 차례대로 격발됨.
# 여기서 AI가 생성한 값이 이 3중 방어막을 뚫지 못하면 즉시 아래의 except 블록으로 에러 파편이 튕겨 나감.
# 이 라인 바로 턱밑에 살아서 아래 코드로 도달했다는 것은, 저 3가지 지옥주를 모두 무사 통과했음을 수학적으로 증명함.
# -------------------------------------------------------------
# 2. Validation (Ensemble): SLM 교차 검증 배심원 개입 (13.7.4절)
# -------------------------------------------------------------
logger.info("[Ensemble State] 거대 LLM의 1차 생존을 확인. SLM 보조 검증기에 비토권(Veto) 행사 여부를 묻는 중...")
consensus = await run_slm_fact_checker(
raw_text=ocr_layout_text,
parsed_json=extracted_pydantic_obj.model_dump()
)
if not consensus.is_valid:
# 메인 모델의 독주를 견제. 싼 값의 SLM이 환각을 지적하면 무조건 중단
raise ValueError(f"[하이브리드 앙상블 붕괴 에러] SLM 스파이 비토 격발: {consensus.reason}")
# -------------------------------------------------------------
# 3. Approval: 모든 지옥의 오라클을 극적으로 통과한 0.1%의 무결점 객체
# -------------------------------------------------------------
logger.info("[Approval State] 위대함 증명 완료. 모든 오라클 및 앙상블 합의(Consensus) 통과. DB 영속화 절차 이관 허가.")
# ERP 데이터베이스에 이 객체를 즉시 INSERT 쳐도 완벽히 안전함이 수학적, 리걸하게 증명됨!
return extracted_pydantic_obj
except ValidationError as ve:
# Pydantic 스키마가 분노하여 잡아낸 1단계(구문 오타), 2단계(산술 연산 모순) 에러 캐치 (Type 1, 2)
error_msg = f"[모델 논리 에러] 구조/산술 오라클 예외: {ve.errors()[0]['msg']}"
logger.warning(error_msg)
# 에러 메시지를 컨텍스트 창에 버리지 않고, 자가 수정 루프를 위해 메모리에 차곡차곡 누적
error_memory.append(error_msg)
except ValueError as val_e:
# 커스텀 오라클 망이 잡아낸 3단계 외부 지식망 붕괴 등 (Type 3)
error_msg = str(val_e)
logger.error(f"[결정적 메탈 에러] {error_msg}")
if "HITL" in error_msg or "[리걸(Legal) 무결성 파괴]" in error_msg:
# 국세청 파산 벤더이거나 PO 넘버가 없는 등, 모델 지능이 아니라 데이터 자체가 쓰레기인 경우.
# 무의미한 루프 재시도를 즉각 찢어버리고, 대기 중인 인간 전문가에게 트랜잭션을 혐오스럽게 토스함.
self._route_to_human_in_the_loop_queue(document_image_path, error_msg)
return None
error_memory.append(error_msg)
except Exception as system_e:
# 관공서 API TimeOut 등 LLM도 오라클도 잘못이 없는 순수 외부 인프라 타임아웃
logger.critical(f"[플랫폼 인프라 레벨망 붕괴] 트래픽 혼잡 또는 서버 다운: {system_e}")
self._route_to_delayed_retry_queue(document_image_path)
return None
# -------------------------------------------------------------
# 4. Correction State (Failure 종료): 자가 수정 코인을 Max까지 다 소진했으나 회생 실패
# -------------------------------------------------------------
logger.error(f"[루프 강제 종료] 프롬프트 튜닝 피드백을 제공했음에도, LLM 최대 재시도({self.max_retries}) 초과. 모델 역량 한계 봉착.")
final_errors = "\n".join(error_memory)
# 마지막 조치: 파이프라인에서 데이터를 즉각 격리하고 수동 심판대(HITL)로 넘긴다
self._route_to_human_in_the_loop_queue(document_image_path, f"재시도 횟수 초과. 누적 에러 로그:\n{final_errors}")
return None
# ============== 대규모 메시지 분산 큐(Message Queue) 라우팅 레이어 ==============
def _route_to_human_in_the_loop_queue(self, image_path: str, reason: str):
# RabbitMQ 나 AWS SQS 퍼블리싱 로직: [인간 수동 결재] HITL 대시보드 인박스로 라우팅
logger.info(f" -> [DEAD LETTER / HITL Queue 발송 완료] 이미지: {image_path} (추방 사유: {reason})")
def _route_to_delayed_retry_queue(self, image_path: str):
# RabbitMQ Delayed Exchange 로직: 네트워크 혼잡을 피해 5분 뒤 다시 파이프라인에 재투입
logger.info(f" -> [Delay Exponential Backoff Queue 대피 완료] 이미지: {image_path}")