ノート/chainerのLSTM(の書き方いろいろ) [Notes / Chainer LSTM: various ways to write it]
http://pepper.is.sci.toho-u.ac.jp/pepper/index.php?%A5%CE%A1%BC%A5%C8%2Fchainer%A4%CELSTM%A1%CA%A4%CE%BD%F1%A4%AD%CA%FD%A4%A4%A4%ED%A4%A4%A4%ED%A1%CB
Last updated: 2017-06-13
The section "Recurrent Nets and their Computational Graph" in the official tutorial on the Chainer site gives an example that uses an LSTM (Long Short-Term Memory). (The annotations in the code are by the author of this note.)
class RNN(Chain):
    def __init__(self):
        super(RNN, self).__init__()
        with self.init_scope():
            self.embed = L.EmbedID(1000, 100)  # word embedding
            self.mid = L.LSTM(100, 50)  # the first LSTM layer  <-- the mid layer is the LSTM
            self.out = L.Linear(50, 1000)  # the feed-forward output layer

    def reset_state(self):
        self.mid.reset_state()

    def __call__(self, cur_word):
        # Given the current word ID, predict the next word.
        x = self.embed(cur_word)  # <-- embed the input cur_word into x
        h = self.mid(x)           # <-- feed x into the LSTM layer; its output goes into h
        y = self.out(h)           # <-- feed h through the feed-forward layer into y
        return y                  # <-- y is the output of calling the RNN instance

rnn = RNN()                   # <-- create an instance of class RNN, called rnn
model = L.Classifier(rnn)     # <-- wrap rnn in a Classifier and call it model
optimizer = optimizers.SGD()  # <-- create an SGD instance as the optimizer
optimizer.setup(model)        # <-- attach the optimizer to model

# training starts here
loss = 0
count = 0
seqlen = len(x_list[1:])
rnn.reset_state()
for cur_word, next_word in zip(x_list, x_list[1:]):  # <-- training loop:
    # take one element each from x_list and from x_list shifted back by one
    loss += model(cur_word, next_word)  # <-- call model to compute the loss  <-- this looks questionable
    count += 1
    if count % 30 == 0 or count == seqlen:  # <-- update the model every 30 steps
        model.cleargrads()
        loss.backward()
        loss.unchain_backward()
        optimizer.update()
The __call__ of the model class RNN takes only a single word as its argument and returns the predicted word, so something does not quite match up: the training loop is supposed to use a function that takes two words and returns a loss.
From this definition alone it is not clear what the actual loss computation does.
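For reference, the two-argument call model(cur_word, next_word) works because of the L.Classifier wrapper: Classifier's __call__ takes (x, t), runs the predictor on x, and applies a loss function (softmax_cross_entropy by default) to the result and t. A simplified sketch of what the wrapper does (my illustration, not Chainer's actual implementation; it omits accuracy computation and reporting):

import chainer.functions as F
from chainer import Chain

class SimplifiedClassifier(Chain):
    """Rough stand-in for L.Classifier: loss only, no accuracy or reporter."""
    def __init__(self, predictor, lossfun=F.softmax_cross_entropy):
        super(SimplifiedClassifier, self).__init__()
        with self.init_scope():
            self.predictor = predictor
        self.lossfun = lossfun

    def __call__(self, x, t):
        y = self.predictor(x)           # here: rnn(cur_word), scores over the vocabulary
        self.loss = self.lossfun(y, t)  # compared against next_word
        return self.loss

So each iteration of the loop above accumulates softmax_cross_entropy(rnn(cur_word), next_word), i.e. the per-word loss; the tutorial snippet just never spells this out.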
At which point, following the tutorial's pointer "see the next example/ptb", let's look at the ptb example.
The official examples provide a ptb example that uses an RNN language model.
class RNNForLM(chainer.Chain):
    def __init__(self, n_vocab, n_units):
        super(RNNForLM, self).__init__()
        with self.init_scope():
            self.embed = L.EmbedID(n_vocab, n_units)
            self.l1 = L.LSTM(n_units, n_units)
            self.l2 = L.LSTM(n_units, n_units)
            self.l3 = L.Linear(n_units, n_vocab)
        for param in self.params():
            param.data[...] = np.random.uniform(-0.1, 0.1, param.data.shape)

    def reset_state(self):
        self.l1.reset_state()
        self.l2.reset_state()

    def __call__(self, x):
        h0 = self.embed(x)
        h1 = self.l1(F.dropout(h0))
        h2 = self.l2(F.dropout(h1))
        y = self.l3(F.dropout(h2))
        return y
From 自然言語処理とニューラルネット [Natural language processing and neural nets] (dated 2015-12-10). The loss computation has been merged into forward, and some comments have been modified.
It does not use Chainer's LSTM link; instead the state at t-1 is kept in h by hand. The model therefore consists only of EmbedID/Linear links.
Inside forward, the procedure is:
h → u → compute and accumulate the loss from (u, x) with softmax_cross_entropy → y → compute and accumulate the joint probability of the words → compute the new h from (new x, old h) and update it,
and everything is computed by hand.
model = FunctionSet(
    w_xh = EmbedID(VOCAB_SIZE, HIDDEN_SIZE),  # input layer (one-hot) -> hidden layer
    w_hh = Linear(HIDDEN_SIZE, HIDDEN_SIZE),  # hidden layer -> hidden layer
    w_hy = Linear(HIDDEN_SIZE, VOCAB_SIZE),   # hidden layer -> output layer
)
def forward(sentence, model):  # sentence is a list of str, e.g. MeCab output
    sentence = [convert_to_your_word_id(word) for word in sentence]  # convert words to IDs; implement as you like
    h = Variable(np.zeros((1, HIDDEN_SIZE), dtype=np.float32))  # initial hidden state
    log_joint_prob = float(0)  # joint probability of the sentence
    accum_loss = Variable(np.zeros((), dtype=np.float32))  # initial accumulated loss
    for word in sentence:
        x = Variable(np.array([[word]], dtype=np.int32))  # input for the next step
        u = model.w_hy(h)
        accum_loss += softmax_cross_entropy(u, x)  # accumulate the loss
        y = softmax(u)  # probability distribution of the next word
        log_joint_prob += math.log(y.data[0][word])  # update the joint probability
        h = tanh(model.w_xh(x) + model.w_hh(h))  # update the hidden state
    return log_joint_prob, accum_loss  # return the joint probability together with the accumulated loss
def train(sentence_set, model):
    opt = SGD()       # use stochastic gradient descent
    opt.setup(model)  # initialize the optimizer
    for sentence in sentence_set:
        opt.zero_grads()  # reset the gradients
        log_joint_prob, accum_loss = forward(sentence, model)  # compute the loss
        accum_loss.backward()  # backpropagation
        opt.clip_grads(10)     # clip overly large gradients
        opt.update()           # update the parameters
class LSTM(chainer.Chain):
    def __init__(self, in_size, n_units, train=True):
        super(LSTM, self).__init__(
            embed=L.EmbedID(in_size, n_units),
            l1=L.LSTM(n_units, n_units),  # <-- a single LSTM layer, n_units -> n_units
            l2=L.Linear(n_units, in_size),
        )

    def __call__(self, x):  # calling the model returns y
        h0 = self.embed(x)
        h1 = self.l1(h0)
        y = self.l2(h1)
        return y

    def reset_state(self):
        self.l1.reset_state()
lstm = LSTM(p, n_units)         # create the model instance
model = L.Classifier(lstm)      # wrap it in a Classifier
model.compute_accuracy = False  # accuracy is not used
for param in model.params():
    data = param.data
    data[:] = np.random.uniform(-0.2, 0.2, data.shape)  # initialize the model

# set up the optimizer
optimizer = optimizers.Adam()
optimizer.setup(model)

# training loop
display = 1000  # how often to print
total_loss = 0  # variable that accumulates the loss value
for seq in range(100000):
    sequence = train_data[randint(2)]  # randomly pick one of the two strings
    lstm.reset_state()  # reset so the previous sequence has no influence
    for i in six.moves.range(p):
        x = chainer.Variable(xp.asarray([sequence[i]]))    # i-th character as input
        t = chainer.Variable(xp.asarray([sequence[i+1]]))  # (i+1)-th character as the target
        loss = model(x, t)  # compute the loss
        # accumulate the loss when it is time to print
        if seq % display == 0:
            total_loss += loss.data
        # run the optimization
        model.zerograds()
        loss.backward()
        optimizer.update()
    # print the loss
    if seq % display == 0:
        print("sequence:{}, loss:{}".format(seq, total_loss))
        total_loss = 0
    # once every 10 display intervals, print the prediction for each sequence
    # and the probability distribution of the last character
    if seq % (display*10) == 0:
        for select in six.moves.range(2):
            sequence = train_data[select]
            lstm.reset_state()
            print("prediction: {},".format(sequence[0]), end="")
            for i in six.moves.range(p):
                x = chainer.Variable(xp.asarray([sequence[i]]))
                data = lstm(x).data
                print("{},".format(np.argmax(data)), end="")
            print()
            print("probability: {}".format(data))
LSTMにsin波を覚えてもらう(chainer trainerの速習) (2016-09-08) [Teaching an LSTM a sine wave (a quick tour of the Chainer trainer)]
class MLP(Chain):
    n_input = 1
    n_output = 1
    n_units = 5

    def __init__(self):
        super(MLP, self).__init__(
            l1 = L.Linear(self.n_input, self.n_units),
            l2 = L.LSTM(self.n_units, self.n_units),
            l3 = L.Linear(self.n_units, self.n_output),
        )

    def reset_state(self):
        self.l2.reset_state()

    def __call__(self, x):
        h1 = self.l1(x)
        h2 = self.l2(h1)
        return self.l3(h2)
Definition of the loss function
class LossFuncL(Chain):
    def __init__(self, predictor):
        super(LossFuncL, self).__init__(predictor=predictor)

    def __call__(self, x, t):
        x.data = x.data.reshape((-1, 1)).astype(np.float32)
        t.data = t.data.reshape((-1, 1)).astype(np.float32)
        y = self.predictor(x)
        loss = F.mean_squared_error(y, t)
        report({'loss': loss}, self)
        return loss
Creating the model instance
model = LossFuncL(MLP())
optimizer = optimizers.Adam()
optimizer.setup(model)
Trainer-related classes
class LSTM_test_Iterator(chainer.dataset.Iterator):
    def __init__(self, dataset, batch_size=10, seq_len=5, repeat=True):
        self.seq_length = seq_len
        self.dataset = dataset
        self.nsamples = len(dataset)
        self.batch_size = batch_size
        self.repeat = repeat
        self.epoch = 0
        self.iteration = 0
        self.offsets = np.random.randint(0, len(dataset), size=batch_size)
        self.is_new_epoch = False

    def __next__(self):
        if not self.repeat and self.iteration * self.batch_size >= self.nsamples:
            raise StopIteration
        x, t = self.get_data()
        self.iteration += 1
        epoch = self.iteration // self.batch_size
        self.is_new_epoch = self.epoch < epoch
        if self.is_new_epoch:
            self.epoch = epoch
            self.offsets = np.random.randint(0, self.nsamples, size=self.batch_size)
        return list(zip(x, t))

    @property
    def epoch_detail(self):
        return self.iteration * self.batch_size / len(self.dataset)

    def get_data(self):
        tmp0 = [self.dataset[(offset + self.iteration) % self.nsamples][0]
                for offset in self.offsets]
        tmp1 = [self.dataset[(offset + self.iteration + 1) % self.nsamples][0]
                for offset in self.offsets]
        return tmp0, tmp1

    def serialize(self, serializer):
        self.iteration = serializer('iteration', self.iteration)
        self.epoch = serializer('epoch', self.epoch)


class LSTM_updater(training.StandardUpdater):
    def __init__(self, train_iter, optimizer, device):
        super(LSTM_updater, self).__init__(train_iter, optimizer, device=device)
        self.seq_length = train_iter.seq_length

    def update_core(self):
        loss = 0
        train_iter = self.get_iterator('main')
        optimizer = self.get_optimizer('main')
        for i in range(self.seq_length):
            batch = np.array(train_iter.__next__()).astype(np.float32)
            x, t = batch[:, 0].reshape((-1, 1)), batch[:, 1].reshape((-1, 1))
            loss += optimizer.target(chainer.Variable(x), chainer.Variable(t))
        optimizer.target.zerograds()
        loss.backward()
        loss.unchain_backward()
        optimizer.update()
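The article then plugs these classes into a Trainer. A minimal wiring sketch (my own, not the article's code; it assumes train_data is a dataset whose elements are tuples whose first entry is a float32 sample of the signal, which is what get_data() above indexes, and the batch size, sequence length, epoch count, and output directory are placeholders):

from chainer import training
from chainer.training import extensions

train_iter = LSTM_test_Iterator(train_data, batch_size=10, seq_len=10)
updater = LSTM_updater(train_iter, optimizer, device=-1)        # -1 = CPU
trainer = training.Trainer(updater, (1000, 'epoch'), out='result')
trainer.extend(extensions.LogReport(trigger=(10, 'epoch')))     # collect the reported 'main/loss'
trainer.extend(extensions.PrintReport(['epoch', 'main/loss']))  # print it
trainer.run()

The 'main/loss' entry comes from the report() call inside LossFuncL, which the Trainer records under the 'main' optimizer name.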
LSTMによる正弦波の予測 〜 Chainerによる実装 〜 (2016-07-24) [Predicting a sine wave with an LSTM: an implementation in Chainer]
Model class
class LSTM(chainer.Chain):
    def __init__(self, in_units=1, hidden_units=2, out_units=1, train=True):
        super(LSTM, self).__init__(
            l1=L.Linear(in_units, hidden_units),
            l2=L.LSTM(hidden_units, hidden_units),
            l3=L.Linear(hidden_units, out_units),
        )
        self.train = train

    def __call__(self, x, t):
        h = self.l1(x)
        h = self.l2(h)
        y = self.l3(h)
        self.loss = F.mean_squared_error(y, t)
        if self.train:
            return self.loss
        else:
            self.prediction = y
            return self.prediction

    def reset_state(self):
        self.l2.reset_state()
Loss computation function
def compute_loss(model, sequences):
    loss = 0
    rows, cols = sequences.shape
    length_of_sequence = cols
    for i in range(cols - 1):
        x = chainer.Variable(
            xp.asarray(
                [sequences[j, i + 0] for j in range(rows)],
                dtype=np.float32
            )[:, np.newaxis]
        )
        t = chainer.Variable(
            xp.asarray(
                [sequences[j, i + 1] for j in range(rows)],
                dtype=np.float32
            )[:, np.newaxis]
        )
        loss += model(x, t)
    return loss
Generating the training data
random.seed(0)

class DataMaker(object):
    def __init__(self, steps_per_cycle, number_of_cycles):
        self.steps_per_cycle = steps_per_cycle
        self.number_of_cycles = number_of_cycles

    def make(self):
        return np.array([math.sin(i * 2 * math.pi / self.steps_per_cycle)
                         for i in range(self.steps_per_cycle)] * self.number_of_cycles)

    def make_mini_batch(self, data, mini_batch_size, length_of_sequence):
        sequences = np.ndarray((mini_batch_size, length_of_sequence), dtype=np.float32)
        for i in range(mini_batch_size):
            index = random.randint(0, len(data) - length_of_sequence)
            sequences[i] = data[index:index+length_of_sequence]
        return sequences
Training process
if __name__ == "__main__":
    # make training data
    data_maker = DataMaker(steps_per_cycle=STEPS_PER_CYCLE,
                           number_of_cycles=NUMBER_OF_CYCLES)
    train_data = data_maker.make()

    # setup model
    model = LSTM(IN_UNITS, HIDDEN_UNITS, OUT_UNITS)
    for param in model.params():
        data = param.data
        data[:] = np.random.uniform(-0.1, 0.1, data.shape)

    # setup optimizer
    optimizer = optimizers.Adam()
    optimizer.setup(model)

    start = time.time()
    cur_start = start
    for epoch in range(TRAINING_EPOCHS):
        sequences = data_maker.make_mini_batch(train_data,
                                               mini_batch_size=MINI_BATCH_SIZE,
                                               length_of_sequence=LENGTH_OF_SEQUENCE)
        model.reset_state()
        model.zerograds()
        loss = compute_loss(model, sequences)
        loss.backward()
        optimizer.update()

    # save model
    cPickle.dump(model, open("./model.pkl", "wb"))
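For reference, a minimal sketch (my own, not part of the original article) of how the pickled model and the train flag defined in the LSTM class above could be used to generate a continuation of the wave. The warm-up length and the number of generated steps are arbitrary, and a dummy t is passed because __call__ always computes the MSE loss; it assumes the script above has already run on the CPU (np arrays):

model = cPickle.load(open("./model.pkl", "rb"))
model.train = False  # __call__ now returns the prediction y instead of the loss
model.reset_state()

warmup = data_maker.make()[:50]  # prime the hidden state with true values
for value in warmup:
    x = chainer.Variable(np.array([[value]], dtype=np.float32))
    y = model(x, x)              # t is a dummy here; only y is used

predicted = []
for _ in range(200):             # generate 200 steps autoregressively
    predicted.append(float(y.data))
    y = model(y, y)              # the previous output becomes the next input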
ニコニコ動画のコメント次文字予測をChainer LSTMで実装した (2016-10-12) [Next-character prediction for Nico Nico Douga comments with a Chainer LSTM]
ChainerのNStepLSTMでニコニコ動画のコメント予測 (2016-10-24) [Comment prediction for Nico Nico Douga with Chainer's NStepLSTM]
The preprocessing steps (obtaining the Nico Nico Douga data, normalizing orthographic variation, and so on) are omitted here; see the original article.
Model definition etc.
class RNNForLM(chainer.Chain):
    def __init__(self, n_vocab, n_units, train=True):
        super(RNNForLM, self).__init__(
            embed=L.EmbedID(n_vocab, n_units),
            l1=L.LSTM(n_units, n_units),
            l2=L.LSTM(n_units, n_units),
            l3=L.Linear(n_units, n_vocab),
        )
        for param in self.params():
            param.data[...] = np.random.uniform(-0.1, 0.1, param.data.shape)
        self.train = train

    def reset_state(self):
        self.l1.reset_state()
        self.l2.reset_state()

    def __call__(self, x):
        h0 = self.embed(x)
        h1 = self.l1(F.dropout(h0, train=self.train))
        h2 = self.l2(F.dropout(h1, train=self.train))
        y = self.l3(F.dropout(h2, train=self.train))
        return y
Trainer-related code
class ParallelSequentialIterator(chainer.dataset.Iterator):
    def __init__(self, dataset, batch_size, repeat=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.epoch = 0
        self.is_new_epoch = False
        self.repeat = repeat
        length = len(dataset)
        self.offsets = [i * length // batch_size for i in range(batch_size)]
        self.iteration = 0

    def __next__(self):
        length = len(self.dataset)
        if not self.repeat and self.iteration * self.batch_size >= length:
            raise StopIteration
        cur_words = self.get_words()
        self.iteration += 1
        next_words = self.get_words()
        epoch = self.iteration * self.batch_size // length
        self.is_new_epoch = self.epoch < epoch
        if self.is_new_epoch:
            self.epoch = epoch
        return list(zip(cur_words, next_words))

    @property
    def epoch_detail(self):
        return self.iteration * self.batch_size / len(self.dataset)

    def get_words(self):
        return [self.dataset[(offset + self.iteration) % len(self.dataset)]
                for offset in self.offsets]

    def serialize(self, serializer):
        self.iteration = serializer('iteration', self.iteration)
        self.epoch = serializer('epoch', self.epoch)


class BPTTUpdater(training.StandardUpdater):
    def __init__(self, train_iter, optimizer, bprop_len, device):
        super(BPTTUpdater, self).__init__(
            train_iter, optimizer, device=device)
        self.bprop_len = bprop_len

    def update_core(self):
        loss = 0
        train_iter = self.get_iterator('main')
        optimizer = self.get_optimizer('main')
        # Progress the dataset iterator for bprop_len words at each iteration.
        for i in range(self.bprop_len):
            batch = train_iter.__next__()
            x = np.asarray([example[0] for example in batch], dtype=np.int32)
            t = np.asarray([example[1] for example in batch], dtype=np.int32)
            if self.device >= 0:
                x = chainer.cuda.to_gpu(x, device=self.device)
                t = chainer.cuda.to_gpu(t, device=self.device)
            loss += optimizer.target(chainer.Variable(x), chainer.Variable(t))
        optimizer.target.cleargrads()  # Clear the parameter gradients
        loss.backward()                # Backprop
        loss.unchain_backward()        # Truncate the graph
        optimizer.update()             # Update the parameters
# Routine to rewrite the result dictionary of LogReport to add perplexity values
def compute_perplexity(result):
    result['perplexity'] = np.exp(result['main/loss'])
    if 'validation/main/loss' in result:
        result['val_perplexity'] = np.exp(result['validation/main/loss'])
Main part
def main():
    # (generation of the training data is omitted)
    train_iter = ParallelSequentialIterator(train, args.batchsize)

    # Prepare an RNNLM model
    rnn = RNNForLM(n_vocab, args.unit)
    model = L.Classifier(rnn)
    model.compute_accuracy = True  # report the accuracy in addition to the perplexity

    # Set up an optimizer
    optimizer = chainer.optimizers.SGD(lr=1.0)
    optimizer.setup(model)
    optimizer.add_hook(chainer.optimizer.GradientClipping(args.gradclip))

    # Set up a trainer
    updater = BPTTUpdater(train_iter, optimizer, args.bproplen, args.gpu)
    trainer = training.Trainer(updater, (args.epoch, 'epoch'), out=args.out)

    interval = 10 if args.test else 500
    trainer.extend(extensions.LogReport(postprocess=compute_perplexity,
                                        trigger=(interval, 'iteration')))
    trainer.extend(extensions.PrintReport(
        ['epoch', 'iteration', 'perplexity', 'main/accuracy']),
        trigger=(interval, 'iteration'))
    trainer.extend(extensions.ProgressBar(
        update_interval=1 if args.test else 10))
    trainer.extend(extensions.snapshot())
    trainer.extend(extensions.snapshot_object(
        model, 'model_iter_{.updater.iteration}'))
    if args.resume:
        chainer.serializers.load_npz(args.resume, model)

    @training.make_extension(trigger=(500, 'iteration'))
    def save_model(trainer):
        chainer.serializers.save_npz('{}/{}'.format(
            args.out, 'lstm_model.npz'), model)

    trainer.extend(save_model)

    trainer.run()


if __name__ == '__main__':
    main()
ChainerでLSTM言語モデルとミニバッチ学習の実装 (2016-04-10) [Implementing an LSTM language model and mini-batch training in Chainer]
It builds an LSTM class by hand, using L.Linear.
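The article's own code is not reproduced in this note. As a rough illustration of the idea only (my sketch, not the article's implementation; the class name MyLSTM and the gate layout are made up), an LSTM step assembled from L.Linear links and explicit sigmoid/tanh gates could look like this:

import numpy as np
import chainer
import chainer.functions as F
import chainer.links as L

class MyLSTM(chainer.Chain):
    """Hand-rolled LSTM cell built from L.Linear (illustrative sketch only)."""
    def __init__(self, n_in, n_units):
        super(MyLSTM, self).__init__(
            # one linear projection each for the input and the previous hidden
            # state, producing all four gate pre-activations at once
            x2gates=L.Linear(n_in, 4 * n_units),
            h2gates=L.Linear(n_units, 4 * n_units, nobias=True),
        )
        self.n_units = n_units
        self.reset_state()

    def reset_state(self):
        self.c = None
        self.h = None

    def __call__(self, x):
        batch = x.data.shape[0]
        if self.h is None:
            self.c = chainer.Variable(np.zeros((batch, self.n_units), dtype=np.float32))
            self.h = chainer.Variable(np.zeros((batch, self.n_units), dtype=np.float32))
        gates = self.x2gates(x) + self.h2gates(self.h)
        a, i, f, o = F.split_axis(gates, 4, axis=1)
        a = F.tanh(a)     # candidate cell update
        i = F.sigmoid(i)  # input gate
        f = F.sigmoid(f)  # forget gate
        o = F.sigmoid(o)  # output gate
        self.c = f * self.c + i * a
        self.h = o * F.tanh(self.c)
        return self.h

Chainer's own F.lstm / L.LSTM do essentially this internally, with the four gate blocks packed into a single linear projection.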