# data_img/lib/models/YOLOP_YOLOv11.py
# sft2005's picture
# Upload folder using huggingface_hub
# 9545fea verified
import torch
import torch.nn as nn
import math
from ultralytics import YOLO
from ultralytics.nn.modules import Conv, Concat
from lib.models.common import Focus, BottleneckCSP, Detect
from lib.utils import check_anchor_order
import logging
class YOLOv11Backbone(nn.Module):
    """YOLOv11 backbone that returns three intermediate feature maps.

    The eleven backbone layers (indices 0-10) are either taken directly from an
    ultralytics YOLO checkpoint or assembled from ultralytics modules.
    ``forward`` collects the outputs of the layers listed in ``out_indices``
    (labelled P3, P4 and P5 in this file); ``out_channels`` holds the channel
    count of each of those outputs.
    """

    def __init__(self, width_multiple=0.25, depth_multiple=0.50, yolo_model_path=None):
        """Build the backbone from a checkpoint or from scratch.

        Args:
            width_multiple: channel scaling factor
                (n=0.25, s=0.50, m=1.00, l=1.00, x=1.50). Only used when
                ``yolo_model_path`` is not given.
            depth_multiple: depth scaling factor
                (n=0.50, s=0.50, m=0.50, l=1.00, x=1.00). Only used when
                ``yolo_model_path`` is not given.
            yolo_model_path: optional path to a pretrained YOLOv11 model.

        Raises:
            AttributeError: if one of the tapped layers exposes neither a
                ``conv`` nor a ``cv2`` attribute to read its channel count from.

        Warning:
            The YOLO scales (n, s, m, l, x) differ structurally; this layout
            was written with the small model as its example. The channel
            counts actually produced are recorded in ``out_channels`` so that
            callers can adapt to them.
        """
        super().__init__()
        self.out_indices = [4, 6, 10]  # P3, P4, P5

        if yolo_model_path:
            # A checkpoint was supplied: lift backbone layers 0-10 out of it.
            yolo_model = YOLO(yolo_model_path).model
            self.layers = nn.ModuleList([yolo_model.model[i] for i in range(11)])
            # NOTE(review): ultralytics checkpoints are normally exported with
            # requires_grad=False on every parameter (EMA copy / stripped
            # optimizer) - confirm for the version in use. Re-enable gradients
            # so the backbone is trainable unless it is frozen explicitly.
            for param in self.layers.parameters():
                param.requires_grad_(True)
        else:
            # No checkpoint: assemble the same layer sequence from ultralytics modules.
            from ultralytics.nn.modules import Conv, C3k2, SPPF, C2PSA

            def make_divisible(x, divisor=8):
                """Round a channel count up to the next multiple of ``divisor``."""
                return int(math.ceil(x / divisor) * divisor)

            # Channel widths derived from width_multiple.
            c1 = make_divisible(64 * width_multiple)
            c2 = make_divisible(128 * width_multiple)
            c3 = make_divisible(256 * width_multiple)
            c4 = make_divisible(512 * width_multiple)
            c5 = make_divisible(1024 * width_multiple)
            # Repeat count for the C3k2 / C2PSA blocks derived from depth_multiple.
            n1 = max(round(2 * depth_multiple), 1)

            # NOTE(review): in the ultralytics yolo11.yaml the boolean argument
            # of C3k2 appears to be ``c3k`` rather than ``shortcut`` - confirm
            # that these keyword arguments give the intended architecture.
            self.layers = nn.ModuleList([
                Conv(3, c1, k=3, s=2),                        # 0
                Conv(c1, c2, k=3, s=2),                       # 1
                C3k2(c2, c3, n=n1, shortcut=False, e=0.25),   # 2
                Conv(c3, c3, k=3, s=2),                       # 3
                C3k2(c3, c4, n=n1, shortcut=False, e=0.25),   # 4
                Conv(c4, c4, k=3, s=2),                       # 5
                C3k2(c4, c4, n=n1, shortcut=True),            # 6
                Conv(c4, c5, k=3, s=2),                       # 7
                C3k2(c5, c5, n=n1, shortcut=True),            # 8
                SPPF(c5, c5, k=5),                            # 9
                C2PSA(c5, c5, n=n1),                          # 10
            ])

        # Both construction paths read the channel counts the same way. The
        # checkpoint path used to read ``layer.conv`` directly, which the
        # C3k2 / C2PSA layers at the tapped indices do not have.
        self.out_channels = [
            self._layer_out_channels(self.layers[i], i) for i in self.out_indices
        ]

    @staticmethod
    def _layer_out_channels(layer, index):
        """Return the number of output channels of a backbone layer.

        Layers with a ``conv`` attribute are read directly; layers with a
        ``cv2`` attribute are read through ``cv2.conv``.
        """
        if hasattr(layer, 'conv'):
            return layer.conv.out_channels
        if hasattr(layer, 'cv2'):
            return layer.cv2.conv.out_channels
        raise AttributeError(f"Layer {index} 没有 conv 或 cv2 属性,请检查模块结构")

    def forward(self, x):
        """Run all layers in order and return the outputs tapped at ``out_indices``."""
        outputs = []
        for i, layer in enumerate(self.layers):
            x = layer(x)
            if i in self.out_indices:
                outputs.append(x)
        return outputs
class ChannelAdapter(nn.Module):
    """Pointwise (1x1) convolution that maps ``in_channels`` to ``out_channels``."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Kernel size 1 with default stride/padding: only the channel
        # dimension changes, the spatial size is passed through.
        projection = nn.Conv2d(in_channels, out_channels, 1)
        self.conv = projection

    def forward(self, x):
        """Apply the 1x1 projection to ``x``."""
        projected = self.conv(x)
        return projected
class YOLOPWithYOLOv11(nn.Module):
    """YOLOP multi-task network built on a YOLOv11 backbone.

    Pipeline: YOLOv11 backbone -> 1x1 channel adapters -> YOLOP neck -> three
    heads (detection, drivable-area segmentation, lane segmentation).
    ``forward`` returns ``[detect_out, drivable_out, lane_out]``; both
    segmentation outputs are passed through a sigmoid.
    """

    def __init__(self, num_seg_class=2, yolo_scale='n', yolo_weights_path=None):
        """Assemble backbone, adapters, neck and heads.

        Args:
            num_seg_class: number of output channels of the drivable-area
                head. The lane head is fixed at 2 output channels.
            yolo_scale: YOLOv11 scale ('n', 's', 'm', 'l', 'x'). Selects the
                width/depth multipliers when no weights path is given.
            yolo_weights_path: optional path to pretrained YOLOv11 weights; if
                given, the backbone is extracted from that model.

        Raises:
            ValueError: if ``yolo_scale`` is not a supported scale.
        """
        super().__init__()

        # YOLOv11 scaling parameters.
        scale_configs = {
            'n': {'width': 0.25, 'depth': 0.50},  # nano
            's': {'width': 0.50, 'depth': 0.50},  # small
            'm': {'width': 1.00, 'depth': 0.50},  # medium
            'l': {'width': 1.00, 'depth': 1.00},  # large
            'x': {'width': 1.50, 'depth': 1.00},  # xlarge
        }
        if yolo_scale not in scale_configs:
            raise ValueError(f"Invalid yolo_scale: {yolo_scale}. Must be one of {list(scale_configs.keys())}")
        scale = scale_configs[yolo_scale]

        # With a weights path the backbone comes straight from the pretrained model.
        if yolo_weights_path:
            self.backbone = YOLOv11Backbone(yolo_model_path=yolo_weights_path)
        else:
            self.backbone = YOLOv11Backbone(width_multiple=scale['width'], depth_multiple=scale['depth'])

        # Adapt the backbone outputs to the neck's expected inputs [128, 256, 512].
        backbone_channels = self.backbone.out_channels
        neck_channels = [128, 256, 512]
        self.adapters = nn.ModuleList([
            ChannelAdapter(backbone_channels[0], neck_channels[0]),  # P3
            ChannelAdapter(backbone_channels[1], neck_channels[1]),  # P4
            ChannelAdapter(backbone_channels[2], neck_channels[2]),  # P5
        ])

        # YOLOP neck (layers 11-24).
        self.neck = nn.ModuleList([
            Conv(512, 256, k=1, s=1),                       # 11
            nn.Upsample(scale_factor=2, mode='nearest'),    # 12
            Concat(dimension=1),                            # 13: Concat [-1, 6]
            BottleneckCSP(512, 256, n=1, shortcut=False),   # 14
            Conv(256, 128, k=1, s=1),                       # 15
            nn.Upsample(scale_factor=2, mode='nearest'),    # 16
            Concat(dimension=1),                            # 17: Concat [-1, 4]
            BottleneckCSP(256, 128, n=1, shortcut=False),   # 18
            Conv(128, 128, k=3, s=2),                       # 19
            Concat(dimension=1),                            # 20: Concat [-1, 14]
            BottleneckCSP(256, 256, n=1, shortcut=False),   # 21
            Conv(256, 256, k=3, s=2),                       # 22
            Concat(dimension=1),                            # 23: Concat [-1, 10]
            BottleneckCSP(512, 512, n=1, shortcut=False),   # 24
        ])

        # YOLOP heads.
        self.detect_head = Detect(1, [[3,9,5,11,4,20], [7,18,6,39,12,31], [19,50,38,81,68,157]], [128, 256, 512])
        self.drivable_seg_head = nn.ModuleList([
            Conv(256, 128, k=3, s=1),                       # 25
            nn.Upsample(scale_factor=2, mode='nearest'),    # 26
            BottleneckCSP(128, 64, n=1, shortcut=False),    # 27
            Conv(64, 32, k=3, s=1),                         # 28
            nn.Upsample(scale_factor=2, mode='nearest'),    # 29
            Conv(32, 16, k=3, s=1),                         # 30
            BottleneckCSP(16, 8, n=1, shortcut=False),      # 31
            nn.Upsample(scale_factor=2, mode='nearest'),    # 32
            Conv(8, num_seg_class, k=3, s=1),               # 33
        ])
        self.lane_seg_head = nn.ModuleList([
            Conv(256, 128, k=3, s=1),                       # 34
            nn.Upsample(scale_factor=2, mode='nearest'),    # 35
            BottleneckCSP(128, 64, n=1, shortcut=False),    # 36
            Conv(64, 32, k=3, s=1),                         # 37
            nn.Upsample(scale_factor=2, mode='nearest'),    # 38
            Conv(32, 16, k=3, s=1),                         # 39
            BottleneckCSP(16, 8, n=1, shortcut=False),      # 40
            nn.Upsample(scale_factor=2, mode='nearest'),    # 41
            Conv(8, 2, k=3, s=1),                           # 42
        ])

        # Derive the detection strides dynamically: push a dummy s x s image
        # through the network and compare s with each detection map's size.
        s = 128
        with torch.no_grad():
            dummy = torch.zeros(1, 3, s, s)
            detect_out, _, _ = self.forward(dummy)
        self.detect_head.stride = torch.tensor([s / x.shape[-2] for x in detect_out])
        # Express the anchors in units of the corresponding scale's stride.
        self.detect_head.anchors /= self.detect_head.stride.view(-1, 1, 1)
        check_anchor_order(self.detect_head)
        self.stride = self.detect_head.stride
        print(f"Initialized Detect head with strides: {self.detect_head.stride.tolist()}")

        # Attributes the training code expects to find on the model.
        self.nc = 1  # number of classes
        self.names = ['vehicle']  # class names
        self.model = nn.ModuleList([
            self.backbone,
            self.adapters,
            self.neck,
            self.detect_head,
            self.drivable_seg_head,
            self.lane_seg_head
        ])
        self.detector_index = 3  # detect_head is the 4th entry of self.model
        self.det_out_idx = 25
        self.gr = 1.0  # giou loss ratio (obj loss ratio is 1-giou)

        # Initialise the detection head biases.
        self._initialize_biases()

    def freeze_backbone(self):
        """Set ``requires_grad=False`` on all backbone and adapter parameters."""
        logging.info("Freezing backbone parameters...")
        for param in self.backbone.parameters():
            param.requires_grad = False
        for param in self.adapters.parameters():
            param.requires_grad = False
        # Verify the frozen state and report it.
        frozen_count = sum(1 for p in self.backbone.parameters() if not p.requires_grad)
        frozen_count += sum(1 for p in self.adapters.parameters() if not p.requires_grad)
        total_count = sum(1 for _ in self.backbone.parameters())
        total_count += sum(1 for _ in self.adapters.parameters())
        logging.info(f"Frozen {frozen_count}/{total_count} backbone+adapter parameters")

    def unfreeze_backbone(self):
        """Set ``requires_grad=True`` on all backbone and adapter parameters."""
        logging.info("Unfreezing backbone parameters...")
        for param in self.backbone.parameters():
            param.requires_grad = True
        for param in self.adapters.parameters():
            param.requires_grad = True

    def _initialize_biases(self, cf=None):
        """Initialise the detection head biases (following the original YOLOP code).

        Args:
            cf: optional class-frequency tensor; when given, the class biases
                are offset by ``log(cf / cf.sum())`` instead of a constant.
        """
        # https://arxiv.org/abs/1708.02002 section 3.3
        m = self.detect_head  # Detect() module
        for mi, s in zip(m.m, m.stride):  # from
            b = mi.bias.view(m.na, -1)  # conv.bias(255) to (3,85)
            b.data[:, 4] += math.log(8 / (640 / s) ** 2)  # obj (8 objects per 640 image)
            b.data[:, 5:] += math.log(0.6 / (m.nc - 0.99)) if cf is None else torch.log(cf / cf.sum())  # cls
            mi.bias = torch.nn.Parameter(b.view(-1), requires_grad=True)

    def load_yolov11_backbone_weights(self, weights_path, freeze_backbone=False):
        """Copy backbone weights from a pretrained YOLOv11 model into this model.

        Best effort: any failure is logged as a warning and the method
        returns without raising.

        Args:
            weights_path: path to the YOLOv11 weights (.pt file).
            freeze_backbone: whether to freeze the backbone after loading.
        """
        try:
            from ultralytics import YOLO
            logging.info(f"Loading YOLOv11 weights from {weights_path}")
            yolo_model = YOLO(weights_path)
            yolo_state_dict = yolo_model.model.state_dict()

            # YOLOv11 backbone layers 0-10 map one-to-one onto backbone.layers.0-10.
            # The trailing dot keeps 'model.1.' from matching 'model.10.'.
            backbone_mapping = {
                f'model.{i}.': f'backbone.layers.{i}.' for i in range(11)
            }

            model_dict = self.state_dict()
            new_state_dict = {}
            for key, tensor in yolo_state_dict.items():
                for src_prefix, dst_prefix in backbone_mapping.items():
                    if key.startswith(src_prefix):
                        # Swap only the leading prefix; the rest of the key is untouched.
                        new_key = dst_prefix + key[len(src_prefix):]
                        # Keep only keys that exist in this model.
                        if new_key in model_dict:
                            new_state_dict[new_key] = tensor
                        break

            model_dict.update(new_state_dict)
            self.load_state_dict(model_dict)
            # Report the number of tensors actually transferred.
            logging.info(f"Successfully loaded {len(new_state_dict)} backbone parameters from YOLOv11")

            if freeze_backbone:
                self.freeze_backbone()
                logging.info("Backbone frozen successfully")
        except Exception as e:
            logging.warning(f"Failed to load YOLOv11 weights: {e}")
            logging.warning("Training will start from scratch")

    def forward(self, x):
        """Run backbone, adapters, neck and the three heads.

        Returns:
            ``[detect_out, drivable_out, lane_out]`` where the two
            segmentation outputs have been passed through a sigmoid.
        """
        features = self.backbone(x)  # YOLOv11 outputs [P3, P4, P5]
        features = [adapter(f) for adapter, f in zip(self.adapters, features)]  # adapted to [128, 256, 512]

        # Top-down path.
        # The P5 lateral is computed once and reused for the last concat below;
        # running neck[0] twice on the same tensor gives the same values but
        # repeats the work (and any training-time statistics updates).
        p5_lateral = self.neck[0](features[2])              # 11
        x = self.neck[1](p5_lateral)                        # 12
        x = self.neck[2]([x, features[1]])                  # 13
        x = self.neck[3](x)                                 # 14
        x = self.neck[4](x)                                 # 15
        x = self.neck[5](x)                                 # 16
        p3_fpn = self.neck[6]([x, features[0]])             # 17 (P3, 256 channels)
        p3 = self.neck[7](p3_fpn)                           # 18 (P3, 128 channels)

        # Bottom-up path.
        x = self.neck[8](p3)                                # 19
        # NOTE(review): this applies neck[4] to the adapted P4 feature rather
        # than reusing the neck[4] output of the top-down path above, which is
        # what the "Concat [-1, 14]" layer comment seems to describe. Kept
        # as-is so existing checkpoints behave the same - confirm the intent.
        x = self.neck[9]([x, self.neck[4](features[1])])    # 20
        p4 = self.neck[10](x)                               # 21
        x = self.neck[11](p4)                               # 22
        x = self.neck[12]([x, p5_lateral])                  # 23
        p5 = self.neck[13](x)                               # 24

        # Heads.
        detect_out = self.detect_head([p3, p4, p5])

        # Both segmentation heads start from the 256-channel P3 concat output.
        drivable_out = p3_fpn
        for layer in self.drivable_seg_head:
            drivable_out = layer(drivable_out)
        lane_out = p3_fpn
        for layer in self.lane_seg_head:
            lane_out = layer(lane_out)

        drivable_out = torch.sigmoid(drivable_out)
        lane_out = torch.sigmoid(lane_out)
        return [detect_out, drivable_out, lane_out]
def get_net_yolov11(cfg, **kwargs):
    """Build a YOLOP model with a YOLOv11 backbone.

    Args:
        cfg: configuration object; ``cfg.num_seg_class`` is used when present,
            otherwise 2.
        **kwargs: optional settings:
            - yolov11_weights: path to pretrained YOLOv11 weights. When
              missing or ``None`` it defaults to
              ``weights/yolo11{yolo_scale}.pt``.
            - freeze_backbone: whether to freeze the backbone (only applied
              when pretrained weights were found).
            - yolo_scale: YOLOv11 scale ('n', 's', 'm', 'l', 'x'); default 'n'.

    Returns:
        A ``YOLOPWithYOLOv11`` instance, initialised from the weights file if
        it exists and built from scratch otherwise.
    """
    import os

    num_seg_class = getattr(cfg, 'num_seg_class', 2)
    yolo_scale = kwargs.get('yolo_scale', 'n')  # nano by default
    # An explicit None (or empty string) is treated like an absent key, so it
    # never reaches the filesystem check, which would raise on None.
    yolov11_weights = kwargs.get('yolov11_weights') or f'weights/yolo11{yolo_scale}.pt'
    freeze_backbone = kwargs.get('freeze_backbone', False)

    # isfile rather than exists: a directory at that path is not a checkpoint.
    if os.path.isfile(yolov11_weights):
        # Load the pretrained weights as part of model construction.
        logging.info(f"Creating model with YOLOv11{yolo_scale} pretrained weights from {yolov11_weights}")
        model = YOLOPWithYOLOv11(num_seg_class=num_seg_class, yolo_scale=yolo_scale, yolo_weights_path=yolov11_weights)
        if freeze_backbone:
            model.freeze_backbone()
    else:
        logging.warning(f"YOLOv11 weights not found at {yolov11_weights}, creating model from scratch")
        model = YOLOPWithYOLOv11(num_seg_class=num_seg_class, yolo_scale=yolo_scale, yolo_weights_path=None)
    return model