• Home
  • About
    • taichi photo

      taichi

      ars longa vita brevis.

    • Learn More
    • Twitter
    • LinkedIn
  • Posts
    • All Posts
    • All Tags
  • Projects

LSTMで次にくる単語を予測

30 Nov 2018

Reading time ~4 minutes

LSTMで次にくる単語を予測

GitHub

LSTMで次にくる単語を予測

学習モデルの作成

import os
import re
import string
import requests
import numpy as np
import collections
import random
import pickle
import matplotlib.pyplot as plt
import tensorflow as tf

sess = tf.Session()

# RNNのパラメータ設定
min_word_freq = 5 # 出現頻度がこの値以下の単語を除外
rnn_size = 128 # RNNモデルのサイズ
embedding_size = rnn_size # 埋め込みサイズ
epochs = 1800 #  データを処理する回数
batch_size = 100 # 一度にトレーニングするサンプル数
learning_rate = 0.001 # 学習率
training_seq_len = 50 # 前後の単語数(左右に25ずつ)
save_every = 500 
eval_every = 50 

# テスト文のリスト
prime_texts = ['thou art more', 'to be or not to', 'wherefore art thou']

# データフォルダ、データファイル、モデルファイルを設定
data_dir = 'temp'
data_file = 'shakespeare.txt'
model_path = 'shakespeare_model'
full_model_dir = os.path.join(data_dir, model_path)

# データから余計なものを取り除く((-)と(')を残すのはシェイクスピアがよく使用していたから)
punctuation = string.punctuation
punctuation = ''.join([x for x in punctuation if x not in ['-', "'"]])

# データの取得
# モデルフォルダを作成
if not os.path.exists(full_model_dir):
    os.makedirs(full_model_dir)

# データフォルダを作成
if not os.path.exists(data_dir):
    os.makedirs(data_dir)
    
print('Loading Shakespeare Data')
# ファイルがダウンロードされているかどうか確認
if not os.path.isfile(os.path.join(data_dir, data_file)):
    print('Not found, downloading Shakespeare texts from www.gutenberg.org')
    shakespeare_url = 'http://www.gutenberg.org/cache/epub/100/pg100.txt'
    # テキスト取得
    response = requests.get(shakespeare_url)
    shakespeare_file = response.content
    # デコード
    s_text = shakespeare_file.decode('utf-8')
    # 最初の生命用の段落を削除
    s_text = s_text[7675:]
    # 改行を削除
    s_text = s_text.replace('\r\n', '')
    s_text = s_text.replace('\n', '')
    
    # ファイルに保存
    with open(os.path.join(data_dir, data_file), 'w') as out_conn:
        out_conn.write(s_text)
else:
    # すでにファイルがある場合は、ロード
    with open(os.path.join(data_dir, data_file), 'r') as file_conn:
        s_text = file_conn.read().replace('\n', '')

print("downloading is done")

# データから句読点と余分な空白を削除
print('Cleaning Text')
s_text = re.sub(r'[{}]'.format(punctuation), ' ', s_text)
s_text = re.sub('\s+', ' ', s_text ).strip().lower()
print('Done loading/cleaning.')

# 辞書を作成する関数(1つ目は単語からインデックスのマッピング、2つ目はインデックスから単語へのマッピング)
def build_vocab(text, min_word_freq):
    word_counts = collections.Counter(text.split(' '))
    # 出現頻度がしきい値を超える単語を対象
    word_counts = {key:val for key, val in word_counts.items() if val>min_word_freq}
    # 単語 --> インデックス
    words = word_counts.keys()
    vocab_to_ix_dict = {key:(ix+1) for ix, key in enumerate(words)}
    # 不明なキー--> インデックス0とする
    vocab_to_ix_dict['unknown']=0
    # インデックス --> 単語
    ix_to_vocab_dict = {val:key for key,val in vocab_to_ix_dict.items()}
    
    return(ix_to_vocab_dict, vocab_to_ix_dict)

# シェイクスピアの辞書を作成
print('Building Shakespeare Vocab')
ix2vocab, vocab2ix = build_vocab(s_text, min_word_freq)
vocab_size = len(ix2vocab) + 1
print('Vocabulary Length = {}'.format(vocab_size))
# 正しく実行されているか確認
assert(len(ix2vocab) == len(vocab2ix))

# テキストデータからインデックスの配列を作成
# テキストを単語ベクトルに変換
s_text_words = s_text.split(' ')
s_text_ix = []
for ix, x in enumerate(s_text_words):
    try:
        s_text_ix.append(vocab2ix[x])
    except:
        s_text_ix.append(0)
s_text_ix = np.array(s_text_ix)


# LSTMモデルの作成(理想的には、別のPythonファイルに保存してimportできるようにする)
class LSTM_Model():
    # モデルのすべての変数と演算を定義
    def __init__(self, embedding_size, rnn_size, batch_size, learning_rate,
                 training_seq_len, vocab_size, infer_sample=False):
        self.embedding_size = embedding_size
        self.rnn_size = rnn_size
        self.vocab_size = vocab_size
        self.infer_sample = infer_sample
        self.learning_rate = learning_rate
        
        if infer_sample:
            self.batch_size = 1
            self.training_seq_len = 1
        else:
            self.batch_size = batch_size
            self.training_seq_len = training_seq_len
        
        self.lstm_cell = tf.contrib.rnn.BasicLSTMCell(self.rnn_size)
        self.initial_state = self.lstm_cell.zero_state(self.batch_size, tf.float32)
        
        self.x_data = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])
        self.y_output = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])
        
        with tf.variable_scope('lstm_vars'):
            # ソフトマックスの出力の重み
            W = tf.get_variable('W', [self.rnn_size, self.vocab_size], tf.float32, tf.random_normal_initializer())
            b = tf.get_variable('b', [self.vocab_size], tf.float32, tf.constant_initializer(0.0))
        
            # 埋め込みを定義
            embedding_mat = tf.get_variable('embedding_mat', [self.vocab_size, self.embedding_size],
                                            tf.float32, tf.random_normal_initializer())
                                            
            embedding_output = tf.nn.embedding_lookup(embedding_mat, self.x_data)
            rnn_inputs = tf.split(axis=1, num_or_size_splits=self.training_seq_len, value=embedding_output)
            rnn_inputs_trimmed = [tf.squeeze(x, [1]) for x in rnn_inputs]
            
        # 推測中(テキストの生成中)は、ループ関数を追加
        # i番目の出力から i + 1 番目のにゅう直を取得する方法を定義
        def inferred_loop(prev, count):
            # 隠れ層を適用
            prev_transformed = tf.matmul(prev, W) + b
            # 出力のインデックスを取得
            prev_symbol = tf.stop_gradient(tf.argmax(prev_transformed, 1))
            # 埋め込みベクトルを取得
            output = tf.nn.embedding_lookup(embedding_mat, prev_symbol)
            return(output)

        decoder = tf.contrib.legacy_seq2seq.rnn_decoder
        outputs, last_state = decoder(rnn_inputs_trimmed,
                                      self.initial_state,
                                      self.lstm_cell,
                                      loop_function=inferred_loop if infer_sample else None)
        # 推測されていない出力
        output = tf.reshape(tf.concat(axis=1, values=outputs), [-1, self.rnn_size])
        # ロジットと出力
        self.logit_output = tf.matmul(output, W) + b
        self.model_output = tf.nn.softmax(self.logit_output)

        loss_fun = tf.contrib.legacy_seq2seq.sequence_loss_by_example
        loss = loss_fun([self.logit_output],[tf.reshape(self.y_output, [-1])],
                [tf.ones([self.batch_size * self.training_seq_len])],
                self.vocab_size)
        self.cost = tf.reduce_sum(loss) / (self.batch_size * self.training_seq_len)
        self.final_state = last_state
        gradients, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tf.trainable_variables()), 4.5)
        optimizer = tf.train.AdamOptimizer(self.learning_rate)
        self.train_op = optimizer.apply_gradients(zip(gradients, tf.trainable_variables()))
    
    # サンプル(単語)をループ処理してテキストを生成
    def sample(self, sess, words=ix2vocab, vocab=vocab2ix, num=10, prime_text='thou art'):
        state = sess.run(self.lstm_cell.zero_state(1, tf.float32))
        word_list = prime_text.split()
        for word in word_list[:-1]:
            x = np.zeros((1, 1))
            x[0, 0] = vocab[word]
            feed_dict = {self.x_data: x, self.initial_state:state}
            [state] = sess.run([self.final_state], feed_dict=feed_dict)

        out_sentence = prime_text
        word = word_list[-1]
        for n in range(num):
            x = np.zeros((1, 1))
            x[0, 0] = vocab[word]
            feed_dict = {self.x_data: x, self.initial_state:state}
            [model_output, state] = sess.run([self.model_output, self.final_state], feed_dict=feed_dict)
            sample = np.argmax(model_output[0])
            if sample == 0:
                break
            word = words[sample]
            out_sentence = out_sentence + ' ' + word
        return(out_sentence)
        
# LSTMモデルとテストモデルを設定
#LSTMモデルを定義
lstm_model = LSTM_Model(embedding_size, rnn_size, batch_size, learning_rate,
                        training_seq_len, vocab_size)

# テストモデルを定義(このスコープをテストに再利用する)
with tf.variable_scope(tf.get_variable_scope(), reuse=True):
    test_lstm_model = LSTM_Model(embedding_size, rnn_size, batch_size, learning_rate,
                                 training_seq_len, vocab_size, infer_sample=True)
                                 
# 保存関数を定義し、バッチサイズで入力テキストを分割する
# モデルの保存関数を定義
saver = tf.train.Saver(tf.global_variables())


# エポックごとにバッチを作成
num_batches = int(len(s_text_ix)/(batch_size * training_seq_len)) + 1
# テキストインデックスを同じサイズの部分配列に分割
batches = np.array_split(s_text_ix, num_batches)
# それらの部分配列の形状を [batch_size, training_seq_len] に変更
batches = [np.resize(x, [batch_size, training_seq_len]) for x in batches]

# すべての変数を初期化
init = tf.global_variables_initializer()
sess.run(init)

トレーニングの開始

train_loss = []
iteration_count = 1
for epoch in range(epochs):
    # 単語インデックスをシャッフル
    random.shuffle(batches)
    # シャッフルしたバッチから目的値を作成
    targets = [np.roll(x, -1, axis=1) for x in batches]
    # エポックの実行を開始
    print('Starting Epoch #{} of {}.'.format(epoch+1, epochs))
    # LSTMの初期状態をエポックごとにリセット
    state = sess.run(lstm_model.initial_state)
    for ix, batch in enumerate(batches):
        training_dict = {lstm_model.x_data: batch, lstm_model.y_output: targets[ix]}
        c, h = lstm_model.initial_state
        training_dict[c] = state.c
        training_dict[h] = state.h
        
        temp_loss, state, _ = sess.run([lstm_model.cost, lstm_model.final_state, lstm_model.train_op],
                                       feed_dict=training_dict)
        train_loss.append(temp_loss)
        
        # Print status every 10 gens
        if iteration_count % 10 == 0:
            summary_nums = (iteration_count, epoch+1, ix+1, num_batches+1, temp_loss)
            print('Iteration: {}, Epoch: {}, Batch: {} out of {}, Loss: {:.2f}'.format(*summary_nums))
        
        # Save the model and the vocab
        if iteration_count % save_every == 0:
            # Save model
            model_file_name = os.path.join(full_model_dir, 'model')
            saver.save(sess, model_file_name, global_step = iteration_count)
            print('Model Saved To: {}'.format(model_file_name))
            # Save vocabulary
            dictionary_file = os.path.join(full_model_dir, 'vocab.pkl')
            with open(dictionary_file, 'wb') as dict_file_conn:
                pickle.dump([vocab2ix, ix2vocab], dict_file_conn)
        
        if iteration_count % eval_every == 0:
            for sample in prime_texts:
                print(test_lstm_model.sample(sess, ix2vocab, vocab2ix, num=10, prime_text=sample))
                
        iteration_count += 1

損失値をプロット

plt.plot(train_loss, 'k-')
plt.title('Sequence to Sequence Loss')
plt.xlabel('Iterations')
plt.ylabel('Loss')
plt.show()

plot



TechnologyMachine LearningTensorFlow Share Tweet +1