| """ |
| This module parses VCF files and organizes the resulting mutation records hierarchically (sample -> pathway -> chromosome -> gene). |
| """ |
|
|
| import vcf |
| import json |
| import logging |
| from pathlib import Path |
| from collections import defaultdict |
| from typing import Dict, List, Tuple, Optional, Union, Any |
| from dataclasses import dataclass, asdict |
| import pandas as pd |
|
|
| from config import DataConfig, ConfigManager |
|
|
|
|
| |
# Module-scoped logger used by every class and function in this file.
logger = logging.getLogger(__name__)
# Configure root logging at INFO level as an import-time side effect;
# basicConfig does nothing when the root logger already has handlers.
logging.basicConfig(level=logging.INFO)
|
|
|
|
@dataclass
class MutationRecord:
    """One mutation for one sample, with its gene and pathway annotation.

    The first eight fields are required. The three trailing metrics are
    optional and default to ``None`` when not supplied.
    """

    # Locus and alleles of the variant.
    chromosome: str
    position: int
    reference: str
    alternate: str

    # Annotation attached to the variant.
    impact: str
    gene_id: str
    pathway: str

    # Identifier of the sample this record belongs to.
    sample_id: str

    # Optional metrics.
    quality: Optional[float] = None
    depth: Optional[int] = None
    allele_frequency: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the record's fields as a plain dictionary keyed by field name."""
        field_values = asdict(self)
        return field_values
|
|
|
|
class PathwayMapper:
    """Manages pathway mapping data and provides gene-to-pathway lookups.

    Two indexes are maintained:

    * ``gene_to_pathway`` maps a gene to a single pathway. When a gene is
      loaded more than once, the most recently loaded pathway wins.
    * ``pathway_to_genes`` maps a pathway to the genes loaded for it, in
      load order and without duplicates. A gene that was loaded under
      several pathways stays listed under each of them.

    Loading is cumulative: every call to ``load_pathway_mappings`` merges
    into the existing indexes.
    """

    def __init__(self, pathway_file: Optional[Union[str, Path]] = None):
        """Create an empty mapper and optionally load ``pathway_file``."""
        self.gene_to_pathway: Dict[str, str] = {}
        self.pathway_to_genes: Dict[str, List[str]] = defaultdict(list)

        if pathway_file:
            self.load_pathway_mappings(pathway_file)

    def load_pathway_mappings(self, pathway_file: Union[str, Path]) -> None:
        """
        Load pathway mappings from file and merge them into the mapper.

        Expected formats:
        - JSON: {"GENE1": "pathway1", "GENE2": "pathway2", ...}
        - TSV: gene\\tpathway (header row with 'gene' and 'pathway' columns)

        A missing file is logged as a warning and an unsupported suffix is
        logged as an error; neither raises.

        Raises:
            Exception: Any error raised while reading or validating the
                file is logged and re-raised unchanged.
        """
        pathway_file = Path(pathway_file)

        if not pathway_file.exists():
            logger.warning(f"Pathway file not found: {pathway_file}")
            return

        suffix = pathway_file.suffix.lower()
        try:
            if suffix == '.json':
                self._load_json_pathways(pathway_file)
            elif suffix in ('.tsv', '.txt'):
                self._load_tsv_pathways(pathway_file)
            else:
                logger.error(f"Unsupported pathway file format: {pathway_file.suffix}")

        except Exception as e:
            logger.error(f"Error loading pathway mappings: {e}")
            raise

    def _register(self, gene: str, pathway: str) -> None:
        """Record one gene/pathway pair in both indexes without duplicating it."""
        already_known = gene in self.gene_to_pathway
        self.gene_to_pathway[gene] = pathway

        members = self.pathway_to_genes[pathway]
        # A gene seen for the first time cannot be listed anywhere yet, so
        # the linear membership scan only runs for re-registrations.
        if not already_known or gene not in members:
            members.append(gene)

    def _load_json_pathways(self, file_path: Path) -> None:
        """Load pathway mappings from a JSON object of gene -> pathway.

        Raises:
            ValueError: If the top-level JSON value is not an object.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)

        if not isinstance(loaded, dict):
            raise ValueError(
                "JSON pathway file must contain an object mapping genes to pathways"
            )

        # Merge instead of replacing gene_to_pathway so both indexes stay
        # in sync when several files are loaded into the same mapper.
        for gene, pathway in loaded.items():
            self._register(gene, pathway)

    def _load_tsv_pathways(self, file_path: Path) -> None:
        """Load pathway mappings from a tab-separated file.

        Rows with a missing gene or pathway are skipped.

        Raises:
            ValueError: If the 'gene' or 'pathway' column is absent.
        """
        df = pd.read_csv(file_path, sep='\t')

        required_columns = ['gene', 'pathway']
        if not all(col in df.columns for col in required_columns):
            raise ValueError(f"TSV file must contain columns: {required_columns}")

        # Missing cells are read as NaN; str(NaN) would register a bogus
        # gene or pathway literally named "nan".
        complete_rows = df.dropna(subset=required_columns)

        for gene, pathway in zip(complete_rows['gene'], complete_rows['pathway']):
            self._register(str(gene), str(pathway))

    def get_pathway(self, gene_id: str) -> str:
        """Get pathway for a gene, returns 'Unknown_Pathway' if not found."""
        return self.gene_to_pathway.get(gene_id, "Unknown_Pathway")

    def get_genes_in_pathway(self, pathway: str) -> List[str]:
        """Get all genes in a specific pathway (empty list if unknown).

        A new list is returned so callers cannot modify the internal index.
        """
        # .get() avoids creating an empty defaultdict entry for unknown pathways.
        return list(self.pathway_to_genes.get(pathway, []))
|
|
|
|
| class VCFParser: |
| """ |
| Comprehensive VCF parser with hierarchical data organization. |
| |
| Parses VCF files and organizes mutations in a hierarchical structure: |
| Sample -> Pathway -> Chromosome -> Gene -> Mutations |
| """ |
| |
| def __init__(self, |
| config: Optional[DataConfig] = None, |
| pathway_mapper: Optional[PathwayMapper] = None): |
| |
| self.config = config or DataConfig() |
| self.pathway_mapper = pathway_mapper or PathwayMapper() |
| |
| |
| self.parsing_stats = { |
| 'total_records': 0, |
| 'processed_records': 0, |
| 'skipped_records': 0, |
| 'samples_processed': 0, |
| 'unique_genes': set(), |
| 'unique_pathways': set() |
| } |
| |
| def parse_vcf_file(self, vcf_file: Union[str, Path]) -> Dict[str, Any]: |
| """ |
| Parse VCF file and return hierarchical mutation data. |
| |
| Returns: |
| Dict with structure: { |
| 'sample_id': { |
| 'pathway_id': { |
| 'chromosome': { |
| 'gene_id': [MutationRecord, ...] |
| } |
| } |
| } |
| } |
| """ |
| vcf_file = Path(vcf_file) |
| |
| if not vcf_file.exists(): |
| raise FileNotFoundError(f"VCF file not found: {vcf_file}") |
| |
| logger.info(f"Parsing VCF file: {vcf_file}") |
| |
| |
| hierarchical_data = defaultdict( |
| lambda: defaultdict( |
| lambda: defaultdict( |
| lambda: defaultdict( |
| lambda: [] |
| ) |
| ) |
| ) |
| ) |
| |
| try: |
| vcf_reader = vcf.Reader(filename=str(vcf_file)) |
| |
| for record in vcf_reader: |
| self.parsing_stats['total_records'] += 1 |
| |
| |
| for sample in record.samples: |
| mutation_record = self._process_vcf_record(record, sample) |
| |
| if mutation_record: |
| |
| sample_id = mutation_record.sample_id |
| pathway = mutation_record.pathway |
| chromosome = mutation_record.chromosome |
| gene_id = mutation_record.gene_id |
| |
| hierarchical_data[sample_id][pathway][chromosome][gene_id].append( |
| mutation_record |
| ) |
| |
| |
| self.parsing_stats['processed_records'] += 1 |
| self.parsing_stats['unique_genes'].add(gene_id) |
| self.parsing_stats['unique_pathways'].add(pathway) |
| else: |
| self.parsing_stats['skipped_records'] += 1 |
| |
| self.parsing_stats['samples_processed'] = len(hierarchical_data) |
| |
| except Exception as e: |
| logger.error(f"Error parsing VCF file: {e}") |
| raise |
| |
| logger.info(f"Parsing completed. Processed {self.parsing_stats['processed_records']} " |
| f"mutations from {self.parsing_stats['samples_processed']} samples") |
| |
| return dict(hierarchical_data) |
| |
| def _process_vcf_record(self, record, sample) -> Optional[MutationRecord]: |
| """Process a single VCF record and return MutationRecord.""" |
| try: |
| |
| chrom = str(record.CHROM) |
| pos = record.POS |
| ref = record.REF |
| alt = str(record.ALT[0]) if record.ALT else '.' |
| |
| |
| if chrom not in self.config.supported_chromosomes: |
| return None |
| |
| |
| impact = self._extract_impact(record) |
| if impact not in self.config.supported_impacts: |
| impact = "MODERATE" |
| |
| |
| gene_id = self._extract_gene_id(record) |
| if not gene_id: |
| gene_id = "Unknown_Gene" |
| |
| |
| pathway = self.pathway_mapper.get_pathway(gene_id) |
| |
| |
| quality = getattr(record, 'QUAL', None) |
| depth = self._extract_depth(sample) |
| allele_freq = self._extract_allele_frequency(sample) |
| |
| return MutationRecord( |
| chromosome=chrom, |
| position=pos, |
| reference=ref, |
| alternate=alt, |
| impact=impact, |
| gene_id=gene_id, |
| pathway=pathway, |
| sample_id=sample.sample, |
| quality=quality, |
| depth=depth, |
| allele_frequency=allele_freq |
| ) |
| |
| except Exception as e: |
| logger.warning(f"Error processing record at {record.CHROM}:{record.POS}: {e}") |
| return None |
| |
| def _extract_gene_id(self, record) -> Optional[str]: |
| """Extract gene ID directly from VCF record INFO fields.""" |
| |
| gene_fields = ['GENE', 'SYMBOL', 'ANN', 'EFF', 'CSQ', 'GENEINFO'] |
| |
| for field in gene_fields: |
| if field in record.INFO: |
| gene_value = record.INFO[field] |
| if isinstance(gene_value, list): |
| gene_value = gene_value[0] |
| |
| gene_str = str(gene_value) |
| |
| |
| if '|' in gene_str: |
| |
| parts = gene_str.split('|') |
| for part in parts: |
| if part and part not in ['', '.', 'ALLELE', 'Annotation']: |
| return part |
| else: |
| |
| if gene_str and gene_str not in ['', '.']: |
| return gene_str |
| |
| return None |
| |
| def _extract_impact(self, record) -> str: |
| """Extract variant impact from VCF record.""" |
| |
| impact_fields = ['IMPACT', 'ANN', 'EFF', 'CSQ'] |
| |
| for field in impact_fields: |
| if field in record.INFO: |
| impact_value = record.INFO[field] |
| if isinstance(impact_value, list): |
| impact_value = impact_value[0] |
| |
| |
| for supported_impact in self.config.supported_impacts: |
| if supported_impact in str(impact_value).upper(): |
| return supported_impact |
| |
| return "MODERATE" |
| |
| def _extract_depth(self, sample) -> Optional[int]: |
| """Extract read depth from sample.""" |
| if hasattr(sample.data, 'DP') and sample.data.DP is not None: |
| return int(sample.data.DP) |
| return None |
| |
| def _extract_allele_frequency(self, sample) -> Optional[float]: |
| """Extract allele frequency from sample.""" |
| if hasattr(sample.data, 'AF') and sample.data.AF is not None: |
| af = sample.data.AF |
| if isinstance(af, list): |
| af = af[0] |
| return float(af) |
| return None |
| |
| def get_parsing_statistics(self) -> Dict[str, Any]: |
| """Get detailed parsing statistics.""" |
| stats = self.parsing_stats.copy() |
| stats['unique_genes'] = len(stats['unique_genes']) |
| stats['unique_pathways'] = len(stats['unique_pathways']) |
| stats['success_rate'] = ( |
| stats['processed_records'] / max(stats['total_records'], 1) * 100 |
| ) |
| return stats |
| |
| def export_parsed_data(self, |
| hierarchical_data: Dict, |
| output_file: Union[str, Path], |
| format: str = 'json') -> None: |
| """ |
| Export parsed hierarchical data to file. |
| |
| Args: |
| hierarchical_data: Parsed VCF data |
| output_file: Output file path |
| format: Export format ('json', 'pickle') |
| """ |
| output_file = Path(output_file) |
| output_file.parent.mkdir(parents=True, exist_ok=True) |
| |
| if format.lower() == 'json': |
| |
| json_data = self._convert_to_json_serializable(hierarchical_data) |
| with open(output_file, 'w') as f: |
| json.dump(json_data, f, indent=2) |
| |
| elif format.lower() == 'pickle': |
| import pickle |
| with open(output_file, 'wb') as f: |
| pickle.dump(hierarchical_data, f) |
| |
| else: |
| raise ValueError(f"Unsupported export format: {format}") |
| |
| logger.info(f"Exported parsed data to: {output_file}") |
| |
| def _convert_to_json_serializable(self, data: Dict) -> Dict: |
| """Convert hierarchical data with MutationRecord objects to JSON-serializable format.""" |
| result = {} |
| |
| for sample_id, pathways in data.items(): |
| result[sample_id] = {} |
| |
| for pathway_id, chromosomes in pathways.items(): |
| result[sample_id][pathway_id] = {} |
| |
| for chrom_id, genes in chromosomes.items(): |
| result[sample_id][pathway_id][chrom_id] = {} |
| |
| for gene_id, mutations in genes.items(): |
| result[sample_id][pathway_id][chrom_id][gene_id] = [ |
| mutation.to_dict() for mutation in mutations |
| ] |
| |
| return result |
|
|
|
|
def create_parser_from_config(config_manager: ConfigManager) -> VCFParser:
    """Build a VCFParser from the manager's data configuration.

    A PathwayMapper is created only when ``pathway_mapping_path`` is set;
    otherwise ``None`` is passed and the parser supplies its own default.
    """
    data_config = config_manager.data_config
    mapping_path = data_config.pathway_mapping_path

    mapper = PathwayMapper(mapping_path) if mapping_path else None

    return VCFParser(config=data_config, pathway_mapper=mapper)
|
|
|
|
| |
def _main() -> None:
    """Example run: parse ``example.vcf`` and export the result as JSON.

    Any failure during parsing or export is logged and not re-raised.
    """
    manager = ConfigManager()

    # Point the configuration at the example input files.
    manager.data_config.vcf_file_path = "example.vcf"
    manager.data_config.pathway_mapping_path = "pathway_mappings.json"

    vcf_parser = create_parser_from_config(manager)

    try:
        parsed = vcf_parser.parse_vcf_file(manager.data_config.vcf_file_path)

        stats = vcf_parser.get_parsing_statistics()
        print(f"Parsing Statistics: {stats}")

        vcf_parser.export_parsed_data(parsed, "parsed_vcf_data.json", format='json')

    except Exception as e:
        logger.error(f"Error in VCF parsing: {e}")


if __name__ == "__main__":
    _main()