| """Fault classification training utilities for PMU and PV datasets. |
| |
| This module trains deep learning models on high-frequency PMU measurements and |
| supports classical machine learning baselines so the resulting artefacts can be |
| served via the Gradio app in this repository or on Hugging Face Spaces. It |
| implements a full training pipeline including preprocessing, sequence |
| generation, model definition (CNN-LSTM, Temporal Convolutional Network, or |
| Support Vector Machine), evaluation, and export of deployment metadata. |
| |
| Example |
| ------- |
| python fault_classification_pmu.py \ |
| --data-path data/Fault_Classification_PMU_Data.csv \ |
| --label-column FaultType \ |
| --model-type tcn \ |
| --model-out pmu_tcn_model.keras \ |
| --scaler-out pmu_feature_scaler.pkl \ |
| --metadata-out pmu_metadata.json |
| |
| The script accepts CSV input where each row contains a timestamped PMU |
| measurement and a categorical fault label. Features default to the 15 |
| UPMU_SUB22 channels listed in ``DEFAULT_FEATURE_COLUMNS``, followed by any |
| other non-label, non-timestamp columns; an explicit list can be provided |
| via the ``--feature-columns`` argument. Data is automatically windowed into |
| temporal sequences and standardised before it is fed to the model. |
| |
| The exported metadata JSON file contains the feature ordering, label names, |
| sequence length, stride, and chosen architecture. The Gradio front-end |
| consumes this file to replicate the same preprocessing steps during inference. |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import shutil |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Dict, List, Optional, Sequence, Tuple |
|
|
| import math |
|
|
# These defaults are seeded before ``tensorflow`` is imported further down so
# they are already in the environment when that import runs. ``setdefault``
# keeps any value the caller exported explicitly.
# NOTE(review): the meanings below follow TensorFlow's documented behaviour
# (hide CUDA devices, raise the C++ log threshold, disable oneDNN custom
# ops) -- confirm against the TensorFlow version in use.
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "-1")
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2")
os.environ.setdefault("TF_ENABLE_ONEDNN_OPTS", "0")
|
|
| import joblib |
| import numpy as np |
| import pandas as pd |
| from pandas.api.types import is_numeric_dtype |
| from sklearn.metrics import accuracy_score, classification_report, confusion_matrix |
| from sklearn.model_selection import train_test_split |
| from sklearn.preprocessing import LabelEncoder, StandardScaler |
| from sklearn.svm import SVC |
| from tensorflow.keras import callbacks, layers, models, optimizers |
|
|
|
|
|
|
class ProgressCallback(callbacks.Callback):
    """Keras callback that prints training progress and mirrors it to a file.

    Every status line is printed to stdout and, when ``status_file_path`` is
    set, the latest line also overwrites that file so another process can
    poll it. Per-batch updates are throttled to one every
    ``status_update_interval`` seconds; epoch boundaries and the end of
    training are always reported.

    Parameters
    ----------
    total_epochs:
        Number of epochs requested for the run; denominator of the progress
        percentage.
    status_file_path:
        Optional path whose contents are replaced by the latest status line.
    status_update_interval:
        Minimum number of seconds between throttled updates (clamped to >= 1).
    batch_log_frequency:
        During the first epoch only, every N-th batch start/end is printed
        (clamped to >= 1).
    """

    def __init__(
        self,
        total_epochs,
        status_file_path=None,
        *,
        status_update_interval: float = 10.0,
        batch_log_frequency: int = 10,
    ):
        super().__init__()
        self.total_epochs = total_epochs
        self.status_file_path = status_file_path
        self.status_update_interval = max(1.0, float(status_update_interval))
        self.batch_log_frequency = max(1, int(batch_log_frequency))
        self.current_epoch = 0
        self.train_start_time: Optional[float] = None
        self.last_status_report: Optional[float] = None
        self.total_batches_per_epoch = 0
        self.batches_seen = 0
        # Set after the first failed status-file write so the warning is
        # printed once instead of on every update.
        self._status_write_warned = False

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _now(self) -> float:
        """Return a monotonic timestamp in seconds."""
        import time

        return time.perf_counter()

    def _training_elapsed(self, now: Optional[float] = None) -> float:
        """Return seconds since training started (0.0 before it starts)."""
        if self.train_start_time is None:
            return 0.0
        if now is None:
            now = self._now()
        return max(0.0, now - self.train_start_time)

    def _percent(self, epochs_done: float) -> float:
        """Convert a (possibly fractional) epoch count to a percentage."""
        if not self.total_epochs:
            return 0.0
        return epochs_done / self.total_epochs * 100

    def _report_status(self, message: str, *, force: bool = False) -> None:
        """Print *message* and mirror it to the status file.

        Unless ``force`` is true the call is dropped when the previous report
        happened less than ``status_update_interval`` seconds ago.
        """
        now = self._now()
        throttled = (
            not force
            and self.last_status_report is not None
            and now - self.last_status_report < self.status_update_interval
        )
        if throttled:
            return

        print(message, flush=True)

        if self.status_file_path:
            # Status reporting is best-effort: an unwritable file must never
            # abort training, but the failure is surfaced once.
            try:
                with open(self.status_file_path, "w", encoding="utf-8") as handle:
                    handle.write(message)
            except OSError as exc:
                if not self._status_write_warned:
                    print(
                        f"Warning: could not write status file {self.status_file_path}: {exc}",
                        flush=True,
                    )
                    self._status_write_warned = True

        self.last_status_report = now

    # ------------------------------------------------------------------
    # Keras hooks
    # ------------------------------------------------------------------
    def on_train_begin(self, logs=None):
        """Reset counters and derive the number of batches per epoch."""
        params = self.params or {}
        steps = params.get("steps") or params.get("steps_per_epoch")
        if steps:
            self.total_batches_per_epoch = int(steps)
        else:
            samples = params.get("samples")
            batch_size = params.get("batch_size") or 0
            if samples and batch_size:
                self.total_batches_per_epoch = math.ceil(samples / batch_size)
            else:
                self.total_batches_per_epoch = 0

        self.current_epoch = 0
        self.batches_seen = 0
        self.last_status_report = None
        self.train_start_time = self._now()

    def on_epoch_begin(self, epoch, logs=None):
        """Report the start of an epoch (always emitted)."""
        import time

        now = self._now()
        if self.train_start_time is None:
            self.train_start_time = now

        self.current_epoch = epoch + 1
        self.batches_seen = 0

        # Only the epochs finished so far count as progress. This matches the
        # formula used in ``on_batch_end`` so the percentage never goes
        # backwards after the first batch of an epoch.
        progress_pct = self._percent(self.current_epoch - 1)
        elapsed_time = self._training_elapsed(now)
        status_msg = (
            f"Training epoch {self.current_epoch}/{self.total_epochs} "
            f"({progress_pct:.1f}%) - {elapsed_time:.1f}s elapsed"
        )
        self._report_status(status_msg, force=True)

        if self.current_epoch == 1:
            wall_clock = time.strftime("%H:%M:%S")
            print(f"Starting first epoch at {wall_clock}", flush=True)

    def on_batch_begin(self, batch, logs=None):
        """Print a start marker for every N-th batch of the first epoch."""
        if self.current_epoch == 1 and batch % self.batch_log_frequency == 0:
            elapsed = self._training_elapsed()
            print(f"Epoch {self.current_epoch}, Batch {batch} started - {elapsed:.1f}s elapsed", flush=True)

    def on_batch_end(self, batch, logs=None):
        """Log first-epoch batches and emit a throttled overall progress line."""
        self.batches_seen = batch + 1

        if self.current_epoch == 1 and batch % self.batch_log_frequency == 0:
            logs = logs or {}
            loss = logs.get("loss", 0)
            elapsed = self._training_elapsed()
            print(
                f"Epoch {self.current_epoch}, Batch {batch} completed - Loss: {loss:.4f}, {elapsed:.1f}s elapsed",
                flush=True,
            )

        total_batches = self.total_batches_per_epoch or 0
        if not total_batches:
            # The step count may only become known after training started.
            params = self.params or {}
            total_batches = (
                params.get("steps")
                or params.get("steps_per_epoch")
                or 0
            )

        if total_batches:
            epoch_fraction = min(1.0, (batch + 1) / total_batches)
        else:
            epoch_fraction = 0.0

        overall_progress = self._percent(self.current_epoch - 1 + epoch_fraction)
        elapsed_time = self._training_elapsed()
        status_msg = (
            f"Epoch {self.current_epoch}/{self.total_epochs} - Batch {batch + 1}/{total_batches or '?'} "
            f"({overall_progress:.1f}%) - {elapsed_time:.1f}s elapsed"
        )
        self._report_status(status_msg)

    def on_epoch_end(self, epoch, logs=None):
        """Report the epoch's loss/accuracy figures (always emitted)."""
        logs = logs or {}
        loss = logs.get("loss", 0)
        val_loss = logs.get("val_loss", 0)
        accuracy = logs.get("accuracy", logs.get("acc", 0))
        val_accuracy = logs.get("val_accuracy", logs.get("val_acc", 0))

        elapsed_time = self._training_elapsed()
        status_msg = (
            f"Epoch {self.current_epoch}/{self.total_epochs} completed - "
            f"Loss: {loss:.4f}, Val Loss: {val_loss:.4f}, "
            f"Acc: {accuracy:.4f}, Val Acc: {val_accuracy:.4f} - {elapsed_time:.1f}s total"
        )
        self._report_status(status_msg, force=True)

    def on_train_end(self, logs=None):
        """Report completion using the number of epochs that actually ran."""
        total_elapsed = self._training_elapsed()
        # ``current_epoch`` can be smaller than ``total_epochs`` when another
        # callback (e.g. early stopping) ends the run early.
        final_message = (
            f"Training finished after {self.current_epoch}/{self.total_epochs} epoch(s) - "
            f"{total_elapsed:.1f}s total elapsed"
        )
        self._report_status(final_message, force=True)
|
|
|
|
| |
| |
# Preferred feature ordering: the 15 UPMU_SUB22 channels (frequency, df/dt,
# flag, then magnitude/angle pairs for L1-L3 and C1-C3). Columns from this
# list that are present in a dataset are placed first, in this order.
DEFAULT_FEATURE_COLUMNS: List[str] = [
    "[325] UPMU_SUB22:FREQ",
    "[326] UPMU_SUB22:DFDT",
    "[327] UPMU_SUB22:FLAG",
    "[328] UPMU_SUB22-L1:MAG",
    "[329] UPMU_SUB22-L1:ANG",
    "[330] UPMU_SUB22-L2:MAG",
    "[331] UPMU_SUB22-L2:ANG",
    "[332] UPMU_SUB22-L3:MAG",
    "[333] UPMU_SUB22-L3:ANG",
    "[334] UPMU_SUB22-C1:MAG",
    "[335] UPMU_SUB22-C1:ANG",
    "[336] UPMU_SUB22-C2:MAG",
    "[337] UPMU_SUB22-C2:ANG",
    "[338] UPMU_SUB22-C3:MAG",
    "[339] UPMU_SUB22-C3:ANG",
]


# Column names tried (case-insensitively, in this order) when the requested
# label column is not found in the data.
LABEL_GUESS_CANDIDATES: Tuple[str, ...] = ("Fault", "FaultType", "Label", "Target", "Class")
|
|
|
|
| def _normalise_column_name(name: str) -> str: |
| return str(name).strip().lower() |
|
|
|
|
| def _resolve_label_column(df: pd.DataFrame, requested: str) -> str: |
| columns = [str(col) for col in df.columns] |
| if not columns: |
| raise ValueError("Provided dataframe does not contain any columns.") |
|
|
| requested = str(requested or "").strip() |
| if requested and requested in df.columns: |
| return requested |
|
|
| if requested: |
| for col in df.columns: |
| if str(col).strip() == requested: |
| return str(col) |
| lowered = requested.lower() |
| lowered_map = {_normalise_column_name(col): str(col) for col in df.columns} |
| if lowered in lowered_map: |
| return lowered_map[lowered] |
|
|
| lowered_map = {_normalise_column_name(col): str(col) for col in df.columns} |
| for guess in LABEL_GUESS_CANDIDATES: |
| key = guess.lower() |
| if key in lowered_map: |
| return lowered_map[key] |
|
|
| for col in reversed(df.columns): |
| if not is_numeric_dtype(df[col]): |
| return str(col) |
|
|
| available = ", ".join(columns) |
| raise ValueError( |
| f"Label column '{requested or ' '}' not found in provided dataframe. " |
| f"Available columns: {available}" |
| ) |
|
|
|
|
| def _resolve_features(df: pd.DataFrame, feature_columns: Sequence[str] | None, label_column: str) -> List[str]: |
| if feature_columns: |
| missing = [c for c in feature_columns if c not in df.columns] |
| if missing: |
| raise ValueError(f"Feature columns not present in CSV: {missing}") |
| return list(feature_columns) |
|
|
| |
| |
| preferred = [c for c in DEFAULT_FEATURE_COLUMNS if c in df.columns] |
|
|
| excluded = {label_column, label_column.lower(), "timestamp", "Timestamp"} |
| remainder = [c for c in df.columns if c not in preferred and c not in excluded] |
| ordered = preferred + remainder |
| if not ordered: |
| raise ValueError("No feature columns detected. Specify --feature-columns explicitly.") |
| return ordered |
|
|
|
|
def load_dataset(
    csv_path: Path,
    *,
    feature_columns: Optional[Sequence[str]],
    label_column: str,
) -> Tuple[np.ndarray, np.ndarray, List[str], str]:
    """Load the dataset from CSV.

    The delimiter is detected automatically (``sep=None`` with the Python
    parser engine); column resolution and conversion are shared with
    :func:`load_dataset_from_dataframe`.

    Parameters
    ----------
    csv_path:
        Path to the CSV file containing PMU measurements.
    feature_columns:
        Optional explicit ordering of feature columns.
    label_column:
        Name of the column containing the categorical fault label.

    Returns
    -------
    features: np.ndarray
        2-D array of shape (n_samples, n_features).
    labels: np.ndarray
        1-D array of label strings.
    columns: list[str]
        Actual feature ordering used.
    resolved_label: str
        The column name that supplied the labels.
    """
    df = pd.read_csv(csv_path, sep=None, engine="python")
    # Delegate so the CSV and in-memory entry points cannot drift apart.
    return load_dataset_from_dataframe(
        df, feature_columns=feature_columns, label_column=label_column
    )
|
|
|
|
def load_dataset_from_dataframe(
    df: pd.DataFrame,
    *,
    feature_columns: Optional[Sequence[str]],
    label_column: str,
) -> Tuple[np.ndarray, np.ndarray, List[str], str]:
    """Load dataset arrays directly from a DataFrame.

    Returns
    -------
    features: np.ndarray
        ``float32`` array built from the resolved feature columns.
    labels: np.ndarray
        Label column converted to strings.
    columns: list[str]
        Actual feature ordering used.
    resolved_label: str
        The column name that supplied the labels.

    Raises
    ------
    ValueError
        If the label or feature columns cannot be resolved, or a feature
        column cannot be converted to ``float32``.
    """
    resolved_label = _resolve_label_column(df, label_column)
    columns = _resolve_features(df, feature_columns, resolved_label)

    try:
        features = df[columns].astype(np.float32).to_numpy()
    except ValueError as exc:
        # The raw pandas message does not say what to do about it.
        raise ValueError(
            f"Feature columns could not be converted to float32 ({exc}). "
            "Pass an explicit list of numeric feature columns."
        ) from exc

    labels = df[resolved_label].astype(str).to_numpy()
    return features, labels, columns, resolved_label
|
|
|
|
def create_sequences(
    features: np.ndarray,
    labels: np.ndarray,
    *,
    sequence_length: int,
    stride: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """Create overlapping sequences suitable for sequence models.

    The label assigned to a sequence corresponds to the label of the final
    timestep in the window. This choice aligns with fault detection use cases
    where the most recent measurement dictates the state of the system.

    Parameters
    ----------
    features:
        Array whose first axis is time.
    labels:
        Array with one label per row of ``features``.
    sequence_length:
        Number of consecutive rows per window (> 0).
    stride:
        Offset between the starts of consecutive windows (> 0).

    Returns
    -------
    sequences: np.ndarray
        Windows stacked along a new first axis.
    seq_labels: np.ndarray
        Label of the last row of each window.

    Raises
    ------
    ValueError
        On non-positive ``sequence_length``/``stride``, mismatched row counts,
        or fewer rows than ``sequence_length``.
    """
    if sequence_length <= 0:
        raise ValueError("sequence_length must be > 0")
    if stride <= 0:
        raise ValueError("stride must be > 0")
    if features.shape[0] != labels.shape[0]:
        raise ValueError("Features and labels must contain the same number of rows")
    if features.shape[0] < sequence_length:
        raise ValueError("Not enough samples to create a single sequence")

    # Row indices of every window, built in one shot: shape (n_windows, L).
    starts = np.arange(0, features.shape[0] - sequence_length + 1, stride)
    window_rows = starts[:, None] + np.arange(sequence_length)
    sequences = features[window_rows]

    # Rebuild from a plain list so the resulting dtype is inferred from the
    # selected labels, as when collecting them one by one.
    last_rows = starts + sequence_length - 1
    seq_labels = np.array(labels[last_rows].tolist())
    return sequences, seq_labels
|
|
|
|
def build_cnn_lstm(
    input_shape: Tuple[int, int],
    num_classes: int,
    *,
    conv_filters: int = 128,
    kernel_size: int = 3,
    lstm_units: int = 128,
    dropout: float = 0.3,
) -> models.Model:
    """Construct a compact yet expressive CNN-LSTM architecture.

    Layout: two ``Conv1D`` layers (the second with dilation 2), each followed
    by batch normalisation, then dropout, a single ``LSTM`` that returns only
    its final state, dropout again, and a softmax classification head.

    Parameters
    ----------
    input_shape:
        Shape of one input sequence, passed straight to ``layers.Input``.
    num_classes:
        Number of output units of the softmax layer.
    conv_filters, kernel_size:
        Filter count and kernel width shared by both convolutions.
    lstm_units:
        Size of the LSTM state.
    dropout:
        Dropout rate applied after the convolutions and after the LSTM.

    Returns
    -------
    models.Model
        Model compiled with Adam (learning rate 1e-3), sparse categorical
        cross-entropy (integer class labels) and an accuracy metric.
    """
    inputs = layers.Input(shape=input_shape)

    x = layers.Conv1D(conv_filters, kernel_size, padding="same", activation="relu")(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Conv1D(conv_filters, kernel_size, dilation_rate=2, padding="same", activation="relu")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(dropout)(x)

    x = layers.LSTM(lstm_units, return_sequences=False)(x)
    x = layers.Dropout(dropout)(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)

    model = models.Model(inputs, outputs)
    model.compile(
        optimizer=optimizers.Adam(learning_rate=1e-3),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model
|
|
|
|
def build_tcn(
    input_shape: Tuple[int, int],
    num_classes: int,
    *,
    filters: int = 64,
    kernel_size: int = 3,
    dilations: Sequence[int] = (1, 2, 4, 8),
    dropout: float = 0.2,
) -> models.Model:
    """Construct a lightweight Temporal Convolutional Network.

    One residual block is built per entry of ``dilations``. Each block applies
    two causally padded ``Conv1D`` layers with that dilation (batch
    normalisation after each, dropout between them) and adds the block input
    back before a final ReLU. The sequence is then averaged over time and
    classified with a softmax layer.

    Parameters
    ----------
    input_shape:
        Shape of one input sequence, passed straight to ``layers.Input``.
    num_classes:
        Number of output units of the softmax layer.
    filters, kernel_size:
        Filter count and kernel width of every dilated convolution.
    dilations:
        Dilation rate of each residual block, in order.
    dropout:
        Dropout rate used inside the blocks and before the output layer.

    Returns
    -------
    models.Model
        Model compiled with Adam (learning rate 1e-3), sparse categorical
        cross-entropy (integer class labels) and an accuracy metric.
    """
    inputs = layers.Input(shape=input_shape)
    x = inputs

    for dilation in dilations:
        residual = x
        x = layers.Conv1D(
            filters,
            kernel_size,
            padding="causal",
            activation="relu",
            dilation_rate=dilation,
        )(x)
        x = layers.BatchNormalization()(x)
        x = layers.Dropout(dropout)(x)
        x = layers.Conv1D(
            filters,
            kernel_size,
            padding="causal",
            activation="relu",
            dilation_rate=dilation,
        )(x)
        x = layers.BatchNormalization()(x)
        # Project the skip path with a 1x1 convolution when its channel
        # count differs from the block output so the two can be added.
        if residual.shape[-1] != filters:
            residual = layers.Conv1D(filters, 1, padding="same")(residual)
        x = layers.Add()([x, residual])
        x = layers.Activation("relu")(x)

    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dropout(dropout)(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)

    model = models.Model(inputs, outputs)
    model.compile(
        optimizer=optimizers.Adam(learning_rate=1e-3),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model
|
|
|
|
# Log wording per Keras architecture: (display name, "predicting" prefix,
# "prediction finished" message).
_KERAS_LOG_LABELS: Dict[str, Tuple[str, str, str]] = {
    "cnn_lstm": ("CNN-LSTM", "Making predictions on", "Predictions completed"),
    "tcn": ("TCN", "Making TCN predictions on", "TCN predictions completed"),
}


def _write_status(status_file: Optional[Path], message: str) -> None:
    """Overwrite *status_file* with *message*; no-op when no file is configured."""
    if status_file:
        with open(status_file, "w", encoding="utf-8") as handle:
            handle.write(message)


def _build_keras_callbacks(
    *,
    epochs: int,
    status_file: Optional[Path],
    tb_dir: Optional[str],
    lr_patience: int,
    stop_patience: int,
) -> list:
    """Assemble progress, optional TensorBoard, LR-reduction and early-stop callbacks."""
    callbacks_list = [
        ProgressCallback(total_epochs=epochs, status_file_path=str(status_file) if status_file else None),
    ]
    if tb_dir is not None:
        callbacks_list.append(callbacks.TensorBoard(log_dir=tb_dir, histogram_freq=0, write_graph=False))
    callbacks_list.append(
        callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=lr_patience, min_lr=1e-5)
    )
    callbacks_list.append(
        callbacks.EarlyStopping(monitor="val_loss", patience=stop_patience, restore_best_weights=True)
    )
    return callbacks_list


def _fit_and_predict_keras(
    model,
    X_train,
    y_train,
    X_val,
    y_val,
    *,
    model_type: str,
    epochs: int,
    batch_size: int,
    callbacks_list: list,
    status_file: Optional[Path],
):
    """Fit a compiled Keras model, then return (validation predictions, history dict)."""
    name, predicting_prefix, predicted_msg = _KERAS_LOG_LABELS[model_type]

    print(f"Starting {name} training with {len(X_train)} training samples, {len(X_val)} validation samples")
    print(f"Batch size: {batch_size}, Epochs: {epochs}")
    _write_status(
        status_file,
        f"{name} training started - {len(X_train)} train, {len(X_val)} val samples, batch_size={batch_size}",
    )

    history = model.fit(
        X_train,
        y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks_list,
        verbose=2,
    )

    print(f"{name} training completed, starting prediction...")
    _write_status(status_file, f"{name} training completed, evaluating model...")

    predicting_msg = f"{predicting_prefix} {len(X_val)} validation samples..."
    print(predicting_msg)
    _write_status(status_file, predicting_msg)
    y_pred = model.predict(X_val, verbose=0).argmax(axis=1)
    print(predicted_msg)
    return y_pred, history.history


def train_model(
    sequences: np.ndarray,
    labels: np.ndarray,
    *,
    validation_split: float,
    batch_size: int,
    epochs: int,
    model_type: str = "cnn_lstm",
    tensorboard_log_dir: Optional[Path] = None,
    status_file_path: Optional[Path] = None,
) -> Tuple[object, LabelEncoder, Dict[str, object]]:
    """Train a sequence model and return training history and validation outputs.

    Parameters
    ----------
    sequences:
        Windowed features; the first axis indexes windows.
    labels:
        One label per window; encoded to integers with ``LabelEncoder``.
    validation_split:
        Fraction of windows held out for validation.
    batch_size, epochs:
        Passed to ``model.fit`` for the neural models (ignored by the SVM).
    model_type:
        ``"cnn_lstm"``, ``"tcn"`` or ``"svm"`` (case-insensitive). The SVM is
        trained on windows flattened to one vector each.
    tensorboard_log_dir:
        Directory for TensorBoard logs (neural models only); created if needed.
    status_file_path:
        Optional file that is overwritten with coarse progress messages.

    Returns
    -------
    model, label_encoder, metrics
        ``metrics`` holds ``history``, ``validation`` (``y_true``, ``y_pred``,
        ``class_names``, ``confusion_matrix``), ``model_type``,
        ``input_shape`` and ``tensorboard_log_dir``.

    Raises
    ------
    ValueError
        If ``model_type`` is not one of the supported values.
    """
    model_type = model_type.lower().strip()
    if model_type not in {"cnn_lstm", "tcn", "svm"}:
        raise ValueError("model_type must be either 'cnn_lstm', 'tcn', or 'svm'")

    status_file = status_file_path if status_file_path else None

    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(labels)
    num_classes = len(label_encoder.classes_)

    tb_dir: Optional[str] = None
    if model_type != "svm" and tensorboard_log_dir is not None:
        tensorboard_log_dir.mkdir(parents=True, exist_ok=True)
        tb_dir = str(tensorboard_log_dir.resolve())

    unique_labels, label_counts = np.unique(y, return_counts=True)
    min_samples_per_class = np.min(label_counts)

    # Cast to plain ints so the printed dict looks the same on every NumPy version.
    distribution = {int(k): int(v) for k, v in zip(unique_labels, label_counts)}
    print(f"Label distribution: {distribution}")
    print(f"Minimum samples per class: {min_samples_per_class}")
    print(f"Total sequences: {len(sequences)}, Features per sequence: {sequences.shape[1:]}")

    data_size_mb = sequences.nbytes / (1024 * 1024)
    print(f"Data size: {data_size_mb:.2f} MB")
    if data_size_mb > 1000:
        print("Warning: Large dataset detected. Consider reducing batch size or sequence length.")

    # Sanitise BEFORE deriving the training features: ``nan_to_num`` returns
    # a new array, so cleaning afterwards would leave the data that is
    # actually fed to the model untouched.
    if np.any(np.isnan(sequences)) or np.any(np.isinf(sequences)):
        print("Warning: NaN or Inf values detected in sequences")
        sequences = np.nan_to_num(sequences, nan=0.0, posinf=1e6, neginf=-1e6)

    if model_type == "svm":
        # The SVM expects one flat feature vector per window.
        features = sequences.reshape(sequences.shape[0], -1)
    else:
        features = sequences

    if min_samples_per_class >= 2:
        X_train, X_val, y_train, y_val = train_test_split(
            features, y, test_size=validation_split, stratify=y, random_state=42
        )
    else:
        print(f"Warning: Some classes have only {min_samples_per_class} sample(s). Using simple random split instead of stratified split.")

        # Stratification is impossible with singleton classes; widen the
        # validation share (within [0.1, 0.3]) when it would hold fewer
        # samples than there are classes.
        total_samples = len(y)
        if validation_split * total_samples < len(unique_labels):
            adjusted_split = max(0.1, len(unique_labels) / total_samples)
            adjusted_split = min(adjusted_split, 0.3)
            print(f"Adjusting validation split from {validation_split} to {adjusted_split}")
            validation_split = adjusted_split

        X_train, X_val, y_train, y_val = train_test_split(
            features, y, test_size=validation_split, random_state=42
        )

    if model_type == "svm":
        print("Training SVM model...", flush=True)
        _write_status(status_file, "Training SVM model...")

        model = SVC(kernel="rbf", probability=True, class_weight="balanced")
        model.fit(X_train, y_train)

        print("SVM training completed. Evaluating...", flush=True)
        _write_status(status_file, "SVM training completed. Evaluating...")

        y_pred = model.predict(X_val)
        training_history: Dict[str, object] = {
            "train_accuracy": float(model.score(X_train, y_train)),
            "val_accuracy": float(accuracy_score(y_val, y_pred)),
        }
    else:
        input_shape = sequences.shape[1:]
        # Only the CNN-LSTM has a reduced configuration for very large inputs.
        large_dataset = model_type == "cnn_lstm" and len(sequences) > 100000

        if model_type == "cnn_lstm":
            print("Building CNN-LSTM model...")
            if large_dataset:
                print("Using lightweight CNN-LSTM for large dataset")
                model = build_cnn_lstm(
                    input_shape=input_shape,
                    num_classes=num_classes,
                    conv_filters=64,
                    lstm_units=64,
                    dropout=0.2,
                )
            else:
                model = build_cnn_lstm(input_shape=input_shape, num_classes=num_classes)
            print(f"CNN-LSTM model built. Input shape: {input_shape}, Classes: {num_classes}")
            print(f"Model parameters: {model.count_params():,}")
        else:
            print("Building TCN model...")
            model = build_tcn(input_shape=input_shape, num_classes=num_classes)
            print(f"TCN model built. Input shape: {input_shape}, Classes: {num_classes}")

        callbacks_list = _build_keras_callbacks(
            epochs=epochs,
            status_file=status_file,
            tb_dir=tb_dir,
            lr_patience=2 if large_dataset else 3,
            stop_patience=3 if large_dataset else 6,
        )
        if large_dataset:
            print("Using aggressive callbacks for large dataset")

        y_pred, training_history = _fit_and_predict_keras(
            model,
            X_train,
            y_train,
            X_val,
            y_val,
            model_type=model_type,
            epochs=epochs,
            batch_size=batch_size,
            callbacks_list=callbacks_list,
            status_file=status_file,
        )

    # Pin the label set so the matrix always has one row/column per class,
    # even when a class is absent from the validation split.
    cm = confusion_matrix(y_val, y_pred, labels=np.arange(num_classes))
    metrics: Dict[str, object] = {
        "history": training_history,
        "validation": {
            "y_true": y_val,
            "y_pred": y_pred,
            "class_names": label_encoder.classes_.tolist(),
            "confusion_matrix": cm,
        },
        "model_type": model_type,
        "input_shape": list(sequences.shape[1:]),
        "tensorboard_log_dir": tb_dir,
    }
    return model, label_encoder, metrics
|
|
|
|
def standardise_sequences(sequences: np.ndarray) -> Tuple[np.ndarray, StandardScaler]:
    """Apply standard scaling per feature across all timesteps.

    The windows are flattened to ``(n_windows * timesteps, n_features)`` so a
    single mean/variance is learned per feature, then reshaped back to the
    original layout.

    Returns
    -------
    scaled: np.ndarray
        Array with the same shape as ``sequences``.
    scaler: StandardScaler
        The fitted scaler, to be reused at inference time.
    """
    # NOTE(review): the scaler is fitted on every window it receives; the
    # callers in this file invoke it before the train/validation split, so
    # validation rows contribute to the statistics.
    scaler = StandardScaler()
    n_features = sequences.shape[-1]
    flat = sequences.reshape(-1, n_features)
    scaled_flat = scaler.fit_transform(flat)
    return scaled_flat.reshape(sequences.shape), scaler
|
|
|
|
def _to_jsonable(value):
    """Recursively convert NumPy scalars/arrays inside *value* to plain Python types."""
    if isinstance(value, dict):
        return {
            (key.item() if isinstance(key, np.generic) else key): _to_jsonable(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [_to_jsonable(item) for item in value]
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, np.generic):
        return value.item()
    return value


def export_artifacts(
    *,
    model: object,
    scaler: StandardScaler,
    label_encoder: LabelEncoder,
    feature_columns: Sequence[str],
    label_column: str,
    sequence_length: int,
    stride: int,
    model_path: Path,
    scaler_path: Path,
    metadata_path: Path,
    metrics: dict,
) -> None:
    """Persist trained assets to disk for deployment.

    Writes three files, creating parent directories as needed:

    * the model -- ``joblib`` dump for ``"svm"``, ``model.save`` otherwise;
    * the fitted scaler (``joblib``);
    * a JSON metadata file with feature order, label classes, windowing
      parameters, training history, classification report and confusion
      matrix.

    ``metrics`` is the dictionary returned by :func:`train_model`; it must
    contain ``"validation"`` (with ``y_true``/``y_pred``) and ``"history"``.
    """
    model_path.parent.mkdir(parents=True, exist_ok=True)
    scaler_path.parent.mkdir(parents=True, exist_ok=True)
    metadata_path.parent.mkdir(parents=True, exist_ok=True)

    model_type = str(metrics.get("model_type", "cnn_lstm"))
    if model_type == "svm":
        joblib.dump(model, model_path)
    else:
        model.save(model_path)
    joblib.dump(scaler, scaler_path)

    validation = metrics["validation"]
    class_names = label_encoder.classes_
    class_ids = list(range(len(class_names)))
    # Explicit ``labels`` keeps the report aligned with ``target_names`` even
    # when a class is absent from the validation split; without it
    # scikit-learn raises on the length mismatch.
    report_dict = classification_report(
        validation["y_true"],
        validation["y_pred"],
        labels=class_ids,
        target_names=class_names,
        output_dict=True,
        zero_division=0,
    )

    confusion = validation.get("confusion_matrix")
    if confusion is None:
        confusion = confusion_matrix(validation["y_true"], validation["y_pred"], labels=class_ids)

    metadata = {
        "feature_columns": list(feature_columns),
        "label_classes": class_names.tolist(),
        "label_column": label_column,
        "sequence_length": sequence_length,
        "stride": stride,
        "model_path": str(model_path),
        "scaler_path": str(scaler_path),
        "training_history": metrics["history"],
        "classification_report": report_dict,
        "model_type": model_type,
        "model_format": "joblib" if model_type == "svm" else "keras",
        "input_shape": metrics.get("input_shape"),
        "tensorboard_log_dir": metrics.get("tensorboard_log_dir"),
        "confusion_matrix": np.asarray(confusion).tolist(),
    }

    # The training history can contain NumPy scalars (e.g. learning-rate
    # values recorded by callbacks), which ``json`` cannot encode directly.
    metadata_path.write_text(json.dumps(_to_jsonable(metadata), indent=2), encoding="utf-8")
|
|
|
|
def train_from_dataframe(
    df: pd.DataFrame,
    *,
    label_column: str,
    feature_columns: Sequence[str] | None = None,
    sequence_length: int = 32,
    stride: int = 4,
    validation_split: float = 0.2,
    batch_size: int = 128,
    epochs: int = 50,
    model_type: str = "cnn_lstm",
    model_path: Path | str = "pmu_cnn_lstm_model.keras",
    scaler_path: Path | str = "pmu_feature_scaler.pkl",
    metadata_path: Path | str = "pmu_metadata.json",
    enable_tensorboard: bool = True,
    tensorboard_root: Path | str | None = None,
) -> dict:
    """Train a PMU fault classification model using an in-memory dataframe.

    Pipeline: resolve columns, window the rows, standardise, train, export the
    artefacts, and (for neural models) zip the TensorBoard run directory.
    Progress is mirrored to ``training_status.txt`` next to ``model_path``.

    Batch size, epoch count and validation split are adjusted automatically
    for very large (> 100000 windows) and very small (< 100 windows) inputs.

    Returns
    -------
    dict
        Summary with sample/sequence counts, feature columns, class names,
        resolved artefact paths, training history, classification report,
        confusion matrix, TensorBoard locations and the label column used.

    Raises
    ------
    ValueError
        If fewer than 10 windows can be generated, or any downstream step
        rejects its input.
    """
    # Local import: the module imports only ``datetime`` from ``datetime``.
    from datetime import timezone

    model_path = Path(model_path)
    scaler_path = Path(scaler_path)
    metadata_path = Path(metadata_path)
    # Normalise once so every comparison below agrees with ``train_model``.
    model_type = model_type.lower().strip()

    # The status file is written during training, i.e. before the export
    # step would create the directory -- make sure it exists up front.
    model_path.parent.mkdir(parents=True, exist_ok=True)
    status_file = model_path.parent / "training_status.txt"
    print(f"Training progress will be written to: {status_file}")

    tensorboard_log_dir: Optional[Path] = None
    if enable_tensorboard and model_type != "svm":
        base_dir = Path(tensorboard_root) if tensorboard_root is not None else Path("tensorboard_runs")
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        tensorboard_log_dir = base_dir / f"run-{timestamp}"

    features, labels, used_columns, resolved_label = load_dataset_from_dataframe(
        df, feature_columns=feature_columns, label_column=label_column
    )

    print(f"Input data: {len(features)} samples")
    print(f"Creating sequences with length={sequence_length}, stride={stride}")

    sequences, seq_labels = create_sequences(
        features,
        labels,
        sequence_length=sequence_length,
        stride=stride,
    )

    print(f"Generated {len(sequences)} sequences")

    if len(sequences) < 10:
        raise ValueError(
            f"Only {len(sequences)} sequences generated. Need at least 10 for training. "
            f"Try reducing sequence_length (currently {sequence_length}) or stride (currently {stride}), "
            "or provide more data."
        )

    if len(sequences) < 100 and model_type in ("cnn_lstm", "tcn"):
        print(f"Warning: Only {len(sequences)} sequences available. Consider using SVM for small datasets.")

    sequences, scaler = standardise_sequences(sequences)

    original_batch_size = batch_size
    original_epochs = epochs
    original_validation_split = validation_split

    if len(sequences) > 100000:
        # Larger batches and fewer epochs keep wall-clock time manageable.
        print(f"Large dataset detected ({len(sequences)} sequences). Optimizing parameters...")
        batch_size = min(batch_size * 2, 512)
        epochs = min(epochs, 30)
        print("Adjusted parameters for large dataset:")
        print(f"  Batch size: {original_batch_size} -> {batch_size}")
        print(f"  Epochs: {original_epochs} -> {epochs}")

        import gc

        gc.collect()
    elif len(sequences) < 100:
        # Keep several batches per epoch (batch size floored at 4).
        batch_size = max(min(batch_size, len(sequences) // 4), 4)
        epochs = min(epochs, 20)
        validation_split = min(validation_split, 0.3)
        print("Adjusted parameters for small dataset:")
        print(f"  Batch size: {original_batch_size} -> {batch_size}")
        print(f"  Epochs: {original_epochs} -> {epochs}")
        print(f"  Validation split: {original_validation_split} -> {validation_split}")

    model, label_encoder, metrics = train_model(
        sequences,
        seq_labels,
        validation_split=validation_split,
        batch_size=batch_size,
        epochs=epochs,
        model_type=model_type,
        tensorboard_log_dir=tensorboard_log_dir,
        status_file_path=status_file,
    )

    export_artifacts(
        model=model,
        scaler=scaler,
        label_encoder=label_encoder,
        feature_columns=used_columns,
        label_column=resolved_label,
        sequence_length=sequence_length,
        stride=stride,
        model_path=model_path,
        scaler_path=scaler_path,
        metadata_path=metadata_path,
        metrics=metrics,
    )

    tensorboard_zip_path: Optional[str] = None
    if tensorboard_log_dir and tensorboard_log_dir.exists():
        # Zipping the logs is a convenience; a failure must not discard an
        # otherwise successful training run, but it is reported.
        try:
            archive = shutil.make_archive(
                base_name=str(tensorboard_log_dir.parent / tensorboard_log_dir.name),
                format="zip",
                root_dir=str(tensorboard_log_dir.parent),
                base_dir=tensorboard_log_dir.name,
            )
            tensorboard_zip_path = str(Path(archive).resolve())
        except Exception as exc:
            print(f"Warning: could not archive TensorBoard logs: {exc}")
            tensorboard_zip_path = None

    validation = metrics["validation"]
    class_names = validation["class_names"]
    class_ids = list(range(len(class_names)))
    # Explicit ``labels`` keeps the report valid when a class is missing from
    # the validation split.
    report_dict = classification_report(
        validation["y_true"],
        validation["y_pred"],
        labels=class_ids,
        target_names=class_names,
        output_dict=True,
        zero_division=0,
    )
    confusion = validation.get("confusion_matrix")
    if confusion is None:
        confusion = confusion_matrix(validation["y_true"], validation["y_pred"], labels=class_ids)

    return {
        "num_samples": int(df.shape[0]),
        "num_sequences": int(sequences.shape[0]),
        "feature_columns": used_columns,
        "class_names": label_encoder.classes_.tolist(),
        "model_path": str(model_path.resolve()),
        "scaler_path": str(scaler_path.resolve()),
        "metadata_path": str(metadata_path.resolve()),
        "history": metrics["history"],
        "model_type": metrics.get("model_type", model_type),
        "classification_report": report_dict,
        "confusion_matrix": np.asarray(confusion).tolist(),
        "tensorboard_log_dir": metrics.get("tensorboard_log_dir"),
        "tensorboard_zip_path": tensorboard_zip_path,
        "label_column": resolved_label,
    }
|
|
|
|
def run_training(args: argparse.Namespace) -> None:
    """Run the command-line training pipeline described by *args*.

    Loads the CSV, windows and standardises the data, trains the selected
    model, exports the artefacts and prints a validation report. *args* is
    the namespace produced by :func:`parse_args`.
    """
    # Local import: the module imports only ``datetime`` from ``datetime``.
    from datetime import timezone

    csv_path = Path(args.data_path)
    model_out = Path(args.model_out)
    scaler_out = Path(args.scaler_out)
    metadata_out = Path(args.metadata_out)

    features, labels, feature_columns, resolved_label = load_dataset(
        csv_path, feature_columns=args.feature_columns, label_column=args.label_column
    )

    sequences, seq_labels = create_sequences(
        features,
        labels,
        sequence_length=args.sequence_length,
        stride=args.stride,
    )
    sequences, scaler = standardise_sequences(sequences)

    tensorboard_log_dir: Optional[Path] = None
    if args.tensorboard and args.model_type != "svm":
        if args.tensorboard_log_dir:
            tensorboard_log_dir = Path(args.tensorboard_log_dir)
        else:
            run_stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
            tensorboard_log_dir = Path("tensorboard_runs") / run_stamp

    model, label_encoder, metrics = train_model(
        sequences,
        seq_labels,
        validation_split=args.validation_split,
        batch_size=args.batch_size,
        epochs=args.epochs,
        model_type=args.model_type,
        tensorboard_log_dir=tensorboard_log_dir,
        status_file_path=None,
    )

    export_artifacts(
        model=model,
        scaler=scaler,
        label_encoder=label_encoder,
        feature_columns=feature_columns,
        label_column=resolved_label,
        sequence_length=args.sequence_length,
        stride=args.stride,
        model_path=model_out,
        scaler_path=scaler_out,
        metadata_path=metadata_out,
        metrics=metrics,
    )

    print("Training complete")
    print(f"Model architecture : {args.model_type}")
    print(f"Model saved to : {model_out}")
    print(f"Scaler saved to : {scaler_out}")
    print(f"Metadata saved to : {metadata_out}")
    print("Validation metrics:")

    validation = metrics["validation"]
    class_names = validation["class_names"]
    # Explicit ``labels`` keeps the report valid when a class is missing from
    # the validation split.
    report = classification_report(
        validation["y_true"],
        validation["y_pred"],
        labels=list(range(len(class_names))),
        target_names=class_names,
        zero_division=0,
    )
    print(report)

    tb_dir = metrics.get("tensorboard_log_dir")
    if tb_dir:
        print(f"TensorBoard logs written to: {tb_dir}")
        print(f"Launch TensorBoard with: tensorboard --logdir \"{tb_dir}\"")
|
|
|
|
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command-line arguments for the training script.

    Parameters
    ----------
    argv:
        Argument list to parse; ``None`` lets ``argparse`` read ``sys.argv``.

    Returns
    -------
    argparse.Namespace
        Parsed options. ``tensorboard`` is ``True`` unless ``--no-tensorboard``
        is given.

    Notes
    -----
    Invalid numeric options (non-positive sizes, a validation split outside
    ``(0, 1)``) are rejected here via ``parser.error`` -- which exits with
    status 2 -- instead of failing later, after the data has been loaded.
    """
    parser = argparse.ArgumentParser(description="Train a sequence model for PMU fault classification")
    parser.add_argument("--data-path", required=True, help="Path to Fault_Classification_PMU_Data CSV")
    parser.add_argument(
        "--label-column",
        default="Fault",
        help="Name of the target label column (default: Fault)",
    )
    parser.add_argument(
        "--feature-columns",
        nargs="*",
        default=None,
        help="Optional explicit list of feature columns. Defaults to all non-label columns",
    )
    parser.add_argument("--sequence-length", type=int, default=32, help="Number of timesteps per training window")
    parser.add_argument("--stride", type=int, default=4, help="Step size between consecutive windows")
    parser.add_argument("--validation-split", type=float, default=0.2, help="Validation set fraction")
    parser.add_argument("--batch-size", type=int, default=128, help="Training batch size")
    parser.add_argument("--epochs", type=int, default=50, help="Maximum number of training epochs")
    parser.add_argument(
        "--model-type",
        choices=["cnn_lstm", "tcn", "svm"],
        default="cnn_lstm",
        help="Model architecture to train (choices: cnn_lstm, tcn, svm)",
    )
    parser.add_argument(
        "--model-out",
        default="pmu_cnn_lstm_model.keras",
        help="Path to save the trained model (Keras format, or joblib for svm)",
    )
    parser.add_argument("--scaler-out", default="pmu_feature_scaler.pkl", help="Path to save fitted StandardScaler")
    parser.add_argument("--metadata-out", default="pmu_metadata.json", help="Path to save metadata JSON")
    parser.add_argument(
        "--tensorboard-log-dir",
        default=None,
        help="Optional directory to write TensorBoard logs (defaults to tensorboard_runs/<timestamp>)",
    )
    parser.add_argument(
        "--no-tensorboard",
        dest="tensorboard",
        action="store_false",
        help="Disable TensorBoard logging for neural network models",
    )
    parser.set_defaults(tensorboard=True)

    args = parser.parse_args(argv)

    positive_int_options = (
        ("--sequence-length", args.sequence_length),
        ("--stride", args.stride),
        ("--batch-size", args.batch_size),
        ("--epochs", args.epochs),
    )
    for flag, value in positive_int_options:
        if value <= 0:
            parser.error(f"{flag} must be a positive integer (got {value})")
    if not 0.0 < args.validation_split < 1.0:
        parser.error(f"--validation-split must be between 0 and 1 exclusive (got {args.validation_split})")

    return args
|
|
|
|
def main(argv: Optional[Sequence[str]] = None) -> None:
    """Command-line entry point: parse *argv* and run the training pipeline.

    ``argv`` defaults to ``None``, in which case ``argparse`` reads
    ``sys.argv``.
    """
    run_training(parse_args(argv))


if __name__ == "__main__":
    main()
|
|