LSTM -Pytorch实现&解构

部分源码和思路来源于 《Dive-into-DL-PyTorch》,很好的一本进阶书籍,推荐学习!

本文适合对pytorch.nn,python-generator有一定基础的人观看,若对DL感兴趣想入门的朋友,推荐去看李宏毅2020的网课。

LSTM介绍

LSTM 中引入了3个门,即输入门(input gate)、遗忘门(forget gate)和输出门(output gate),以及与隐藏状态形状相同的记忆细胞(某些文献把记忆细胞当成一种特殊的隐藏状态),从而记录额外的信息。
下图是一层lstm的图示,很好的展示了lstm的“记忆”过程:

本次模型目标为:

  • 记忆周杰伦的歌词习惯
  • 给出一个歌词集里出现过的字,输出下一个字,迭代50次生成一句歌词,同时预设歌词开头为”分开”

数据预处理

数据介绍:运用的周杰伦歌词集的text。因lyric数据具有很强的时序性,因此用lstm。同时为了突出时序,在小批量训练时我们用 *相邻采样 *而非一般的随机采样,后面会有介绍。

标注

  • 读取某文本的前10000个字符,需要将字符转码为独热编码
  • 独热编码前,需要对每种字符进行”编号”,这里用了python的set和enumerate来生成不重复序列对象作为字典标识char
  • 用字典(char_to_idx)来对每个char标注,再以字典进行独热编码,作为训练数据
    # 设定学习所需的基本参数,后面会详细解释
    vocab_size: 1027 —— 就是len(char_to_idx)
    num_input & num_output:1027, num_hidden: 256 # 规定LSTM的基本参数
    num_epochs, num_steps, batch_size, lr, clipping_theta = 160, 35, 32, 1e2, 1e-2
    # 对read的字符进行处理
    corpus_chars = corpus_chars.replace('\n', ' ').replace('\r', ' ')
    corpus_chars = corpus_chars[0:10000]
    idx_to_char = list(set(corpus_chars))
    char_to_idx = dict([(char, i) for i, char in enumerate(idx_to_char)]) # 列表解析,后面将多次用到
    vocab_size = len(char_to_idx)
    corpus_indices = [char_to_idx[char] for char in corpus_chars]
    
    # one-hot步骤忽略,比较简单

    生成器构造

    如果想要自定义每次迭代时的过程,我们需要使用生成器

生成迭代对象

相邻取样

引自D2L原文

令相邻的两个随机小批量在原始序列上的位置相毗邻。这时候,我们就可以用一个小批量最终时间步的隐藏状态来初始化下一个小批量的隐藏状态,从而使下一个小批量的输出也取决于当前小批量的输入,并如此循环下去。这对实现循环神经网络造成了两方面影响:一方面, 在训练模型时,我们只需在每一个迭代周期开始时初始化隐藏状态;另一方面,当多个相邻小批量通过传递隐藏状态串联起来时,模型参数的梯度计算将依赖所有串联起来的小批量序列。同一迭代周期中,随着迭代次数的增加,梯度的计算开销会越来越大。 为了使模型参数的梯度计算只依赖一次迭代读取的小批量序列,我们可以在每次读取小批量前将隐藏状态从计算图中分离出来。

# 生成迭代对象
def data_iter_consecutive(corpus_indices, batch_size, num_steps, device=None):
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 深度学习计算量较大,推荐用cuda
    corpus_indices = torch.tensor(corpus_indices, dtype=torch.float32, device=device)
    data_len = len(corpus_indices)
    batch_len = data_len // batch_size
    indices = corpus_indices[0: batch_size*batch_len].view(batch_size, batch_len)
    epoch_size = (batch_len - 1) // num_steps
    for i in range(epoch_size):
        i = i * num_steps
        X = indices[:, i: i + num_steps]
        Y = indices[:, i + 1: i + num_steps + 1]
        yield X, Y

这里可视化一下相邻取样的逻辑

搭建模型

这里我们搭建一个 input_size = 1027, num_hidden = 256, output_size =1027, 一层lstm+一层全连接层的模型

lstm_layer = nn.LSTM(input_size=vocab_size, hidden_size=num_hiddens)

class RNNModel(nn.Module):
    def __init__(self, rnn_layer, vocab_size):
        super(RNNModel, self).__init__()
        self.rnn = rnn_layer
        self.hidden_size = rnn_layer.hidden_size * (2 if rnn_layer.bidirectional else 1) 
        self.vocab_size = vocab_size
        self.dense = nn.Linear(self.hidden_size, vocab_size)
        self.state = None

    def forward(self, inputs, state): # inputs: (batch, seq_len)
        # 获取one-hot向量表示
        X = to_onehot(inputs, self.vocab_size) # X是个list
        Y, self.state = self.rnn(torch.stack(X), state)
        # 全连接层会首先将Y的形状变成(num_steps * batch_size, num_hiddens),它的输出
        # 形状为(num_steps * batch_size, vocab_size)
        output = self.dense(Y.view(-1, Y.shape[-1]))
        return output, self.state
    
model = RNNModel(lstm_layer)

损失函数为交叉熵,小批量梯度下降的优化器为Adam,学习率见上,忽略。

def train_and_predict_rnn_pytorch(model, num_hiddens, vocab_size, device,
                                corpus_indices, idx_to_char, char_to_idx,
                                num_epochs, num_steps, lr, clipping_theta,
                                batch_size, pred_period, pred_len, prefixes):
    loss = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    model.to(device)
    state = None
    for epoch in range(num_epochs):
        l_sum, n, start = 0.0, 0, time.time()
        data_iter = data_iter_consecutive(corpus_indices, batch_size, num_steps, device) # 相邻采样
        for X, Y in data_iter:
            if state is not None:
                # 使用detach函数从计算图分离隐藏状态, 这是为了
                # 使模型参数的梯度计算只依赖一次迭代读取的小批量序列(防止梯度计算开销太大)
                if isinstance (state, tuple): # LSTM, state:(h, c)  
                    state = (state[0].detach(), state[1].detach())
                else:   
                    state = state.detach()
    
            (output, state) = model(X, state) # output: 形状为(num_steps * batch_size, vocab_size)
            
            # Y的形状是(batch_size, num_steps),转置后再变成长度为
            # batch * num_steps 的向量,这样跟输出的行一一对应
            y = torch.transpose(Y, 0, 1).contiguous().view(-1)
            l = loss(output, y.long())
            
            optimizer.zero_grad()
            l.backward()
            # 梯度裁剪
            grad_clipping(model.parameters(), clipping_theta, device)
            optimizer.step()
            l_sum += l.item() * y.shape[0]
            n += y.shape[0]
        
        try:
            perplexity = math.exp(l_sum / n)
        except OverflowError:
            perplexity = float('inf')
        if (epoch + 1) % pred_period == 0:
            print('epoch %d, perplexity %f, time %.2f sec' % (
                epoch + 1, perplexity, time.time() - start))
            for prefix in prefixes:
                print(' -', predict_rnn_pytorch(
                    prefix, pred_len, model, vocab_size, device, idx_to_char,
                    char_to_idx))

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!