| | import cv2
|
| | import os
|
| | import torch
|
| | import numpy as np
|
| | from torch.utils.data import Dataset
|
| | from torchvision import transforms
|
| |
|
| | from facenet_pytorch import MTCNN
|
| |
|
| |
|
| |
|
# Number of frames sampled per video when the caller does not specify one.
SEQUENCE_LENGTH_DEFAULT = 10
# Side length (pixels) of the square face crops fed to the model.
IMG_SIZE = 224
# Run face detection on GPU when available, otherwise CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"Initializing MTCNN on {DEVICE}...")

# Module-level face detector shared by all helpers below.
# keep_all=True returns every detected face plus its probability, so callers
# can pick the most confident detection themselves.
mtcnn_detector = MTCNN(keep_all=True, device=DEVICE)

# Preprocessing applied to each face crop: PIL conversion, resize to
# IMG_SIZE x IMG_SIZE, tensor conversion, then normalization with ImageNet
# channel statistics (mean/std are the standard RGB ImageNet values).
data_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
|
| |
|
| |
|
def extract_frames_from_video(video_path, sequence_length=SEQUENCE_LENGTH_DEFAULT):
    """Sample frames evenly from a video and return cropped-face tensors.

    Evenly samples ``sequence_length`` frame indices, detects faces with the
    module-level MTCNN detector, crops the most confident face (padded by 10%
    per side), and applies ``data_transforms`` to each crop. Short results are
    padded by repeating the last successful crop.

    Args:
        video_path: Path to a video file readable by OpenCV.
        sequence_length: Number of frames in the returned sequence.

    Returns:
        A tensor of shape (sequence_length, 3, IMG_SIZE, IMG_SIZE), or None
        if the video cannot be opened, reports no frames, or yields no face.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None

    processed_frames = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            # FIX: original returned here without releasing the capture
            # handle; the try/finally now guarantees cap.release().
            return None

        frame_indices = np.linspace(0, total_frames - 1, sequence_length, dtype=int)

        for frame_idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if not ret:
                continue

            # MTCNN expects RGB; OpenCV decodes to BGR.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            try:
                boxes, probs = mtcnn_detector.detect(frame_rgb)
                if boxes is None or len(boxes) == 0:
                    continue

                # Keep detections that actually carry a confidence score and
                # select the single most confident one.
                candidates = [(prob, box) for box, prob in zip(boxes, probs)
                              if prob is not None]
                if not candidates:
                    continue
                _, best_box = max(candidates, key=lambda c: c[0])

                x1, y1, x2, y2 = best_box
                x, y = max(0, int(x1)), max(0, int(y1))
                w, h = int(x2 - x1), int(y2 - y1)

                # Pad the box by 10% per side, clamped to the image bounds.
                pad_w, pad_h = int(w * 0.1), int(h * 0.1)
                img_h, img_w, _ = frame.shape
                y_min, y_max = max(0, y - pad_h), min(img_h, y + h + pad_h)
                x_min, x_max = max(0, x - pad_w), min(img_w, x + w + pad_w)

                # FIX: crop from the RGB frame. The original cropped the BGR
                # `frame`, so ToPILImage/Normalize (which assume RGB channel
                # order and ImageNet RGB statistics) received swapped channels.
                face_crop = frame_rgb[y_min:y_max, x_min:x_max]
                if face_crop.size != 0:
                    processed_frames.append(data_transforms(face_crop))
            except Exception:
                # Best-effort: a failed detection on one frame must not
                # abort processing of the whole video.
                continue
    finally:
        cap.release()

    if not processed_frames:
        return None

    # Pad short sequences by repeating the last successful crop.
    while len(processed_frames) < sequence_length:
        processed_frames.append(processed_frames[-1])

    return torch.stack(processed_frames[:sequence_length])
|
| |
|
| |
|
| |
|
def process_image(image_path, sequence_length=SEQUENCE_LENGTH_DEFAULT):
    """Detect and crop the most confident face in a still image.

    The single processed crop is tiled ``sequence_length`` times so a still
    image can feed the same sequence model as a video.

    Args:
        image_path: Path to an image file readable by OpenCV.
        sequence_length: Number of times to repeat the processed crop.

    Returns:
        A tensor of shape (sequence_length, 3, IMG_SIZE, IMG_SIZE), or None
        if the image cannot be read, no face is found, or processing fails.
    """
    try:
        frame = cv2.imread(image_path)
        if frame is None:
            return None

        # MTCNN expects RGB; OpenCV decodes to BGR.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        boxes, probs = mtcnn_detector.detect(frame_rgb)
        if boxes is None or len(boxes) == 0:
            return None

        # Keep detections with a confidence score; pick the most confident.
        candidates = [(prob, box) for box, prob in zip(boxes, probs)
                      if prob is not None]
        if not candidates:
            return None
        _, best_box = max(candidates, key=lambda c: c[0])

        x1, y1, x2, y2 = best_box
        x, y = max(0, int(x1)), max(0, int(y1))
        w, h = int(x2 - x1), int(y2 - y1)

        # Pad the box by 10% per side, clamped to the image bounds.
        pad_w, pad_h = int(w * 0.1), int(h * 0.1)
        img_h, img_w, _ = frame.shape
        y_min, y_max = max(0, y - pad_h), min(img_h, y + h + pad_h)
        x_min, x_max = max(0, x - pad_w), min(img_w, x + w + pad_w)

        # FIX: crop from the RGB frame. The original cropped the BGR
        # `frame`, so ToPILImage/Normalize (which assume RGB channel order
        # and ImageNet RGB statistics) received swapped channels.
        face_crop = frame_rgb[y_min:y_max, x_min:x_max]
        if face_crop.size == 0:
            return None

        processed_frame = data_transforms(face_crop)

        # Tile the single crop across the sequence dimension.
        return processed_frame.unsqueeze(0).repeat(sequence_length, 1, 1, 1)

    except Exception as e:
        print(f"Error processing image: {e}")
        return None
|
| |
|
| |
|
| |
|
class DeepfakeDataset(Dataset):
    """Video dataset laid out as ``<data_dir>/real`` and ``<data_dir>/fake``.

    Labels: 0 = real, 1 = fake. Each item is a ``(frames, label)`` pair where
    ``frames`` has shape (sequence_length, 3, IMG_SIZE, IMG_SIZE). Videos that
    fail extraction yield a zero tensor with a sentinel label of -1 so the
    training loop can filter them out.
    """

    def __init__(self, data_dir, sequence_length=SEQUENCE_LENGTH_DEFAULT,
                 max_videos_per_class=400):
        """Scan ``data_dir`` for videos and record their labels.

        Args:
            data_dir: Root directory containing 'real' and 'fake' subfolders.
            sequence_length: Frames extracted per video.
            max_videos_per_class: Cap on videos loaded per class
                (generalizes the previously hard-coded 400; default unchanged).
        """
        self.data_dir = data_dir
        self.sequence_length = sequence_length
        self.video_files = []
        self.labels = []

        print(f" Scanning for videos in {data_dir}...")

        # Both classes use the identical (previously duplicated) procedure:
        # recursive scan, cap, then append paths with the class label.
        for subfolder, label in (('real', 0), ('fake', 1)):
            videos = self._find_videos(os.path.join(data_dir, subfolder))
            videos = videos[:max_videos_per_class]
            self.video_files.extend(videos)
            self.labels.extend([label] * len(videos))

        self.total_videos = len(self.video_files)
        print(f" Total dataset size: {self.total_videos} videos")

    @staticmethod
    def _find_videos(folder_path):
        # Recursively collect files with known video extensions.
        video_paths = []
        for root, _dirs, files in os.walk(folder_path):
            for file in files:
                if file.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
                    video_paths.append(os.path.join(root, file))
        return video_paths

    def __len__(self):
        return len(self.video_files)

    def __getitem__(self, idx):
        """Return (frames, label); failed extraction yields zeros and -1."""
        video_path = self.video_files[idx]
        label = self.labels[idx]

        frames = extract_frames_from_video(video_path, self.sequence_length)

        if frames is None:
            # FIX: the original returned a plain int -1 here while the
            # success path returns a tensor label; mixing the two breaks the
            # DataLoader's default collate. Return a tensor sentinel instead.
            return (torch.zeros((self.sequence_length, 3, IMG_SIZE, IMG_SIZE)),
                    torch.tensor(-1, dtype=torch.long))

        return frames, torch.tensor(label, dtype=torch.long)
|
| |
|
if __name__ == "__main__":
    # Smoke test: build the dataset from the default data directory.
    dataset = DeepfakeDataset('data/')