神经网络量化初探


什么是量化?

image-20231021104226103

image-20231021104342734

为什么要量化?

  1. 量化最直接的好处就是减少存储空间和内存需求,减少数据的位宽,从而减少模型的大小,即压缩模型。

  2. 量化的第二个好处就是加快推理速度减少了计算时间

    image-20231021104618371

    例如在上图中,A可以用AQ × a + b来表示,此处的AQ为 int8 类型,而A为 float32 类型,所以就实现了数据的压缩。同样也可以反推回去,但是这种反量化是有精读损失的。最后可以看到Z的表示方法,相比于两个浮点数A和W相乘,转化为了 int8 类型的计算,速度变快了很多

量化推导

回想一下,我们通常会将一张 uint8 类型、数值范围在 0-255 的图片归一成 float32 类型、数值范围在 0.0-1.0 的张量,这个过程就是反量化。类似地,我们经常将网络输出的范围在 0.0-1.0 之间的张量调整成数值为 0-255、uint8 类型的图片数据,这个过程就是量化

所以量化本质上只是对数值范围的重新调整,可以「粗略」理解为是一种线性映射。之所以加「粗略」二字,是因为有些论文会用非线性量化,但目前在工业界落地的还都是线性量化,所以本文只讨论线性量化的方案。

不过,可以明显看出,反量化一般没有信息损失,而量化一般都会有精度损失。这也非常好理解,float32 能保存的数值范围本身就比 uint8 多,因此必定有大量数值无法用 uint8 表示,只能四舍五入成 uint8 型的数值。量化模型和全精度模型的误差也来自四舍五入的 clip 操作。

量化公式推导

  • $r$:浮点实数
  • $q$:量化后的定点整数

浮点和整型的换算公式:
$$
\begin{align} \overline r = S(q-Z) \tag{1} \newline \ q = round(\frac{r}{S}+Z) \tag{2} \end{align}
$$

  • $S$:scale,表示实数和整数之间的比例关系

  • $Z$:zero point,表示实数中的 0 经过量化后对应的整数

    得到它们的计算方法:
    $$
    \begin{align}
    S = \frac{r_{\text{max}} - r_{\text{min}}}{q_{\text{max}} - q_{\text{min}}} \tag{3} \newline
    Z = \text{round}(q_{\text{max}} - \frac{r_{\text{max}}}{S}) \tag{4}
    \end{align}
    $$
    $r_{max}$ 、 $r_{min}$ 分别是r的最大值和最小值, $q_{min}$ 、 $q_{max}$ 同理。这个公式的推导比较简单,很多资料也有详细的介绍,这里不过多介绍。需要强调的一点是,定点整数的 zero point 就代表浮点实数的 0,二者之间的换算不存在精度损失,这一点可以从公式 (2) 中看出来,把 $r=0$ 代入后就可以得到 $q=Z$。这么做的目的是为了在 padding 时保证浮点数值的 0 和定点整数的 zero point 完全等价,保证定点和浮点之间的表征能够一致。

矩阵运算的量化

由于卷积网络中的卷积层和全连接层本质上都是一堆矩阵乘法,因此我们先看如何将浮点运算上的矩阵转换为定点运算。

假设 $r_1$、$r_2$ 是浮点实数上的两个$N \times N$的矩阵,$r_3$ 是 $r_1$、$r_2$ 相乘后的矩阵:
$$
r_3^{i,k}=\sum_{j=1}^N r_1^{i,j}r_2^{j,k} \tag{5}
$$
假设 $S_1$、$Z_1$ 是$r_1$矩阵对应的scale和zero point,$S_2$、$Z_2$、$S_3$、$Z_3$ 同理。那么将 $(1)$ 式代入 $(5)$ 式可以推出:
$$
S_3(q_3^{i,k}-Z_3)=\sum_{j=1}^{N}S_1(q_{1}^{i,j}-Z_1)S_2(q_2^{j,k}-Z_2) \tag{6}
$$
移项可以得到:
$$
q_3^{i,k}=\frac{S_1 S_2}{S_3}\sum_{j=1}^N(q_1^{i,j}-Z_1)(q_2^{j,k}-Z_2)+Z_3 \tag{7}
$$
仔细观察 $(7)$ 式可以发现,除了 $\frac{S_1 S_2}{S_3}$ ,其他都是定点整数运算。那如何把 $\frac{S_1 S_2}{S_3}$ 也变成定点运算呢?这里要用到一个 trick。假设 $M=\frac{S_1 S_2}{S_3}$,由于 $M$ 通常都是 $(0, 1)$ 之间的实数(这是通过大量实验统计出来的),因此可以表示成 $M=2^{-n}M_0$,其中 $M_0$ 是一个定点实数。

注意:定点数并不一定是整数,所谓定点,指的是小数点的位置是固定的,即小数位数是固定的。

因此,如果存在 $M=2^{-n}M_0$,那我们就可以通过 $M_0$ 的 bit 位移操作实现 $2^{-n}M_0$,这样整个过程就都在定点上计算了。

很多刚接触量化的同学对这一点比较疑惑,下面我就用一个简单的示例说明这一点。我们把 $M=\frac{S_1 S_2}{S_3}$ 代入 $(7)$ 式可以得到:
$$
q_3^{i,k}=M\sum_{j=1}^N(q_1^{i,j}-Z_1)(q_2^{j,k}-Z_2)+Z_3=MP+Z_3 \tag{8}
$$
这里面 $P$ 是一个在定点域上计算好的整数。
假设 $P=7091$,$M=0.0072474273418460$ ($M$ 可以通过事先计算得到),那下面我们就是要找到一个 $M_0$ 和 $n$,使得 $MP=2^{-n}M_0 P$ 成立。我们可以用一段代码来找到这两个数:

M = 0.0072474273418460
P = 7091

def multiply(n, M, P):
    result = M * P
    Mo = int(round(2 ** n * M)) # 这里不一定要四舍五入截断,因为python定点数不好表示才这样处理

    approx_result = (Mo * P) >> n
    print("n=%d, Mo=%d, approx=%f, error=%f"%\
          (n, Mo, approx_result, result-approx_result))

for n in range(1, 16):
    multiply(n, M, P)

输出:

n=1, Mo=0, approx=0.000000, error=51.391507
n=2, Mo=0, approx=0.000000, error=51.391507
n=3, Mo=0, approx=0.000000, error=51.391507
n=4, Mo=0, approx=0.000000, error=51.391507
n=5, Mo=0, approx=0.000000, error=51.391507
n=6, Mo=0, approx=0.000000, error=51.391507
n=7, Mo=1, approx=55.000000, error=-3.608493
n=8, Mo=2, approx=55.000000, error=-3.608493
n=9, Mo=4, approx=55.000000, error=-3.608493
n=10, Mo=7, approx=48.000000, error=3.391507
n=11, Mo=15, approx=51.000000, error=0.391507
n=12, Mo=30, approx=51.000000, error=0.391507
n=13, Mo=59, approx=51.000000, error=0.391507
n=14, Mo=119, approx=51.000000, error=0.391507
n=15, Mo=237, approx=51.000000, error=0.391507

可以看到,在 n=11、 $M_0$ =15 的时候,误差就已经在 1 以内了。因此,可以通过对 $M_0P$ 右移 n 个 bit 来近似$MP$,而这个误差本身在可以接受的范围内。这样一来,$(8)$ 式就可以完全通过定点运算来计算,即我们实现了浮点矩阵乘法的量化。

卷积网络的量化

有了上面矩阵乘法的量化,我们就可以进一步尝试对卷积网络的量化。

假设一个这样的网络:

img

这个网络只有三个模块,现在需要把 conv、fc、relu 量化。

  • conv量化

    假设输入为 $x$,我们可以事先统计样本的最大值和最小值,然后根据式 $(3)$ 和式 $(4)$ 计算出 $S_x$ (scale) 和 $Z_x$ (zero point)。

    同样地,假设 conv、fc 的参数为 $w_1$、$w_2$,以及 scale 和 zero point 为 $S_{w1}$、$Z_{w1}$、$S_{w2}$、$Z_{w2}$。中间层的 feature map 为 $a_1$、$a_2$,并且事先统计出它们的 scale 和 zero point 为 $S_{a1}$、$Z_{a1}$、$S_{a2}$、$Z_{a2}$。

    卷积运算和全连接层的本质都是矩阵运算,因此我们可以把卷积运算表示成(这里先忽略加 bias 的操作,这一步同样可以量化,不过中间有一些 trick,详见:):
    $$
    a_1^{i,k}=\sum_{j=1}^N x^{i,j}w_1^{j,k} \tag{9}
    $$
    根据 $(7)$ 式的转换,我们可以得到:
    $$
    q_{a1}^{i,k}=M\sum_{j=1}^N(q_x^{i,j}-Z_x)(q_{w1}^{j,k}-Z_{w1})+Z_{a1} \tag{10}
    $$
    其中 $M=\frac{S_{w1}S_{x}}{S_{a1}}$,我们得到量化conv后的输出。

  • relu量化

    得到 conv 的输出后,我们不用反量化回 $a_1$,直接用 $q_{a1}$ 继续后面的计算即可。

    对于量化的 relu 来说,计算公式不再是$q_{a2}=max(q_{a1}, 0)$,而是 $q_{a2}=max(q_{a1},Z_{a1})$ ,并且$S_{a1}=S_{a2}$, $Z_{a1}=Z_{a2}$ (为什么是这样,这一点留作思考题),我们得到量化relu后的输出。

    另外,在实际部署的时候,relu 或者 bn 通常是会整合到 conv 中一起计算的,这一点在之后的文章再讲解。

  • fc量化

    得到 $q_{a2}$ 后,我们可以继续用 $(8)$ 式来计算 fc 层。假设网络输出为 y,对应的 scale 和 zero point 为 $S_y$、 $Z_y$ ,则量化后的 fc 层可以用如下公式计算:
    $$
    q_{y}^{i,k}=M\sum_{j=1}^N(q_{a2}^{i,j}-Z_{a2})(q_{w2}^{j,k}-Z_{w2})+Z_{y}\tag{11}
    $$
    然后通过公式 $y=S_y(q_y-Z_y)$ 把结果反量化回去,就可以得到近似原来全精度模型的输出了。

总结

可以看到,上面整个流程都是用定点运算实现的。我们在得到全精度的模型后,可以事先统计出 $weight$ 以及中间各个 feature map 的 $min$、$max$,并以此计算出 scale 和 zero point,然后把 $weight$ 量化成 int8/int16 型的整数后,整个网络便完成了量化,然后就可以依据上面的流程做量化推理了。

K-Means到底做了什么?

image-20231021105755989

例如上图,K-Means对这些数据做了这些事:

  1. 分类:将数据集划分为K个组(簇),使得同一簇内的数据点彼此相似度较高,而不同簇之间的相似度较低。

  2. 确定聚类中心。

  3. 代码示例:

    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torchvision import datasets, transforms
    import torch.utils.data
    import numpy as np
    from sklearn.cluster import KMeans
    import matplotlib.pyplot as plt
    import math
    from copy import deepcopy
    def k_means_cpu(weight, n_clusters, init='k-means++', max_iter=50): # 权重矩阵 聚类的簇数 初始化方法 最大迭代次数
        
        org_shape = weight.shape # 保存原始权重矩阵的形状
        weight = weight.reshape(-1, 1)  # 将数组展平为二维数组,其中n是指定的列数,自动计算行数
        
        # 如果聚类的簇数n_clusters大于权重数组的大小,则将n_clusters设置为权重数组的大小。
        if n_clusters > weight.size: # 如果聚类的簇数n_clusters大于权重矩阵的大小,那么聚类的结果就没有意义,因为每个簇至少应该包含一个样本
            n_clusters = weight.size
    
        k_means = KMeans(n_clusters=n_clusters, init=init, n_init=1, max_iter=max_iter)
        
        k_means.fit(weight) # 传入权重矩阵作为聚类算法的输入,可以根据权重矩阵中的数值来计算数据样本之间的相似度或距离,并根据相似度或距离将样本分配到不同的簇中。
    
        centroids = k_means.cluster_centers_ # 存储聚类中心
        labels = k_means.labels_ # 存储聚类标签
        labels = labels.reshape(org_shape) # 将labels的形状调整为原始形状
        """
            1.centroids 是一个Numpy数组,先通过torch.from_numpy()转换为PyTorch张量
            2.cuda()函数将张量移动到GPU上,使其能够在GPU上进行计算
            3.view(1, -1)将张量的形状调整为1行,列数自动计算
            4.int()转换为整数类型
        """
        return torch.from_numpy(centroids).cuda().view(1, -1), torch.from_numpy(labels).int().cuda() 
    
    
    
    def reconstruct_weight_from_k_means_result(centroids, labels): # 聚类中心  标签
        weight = torch.zeros_like(labels).float().cuda() # 创建一个与labels相同大小的全零张量 weight,并将其转换为 float 类型,并将其存储在 GPU 上。
        for i, c in enumerate(centroids.cpu().numpy().squeeze()): # 使用for循环遍历centroids,将其转换为NumPy数组,并使用squeeze函数去除维度为1的维度
            weight[labels == i] = c.item() # 使用 item 函数将c转换为标量,并将其赋值给 weight 中所有标签为 i 的元素,即找到符合当前中心的标签 
        return weight # 返回重构之后的权重
    
    '''
    实现使用K-means算法对权重矩阵进行聚类,并通过聚类结果重构权重矩阵的功能。
    '''
    # w  = torch.rand(4,5).numpy()
    # print("初始权重函数:\n", w)
    # centroids,labels = k_means_cpu(w,n_clusters=2)
    # print("初始簇类中心:\n",centroids)
    # print("初始标签:\n",labels)
    
    # w_q = reconstruct_weight_from_k_means_result(centroids,labels)
    # print("重构后的权重矩阵:\n",w_q)
    class QuantLinear(nn.Linear): # 使用库中的nn模块
        def __init__(self, in_features, out_features, bias=True):
            super(QuantLinear, self).__init__(in_features, out_features, bias) # 继承父类的属性和方法,初始化当前的对象
            self.weight_labels = None # 权重标签
            self.bias_labels = None # 偏置标签
            self.num_cent = None # 分成多少个类
            self.quant_flag = False # 该层是否被量化过的标志
            '''
            偏置的作用是引入模型的偏移,使其能够更好地拟合训练数据。
            模型的偏移是指模型在拟合训练数据时引入的偏差或错误,它表示模型在处理输入数据时的系统性偏离真实值的能力。
            我们通常模型的偏移尽可能的小。
            '''
            self.quant_bias = False # 是否量化偏置,一般不量化。
    
    
        # K-Means 量化函数    
        def kmeans_quant(self, bias=False, quantize_bit=4): # 自身 是否需要量化bias 量化为多少个bit
            self.num_cent = 2 ** quantize_bit # 确定聚类中心的个数,这导致最终数据会被划分为多少个类
            
            w = self.weight.data # 获取权重
            centroids, self.weight_labels = k_means_cpu(w.cpu().numpy(), self.num_cent) # 将权重和中心的个数传给之前的聚类函数
            w_q = reconstruct_weight_from_k_means_result(centroids, self.weight_labels) # 通过权重和中心点,重构权重矩阵
            self.weight.data = w_q.float()
            
            # 如果需要量化 bias 
            if bias:
                b = self.bias.data
                centroids, self.bias_labels = k_means_cpu(b.cpu().numpy(), self.num_cent)
                b_q = reconstruct_weight_from_k_means_result(centroids, self.bias_labels)
                self.bias.data = b_q.float()
            
            self.quant_flag = True # 修改 bias 标志位
            self.quant_bias = bias # 存储 bias
        
        # 根据权重和偏置的标签,计算新的权重和偏置数据
        def kmeans_update(self):
            if not self.quant_flag: # 检查是否需要进行量化的标志位,如果不需要,直接返回
                return
            
            new_weight_data = torch.zeros_like(self.weight_labels).float().cuda() # 创建一个和权重标签相同形状的全零张量,转换为浮点型,移动到GPU上
            for i in range(self.num_cent): # 使用循环遍历每个聚类中心
                mask_cl = (self.weight_labels == i).float() # 用于标识权重标签等于当前聚类中心的位置,并将其转换为浮点型
                # 使用加权平均值可以考虑每个样本的权重或重要性,同时可以更好地处理零权重,它们对最终结果的贡献应该被忽略
                new_weight_data += (self.weight.data * mask_cl).sum() / mask_cl.sum() * mask_cl # 对于每个聚类中心,计算权重数据的加权平均值 * 掩码
            self.weight.data = new_weight_data # 将新的权重数据赋值给weight.data
            
            if self.quant_bias:
                new_bias_data = torch.zeros_like(self.bias_labels).float().cuda()
                for i in range(self.num_cent):
                    mask_cl = (self.bias_labels == i).float()
                    new_bias_data += (self.bias.data * mask_cl).sum() / mask_cl.sum() * mask_cl
                self.bias.data = new_bias_data
    
    # 卷积层 主要用于提取图像或特征图中的局部特征            
    class QuantConv2d(nn.Conv2d):
        def __init__(self, in_channels, out_channels, kernel_size, stride=1,
                     padding=0, dilation=1, groups=1, bias=True):
            super(QuantConv2d, self).__init__(in_channels, out_channels, 
                kernel_size, stride, padding, dilation, groups, bias)
            self.weight_labels = None
            self.bias_labels = None
            self.num_cent = None
            self.quant_flag = False
            self.quant_bias = False
            
        def kmeans_quant(self, bias=False, quantize_bit=4):
            self.num_cent = 2 ** quantize_bit
            
            w = self.weight.data
            centroids, self.weight_labels = k_means_cpu(w.cpu().numpy(), self.num_cent)
            w_q = reconstruct_weight_from_k_means_result(centroids, self.weight_labels)
            self.weight.data = w_q.float()
            
            if bias:
                b = self.bias.data
                centroids, self.bias_labels = k_means_cpu(b.cpu().numpy(), self.num_cent)
                b_q = reconstruct_weight_from_k_means_result(centroids, self.bias_labels)
                self.bias.data = b_q.float()
            
            self.quant_flag = True
            self.quant_bias = bias
        
        def kmeans_update(self):
            if not self.quant_flag:
                return
            
            new_weight_data = torch.zeros_like(self.weight_labels).float().cuda()
            for i in range(self.num_cent):
                mask_cl = (self.weight_labels == i).float()
                new_weight_data += (self.weight.data * mask_cl).sum() / mask_cl.sum() * mask_cl
            self.weight.data = new_weight_data
            
            if self.quant_bias:
                new_bias_data = torch.zeros_like(self.bias_labels).float().cuda()
                for i in range(self.num_cent):
                    mask_cl = (self.bias_labels == i).float()
                    new_bias_data += (self.bias.data * mask_cl).sum() / mask_cl.sum() * mask_cl
                self.bias.data = new_bias_data
    
    
    class ConvNet(nn.Module):
        # 该类为一个卷积神经网络模型
        def __init__(self):
            super(ConvNet, self).__init__()
    
            self.conv1 = QuantConv2d(1, 32, kernel_size=3, padding=1, stride=1) # 创建一个名为conv1的卷积层,输入通道数为1,输出通道数为32,卷积核大小为3,填充为1,步长为1。
            self.relu1 = nn.ReLU(inplace=True) # 创建一个名为relu1的ReLU激活函数,inplace参数为True表示原地操作。
            self.maxpool1 = nn.MaxPool2d(2) # 创建一个名为maxpool1的最大池化层,池化核大小为2。
    
            self.conv2 = QuantConv2d(32, 64, kernel_size=3, padding=1, stride=1)
            self.relu2 = nn.ReLU(inplace=True)
            self.maxpool2 = nn.MaxPool2d(2)
    
            self.conv3 = QuantConv2d(64, 64, kernel_size=3, padding=1, stride=1)
            self.relu3 = nn.ReLU(inplace=True)
    
            self.linear1 = QuantLinear(7*7*64, 10)
        # 向前传播函数    
        def forward(self, x):
            out = self.maxpool1(self.relu1(self.conv1(x))) # 卷积操作(提取图像特征)-> 激活函数(对特征进行非线性映射)->池化操作(减小特征图的尺寸,保留主要特征)
            
            out = self.maxpool2(self.relu2(self.conv2(out)))
            out = self.relu3(self.conv3(out))
            out = out.view(out.size(0), -1) # 视图变换:将多维张量重塑为一维向量,以便进行线性变换
            out = self.linear1(out) # 通过矩阵乘法和偏置项加法,将输入特征进行线性组合,得到输出特征
            # 按照这个顺序执行这些操作,神经网络能够逐步提取输入的特征,并通过线性变换将其转换为最终的输出
            return out
    
        def kmeans_quant(self, bias=False, quantize_bit=4):
            # Should be a less manual way to quantize
            # Leave it for the future
            
            self.conv1.kmeans_quant(bias, quantize_bit)
            self.conv2.kmeans_quant(bias, quantize_bit)
            self.conv3.kmeans_quant(bias, quantize_bit)
            self.linear1.kmeans_quant(bias, quantize_bit)
        
        def kmeans_update(self):
            self.conv1.kmeans_update()
            self.conv2.kmeans_update()
            self.conv3.kmeans_update()
            self.linear1.kmeans_update()
    def train(model, device, train_loader, optimizer, epoch):
        model.train()
        total = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device) # 移动到指定的设备上
            optimizer.zero_grad() # 将模型的梯度置零,以准备计算新的梯度
            output = model(data) # 将数据传给模型,获得模型的输出
            loss = F.cross_entropy(output, target) # 计算输出和目标之间的交叉熵损失
            loss.backward() # 计算损失相对于模型参数的梯度
            optimizer.step() # 更新模型参数,以最小化损失
            total += len(data) # 更新 total 变量
            progress = math.ceil(batch_idx / len(train_loader) * 50) # 根据当前批次的索引和训练数据加载器的长度计算进度条的长度
            print("\rTrain epoch %d: %d/%d, [%-51s] %d%%" %
                  (epoch, total, len(train_loader.dataset),
                   '-' * progress + '>', progress * 2), end='')
    def test(model, device, test_loader):
        model.eval()
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                test_loss += F.cross_entropy(output, target, reduction='sum').item()  # sum up batch loss
                pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
                correct += pred.eq(target.view_as(pred)).sum().item()
    
        test_loss /= len(test_loader.dataset)
    
        print('\nTest: average loss: {:.4f}, accuracy: {}/{} ({:.0f}%)'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))
        return test_loss, correct / len(test_loader.dataset)
    def main():
        epochs = 2 
        batch_size = 64
        torch.manual_seed(0) # 设置随机种子并选择设备(GPU或CPU)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
        train_loader = torch.utils.data.DataLoader( # 数据加载器,按照批次大小和随机顺序加载到模型中进行训练
            datasets.MNIST('../data/MNIST', train=True, download=False,
                           transform=transforms.Compose([
                               transforms.ToTensor(),
                               transforms.Normalize((0.1307,), (0.3081,)) # 创建数据集
                           ])),
            batch_size=batch_size, shuffle=True)
        test_loader = torch.utils.data.DataLoader(
            datasets.MNIST('../data/MNIST', train=False, download=False, transform=transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,))
            ])),
            batch_size=1000, shuffle=True)
    
        model = ConvNet().to(device) # 创建了一个卷积神经网络模型并部署到设备上
        
        optimizer = torch.optim.Adadelta(model.parameters()) # 使用优化器来优化模型的参数
        
        for epoch in range(1, epochs + 1):
             train(model, device, train_loader, optimizer, epoch)
             _, acc = test(model, device, test_loader)
        
        quant_model = deepcopy(model)
        print('=='*10)
        print('2 bits quantization')
        quant_model.kmeans_quant(bias=False, quantize_bit=2)
        _, acc = test(quant_model, device, test_loader)
            
        return model, quant_model
    model, quant_model = main()
    ## 可视化
    from matplotlib import pyplot as plt
    def plot_weights(model):
        modules = [module for module in model.modules()]
        num_sub_plot = 0
        for i, layer in enumerate(modules):
            if hasattr(layer, 'weight'):
                plt.subplot(221+num_sub_plot)
                w = layer.weight.data
                w_one_dim = w.cpu().numpy().flatten()
                plt.hist(w_one_dim, bins=50)
                num_sub_plot += 1
        plt.show()
    
    plot_weights(model)
    plot_weights(quant_model)
    

线性量化

代码示例:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
import torch.utils.data
import numpy as np
from sklearn.cluster import KMeans
import math
from copy import deepcopy
def quantize_tensor(x, num_bits=8): # 张量 量化的位数 
    qmin = 0. # 量化的最小值
    qmax = 2.**num_bits - 1. # 量化的最大值
    
    '''
    张量类似于多维数组或矩阵,它是由一个或多个标量组成。
    本函数输入:
    tensor([[0.7687, 0.7557, 0.1392, 0.5356],
        [0.7059, 0.2987, 0.7618, 0.4284],
        [0.4509, 0.7673, 0.0345, 0.8120]])
    '''
    min_val, max_val = x.min(), x.max() # 获得张量的最小值和最大值

    scale = (max_val - min_val) / (qmax - qmin) # 计算缩放因子

    initial_zero_point = qmin - min_val / scale # 将张量的最小值映射到量化范围的零点

    zero_point = 0
    # 初始化零点
    if initial_zero_point < qmin:
        zero_point = qmin
    elif initial_zero_point > qmax:
        zero_point = qmax
    else:
        zero_point = initial_zero_point

    zero_point = int(zero_point)
    q_x = zero_point + x / scale # 进行量化,零点 + 输入张量 / 缩放因子
    q_x.clamp_(qmin, qmax).round_() # 进行截断操作,将超过量化范围的值截断到最大值或最小值,并进行四舍五入
    q_x = q_x.round().byte() # 转换为字节类型
    return q_x, scale, zero_point


def dequantize_tensor(q_x, scale, zero_point):
    return scale * (q_x.float() - zero_point) # 重构向量x

w = torch.rand(3,4)
print(w)

print(dequantize_tensor(*quantize_tensor(w)))
class QuantLinear(nn.Linear):
    def __init__(self, in_features, out_features, bias=True):
        super(QuantLinear, self).__init__(in_features, out_features, bias)
        self.quant_flag = False
        self.scale = None
        self.zero_point = None
    
    def linear_quant(self, quantize_bit=8):
        
        quantized_weight, scale, zero_point = quantize_tensor(self.weight.data.detach(), num_bits=quantize_bit)
        self.weight.data = quantized_weight.float()
        self.scale = scale
        self.zero_point = zero_point

    def forward(self, x):
        if self.quant_flag == True:
            weight = dequantize_tensor(self.weight, self.scale, self.zero_point)
            return F.linear(x, weight, self.bias)
        else:
            return F.linear(x, self.weight, self.bias)
        
            
class QuantConv2d(nn.Conv2d):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1,
                 padding=0, dilation=1, groups=1, bias=True):
        super(QuantConv2d, self).__init__(in_channels, out_channels, 
            kernel_size, stride, padding, dilation, groups, bias)
        self.quant_flag = False
        self.scale = None
        self.zero_point = None
    
    def linear_quant(self, quantize_bit=8):
        
        
        quantized_weight, scale, zero_point = quantize_tensor(self.weight.data.detach(), num_bits=quantize_bit)
        self.weight.data = quantized_weight.float()
        self.scale = scale
        self.zero_point = zero_point
        
    def forward(self, x):
        if self.quant_flag == True:
            weight = dequantize_tensor(self.weight, self.scale, self.zero_point)
            return F.conv2d(x, weight, self.bias, self.stride,
                        self.padding, self.dilation, self.groups)
        else:
            return F.conv2d(x, self.weight, self.bias, self.stride,
                        self.padding, self.dilation, self.groups)
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()

        self.conv1 = QuantConv2d(1, 32, kernel_size=3, padding=1, stride=1)
        self.relu1 = nn.ReLU(inplace=True)
        self.maxpool1 = nn.MaxPool2d(2)

        self.conv2 = QuantConv2d(32, 64, kernel_size=3, padding=1, stride=1)
        self.relu2 = nn.ReLU(inplace=True)
        self.maxpool2 = nn.MaxPool2d(2)

        self.conv3 = QuantConv2d(64, 64, kernel_size=3, padding=1, stride=1)
        self.relu3 = nn.ReLU(inplace=True)

        self.linear1 = QuantLinear(7*7*64, 10)
        
    def forward(self, x):
        out = self.maxpool1(self.relu1(self.conv1(x)))
        out = self.maxpool2(self.relu2(self.conv2(out)))
        out = self.relu3(self.conv3(out))
        out = out.view(out.size(0), -1)
        out = self.linear1(out)
        return out

    def linear_quant(self, quantize_bit=8):
        # Should be a less manual way to quantize
        # Leave it for the future
        self.conv1.linear_quant(quantize_bit)
        self.conv2.linear_quant(quantize_bit)
        self.conv3.linear_quant(quantize_bit)
        self.linear1.linear_quant(quantize_bit)

def train(model, device, train_loader, optimizer, epoch):
    model.train()
    total = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output, target)
        loss.backward()
        optimizer.step()
        
        total += len(data)
        progress = math.ceil(batch_idx / len(train_loader) * 50)
        print("\rTrain epoch %d: %d/%d, [%-51s] %d%%" %
              (epoch, total, len(train_loader.dataset),
               '-' * progress + '>', progress * 2), end='')
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.cross_entropy(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest: average loss: {:.4f}, accuracy: {}/{} ({:.0f}%)'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    return test_loss, correct / len(test_loader.dataset)
def main():
    epochs = 2
    batch_size = 64
    torch.manual_seed(0)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data/MNIST', train=True, download=False,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data/MNIST', train=False, download=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
        batch_size=1000, shuffle=True)

    model = ConvNet().to(device)
    optimizer = torch.optim.Adadelta(model.parameters())
    
    for epoch in range(1, epochs + 1):
        train(model, device, train_loader, optimizer, epoch)
        _, acc = test(model, device, test_loader)
    
    quant_model = deepcopy(model)
    print('\n')
    print('=='*10)
    print('4 linear bits quantization')
    quant_model.linear_quant(quantize_bit=4)
    _, acc = test(quant_model, device, test_loader)
    
    return model, quant_model
model, quant_model = main()
## 可视化
from matplotlib import pyplot as plt
def plot_weights(model):
    modules = [module for module in model.modules()]
    num_sub_plot = 0
    for i, layer in enumerate(modules):
        if hasattr(layer, 'weight'):
            plt.subplot(221+num_sub_plot)
            w = layer.weight.data
            w_one_dim = w.cpu().numpy().flatten()
            plt.hist(w_one_dim, bins=50)
            num_sub_plot += 1
    plt.show()

plot_weights(model)
plot_weights(quant_model)

量化的方向分类

  • 量化的对象

    思考:我们可以在模型训练后再对模型进行量化,那我们可不可以在模型训练的同时进行量化呢?

  • 量化的颗粒度

    思考:我们可以对整个模型进行量化,那我们可不可以对模型的每一层运用不同的量化呢?我们可不可以对每个卷积核进行量化呢?我们可不可以甚至对每个元素进行量化呢?

  • 按需量化

    思考:我们可不可以对一个模型进行按需量化?针对不同模型的精读要求,那我们就可以采用对应的量化措施。

Transform使用方法

图像处理:以后再写

参考


文章作者: QT-7274
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 QT-7274 !
评论
  目录