본문 바로가기
자연어 처리(NLP)

7. cbow text classification

by 곽정우 2024. 6. 21.

1. 문장 임베딩

import urllib.request
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from copy import deepcopy  
from tqdm.auto import tqdm
urllib.request.urlretrieve("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt", filename="ratings_train.txt")
urllib.request.urlretrieve("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_test.txt", filename="ratings_test.txt")
train_dataset = pd.read_table('ratings_train.txt')
train_dataset

# pos, neg 활용
train_dataset['label'].value_counts()

sum(train_dataset['document'].isnull())

~train_dataset['document'].isnull()

train_dataset = train_dataset[~train_dataset['document'].isnull()]
train_dataset.shape

train_dataset

 

Tokenization

  • 자연어를 모델이 이해하기 위해서는 자연어를 숫자의 형식으로 변형 시켜야 함
train_dataset['document'].iloc[0].split()

vocab = set()
for doc in train_dataset['document']:
  for token in doc.split():
    vocab.add(token)
len(vocab)

# 단어의 빈도수 구하기
'''
[ ('아', 1204), 
  ('더빙..', 112), 
  ('진짜', 5929), 
  ('짜증나네요', 10), 
  ('목소리', 99),
  ...
]
'''

vocab_cnt_dict = {}
for doc in train_dataset['document']:
  for token in doc.split():
    if token not in vocab_cnt_dict:
      vocab_cnt_dict[token] = 0
    vocab_cnt_dict[token] += 1
vocab_cnt_list = [(token, cnt) for token, cnt in vocab_cnt_dict.items()]
vocab_cnt_list[:10]

top_vocabs = sorted(vocab_cnt_list, key=lambda x: x[1], reverse=True)
top_vocabs[:10]

cnts = [cnt for _, cnt in top_vocabs]
np.mean(cnts)

cnts[:10]

sum(np.array(cnts) > 2)

n_vocab = sum(np.array(cnts) > 2)
top_vocabs_truncated = top_vocabs[:n_vocab]
top_vocabs_truncated[:5]

vocabs = [token for token, _ in top_vocabs_truncated]
vocabs[:5]

 

special token

  • [UNK]: Unknown token
  • [PAD]: Padding token
unk_token = '[UNK]'
unk_token in vocabs

pad_token = '[PAD]'
pad_token in vocabs

vocabs.insert(0, unk_token)
vocabs.insert(0, pad_token)
vocabs[:5]

idx_to_token = vocabs
token_to_idx = {token: idx for idx, token in enumerate(idx_to_token)}
class Tokenizer:
    def __init__(self, vocabs, use_padding=True, max_padding=64, pad_token='[PAD]', unk_token='[UNK]'):
        # 토큰 인덱스를 위한 리스트
        self.idx_to_token = vocabs
        # 토큰에서 인덱스로 매핑하는 딕셔너리 생성
        self.token_to_idx = {token: i for i, token in enumerate(self.idx_to_token)}
        # 패딩 사용 여부 설정
        self.use_padding = use_padding
        # 최대 패딩 길이 설정
        self.max_padding = max_padding
        # 패딩 토큰 설정
        self.pad_token = pad_token
        # 알 수 없는 토큰 설정
        self.unk_token = unk_token
        # 알 수 없는 토큰의 인덱스 설정
        self.unk_token_idx = self.token_to_idx[self.unk_token]
        # 패딩 토큰의 인덱스 설정
        self.pad_token_idx = self.token_to_idx[self.pad_token]
        
        
    def __call__(self, x):
        # 토큰의 인덱스를 저장할 리스트 초기화
        token_ids = []
        # 입력 문자열을 공백을 기준으로 분할하여 토큰 리스트 생성
        token_list = x.split()
        # 토큰 리스트를 순회하며 인덱스로 변환
        for token in token_list:
            if token in self.token_to_idx:
                token_idx = self.token_to_idx[token]
            else:
                token_idx = self.unk_token_idx
            token_ids.append(token_idx)
        # 패딩을 사용하는 경우
        if self.use_padding:
            # 최대 패딩 길이까지만 잘라내기
            token_ids = token_ids[:self.max_padding]
            # 필요한 패딩 개수 계산
            n_pads = self.max_padding - len(token_ids)
            # 패딩 추가
            token_ids = token_ids + [self.pad_token_idx] * n_pads
        
        # 토큰 인덱스 리스트 반환
        return token_ids

 

# 패딩을 사용하지 않는 토크나이저 초기화

tokenizer = Tokenizer(vocabs, use_padding=False)
sample = train_dataset['document'].iloc[0]
print(sample)

tokenizer(sample) # [51, 1, ]

token_length_list = []
for sample in train_dataset['document']:
      token_length_list.append(len(tokenizer(sample)))

max(token_length_list)

tokenizer = Tokenizer(vocabs, use_padding=True, max_padding=50, pad_token='[PAD]', unk_token='[UNK]')
train_valid_dataset = pd.read_table('ratings_train.txt')
test_dataset = pd.read_table('ratings_test.txt')

print(f'train, valid samples:{len(train_valid_dataset)}') 
print(f'test samples:{len(test_dataset)}')

train_valid_dataset.head()

train_valid_dataset = train_valid_dataset.sample(frac=1.)
train_valid_dataset.head()

train_ratio = 0.8
n_train = int(len(train_valid_dataset) * train_ratio)

train_df = train_valid_dataset[:n_train]
valid_df = train_valid_dataset[n_train:]
test_df = test_dataset

print(f"train samples: {len(train_df)}")
print(f"valid samples: {len(valid_df)}")
print(f"test samples: {len(test_df)}")

# 1/10으로 샘플링
train_df = train_df.sample(frac=0.8)
valid_df = valid_df.sample(frac=0.8)
test_df = test_df.sample(frac=0.8)

print(f"train samples: {len(train_df)}")
print(f"valid samples: {len(valid_df)}")
print(f"test samples: {len(test_df)}")

class NSMCDataset(Dataset):
    
    def __init__(self, data_df, tokenizer=None):
        self.data_df = data_df
        self.tokenizer = tokenizer
        
    
    def __len__(self):
        return len(self.data_df)
    
    def __getitem__(self, idx):
        sample_raw = self.data_df.iloc[idx]
        sample = {}      
        sample['doc'] = str(sample_raw['document'])
        sample['label'] = int(sample_raw['label'])
        
        if self.tokenizer is not None:
            sample['doc_ids'] = self.tokenizer(sample['doc']) 
        return sample
train_dataset = NSMCDataset(data_df=train_df, tokenizer=tokenizer)
valid_dataset = NSMCDataset(data_df=valid_df, tokenizer=tokenizer)
test_dataset = NSMCDataset(data_df=test_df, tokenizer=tokenizer)
print(train_dataset[0])

def collate_fn(batch):
    keys = [key for key in batch[0].keys()]
    data = {key: [] for key in keys}

    for item in batch:
        for key in keys:
            data[key].append(item[key])
    return data
train_dataloader= DataLoader(train_dataset,
                             batch_size=128,
                             collate_fn=collate_fn,
                             shuffle=True)

valid_dataloader= DataLoader(valid_dataset,
                             batch_size=128,
                             collate_fn=collate_fn,
                             shuffle=False)

test_dataloader= DataLoader(test_dataset,
                            batch_size=128,
                            collate_fn=collate_fn,
                            shuffle=False)
sample = next(iter(test_dataloader))
sample.keys() # dict_keys(['doc', 'label', 'doc_ids'])

sample['doc'][5]

print(sample['doc_ids'][5] )

 

CBOW(Continuous Bag of Words)

  • 자연어 처리에서 단어의 의미를 벡터로 표현하는 Word2Vec 모델 중 하나
  • 문맥(context) 단어들을 사용해서 타겟(target) 단어를 예측하는 것
  • 모델의 원리
    • 문장은 단어의 연속으로 구성
      • 예) The cat sast on the mat
      • 타겟단어: 'cat', 주변 단어: ('The', 'sat')
    • 문맥 단어의 수를 결정하는 윈도우 크기를 설정
      • 예) 윈도우 크기: 2, 타겟 단어의 왼쪽과 오른쪽에서 각각 2개의 단어를 문맥 단어로 사용
    • 모든 단어를 고유한 인덱스로 매핑하고 원 핫 이코딩으로 변환
    • 입력으로 주어진 문맥 단어들을 이용해 타켓 단어를 예측하는 신경망을 학습
      • 예) 일반적으로 입력층, 은닉층, 출력층 3개의 층으로 구성
      • 입력층: 문맥 단어들의 원 핫 인코딩 벡터를 받음
      • 은닉층: 입력 벡터들의 평균을 계산하여 은닉층 벡터를 만듦
      • 출력층: 은닉층 벡터를 사용해 타켓 단어를 예측
class CBOW(nn.Module):
  def __init__(self, vocab_size, embed_dim):
    super(CBOW, self).__init__()
    self.output_dim = embed_dim
    self.embeddings = nn.Embedding(vocab_size, embed_dim, padding_idx=0)

  def forward(self, X):
    # (batch_size, sequence) ->(batch_size, sequence, embed_dim)
    x_embede = self.embeddings(X)
    stnc_repr = torch.mean(x_embede, dim=1) # batch_size * embed_dim
    return stnc_repr
class Classifier(nn.Module):

    def __init__(self, sr_model, output_dim, vocab_size, embed_dim, **kwargs):
        super().__init__()
        self.sr_model = sr_model(vocab_size=vocab_size,
                                 embed_dim=embed_dim,
                                 **kwargs)
        self.input_dim = self.sr_model.output_dim
        self.output_dim = output_dim
        self.fc = nn.Linear(self.input_dim, self.output_dim)

    def forward(self, x):
        return self.fc(self.sr_model(x))
model = Classifier(sr_model=CBOW,
                   output_dim=2,
                   vocab_size=len(vocabs),
                   embed_dim=16)
model.sr_model.embeddings.weight[0]

use_cuda = True and torch.cuda.is_available()

if use_cuda:
    model.cuda()
optimizer = optim.Adam(params=model.parameters(), lr=0.01)
calc_loss = nn.CrossEntropyLoss()
n_epoch = 10
global_i = 0
valid_loss_history = []
train_loss_history = []
best_model = None
best_epoch_i = None
min_valid_loss = 9e+9
for epoch_i in range(n_epoch):
    model.train()
    for batch in train_dataloader:
        optimizer.zero_grad()
        X = torch.tensor(batch['doc_ids'])
        y = torch.tensor(batch['label'])
        if use_cuda:
            X = X.cuda()
            y = y.cuda()
        y_pred = model(X)
        loss = calc_loss(y_pred, y)
        if global_i % 1000 == 0:
            print(f'i: {global_i}, epoch: {epoch_i}, loss: {loss.item()}')
        train_loss_history.append((global_i, loss.item()))
        loss.backward()
        optimizer.step()
        global_i += 1
    model.eval()
    valid_loss_list = []
    for batch in valid_dataloader:
        X = torch.tensor(batch['doc_ids'])
        y = torch.tensor(batch['label'])
        if use_cuda:
            X = X.cuda()
            y = y.cuda()
        y_pred = model(X)
        loss = calc_loss(y_pred, y)
        valid_loss_list.append(loss.item())
    valid_loss_mean = np.mean(valid_loss_list)
    valid_loss_history.append((global_i, valid_loss_mean.item()))
    if valid_loss_mean < min_valid_loss:
        min_valid_loss = valid_loss_mean
        best_epoch_i = epoch_i
        best_model = deepcopy(model)
    if epoch_i % 2 == 0:
        print("*"*30)
        print(f'valid_loss_mean: {valid_loss_mean}')
        print("*"*30)
print(f'best_epoch: {best_epoch_i}')

def calc_moving_average(arr, win_size=100):
    new_arr = []
    win = []

    for i, val in enumerate(arr):
        win.append(val)
        if len(win) > win_size:
            win.pop(0)
            
        new_arr.append(np.mean(win))
    return np.array(new_arr)
valid_loss_history = np.array(valid_loss_history)
train_loss_history =  np.array(train_loss_history)
plt.figure(figsize=(12,8))
plt.plot(train_loss_history[:,0],
         calc_moving_average(train_loss_history[:,1]), color='blue')
plt.plot(valid_loss_history[:,0],
         valid_loss_history[:,1], color='red')
plt.xlabel("step")
plt.ylabel("loss")

Evaluation

from tqdm.auto import tqdm

model = best_model

model.eval()

total = 0
correct = 0
for batch in tqdm(test_dataloader,
                  total=len(test_dataloader.dataset)//test_dataloader.batch_size):
    x = torch.tensor(batch['doc_ids'])
    y = torch.tensor(batch['label'])
    
    if use_cuda:
        x = x.cuda()
        y = y.cuda()

    y_pred = model(x)
    
    curr_correct = y_pred.argmax(dim=1) == y
    
    total += len(curr_correct)
    correct += sum(curr_correct)
print(f"test accuracy: {correct/total}")

'자연어 처리(NLP)' 카테고리의 다른 글

9. LSTM과 GRU  (0) 2024.06.21
8. CNN text classification  (0) 2024.06.21
6. Rnn 기초  (0) 2024.06.21
5. 워드임베딩 시각화  (0) 2024.06.21
4. 워드 임베딩  (0) 2024.06.17