|
|
|
|
|
import os |
|
|
import mlflow |
|
|
import chromadb |
|
|
from langchain_community.document_loaders import TextLoader, DirectoryLoader |
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
|
from langchain_community.vectorstores import Chroma |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
# Load environment variables (e.g. MLFLOW_TRACKING_URI, read in run()) from a
# local .env file into os.environ.
load_dotenv()


# --- Pipeline configuration ---
DATA_PATH = "data/raw"               # directory holding the raw .txt source files
DB_PATH = "data/chroma_db"           # on-disk location of the persisted Chroma store
COLLECTION_NAME = "rag_experiments"  # Chroma collection the chunks are written into
# Hugging Face sentence-embedding model used to embed each chunk.
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
|
|
|
|
|
class IngestionPipeline:
    """Ingest raw text files into a ChromaDB vector store, tracked with MLflow.

    Stages: load ``*.txt`` documents from ``DATA_PATH`` -> split them into
    overlapping chunks -> embed the chunks with ``EMBEDDING_MODEL`` and persist
    them to the Chroma collection at ``DB_PATH``.
    """

    def __init__(self, device="mps"):
        """Initialize the embedding model.

        Args:
            device: Torch device string for the embedding model. Defaults to
                ``"mps"`` (Apple Silicon) to preserve the original behavior;
                pass ``"cpu"`` or ``"cuda"`` on other hosts.
        """
        self.embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL,
            model_kwargs={'device': device},
        )

    def load_documents(self):
        """Loads text files from the data directory."""
        loader = DirectoryLoader(DATA_PATH, glob="*.txt", loader_cls=TextLoader)
        documents = loader.load()
        print(f"π Loaded {len(documents)} documents.")
        return documents

    def chunk_documents(self, documents, chunk_size=1000, chunk_overlap=200):
        """Splits documents into smaller chunks.

        Args:
            documents: Sequence of langchain ``Document`` objects.
            chunk_size: Maximum characters per chunk.
            chunk_overlap: Characters shared between adjacent chunks.

        Returns:
            List of chunked ``Document`` objects.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        chunks = text_splitter.split_documents(documents)
        print(f"π§© Split into {len(chunks)} chunks.")
        return chunks

    def store_embeddings(self, chunks):
        """Embeds chunks and stores them in ChromaDB.

        Note: ``Chroma.from_documents`` with a ``persist_directory`` adds to
        the existing on-disk collection rather than replacing it, so rerunning
        the pipeline appends (and may duplicate) chunks.
        """
        if os.path.exists(DB_PATH):
            print("β οΈ Existing DB found. Appending...")

        vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=DB_PATH,
            collection_name=COLLECTION_NAME,
        )
        print(f"πΎ Saved to {DB_PATH}")
        return vectorstore

    def run(self, chunk_size=1000, chunk_overlap=200):
        """Runs the full pipeline with MLflow tracking.

        Args:
            chunk_size: Passed through to :meth:`chunk_documents` and logged.
            chunk_overlap: Passed through to :meth:`chunk_documents` and logged.
        """
        mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI"))

        with mlflow.start_run(run_name="Ingestion_Phase_2"):
            # Log the values actually used for chunking. Previously these were
            # duplicated as hard-coded literals here, which could silently
            # drift from the defaults in chunk_documents().
            mlflow.log_param("embedding_model", EMBEDDING_MODEL)
            mlflow.log_param("chunk_size", chunk_size)
            mlflow.log_param("chunk_overlap", chunk_overlap)

            docs = self.load_documents()
            chunks = self.chunk_documents(
                docs, chunk_size=chunk_size, chunk_overlap=chunk_overlap
            )
            self.store_embeddings(chunks)

            mlflow.log_metric("num_documents", len(docs))
            mlflow.log_metric("num_chunks", len(chunks))

        # Reconstructed: the original literal was split across two lines by the
        # extraction, leaving an unterminated string.
        print("✅ Ingestion complete and logged to Dagshub!")
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the pipeline and run the full ingestion flow.
    IngestionPipeline().run()
|
|
|