
8. CNN text classification

by 곽정우 2024. 6. 21.

1. Sentence Embedding

import urllib.request
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torch.nn.functional as F
from copy import deepcopy
from tqdm.auto import tqdm
urllib.request.urlretrieve("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt", filename="ratings_train.txt")
urllib.request.urlretrieve("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_test.txt", filename="ratings_test.txt")
train_dataset = pd.read_table('ratings_train.txt')
train_dataset

# check the pos/neg label distribution
train_dataset['label'].value_counts()

sum(train_dataset['document'].isnull())

~train_dataset['document'].isnull()

train_dataset = train_dataset[~train_dataset['document'].isnull()]
train_dataset.shape

train_dataset

 

Tokenization

  • For a model to process natural language, the text must first be converted into numbers (token indices); a minimal sketch of this mapping follows below.
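As a minimal sketch of this idea (using a hypothetical toy vocabulary, not the NSMC vocabulary built below), each whitespace token is replaced by an integer id, and unseen words fall back to a special unknown id:

# Toy sketch: map whitespace tokens to integer ids (hypothetical vocabulary, for illustration only)
toy_vocab = {'[PAD]': 0, '[UNK]': 1, '진짜': 2, '재밌다': 3}
sentence = '진짜 재밌다 강추'
ids = [toy_vocab.get(tok, toy_vocab['[UNK]']) for tok in sentence.split()]
print(ids)  # [2, 3, 1] -> '강추' is not in the vocabulary, so it maps to the [UNK] id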
train_dataset['document'].iloc[0].split()

vocab = set()
for doc in train_dataset['document']:
  for token in doc.split():
    vocab.add(token)
len(vocab)

# Count word frequencies
'''
[ ('아', 1204), 
  ('더빙..', 112), 
  ('진짜', 5929), 
  ('짜증나네요', 10), 
  ('목소리', 99),
  ...
]
'''
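The loop below builds this frequency dictionary by hand; an equivalent, more concise sketch using collections.Counter (not what the post uses, shown only for comparison) would be:

# Equivalent sketch with collections.Counter (the code below builds the dictionary manually)
from collections import Counter

token_counter = Counter(token for doc in train_dataset['document'] for token in doc.split())
token_counter.most_common(5)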

vocab_cnt_dict = {}
for doc in train_dataset['document']:
  for token in doc.split():
    if token not in vocab_cnt_dict:
      vocab_cnt_dict[token] = 0
    vocab_cnt_dict[token] += 1
vocab_cnt_list = [(token, cnt) for token, cnt in vocab_cnt_dict.items()]
vocab_cnt_list[:10]

top_vocabs = sorted(vocab_cnt_list, key=lambda x: x[1], reverse=True)
top_vocabs[:10]

cnts = [cnt for _, cnt in top_vocabs]
np.mean(cnts)

cnts[:10]

sum(np.array(cnts) > 2)

n_vocab = sum(np.array(cnts) > 2)
top_vocabs_truncated = top_vocabs[:n_vocab]
top_vocabs_truncated[:5]

vocabs = [token for token, _ in top_vocabs_truncated]
vocabs[:5]

 

Special tokens

  • [UNK]: Unknown token, used for any word not in the vocabulary
  • [PAD]: Padding token, used to fill sentences up to a fixed length (see the sketch below)
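Both special tokens are inserted at the front of the vocabulary below, which puts [PAD] at index 0; this matters later because the embedding layer is created with padding_idx=0, so the padding embedding stays fixed at zero. A minimal sketch of that insertion order (using a hypothetical toy list, not the real vocabs):

# Insertion-order sketch with a hypothetical toy list (the code below does the same to vocabs)
toy_vocabs = ['진짜', '영화']
toy_vocabs.insert(0, '[UNK]')   # ['[UNK]', '진짜', '영화']
toy_vocabs.insert(0, '[PAD]')   # ['[PAD]', '[UNK]', '진짜', '영화'] -> [PAD] ends up at index 0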
unk_token = '[UNK]'
unk_token in vocabs

pad_token = '[PAD]'
pad_token in vocabs

vocabs.insert(0, unk_token)
vocabs.insert(0, pad_token)
vocabs[:5]

idx_to_token = vocabs
token_to_idx = {token: idx for idx, token in enumerate(idx_to_token)}
class Tokenizer:
    def __init__(self, vocabs, use_padding=True, max_padding=64, pad_token='[PAD]', unk_token='[UNK]'):
        # index -> token lookup list
        self.idx_to_token = vocabs
        # token -> index lookup dictionary
        self.token_to_idx = {token: i for i, token in enumerate(self.idx_to_token)}
        # whether to pad sequences to a fixed length
        self.use_padding = use_padding
        # maximum (padded) sequence length
        self.max_padding = max_padding
        # padding token
        self.pad_token = pad_token
        # unknown token
        self.unk_token = unk_token
        # index of the unknown token
        self.unk_token_idx = self.token_to_idx[self.unk_token]
        # index of the padding token
        self.pad_token_idx = self.token_to_idx[self.pad_token]

    def __call__(self, x):
        # list that will hold the token indices
        token_ids = []
        # split the input string on whitespace
        token_list = x.split()
        # map each token to its index, falling back to [UNK]
        for token in token_list:
            if token in self.token_to_idx:
                token_idx = self.token_to_idx[token]
            else:
                token_idx = self.unk_token_idx
            token_ids.append(token_idx)
        # if padding is enabled
        if self.use_padding:
            # truncate to the maximum length
            token_ids = token_ids[:self.max_padding]
            # number of padding tokens needed
            n_pads = self.max_padding - len(token_ids)
            # append padding token indices
            token_ids = token_ids + [self.pad_token_idx] * n_pads

        # return the list of token indices
        return token_ids

 

# Initialize a tokenizer without padding

tokenizer = Tokenizer(vocabs, use_padding=False)
sample = train_dataset['document'].iloc[0]
print(sample)

tokenizer(sample) # [51, 1, ]

token_length_list = []
for sample in train_dataset['document']:
    token_length_list.append(len(tokenizer(sample)))

max(token_length_list)

tokenizer = Tokenizer(vocabs, use_padding=True, max_padding=50, pad_token='[PAD]', unk_token='[UNK]')
train_valid_dataset = pd.read_table('ratings_train.txt')
test_dataset = pd.read_table('ratings_test.txt')

print(f'train, valid samples:{len(train_valid_dataset)}') 
print(f'test samples:{len(test_dataset)}')

train_valid_dataset.head()

train_valid_dataset = train_valid_dataset.sample(frac=1.)
train_valid_dataset.head()

train_ratio = 0.8
n_train = int(len(train_valid_dataset) * train_ratio)

train_df = train_valid_dataset[:n_train]
valid_df = train_valid_dataset[n_train:]
test_df = test_dataset

print(f"train samples: {len(train_df)}")
print(f"valid samples: {len(valid_df)}")
print(f"test samples: {len(test_df)}")

# Subsample each split (frac=0.8) to speed up training
train_df = train_df.sample(frac=0.8)
valid_df = valid_df.sample(frac=0.8)
test_df = test_df.sample(frac=0.8)

print(f"train samples: {len(train_df)}")
print(f"valid samples: {len(valid_df)}")
print(f"test samples: {len(test_df)}")

class NSMCDataset(Dataset):
    
    def __init__(self, data_df, tokenizer=None):
        self.data_df = data_df
        self.tokenizer = tokenizer
        
    
    def __len__(self):
        return len(self.data_df)
    
    def __getitem__(self, idx):
        sample_raw = self.data_df.iloc[idx]
        sample = {}      
        sample['doc'] = str(sample_raw['document'])
        sample['label'] = int(sample_raw['label'])
        
        if self.tokenizer is not None:
            sample['doc_ids'] = self.tokenizer(sample['doc']) 
        return sample
train_dataset = NSMCDataset(data_df=train_df, tokenizer=tokenizer)
valid_dataset = NSMCDataset(data_df=valid_df, tokenizer=tokenizer)
test_dataset = NSMCDataset(data_df=test_df, tokenizer=tokenizer)
print(train_dataset[0])

def collate_fn(batch):
    # merge a list of sample dicts into one dict of lists, keyed by the sample fields
    keys = [key for key in batch[0].keys()]
    data = {key: [] for key in keys}

    for item in batch:
        for key in keys:
            data[key].append(item[key])
    return data
train_dataloader= DataLoader(train_dataset,
                             batch_size=128,
                             collate_fn=collate_fn,
                             shuffle=True)

valid_dataloader= DataLoader(valid_dataset,
                             batch_size=128,
                             collate_fn=collate_fn,
                             shuffle=False)

test_dataloader= DataLoader(test_dataset,
                            batch_size=128,
                            collate_fn=collate_fn,
                            shuffle=False)
sample = next(iter(test_dataloader))
sample.keys() # dict_keys(['doc', 'label', 'doc_ids'])

sample['doc'][5]

print(sample['doc_ids'][5] )

 

CNN model

 

class SentenceCNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, word_win_size=[3, 5, 7]):
        super().__init__()
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.word_win_size = word_win_size

        self.conv_list = nn.ModuleList(
                          [nn.Conv2d(1, 1, kernel_size=(w, embed_dim))
                           for w in self.word_win_size])
        

        self.embeddings = nn.Embedding(vocab_size,
                                       embed_dim,
                                       padding_idx=0)
        self.output_dim = len(self.word_win_size)

    def forward(self, X):
        batch_size, seq_len = X.size()

        # batch_size x seq_len x embed_dim
        X = self.embeddings(X) 

        # batch_size x channel(1) x seq_len(H) x embed_dim(W)
        X = X.view(batch_size, 1, seq_len, self.embed_dim) 
        # apply each convolution over word windows of size 3, 5, and 7
        C = [F.relu(conv(X)) for conv in self.conv_list]
        # max-over-time pooling: keep one scalar feature per window size
        # -> batch_size x len(word_win_size)
        C_hat = torch.stack([F.max_pool2d(c, c.size()[2:]).squeeze()
                             for c in C], dim=1)
        return C_hat
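To make the tensor shapes concrete, here is a small shape-check sketch run on a randomly initialized encoder (the sizes and variable names are hypothetical, chosen only for illustration):

# Shape-check sketch: hypothetical vocab of 100 tokens, batch of 4 documents, 50 tokens each
_enc = SentenceCNN(vocab_size=100, embed_dim=16)
_dummy_ids = torch.randint(0, 100, (4, 50))
print(_enc(_dummy_ids).shape)  # torch.Size([4, 3]): one max-pooled feature per window size (3, 5, 7)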
class Classifier(nn.Module):
    def __init__(self, sr_model, output_dim, vocab_size, embed_dim, **kwargs):
        super().__init__()
        self.sr_model = sr_model(vocab_size=vocab_size,
                                 embed_dim=embed_dim,
                                 **kwargs)
        self.input_dim = self.sr_model.output_dim
        self.output_dim = output_dim
        self.fc = nn.Linear(self.input_dim, self.output_dim)

    def forward(self, x):
        return self.fc(self.sr_model(x))
model = Classifier(sr_model=SentenceCNN,
                   output_dim=2,
                   vocab_size=len(vocabs),
                   embed_dim=16)
model.sr_model.embeddings.weight[0]  # the [PAD] embedding (index 0); all zeros because padding_idx=0

use_cuda = True and torch.cuda.is_available()

if use_cuda:
    model.cuda()
optimizer = optim.Adam(params=model.parameters(), lr=0.01)
calc_loss = nn.CrossEntropyLoss()
n_epoch = 10
global_i = 0
valid_loss_history = []
train_loss_history = []
best_model = None
best_epoch_i = None
min_valid_loss = 9e+9
for epoch_i in range(n_epoch):
    model.train()
    for batch in train_dataloader:
        optimizer.zero_grad()
        X = torch.tensor(batch['doc_ids'])
        y = torch.tensor(batch['label'])
        if use_cuda:
            X = X.cuda()
            y = y.cuda()
        y_pred = model(X)
        loss = calc_loss(y_pred, y)
        if global_i % 1000 == 0:
            print(f'i: {global_i}, epoch: {epoch_i}, loss: {loss.item()}')
        train_loss_history.append((global_i, loss.item()))
        loss.backward()
        optimizer.step()
        global_i += 1
    model.eval()
    valid_loss_list = []
    for batch in valid_dataloader:
        X = torch.tensor(batch['doc_ids'])
        y = torch.tensor(batch['label'])
        if use_cuda:
            X = X.cuda()
            y = y.cuda()
        y_pred = model(X)
        loss = calc_loss(y_pred, y)
        valid_loss_list.append(loss.item())
    valid_loss_mean = np.mean(valid_loss_list)
    valid_loss_history.append((global_i, valid_loss_mean.item()))
    if valid_loss_mean < min_valid_loss:
        min_valid_loss = valid_loss_mean
        best_epoch_i = epoch_i
        best_model = deepcopy(model)
    if epoch_i % 2 == 0:
        print("*"*30)
        print(f'valid_loss_mean: {valid_loss_mean}')
        print("*"*30)
print(f'best_epoch: {best_epoch_i}')

def calc_moving_average(arr, win_size=100):
    new_arr = []
    win = []

    for val in arr:
        win.append(val)
        if len(win) > win_size:
            win.pop(0)

        new_arr.append(np.mean(win))
    return np.array(new_arr)
valid_loss_history = np.array(valid_loss_history)
train_loss_history = np.array(train_loss_history)
plt.figure(figsize=(12,8))
plt.plot(train_loss_history[:,0],
         calc_moving_average(train_loss_history[:,1]), color='blue')
plt.plot(valid_loss_history[:,0],
         valid_loss_history[:,1], color='red')
plt.xlabel("step")
plt.ylabel("loss")

Evaluation

from tqdm.auto import tqdm

model = best_model

model.eval()

total = 0
correct = 0
for batch in tqdm(test_dataloader,
                  total=len(test_dataloader.dataset)//test_dataloader.batch_size):
    X = torch.tensor(batch['doc_ids'])
    y = torch.tensor(batch['label'])

    if use_cuda:
        X = X.cuda()
        y = y.cuda()

    y_pred = model(X)

    curr_correct = y_pred.argmax(dim=1) == y

    total += len(curr_correct)
    correct += curr_correct.sum().item()
print(f"test accuracy: {correct/total}")
