一、实验内容
1.1 实验内容
1.2 实验要点
1.3 实验环境
二、实验原理
2.1 多层感知机mlp
2.2 卷积神经网络cnn
三、实验步骤
3.1 例子: 使用多层感知机进行姓氏分类
3.1.1 姓氏数据集
3.1.2 特征工程(词汇表、向量化器和dataloader)
3.1.3 姓氏分类器模型
一、实验内容
1、实验内容
感知器是现存最简单的神经网络。感知器的一个历史性的缺点是它不能学习数据中存在的一些非常重要的模式。例如,查看图4-1中绘制的数据点。这相当于非此即彼(xor)的情况,在这种情况下,决策边界不能是一条直线(也称为线性可分)。在这个例子中,感知器失败了。
在本次实验中,我们将探索传统上称为前馈网络的神经网络模型,以及两种前馈神经网络:多层感知器和卷积神经网络。多层感知器在结构上扩展了我们在实验3中研究的简单感知器,将多个感知器分组在一个单层,并将多个层叠加在一起。我们稍后将介绍多层感知器,并在“基于多层感知器的姓氏分类”中展示它们在多层分类中的应用。
本实验研究的第二种前馈神经网络,卷积神经网络,在处理数字信号时深受窗口滤波器的启发。通过这种窗口特性,卷积神经网络能够在输入中学习局部化模式,这不仅使其成为计算机视觉的主轴,而且是检测单词和句子等序列数据中的子结构的理想候选。我们在“卷积神经网络”中概述了卷积神经网络,并在“示例:使用cnn对姓氏进行分类”中演示了它们的使用。
在本实验中,多层感知器和卷积神经网络被分组在一起,因为它们都是前馈神经网络,并且与另一类神经网络——递归神经网络(rnns)形成对比,递归神经网络(rnns)允许反馈(或循环),这样每次计算都可以从之前的计算中获得信息。在实验6和实验7中,我们将介绍rnns以及为什么允许网络结构中的循环是有益的。
在我们介绍这些不同的模型时,需要理解事物如何工作的一个有用方法是在计算数据张量时注意它们的大小和形状。每种类型的神经网络层对它所计算的数据张量的大小和形状都有特定的影响,理解这种影响可以极大地有助于对这些模型的深入理解。
2、实验要点
通过“基于多层感知器的姓氏分类”,掌握多层感知器在多层分类中的应用
掌握每种类型的神经网络层对它所计算的数据张量的大小和形状的影响
3、实验环境
python 3.6.7
二、实验原理
1、多层感知机mlp
多层感知机(multilayer perceptron,mlp)是一种基本的前馈人工神经网络,被认为是最基本的神经网络构建模块之一。由多个神经元组成的多层结构。它包括输入层、至少一个或多个隐藏层以及一个输出层。每个隐藏层和输出层都由多个神经元(也称为节点)组成,每个节点都与前一层的每个节点相连,并具有一组权重和偏置。mlp的基本工作原理如下:
输入层:接受原始输入数据的层。每个输入特征被表示为一个节点,并传递给下一层。
隐藏层:通过对输入层的加权组合和应用激活函数来生成新的特征表示。每个隐藏层都有一组权重,用于加权输入,然后将加权输入传递给激活函数。常用的激活函数包括relu(rectified linear unit)、sigmoid和tanh等。
输出层:生成最终的输出。输出层的节点数取决于任务的性质,例如分类问题中可能有多个类别,回归问题中可能只有一个输出节点。输出层也会应用一个激活函数,例如对于二分类问题,常用的激活函数是sigmoid函数,对于多分类问题,常用的是softmax函数。
反向传播算法:用于训练mlp的主要方法之一。通过反向传播算法,mlp可以根据预测结果与真实标签之间的误差来调整权重和偏置,从而逐步提高模型的性能。
最简单的mlp,如图4-2所示,由三个表示阶段和两个线性层组成。第一阶段是输入向量。这是给定给模型的向量。在“示例:对餐馆评论的情绪进行分类”中,输入向量是yelp评论的一个收缩的one-hot表示。给定输入向量,第一个线性层计算一个隐藏向量——表示的第二阶段。隐藏向量之所以这样被调用,是因为它是位于输入和输出之间的层的输出。我们所说的“层的输出”是什么意思?理解这个的一种方法是隐藏向量中的值是组成该层的不同感知器的输出。使用这个隐藏的向量,第二个线性层计算一个输出向量。在像yelp评论分类这样的二进制任务中,输出向量仍然可以是1。在多类设置中,将在本实验后面的“示例:带有多层感知器的姓氏分类”一节中看到,输出向量是类数量的大小。虽然在这个例子中,我们只展示了一个隐藏的向量,但是有可能有多个中间阶段,每个阶段产生自己的隐藏向量。最终的隐藏向量总是通过线性层和非线性的组合映射到输出向量。
mlp可以用于解决各种机器学习任务,包括分类、回归和聚类等。通过增加隐藏层的数量和神经元的数量,mlp可以表示更复杂的函数关系,从而提高模型的灵活性和性能。
2、卷积神经网络cnn
在神经网络中,不同类型的层对输入数据张量的大小和形状都有影响。例如:
全连接层(dense):全连接层将输入数据张量展平,并将其与权重矩阵相乘,输出一个新的张量。这会改变张量的形状和大小。
卷积层(convolutional):卷积层通过滑动卷积核在输入数据上提取特征。卷积操作会改变数据张量的大小,通常会减小数据的空间维度。
池化层(pooling):池化层用于减小特征图的空间维度,通常通过取最大值或平均值来实现。池化操作会改变数据张量的大小,但不会改变其深度。
假设我们有一个输入图像,尺寸为32x32像素,有3个颜色通道(例如rgb图像),那么我们的输入张量形状为(32, 32, 3)。
(1)卷积层 (convolutional layer)
假设我们使用一个卷积层,参数如下:
过滤器数量(输出通道数):8
过滤器大小(核大小):3x3
填充(padding):1(same padding)
步幅(stride):1
计算过程:
输入张量形状:(32, 32, 3)
卷积核大小:3x3
填充:1
步幅:1
计算公式:
[ \text{输出高度} = \left\lfloor \frac{\text{输入高度} - \text{卷积核高度} + 2 \times \text{填充}}{\text{步幅}} \right\rfloor + 1 ]
[ \text{输出宽度} = \left\lfloor \frac{\text{输入宽度} - \text{卷积核宽度} + 2 \times \text{填充}}{\text{步幅}} \right\rfloor + 1 ]
输出张量形状:(32, 32, 8)
(2)池化层 (pooling layer)
假设我们使用一个最大池化层(max pooling),参数如下:
池化窗口大小:2x2
步幅:2
计算过程:
输入张量形状:(32, 32, 8)
计算公式:
[ \text{输出高度} = \left\lfloor \frac{\text{输入高度}}{\text{步幅}} \right\rfloor ]
[ \text{输出宽度} = \left\lfloor \frac{\text{输入宽度}}{\text{步幅}} \right\rfloor ]
输出张量形状:(16, 16, 8)
(3)全连接层 (fully connected layer)
假设我们使用一个全连接层,将前一层的输出展平并连接到一个具有10个神经元的输出层。
计算过程:
输入张量形状:(16, 16, 8)
展平操作:16 x 16 x 8 = 2048
输出张量形状:(2048,)
经过全连接层:
输出神经元数量:10
输出张量形状:(10,)
(4)总结
通过以上步骤,我们可以看到每种神经网络层如何影响输入张量的形状:
神经网络层 输入 输出
卷积层 (32,32,3) (32,32,8)
池化层 (32,32,8) (16,16,8)
全连接层 (16,16,8) (2048,)到(10,)
三、实验步骤
3.1 例子: 使用多层感知机进行姓氏分类
本节展示如何使用mlp将姓氏分类到原籍国。此任务通过推断人口统计信息,具有重要应用。处理时需谨慎对待“受保护属性”。
我们将姓氏拆分为字符,处理方式类似于“例子: 将餐馆评论的情绪分类”。字符级模型在结构和实现上与单词级模型类似。
关键教训是mlp的实现和训练直接来源于第3章中的感知器。本节不包含“例子: 餐馆评论的情绪分类”中的代码。
本节描述了姓氏数据集的预处理步骤,并使用词汇表、向量化器和dataloader类完成从姓氏字符串到向量化小批处理的管道。
接下来,我们描述姓氏分类器模型及其设计思路。mlp与实验3中的感知器类似,但增加了多类输出及其对应的损失函数。
3.1.1 姓氏数据集
姓氏数据集,它收集了来自18个不同国家的10,000个姓氏,这些姓氏是作者从互联网上不同的姓名来源收集的。该数据集将在本课程实验的几个示例中重用,并具有一些使其有趣的属性。第一个性质是它是相当不平衡的。排名前三的课程占数据的60%以上:27%是英语,21%是俄语,14%是阿拉伯语。剩下的15个民族的频率也在下降——这也是语言特有的特性。第二个特点是,在国籍和姓氏正字法(拼写)之间有一种有效和直观的关系。有些拼写变体与原籍国联系非常紧密(比如“o ‘neill”、“antonopoulos”、“nagasawa”或“zhu”)。
流程如下
1.读取原始数据:从 csv 文件中读取原始训练数据。(输入)
↓
2.存储数据:创建一个按国籍存储数据的字典。
↓
3.划分数据集:将数据集划分为训练集、验证集和测试集。
↓
4.保存处理后的数据:将最终处理后的数据保存到 csv 文件中。(输出)
下面开始逐步编码
第1步:先来定义一些参数:
我们要先定义一个自定义的 surnamedataset 类,继承自 dataset 类。它的 __getitem__ 方法用于获取指定索引处的数据样本。它从目标数据框中提取出对应索引的姓氏和国籍信息,并使用 vectorizer 对象将姓氏转换为向量,同时使用 vocabulary 对象查找国籍对应的索引。最后,它返回一个包含姓氏向量和国籍索引的字典作为数据样本。
from torch.utils.data import dataset
class surnamedataset(dataset):
def __getitem__(self, index):
row = self._target_df.iloc[index] # 从目标数据框中获取指定索引的行
surname_vector = \
self._vectorizer.vectorize(row.surname) # 将姓氏转换为向量
nationality_index = \
self._vectorizer.nationality_vocab.lookup_token(row.nationality) # 查找国籍的索引
return {'x_surname': surname_vector, # 返回字典,包含姓氏向量和国籍索引
'y_nationality': nationality_index}
原始数据集
https://course.educg.net/a5eedf85fdf8ef11eeb6b027c1b76ae8/lab/tree/surnames.csv
import collections
import numpy as np
import pandas as pd
import re
from argparse import namespace
# 划分数据集(也可以说是存储划分数据集参数)
args = namespace(
raw_dataset_csv="surnames.csv",
train_proportion=0.7,
val_proportion=0.15,
test_proportion=0.15,
output_munged_csv="surnames_with_splits.csv",
seed=1337
)
1.读取原始数据:从 csv 文件中读取原始训练数据。
# 读取
surnames = pd.read_csv(args.raw_dataset_csv, header=0)
# 看看前几行知道长啥样
surnames.head()
# 类别
set(surnames.nationality)
2.存储数据:创建一个按国籍存储数据的字典。
# 按国籍划分训练集
# 创建字典
by_nationality = collections.defaultdict(list)
for _, row in surnames.iterrows():
by_nationality[row.nationality].append(row.to_dict())
3.划分数据集:将数据集划分为训练集、验证集和测试集。
分组:将字典按 nationality 列进行分组。
↓
按比例分割数据:将每个评分的数据按比例分为训练集、验证集和测试集。
↓
逐个处理并标记:为每个数据点标记 split 属性。
↓
合并数据:将处理后的数据添加到 final_list 中。
from tqdm.notebook import tqdm # 画进度条的
# 创建分割数据
final_list = []
np.random.seed(args.seed)
# for _, item_list in sorted(by_nationality.items()):
for _, item_list in tqdm(sorted(by_nationality.items()), desc="处理进度"):
np.random.shuffle(item_list)
n = len(item_list)
# 计算每个集数量
n_train = int(args.train_proportion*n)
n_val = int(args.val_proportion*n)
n_test = int(args.test_proportion*n)
# 贴标签
for item in item_list[:n_train]: # 不想用原来的
# for item in tqdm(item_list[:n_train], desc="处理训练集"): # 我改成有进度条的(下同)
item['split'] = 'train'
for item in item_list[n_train:n_train+n_val]:
# for item in tqdm(item_list[n_train:n_train+n_val], desc="处理验证集"):
item['split'] = 'val'
for item in item_list[n_train+n_val:]:
# for item in tqdm(item_list[n_train+n_val:n_train+n_val+n_test], desc="处理测试集"):
item['split'] = 'test'
# 写入
final_list.extend(item_list)
# 将分割数据写入文件
final_surnames = pd.dataframe(final_list)
# 康康每个数据集的样本数
final_surnames.split.value_counts()
final_surnames.head() # 看看前几行知道长啥样
4.保存处理后的数据:将最终处理后的数据保存到 csv 文件中。(输出)
# 改成csv
final_surnames.to_csv(args.output_munged_csv, index=false)
搞好之后应该是这个
https://course.educg.net/a5eedf85fdf8ef11eeb6b027c1b76ae8/lab/tree/surnames_with_splits.csv
3.1.2 特征工程(词汇表、向量化器和dataloader)
在姓氏分类任务中,我们利用词汇表、向量化器和dataloader将姓氏字符串转换为向量化的小批量数据。这些数据结构类似于用于情感分类的示例中的结构。
姓氏向量化器将姓氏字符串转换为向量,利用词汇表将字符映射到整数,然后通过创建one-hot向量表示。
(后续实验中将介绍其他向量化方法,如热门矩阵和嵌入层,它们可能在某些情况下更有效)
3.1.2.1 词汇表(the vocabulary)
图3.3.1 特征工程1:词汇表(the vocabulary)
功能表:
一、功能:
1.映射现有的令牌:传入一个包含令牌到索引(数字)映射的字典,词汇表会用这个字典来初始化自己。
2.处理未知令牌(unk):词汇表可以自动添加一个特殊的“未知”令牌(通常表示为 )。这个令牌用于处理那些在构建词汇表时没有见过的单词。当遇到这些未知单词时,词汇表会返回这个特殊令牌对应的索引,以确保程序不会因为找不到单词而报错。
二、令牌管理函数:
to_serializable(self): 返回可序列化的字典。
from_serializable(cls, contents): 从序列化字典实例化词汇表。
add_token(token): 向词汇表中添加新令牌,并返回对应的索引。
add_many(self, tokens): 添加多个令牌,返回索引列表。
lookup_token(token): 查找与令牌关联的索引,不存在且 add_unk 为 true,则返回 unk 索引。
lookup_index(index): 根据索引查找对应的令牌,如果索引不存在,则报错(keyerror)。
三、辅助函数:
__str__(): 返回词汇表的字符串表示,显示词汇表的大小。
__len__(): 返回词汇表中的令牌数量。
class vocabulary(object):
"""处理文本并提取词汇表以进行映射的类"""
def __init__(self, token_to_idx=none, add_unk=true, unk_token=""):
"""
参数:
token_to_idx (dict): 预先存在的令牌到索引的映射
add_unk (bool): 是否添加unk令牌的标志
unk_token (str): 要添加到词汇表中的unk令牌
"""
if token_to_idx is none:
token_to_idx = {} # 如果没有提供token_to_idx,初始化为空字典
self._token_to_idx = token_to_idx # 保存令牌到索引的映射
self._idx_to_token = {idx: token
for token, idx in self._token_to_idx.items()} # 创建索引到令牌的反向映射
self._add_unk = add_unk # 是否添加unk令牌的标志
self._unk_token = unk_token # unk令牌的值
self.unk_index = -1 # 初始化unk令牌的索引
if add_unk:
self.unk_index = self.add_token(unk_token) # 如果需要添加unk令牌,则添加并保存其索引
def to_serializable(self):
"""返回可以序列化的字典"""
return {'token_to_idx': self._token_to_idx,
'add_unk': self._add_unk,
'unk_token': self._unk_token} # 返回包含词汇表信息的字典
@classmethod
def from_serializable(cls, contents):
"""从序列化字典实例化词汇表"""
return cls(**contents) # 使用字典内容实例化词汇表
def add_token(self, token):
"""根据令牌更新映射字典。
参数:
token (str): 要添加到词汇表中的项目
返回:
index (int): 对应于令牌的整数索引
"""
try:
index = self._token_to_idx[token] # 获取令牌对应的索引
except keyerror:
index = len(self._token_to_idx) # 如果令牌不在映射中,分配新的索引
self._token_to_idx[token] = index # 添加令牌到索引的映射
self._idx_to_token[index] = token # 添加索引到令牌的映射
return index # 返回令牌的索引
def add_many(self, tokens):
"""将多个令牌添加到词汇表中
参数:
tokens (list): 字符串令牌列表
返回:
indices (list): 对应于令牌的索引列表
"""
return [self.add_token(token) for token in tokens] # 为每个令牌调用add_token并返回索引列表
def lookup_token(self, token):
"""检索与令牌关联的索引,如果令牌不存在,则返回unk索引。
参数:
token (str): 要查找的令牌
返回:
index (int): 对应于令牌的索引
注意:
unk功能需要unk_index >= 0 (已添加到词汇表中)
"""
if self.unk_index >= 0:
return self._token_to_idx.get(token, self.unk_index) # 返回令牌的索引或unk索引
else:
return self._token_to_idx[token] # 如果没有unk令牌,返回令牌的索引
def lookup_index(self, index):
"""返回与索引关联的令牌
参数:
index (int): 要查找的索引
返回:
token (str): 对应于索引的令牌
抛出:
keyerror: 如果索引不在词汇表中
"""
if index not in self._idx_to_token:
raise keyerror("索引 (%d) 不在词汇表中" % index) # 如果索引不在映射中,抛出keyerror
return self._idx_to_token[index] # 返回与索引关联的令牌
def __str__(self):
return "<vocabulary(size=%d)>" % len(self) # 返回词汇表的字符串表示
def __len__(self):
return len(self._token_to_idx) # 返回词汇表的大小
3.1.2.2 向量化器(the vectorizer)
图3.3.2 特征工程2:向量化器(the vectorizer)
功能表:
一、功能:
1.矢量化评论文本:将评论文本转换为one-hot向量
2.从数据框创建矢量化器:从数据框中提取频繁出现的单词并创建矢量化器
3.从可序列化的字典创建和保存矢量化器:从可序列化的字典中实例化矢量化器,并将矢量化器转换为可序列化的字典以便保存
二、函数:
vectorize(self, review):为评论创建one-hot向量
from_dataframe(cls, review_df, cutoff=25):从数据框实例化矢量化器,基于频率的过滤参数选择单词
from_serializable(cls, contents):从可序列化的字典实例化矢量化器
to_serializable(self):创建可缓存的可序列化字典
class surnamevectorizer(object):
""" 协调词汇表并将其应用的向量化器类 """
def __init__(self, surname_vocab, nationality_vocab):
"""
参数:
surname_vocab (vocabulary): 将字符映射到整数的词汇表
nationality_vocab (vocabulary): 将国籍映射到整数的词汇表
"""
self.surname_vocab = surname_vocab # 姓氏词汇表
self.nationality_vocab = nationality_vocab # 国籍词汇表
def vectorize(self, surname):
"""
参数:
surname (str): 姓氏
返回:
one_hot (np.ndarray): 压缩的一热编码
"""
vocab = self.surname_vocab # 获取姓氏词汇表
one_hot = np.zeros(len(vocab), dtype=np.float32) # 初始化一热编码向量
for token in surname:
one_hot[vocab.lookup_token(token)] = 1 # 设置对应位置为1
return one_hot # 返回一热编码向量
@classmethod
def from_dataframe(cls, surname_df):
"""从数据集数据帧实例化向量化器
参数:
surname_df (pandas.dataframe): 姓氏数据集
返回:
surnamevectorizer的实例
"""
surname_vocab = vocabulary(unk_token="@") # 初始化姓氏词汇表,unk令牌为"@"
nationality_vocab = vocabulary(add_unk=false) # 初始化国籍词汇表,不添加unk令牌
for index, row in surname_df.iterrows():
for letter in row.surname:
surname_vocab.add_token(letter) # 添加姓氏中的每个字符到词汇表
nationality_vocab.add_token(row.nationality) # 添加国籍到词汇表
return cls(surname_vocab, nationality_vocab) # 返回向量化器实例
@classmethod
def from_serializable(cls, contents):
"""从可序列化字典实例化向量化器"""
surname_vocab = vocabulary.from_serializable(contents['surname_vocab']) # 从序列化数据中恢复姓氏词汇表
nationality_vocab = vocabulary.from_serializable(contents['nationality_vocab']) # 从序列化数据中恢复国籍词汇表
return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab) # 返回向量化器实例
def to_serializable(self):
"""返回可序列化的字典"""
return {'surname_vocab': self.surname_vocab.to_serializable(), # 姓氏词汇表的序列化
'nationality_vocab': self.nationality_vocab.to_serializable()} # 国籍词汇表的序列化
3.1.2.3 dataloader
图3.3.3 特征工程3:dataloader
surnamedataset 类:处理姓氏数据集并实现数据的加载、处理和批量生成。(与pytorch的dataset和dataloader一起工作)
一、功能:
数据加载和向量化器管理:创建相应的向量化器。提供从csv文件加载数据集并创建新向量化器的方法或从文件加载已保存向量化器的方法。
数据集分割:按照训练集、验证集和测试集的分割存储数据。
计算类权重:统计每个国籍的样本数量,并计算类别权重,用于处理类别不平衡问题。
数据访问:根据索引返回数据点,包括特征(姓氏的向量表示)和标签(国籍索引)。
批量生成:提供生成批量数据的生成器函数generate_batches,确保张量在指定设备上。
二、方法:
load_dataset_and_make_vectorizer(cls, surname_csv):从csv文件加载数据集,并创建一个新的向量化器。返回surnamedataset实例。
load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):和上面一个差不多,但是从文件加载已保存的向量化器。
load_vectorizer_only(vectorizer_filepath):只从文件加载已保存的向量化器。
save_vectorizer(self, vectorizer_filepath):将向量化器保存到指定文件。
get_vectorizer(self):返回当前的向量化器。
set_split(self, split="train"):设置当前使用的数据集分割(训练集、验证集、测试集)。
三、辅助方法:
__len__(self):看数据集分割的大小。
__getitem__(self, index):根据索引返回数据点,包括特征(姓氏的向量表示)和标签(国籍索引)。
get_num_batches(self, batch_size):返回批次数量。
generate_batches(dataset, batch_size, shuffle=true, drop_last=true, device="cpu"):生成批量数据,确保张量在指定设备上。
class surnamedataset(dataset):
def __init__(self, surname_df, vectorizer):
"""
参数:
surname_df (pandas.dataframe): 数据集
vectorizer (surnamevectorizer): 从数据集中实例化的向量化器
"""
self.surname_df = surname_df # 保存数据集
self._vectorizer = vectorizer # 保存向量化器
self.train_df = self.surname_df[self.surname_df.split=='train'] # 训练集数据
self.train_size = len(self.train_df) # 训练集大小
self.val_df = self.surname_df[self.surname_df.split=='val'] # 验证集数据
self.validation_size = len(self.val_df) # 验证集大小
self.test_df = self.surname_df[self.surname_df.split=='test'] # 测试集数据
self.test_size = len(self.test_df) # 测试集大小
self._lookup_dict = {'train': (self.train_df, self.train_size),
'val': (self.val_df, self.validation_size),
'test': (self.test_df, self.test_size)} # 数据集查找字典
self.set_split('train') # 设置默认数据集为训练集
# 类别权重
class_counts = surname_df.nationality.value_counts().to_dict() # 统计每个国籍的数量
def sort_key(item):
return self._vectorizer.nationality_vocab.lookup_token(item[0])
sorted_counts = sorted(class_counts.items(), key=sort_key) # 按照国籍索引排序
frequencies = [count for _, count in sorted_counts] # 获取每个国籍的频率
self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32) # 计算类别权重
@classmethod
def load_dataset_and_make_vectorizer(cls, surname_csv):
"""加载数据集并从头创建一个新的向量化器
参数:
surname_csv (str): 数据集的位置
返回:
surnamedataset实例
"""
surname_df = pd.read_csv(surname_csv) # 加载数据集
train_surname_df = surname_df[surname_df.split=='train'] # 获取训练集数据
return cls(surname_df, surnamevectorizer.from_dataframe(train_surname_df)) # 返回数据集实例
@classmethod
def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
"""加载数据集和相应的向量化器
在向量化器已经缓存以便重用的情况下使用
参数:
surname_csv (str): 数据集的位置
vectorizer_filepath (str): 已保存的向量化器的位置
返回:
surnamedataset实例
"""
surname_df = pd.read_csv(surname_csv) # 加载数据集
vectorizer = cls.load_vectorizer_only(vectorizer_filepath) # 加载向量化器
return cls(surname_df, vectorizer) # 返回数据集实例
@staticmethod
def load_vectorizer_only(vectorizer_filepath):
"""从文件加载向量化器的静态方法
参数:
vectorizer_filepath (str): 序列化的向量化器的位置
返回:
surnamevectorizer实例
"""
with open(vectorizer_filepath) as fp:
return surnamevectorizer.from_serializable(json.load(fp)) # 从文件加载向量化器
def save_vectorizer(self, vectorizer_filepath):
"""使用json保存向量化器
参数:
vectorizer_filepath (str): 保存向量化器的位置
"""
with open(vectorizer_filepath, "w") as fp:
json.dump(self._vectorizer.to_serializable(), fp) # 保存向量化器到文件
def get_vectorizer(self):
""" 返回向量化器 """
return self._vectorizer # 返回向量化器实例
def set_split(self, split="train"):
""" 使用数据框中的列选择数据集的分割 """
self._target_split = split # 设置当前数据集分割
self._target_df, self._target_size = self._lookup_dict[split] # 获取对应数据和大小
def __len__(self):
return self._target_size # 返回当前数据集的大小
def __getitem__(self, index):
"""pytorch数据集的主要入口方法
参数:
index (int): 数据点的索引
返回:
包含数据点的字典:
features (x_surname): 姓氏的向量表示
label (y_nationality): 国籍标签
"""
row = self._target_df.iloc[index] # 获取数据行
surname_vector = self._vectorizer.vectorize(row.surname) # 向量化姓氏
nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality) # 查找国籍索引
return {'x_surname': surname_vector, 'y_nationality': nationality_index} # 返回数据点字典
def get_num_batches(self, batch_size):
"""给定批量大小,返回数据集中的批次数量
参数:
batch_size (int)
返回:
数据集中的批次数量
"""
return len(self) // batch_size # 计算批次数量
def generate_batches(dataset, batch_size, shuffle=true, drop_last=true, device="cpu"):
"""
一个包装pytorch dataloader的生成器函数
它将确保每个张量都在正确的设备位置上
"""
dataloader = dataloader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
for data_dict in dataloader:
out_data_dict = {}
for name, tensor in data_dict.items():
out_data_dict[name] = data_dict[name].to(device) # 将张量移到指定设备
yield out_data_dict # 生成数据字典
3.1.3 姓氏分类器模型
这里利用第二节将的多层感知机
这里用两层多层感知器。
class surnameclassifier(nn.module):
""" 用于姓氏分类的两层多层感知器 """
def __init__(self, input_dim, hidden_dim, output_dim):
"""
参数:
input_dim (int): 输入向量的大小
hidden_dim (int): 第一个线性层的输出大小
output_dim (int): 第二个线性层的输出大小
"""
super(surnameclassifier, self).__init__() # 调用父类的初始化方法
self.fc1 = nn.linear(input_dim, hidden_dim) # 定义第一个线性层
self.fc2 = nn.linear(hidden_dim, output_dim) # 定义第二个线性层
def forward(self, x_in, apply_softmax=false):
"""分类器的前向传递
参数:
x_in (torch.tensor): 输入数据张量。
x_in的形状应该是 (batch, input_dim)
apply_softmax (bool): 是否应用softmax激活的标志
如果与交叉熵损失一起使用,应为false
返回:
结果张量。张量形状应该是 (batch, output_dim)
"""
intermediate_vector = f.relu(self.fc1(x_in)) # 应用第一个线性层和relu激活函数
prediction_vector = self.fc2(intermediate_vector) # 应用第二个线性层
if apply_softmax:
prediction_vector = f.softmax(prediction_vector, dim=1) # 如果需要,应用softmax激活函数
return prediction_vector # 返回预测结果张量
3.1.4 训练和验证
我们只展示了args以及本例中的训练例程与“示例:餐厅评论情绪分类”中的示例之间的主要区别
(1)训练前的组件:
下面要定义三个组件函数:
初始化训练状态:创建并初始化一个包含训练状态信息的字典。
更新训练状态:在训练过程中根据验证损失更新训练状态,包括提前停止和模型检查点保存。
计算准确率:根据模型预测和实际目标计算准确率。
(具体描述放代码注释了)
'''
1. make_train_state(args)(初始化训练状态)
输入:args: 一个包含训练参数的对象,通常包括学习率、模型文件名等。
输出:一个字典,包含一堆键值对(不列了)
功能:初始化并返回一个训练状态字典,用于记录训练过程中的各种状态和统计信息。
'''
def make_train_state(args):
return {'stop_early': false, # 提前停止的标志
'early_stopping_step': 0, # 提前停止的步数
'early_stopping_best_val': 1e8, # 最好的验证损失
'learning_rate': args.learning_rate, # 学习率
'epoch_index': 0, # 训练轮次的索引
'train_loss': [], # 训练损失列表
'train_acc': [], # 训练准确度列表
'val_loss': [], # 验证损失列表
'val_acc': [], # 验证准确度列表
'test_loss': -1, # 测试损失
'test_acc': -1, # 测试准确度
'model_filename': args.model_state_file} # 模型文件名
'''
2. update_train_state(args, model, train_state)(更新训练状态)
输入:
args: 一个包含训练参数的对象,通常包括提前停止标准等。
model: 当前训练的模型实例,用于保存模型参数。
train_state: 记录当前训练状态的字典。
输出:更新后的训练状态字典。
功能:根据验证损失更新训练状态字典,。
说明:包含提前停止逻辑和模型检查点保存逻辑;要检查最近两次的验证损失,如果损失增加,则增加提前停止步骤计数;如果模型更好,则更新模型。
'''
def update_train_state(args, model, train_state):
# 至少保存一个模型
if train_state['epoch_index'] == 0:
torch.save(model.state_dict(), train_state['model_filename']) # 保存模型状态字典
train_state['stop_early'] = false # 不提前停止
# 如果性能改善则保存模型
elif train_state['epoch_index'] >= 1:
loss_tm1, loss_t = train_state['val_loss'][-2:] # 获取最后两次验证损失
# 如果损失变得更糟
if loss_t >= train_state['early_stopping_best_val']:
# 更新步数
train_state['early_stopping_step'] += 1
# 损失减少
else:
# 保存最好的模型
if loss_t < train_state['early_stopping_best_val']:
torch.save(model.state_dict(), train_state['model_filename']) # 保存模型状态字典
# 重置提前停止步数
train_state['early_stopping_step'] = 0
# 是否提前停止?
train_state['stop_early'] = \
train_state['early_stopping_step'] >= args.early_stopping_criteria # 判断是否达到提前停止标准
return train_state # 返回更新后的训练状态
'''
3. compute_accuracy(y_pred, y_target)(计算准确率)
输入:
y_pred: 预测值(tensor型,不然报错)
y_target: 目标值(tensor型,不然报错)
输出:accuracy(准确率)
功能:算accuracy(准确率)呗
'''
def compute_accuracy(y_pred, y_target):
_, y_pred_indices = y_pred.max(dim=1) # 获取预测值的索引
n_correct = torch.eq(y_pred_indices, y_target).sum().item() # 计算正确预测的数量
return n_correct / len(y_pred_indices) * 100 # 返回准确度百分比
两个辅助函数:
# 两个辅助的函数
def set_seed_everywhere(seed, cuda): # 芝士随机种子
np.random.seed(seed)
torch.manual_seed(seed)
if cuda:
torch.cuda.manual_seed_all(seed)
def handle_dirs(dirpath): # 芝士检查是否有文件夹没有就创建的
if not os.path.exists(dirpath):
os.makedirs(dirpath)
(2)训练前夕,检查东西
# 检查点东西
import datetime # 打印时间的,别介意
def why_time():
current_time = datetime.datetime.now()
formatted_time = current_time.strftime("%y-%m-%d %h:%m:%s")
return formatted_time
args = namespace(
# 数据和路径信息
surname_csv="surnames_with_splits.csv", # 数据集csv文件路径
vectorizer_file="vectorizer.json", # 向量化器文件路径
model_state_file="model.pth", # 模型状态文件路径
save_dir="model_storage/ch4/surname_mlp", # 模型保存目录
# 模型超参数
hidden_dim=300, # 隐藏层维度
# 训练超参数
seed=1337, # 随机种子
num_epochs=20, # 训练轮数(不要100)
early_stopping_criteria=5, # 提前停止标准
learning_rate=0.001, # 学习率
batch_size=64, # 批量大小
# 运行时选项
cuda=false, # 是否使用cuda
reload_from_files=false, # 是否从文件重新加载
expand_filepaths_to_save_dir=true, # 是否扩展文件保存路径
)
print(f'\033[0;31m叮咚~ \033[0;32m数据信息检查完毕 \033[0;33m \033[0;35m{why_time()}\033[0m')
# 扩展文件路径
if args.expand_filepaths_to_save_dir:
args.vectorizer_file = os.path.join(args.save_dir, args.vectorizer_file) # 扩展向量化器文件路径
args.model_state_file = os.path.join(args.save_dir, args.model_state_file) # 扩展模型状态文件路径
print(f'\033[0;31m叮咚~ \033[0;32m文件路径设置完毕 \033[0;33m \033[0;35m{why_time()}\033[0m')
print("\t向量化器文件路径:{}".format(args.vectorizer_file)) # 打印扩展后的向量化器文件路径
print("\t模型状态文件路径{}".format(args.model_state_file)) # 打印扩展后的模型状态文件路径
# 检查cuda
if not torch.cuda.is_available():
args.cuda = false # 如果cuda不可用,设置为false
args.device = torch.device("cuda" if args.cuda else "cpu") # 设置设备为cuda或cpu
print("是不是cuda?: {}".format(args.cuda))
# 设置随机种子以确保可重复性
set_seed_everywhere(args.seed, args.cuda)
print(f'\033[0;31m叮咚~ \033[0;32m随机种子设置完毕 \033[0;33m \033[0;35m{why_time()}\033[0m')
# 处理目录
handle_dirs(args.save_dir) # 创建保存目录
print(f'\033[0;31m叮咚~ \033[0;32m目录设置完毕 \033[0;33m \033[0;35m{why_time()}\033[0m')
if args.reload_from_files: # 如果参数 reload_from_files 为真
print("重新加载!")
dataset = surnamedataset.load_dataset_and_load_vectorizer(args.surname_csv, args.vectorizer_file)
else:
print("创建新的!")
dataset = surnamedataset.load_dataset_and_make_vectorizer(args.surname_csv) # 加载数据集并创建向量化器
dataset.save_vectorizer(args.vectorizer_file) # 保存向量化器
vectorizer = dataset.get_vectorizer() # 获取向量化器
classifier = surnameclassifier(input_dim=len(vectorizer.surname_vocab),
hidden_dim=args.hidden_dim,
output_dim=len(vectorizer.nationality_vocab))
# 创建姓氏分类器(输入维度为姓氏词汇表长度,隐藏层维度为 args.hidden_dim,输出维度为国籍词汇表长度)
发表评论