u++'s Memorandum

NLP 100 Exercise 2020: "85. Bidirectional RNN and Multi-layer RNN"

Problem statement

nlp100.github.io

Problem overview

I made the RNN bidirectional. Specifically, I set bidirectional=True and changed the input size of the following linear layer to hidden_size * 2. For the implementation, I partially reused sample code from 『現場で使える!PyTorch開発入門 深層学習モデルの作成とアプリケーションへの実装』 (Shoeisha).
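Before the full script, a quick shape check (a minimal sketch, not part of the original solution): with bidirectional=True the LSTM concatenates the forward and backward hidden states, so the last dimension of the output, and hence the input of the following linear layer, becomes hidden_size * 2.

import torch
from torch import nn

lstm = nn.LSTM(300, 300, num_layers=2, batch_first=True, bidirectional=True)
x = torch.randn(8, 20, 300)  # (batch_size, step_size, embedding_dim)
out, (h, c) = lstm(x)
print(out.shape)  # torch.Size([8, 20, 600]): hidden_size * 2
print(h.shape)    # torch.Size([4, 8, 300]): num_layers * num_directions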

# ref: https://www.shoeisha.co.jp/book/detail/9784798157184
import re
from collections import defaultdict

import pandas as pd
import torch
from gensim.models import KeyedVectors
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def cleanText(text):
    remove_marks_regex = re.compile(r"[,\.\(\)\[\]\*:;]|<.*?>")
    shift_marks_regex = re.compile(r"([?!])")
    # Remove symbols other than ! and ?
    text = remove_marks_regex.sub("", text)
    # Put spaces around ! and ? so they become separate tokens
    text = shift_marks_regex.sub(r" \1 ", text)
    return text


def list2tensor(token_idxes, max_len=20, padding=True):
    # Truncate to max_len, zero-pad to a fixed length, and also return the original length
    if len(token_idxes) > max_len:
        token_idxes = token_idxes[:max_len]
    n_tokens = len(token_idxes)
    if padding:
        token_idxes = token_idxes + [0] * (max_len - len(token_idxes))
    return torch.tensor(token_idxes, dtype=torch.int64), n_tokens


class RNN(nn.Module):
    def __init__(self, num_embeddings,
                 embedding_dim=300,
                 hidden_size=300,
                 output_size=1,
                 num_layers=1,
                 dropout=0.2):
        super().__init__()
        # Replace the randomly initialized embedding
        # (nn.Embedding(num_embeddings, embedding_dim, padding_idx=0))
        # with pretrained Google News vectors
        model = KeyedVectors.load_word2vec_format('ch07/GoogleNews-vectors-negative300.bin', binary=True)
        weights = torch.FloatTensor(model.vectors)
        self.emb = nn.Embedding.from_pretrained(weights)
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_size, num_layers,
                            batch_first=True, dropout=dropout, bidirectional=True)
        self.linear = nn.Linear(hidden_size * 2, output_size)

    def forward(self, x, h0=None, n_tokens=None):
        # Convert IDs into dense vectors with the embedding layer
        # x: (batch_size, step_size) -> (batch_size, step_size, embedding_dim)
        x = self.emb(x)
        # Feed x into the LSTM together with the initial state h0
        # x: (batch_size, step_size, embedding_dim)
        # -> (batch_size, step_size, hidden_size * 2)  # * 2 because it is bidirectional
        x, h = self.lstm(x, h0)
        # Extract only the last valid step:
        # x: (batch_size, step_size, hidden_size * 2) -> (batch_size, hidden_size * 2)
        if n_tokens is not None:
            # If the original (pre-padding) lengths are given, use them
            x = x[list(range(len(x))), n_tokens - 1, :]
        else:
            # Otherwise simply take the last step
            x = x[:, -1, :]
        # Feed the extracted step into the linear layer
        # (batch_size, hidden_size * 2) -> (batch_size, output_size)
        x = self.linear(x)
        return x


class TITLEDataset(Dataset):
    def __init__(self, section='train'):
        X_train = pd.read_table(f'ch06/{section}.txt', header=None)
        use_cols = ['TITLE', 'CATEGORY']
        X_train.columns = use_cols

        d = defaultdict(int)
        for text in X_train['TITLE']:
            text = cleanText(text)
            for word in text.split():
                d[word] += 1
        dc = sorted(d.items(), key=lambda x: x[1], reverse=True)

        words = []
        idx = []
        for i, a in enumerate(dc, 1):
            words.append(a[0])
            if a[1] < 2:
                idx.append(0)
            else:
                idx.append(i)

        self.word2token = dict(zip(words, idx))
        self.data = (X_train['TITLE'].apply(lambda x: list2tensor(
            [self.word2token[word] if word in self.word2token.keys() else 0 for word in cleanText(x).split()])))

        y_train = pd.read_table(f'ch06/{section}.txt', header=None)[1].values
        self.labels = y_train

    @property
    def vocab_size(self):
        return len(self.word2token)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        data, n_tokens = self.data[idx]
        label = self.labels[idx]
        return data, label, n_tokens


def eval_net(net, data_loader, device='cpu'):
    net.eval()
    ys = []
    ypreds = []
    for x, y, nt in data_loader:
        x = x.to(device)
        y = y.to(device)
        nt = nt.to(device)
        with torch.no_grad():
            y_pred = net(x, n_tokens=nt)
            # print(f'test loss: {loss_fn(y_pred, y.long()).item()}')
            _, y_pred = torch.max(y_pred, 1)
            ys.append(y)
            ypreds.append(y_pred)
    ys = torch.cat(ys)
    ypreds = torch.cat(ypreds)
    print(f'test acc: {(ys == ypreds).sum().item() / len(ys)}')
    return


if __name__ == "__main__":
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    batch_size = 640
    train_data = TITLEDataset(section='train')
    train_loader = DataLoader(train_data, batch_size=batch_size,
                              shuffle=True, num_workers=4)
    test_data = TITLEDataset(section='test')
    test_loader = DataLoader(test_data, batch_size=batch_size,
                             shuffle=False, num_workers=4)

    net = RNN(train_data.vocab_size + 1, num_layers=2, output_size=4)
    net = net.to(device)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.01)

    for epoch in tqdm(range(10)):
        losses = []
        net.train()
        for x, y, nt in train_loader:
            x = x.to(device)
            y = y.to(device)
            nt = nt.to(device)
            y_pred = net(x, n_tokens=nt)
            loss = loss_fn(y_pred, y.long())
            net.zero_grad()
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
            _, y_pred_train = torch.max(y_pred, 1)
            # print(f'train loss: {loss.item()}')
            # print(f'train acc: {(y_pred_train == y).sum().item() / len(y)}')
        eval_net(net, test_loader, device)

NLP 100 Exercise 2020: "84. Introducing Word Vectors"

Problem statement

nlp100.github.io

Problem overview

The word embedding is initialized with pretrained word vectors from the Google News dataset before training. For the implementation, I partially reused sample code from 『現場で使える!PyTorch開発入門 深層学習モデルの作成とアプリケーションへの実装』 (Shoeisha).
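The core change is just the embedding layer. A minimal sketch of the idea (same path as in the script; note that nn.Embedding.from_pretrained freezes the weights by default):

import torch
from torch import nn
from gensim.models import KeyedVectors

model = KeyedVectors.load_word2vec_format('ch07/GoogleNews-vectors-negative300.bin', binary=True)
weights = torch.FloatTensor(model.vectors)   # (vocab_size, 300)
emb = nn.Embedding.from_pretrained(weights)  # pass freeze=False to fine-tune
print(emb.weight.requires_grad)              # False: the vectors stay fixed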

# ref: https://www.shoeisha.co.jp/book/detail/9784798157184
import re
from collections import defaultdict

import pandas as pd
import torch
from gensim.models import KeyedVectors
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def cleanText(text):
    remove_marks_regex = re.compile(r"[,\.\(\)\[\]\*:;]|<.*?>")
    shift_marks_regex = re.compile(r"([?!])")
    # Remove symbols other than ! and ?
    text = remove_marks_regex.sub("", text)
    # Put spaces around ! and ? so they become separate tokens
    text = shift_marks_regex.sub(r" \1 ", text)
    return text


def list2tensor(token_idxes, max_len=20, padding=True):
    # Truncate to max_len, zero-pad to a fixed length, and also return the original length
    if len(token_idxes) > max_len:
        token_idxes = token_idxes[:max_len]
    n_tokens = len(token_idxes)
    if padding:
        token_idxes = token_idxes + [0] * (max_len - len(token_idxes))
    return torch.tensor(token_idxes, dtype=torch.int64), n_tokens


class RNN(nn.Module):
    def __init__(self, num_embeddings,
                 embedding_dim=300,
                 hidden_size=300,
                 output_size=1,
                 num_layers=1,
                 dropout=0.2):
        super().__init__()
        # Replace the randomly initialized embedding
        # (nn.Embedding(num_embeddings, embedding_dim, padding_idx=0))
        # with pretrained Google News vectors
        model = KeyedVectors.load_word2vec_format('ch07/GoogleNews-vectors-negative300.bin', binary=True)
        weights = torch.FloatTensor(model.vectors)
        self.emb = nn.Embedding.from_pretrained(weights)
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x, h0=None, n_tokens=None):
        # Convert IDs into dense vectors with the embedding layer
        # x: (batch_size, step_size) -> (batch_size, step_size, embedding_dim)
        x = self.emb(x)
        # Feed x into the LSTM together with the initial state h0
        # x: (batch_size, step_size, embedding_dim)
        # -> (batch_size, step_size, hidden_size)
        x, h = self.lstm(x, h0)
        # Extract only the last valid step:
        # x: (batch_size, step_size, hidden_size) -> (batch_size, hidden_size)
        if n_tokens is not None:
            # If the original (pre-padding) lengths are given, use them
            x = x[list(range(len(x))), n_tokens - 1, :]
        else:
            # Otherwise simply take the last step
            x = x[:, -1, :]
        # Feed the extracted step into the linear layer
        # (batch_size, hidden_size) -> (batch_size, output_size)
        x = self.linear(x)
        return x


class TITLEDataset(Dataset):
    def __init__(self, section='train'):
        X_train = pd.read_table(f'ch06/{section}.txt', header=None)
        use_cols = ['TITLE', 'CATEGORY']
        X_train.columns = use_cols

        d = defaultdict(int)
        for text in X_train['TITLE']:
            text = cleanText(text)
            for word in text.split():
                d[word] += 1
        dc = sorted(d.items(), key=lambda x: x[1], reverse=True)

        words = []
        idx = []
        for i, a in enumerate(dc, 1):
            words.append(a[0])
            if a[1] < 2:
                idx.append(0)
            else:
                idx.append(i)

        self.word2token = dict(zip(words, idx))
        self.data = (X_train['TITLE'].apply(lambda x: list2tensor(
            [self.word2token[word] if word in self.word2token.keys() else 0 for word in cleanText(x).split()])))

        y_train = pd.read_table(f'ch06/{section}.txt', header=None)[1].values
        self.labels = y_train

    @property
    def vocab_size(self):
        return len(self.word2token)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        data, n_tokens = self.data[idx]
        label = self.labels[idx]
        return data, label, n_tokens


def eval_net(net, data_loader, device='cpu'):
    net.eval()
    ys = []
    ypreds = []
    for x, y, nt in data_loader:
        x = x.to(device)
        y = y.to(device)
        nt = nt.to(device)
        with torch.no_grad():
            y_pred = net(x, n_tokens=nt)
            # print(f'test loss: {loss_fn(y_pred, y.long()).item()}')
            _, y_pred = torch.max(y_pred, 1)
            ys.append(y)
            ypreds.append(y_pred)
    ys = torch.cat(ys)
    ypreds = torch.cat(ypreds)
    print(f'test acc: {(ys == ypreds).sum().item() / len(ys)}')
    return


if __name__ == "__main__":
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    batch_size = 640
    train_data = TITLEDataset(section='train')
    train_loader = DataLoader(train_data, batch_size=batch_size,
                              shuffle=True, num_workers=4)
    test_data = TITLEDataset(section='test')
    test_loader = DataLoader(test_data, batch_size=batch_size,
                             shuffle=False, num_workers=4)

    net = RNN(train_data.vocab_size + 1, num_layers=2, output_size=4)
    net = net.to(device)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.01)

    for epoch in tqdm(range(10)):
        losses = []
        net.train()
        for x, y, nt in train_loader:
            x = x.to(device)
            y = y.to(device)
            nt = nt.to(device)
            y_pred = net(x, n_tokens=nt)
            loss = loss_fn(y_pred, y.long())
            net.zero_grad()
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
            _, y_pred_train = torch.max(y_pred, 1)
            # print(f'train loss: {loss.item()}')
            # print(f'train acc: {(y_pred_train == y).sum().item() / len(y)}')
        eval_net(net, test_loader, device)

NLP 100 Exercise 2020: "83. Mini-batching and Training on GPU"

Problem statement

nlp100.github.io

Problem overview

This adds mini-batch processing. For the implementation, I partially reused sample code from 『現場で使える!PyTorch開発入門 深層学習モデルの作成とアプリケーションへの実装』 (Shoeisha).
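The pattern added here, in isolation (a minimal sketch with dummy data): a DataLoader yields mini-batches, and each batch is moved to the device right before the forward pass.

import torch
from torch.utils.data import DataLoader, TensorDataset

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
ds = TensorDataset(torch.randint(0, 100, (1000, 20)), torch.randint(0, 4, (1000,)))
loader = DataLoader(ds, batch_size=640, shuffle=True, num_workers=4)
for x, y in loader:
    x = x.to(device)  # move one mini-batch at a time to the GPU
    y = y.to(device)
    # forward / backward pass goes here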

# ref: https://www.shoeisha.co.jp/book/detail/9784798157184
import re
from collections import defaultdict

import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def cleanText(text):
    remove_marks_regex = re.compile(r"[,\.\(\)\[\]\*:;]|<.*?>")
    shift_marks_regex = re.compile(r"([?!])")
    # Remove symbols other than ! and ?
    text = remove_marks_regex.sub("", text)
    # Put spaces around ! and ? so they become separate tokens
    text = shift_marks_regex.sub(r" \1 ", text)
    return text


def list2tensor(token_idxes, max_len=20, padding=True):
    # Truncate to max_len, zero-pad to a fixed length, and also return the original length
    if len(token_idxes) > max_len:
        token_idxes = token_idxes[:max_len]
    n_tokens = len(token_idxes)
    if padding:
        token_idxes = token_idxes + [0] * (max_len - len(token_idxes))
    return torch.tensor(token_idxes, dtype=torch.int64), n_tokens


class RNN(nn.Module):
    def __init__(self, num_embeddings,
                 embedding_dim=50,
                 hidden_size=50,
                 output_size=1,
                 num_layers=1,
                 dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim,
                                padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x, h0=None, n_tokens=None):
        # Convert IDs into dense vectors with the embedding layer
        # x: (batch_size, step_size) -> (batch_size, step_size, embedding_dim)
        x = self.emb(x)
        # Feed x into the LSTM together with the initial state h0
        # x: (batch_size, step_size, embedding_dim)
        # -> (batch_size, step_size, hidden_size)
        x, h = self.lstm(x, h0)
        # Extract only the last valid step:
        # x: (batch_size, step_size, hidden_size) -> (batch_size, hidden_size)
        if n_tokens is not None:
            # If the original (pre-padding) lengths are given, use them
            x = x[list(range(len(x))), n_tokens - 1, :]
        else:
            # Otherwise simply take the last step
            x = x[:, -1, :]
        # Feed the extracted step into the linear layer
        # (batch_size, hidden_size) -> (batch_size, output_size)
        x = self.linear(x)
        return x


class TITLEDataset(Dataset):
    def __init__(self, section='train'):
        X_train = pd.read_table(f'ch06/{section}.txt', header=None)
        use_cols = ['TITLE', 'CATEGORY']
        X_train.columns = use_cols

        d = defaultdict(int)
        for text in X_train['TITLE']:
            text = cleanText(text)
            for word in text.split():
                d[word] += 1
        dc = sorted(d.items(), key=lambda x: x[1], reverse=True)

        words = []
        idx = []
        for i, a in enumerate(dc, 1):
            words.append(a[0])
            if a[1] < 2:
                idx.append(0)
            else:
                idx.append(i)

        self.word2token = dict(zip(words, idx))
        self.data = (X_train['TITLE'].apply(lambda x: list2tensor(
            [self.word2token[word] if word in self.word2token.keys() else 0 for word in cleanText(x).split()])))

        y_train = pd.read_table(f'ch06/{section}.txt', header=None)[1].values
        self.labels = y_train

    @property
    def vocab_size(self):
        return len(self.word2token)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        data, n_tokens = self.data[idx]
        label = self.labels[idx]
        return data, label, n_tokens


def eval_net(net, data_loader, device='cpu'):
    net.eval()
    ys = []
    ypreds = []
    for x, y, nt in data_loader:
        x = x.to(device)
        y = y.to(device)
        nt = nt.to(device)
        with torch.no_grad():
            y_pred = net(x, n_tokens=nt)
            # print(f'test loss: {loss_fn(y_pred, y.long()).item()}')
            _, y_pred = torch.max(y_pred, 1)
            ys.append(y)
            ypreds.append(y_pred)
    ys = torch.cat(ys)
    ypreds = torch.cat(ypreds)
    print(f'test acc: {(ys == ypreds).sum().item() / len(ys)}')
    return


if __name__ == "__main__":
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    batch_size = 640
    train_data = TITLEDataset(section='train')
    train_loader = DataLoader(train_data, batch_size=batch_size,
                              shuffle=True, num_workers=4)
    test_data = TITLEDataset(section='test')
    test_loader = DataLoader(test_data, batch_size=batch_size,
                             shuffle=False, num_workers=4)

    net = RNN(train_data.vocab_size + 1, num_layers=2, output_size=4)
    net = net.to(device)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.01)

    for epoch in tqdm(range(10)):
        losses = []
        net.train()
        for x, y, nt in train_loader:
            x = x.to(device)
            y = y.to(device)
            nt = nt.to(device)
            y_pred = net(x, n_tokens=nt)
            loss = loss_fn(y_pred, y.long())
            net.zero_grad()
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
            _, y_pred_train = torch.max(y_pred, 1)
            # print(f'train loss: {loss.item()}')
            # print(f'train acc: {(y_pred_train == y).sum().item() / len(y)}')
        eval_net(net, test_loader, device)

NLP 100 Exercise 2020: "82. Training with Stochastic Gradient Descent"

Problem statement

nlp100.github.io

Problem overview

This adds training with stochastic gradient descent (SGD). For the implementation, I partially reused sample code from 『現場で使える!PyTorch開発入門 深層学習モデルの作成とアプリケーションへの実装』 (Shoeisha).
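The training step added here follows the standard PyTorch recipe (a minimal sketch with a dummy model standing in for the RNN): zero the gradients, backpropagate the loss, then update the parameters.

import torch
from torch import nn, optim

net = nn.Linear(10, 4)  # stand-in for the RNN
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.01)

x, y = torch.randn(5, 10), torch.randint(0, 4, (5,))
loss = loss_fn(net(x), y)
net.zero_grad()    # clear gradients from the previous step
loss.backward()    # compute gradients
optimizer.step()   # SGD parameter update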

# ref: https://www.shoeisha.co.jp/book/detail/9784798157184
import re
from collections import defaultdict

import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def cleanText(text):
    remove_marks_regex = re.compile(r"[,\.\(\)\[\]\*:;]|<.*?>")
    shift_marks_regex = re.compile(r"([?!])")
    # Remove symbols other than ! and ?
    text = remove_marks_regex.sub("", text)
    # Put spaces around ! and ? so they become separate tokens
    text = shift_marks_regex.sub(r" \1 ", text)
    return text


def list2tensor(token_idxes, max_len=20, padding=True):
    # Truncate to max_len, zero-pad to a fixed length, and also return the original length
    if len(token_idxes) > max_len:
        token_idxes = token_idxes[:max_len]
    n_tokens = len(token_idxes)
    if padding:
        token_idxes = token_idxes + [0] * (max_len - len(token_idxes))
    return torch.tensor(token_idxes, dtype=torch.int64), n_tokens


class RNN(nn.Module):
    def __init__(self, num_embeddings,
                 embedding_dim=50,
                 hidden_size=50,
                 output_size=1,
                 num_layers=1,
                 dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim,
                                padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x, h0=None, n_tokens=None):
        # Convert IDs into dense vectors with the embedding layer
        # x: (batch_size, step_size) -> (batch_size, step_size, embedding_dim)
        x = self.emb(x)
        # Feed x into the LSTM together with the initial state h0
        # x: (batch_size, step_size, embedding_dim)
        # -> (batch_size, step_size, hidden_size)
        x, h = self.lstm(x, h0)
        # Extract only the last valid step:
        # x: (batch_size, step_size, hidden_size) -> (batch_size, hidden_size)
        if n_tokens is not None:
            # If the original (pre-padding) lengths are given, use them
            x = x[list(range(len(x))), n_tokens - 1, :]
        else:
            # Otherwise simply take the last step
            x = x[:, -1, :]
        # Feed the extracted step into the linear layer
        # (batch_size, hidden_size) -> (batch_size, output_size)
        x = self.linear(x)
        return x


class TITLEDataset(Dataset):
    def __init__(self, section='train'):
        X_train = pd.read_table(f'ch06/{section}.txt', header=None)
        use_cols = ['TITLE', 'CATEGORY']
        X_train.columns = use_cols

        d = defaultdict(int)
        for text in X_train['TITLE']:
            text = cleanText(text)
            for word in text.split():
                d[word] += 1
        dc = sorted(d.items(), key=lambda x: x[1], reverse=True)

        words = []
        idx = []
        for i, a in enumerate(dc, 1):
            words.append(a[0])
            if a[1] < 2:
                idx.append(0)
            else:
                idx.append(i)

        self.word2token = dict(zip(words, idx))
        self.data = (X_train['TITLE'].apply(lambda x: list2tensor(
            [self.word2token[word] if word in self.word2token.keys() else 0 for word in cleanText(x).split()])))

        y_train = pd.read_table(f'ch06/{section}.txt', header=None)[1].values
        self.labels = y_train

    @property
    def vocab_size(self):
        return len(self.word2token)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        data, n_tokens = self.data[idx]
        label = self.labels[idx]
        return data, label, n_tokens


def eval_net(net, data_loader, device='cpu'):
    net.eval()
    ys = []
    ypreds = []
    for x, y, nt in data_loader:
        with torch.no_grad():
            y_pred = net(x, n_tokens=nt)
            print(f'test loss: {loss_fn(y_pred, y.long()).item()}')
            _, y_pred = torch.max(y_pred, 1)
            ys.append(y)
            ypreds.append(y_pred)
    ys = torch.cat(ys)
    ypreds = torch.cat(ypreds)
    print(f'test acc: {(ys == ypreds).sum().item() / len(ys)}')
    return


if __name__ == "__main__":
    train_data = TITLEDataset(section='train')
    train_loader = DataLoader(train_data, batch_size=len(train_data),
                              shuffle=True, num_workers=4)
    test_data = TITLEDataset(section='test')
    test_loader = DataLoader(test_data, batch_size=len(test_data),
                             shuffle=False, num_workers=4)

    net = RNN(train_data.vocab_size + 1, num_layers=2, output_size=4)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.01)

    for epoch in tqdm(range(10)):
        losses = []
        net.train()
        for x, y, nt in train_loader:
            y_pred = net(x, n_tokens=nt)
            loss = loss_fn(y_pred, y.long())
            net.zero_grad()
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
            _, y_pred_train = torch.max(y_pred, 1)
            print(f'train loss: {loss.item()}')
            print(f'train acc: {(y_pred_train == y).sum().item() / len(y)}')
        eval_net(net, test_loader)

NLP 100 Exercise 2020: "81. Prediction with an RNN"

Problem statement

nlp100.github.io

Problem overview

I implement the RNN. For the implementation, I partially reused sample code from 『現場で使える!PyTorch開発入門 深層学習モデルの作成とアプリケーションへの実装』 (Shoeisha).
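One non-obvious part of the forward pass below is extracting the hidden state at each sequence's last real token rather than at the padded tail. A minimal sketch of that indexing trick:

import torch

out = torch.randn(3, 20, 50)          # (batch_size, step_size, hidden_size)
n_tokens = torch.tensor([5, 20, 12])  # lengths before padding
last = out[list(range(len(out))), n_tokens - 1, :]
print(last.shape)  # torch.Size([3, 50]): one hidden state per example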

import re
from collections import defaultdict

import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset


def cleanText(text):
    remove_marks_regex = re.compile(r"[,\.\(\)\[\]\*:;]|<.*?>")
    shift_marks_regex = re.compile(r"([?!])")
    # Remove symbols other than ! and ?
    text = remove_marks_regex.sub("", text)
    # Put spaces around ! and ? so they become separate tokens
    text = shift_marks_regex.sub(r" \1 ", text)
    return text


def list2tensor(token_idxes, max_len=20, padding=True):
    # Truncate to max_len, zero-pad to a fixed length, and also return the original length
    if len(token_idxes) > max_len:
        token_idxes = token_idxes[:max_len]
    n_tokens = len(token_idxes)
    if padding:
        token_idxes = token_idxes + [0] * (max_len - len(token_idxes))
    return torch.tensor(token_idxes, dtype=torch.int64), n_tokens


class RNN(nn.Module):
    def __init__(self, num_embeddings,
                 embedding_dim=50,
                 hidden_size=50,
                 output_size=1,
                 num_layers=1,
                 dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim,
                                padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x, h0=None, n_tokens=None):
        # Convert IDs into dense vectors with the embedding layer
        # x: (batch_size, step_size) -> (batch_size, step_size, embedding_dim)
        x = self.emb(x)
        # Feed x into the LSTM together with the initial state h0
        # x: (batch_size, step_size, embedding_dim)
        # -> (batch_size, step_size, hidden_size)
        x, h = self.lstm(x, h0)
        # Extract only the last valid step:
        # x: (batch_size, step_size, hidden_size) -> (batch_size, hidden_size)
        if n_tokens is not None:
            # If the original (pre-padding) lengths are given, use them
            x = x[list(range(len(x))), n_tokens - 1, :]
        else:
            # Otherwise simply take the last step
            x = x[:, -1, :]
        # Feed the extracted step into the linear layer
        # (batch_size, hidden_size) -> (batch_size, output_size)
        x = self.linear(x)
        return x


class TITLEDataset(Dataset):
    def __init__(self):
        X_train = pd.read_table('ch06/train.txt', header=None)
        use_cols = ['TITLE', 'CATEGORY']
        X_train.columns = use_cols

        d = defaultdict(int)
        for text in X_train['TITLE']:
            text = cleanText(text)
            for word in text.split():
                d[word] += 1
        dc = sorted(d.items(), key=lambda x: x[1], reverse=True)

        words = []
        idx = []
        for i, a in enumerate(dc, 1):
            words.append(a[0])
            if a[1] < 2:
                idx.append(0)
            else:
                idx.append(i)

        self.word2token = dict(zip(words, idx))
        self.data = X_train['TITLE'].apply(lambda x: list2tensor(
            [self.word2token[word] for word in cleanText(x).split()]))

        self.labels = X_train['CATEGORY'].values

    @property
    def vocab_size(self):
        return len(self.word2token)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        data, n_tokens = self.data[idx]
        label = self.labels[idx]
        return data, label, n_tokens


if __name__ == "__main__":
    train_data = TITLEDataset()
    train_loader = DataLoader(train_data, batch_size=len(train_data),
                              shuffle=True, num_workers=4)

    net = RNN(train_data.vocab_size + 1, num_layers=2, output_size=4)

    for epoch in range(1):
        net.train()
        for x, y, nt in train_loader:
            y_pred = net(x, n_tokens=nt)
            print(y_pred)

NLP 100 Exercise 2020: "80. Converting Words to ID Numbers"

Problem statement

nlp100.github.io

Problem overview

A straightforward implementation, exactly as instructed.
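The scheme: count word frequencies, sort in descending order, give each word its frequency rank as an ID, and map words appearing fewer than twice to 0. A minimal sketch with toy titles (not from the original post); the script below builds the same mapping with two lists and zip:

from collections import defaultdict

d = defaultdict(int)
for sentence in ["Fed holds rates", "Fed raises rates"]:
    for word in sentence.split():
        d[word] += 1

dc = sorted(d.items(), key=lambda x: x[1], reverse=True)
word2token = {w: (i if c >= 2 else 0) for i, (w, c) in enumerate(dc, 1)}
print(word2token)  # {'Fed': 1, 'rates': 2, 'holds': 0, 'raises': 0}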

from collections import defaultdict

import pandas as pd


def text2id(text):
    return [word2token[word] for word in text.split()]


X_train = pd.read_table('ch06/train.txt', header=None)
use_cols = ['TITLE', 'CATEGORY']
X_train.columns = use_cols

d = defaultdict(int)
for sentence in X_train['TITLE']:
    for word in sentence.split():
        d[word] += 1
dc = sorted(d.items(), key=lambda x: x[1], reverse=True)

words = []
idx = []
# IDs in descending order of frequency; words appearing fewer than twice map to 0
for i, a in enumerate(dc, 1):
    words.append(a[0])
    if a[1] < 2:
        idx.append(0)
    else:
        idx.append(i)

word2token = dict(zip(words, idx))
print(X_train['TITLE'].apply(text2id))

NLP 100 Exercise 2020: "79. Multi-layer Neural Network"

Problem statement

nlp100.github.io

Problem overview

The network is changed to three layers.
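The topology in isolation (a minimal sketch; the 300-dim input is an assumption standing in for the actual feature size): three Linear layers with PReLU activations and batch normalization in between.

import torch
from torch import nn

net = nn.Sequential(
    nn.Linear(300, 100), nn.PReLU(), nn.BatchNorm1d(100),
    nn.Linear(100, 25), nn.PReLU(), nn.BatchNorm1d(25),
    nn.Linear(25, 4),
)
print(net(torch.randn(8, 300)).shape)  # torch.Size([8, 4])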

import joblib
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

X_train = joblib.load('ch08/X_train.joblib')
y_train = joblib.load('ch08/y_train.joblib')
X_train = torch.from_numpy(X_train.astype(np.float32)).clone()
y_train = torch.from_numpy(y_train.astype(np.int64)).clone()

X_valid = joblib.load('ch08/X_valid.joblib')
y_valid = joblib.load('ch08/y_valid.joblib')
# Move the validation set to the GPU as well; otherwise net(X_valid) below
# fails with a device mismatch
X_valid = torch.from_numpy(X_valid.astype(np.float32)).clone().to('cuda:0')
y_valid = torch.from_numpy(y_valid.astype(np.int64)).clone().to('cuda:0')

X_test = joblib.load('ch08/X_test.joblib')
y_test = joblib.load('ch08/y_test.joblib')
X_test = torch.from_numpy(X_test.astype(np.float32)).clone()
y_test = torch.from_numpy(y_test.astype(np.int64)).clone()

X = X_train
y = y_train
X = X.to('cuda:0')
y = y.to('cuda:0')
ds = TensorDataset(X, y)

net = nn.Sequential(
    nn.Linear(X.size()[1], 100),
    nn.PReLU(),
    nn.BatchNorm1d(100),
    nn.Linear(100, 25),
    nn.PReLU(),
    nn.BatchNorm1d(25),
    nn.Linear(25, 4)
)
net = net.to('cuda:0')
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.01)

batchSize = [64]

for bs in batchSize:
    loader = DataLoader(ds, batch_size=bs, shuffle=True)

    train_losses = []
    valid_losses = []
    train_accs = []
    valid_accs = []

    for epoch in tqdm(range(100)):
        train_running_loss = 0.0
        valid_running_loss = 0.0

        for xx, yy in loader:
            y_pred = net(xx)
            loss = loss_fn(y_pred, yy)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_running_loss += loss.item()
            # Track the validation loss without building a computation graph
            with torch.no_grad():
                valid_running_loss += loss_fn(net(X_valid), y_valid).item()

        joblib.dump(net.state_dict(), f'ch08/state_dict_{epoch}.joblib')

        train_losses.append(train_running_loss)
        valid_losses.append(valid_running_loss)

        with torch.no_grad():
            _, y_pred_train = torch.max(net(X), 1)
            train_accs.append((y_pred_train == y).sum().item() / len(y))
            _, y_pred_valid = torch.max(net(X_valid), 1)
            valid_accs.append((y_pred_valid == y_valid).sum().item() / len(y_valid))

plt.plot(train_losses, label='train loss')
plt.plot(valid_losses, label='valid loss')
plt.legend()
plt.show()

plt.plot(train_accs, label='train acc')
plt.plot(valid_accs, label='valid acc')
plt.legend()
plt.show()