import urllib.request
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torch.nn.functional as F
from copy import deepcopy
from tqdm.auto import tqdm
vocab = set()
for doc in train_dataset['document']:
for token in doc.split():
vocab.add(token)
len(vocab)
# 단어의 빈도수 구하기
'''
[ ('아', 1204),
('더빙..', 112),
('진짜', 5929),
('짜증나네요', 10),
('목소리', 99),
...
]
'''
vocab_cnt_dict = {}
for doc in train_dataset['document']:
for token in doc.split():
if token not in vocab_cnt_dict:
vocab_cnt_dict[token] = 0
vocab_cnt_dict[token] += 1
vocab_cnt_list = [(token, cnt) for token, cnt in vocab_cnt_dict.items()]
vocab_cnt_list[:10]
idx_to_token = vocabs
token_to_idx = {token: idx for idx, token in enumerate(idx_to_token)}
class Tokenizer:
def __init__(self, vocabs, use_padding=True, max_padding=64, pad_token='[PAD]', unk_token='[UNK]'):
# 토큰 인덱스를 위한 리스트
self.idx_to_token = vocabs
# 토큰에서 인덱스로 매핑하는 딕셔너리 생성
self.token_to_idx = {token: i for i, token in enumerate(self.idx_to_token)}
# 패딩 사용 여부 설정
self.use_padding = use_padding
# 최대 패딩 길이 설정
self.max_padding = max_padding
# 패딩 토큰 설정
self.pad_token = pad_token
# 알 수 없는 토큰 설정
self.unk_token = unk_token
# 알 수 없는 토큰의 인덱스 설정
self.unk_token_idx = self.token_to_idx[self.unk_token]
# 패딩 토큰의 인덱스 설정
self.pad_token_idx = self.token_to_idx[self.pad_token]
def __call__(self, x):
# 토큰의 인덱스를 저장할 리스트 초기화
token_ids = []
# 입력 문자열을 공백을 기준으로 분할하여 토큰 리스트 생성
token_list = x.split()
# 토큰 리스트를 순회하며 인덱스로 변환
for token in token_list:
if token in self.token_to_idx:
token_idx = self.token_to_idx[token]
else:
token_idx = self.unk_token_idx
token_ids.append(token_idx)
# 패딩을 사용하는 경우
if self.use_padding:
# 최대 패딩 길이까지만 잘라내기
token_ids = token_ids[:self.max_padding]
# 필요한 패딩 개수 계산
n_pads = self.max_padding - len(token_ids)
# 패딩 추가
token_ids = token_ids + [self.pad_token_idx] * n_pads
# 토큰 인덱스 리스트 반환
return token_ids
# 패딩을 사용하지 않는 토크나이저 초기화
tokenizer = Tokenizer(vocabs, use_padding=False)
def collate_fn(batch):
keys = [key for key in batch[0].keys()]
data = {key: [] for key in keys}
for item in batch:
for key in keys:
data[key].append(item[key])
return data
n_epoch = 10
global_i = 0
valid_loss_history = []
train_loss_history = []
best_model = None
best_epoch_i = None
min_valid_loss = 9e+9
for epoch_i in range(n_epoch):
model.train()
for batch in train_dataloader:
optimizer.zero_grad()
X = torch.tensor(batch['doc_ids'])
y = torch.tensor(batch['label'])
if use_cuda:
X = X.cuda()
y = y.cuda()
y_pred = model(X)
loss = calc_loss(y_pred, y)
if global_i % 1000 == 0:
print(f'i: {global_i}, epoch: {epoch_i}, loss: {loss.item()}')
train_loss_history.append((global_i, loss.item()))
loss.backward()
optimizer.step()
global_i += 1
model.eval()
valid_loss_list = []
for batch in valid_dataloader:
X = torch.tensor(batch['doc_ids'])
y = torch.tensor(batch['label'])
if use_cuda:
X = X.cuda()
y = y.cuda()
y_pred = model(X)
loss = calc_loss(y_pred, y)
valid_loss_list.append(loss.item())
valid_loss_mean = np.mean(valid_loss_list)
valid_loss_history.append((global_i, valid_loss_mean.item()))
if valid_loss_mean < min_valid_loss:
min_valid_loss = valid_loss_mean
best_epoch_i = epoch_i
best_model = deepcopy(model)
if epoch_i % 2 == 0:
print("*"*30)
print(f'valid_loss_mean: {valid_loss_mean}')
print("*"*30)
print(f'best_epoch: {best_epoch_i}')
def calc_moving_average(arr, win_size=100):
new_arr = []
win = []
for i, val in enumerate(arr):
win.append(val)
if len(win) > win_size:
win.pop(0)
new_arr.append(np.mean(win))
return np.array(new_arr)
from tqdm.auto import tqdm
model = best_model
model.eval()
total = 0
correct = 0
for batch in tqdm(test_dataloader,
total=len(test_dataloader.dataset)//test_dataloader.batch_size):
X = torch.tensor(batch['doc_ids'])
y = torch.tensor(batch['label'])
if use_cuda:
X = X.cuda()
y = y.cuda()
y_pred = model(X)
curr_correct = y_pred.argmax(dim=1) == y
total += len(curr_correct)
correct += sum(curr_correct)
print(f"test accuracy: {correct/total}")