| | import torch |
| | import torch.nn as nn |
| | import math |
| | from ultralytics import YOLO |
| | from ultralytics.nn.modules import Conv, Concat |
| | from lib.models.common import Focus, BottleneckCSP, Detect |
| | from lib.utils import check_anchor_order |
| | import logging |
| |
|
class YOLOv11Backbone(nn.Module):
    """YOLOv11 backbone that returns three intermediate feature maps.

    The backbone consists of the first 11 layers (indices 0-10) of a YOLOv11
    model. The outputs of the layers listed in ``self.out_indices`` are
    collected during the forward pass, and their channel counts are exposed
    as ``self.out_channels``.
    """

    @staticmethod
    def _make_divisible(x, divisor=8):
        """Round ``x`` up to the nearest multiple of ``divisor``."""
        return int(math.ceil(x / divisor) * divisor)

    @staticmethod
    def _infer_out_channels(layer, index):
        """Return the number of output channels produced by ``layer``.

        Plain conv wrappers expose the convolution as ``layer.conv``; composite
        blocks expose their final projection as ``layer.cv2`` (itself a conv
        wrapper with a ``.conv`` attribute).

        Raises:
            AttributeError: if the layer has neither a ``conv`` nor a ``cv2``
                attribute.
        """
        if hasattr(layer, 'conv'):
            return layer.conv.out_channels
        if hasattr(layer, 'cv2'):
            return layer.cv2.conv.out_channels
        raise AttributeError(f"Layer {index} 没有 conv 或 cv2 属性,请检查模块结构")

    @classmethod
    def _build_layers(cls, width_multiple, depth_multiple):
        """Build randomly initialised backbone layers for the given scale."""
        from ultralytics.nn.modules import Conv, C3k2, SPPF, C2PSA

        # Channel widths are scaled and then rounded up to a multiple of 8.
        c1, c2, c3, c4, c5 = (
            cls._make_divisible(base * width_multiple)
            for base in (64, 128, 256, 512, 1024)
        )
        # Number of repeats inside each composite block (at least one).
        n1 = max(round(2 * depth_multiple), 1)

        return nn.ModuleList([
            Conv(3, c1, k=3, s=2),
            Conv(c1, c2, k=3, s=2),
            C3k2(c2, c3, n=n1, shortcut=False, e=0.25),
            Conv(c3, c3, k=3, s=2),
            C3k2(c3, c4, n=n1, shortcut=False, e=0.25),
            Conv(c4, c4, k=3, s=2),
            C3k2(c4, c4, n=n1, shortcut=True),
            Conv(c4, c5, k=3, s=2),
            C3k2(c5, c5, n=n1, shortcut=True),
            SPPF(c5, c5, k=5),
            C2PSA(c5, c5, n=n1),
        ])

    def __init__(self, width_multiple=0.25, depth_multiple=0.50, yolo_model_path=None):
        """
        YOLOv11 backbone, optionally extracted directly from an ultralytics YOLO model.

        Args:
            width_multiple: channel scaling factor
                (n=0.25, s=0.50, m=1.00, l=1.00, x=1.50)
            depth_multiple: depth scaling factor
                (n=0.50, s=0.50, m=0.50, l=1.00, x=1.00)
            yolo_model_path: path to a pretrained YOLOv11 model (optional).
                When given, the two scaling factors are ignored and the layers
                are taken from the loaded model.

        Warning:
            The different YOLO model sizes (n, s, m, l, x) do not share the
            same structure. This implementation uses the small model as its
            example, which happens to output (128, 256, 512) channels
            (although with the channel adapter downstream this does not
            really matter).
        """
        super().__init__()

        # Indices of the layers whose outputs are returned by forward().
        self.out_indices = [4, 6, 10]

        if yolo_model_path:
            yolo_model = YOLO(yolo_model_path).model
            self.layers = nn.ModuleList([yolo_model.model[i] for i in range(11)])
            # NOTE(review): released ultralytics checkpoints appear to be saved
            # with requires_grad=False on every parameter, which would leave
            # the backbone silently frozen - confirm for your ultralytics
            # version. Re-enabling gradients is a no-op otherwise and leaves
            # freezing under the caller's explicit control.
            for param in self.layers.parameters():
                param.requires_grad_(True)
        else:
            self.layers = self._build_layers(width_multiple, depth_multiple)

        # The tapped layers are composite blocks without a direct ``.conv``
        # attribute, so both construction paths go through the same
        # conv/cv2-aware lookup.
        self.out_channels = [
            self._infer_out_channels(self.layers[i], i) for i in self.out_indices
        ]

    def forward(self, x):
        """Run the backbone and return the outputs of the ``out_indices`` layers.

        Returns:
            list: one tensor per entry of ``self.out_indices``, in layer order.
        """
        outputs = []
        for i, layer in enumerate(self.layers):
            x = layer(x)
            if i in self.out_indices:
                outputs.append(x)
        return outputs
| |
|
class ChannelAdapter(nn.Module):
    """Pointwise (1x1) convolution that changes the channel count of a feature map."""

    def __init__(self, in_channels: int, out_channels: int) -> None:
        """Create the adapter.

        Args:
            in_channels: number of channels of the incoming feature map.
            out_channels: number of channels of the produced feature map.
        """
        super().__init__()
        # The attribute name is part of the state-dict key, so it stays ``conv``.
        self.conv = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=1,
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Project ``x`` to ``out_channels`` channels with the 1x1 convolution."""
        projected = self.conv(x)
        return projected
| |
|
class YOLOPWithYOLOv11(nn.Module):
    """YOLOP-style multi-task network built on a YOLOv11 backbone.

    Data flow: backbone -> 1x1 channel adapters -> neck (top-down pass followed
    by a bottom-up pass) -> one detection head plus two segmentation heads
    (drivable area and lane line).
    """

    def __init__(self, num_seg_class=2, yolo_scale='n', yolo_weights_path=None):
        """
        YOLOP with YOLOv11 Backbone

        Args:
            num_seg_class: number of output channels of the drivable-area
                segmentation head (the lane head always outputs 2 channels).
            yolo_scale: YOLOv11 scale ('n', 's', 'm', 'l', 'x'); only used when
                no weights path is given.
            yolo_weights_path: path to pretrained YOLOv11 weights (optional).
                When given, the backbone is extracted from that model.

        Raises:
            ValueError: if ``yolo_scale`` is not one of the supported scales.
        """
        super().__init__()

        scale_configs = {
            'n': {'width': 0.25, 'depth': 0.50},
            's': {'width': 0.50, 'depth': 0.50},
            'm': {'width': 1.00, 'depth': 0.50},
            'l': {'width': 1.00, 'depth': 1.00},
            'x': {'width': 1.50, 'depth': 1.00},
        }

        if yolo_scale not in scale_configs:
            raise ValueError(f"Invalid yolo_scale: {yolo_scale}. Must be one of {list(scale_configs.keys())}")

        scale = scale_configs[yolo_scale]

        if yolo_weights_path:
            self.backbone = YOLOv11Backbone(yolo_model_path=yolo_weights_path)
        else:
            self.backbone = YOLOv11Backbone(width_multiple=scale['width'], depth_multiple=scale['depth'])

        # The neck below is hard-wired for 128/256/512 channels, so every
        # backbone output is projected to that width first.
        backbone_channels = self.backbone.out_channels
        neck_channels = [128, 256, 512]

        self.adapters = nn.ModuleList([
            ChannelAdapter(backbone_channels[0], neck_channels[0]),
            ChannelAdapter(backbone_channels[1], neck_channels[1]),
            ChannelAdapter(backbone_channels[2], neck_channels[2]),
        ])

        # Indices into this list are used directly by forward().
        self.neck = nn.ModuleList([
            Conv(512, 256, k=1, s=1),
            nn.Upsample(scale_factor=2, mode='nearest'),
            Concat(dimension=1),
            BottleneckCSP(512, 256, n=1, shortcut=False),
            Conv(256, 128, k=1, s=1),
            nn.Upsample(scale_factor=2, mode='nearest'),
            Concat(dimension=1),
            BottleneckCSP(256, 128, n=1, shortcut=False),
            Conv(128, 128, k=3, s=2),
            Concat(dimension=1),
            BottleneckCSP(256, 256, n=1, shortcut=False),
            Conv(256, 256, k=3, s=2),
            Concat(dimension=1),
            BottleneckCSP(512, 512, n=1, shortcut=False),
        ])

        # presumably Detect(num_classes, anchors, input_channels) - verify
        # against lib.models.common.Detect.
        self.detect_head = Detect(1, [[3,9,5,11,4,20], [7,18,6,39,12,31], [19,50,38,81,68,157]], [128, 256, 512])

        # Both segmentation heads share one layout and differ only in the
        # number of output channels. Drivable head is created first so that
        # parameter initialisation order is unchanged.
        self.drivable_seg_head = self._make_seg_head(num_seg_class)
        self.lane_seg_head = self._make_seg_head(2)

        # Measure the downsampling factor of every detection level with a
        # dummy forward pass, then rescale the anchors by those strides.
        s = 128
        with torch.no_grad():
            dummy = torch.zeros(1, 3, s, s)
            detect_out, _, _ = self.forward(dummy)
            self.detect_head.stride = torch.tensor([s / x.shape[-2] for x in detect_out])
            self.detect_head.anchors /= self.detect_head.stride.view(-1, 1, 1)
            check_anchor_order(self.detect_head)
            self.stride = self.detect_head.stride

        print(f"Initialized Detect head with strides: {self.detect_head.stride.tolist()}")

        self.nc = 1
        self.names = ['vehicle']
        # Flat list view of the sub-networks; ``detector_index`` is the
        # position of the detection head inside it.
        self.model = nn.ModuleList([
            self.backbone,
            self.adapters,
            self.neck,
            self.detect_head,
            self.drivable_seg_head,
            self.lane_seg_head
        ])
        self.detector_index = 3
        self.det_out_idx = 25

        self.gr = 1.0

        self._initialize_biases()

    @staticmethod
    def _make_seg_head(out_channels):
        """Build one segmentation head ending in ``out_channels`` channels."""
        return nn.ModuleList([
            Conv(256, 128, k=3, s=1),
            nn.Upsample(scale_factor=2, mode='nearest'),
            BottleneckCSP(128, 64, n=1, shortcut=False),
            Conv(64, 32, k=3, s=1),
            nn.Upsample(scale_factor=2, mode='nearest'),
            Conv(32, 16, k=3, s=1),
            BottleneckCSP(16, 8, n=1, shortcut=False),
            nn.Upsample(scale_factor=2, mode='nearest'),
            Conv(8, out_channels, k=3, s=1),
        ])

    def _set_backbone_trainable(self, trainable):
        """Set ``requires_grad`` on every backbone and adapter parameter."""
        for module in (self.backbone, self.adapters):
            for param in module.parameters():
                param.requires_grad = trainable

    def freeze_backbone(self):
        """Freeze the parameters of the backbone and the adapters."""
        logging.info("Freezing backbone parameters...")
        self._set_backbone_trainable(False)

        # Report how many parameter tensors ended up frozen.
        params = [p for module in (self.backbone, self.adapters) for p in module.parameters()]
        frozen_count = sum(1 for p in params if not p.requires_grad)
        total_count = len(params)
        logging.info(f"Frozen {frozen_count}/{total_count} backbone+adapter parameters")

    def unfreeze_backbone(self):
        """Unfreeze the parameters of the backbone and the adapters."""
        logging.info("Unfreezing backbone parameters...")
        self._set_backbone_trainable(True)

    def _initialize_biases(self, cf=None):
        """Initialise the detection head biases (following the original YOLOP implementation).

        Args:
            cf: optional tensor of class frequencies; when omitted, a constant
                derived from the number of classes is used instead.
        """
        m = self.detect_head
        for mi, s in zip(m.m, m.stride):
            # One bias row per anchor.
            b = mi.bias.view(m.na, -1)
            # presumably column 4 is objectness and columns 5+ are class
            # scores - verify against the Detect head layout.
            b.data[:, 4] += math.log(8 / (640 / s) ** 2)
            b.data[:, 5:] += math.log(0.6 / (m.nc - 0.99)) if cf is None else torch.log(cf / cf.sum())
            mi.bias = torch.nn.Parameter(b.view(-1), requires_grad=True)

    @staticmethod
    def _remap_backbone_keys(yolo_state_dict, target_state_dict, num_layers=11):
        """Translate YOLOv11 backbone keys into this model's key space.

        Keys of the form ``model.<i>.<rest>`` with ``0 <= i < num_layers`` are
        renamed to ``backbone.layers.<i>.<rest>``. Entries whose renamed key is
        missing from ``target_state_dict`` or whose tensor shape differs from
        the target are dropped, so the result can always be loaded.

        Returns:
            dict: renamed key -> source tensor, compatible entries only.
        """
        layer_ids = {str(i) for i in range(num_layers)}
        remapped = {}
        for key, tensor in yolo_state_dict.items():
            parts = key.split('.', 2)
            if len(parts) != 3 or parts[0] != 'model' or parts[1] not in layer_ids:
                continue
            new_key = f"backbone.layers.{parts[1]}.{parts[2]}"
            target = target_state_dict.get(new_key)
            if target is None or target.shape != tensor.shape:
                continue
            remapped[new_key] = tensor
        return remapped

    def load_yolov11_backbone_weights(self, weights_path, freeze_backbone=False):
        """
        Load backbone weights from a pretrained YOLOv11 model.

        This is best-effort: any failure is logged as a warning and the model
        keeps its current weights.

        Args:
            weights_path: path to the YOLOv11 weights (.pt file)
            freeze_backbone: whether to freeze the backbone after loading
        """
        try:
            from ultralytics import YOLO
            logging.info(f"Loading YOLOv11 weights from {weights_path}")

            yolo_state_dict = YOLO(weights_path).model.state_dict()

            # Only compatible backbone entries are kept, so a checkpoint of a
            # different scale degrades to a partial load instead of failing.
            matched = self._remap_backbone_keys(yolo_state_dict, self.state_dict())
            if not matched:
                logging.warning("No compatible backbone parameters found in YOLOv11 weights")
            # strict=False: every non-backbone key is intentionally absent.
            self.load_state_dict(matched, strict=False)

            logging.info(f"Successfully loaded {len(matched)} backbone parameters from YOLOv11")

            if freeze_backbone:
                self.freeze_backbone()
                logging.info("Backbone frozen successfully")

        except Exception as e:
            logging.warning(f"Failed to load YOLOv11 weights: {e}")
            logging.warning("Training will start from scratch")

    def forward(self, x):
        """Run detection and both segmentation branches.

        Returns:
            list: ``[detect_out, drivable_out, lane_out]`` where the two
            segmentation outputs have been passed through a sigmoid.
        """
        features = self.backbone(x)
        features = [adapter(f) for adapter, f in zip(self.adapters, features)]
        feat_small, feat_mid, feat_large = features[0], features[1], features[-1]

        # Top-down pass. The neck[0] output is needed again in the bottom-up
        # pass, so it is computed once and reused (same values as recomputing
        # it; any normalisation statistics inside neck[0] are now updated
        # once per step instead of twice).
        lateral_large = self.neck[0](feat_large)
        x = self.neck[1](lateral_large)
        x = self.neck[2]([x, feat_mid])
        x = self.neck[3](x)
        x = self.neck[4](x)
        x = self.neck[5](x)
        p3_fpn = self.neck[6]([x, feat_small])
        p3 = self.neck[7](p3_fpn)

        # Bottom-up pass.
        x = self.neck[8](p3)
        # NOTE(review): neck[4] is re-applied to the adapted middle feature
        # here rather than reusing the neck[4] output of the top-down pass.
        # Left unchanged so existing checkpoints keep their behaviour -
        # confirm this wiring is intended.
        x = self.neck[9]([x, self.neck[4](feat_mid)])
        p4 = self.neck[10](x)
        x = self.neck[11](p4)
        x = self.neck[12]([x, lateral_large])
        p5 = self.neck[13](x)

        detect_out = self.detect_head([p3, p4, p5])

        # Both segmentation heads branch off the same neck feature.
        drivable_out = p3_fpn
        for layer in self.drivable_seg_head:
            drivable_out = layer(drivable_out)

        lane_out = p3_fpn
        for layer in self.lane_seg_head:
            lane_out = layer(lane_out)

        drivable_out = torch.sigmoid(drivable_out)
        lane_out = torch.sigmoid(lane_out)

        return [detect_out, drivable_out, lane_out]
| |
|
| |
|
def get_net_yolov11(cfg, **kwargs):
    """
    Build a YOLOP model with a YOLOv11 backbone.

    If the pretrained weights file exists, the backbone is extracted from it
    (and optionally frozen); otherwise a randomly initialised model of the
    requested scale is created and a warning is logged.

    Args:
        cfg: configuration object; ``cfg.num_seg_class`` is used when present,
            otherwise 2.
        **kwargs: additional options:
            - yolov11_weights: path to pretrained YOLOv11 weights
              (default ``weights/yolo11<scale>.pt``)
            - freeze_backbone: whether to freeze the backbone; only applied
              when pretrained weights were found
            - yolo_scale: YOLOv11 scale ('n', 's', 'm', 'l', 'x')

    Returns:
        The constructed ``YOLOPWithYOLOv11`` model.
    """
    import os

    num_seg_class = getattr(cfg, 'num_seg_class', 2)
    yolo_scale = kwargs.get('yolo_scale', 'n')

    yolov11_weights = kwargs.get('yolov11_weights', f'weights/yolo11{yolo_scale}.pt')
    freeze_backbone = kwargs.get('freeze_backbone', False)

    # An explicit None/empty path means "no pretrained weights"; it must not
    # reach the filesystem check. Only a regular file is a usable checkpoint.
    if yolov11_weights and os.path.isfile(yolov11_weights):
        logging.info(f"Creating model with YOLOv11{yolo_scale} pretrained weights from {yolov11_weights}")
        model = YOLOPWithYOLOv11(num_seg_class=num_seg_class, yolo_scale=yolo_scale, yolo_weights_path=yolov11_weights)
        if freeze_backbone:
            model.freeze_backbone()
    else:
        logging.warning(f"YOLOv11 weights not found at {yolov11_weights}, creating model from scratch")
        model = YOLOPWithYOLOv11(num_seg_class=num_seg_class, yolo_scale=yolo_scale, yolo_weights_path=None)

    return model