用 TensorFlow 在 Transformers 上生成字幕的注意机制的实现

共 42023字,需浏览 85分钟

 ·

2021-03-04 00:06

总览
  • 了解最先进的变压器模型。
  • 了解我们如何使用Tensorflow在已经看到的图像字幕问题上实现变形金刚
  • 比较《变形金刚》与注意力模型的结果。
介绍
我们已经看到,注意力机制已成为各种任务(例如图像字幕)中引人注目的序列建模和转导模型的组成部分,从而允许对依赖项进行建模,而无需考虑它们在输入或输出序列中的距离。

Transformer是一种避免重复发生的模型体系结构,而是完全依赖于注意力机制来绘制输入和输出之间的全局依存关系。Transformer体系结构允许更多并行化,并可以达到翻译质量方面的最新水平。
在本文中,让我们看看如何使用TensorFlow来实现用变形金刚生成字幕的注意力机制。
开始之前的先决条件:
  • Python编程
  • Tensorflow和Keras
  • RNN和LSTM
  • 转移学习
  • 编码器和解码器架构
  • 深度学习的要点–注意序列到序列建模
我建议您在阅读本文前可以参考下面资料:
一个动手教程来学习Python中图像标题生成的注意机制
https://www.analyticsvidhya.com/blog/2020/11/attention-mechanism-for-caption-generation/
目录 
一、Transformer 架构 
二、使用Tensorflow的变压器字幕生成注意机制的实现 
2.1、导入所需的库 
2.2、数据加载和预处理 
2.3、模型定义 
2.4、位置编码 
2.5、多头注意力 
2.6、编码器-解码器层 
2.7、Transformer 
2.8、模型超参数 
2.9、模型训练 
2.10、BLEU评估 
2.11、比较方式 
三、下一步是什么?
四、尾注
Transformer 架构

Transformer 网络采用类似于RNN的编解码器架构。主要区别在于,转换器可以并行接收输入的句子/顺序,即没有与输入相关的时间步长,并且句子中的所有单词都可以同时传递。
让我们从了解变压器的输入开始。
考虑一下英语到德语的翻译。我们将整个英语句子输入到输入嵌入中。可以将输入嵌入层视为空间中的一个点,其中含义相似的单词在物理上彼此更接近,即,每个单词映射到具有连续值的矢量来表示该单词。
现在的问题是,不同句子中的相同单词可能具有不同的含义,这就是位置编码输入的地方。由于转换器不包含递归和卷积,因此为了使模型能够利用序列的顺序,它必须利用一些有关序列中单词相对或绝对位置的信息。这个想法是使用固定或学习的权重,该权重对与句子中标记的特定位置有关的信息进行编码。
类似地,将目标德语单词输入到输出嵌入中,并将其位置编码矢量传递到解码器块中。
编码器块具有两个子层。第一个是多头自我关注机制,第二个是简单的位置完全连接的前馈网络。对于每个单词,我们可以生成一个注意力向量,该向量捕获句子中单词之间的上下文关系。编码器中的多头注意力会应用一种称为自我注意力的特定注意力机制。自注意力允许模型将输入中的每个单词与其他单词相关联。
除了每个编码器层中的两个子层之外,解码器还插入第三子层,该第三子层对编码器堆栈的输出执行多头关注。与编码器类似,我们在每个子层周围采用残余连接,然后进行层归一化。来自编码器的德语单词的注意力向量和英语句子的注意力向量被传递到第二多头注意力。
该注意块将确定每个单词向量彼此之间的关联程度。这是英语到德语单词映射的地方。解码器以充当分类器的线性层和softmax来封闭,以获取单词概率。
现在,您已基本了解了转换器的工作方式,让我们看看如何使用Tensorflow将其实现用于图像字幕任务,并将我们的结果与其他方法进行比较。
使用TensorFlow在Transformers 上生成字幕的注意机制的实现
步骤1:导入所需的库
在这里,我们将利用Tensorflow创建模型并对其进行训练。大部分代码归功于TensorFlow教程。如果您想要GPU进行训练,则可以使用Google Colab或Kaggle笔记本。
import string
import numpy as np
import pandas as pd
from numpy import array
from PIL import Image
import pickle
 
import matplotlib.pyplot as plt
import sys, time, os, warnings
warnings.filterwarnings("ignore")
import re
 
import keras
import tensorflow as tf
from tqdm import tqdm
from nltk.translate.bleu_score import sentence_bleu
 
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense, BatchNormalization
from keras.layers import LSTM
from keras.layers import Embedding
from keras.layers import Dropout
from keras.layers.merge import add
from keras.callbacks import ModelCheckpoint
from keras.preprocessing.image import load_img, img_to_array
from keras.preprocessing.text import Tokenizer
 
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
步骤2:数据加载和预处理
定义图像和字幕路径,并检查数据集中总共有多少图像。
image_path = "/content/gdrive/My Drive/FLICKR8K/Flicker8k_Dataset"
dir_Flickr_text = "/content/gdrive/My Drive/FLICKR8K/Flickr8k_text/Flickr8k.token.txt"
jpgs = os.listdir(image_path)
 
print("Total Images in Dataset = {}".format(len(jpgs)))
输出如下:

我们创建一个数据框来存储图像ID和标题,以便于使用。
file = open(dir_Flickr_text,'r')
text = file.read()
file.close()
 
datatxt = []
for line in text.split('\n'):
   col = line.split('\t')
   if len(col) == 1:
       continue
   w = col[0].split("#")
   datatxt.append(w + [col[1].lower()])
 
data = pd.DataFrame(datatxt,columns=["filename","index","caption"])
data = data.reindex(columns =['index','filename','caption'])
data = data[data.filename != '2258277193_586949ec62.jpg.1']
uni_filenames = np.unique(data.filename.values)
 
data.head()
输出如下:

接下来,让我们可视化一些图片及其5个标题:
npic = 5
npix = 224
target_size = (npix,npix,3)
count = 1
 
fig = plt.figure(figsize=(10,20))
for jpgfnm in uni_filenames[10:14]:
   filename = image_path + '/' + jpgfnm
   captions = list(data["caption"].loc[data["filename"]==jpgfnm].values)
   image_load = load_img(filename, target_size=target_size)
   ax = fig.add_subplot(npic,2,count,xticks=[],yticks=[])
   ax.imshow(image_load)
   count += 1
 
   ax = fig.add_subplot(npic,2,count)
   plt.axis('off')
   ax.plot()
   ax.set_xlim(0,1)
   ax.set_ylim(0,len(captions))
   for i, caption in enumerate(captions):
       ax.text(0,i,caption,fontsize=20)
   count += 1
plt.show()
输出如下:

接下来,让我们看看我们当前的词汇量是多少:
vocabulary = []
for txt in data.caption.values:
   vocabulary.extend(txt.split())
print('Vocabulary Size: %d' % len(set(vocabulary)))
输出如下:

接下来执行一些文本清理,例如删除标点符号,单个字符和数字值:
def remove_punctuation(text_original):
   text_no_punctuation = text_original.translate(string.punctuation)
   return(text_no_punctuation)
 
def remove_single_character(text):
   text_len_more_than1 = ""
   for word in text.split():
       if len(word) > 1:
           text_len_more_than1 += " " + word
   return(text_len_more_than1)
 
def remove_numeric(text):
   text_no_numeric = ""
   for word in text.split():
       isalpha = word.isalpha()
       if isalpha:
           text_no_numeric += " " + word
   return(text_no_numeric)
 
def text_clean(text_original):
   text = remove_punctuation(text_original)
   text = remove_single_character(text)
   text = remove_numeric(text)
   return(text)
 
for i, caption in enumerate(data.caption.values):
   newcaption = text_clean(caption)
   data["caption"].iloc[i] = newcaption
现在让我们看一下清理后词汇量的大小
clean_vocabulary = []
for txt in data.caption.values:
   clean_vocabulary.extend(txt.split())
print('Clean Vocabulary Size: %d' % len(set(clean_vocabulary)))
输出如下:

接下来,我们将所有标题和图像路径保存在两个列表中,以便我们可以使用路径集立即加载图像。我们还向每个字幕添加了“ <开始>”和“ <结束>”标签,以便模型可以理解每个字幕的开始和结束。
PATH = "/content/gdrive/My Drive/FLICKR8K/Flicker8k_Dataset/"
all_captions = []
for caption  in data["caption"].astype(str):
   caption = '<start> ' + caption+ ' <end>'
   all_captions.append(caption)
 
all_captions[:10]
输出如下:

all_img_name_vector = []
for annot in data["filename"]:
   full_image_path = PATH + annot
   all_img_name_vector.append(full_image_path)
 
all_img_name_vector[:10]

现在您可以看到我们有40455个图像路径和标题。
print(f"len(all_img_name_vector) : {len(all_img_name_vector)}")
print(f"len(all_captions) : {len(all_captions)}")
输出如下:

我们将仅取每个批次的40000个,以便可以正确选择批次大小,即如果批次大小= 64,则可以选择625个批次。为此,我们定义了一个函数来将数据集限制为40000个图像和标题。
def data_limiter(num,total_captions,all_img_name_vector):
   train_captions, img_name_vector = shuffle(total_captions,all_img_name_vector,random_state=1)
   train_captions = train_captions[:num]
   img_name_vector = img_name_vector[:num]
   return train_captions,img_name_vector
 
train_captions,img_name_vector = data_limiter(40000,total_captions,all_img_name_vector)
步骤3:模型定义
让我们使用InceptionV3定义图像特征提取模型。我们必须记住,这里不需要分类图像,只需要为图像提取图像矢量即可。因此,我们从模型中删除了softmax层。我们必须先将所有图像预处理为相同的尺寸,即299×299,然后再将其输入模型,并且该层的输出形状为8x8x2048。
def load_image(image_path):
   img = tf.io.read_file(image_path)
   img = tf.image.decode_jpeg(img, channels=3)
   img = tf.image.resize(img, (299299))
   img = tf.keras.applications.inception_v3.preprocess_input(img)
   return img, image_path
 
image_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet')
new_input = image_model.input
hidden_layer = image_model.layers[-1].output
image_features_extract_model = tf.keras.Model(new_input, hidden_layer)
接下来,让我们将每个图片名称映射到要加载图片的函数。我们将使用InceptionV3预处理每个图像,并将输出缓存到磁盘,然后将图像特征重塑为64×2048。
encode_train = sorted(set(img_name_vector))
image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
image_dataset = image_dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(64)
我们提取特征并将其存储在各自的.npy文件中,然后将这些特征通过编码器传递.NPY文件存储在任何计算机上重建数组所需的所有信息,包括dtype和shape信息。
for img, path in tqdm(image_dataset):
   batch_features = image_features_extract_model(img)
   batch_features = tf.reshape(batch_features,
                              (batch_features.shape[0], -1, batch_features.shape[3]))
 
 for bf, p in zip(batch_features, path):
   path_of_feature = p.numpy().decode("utf-8")
   np.save(path_of_feature, bf.numpy())
接下来,我们标记标题,并为数据中所有唯一的单词建立词汇表。我们还将词汇量限制在前5000个单词以节省内存。我们将更换的话不词汇与令牌。
top_k = 5000
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                 oov_token="<unk>",
                                                 filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
 
tokenizer.fit_on_texts(train_captions)
train_seqs = tokenizer.texts_to_sequences(train_captions)
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'
 
train_seqs = tokenizer.texts_to_sequences(train_captions)
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')
接下来,使用80-20拆分创建训练和验证集:
img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector,cap_vector, test_size=0.2, random_state=0)
接下来,让我们创建一个tf.data数据集以用于训练我们的模型。
BATCH_SIZE = 64
BUFFER_SIZE = 1000
num_steps = len(img_name_train) // BATCH_SIZE
 
def map_func(img_name, cap):
   img_tensor = np.load(img_name.decode('utf-8')+'.npy')
   return img_tensor, cap
 
dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))
dataset = dataset.map(lambda item1, item2: tf.numpy_function(map_func, [item1, item2], [tf.float32, tf.int32]),num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
步骤4:位置编码
位置编码使用不同频率的正弦和余弦函数。对于输入向量上的每个奇数索引,请使用cos函数创建一个向量,对于每个偶数索引,请使用sin函数创建一个向量。然后将这些向量添加到其相应的输入嵌入中,从而成功地提供有关每个向量位置的网络信息。
def get_angles(pos, i, d_model):
   angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
   return pos * angle_rates
 
def positional_encoding_1d(position, d_model):
   angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                           np.arange(d_model)[np.newaxis, :],
                           d_model)
 
   angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
   angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
   pos_encoding = angle_rads[np.newaxis, ...]
   return tf.cast(pos_encoding, dtype=tf.float32)
 
def positional_encoding_2d(row,col,d_model):
   assert d_model % 2 == 0
   row_pos = np.repeat(np.arange(row),col)[:,np.newaxis]
   col_pos = np.repeat(np.expand_dims(np.arange(col),0),row,axis=0).reshape(-1,1)
 
   angle_rads_row = get_angles(row_pos,np.arange(d_model//2)[np.newaxis,:],d_model//2)
   angle_rads_col = get_angles(col_pos,np.arange(d_model//2)[np.newaxis,:],d_model//2)
 
   angle_rads_row[:, 0::2] = np.sin(angle_rads_row[:, 0::2])
   angle_rads_row[:, 1::2] = np.cos(angle_rads_row[:, 1::2])
   angle_rads_col[:, 0::2] = np.sin(angle_rads_col[:, 0::2])
   angle_rads_col[:, 1::2] = np.cos(angle_rads_col[:, 1::2])
   pos_encoding = np.concatenate([angle_rads_row,angle_rads_col],axis=1)[np.newaxis, ...]
   return tf.cast(pos_encoding, dtype=tf.float32)
步骤5:多头注意力
计算注意力权重。q,k,v必须具有匹配的前导尺寸。k,v必须具有匹配的倒数第二个维度,即:seq_len_k = seq_len_v。遮罩根据其类型(填充或向前看)而具有不同的形状,但必须广播以添加。
def create_padding_mask(seq):
   seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
   return seq[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)
 
def create_look_ahead_mask(size):
   mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -10)
   return mask  # (seq_len, seq_len)
 
def scaled_dot_product_attention(q, k, v, mask):
   matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)
   dk = tf.cast(tf.shape(k)[-1], tf.float32)
   scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
.
   if mask is not None:
      scaled_attention_logits += (mask * -1e9
 
   attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1
   output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)
 
   return output, attention_weights
 
class MultiHeadAttention(tf.keras.layers.Layer):
   def __init__(self, d_model, num_heads):
      super(MultiHeadAttention, self).__init__()
      self.num_heads = num_heads
      self.d_model = d_model
      assert d_model % self.num_heads == 0
      self.depth = d_model // self.num_heads
      self.wq = tf.keras.layers.Dense(d_model)
      self.wk = tf.keras.layers.Dense(d_model)
      self.wv = tf.keras.layers.Dense(d_model)
      self.dense = tf.keras.layers.Dense(d_model)
 
   def split_heads(self, x, batch_size):
      x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
      return tf.transpose(x, perm=[0213])
 
   def call(self, v, k, q, mask=None):
      batch_size = tf.shape(q)[0]
      q = self.wq(q)  # (batch_size, seq_len, d_model)
      k = self.wk(k)  # (batch_size, seq_len, d_model)
      v = self.wv(v)  # (batch_size, seq_len, d_model)
 
      q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
      k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
      v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)
 
      scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
      scaled_attention = tf.transpose(scaled_attention, perm=[0213])  # (batch_size, seq_len_q,      num_heads, depth)
 
      concat_attention = tf.reshape(scaled_attention,
                                 (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)
 
      output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)
      return output, attention_weights
 
def point_wise_feed_forward_network(d_model, dff):
   return tf.keras.Sequential([
                tf.keras.layers.Dense(dff, activation='relu'),  # (batch_size, seq_len, dff)
                tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)])
步骤6:编码器-解码器层
class EncoderLayer(tf.keras.layers.Layer):
   def __init__(self, d_model, num_heads, dff, rate=0.1):
      super(EncoderLayer, self).__init__()
      self.mha = MultiHeadAttention(d_model, num_heads)
      self.ffn = point_wise_feed_forward_network(d_model, dff)
 
      self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
      self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
 
      self.dropout1 = tf.keras.layers.Dropout(rate)
      self.dropout2 = tf.keras.layers.Dropout(rate)
 
 
   def call(self, x, training, mask=None):
      attn_output, _ = self.mha(x, x, x, mask)  # (batch_size, input_seq_len, d_model)
      attn_output = self.dropout1(attn_output, training=training)
      out1 = self.layernorm1(x + attn_output)  # (batch_size, input_seq_len, d_model)
 
      ffn_output = self.ffn(out1)  # (batch_size, input_seq_len, d_model)
      ffn_output = self.dropout2(ffn_output, training=training)
      out2 = self.layernorm2(out1 + ffn_output)  # (batch_size, input_seq_len, d_model)
      return out2
 
class DecoderLayer(tf.keras.layers.Layer):
   def __init__(self, d_model, num_heads, dff, rate=0.1):
      super(DecoderLayer, self).__init__()
      self.mha1 = MultiHeadAttention(d_model, num_heads)
      self.mha2 = MultiHeadAttention(d_model, num_heads)
 
      self.ffn = point_wise_feed_forward_network(d_model, dff)
 
      self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
      self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
      self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
 
      self.dropout1 = tf.keras.layers.Dropout(rate)
      self.dropout2 = tf.keras.layers.Dropout(rate)
      self.dropout3 = tf.keras.layers.Dropout(rate)
 
   def call(self, x, enc_output, training,look_ahead_mask=None, padding_mask=None):
      attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)  # (batch_size, target_seq_len, d_model)
      attn1 = self.dropout1(attn1, training=training)
      out1 = self.layernorm1(attn1 + x)
 
      attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask) 
      attn2 = self.dropout2(attn2, training=training)
      out2 = self.layernorm2(attn2 + out1)  # (batch_size, target_seq_len, d_model)
 
      ffn_output = self.ffn(out2)  # (batch_size, target_seq_len, d_model)
      ffn_output = self.dropout3(ffn_output, training=training)
      out3 = self.layernorm3(ffn_output + out2)  # (batch_size, target_seq_len, d_model)
 
      return out3, attn_weights_block1, attn_weights_block2
 
 
class Encoder(tf.keras.layers.Layer):
   def __init__(self, num_layers, d_model, num_heads, dff, row_size,col_size,rate=0.1):
      super(Encoder, self).__init__()
      self.d_model = d_model
      self.num_layers = num_layers
 
      self.embedding = tf.keras.layers.Dense(self.d_model,activation='relu')
      self.pos_encoding = positional_encoding_2d(row_size,col_size,self.d_model)
 
      self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
      self.dropout = tf.keras.layers.Dropout(rate)
 
   def call(self, x, training, mask=None):
      seq_len = tf.shape(x)[1]
      x = self.embedding(x)  # (batch_size, input_seq_len(H*W), d_model)
      x += self.pos_encoding[:, :seq_len, :]
      x = self.dropout(x, training=training)
 
      for i in range(self.num_layers):
         x = self.enc_layers[i](x, training, mask)
 
      return x  # (batch_size, input_seq_len, d_model)
 
 
class Decoder(tf.keras.layers.Layer):
   def __init__(self, num_layers,d_model,num_heads,dff, target_vocab_size, maximum_position_encoding,   rate=0.1):
      super(Decoder, self).__init__()
      self.d_model = d_model
      self.num_layers = num_layers
 
      self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
      self.pos_encoding = positional_encoding_1d(maximum_position_encoding, d_model)
 
      self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate)
                         for _ in range(num_layers)]
      self.dropout = tf.keras.layers.Dropout(rate)
 
   def call(self, x, enc_output, training,look_ahead_mask=None, padding_mask=None):
      seq_len = tf.shape(x)[1]
      attention_weights = {}
 
      x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
      x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
      x += self.pos_encoding[:, :seq_len, :]
      x = self.dropout(x, training=training)
 
      for i in range(self.num_layers):
         x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                            look_ahead_mask, padding_mask)
         
         attention_weights['decoder_layer{}_block1'.format(i+1)] = block1
         attention_weights['decoder_layer{}_block2'.format(i+1)] = block2
 
      return x, attention_weights
步骤7:Transformer
class Transformer(tf.keras.Model):
   def __init__(self, num_layers, d_model, num_heads, dff,row_size,col_size,
              target_vocab_size,max_pos_encoding, rate=0.1)
:

      super(Transformer, self).__init__()
      self.encoder = Encoder(num_layers, d_model, num_heads, dff,row_size,col_size, rate)
      self.decoder = Decoder(num_layers, d_model, num_heads, dff,
                          target_vocab_size,max_pos_encoding, rate)
      self.final_layer = tf.keras.layers.Dense(target_vocab_size)
 
   def call(self, inp, tar, training,look_ahead_mask=None,dec_padding_mask=None,enc_padding_mask=None   ):
      enc_output = self.encoder(inp, training, enc_padding_mask)  # (batch_size, inp_seq_len, d_model      )
      dec_output, attention_weights = self.decoder(
      tar, enc_output, training, look_ahead_mask, dec_padding_mask)
      final_output = self.final_layer(dec_output)  # (batch_size, tar_seq_len, target_vocab_size)
      return final_output, attention_weights
步骤8:模型超参数
定义训练参数:
num_layer = 4
d_model = 512
dff = 2048
num_heads = 8
row_size = 8
col_size = 8
target_vocab_size = top_k + 1
dropout_rate = 0.1 
 
 
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
   def __init__(self, d_model, warmup_steps=4000):
      super(CustomSchedule, self).__init__()
      self.d_model = d_model
      self.d_model = tf.cast(self.d_model, tf.float32)
      self.warmup_steps = warmup_steps
 
   def __call__(self, step):
      arg1 = tf.math.rsqrt(step)
      arg2 = step * (self.warmup_steps ** -1.5)
      return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
 
 
learning_rate = CustomSchedule(d_model)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                    epsilon=1e-9)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
 
def loss_function(real, pred):
   mask = tf.math.logical_not(tf.math.equal(real, 0))
   loss_ = loss_object(real, pred)
   mask = tf.cast(mask, dtype=loss_.dtype)
   loss_ *= mask
  return tf.reduce_sum(loss_)/tf.reduce_sum(mask)
 
 
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
transformer = Transformer(num_layer,d_model,num_heads,dff,row_size,col_size,target_vocab_size,                                 max_pos_encoding=target_vocab_size,rate=dropout_rate)
步骤9:模型训练
def create_masks_decoder(tar):
   look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
   dec_target_padding_mask = create_padding_mask(tar)
   combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
   return combined_mask
 

@tf.function
def train_step(img_tensor, tar):
   tar_inp = tar[:, :-1]
   tar_real = tar[:, 1:]
   dec_mask = create_masks_decoder(tar_inp)
   with tf.GradientTape() as tape:
      predictions, _ = transformer(img_tensor, tar_inp,True, dec_mask)
      loss = loss_function(tar_real, predictions)
 
   gradients = tape.gradient(loss, transformer.trainable_variables)   
   optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
   train_loss(loss)
   train_accuracy(tar_real, predictions)
 
for epoch in range(30):
   start = time.time()
   train_loss.reset_states()
   train_accuracy.reset_states()
   for (batch, (img_tensor, tar)) in enumerate(dataset):
      train_step(img_tensor, tar)
      if batch % 50 == 0:
         print ('Epoch {} Batch {} Loss {:.4f} Accuracy {:.4f}'.format(
         epoch + 1, batch, train_loss.result(), train_accuracy.result()))
 
   print ('Epoch {} Loss {:.4f} Accuracy {:.4f}'.format(epoch + 1,
                                               train_loss.result(),
                                
                                   train_accuracy.result()))
   print ('Time taken for 1 epoch: {} secs\n'.format(time.time() - start))
步骤10:BLEU评估
def evaluate(image):
   temp_input = tf.expand_dims(load_image(image)[0], 0)
   img_tensor_val = image_features_extract_model(temp_input)
   img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))
   start_token = tokenizer.word_index['<start>']
   end_token = tokenizer.word_index['<end>']
   decoder_input = [start_token]
   output = tf.expand_dims(decoder_input, 0#tokens
   result = [] #word list
 
   for i in range(100):
      dec_mask = create_masks_decoder(output)
      predictions, attention_weights = transformer(img_tensor_val,output,False,dec_mask)
      predictions = predictions[: ,-1:, :]  # (batch_size, 1, vocab_size)
      predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
      if predicted_id == end_token:
         return result,tf.squeeze(output, axis=0), attention_weights
      result.append(tokenizer.index_word[int(predicted_id)])
      output = tf.concat([output, predicted_id], axis=-1)
 
   return result,tf.squeeze(output, axis=0), attention_weights
 
 
 
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
real_caption = ' '.join([tokenizer.index_word[i] for i in cap_val[rid] if i not in [0]])
caption,result,attention_weights = evaluate(image)
 
first = real_caption.split(' '1)[1]
real_caption = first.rsplit(' '1)[0]
 
for i in caption:
   if i=="<unk>":
      caption.remove(i)
 
for i in real_caption:
   if i=="<unk>":
      real_caption.remove(i)
 
result_join = ' '.join(caption)
result_final = result_join.rsplit(' '1)[0]
real_appn = []
real_appn.append(real_caption.split())
reference = real_appn
candidate = caption
 
score = sentence_bleu(reference, candidate, weights=(1.0,0,0,0))
print(f"BLEU-1 score: {score*100}")
score = sentence_bleu(reference, candidate, weights=(0.5,0.5,0,0))
print(f"BLEU-2 score: {score*100}")
score = sentence_bleu(reference, candidate, weights=(0.3,0.3,0.3,0))
print(f"BLEU-3 score: {score*100}")
score = sentence_bleu(reference, candidate, weights=(0.25,0.25,0.25,0.25))
print(f"BLEU-4 score: {score*100}")
print ('Real Caption:', real_caption)
print ('Predicted Caption:'' '.join(caption))
temp_image = np.array(Image.open(image))
plt.imshow(temp_image)
输出如下:

rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
real_caption = ' '.join([tokenizer.index_word[i] for i in cap_val[rid] if i not in [0]])
caption,result,attention_weights = evaluate(image)
 
first = real_caption.split(' '1)[1]
real_caption = first.rsplit(' '1)[0]
 
for i in caption:
   if i=="<unk>":
      caption.remove(i)
 
for i in real_caption:
   if i=="<unk>":
      real_caption.remove(i)
 
result_join = ' '.join(caption)
result_final = result_join.rsplit(' '1)[0]
real_appn = []
real_appn.append(real_caption.split())
reference = real_appn
candidate = caption
 
score = sentence_bleu(reference, candidate, weights=(1.0,0,0,0))
print(f"BLEU-1 score: {score*100}")
score = sentence_bleu(reference, candidate, weights=(0.5,0.5,0,0))
print(f"BLEU-2 score: {score*100}")
score = sentence_bleu(reference, candidate, weights=(0.3,0.3,0.3,0))
print(f"BLEU-3 score: {score*100}")
score = sentence_bleu(reference, candidate, weights=(0.25,0.25,0.25,0.25))
print(f"BLEU-4 score: {score*100}")
print ('Real Caption:', real_caption)
print ('Predicted Caption:'' '.join(caption))
temp_image = np.array(Image.open(image))
plt.imshow(temp_image)
输出如下:

步骤11:比较
让我们比较上一篇文章中使用Bahdanau的Attention vs我们的Transformers获得的BLEU得分。

左侧的BLEU得分使用Bahdanau Attention,右侧的BLEU得分使用Transformers。正如我们所看到的,Transformer的性能远胜于注意力模型。

在那里!我们已经使用Tensorflow成功实现了Transformers,并看到了它如何产生最先进的结果。
尾注
总而言之,Transformers比我们之前看到的所有其他体系结构都要好,因为它们完全避免了递归,因为它通过多头注意机制和位置嵌入来处理句子,并通过学习单词之间的关系来完全避免递归。还必须指出,使用Tensorflow的转换器只能捕获用于训练它们的固定输入大小内的依赖项。
有许多新的功能强大的Transformers,例如Transformer-XL,缠结Transformers,网状存储器Transformers,它们也可以为诸如图像字幕之类的应用实现,以达到更好的效果。


作者:沂水寒城,CSDN博客专家,个人研究方向:机器学习、深度学习、NLP、CV

Blog: http://yishuihancheng.blog.csdn.net


赞 赏 作 者



更多阅读



2020 年最佳流行 Python 库 Top 10


2020 Python中文社区热门文章 Top 10


5分钟快速掌握 Python 定时任务框架

特别推荐




点击下方阅读原文加入社区会员

浏览 21
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报