# NOTE(review): git-UI paste residue (author "Arif", commit "Initial commit
# (Clean history)", hash faa44eb) — commented out so the file parses as Python.
# src/ingestion/pipeline.py
import os
import mlflow
import chromadb
from langchain_community.document_loaders import TextLoader, DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from dotenv import load_dotenv
# Load environment variables from a local .env file (e.g. MLFLOW_TRACKING_URI).
load_dotenv()
# Configuration
DATA_PATH = "data/raw"  # directory of raw .txt source documents
DB_PATH = "data/chroma_db"  # on-disk ChromaDB persistence directory
COLLECTION_NAME = "rag_experiments"  # Chroma collection the chunks are stored in
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # HF embedding model id
class IngestionPipeline:
    """End-to-end ingestion: load raw .txt files, split into chunks, embed
    the chunks, and persist them to a ChromaDB collection, with parameters
    and metrics tracked in MLflow.
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        Args:
            chunk_size: maximum characters per chunk.
            chunk_overlap: characters shared between consecutive chunks.
        """
        # Single source of truth for chunking parameters, so the values
        # logged to MLflow in run() always match what chunk_documents()
        # actually uses (previously they were hard-coded in two places).
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL,
            # Apple-Silicon MPS backend; switch to 'cpu'/'cuda' on other
            # hardware — TODO: consider auto-detecting device availability.
            model_kwargs={'device': 'mps'}
        )

    def load_documents(self):
        """Load every *.txt file under DATA_PATH as LangChain documents."""
        loader = DirectoryLoader(DATA_PATH, glob="*.txt", loader_cls=TextLoader)
        documents = loader.load()
        print(f"📄 Loaded {len(documents)} documents.")
        return documents

    def chunk_documents(self, documents, chunk_size=None, chunk_overlap=None):
        """Split documents into overlapping chunks.

        Args:
            documents: documents returned by load_documents().
            chunk_size: override for the instance-level chunk size (None
                keeps the value configured in __init__, default 1000).
            chunk_overlap: override for the instance-level overlap (None
                keeps the value configured in __init__, default 200).

        Returns:
            The list of chunked documents.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size if chunk_size is None else chunk_size,
            chunk_overlap=self.chunk_overlap if chunk_overlap is None else chunk_overlap,
        )
        chunks = text_splitter.split_documents(documents)
        print(f"🧩 Split into {len(chunks)} chunks.")
        return chunks

    def store_embeddings(self, chunks):
        """Embed chunks and persist them to the ChromaDB collection.

        Returns:
            The populated Chroma vector store.
        """
        if os.path.exists(DB_PATH):
            # Chroma.from_documents adds to an existing persisted
            # collection rather than replacing it.
            print("⚠️ Existing DB found. Appending...")
        vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=DB_PATH,
            collection_name=COLLECTION_NAME,
        )
        print(f"💾 Saved to {DB_PATH}")
        return vectorstore

    def run(self):
        """Run the full pipeline inside a tracked MLflow run."""
        tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
        if tracking_uri:
            # Only override when configured; passing None/"" would clobber
            # MLflow's default local tracking store.
            mlflow.set_tracking_uri(tracking_uri)
        with mlflow.start_run(run_name="Ingestion_Phase_2"):
            # Log the parameters actually used (kept in sync via __init__).
            mlflow.log_param("embedding_model", EMBEDDING_MODEL)
            mlflow.log_param("chunk_size", self.chunk_size)
            mlflow.log_param("chunk_overlap", self.chunk_overlap)
            # Execute pipeline steps.
            docs = self.load_documents()
            chunks = self.chunk_documents(docs)
            self.store_embeddings(chunks)
            # Log resulting corpus metrics.
            mlflow.log_metric("num_documents", len(docs))
            mlflow.log_metric("num_chunks", len(chunks))
            print("✅ Ingestion complete and logged to Dagshub!")
if __name__ == "__main__":
    # Script entry point: build the pipeline and ingest immediately.
    IngestionPipeline().run()