Searchive 프로젝트:
- 단순히 파일을 저장하는 것을 넘어, 파일의 내용을 이해하고 자동으로 분류하며, 강력한 검색 기능을 제공하는 AI 기반 문서 관리 API 서버입니다.
깃허브 주소:
- https://github.com/Chaehyunli/Searchive-backend
문서 업로드 파이프라인: A 9-Step Journey
- Gateway: API 엔드포인트에서 UploadFile 객체로 파일 수신
- Preparation: pathlib과 uuid를 이용한 고유 저장 경로 생성
- Storage: MinIO 객체 스토리지에 실제 파일 데이터 저장
- Metadata Logging: PostgreSQL에 파일의 메타데이터 기록
- Text Extraction: PDF, DOCX 등 다양한 문서에서 텍스트 추출
- Indexing: Elasticsearch에 텍스트를 색인하여 검색 준비
- AI Tagging: KeyBERT와 Elasticsearch를 활용한 하이브리드 키워드 추출
- Tag Association: 생성된 태그를 PostgreSQL에 저장하고 문서와 연결
- Response: 처리된 문서와 태그 정보를 클라이언트에 반환
1단계: 관문 - API 엔드포인트와 UploadFile
- 모든 프로세스는 documents/controller.py의 API 엔드포인트에서 시작
- FastAPI는 multipart/form-data 요청을 효율적으로 처리하기 위한 UploadFile 객체를 사용
💡 핵심 라이브러리: pathlib & uuid
- UploadFile (FastAPI) UploadFile은 단순한 데이터 컨테이너가 아닌, 메모리 효율성을 극대화한 스마트 객체
- 작은 파일은 메모리에서 신속하게 처리하고, 대용량 파일은 자동으로 디스크에 스풀링(spooling)하여 서버의 메모리 고갈을 방지
- filename, content_type 등의 필수 메타데이터도 함께 제공하여 개발 편의성을 높임
# src/domains/documents/controller.py
from fastapi import APIRouter, Depends, UploadFile, File
@router.post("/upload", status_code=201)
async def upload_document(
# file: 업로드된 파일 데이터와 메타데이터를 담은 UploadFile 객체
file: UploadFile = File(...),
# user_id: 인증 미들웨어를 통해 검증된 사용자의 ID
user_id: int = Depends(get_current_user_id),
# document_service: 실제 비즈니스 로직을 담당하는 서비스 (의존성 주입)
document_service: DocumentService = Depends()
):
"""문서 업로드 API 엔드포인트. 컨트롤러는 요청 검증과 서비스 호출만 담당한다."""
document, tags, extraction_method = await document_service.upload_document(
user_id=user_id,
file=file
)
# Pydantic 스키마를 통해 최종 응답 구조를 정의하고 반환한다.
return DocumentUploadResponse(...)
2단계: 저장 준비 - Path와 uuid로 고유 경로 생성
- 데이터 무결성을 보장하고 파일명 충돌을 방지하기 위해, 서버에 저장될 고유한 경로와 파일명을 생성
💡 핵심 라이브러리: pathlib & uuid
- Path: 문자열 기반의 경로 처리에서 발생할 수 있는 오류를 방지하는 객체지향적 경로 관리 도구. .suffix (확장자), .stem (파일명) 등 경로의 각 부분을 안전하게 추출
- uuid: uuid.uuid4()를 통해 예측 불가능한 랜덤 고유 ID를 생성, 파일명 충돌을 원천적으로 차단하고 보안을 강화
# src/domains/documents/service.py
import uuid
from pathlib import Path
async def upload_document(self, user_id: int, file: UploadFile):
# ...
# 1. pathlib.Path를 이용해 원본 파일명에서 확장자를 안전하게 분리한다.
# 예: "my_report.pdf" -> ".pdf"
file_extension = Path(file.filename).suffix
# 2. uuid4를 사용하여 예측 불가능하고 고유한 파일명을 생성한다.
# 예: "a1b2c3d4-e5f6-7890-abcd-ef1234567890.pdf"
unique_filename = f"{uuid.uuid4()}{file_extension}"
# 3. 사용자 ID를 디렉토리로 사용하여 데이터를 격리하고 최종 경로를 완성한다.
# 예: "123/a1b2c3d4-e5f6-7890-abcd-ef1234567890.pdf"
storage_path = f"{user_id}/{unique_filename}"
# ...
3단계: 파일 안치 - MinIO 객체 스토리지에 저장
- 생성된 고유 경로를 사용하여 실제 파일 데이터를 영구 저장소인 MinIO에 업로드
💡 핵심 기술: MinIO (객체 스토리지)
- 대용량 비정형 데이터(파일, 이미지 등) 저장에 최적화된 시스템
- 파일을 '객체' 단위로 관리하며 HTTP API를 통해 접근
- 확장성이 뛰어나고 S3와 호환되어 클라우드 환경에서 표준으로 사용
# src/domains/documents/service.py
# UploadFile 객체에서 비동기적으로 전체 파일 데이터를 읽어온다.
file_data = await file.read()
file_size = len(file_data)
# MinIO 클라이언트 래퍼에 작업을 위임한다.
# 서비스 계층은 MinIO의 구체적인 구현을 알 필요가 없다 (관심사 분리).
self.minio_client.upload_file(
file_path=storage_path, # 2단계에서 생성한 고유 경로
file_data=BytesIO(file_data), # 파일 데이터를 메모리 내 바이너리 스트림으로
file_size=file_size,
content_type=file.content_type
)
4단계: 신상 명세 기록 - PostgreSQL에 메타데이터 저장
- 파일의 물리적 저장이 완료되면, 해당 파일을 관리하기 위한 정보(메타데이터)를 PostgreSQL 데이터베이스에 기록
# src/domains/documents/service.py
# 데이터 접근 계층(Repository)을 통해 DB에 INSERT 쿼리를 실행한다.
document = await self.document_repository.create(
user_id=user_id,
original_filename=file.filename, # 검색 및 표시를 위한 원본 파일명
storage_path=storage_path, # MinIO에 저장된 실제 파일 경로
file_type=file.content_type,
file_size_kb=file_size / 1024
)
# 이 작업이 끝나면, DB에서 자동 생성된 `document_id`가 포함된 `Document` 객체가 반환된다.
5단계: 내용물 확인 - 다양한 파일에서 텍스트 추출
- AI 분석과 검색을 위해 파일 형식(PDF, DOCX 등)에 관계없이 순수 텍스트를 추출해야 함
- 이 책임은 TextExtractor 모듈이 담당
# src/domains/documents/service.py
extracted_text = self.text_extractor.extract_text_from_bytes(
file_data=file_data,
file_type=file.content_type,
filename=file.filename
)
# src/core/text_extractor.py
@staticmethod
def extract_text_from_bytes(file_data: bytes, file_type: str, filename: str):
"""파일의 MIME 타입을 기반으로 적절한 추출 라이브러리를 동적으로 선택한다."""
if file_type == "application/pdf":
return TextExtractor._extract_from_pdf(file_data)
elif "wordprocessing" in file_type: # DOCX
return TextExtractor._extract_from_docx(file_data)
# ... (기타 형식 처리) ...
@staticmethod
def _extract_from_pdf(file_data: bytes):
"""pypdf 라이브러리를 사용하여 PDF의 각 페이지에서 텍스트를 추출하고 병합한다."""
import pypdf
from io import BytesIO
pdf_reader = pypdf.PdfReader(BytesIO(file_data))
text_parts = [page.extract_text() for page in pdf_reader.pages if page.extract_text()]
return "\n".join(text_parts)
6단계: 검색 준비 - Elasticsearch에 텍스트 색인
- 추출된 텍스트를 검색 엔진 Elasticsearch에 저장(색인)
- 이 과정을 통해 텍스트는 검색 가능한 작은 단위(토큰)로 분해되고 역색인(Inverted Index)이 생성
💡 핵심 기술: Elasticsearch (검색 엔진)
- 텍스트 검색에 고도로 최적화된 분산 시스템
- 어떤 단어가 어떤 문서에 있는지를 기록한 '역색인' 구조 덕분에, 수십억 건의 문서에서도 밀리초 단위의 빠른 검색 속도를 보장
# src/domains/documents/service.py
await self.elasticsearch_client.index_document(
document_id=document.document_id, # PostgreSQL의 PK와 ID를 일치시켜 데이터 정합성 유지
user_id=user_id,
content=extracted_text, # 5단계에서 추출한 순수 텍스트
# ... 기타 메타데이터 ...
)
7단계: 하이브리드 키워드 추출
- Searchive의 핵심 기능인 AI 태깅이 이 단계에서 수행
- 데이터의 양에 따라 최적의 방법을 선택하는 하이브리드 전략을 사용
- Cold Start (문서 < 10개): 단일 문서의 의미를 깊게 이해하는 KeyBERT 사용.
- Normal (문서 >= 10개): 전체 문서와의 통계적 비교를 통해 핵심어를 찾는 Elasticsearch TF-IDF 사용.
# src/domains/documents/service.py
keywords, extraction_method = await self.keyword_extraction_service.extract_keywords(
text=extracted_text,
document_id=document.document_id
)
# src/core/keyword_extraction.py (HybridKeywordExtractionService 내부)
async def extract_keywords(self, text: str, document_id: int):
# 현재 색인된 총 문서 수를 확인하여 전략을 결정한다.
document_count = await self.elasticsearch_client.get_document_count()
if document_count < self.threshold:
# KeyBERT는 텍스트 내용만으로 분석 가능
keywords = await self.keybert_extractor.extract_keywords(text=text)
method = "keybert"
else:
# Elasticsearch는 비교 대상 문서(document_id)가 반드시 필요
keywords = await self.elasticsearch_extractor.extract_keywords(
text=text, document_id=document_id
)
method = "elasticsearch"
return keywords, method
8단계: 태그 생성 및 연결
- AI가 추출한 키워드들을 tags 테이블에 저장하고, document_tags 연결 테이블을 통해 현재 문서와 다대다(Many-to-Many) 관계를 설정
# src/domains/documents/service.py
# TagService는 DB 부하를 최소화하기 위해 최적화된 로직을 포함한다.
# - Get-or-Create: 이미 존재하는 태그는 새로 만들지 않고 재사용한다.
# - Bulk Operations: 여러 태그를 한 번의 쿼리로 조회하고 생성하여 N+1 문제를 방지한다.
tags = await self.tag_service.attach_tags_to_document(
document_id=document.document_id,
tag_names=keywords # 7단계에서 추출된 키워드 리스트
)
9단계: 최종 결과 응답
- 모든 파이프라인 단계를 성공적으로 통과하면, 컨트롤러는 최종적으로 생성된 문서 정보와 AI가 추출한 태그 목록을 JSON 형태로 클라이언트에게 반환하며 전체 프로세스를 완료
{
"document_id": 101,
"original_filename": "my_report.pdf",
"tags": [
{"tag_id": 1, "name": "machine learning"},
{"tag_id": 2, "name": "deep learning"}
],
"extraction_method": "keybert"
}
'PYTHON' 카테고리의 다른 글
| [Python] Elasticsearch 클라이언트 분석 및 검색 개념 정리 - Searchive 프로젝트 (0) | 2026.03.28 |
|---|---|
| [Python] Searchive 프로젝트( 2 ) - KeyBert, ElasticSearch 고도화 (0) | 2025.10.11 |