🔥알림🔥
① 테디노트 유튜브 -
구경하러 가기!
② LangChain 한국어 튜토리얼
바로가기 👀
③ 랭체인 노트 무료 전자책(wikidocs)
바로가기 🙌
④ RAG 비법노트 LangChain 강의오픈
바로가기 🙌
⑤ 서울대 PyTorch 딥러닝 강의
바로가기 🙌
Attention을 활용한 Seq2Seq 모델 생성과 데이터셋 구성
이번 포스팅에서는 Attention을 활용한 Seq2Seq 모델을 생성하는 방법 그리고 Seq2Seq 모델의 학습을 위해 필요한 데이터셋을 구성하는 방법에 대하여 알아보도록 하겠습니다.
from IPython.display import Image
Seq2Seq 모델을 활용한 챗봇 생성
Seq2Seq 모델의 개요
Image('https://wikidocs.net/images/page/24996/%EC%9D%B8%EC%BD%94%EB%8D%94%EB%94%94%EC%BD%94%EB%8D%94%EB%AA%A8%EB%8D%B8.PNG')
데이터셋에 필요한 라이브러리를 다운로드 받습니다.
설치 명령어
# !pip install Korpora
- 이 중 챗봇용 데이터셋인
KoreanChatbotKorpus
를 다운로드 받습니다. KoreanChatbotKorpus
데이터셋을 활용하여 챗봇 모델을 학습합니다.- text, pair로 구성되어 있습니다.
- 질의는 text, 답변은 pair입니다.
from Korpora import KoreanChatbotKorpus
corpus = KoreanChatbotKorpus()
예시 텍스트를 보면 구어체로 구성되어 있습니다.
corpus.get_all_texts()[:10]
get_all_pairs()
는 text
와 pair
가 쌍으로 이루어져 있습니다.
corpus.get_all_pairs()[0].text
corpus.get_all_pairs()[0].pair
데이터 전처리
question과 answer를 분리합니다.
question은 질의로 활용될 데이터셋, answer는 답변으로 활용될 데이터 셋입니다.
texts = []
pairs = []
for sentence in corpus.get_all_pairs():
texts.append(sentence.text)
pairs.append(sentence.pair)
list(zip(texts, pairs))[:5]
특수문자는 제거합니다.
한글과 숫자를 제외한 특수문자를 제거하도록 합니다.
[참고] 튜토리얼에서는 특수문자와 영문자를 제거하나, 실제 프로젝트에 적용해보기 위해서는 신중히 결정해야합니다.
챗봇 대화에서 영어도 많이 사용되고, 특수문자도 굉장히 많이 사용됩니다. 따라서, 선택적으로 제거할 특수기호나 영문자를 정의한 후에 전처리를 진행하야합니다.
# re 모듈은 regex expression을 적용하기 위하여 활용합니다.
import re
def clean_sentence(sentence):
# 한글, 숫자를 제외한 모든 문자는 제거합니다.
sentence = re.sub(r'[^0-9ㄱ-ㅎㅏ-ㅣ가-힣 ]',r'', sentence)
return sentence
적용한 예시
한글, 숫자 이외의 모든 문자를 전부 제거됨을 확인할 수 있습니다.
clean_sentence('12시 땡^^!??')
clean_sentence('abcef가나다^^$%@12시 땡^^!??')
한글 형태소 분석기 (Konlpy)
형태소 분석기를 활용하여 문장을 분리합니다.
가방에 들어가신다 -> 가방/NNG + 에/JKM + 들어가/VV + 시/EPH + ㄴ다/EFN
- 형태소 분석 이란 형태소를 비롯하여, 어근, 접두사/접미사, 품사(POS, part-of-speech) 등 다양한 언어적 속성의 구조를 파악하는 것입니다.
- konlpy 형태소 분석기를 활용하여 한글 문장에 대한 토큰화처리를 보다 효율적으로 처리합니다.
설치
# !pip install konlpy
konlpy 내부에는 Kkma, Okt, Twitter 등등의 형태소 분석기가 존재하지만, 이번 튜토리얼에서는 Okt를 활용하도록 하겠습니다.
from konlpy.tag import Okt
okt = Okt()
# 형태소 변환에 활용하는 함수
# morphs 함수 안에 변환한 한글 문장을 입력 합니다.
def process_morph(sentence):
return ' '.join(okt.morphs(sentence))
Seq2Seq 모델이 학습하기 위한 데이터셋을 구성할 때, 다음과 같이 3가지 데이터셋을 구성합니다.
question
: encoder input 데이터셋 (질의 전체)answer_input
: decoder input 데이터셋 (답변의 시작). START 토큰을 문장 처음에 추가 합니다.answer_output
: decoder output 데이터셋 (답변의 끝). END 토큰을 문장 마지막에 추가 합니다.
def clean_and_morph(sentence, is_question=True):
# 한글 문장 전처리
sentence = clean_sentence(sentence)
# 형태소 변환
sentence = process_morph(sentence)
# Question 인 경우, Answer인 경우를 분기하여 처리합니다.
if is_question:
return sentence
else:
# START 토큰은 decoder input에 END 토큰은 decoder output에 추가합니다.
return ('<START> ' + sentence, sentence + ' <END>')
def preprocess(texts, pairs):
questions = []
answer_in = []
answer_out = []
# 질의에 대한 전처리
for text in texts:
# 전처리와 morph 수행
question = clean_and_morph(text, is_question=True)
questions.append(question)
# 답변에 대한 전처리
for pair in pairs:
# 전처리와 morph 수행
in_, out_ = clean_and_morph(pair, is_question=False)
answer_in.append(in_)
answer_out.append(out_)
return questions, answer_in, answer_out
questions, answer_in, answer_out = preprocess(texts, pairs)
questions[:5]
answer_in[:5]
answer_out[:5]
all_sentences = questions + answer_in + answer_out
a = (' '.join(questions) + ' '.join(answer_in) + ' '.join(answer_out)).split()
len(set(a))
토큰화
import numpy as np
import warnings
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
# WARNING 무시
warnings.filterwarnings('ignore')
토큰의 정의
tokenizer = Tokenizer(filters='', lower=False, oov_token='<OOV>')
Tokenizer로 문장에 대한 Word-Index Vocabulary(단어 사전)을 만듭니다.
tokenizer.fit_on_texts(all_sentences)
단어 사전 10개 출력
for word, idx in tokenizer.word_index.items():
print(f'{word}\t\t => \t{idx}')
if idx > 10:
break
토큰의 갯수 확인
len(tokenizer.word_index)
치환: 텍스트를 시퀀스로 인코딩 (texts_to_sequences
)
question_sequence = tokenizer.texts_to_sequences(questions)
answer_in_sequence = tokenizer.texts_to_sequences(answer_in)
answer_out_sequence = tokenizer.texts_to_sequences(answer_out)
문장의 길이 맞추기 (pad_sequences
)
MAX_LENGTH = 30
question_padded = pad_sequences(question_sequence, maxlen=MAX_LENGTH, truncating='post', padding='post')
answer_in_padded = pad_sequences(answer_in_sequence, maxlen=MAX_LENGTH, truncating='post', padding='post')
answer_out_padded = pad_sequences(answer_out_sequence, maxlen=MAX_LENGTH, truncating='post', padding='post')
question_padded.shape
answer_in_padded.shape, answer_out_padded.shape
모델
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Dropout, Attention
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint
from tensorflow.keras.utils import plot_model
학습용 인코더 (Encoder)
class Encoder(tf.keras.Model):
def __init__(self, units, vocab_size, embedding_dim, time_steps):
super(Encoder, self).__init__()
self.embedding = Embedding(vocab_size, embedding_dim, input_length=time_steps, name='Embedding')
self.dropout = Dropout(0.2, name='Dropout')
# (attention) return_sequences=True 추가
self.lstm = LSTM(units, return_state=True, return_sequences=True, name='LSTM')
def call(self, inputs):
x = self.embedding(inputs)
x = self.dropout(x)
x, hidden_state, cell_state = self.lstm(x)
# (attention) x return 추가
return x, [hidden_state, cell_state]
학습용 디코더 (Decoder)
Attention Layer
Inputs are query tensor of shape [batch_size, Tq, dim], value tensor of shape [batch_size, Tv, dim] and key tensor of shape [batch_size, Tv, dim].
The calculation follows the steps:
- Calculate scores with shape [batch_size, Tq, Tv] as a query-key dot product: scores = tf.matmul(query, key, transpose_b=True).
- Use scores to calculate a distribution with shape [batch_size, Tq, Tv]: distribution = tf.nn.softmax(scores).
- Use distribution to create a linear combination of value with shape [batch_size, Tq, dim]: return tf.matmul(distribution, value).
class Decoder(tf.keras.Model):
def __init__(self, units, vocab_size, embedding_dim, time_steps):
super(Decoder, self).__init__()
self.embedding = Embedding(vocab_size, embedding_dim, input_length=time_steps, name='Embedding')
self.dropout = Dropout(0.2, name='Dropout')
self.lstm = LSTM(units,
return_state=True,
return_sequences=True,
name='LSTM'
)
self.attention = Attention(name='Attention')
self.dense = Dense(VOCAB_SIZE, activation='softmax', name='Dense')
def call(self, inputs, initial_state):
# (attention) encoder_inputs 추가
encoder_inputs, decoder_inputs = inputs
x = self.embedding(decoder_inputs)
x = self.dropout(x)
x, hidden_state, cell_state = self.lstm(x, initial_state=initial_state)
# (attention) key_value, attention_matrix 추가
# 이전 hidden_state의 값을 concat으로 만들어 vector를 생성합니다.
key_value = tf.concat([initial_state[0][:, tf.newaxis, :], x[:, :-1, :]], axis=1)
# 이전 hidden_state의 값을 concat으로 만든 vector와 encoder에서 나온 출력 값들로 attention을 구합니다.
attention_matrix = self.attention([key_value, encoder_inputs])
# 위에서 구한 attention_matrix와 decoder의 출력 값을 concat 합니다.
x = tf.concat([x, attention_matrix], axis=-1)
x = self.dense(x)
return x, hidden_state, cell_state
코드 세부설명
STEP 1) 인코더의 최종 hidden_state 값을 디코더 attention key의 첫 번째 state로 활용
initial_state[0]
: (batch_size, 128)[initial_state[0][:, tf.newaxis, :]
: (batch_size, 1, 128)
STEP 2) 디코더의 이전 time_step의 hidden_state 값과 concat
x[:, :-1, :]
: (batch_size, 29, 128)
병합 후
tf.concat([initial_state[0][:, tf.newaxis, :], x[:, :-1, :]], axis=1)
: (batch_size, 30, 128)
STEP 3) 1번 단계와 2번 단계에서 병합된 Key 값과 Encoder의 LSTM 전체 output value로 attention 계산
attention_matrix = self.attention([key_value, encoder_inputs])
STEP 4)최종 결과 도출 (concat)
tf.concat([x, attention_matrix], axis=-1)
디코더의 LSTM 전체 output과 attention 값을 concat 하여 Dense로 넘깁니다.
모델 결합
class Seq2Seq(tf.keras.Model):
def __init__(self, units, vocab_size, embedding_dim, time_steps, start_token, end_token):
super(Seq2Seq, self).__init__()
self.start_token = start_token
self.end_token = end_token
self.time_steps = time_steps
self.encoder = Encoder(units, vocab_size, embedding_dim, time_steps)
self.decoder = Decoder(units, vocab_size, embedding_dim, time_steps)
def call(self, inputs, training=True):
if training:
encoder_inputs, decoder_inputs = inputs
# (attention) encoder 출력 값 수정
encoder_outputs, context_vector = self.encoder(encoder_inputs)
# (attention) decoder 입력 값 수정
decoder_outputs, _, _ = self.decoder((encoder_outputs, decoder_inputs), initial_state=context_vector)
return decoder_outputs
else:
x = inputs
# (attention) encoder 출력 값 수정
encoder_outputs, context_vector = self.encoder(x)
target_seq = tf.constant([[self.start_token]], dtype=tf.float32)
results = tf.TensorArray(tf.int32, self.time_steps)
for i in tf.range(self.time_steps):
decoder_output, decoder_hidden, decoder_cell = self.decoder((encoder_outputs, target_seq), initial_state=context_vector)
decoder_output = tf.cast(tf.argmax(decoder_output, axis=-1), dtype=tf.int32)
decoder_output = tf.reshape(decoder_output, shape=(1, 1))
results = results.write(i, decoder_output)
if decoder_output == self.end_token:
break
target_seq = decoder_output
context_vector = [decoder_hidden, decoder_cell]
return tf.reshape(results.stack(), shape=(1, self.time_steps))
단어별 원핫인코딩 적용
단어별 원핫인코딩을 적용하는 이유는 decoder의 output(출력)을 원핫인코딩 vector로 변환하기 위함
VOCAB_SIZE = len(tokenizer.word_index)+1
def convert_to_one_hot(padded):
# 원핫인코딩 초기화
one_hot_vector = np.zeros((len(answer_out_padded), MAX_LENGTH, VOCAB_SIZE))
# 디코더 목표를 원핫인코딩으로 변환
# 학습시 입력은 인덱스이지만, 출력은 원핫인코딩 형식임
for i, sequence in enumerate(answer_out_padded):
for j, index in enumerate(sequence):
one_hot_vector[i, j, index] = 1
return one_hot_vector
answer_in_one_hot = convert_to_one_hot(answer_in_padded)
answer_out_one_hot = convert_to_one_hot(answer_out_padded)
answer_in_one_hot[0].shape, answer_in_one_hot[0].shape
변환된 index를 다시 단어로 변환
def convert_index_to_text(indexs, end_token):
sentence = ''
# 모든 문장에 대해서 반복
for index in indexs:
if index == end_token:
# 끝 단어이므로 예측 중비
break;
# 사전에 존재하는 단어의 경우 단어 추가
if index > 0 and tokenizer.index_word[index] is not None:
sentence += tokenizer.index_word[index]
else:
# 사전에 없는 인덱스면 빈 문자열 추가
sentence += ''
# 빈칸 추가
sentence += ' '
return sentence
학습 (Training)
하이퍼 파라미터 정의
BUFFER_SIZE = 1000
BATCH_SIZE = 64
EMBEDDING_DIM = 100
TIME_STEPS = MAX_LENGTH
START_TOKEN = tokenizer.word_index['<START>']
END_TOKEN = tokenizer.word_index['<END>']
UNITS = 128
VOCAB_SIZE = len(tokenizer.word_index)+1
DATA_LENGTH = len(questions)
SAMPLE_SIZE = 3
체크포인트 생성
checkpoint_path = 'model/training_checkpoint-6.ckpt'
checkpoint = ModelCheckpoint(filepath=checkpoint_path,
save_weights_only=True,
save_best_only=True,
monitor='loss',
verbose=1
)
분산환경 설정
strategy = tf.distribute.MirroredStrategy()
strategy.num_replicas_in_sync
모델 생성 & compile
# 분산 환경 적용시
with strategy.scope():
seq2seq = Seq2Seq(UNITS, VOCAB_SIZE, EMBEDDING_DIM, TIME_STEPS, START_TOKEN, END_TOKEN)
seq2seq.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'])
seq2seq = Seq2Seq(UNITS, VOCAB_SIZE, EMBEDDING_DIM, TIME_STEPS, START_TOKEN, END_TOKEN)
seq2seq.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'])
# 연속하여 학습시 체크포인트를 로드하여 이어서 학습합니다.
seq2seq.load_weights(checkpoint_path)
def make_prediction(model, question_inputs):
results = model(inputs=question_inputs, training=False)
# 변환된 인덱스를 문장으로 변환
results = np.asarray(results).reshape(-1)
return results
for epoch in range(35):
seq2seq.fit([question_padded, answer_in_padded],
answer_out_one_hot,
epochs=10,
batch_size=16,
callbacks=[checkpoint]
)
# 랜덤한 샘플 번호 추출
samples = np.random.randint(DATA_LENGTH, size=SAMPLE_SIZE)
# 예측 성능 테스트
for idx in samples:
question_inputs = question_padded[idx]
# 문장 예측
results = make_prediction(seq2seq, np.expand_dims(question_inputs, 0))
# 변환된 인덱스를 문장으로 변환
results = convert_index_to_text(results, END_TOKEN)
print(f'Q: {questions[idx]}')
print(f'A: {results}\n')
print()
예측
# 자연어 (질문 입력) 대한 전처리 함수
def make_question(sentence):
sentence = clean_and_morph(sentence)
question_sequence = tokenizer.texts_to_sequences([sentence])
question_padded = pad_sequences(question_sequence, maxlen=MAX_LENGTH, truncating='post', padding='post')
return question_padded
make_question('오늘 날씨가 정말 화창합니다')
make_question('찐찐찐찐찐이야~ 완전 찐이야~')
def run_chatbot(question):
question_inputs = make_question(question)
results = make_prediction(seq2seq, question_inputs)
results = convert_index_to_text(results, END_TOKEN)
return results
유저로부터 Text 입력 값을 받아 답변 출력
while True:
user_input = input('<< 말을 걸어 보세요!\n')
if user_input == 'q':
break
print('>> 챗봇 응답: {}'.format(run_chatbot(user_input)))
댓글남기기