LSQ的Pytorch代码实现


加载配置文件

script_dir = Path.cwd() # 获取当前工作目录的路径:d:/..../lsq-net-master
args = util.get_config(default_file = script_dir / 'config.yaml') # 参数为当前目录下的配置文件路径,返回为 Munch 类型

其中处理配置项的功能函数定义了命令行参数解析器,并且能解析多个命令行配置文件:

def get_config(default_file):
    p = argparse.ArgumentParser(description='Learned Step Size Quantization')
    p.add_argument('config_file', metavar='PATH', nargs='+',
                   help='path to a configuration file') # 添加命令行参数
    arg = p.parse_args() # 解析命令行参数,默认为 config_file = ['config.yaml']

    with open(default_file) as yaml_file: # 打开默认指定文件(只有一个文件),将内容读入yaml_file
        '''
        例如cfg的默认值为:
        {'name': 'MyProject', 
        'output_dir': 'out', 
        'device': {'type': 'cuda', 'gpu': [...]}, 
        ...
        '''
        cfg = yaml.safe_load(yaml_file) # 该函数将YAML内容转换为Python对象,并将结果存储在cfg中

    for f in arg.config_file: # 遍历命令行每个参数文件,因为设置了nargs为+,可接收多个配置文件
        if not os.path.isfile(f): # 检查f是否为一个存在的文件
            raise FileNotFoundError('Cannot find a configuration file at', f)
        with open(f) as yaml_file:
            c = yaml.safe_load(yaml_file)
            cfg = merge_nested_dict(cfg, c) # 将 cfg(第一个cfg是初始默认配置文件) 和 c 合并

    return munch.munchify(cfg) # 转换为 Munch 对象并返回,Munch对象允许你可以像访问对象的属性一样来访问值

【技巧代码】如何合并多个配置文件

def merge_nested_dict(d, other): # 初始配置文件 其他命令行配置文件
    new = dict(d) # 创建 d 的字典的副本
    for k, v in other.items():
        if d.get(k, None) is not None and type(v) is dict: # 检查 k 是否在字典 d 中存在(两个字典有同一个键),并且v的类型是否为字典
            new[k] = merge_nested_dict(d[k], v) # 递归合并,例如初始参数中 gpu 就符合条件
        else:
            new[k] = v
    return new

创建输出目录

output_dir = script_dir / args.output_dir # 默认 args.output_dir 为 out
output_dir.mkdir(exist_ok=True) #这个参数决定了当目录已经存在时,是否抛出一个异常。如果exist_ok被设置为True,那么如果目录已经存在,将不会抛出异常,而是会忽略这个操作。如果exist_ok被设置为False(这也是默认值),那么如果目录已经存在,将会抛出一个FileExistsError异常。

初始化日志记录器并保存当前配置

log_dir = util.init_logger(args.name, output_dir, script_dir / 'logging.conf') # 返回初始实验日志路径
logger = logging.getLogger() # 创建一个日志对象

with open(log_dir / "args.yaml", "w") as yaml_file:  # 将 args 转换为 YAML 格式,并写入 yaml_file
        yaml.safe_dump(args, yaml_file)

【技巧代码】创建每一次的实验文件及日志

def init_logger(experiment_name, output_dir, cfg_file=None):
    time_str = time.strftime("%Y%m%d-%H%M%S") # 获取当前时间 例如20231228-181202
    exp_full_name = time_str if experiment_name is None else experiment_name + '_' + time_str # 拼接实验名称
    log_dir = output_dir / exp_full_name # 设置新的日志路径
    log_dir.mkdir(exist_ok=True) # 创建路径
    log_file = log_dir / (exp_full_name + '.log') # 创建日志文件
    logging.config.fileConfig(cfg_file, defaults={'logfilename': log_file})
    logger = logging.getLogger() # 获取一个日志记录器对象,并将其存储在logger中
    logger.info('Log file for this run: ' + str(log_file)) # 例如 Log file for this run : d:\...\MyProject_20231228-181202\MyProject_20231228-181202.log
    return log_dir

初始化监视器

pymonitor = util.ProgressMonitor(logger) # 进度监视
tbmonitor = util.TensorBoardMonitor(logger, log_dir) # 可视化监视
monitors = [pymonitor, tbmonitor]

计算和存储平均值和当前值

class AverageMeter:
    """Computes and stores the average and current value"""

    def __init__(self, fmt='%.6f'): # 控制平均值的格式
        self.fmt = fmt
        self.val = self.avg = self.sum = self.count = 0

    def reset(self): # 重置所有属性为0
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        self.val = val # val 为新的值
        self.sum += val * n # n 为权重
        self.count += n # 计算权重个数
        self.avg = self.sum / self.count # 计算平均值

    def __str__(self): # 返回对象的字符串表示
        s = self.fmt % self.avg # 首先使用格式化平均值,再返回
        return s

进度监视器

class ProgressMonitor(Monitor): #  用于监视和记录训练过程中的进度
    def __init__(self, logger):
        super().__init__()
        self.logger = logger # 接受参数为日志记录器对象

    def update(self, epoch, step_idx, step_num, prefix, meter_dict): # 当前的训练轮数 当前的步骤索引 总的步骤数 日志消息的前缀 一个包含度量信息的字典
        msg = prefix # 创建一个新的字符串msg,初始化为日志信息的前缀
        if epoch > -1: # 除预训练模型以外
            msg += ' [%d][%5d/%5d]   ' % (epoch, step_idx, int(step_num)) # 在msg的末尾加上一些格式化的信息
        else:
            msg += ' [%5d/%5d]   ' % (step_idx, int(step_num)) # 否则只包括两个,没有epoch
        for k, v in meter_dict.items():
            msg += k + ' '
            if isinstance(v, AverageMeter): # 如果 v 是 AverageMeter 的实例
                msg += str(v) # msg后缀添加
            else:
                msg += '%.6f' % v # 添加v的浮点数表示
            msg += '   ' # msg末尾添加三个空格
        self.logger.info(msg) # 使用info方法来记录msg

TensorBoard进度监视器

class TensorBoardMonitor(Monitor):
    def __init__(self, logger, log_dir): # 日志记录器对象 日志目录
        super().__init__()
        self.writer = SummaryWriter(log_dir / 'tb_runs') # 用于在TensorBoard中写入数据,这个对象将数据写入到指定目录中
        logger.info('TensorBoard data directory: %s/tb_runs' % log_dir) # 表示数据目录的位置

    def update(self, epoch, step_idx, step_num, prefix, meter_dict):
        current_step = epoch * step_num + step_idx # 计算当前的步骤数
        for k, v in meter_dict.items():
            val = v.val if isinstance(v, AverageMeter) else v 
            self.writer.add_scalar(prefix + '/' + k, val, current_step) # TensorBoard添加标量数据

处理和设置GPU设备

if args.device.type == 'cpu' or not t.cuda.is_available() or args.device.gpu == []:
        args.device.gpu = []
    else:
        available_gpu = t.cuda.device_count()
        for dev_id in args.device.gpu:
            if dev_id >= available_gpu: # 如果参数中的gpu数量大于系统可用gpu,则会报错
                logger.error('GPU device ID {0} requested, but only {1} devices available'
                             .format(dev_id, available_gpu))
                exit(1)
        # Set default device in case the first one on the list
        t.cuda.set_device(args.device.gpu[0])
        # Enable the cudnn built-in auto-tuner to accelerating training, but it
        # will introduce some fluctuations in a narrow range.
        t.backends.cudnn.benchmark = True # 当你的输入数据的尺寸不变时,这个选项可以加速训练。cuDNN会自动寻找最适合当前配置的高效算法来执行卷积等操作。但是,这可能会在一定范围内引入一些波动。
        t.backends.cudnn.deterministic = False # 这行代码表示训练过程中的一些操作允许有一定的随机性。这可能会导致你每次运行模型时,即使输入数据和模型参数完全相同,也可能得到略有不同的结果。如果你需要完全可复现的结果,你应该将此选项设置为True。

初始化数据加载器

# Initialize data loader
    train_loader, val_loader, test_loader = util.load_data(args.dataloader)
    logger.info('Dataset `%s` size:' % args.dataloader.dataset +
                '\n          Training Set = %d (%d)' % (len(train_loader.sampler), len(train_loader)) +
                '\n        Validation Set = %d (%d)' % (len(val_loader.sampler), len(val_loader)) +
                '\n              Test Set = %d (%d)' % (len(test_loader.sampler), len(test_loader)))
  1. 检测用于验证的数据的比例是否合法:

    if cfg.val_split < 0 or cfg.val_split >= 1: # 检测用于验证的数据的比例是否合法
            raise ValueError('val_split should be in the range of [0, 1) but got %.3f' % cfg.val_split)
  2. 图像归一化

    tv_normalize = tv.transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                               std=[0.229, 0.224, 0.225]) # 用于对图像进行归一化,这里是预先计算好的RGB通道的均值和标准差
  3. 统一预处理数据集

    if cfg.dataset == 'imagenet':
           train_transform = tv.transforms.Compose([
               tv.transforms.RandomResizedCrop(224),
               tv.transforms.RandomHorizontalFlip(),
               tv.transforms.ToTensor(),
               tv_normalize
           ]) # 训练数据的预处理,包括随机裁剪、随机水平翻转、转换为张量、归一化等
           val_transform = tv.transforms.Compose([
               tv.transforms.Resize(256),
               tv.transforms.CenterCrop(224),
               tv.transforms.ToTensor(),
               tv_normalize
           ]) # 调整大小 中心裁剪 转换为张量 归一化
           
           train_set = tv.datasets.ImageFolder(
               root=os.path.join(cfg.path, 'train'), transform=train_transform)
           test_set = tv.datasets.ImageFolder(
               root=os.path.join(cfg.path, 'val'), transform=val_transform)
       
       elif cfg.dataset == 'cifar10': 
           train_transform = tv.transforms.Compose([
               tv.transforms.RandomHorizontalFlip(),
               tv.transforms.RandomCrop(32, 4), # size 和 padding:在裁剪前每一边都会被填充4个元素,然后从裁剪后的图像中随机裁剪出一个32×32像素的区域。
               tv.transforms.ToTensor(),
               tv_normalize
           ])
           val_transform = tv.transforms.Compose([
               tv.transforms.ToTensor(),
               tv_normalize
           ])
           # 参数文件中的path:'/localhome/fair/Dataset/cifar10'
           train_set = tv.datasets.CIFAR10(cfg.path, train=True, transform=train_transform, download=True)
           test_set = tv.datasets.CIFAR10(cfg.path, train=False, transform=val_transform, download=True)
       
       else:
           raise ValueError('load_data does not support dataset %s' % cfg.dataset)

    注意:该项目可使用imagenet或cifar10,imagenet需要提前下载,而cifar可借助torchvision自动下载。

    • 如果你的图像是彩色的,你可能需要使用transforms.ToTensor()将PIL图像或NumPy ndarray转换为torch.Tensor,并且在[0., 1.]范围内缩放图像的像素强度。
    • 如果你的图像的大小不一致,你可能需要使用transforms.Resize()将它们调整为相同的大小。
    • 许多预训练的模型要求输入的图像是224x224或其他特定大小的,因此你可能需要使用transforms.Resize()transforms.RandomResizedCrop()
    • 如果你的模型是在具有特定均值和标准差的图像上预训练的,你可能需要使用transforms.Normalize()来对你的图像进行归一化。
    • 为了提高模型的泛化能力,你可能需要在训练集上应用一些随机的变换,如transforms.RandomHorizontalFlip()transforms.RandomRotation()
  4. 根据验证集的比例划分训练集/验证集/测试集

    if cfg.val_split != 0: # 需要验证数据
            train_set, val_set = __balance_val_split(train_set, cfg.val_split) # 根据验证集的占比划分训练集和验证集
        else:
            # In this case, use the test set for validation
            val_set = test_set # 如果不需要验证集,则测试集全部为验证集

    【技巧代码】根据比例划分训练集和验证集

    def __balance_val_split(dataset, val_split=0.): # 训练数据集 验证数据集的占比
        targets = np.array(dataset.targets) # 获取数据集的标签并转换为Numpy数组
        train_indices, val_indices = train_test_split(
            np.arange(targets.shape[0]), # 训练集中的样本总数并转换为数组
            test_size=val_split, # 表示验证集的占比,如果不设置训练集的占比会默认验证集的占比的补数
            stratify=targets # 表示分割时要保持每个类别的比例,在这里实际分割的是上面的样本的索引数组,这样就可以根据索引来获取数据
        )
        train_dataset = t.utils.data.Subset(dataset, indices=train_indices) # 只包含训练集的数据
        val_dataset = t.utils.data.Subset(dataset, indices=val_indices) # 只包含测试集的数据
        return train_dataset, val_dataset
  5. 设置每个工作进程的状态

    worker_init_fn = None
       if cfg.deterministic:
           worker_init_fn = __deterministic_worker_init_fn

    【技巧代码】工作进程的可重复性

    def __deterministic_worker_init_fn(worker_id, seed=0): # 用于设置每个工作进程的随机种子
        import random
        random.seed(seed)
        np.random.seed(seed) # 影响Numpy生成的所有随机数
        t.manual_seed(seed)  # 影响PyTorch生成的所有随机数
  6. 使用数据加载器加载训练集/验证集/测试集

    train_loader = t.utils.data.DataLoader(
            train_set, cfg.batch_size, shuffle=True, num_workers=cfg.workers, pin_memory=True, worker_init_fn=worker_init_fn) # pin_memory会在返回批次之前,将数据放入CUDA固定的内存中,这可以加速将数据移动到GPU的速度
    val_loader = t.utils.data.DataLoader(
            val_set, cfg.batch_size, num_workers=cfg.workers, pin_memory=True, worker_init_fn=worker_init_fn)
    test_loader = t.utils.data.DataLoader(
            test_set, cfg.batch_size, num_workers=cfg.workers, pin_memory=True, worker_init_fn=worker_init_fn)
    
    return train_loader, val_loader, test_loader

创建模型

# Create the model
    model = create_model(args)
import logging

from .resnet import *
from .resnet_cifar import *


def create_model(args):
    logger = logging.getLogger()

    model = None
    if args.dataloader.dataset == 'imagenet':
        if args.arch == 'resnet18':
            model = resnet18(pretrained=args.pre_trained)
        elif args.arch == 'resnet34':
            model = resnet34(pretrained=args.pre_trained)
        elif args.arch == 'resnet50':
            model = resnet50(pretrained=args.pre_trained)
        elif args.arch == 'resnet101':
            model = resnet101(pretrained=args.pre_trained)
        elif args.arch == 'resnet152':
            model = resnet152(pretrained=args.pre_trained)
    elif args.dataloader.dataset == 'cifar10':
        if args.arch == 'resnet20':
            model = resnet20(pretrained=args.pre_trained)
        elif args.arch == 'resnet32':
            model = resnet32(pretrained=args.pre_trained)
        elif args.arch == 'resnet44':
            model = resnet44(pretrained=args.pre_trained)
        elif args.arch == 'resnet56':
            model = resnet56(pretrained=args.pre_trained)
        elif args.arch == 'resnet110':
            model = resnet152(pretrained=args.pre_trained)
        elif args.arch == 'resnet1202':
            model = resnet1202(pretrained=args.pre_trained)

    if model is None:
        logger.error('Model architecture `%s` for `%s` dataset is not supported' % (args.arch, args.dataloader.dataset))
        exit(-1)

    msg = 'Created `%s` model for `%s` dataset' % (args.arch, args.dataloader.dataset)
    msg += '\n          Use pre-trained model = %s' % args.pre_trained
    logger.info(msg)

    return model

以resnet20为例:

def _resnet(arch, block, layers, pretrained, progress, **kwargs): # 模型架构的名称 ResNet中的块类型 每个块中的层次数 是否加载预训练权重 是否显示下载进度条
    model = ResNet(block, layers, **kwargs) # ResNet模型

    if pretrained: # 是否需要加载预训练权重
        s = load_state_dict_from_url(model_urls[arch], progress=progress) # 从指定URL下载预训练权重
        state_dict = OrderedDict() # 有序字典,用于存储预训练权重
        for k, v in s['state_dict'].items():
            if k.startswith('module.'):
                state_dict[k[7:]] = v # 如果 k 以 'module.' 开头,则添加到字典中,但是去掉前缀
        model.load_state_dict(state_dict) # 将预训练权重加载到模型中
    return model # 返回模型


def resnet20(pretrained=False, progress=True):
    return _resnet('resnet20', BasicBlock, [3, 3, 3], pretrained, progress)

【技巧代码】预加载训练权重

if pretrained: # 是否需要加载预训练权重
        s = load_state_dict_from_url(model_urls[arch], progress=progress) # 从指定URL下载预训练权重
        state_dict = OrderedDict() # 有序字典,用于存储预训练权重
        for k, v in s['state_dict'].items():
            if k.startswith('module.'):
                state_dict[k[7:]] = v # 如果 k 以 'module.' 开头,则添加到字典中,但是去掉前缀
        model.load_state_dict(state_dict) # 将预训练权重加载到模型中

ResNet模型

class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 16

        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False) # 网络
        self.bn1 = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(block, 16, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 32, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 64, num_blocks[2], stride=2)
        self.linear = nn.Linear(64, num_classes)

        self.apply(_weights_init)

    def _make_layer(self, block, planes, num_blocks, stride): # 第一个卷积块的步长为stride,其余块的步长都为1
        strides = [stride] + [1] * (num_blocks - 1) # 初始步长 + 除第一个块之外的卷积块的数量
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion

        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        # out = F.avg_pool2d(out, out.size()[3])
        pool = nn.AdaptiveAvgPool2d((1,1))
        out = pool(out)
        

        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

基本块的构造

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1, option='A'):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential() # 快捷连接
        if stride != 1 or in_planes != planes: # 检查是否需要在快捷连接中添加卷积层。如果步长不为1或输入通道数与输出通道数不同,那么需要添加卷积层
            if option == 'A': # 如果选项为A
                """
                For CIFAR10 ResNet paper uses option A.

                LambdaLayer 是一个自定义的层,它允许你定义一个任意的函数作为网络的一部分
                x[:, :, ::2, ::2]: 对输入x进行下采样,即每隔一个像素取一个像素,这样可以将图像的高度和宽度减半。(batch_size,channels,height,width) ::2是从开始到结束,步长为2
                F.pad(..., (0, 0, 0, 0, planes // 4, planes // 4), 'constant', 0): 即在通道维度前后各添加 planes // 4 个零通道
                因为CIFAR10的图像尺寸较小,如果在shortcut中使用卷积操作可能会引入额外的复杂性和计算成本

                在PyTorch中,F.pad(input, pad, mode='constant', value=0) 是一个用于对输入张量进行填充的函数。这里的 F 是 torch.nn.functional 的别名。

                1.input 是需要填充的输入张量。
                2.pad 是一个元组,定义了各个维度两侧的填充量。长度应为2倍的张量维度,例如对于一个4维张量,pad 应为长度为8的元组。元组中的元素按照最后一个维度的后面,最后一个维度的前面,倒数第二个维度的后面,倒数第二个维度的前面,…,第一个维度的后面,第一个维度的前面的顺序排列。
                3.mode 定义了填充的方式,可以是 ‘constant’、‘reflect’ 或 ‘replicate’。默认为 ‘constant’。
                4.value 在 mode='constant' 时定义了填充的值。默认为0。
                """
                self.shortcut = LambdaLayer(lambda x:
                                            F.pad(x[:, :, ::2, ::2], (0, 0, 0, 0, planes // 4, planes // 4), "constant",
                                                  0)) # 添加一个LambdaLayer,它会对输入进行下采样并进行零填充
            elif option == 'B': 
                self.shortcut = nn.Sequential(
                    nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False), # 添加一个卷积层
                    nn.BatchNorm2d(self.expansion * planes) # 添加一个BN层
                )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

找到需要量化的模块

def find_modules_to_quantize(model, quan_scheduler): # 用于在给定的模型中找到需要量化的模块
    replaced_modules = dict() # 初始化空字典,用于存储需要被替换的模块
    for name, module in model.named_modules(): # 遍历模型中的所有模块 named_modules() 方法 会递归地返回模型中所有模块的名称和模块本身
        if type(module) in QuanModuleMapping.keys(): # 检查当前模块的类型是否在键中:t.nn.Conv2d 或 t.nn.Linear(通用量化处理)
            if name in quan_scheduler.excepts: # 说明这个模块有特殊的量化配置
                replaced_modules[name] = QuanModuleMapping[type(module)](
                    module,
                    quan_w_fn=quantizer(quan_scheduler.weight,
                                        quan_scheduler.excepts[name].weight),
                    quan_a_fn=quantizer(quan_scheduler.act,
                                        quan_scheduler.excepts[name].act)
                )
            else:
                replaced_modules[name] = QuanModuleMapping[type(module)](
                    module,
                    quan_w_fn=quantizer(quan_scheduler.weight),
                    quan_a_fn=quantizer(quan_scheduler.act)
                )
        elif name in quan_scheduler.excepts: # 说明用户想要对这个模块进行特殊的量化配置,但我们找不到对应的量化模块
            logging.warning('Cannot find module %s in the model, skip it' % name)

    return replaced_modules
QuanModuleMapping = {
    t.nn.Conv2d: QuanConv2d,
    t.nn.Linear: QuanLinear
}

根据位宽对激活函数/权重执行不同的量化配置函数

def quantizer(default_cfg, this_cfg=None): # 默认的量化配置 特定模块的量化配置
    target_cfg = dict(default_cfg)
    if this_cfg is not None: # 如果有特定模块的量化配置
        for k, v in this_cfg.items():
            target_cfg[k] = v # 覆盖默认的量化配置

    if target_cfg['bit'] is None:
        q = IdentityQuan # 不进行任何量化
    elif target_cfg['mode'] == 'lsq':
        q = LsqQuan 
    else:
        raise ValueError('Cannot find quantizer `%s`', target_cfg['mode'])

    target_cfg.pop('mode') # 移除mode字段
    return q(**target_cfg) # 执行相应的量化算法

【关键代码】LSQ量化激活函数/权重

import torch as t

from .quantizer import Quantizer


def grad_scale(x, scale):
    y = x
    y_grad = x * scale # 乘法操作可以直接改变梯度的大小,而不改变其方向
    return (y - y_grad).detach() + y_grad # 这个结果与x相同,但是梯度不同,因为(y - y_grad).detach()梯度为0,而y_grad的梯度是x的梯度乘以scale。这样,整个表达式的梯度就是x的梯度乘以scale。


def round_pass(x):
    y = x.round()
    y_grad = x
    return (y - y_grad).detach() + y_grad


class LsqQuan(Quantizer):
    def __init__(self, bit, all_positive=False, symmetric=False, per_channel=True): # 量化位宽 是否将所有数字量化为非负数 是否使用对称量化 是否每个输出通道使用自己的缩放因子
        super().__init__(bit)

        if all_positive:
            assert not symmetric, "Positive quantization cannot be symmetric" # 在量化为非负数的情况下,必须使用非对称量化
            # unsigned activation is quantized to [0, 2^b-1]
            self.thd_neg = 0
            self.thd_pos = 2 ** bit - 1
        else:
            if symmetric: # 对称量化
                # signed weight/activation is quantized to [-2^(b-1)+1, 2^(b-1)-1]
                self.thd_neg = - 2 ** (bit - 1) + 1
                self.thd_pos = 2 ** (bit - 1) - 1
            else: # 非对称量化
                # signed weight/activation is quantized to [-2^(b-1), 2^(b-1)-1]
                self.thd_neg = - 2 ** (bit - 1)
                self.thd_pos = 2 ** (bit - 1) - 1

        self.per_channel = per_channel
        self.s = t.nn.Parameter(t.ones(1)) # 创建一个全为1的新参数

    def init_from(self, x, *args, **kwargs): # 初始化缩放因子
        if self.per_channel:
            self.s = t.nn.Parameter(
                x.detach().abs().mean(dim=list(range(1, x.dim())), keepdim=True) * 2 / (self.thd_pos ** 0.5)) # 在除第一位(通道维)以外的所有维度上计算均值,然后将均值乘以2,然后除以上限的平方根、这是一个缩放操作,用于将s的初始值调整到合适的范围
        else:
            self.s = t.nn.Parameter(x.detach().abs().mean() * 2 / (self.thd_pos ** 0.5)) # 计算所有维度的均值

    def forward(self, x):
        if self.per_channel:
            s_grad_scale = 1.0 / ((self.thd_pos * x.numel()) ** 0.5) # 计算梯度缩放因子
        else:
            s_grad_scale = 1.0 / ((self.thd_pos * x.numel()) ** 0.5)
        s_scale = grad_scale(self.s, s_grad_scale) # 返回的是值与s相同但梯度不同的s_scale

        x = x / s_scale 
        x = t.clamp(x, self.thd_neg, self.thd_pos) # 限制在量化的范围内
        x = round_pass(x) # 进行四舍五入而不改变梯度
        x = x * s_scale # 四舍五入后再返回
        return x

注意:在PyTorch中,许多操作(如加、减、乘、除)都定义了如何计算梯度,这使得我们可以使用反向传播算法来训练神经网络。然而,四舍五入操作(round)并没有定义梯度,因为它在整数处是不连续的,所以在这些点上没有定义导数。

  • 因此,如果你直接对一个Tensor进行四舍五入,然后尝试计算梯度,你会发现梯度为None。因此作者使用了一个特殊的round_pass函数来进行四舍五入,而不是直接使用round函数。round_pass函数的作用是返回一个值等于四舍五入后的x,但梯度等于x的梯度的Tensor,这样就可以在进行四舍五入操作的同时保持梯度不变。
  • 同理grad_scale函数,如果你直接返回x * scale,那么得到的Tensor的值会是xscale倍,这并不是我们想要的。我们希望得到的Tensor的值仍然是x,但其梯度是x的梯度乘以scale

量化后的卷积层/线性层

import torch as t


class QuanConv2d(t.nn.Conv2d):
    def __init__(self, m: t.nn.Conv2d, quan_w_fn=None, quan_a_fn=None):
        assert type(m) == t.nn.Conv2d
        super().__init__(m.in_channels, m.out_channels, m.kernel_size,
                         stride=m.stride,
                         padding=m.padding,
                         dilation=m.dilation,
                         groups=m.groups,
                         bias=True if m.bias is not None else False,
                         padding_mode=m.padding_mode)
        self.quan_w_fn = quan_w_fn
        self.quan_a_fn = quan_a_fn

        self.weight = t.nn.Parameter(m.weight.detach())
        self.quan_w_fn.init_from(m.weight)
        if m.bias is not None:
            self.bias = t.nn.Parameter(m.bias.detach())

    def forward(self, x):
        quantized_weight = self.quan_w_fn(self.weight)
        quantized_act = self.quan_a_fn(x)
        return self._conv_forward(quantized_act, quantized_weight, bias = None)


class QuanLinear(t.nn.Linear):
    def __init__(self, m: t.nn.Linear, quan_w_fn=None, quan_a_fn=None):
        assert type(m) == t.nn.Linear
        super().__init__(m.in_features, m.out_features,
                         bias=True if m.bias is not None else False)
        self.quan_w_fn = quan_w_fn
        self.quan_a_fn = quan_a_fn

        self.weight = t.nn.Parameter(m.weight.detach())
        self.quan_w_fn.init_from(m.weight)
        if m.bias is not None:
            self.bias = t.nn.Parameter(m.bias.detach())

    def forward(self, x):
        quantized_weight = self.quan_w_fn(self.weight)
        quantized_act = self.quan_a_fn(x)
        return t.nn.functional.linear(quantized_act, quantized_weight, self.bias)


QuanModuleMapping = {
    t.nn.Conv2d: QuanConv2d,
    t.nn.Linear: QuanLinear
}

替换原有模型的网络层

modules_to_replace = quan.find_modules_to_quantize(model, args.quan)
model = quan.replace_module_by_names(model, modules_to_replace)
tbmonitor.writer.add_graph(model, input_to_model=train_loader.dataset[0][0].unsqueeze(0))
logger.info('Inserted quantizers into the original model')    

【技巧代码】替换原有模型的网络层为量化后的网络层

def replace_module_by_names(model, modules_to_replace): # 将初始化量化后的模块替换掉原模型中的模块
    def helper(child: t.nn.Module): 
        for n, c in child.named_children(): 
            if type(c) in QuanModuleMapping.keys(): 
                for full_name, m in model.named_modules(): 
                    if c is m:
                        child.add_module(n, modules_to_replace.pop(full_name))
                        break
            else:
                helper(c)

    helper(model)
    
    return model

加载预训练模型

if args.device.gpu and not args.dataloader.serialized:
        model = t.nn.DataParallel(model, device_ids=args.device.gpu) # 该类运行你在多个GPU上并行运行模型

    model.to(args.device.type)
    start_epoch = 0
    if args.resume.path: # 如果参数文件有预训练模型,则加载预训练模型
        model, start_epoch, _ = util.load_checkpoint(
            model, args.resume.path, args.device.type, lean=args.resume.lean)

【技巧代码】如何加载预训练模型

def load_checkpoint(model, chkp_file, model_device=None, strict=False, lean=False):
    """Load a pyTorch training checkpoint.
    Args:
        model: the pyTorch model to which we will load the parameters.  You can
        specify model=None if the checkpoint contains enough metadata to infer
        the model.  The order of the arguments is misleading and clunky, and is
        kept this way for backward compatibility. 我们将加载参数的Pytorch模型,如果检查点包含足够的元数据以推断模型,则可以指定model = None。参数的顺序令人误解和笨拙,并且保持这种方式是为了向后兼容。
        chkp_file: the checkpoint file 检查点文件
        lean: if set, read into model only 'state_dict' field 如果设置,只读取模型中的“state_dict”字段
        model_device [str]: if set, call model.to($model_device)
                This should be set to either 'cpu' or 'cuda'. 如果设置,调用model.to($ model_device)这应该设置为“cpu”或“cuda”之一。
    :returns: updated model, optimizer, start_epoch 返回更新的模型,优化器,start_epoch
    """
    if not os.path.isfile(chkp_file): # 如果 chkp_file 不是文件,则报错
        raise IOError('Cannot find a checkpoint at', chkp_file)
    # 加载通过torch.save()保存的序列化对象,这些对象通常包括模型的状态字典、优化器的状态字典等
    checkpoint = t.load(chkp_file, map_location=lambda storage, loc: storage) # lambada 函数无论原来的位置是什么,都会返回 storage,这意味着,无论张量原来在什么设备上,都会被放到内存cpu中

    if 'state_dict' not in checkpoint: # 检查点必须包含模型参数
        raise ValueError('Checkpoint must contain model parameters')

    extras = checkpoint.get('extras', None) # 获取额外的参数

    arch = checkpoint.get('arch', '_nameless_') # 获取模型的名称

    checkpoint_epoch = checkpoint.get('epoch', None) # 获取检查点的 epoch
    start_epoch = checkpoint_epoch + 1 if checkpoint_epoch is not None else 0 # 获取开始的 epoch

    

    anomalous_keys = model.load_state_dict(checkpoint['state_dict'], strict) # 加载模型参数
    if anomalous_keys: # 如果anomalous_keys不为空,说明加载的状态字典和模型的状态字典不完全匹配
        missing_keys, unexpected_keys = anomalous_keys # 包含了模型中没有的键和检查点中没有的键
        if unexpected_keys: # 如果有意外的键,发出警告
            logger.warning("The loaded checkpoint (%s) contains %d unexpected state keys" %
                           (chkp_file, len(unexpected_keys)))
        if missing_keys: # 如果有缺失的键,报错 
            raise ValueError("The loaded checkpoint (%s) is missing %d state keys" %
                             (chkp_file, len(missing_keys)))

    if model_device is not None:
        model.to(model_device)

    if lean: # 如果 lean 为 True,说明只需要模型,不需要其他状态信息
        logger.info("Loaded checkpoint %s model (next epoch %d) from %s", arch, 0, chkp_file)
        return model, 0, None
    else: # 否则返回模型,开始的 epoch,额外的状态信息
        logger.info("Loaded checkpoint %s model (next epoch %d) from %s", arch, start_epoch, chkp_file)
        return model, start_epoch, extras

参数器/学习率等调整函数

# Define loss function (criterion) and optimizer
   criterion = t.nn.CrossEntropyLoss().to(args.device.type)

   # optimizer = t.optim.Adam(model.parameters(), lr=args.optimizer.learning_rate)
   optimizer = t.optim.SGD(model.parameters(),
                           lr=args.optimizer.learning_rate,
                           momentum=args.optimizer.momentum,
                           weight_decay=args.optimizer.weight_decay)
   lr_scheduler = util.lr_scheduler(optimizer,
                                    batch_size=train_loader.batch_size,
                                    num_samples=len(train_loader.sampler),
                                    **args.lr_scheduler) # 学习率调度器,需要优化器,批量大小,样本数量,以及其他参数
   logger.info(('Optimizer: %s' % optimizer).replace('\n', '\n' + ' ' * 11))
   logger.info('LR scheduler: %s\n' % lr_scheduler)

得分输出函数

perf_scoreboard = process.PerformanceScoreboard(args.log.num_best_scores)

【技巧代码】得分输出函数top1/top5/top10

class PerformanceScoreboard: # 用于记录和更新训练过程中的最佳得分
    def __init__(self, num_best_scores):
        self.board = list()
        self.num_best_scores = num_best_scores

    def update(self, top1, top5, epoch):
        """ Update the list of top training scores achieved so far, and log the best scores so far"""
        self.board.append({'top1': top1, 'top5': top5, 'epoch': epoch})

        # Keep scoreboard sorted from best to worst, and sort by top1, top5 and epoch
        curr_len = min(self.num_best_scores, len(self.board)) # 最佳得分数量,为参数文件中的 num_best_scores 和当前得分数量的最小值
        self.board = sorted(self.board,
                            key=operator.itemgetter('top1', 'top5', 'epoch'),
                            reverse=True)[0:curr_len] # 按照 top1, top5, epoch 从大到小排序,取前 curr_len 个
        for idx in range(curr_len):
            score = self.board[idx]
            logger.info('Scoreboard best %d ==> Epoch [%d][Top1: %.3f   Top5: %.3f]',
                        idx + 1, score['epoch'], score['top1'], score['top5'])

    def is_best(self, epoch):
        return self.board[0]['epoch'] == epoch # 判断是否对应最佳得分

验证与训练数据集

if args.eval:
    process.validate(test_loader, model, criterion, -1, monitors, args)
else:  # training
    if args.resume.path or args.pre_trained:
        logger.info('>>>>>>>> Epoch -1 (pre-trained model evaluation)')
        top1, top5, _ = process.validate(val_loader, model, criterion,
                                         start_epoch - 1, monitors, args)
        perf_scoreboard.update(top1, top5, start_epoch - 1)
    for epoch in range(start_epoch, args.epochs):
        logger.info('>>>>>>>> Epoch %3d' % epoch)
        t_top1, t_top5, t_loss = process.train(train_loader, model, criterion, optimizer,
                                               lr_scheduler, epoch, monitors, args)
        v_top1, v_top5, v_loss = process.validate(val_loader, model, criterion, epoch, monitors, args)

        tbmonitor.writer.add_scalars('Train_vs_Validation/Loss', {'train': t_loss, 'val': v_loss}, epoch)
        tbmonitor.writer.add_scalars('Train_vs_Validation/Top1', {'train': t_top1, 'val': v_top1}, epoch)
        tbmonitor.writer.add_scalars('Train_vs_Validation/Top5', {'train': t_top5, 'val': v_top5}, epoch)

        perf_scoreboard.update(v_top1, v_top5, epoch)
        is_best = perf_scoreboard.is_best(epoch)
        util.save_checkpoint(epoch, args.arch, model, {'top1': v_top1, 'top5': v_top5}, is_best, args.name, log_dir)

    logger.info('>>>>>>>> Epoch -1 (final model evaluation)')
    process.validate(test_loader, model, criterion, -1, monitors, args)

tbmonitor.writer.close()  # close the TensorBoard
logger.info('Program completed successfully ... exiting ...')
logger.info('If you have any questions or suggestions, please visit: github.com/zhutmost/lsq-net')

有两部分可以影响到s的值:

  1. init_from方法:在这个方法中,s的初始值是根据输入数据x的统计信息计算得到的。如果per_channel为真,那么x的每个通道的均值会被用来计算s的初始值;否则,x的全局均值会被用来计算s的初始值。
  2. forward方法:在这个方法中,s被用来计算梯度缩放因子s_grad_scale,然后用grad_scale函数对其进行梯度缩放,得到用于量化xs_scale。这个s_scale的梯度会在反向传播过程中被计算,然后在优化步骤中被用来更新s的值。

文章作者: QT-7274
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 QT-7274 !
评论
  目录