PyTorch实战:残差网络(ResNet)


残差网络(ResNet)

让我们先思考一个问题:对神经网络模型添加新的层,充分训练后的模型是否只可能更有效地降低训练误差?

理论上,原模型解的空间只是新模型解的空间的子空间。

也就是说,如果我们能将新添加的层训练成恒等映射 $f(x)=x$ ,新模型和原模型将同样有效。由于新模型可能得出更优的解来拟合训练数据集,因此添加层似乎更容易降低训练误差。

然而在实践中,添加过多的层后训练误差往往不降反升。即使利用批量归一化带来的数值稳定性使训练深层模型更加容易,该问题仍然存在。针对这一问题,何恺明等人提出了残差网络 (ResNet) [1]。它在2015年的ImageNet图像识别挑战赛夺魁,并深刻影响了后来的深度神经网络的设计。

残差快

让我们聚焦于神经网络局部。如图所示,设输入为 $\boldsymbol{x}$ 。假设我们希望学出的理想映射为 $f(\boldsymbol{x})$ ,从而作为图上方激活函数的输 $\lambda$ 。

image-20231215222818328

左图虚线框中的部分需要直接拟合出该映射 $f(\boldsymbol{x})$ ,而右图虚线框中的部分则需要拟合出有关恒等映射的残差映射 $f(\boldsymbol{x})-\boldsymbol{x}$ 。

残差映射在实际中往往更容易优化。以本节开头提到的恒等映射作为我们希望学出的理想映射 $f(x)$ 。我们只需将图中右图虚线框内上方的加权运算 (如仿射) 的权重和偏差参数学成 0 ,那么 $f(\boldsymbol{x})$ 即为恒等映射。

实际中,当理想映射 $f(\boldsymbol{x})$ 极接近于恒等映射时,残差映射也易于捕捉恒等映射的细微波动。右图也是ResNet的基础块,即残差块(residual block)。在残差块中,输入可通过跨层的数据线路更快地向前传播。

ResNet沿用了VGG全3×3卷积层的设计:

  • 残差块里首先有2个有相同输出通道数的3×3卷积层。

  • 每个卷积层后接一个批量归一化层和ReLU激活函数。

  • 然后我们将输入跳过这两个卷积运算后直接加在最后的ReLU激活函数前。这样的设计要求两个卷积层的输出与输入形状一样,从而可以相加。

    注意:如果想改变通道数,就需要引入一个额外的1×1卷积层来将输入变换成需要的形状后再做相加运算。

残差块的实现如下:

import time
import torch
from torch import nn, optim
import torch.nn.functional as F

import sys
sys.path.append("..") 
import d2lzh_pytorch as d2l
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class Residual(nn.Module):  
    def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1): # 输入通道数 输出通道数 是否使用1×1卷积 步长
        super(Residual, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride) # 第一个卷积核
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        if use_1x1conv: # 如果想改变通道数,就需要引入一个额外的1×1卷积层来将输入变换成需要的形状后再做相加运算。
            self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
        else:
            self.conv3 = None
        self.bn1 = nn.BatchNorm2d(out_channels) # 批量归一化层,加速模型的收敛速度,具有轻微正则化的作用
        self.bn2 = nn.BatchNorm2d(out_channels) # 定义两个一样的批量归一化层是因为它们的参数(均值和方差)是独立学习的,它们在网络中的作用是不同的

    def forward(self, X):
        Y = F.relu(self.bn1(self.conv1(X))) # 输入->第一个卷积层->批量归一化层->激活函数
        Y = self.bn2(self.conv2(Y)) # 激活函数后->第二个卷积层
        if self.conv3: # 如果想改变通道数,直接将原始输入通过一个1×1卷积层
            X = self.conv3(X)
        return F.relu(Y + X) # 直接将最后输出+原始输入

下面我们来查看输入和输出形状一致的情况。

# 输入和输出形状一致的情况
blk = Residual(3, 3)
X = torch.rand((4, 3, 6, 6)) # 批量大小(X中包含的样本数量) 每个样本的通道数 每个样本的高度 每个样本的宽度
blk(X).shape # torch.Size([4, 3, 6, 6])

我们也可以在增加输出通道数的同时减半输出的高和宽。

# 输入通道数≠输出通道数,那么就需要借助1×1卷积层改变通道数,具体通道数取决于输出通道数
blk = Residual(3, 6, use_1x1conv=True, stride=2) # 注意步长也改变为2,这导致输出的宽和高减半
blk(X).shape # torch.Size([4, 6, 3, 3])

ResNet模型

ResNet的前两层跟之前介绍的GoogLeNet中的一样:在输出通道数为64、步幅为2的7×7卷积层后接步幅为2的3×3的最大池化层。不同之处在于ResNet每个卷积层后增加的批量归一化层。

net = nn.Sequential( # 与GoogLeNet相似,在输出通道为64的卷积层后+批量归一层+激活函数层+批量归一化层+最大池化层
        nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
        nn.BatchNorm2d(64), 
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
  • GoogLeNet在后面接了4个由Inception块组成的模块。

  • ResNet则使用4个由残差块组成的模块,每个模块使用若干个同样输出通道数的残差块:

    • 第一个模块的通道数同输入通道数一致。

      由于之前已经使用了步幅为2的最大池化层,所以无须减小高和宽。

    • 每个模块在第一个残差块里将上一个模块的通道数翻倍,并将高和宽减半。

def resnet_block(in_channels, out_channels, num_residuals, first_block=False): # 第一个残差块由于在之前已经使用了步幅为2的最大池化层,所以无须减小高和宽
    if first_block:
        assert in_channels == out_channels # 第一个模块的通道数同输入通道数一致
    blk = []
    for i in range(num_residuals):
        if i == 0 and not first_block: # 如果这不是第一个残差块,并且是每个残差块的第一个 Residual 对象,那么会创建一个使用 1x1 卷积并且步长为 2 的 Residual 对象。
            blk.append(Residual(in_channels, out_channels, use_1x1conv=True, stride=2)) # 目的是在每个残差块的开始处改变输入数据的通道数和尺寸,以适应网络的需要
        else:
            blk.append(Residual(out_channels, out_channels))
    return nn.Sequential(*blk)

接着我们为ResNet加入所有残差块。这里每个模块使用两个残差块。

net.add_module("resnet_block1", resnet_block(64, 64, 2, first_block=True)) # 每个模块使用两个2个残差块
net.add_module("resnet_block2", resnet_block(64, 128, 2))
net.add_module("resnet_block3", resnet_block(128, 256, 2))
net.add_module("resnet_block4", resnet_block(256, 512, 2))

最后,与GoogLeNet一样,加入全局平均池化层后接上全连接层输出。

class GlobalAvgPool2d(nn.Module): # 平均成一个数值的目的是为了减少特征的维度,从而降低计算量和内存消耗,同时保留足够的特征信息用于分类或其他任务。
    # 全局平均池化层可通过将池化窗口形状设置成输入的高和宽实现
    def __init__(self):
        super(GlobalAvgPool2d, self).__init__()
    def forward(self, x): # x 的形状为(1,3,224,224)
        return F.avg_pool2d(x, kernel_size=x.size()[2:]) # 对输入数据x进行平均池化操作,并取出元组中的第三个和第四个元素,也就是高和宽
    
 # 转换形状,用于在softmax回归,因为每个batch样本x的形状为(batch_size,1,28,28)
class FlattenLayer(nn.Module):
    def __init__(self):
        super(FlattenLayer, self).__init__()
    def forward(self, x): # x shape: (batch, *, *, ...)
        return x.view(x.shape[0], -1) # 保持批量大小相同,-1 表示自动计算剩余维度的大小,使得重塑后的数据元素个数与原数据相同。
      

最后,与GoogLeNet一样,加入全局平均池化层后接上全连接层输出。

net.add_module("global_avg_pool", d2l.GlobalAvgPool2d()) # GlobalAvgPool2d的输出: (Batch, 512, 1, 1)
net.add_module("fc", nn.Sequential(d2l.FlattenLayer(), nn.Linear(512, 10))) 

这里每个模块里有4个卷积层(不计算1×11×11×1卷积层),加上最开始的卷积层和最后的全连接层,共计18层。这个模型通常也被称为ResNet-18。通过配置不同的通道数和模块里的残差块数可以得到不同的ResNet模型,例如更深的含152层的ResNet-152。虽然ResNet的主体架构跟GoogLeNet的类似,但ResNet结构更简单,修改也更方便。这些因素都导致了ResNet迅速被广泛使用。

在训练ResNet之前,我们来观察一下输入形状在ResNet不同模块之间的变化。

X = torch.rand((1, 1, 224, 224))
for name, layer in net.named_children():
    X = layer(X)
    print(name, ' output shape:\t', X.shape)

'''
第一层卷积: 使用nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3)后,X的形状变为(1, 64, 112, 112)。步长为2使得宽和高减半。
批量归一化和ReLU: 形状不变,依然是(1, 64, 112, 112)。
最大池化: 使用nn.MaxPool2d(kernel_size=3, stride=2, padding=1)后,X的形状变为(1, 64, 56, 56)。步长为2再次减半宽和高。
resnet_block1: 第一个残差块不改变通道数,也不改变宽和高,因此X的形状仍然是(1, 64, 56, 56)。
resnet_block2: 第二个残差块使用1x1卷积和步长为2,将通道数从64增加到128,并将宽和高减半,所以X的形状变为(1, 128, 28, 28)。
resnet_block3: 同样的操作将通道数从128增加到256,并再次减半宽和高,X的形状变为(1, 256, 14, 14)。
resnet_block4: 将通道数从256增加到512,并减半宽和高,X的形状变为(1, 512, 7, 7)。
全局平均池化: d2l.GlobalAvgPool2d()将每个通道的特征图平均成一个单一的数值,因此X的形状变为(1, 512, 1, 1)。
全连接层: nn.Linear(512, 10)将512个特征转换为10个输出类别,因此最终输出的形状是(1, 10)。
'''

获取数据和训练模型

def load_data_fashion_mnist(batch_size, resize=None, root='~/Datasets/FashionMNIST'):
    """Download the fashion mnist dataset and then load into memory."""
    trans = [] # 用于存储图像变换的操作
    if resize: # 检查检查 resize 参数是否有值。如果有,那么将一个 torchvision.transforms.Resize 对象添加到 trans 列表中,用于将图像缩放到指定的大小。
        trans.append(torchvision.transforms.Resize(size=resize))
    trans.append(torchvision.transforms.ToTensor()) # 用于将图像转换为张量,并且将像素值归一化到 [0, 1] 范围内。
    
    # 它会按照trans列表中定义的顺序依次执行这些转换操作,将多个图像转换操作组合成一个转换函数
    transform = torchvision.transforms.Compose(trans) # 将 trans 列表中的所有图像变换操作组合成一个变换函数 transform。
    mnist_train = torchvision.datasets.FashionMNIST(root=root, train=True, download=True, transform=transform)
    mnist_test = torchvision.datasets.FashionMNIST(root=root, train=False, download=True, transform=transform)

    train_iter = torch.utils.data.DataLoader(mnist_train, batch_size=batch_size, shuffle=True, num_workers=4)
    test_iter = torch.utils.data.DataLoader(mnist_test, batch_size=batch_size, shuffle=False, num_workers=4)

    return train_iter, test_iter
def evaluate_accuracy(data_iter, net, device=None):
    if device is None and isinstance(net, torch.nn.Module):
        # 如果没指定device就使用net的device
        device = list(net.parameters())[0].device
    acc_sum, n = 0.0, 0
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(net, torch.nn.Module):
                net.eval() # 评估模式, 这会关闭dropout
                acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
                net.train() # 改回训练模式
            else: # 自定义的模型, 不考虑GPU
                if('is_training' in net.__code__.co_varnames): # 如果有is_training这个参数
                    # 将is_training设置成False
                    acc_sum += (net(X, is_training=False).argmax(dim=1) == y).float().sum().item() 
                else:
                    acc_sum += (net(X).argmax(dim=1) == y).float().sum().item() 
            n += y.shape[0]
    return acc_sum / n
def train_ch5(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs):
    net = net.to(device)
    print("training on ", device)
    loss = torch.nn.CrossEntropyLoss()
    for epoch in range(num_epochs):
        train_l_sum, train_acc_sum, n, batch_count, start = 0.0, 0.0, 0, 0, time.time()
        for X, y in train_iter:
            X = X.to(device)
            y = y.to(device)
            y_hat = net(X)

            l = loss(y_hat, y)

            optimizer.zero_grad()

            l.backward()
            optimizer.step()

            train_l_sum += l.cpu().item() # 计算累加损失值
            train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
            n += y.shape[0]
            batch_count += 1
        test_acc = evaluate_accuracy(test_iter, net)
        print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
              % (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))

开始训练:

batch_size = 256
# 如出现“out of memory”的报错信息,可减小batch_size或resize
train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size, resize=96)

lr, num_epochs = 0.001, 5
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
d2l.train_ch5(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs)
training on  cuda
epoch 1, loss 0.0015, train acc 0.853, test acc 0.885, time 31.0 sec
epoch 2, loss 0.0010, train acc 0.910, test acc 0.899, time 31.8 sec
epoch 3, loss 0.0008, train acc 0.926, test acc 0.911, time 31.6 sec
epoch 4, loss 0.0007, train acc 0.936, test acc 0.916, time 31.8 sec
epoch 5, loss 0.0006, train acc 0.944, test acc 0.926, time 31.5 sec

文章作者: QT-7274
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 QT-7274 !
评论
  目录