PYTHON

[Python] Searchive 프로젝트( 1 ) - MinIO, ElasticSearch, KeyBert

ch010104 2025. 10. 11. 16:16

Searchive 프로젝트:

- 단순히 파일을 저장하는 것을 넘어, 파일의 내용을 이해하고 자동으로 분류하며, 강력한 검색 기능을 제공하는 AI 기반 문서 관리 API 서버입니다. 

 

깃허브 주소:

- https://github.com/Chaehyunli/Searchive-backend


문서 업로드 파이프라인: A 9-Step Journey

  1. Gateway: API 엔드포인트에서 UploadFile 객체로 파일 수신
  2. Preparation: pathlib과 uuid를 이용한 고유 저장 경로 생성
  3. Storage: MinIO 객체 스토리지에 실제 파일 데이터 저장
  4. Metadata Logging: PostgreSQL에 파일의 메타데이터 기록
  5. Text Extraction: PDF, DOCX 등 다양한 문서에서 텍스트 추출
  6. Indexing: Elasticsearch에 텍스트를 색인하여 검색 준비
  7. AI Tagging: KeyBERT와 Elasticsearch를 활용한 하이브리드 키워드 추출
  8. Tag Association: 생성된 태그를 PostgreSQL에 저장하고 문서와 연결
  9. Response: 처리된 문서와 태그 정보를 클라이언트에 반환

1단계: 관문 - API 엔드포인트와 UploadFile

  • 모든 프로세스는 documents/controller.py의 API 엔드포인트에서 시작
  • FastAPI는 multipart/form-data 요청을 효율적으로 처리하기 위한 UploadFile 객체를 사용

💡 핵심 라이브러리: pathlib & uuid

- UploadFile (FastAPI) UploadFile은 단순한 데이터 컨테이너가 아닌, 메모리 효율성을 극대화한 스마트 객체

- 작은 파일은 메모리에서 신속하게 처리하고, 대용량 파일은 자동으로 디스크에 스풀링(spooling)하여 서버의 메모리 고갈을 방지

- filename, content_type 등의 필수 메타데이터도 함께 제공하여 개발 편의성을 높임

# src/domains/documents/controller.py
from fastapi import APIRouter, Depends, UploadFile, File

@router.post("/upload", status_code=201)
async def upload_document(
    # file: 업로드된 파일 데이터와 메타데이터를 담은 UploadFile 객체
    file: UploadFile = File(...),
    # user_id: 인증 미들웨어를 통해 검증된 사용자의 ID
    user_id: int = Depends(get_current_user_id),
    # document_service: 실제 비즈니스 로직을 담당하는 서비스 (의존성 주입)
    document_service: DocumentService = Depends()
):
    """문서 업로드 API 엔드포인트. 컨트롤러는 요청 검증과 서비스 호출만 담당한다."""
    document, tags, extraction_method = await document_service.upload_document(
        user_id=user_id,
        file=file
    )
    # Pydantic 스키마를 통해 최종 응답 구조를 정의하고 반환한다.
    return DocumentUploadResponse(...)

2단계: 저장 준비 - Path와 uuid로 고유 경로 생성

- 데이터 무결성을 보장하고 파일명 충돌을 방지하기 위해, 서버에 저장될 고유한 경로와 파일명을 생성

💡 핵심 라이브러리: pathlib & uuid

  • Path: 문자열 기반의 경로 처리에서 발생할 수 있는 오류를 방지하는 객체지향적 경로 관리 도구. .suffix (확장자), .stem (파일명) 등 경로의 각 부분을 안전하게 추출
  • uuid: uuid.uuid4()를 통해 예측 불가능한 랜덤 고유 ID를 생성, 파일명 충돌을 원천적으로 차단하고 보안을 강화
# src/domains/documents/service.py
import uuid
from pathlib import Path

async def upload_document(self, user_id: int, file: UploadFile):
    # ...
    # 1. pathlib.Path를 이용해 원본 파일명에서 확장자를 안전하게 분리한다.
    # 예: "my_report.pdf" -> ".pdf"
    file_extension = Path(file.filename).suffix
    
    # 2. uuid4를 사용하여 예측 불가능하고 고유한 파일명을 생성한다.
    # 예: "a1b2c3d4-e5f6-7890-abcd-ef1234567890.pdf"
    unique_filename = f"{uuid.uuid4()}{file_extension}"
    
    # 3. 사용자 ID를 디렉토리로 사용하여 데이터를 격리하고 최종 경로를 완성한다.
    # 예: "123/a1b2c3d4-e5f6-7890-abcd-ef1234567890.pdf"
    storage_path = f"{user_id}/{unique_filename}"
    # ...

3단계: 파일 안치 - MinIO 객체 스토리지에 저장

- 생성된 고유 경로를 사용하여 실제 파일 데이터를 영구 저장소인 MinIO에 업로드

💡 핵심 기술: MinIO (객체 스토리지)
- 대용량 비정형 데이터(파일, 이미지 등) 저장에 최적화된 시스템
- 파일을 '객체' 단위로 관리하며 HTTP API를 통해 접근
- 확장성이 뛰어나고 S3와 호환되어 클라우드 환경에서 표준으로 사용

# src/domains/documents/service.py
# UploadFile 객체에서 비동기적으로 전체 파일 데이터를 읽어온다.
file_data = await file.read()
file_size = len(file_data)

# MinIO 클라이언트 래퍼에 작업을 위임한다.
# 서비스 계층은 MinIO의 구체적인 구현을 알 필요가 없다 (관심사 분리).
self.minio_client.upload_file(
    file_path=storage_path,        # 2단계에서 생성한 고유 경로
    file_data=BytesIO(file_data),  # 파일 데이터를 메모리 내 바이너리 스트림으로
    file_size=file_size,
    content_type=file.content_type
)

4단계: 신상 명세 기록 - PostgreSQL에 메타데이터 저장

- 파일의 물리적 저장이 완료되면, 해당 파일을 관리하기 위한 정보(메타데이터)를 PostgreSQL 데이터베이스에 기록

# src/domains/documents/service.py
# 데이터 접근 계층(Repository)을 통해 DB에 INSERT 쿼리를 실행한다.
document = await self.document_repository.create(
    user_id=user_id,
    original_filename=file.filename, # 검색 및 표시를 위한 원본 파일명
    storage_path=storage_path,       # MinIO에 저장된 실제 파일 경로
    file_type=file.content_type,
    file_size_kb=file_size / 1024
)
# 이 작업이 끝나면, DB에서 자동 생성된 `document_id`가 포함된 `Document` 객체가 반환된다.

5단계: 내용물 확인 - 다양한 파일에서 텍스트 추출

- AI 분석과 검색을 위해 파일 형식(PDF, DOCX 등)에 관계없이 순수 텍스트를 추출해야 함

- 이 책임은 TextExtractor 모듈이 담당

# src/domains/documents/service.py
extracted_text = self.text_extractor.extract_text_from_bytes(
    file_data=file_data,
    file_type=file.content_type,
    filename=file.filename
)

# src/core/text_extractor.py
@staticmethod
def extract_text_from_bytes(file_data: bytes, file_type: str, filename: str):
    """파일의 MIME 타입을 기반으로 적절한 추출 라이브러리를 동적으로 선택한다."""
    if file_type == "application/pdf":
        return TextExtractor._extract_from_pdf(file_data)
    elif "wordprocessing" in file_type: # DOCX
        return TextExtractor._extract_from_docx(file_data)
    # ... (기타 형식 처리) ...

@staticmethod
def _extract_from_pdf(file_data: bytes):
    """pypdf 라이브러리를 사용하여 PDF의 각 페이지에서 텍스트를 추출하고 병합한다."""
    import pypdf
    from io import BytesIO
    
    pdf_reader = pypdf.PdfReader(BytesIO(file_data))
    text_parts = [page.extract_text() for page in pdf_reader.pages if page.extract_text()]
    return "\n".join(text_parts)

6단계: 검색 준비 - Elasticsearch에 텍스트 색인

- 추출된 텍스트를 검색 엔진 Elasticsearch에 저장(색인)

- 이 과정을 통해 텍스트는 검색 가능한 작은 단위(토큰)로 분해되고 역색인(Inverted Index)이 생성

💡 핵심 기술: Elasticsearch (검색 엔진)
- 텍스트 검색에 고도로 최적화된 분산 시스템
- 어떤 단어가 어떤 문서에 있는지를 기록한 '역색인' 구조 덕분에, 수십억 건의 문서에서도 밀리초 단위의 빠른 검색 속도를 보장

# src/domains/documents/service.py
await self.elasticsearch_client.index_document(
    document_id=document.document_id, # PostgreSQL의 PK와 ID를 일치시켜 데이터 정합성 유지
    user_id=user_id,
    content=extracted_text,           # 5단계에서 추출한 순수 텍스트
    # ... 기타 메타데이터 ...
)

7단계: 하이브리드 키워드 추출

- Searchive의 핵심 기능인 AI 태깅이 이 단계에서 수행

- 데이터의 양에 따라 최적의 방법을 선택하는 하이브리드 전략을 사용

  • Cold Start (문서 < 10개): 단일 문서의 의미를 깊게 이해하는 KeyBERT 사용.
  • Normal (문서 >= 10개): 전체 문서와의 통계적 비교를 통해 핵심어를 찾는 Elasticsearch TF-IDF 사용.
# src/domains/documents/service.py
keywords, extraction_method = await self.keyword_extraction_service.extract_keywords(
    text=extracted_text,
    document_id=document.document_id
)

# src/core/keyword_extraction.py (HybridKeywordExtractionService 내부)
async def extract_keywords(self, text: str, document_id: int):
    # 현재 색인된 총 문서 수를 확인하여 전략을 결정한다.
    document_count = await self.elasticsearch_client.get_document_count()

    if document_count < self.threshold:
        # KeyBERT는 텍스트 내용만으로 분석 가능
        keywords = await self.keybert_extractor.extract_keywords(text=text)
        method = "keybert"
    else:
        # Elasticsearch는 비교 대상 문서(document_id)가 반드시 필요
        keywords = await self.elasticsearch_extractor.extract_keywords(
            text=text, document_id=document_id
        )
        method = "elasticsearch"
    
    return keywords, method

8단계: 태그 생성 및 연결

- AI가 추출한 키워드들을 tags 테이블에 저장하고, document_tags 연결 테이블을 통해 현재 문서와 다대다(Many-to-Many) 관계를 설정

# src/domains/documents/service.py
# TagService는 DB 부하를 최소화하기 위해 최적화된 로직을 포함한다.
# - Get-or-Create: 이미 존재하는 태그는 새로 만들지 않고 재사용한다.
# - Bulk Operations: 여러 태그를 한 번의 쿼리로 조회하고 생성하여 N+1 문제를 방지한다.
tags = await self.tag_service.attach_tags_to_document(
    document_id=document.document_id,
    tag_names=keywords # 7단계에서 추출된 키워드 리스트
)

9단계: 최종 결과 응답

- 모든 파이프라인 단계를 성공적으로 통과하면, 컨트롤러는 최종적으로 생성된 문서 정보와 AI가 추출한 태그 목록을 JSON 형태로 클라이언트에게 반환하며 전체 프로세스를 완료

{
  "document_id": 101,
  "original_filename": "my_report.pdf",
  "tags": [
    {"tag_id": 1, "name": "machine learning"},
    {"tag_id": 2, "name": "deep learning"}
  ],
  "extraction_method": "keybert"
}