Attention을 활용한 Seq2Seq 모델 생성과 데이터셋 구성
Dec 24, 2020

이번 포스팅에서는 Attention을 활용한 Seq2Seq 모델을 생성하는 방법 그리고 Seq2Seq 모델의 학습을 위해 필요한 데이터셋을 구성하는 방법에 대하여 알아보도록 하겠습니다.

from IPython.display import Image

Seq2Seq 모델을 활용한 챗봇 생성

Seq2Seq 모델의 개요

Image('https://wikidocs.net/images/page/24996/%EC%9D%B8%EC%BD%94%EB%8D%94%EB%94%94%EC%BD%94%EB%8D%94%EB%AA%A8%EB%8D%B8.PNG')

데이터셋에 필요한 라이브러리를 다운로드 받습니다.

Korpora는 한글 자연어처리 데이터)셋입니다.

설치 명령어

# !pip install Korpora
  • 이 중 챗봇용 데이터셋인 KoreanChatbotKorpus를 다운로드 받습니다.
  • KoreanChatbotKorpus 데이터셋을 활용하여 챗봇 모델을 학습합니다.
  • text, pair로 구성되어 있습니다.
  • 질의는 text, 답변은 pair입니다.
from Korpora import KoreanChatbotKorpus
corpus = KoreanChatbotKorpus()
    Korpora 는 다른 분들이 연구 목적으로 공유해주신 말뭉치들을
    손쉽게 다운로드, 사용할 수 있는 기능만을 제공합니다.

    말뭉치들을 공유해 주신 분들에게 감사드리며, 각 말뭉치 별 설명과 라이센스를 공유 드립니다.
    해당 말뭉치에 대해 자세히 알고 싶으신 분은 아래의 description 을 참고,
    해당 말뭉치를 연구/상용의 목적으로 이용하실 때에는 아래의 라이센스를 참고해 주시기 바랍니다.

    # Description
    Author : songys@github
    Repository : https://github.com/songys/Chatbot_data
    References :

    Chatbot_data_for_Korean v1.0
      1. 챗봇 트레이닝용 문답 페어 11,876개
      2. 일상다반사 0, 이별(부정) 1, 사랑(긍정) 2로 레이블링
    자세한 내용은 위의 repository를 참고하세요.

    # License
    CC0 1.0 Universal (CC0 1.0) Public Domain Dedication
    Details in https://creativecommons.org/publicdomain/zero/1.0/

예시 텍스트를 보면 구어체로 구성되어 있습니다.

corpus.get_all_texts()[:10]
['12시 땡!',
 '1지망 학교 떨어졌어',
 '3박4일 놀러가고 싶다',
 '3박4일 정도 놀러가고 싶다',
 'PPL 심하네',
 'SD카드 망가졌어',
 'SD카드 안돼',
 'SNS 맞팔 왜 안하지ㅠㅠ',
 'SNS 시간낭비인 거 아는데 매일 하는 중',
 'SNS 시간낭비인데 자꾸 보게됨']

get_all_pairs()textpair가 쌍으로 이루어져 있습니다.

corpus.get_all_pairs()[0].text
'12시 땡!'
corpus.get_all_pairs()[0].pair
'하루가 또 가네요.'

데이터 전처리

questionanswer를 분리합니다.

question은 질의로 활용될 데이터셋, answer는 답변으로 활용될 데이터 셋입니다.

texts = []
pairs = []

for sentence in corpus.get_all_pairs():
    texts.append(sentence.text)
    pairs.append(sentence.pair)
list(zip(texts, pairs))[:5]
[('12시 땡!', '하루가 또 가네요.'),
 ('1지망 학교 떨어졌어', '위로해 드립니다.'),
 ('3박4일 놀러가고 싶다', '여행은 언제나 좋죠.'),
 ('3박4일 정도 놀러가고 싶다', '여행은 언제나 좋죠.'),
 ('PPL 심하네', '눈살이 찌푸려지죠.')]

특수문자는 제거합니다.

한글과 숫자를 제외한 특수문자를 제거하도록 합니다.

[참고] 튜토리얼에서는 특수문자와 영문자를 제거하나, 실제 프로젝트에 적용해보기 위해서는 신중히 결정해야합니다.

챗봇 대화에서 영어도 많이 사용되고, 특수문자도 굉장히 많이 사용됩니다. 따라서, 선택적으로 제거할 특수기호나 영문자를 정의한 후에 전처리를 진행하야합니다.

# re 모듈은 regex expression을 적용하기 위하여 활용합니다.
import re
def clean_sentence(sentence):
    # 한글, 숫자를 제외한 모든 문자는 제거합니다.
    sentence = re.sub(r'[^0-9ㄱ-ㅎㅏ-ㅣ가-힣 ]',r'', sentence)
    return sentence

적용한 예시

한글, 숫자 이외의 모든 문자를 전부 제거됨을 확인할 수 있습니다.

clean_sentence('12시 땡^^!??')
'12시 땡'
clean_sentence('abcef가나다^^$%@12시 땡^^!??')
'가나다12시 땡'

한글 형태소 분석기 (Konlpy)

형태소 분석기를 활용하여 문장을 분리합니다.

가방에 들어가신다 -> 가방/NNG + 에/JKM + 들어가/VV + 시/EPH + ㄴ다/EFN

  • 형태소 분석 이란 형태소를 비롯하여, 어근, 접두사/접미사, 품사(POS, part-of-speech) 등 다양한 언어적 속성의 구조를 파악하는 것입니다.
  • konlpy 형태소 분석기를 활용하여 한글 문장에 대한 토큰화처리를 보다 효율적으로 처리합니다.

공식 도큐먼트

설치

# !pip install konlpy

konlpy 내부에는 Kkma, Okt, Twitter 등등의 형태소 분석기가 존재하지만, 이번 튜토리얼에서는 Okt를 활용하도록 하겠습니다.

from konlpy.tag import Okt
okt = Okt()
# 형태소 변환에 활용하는 함수
# morphs 함수 안에 변환한 한글 문장을 입력 합니다.
def process_morph(sentence):
    return ' '.join(okt.morphs(sentence))

Seq2Seq 모델이 학습하기 위한 데이터셋을 구성할 때, 다음과 같이 3가지 데이터셋을 구성합니다.

  • question: encoder input 데이터셋 (질의 전체)
  • answer_input: decoder input 데이터셋 (답변의 시작). START 토큰을 문장 처음에 추가 합니다.
  • answer_output: decoder output 데이터셋 (답변의 끝). END 토큰을 문장 마지막에 추가 합니다.
def clean_and_morph(sentence, is_question=True):
    # 한글 문장 전처리
    sentence = clean_sentence(sentence)
    # 형태소 변환
    sentence = process_morph(sentence)
    # Question 인 경우, Answer인 경우를 분기하여 처리합니다.
    if is_question:
        return sentence
    else:
        # START 토큰은 decoder input에 END 토큰은 decoder output에 추가합니다.
        return ('<START> ' + sentence, sentence + ' <END>')
def preprocess(texts, pairs):
    questions = []
    answer_in = []
    answer_out = []

    # 질의에 대한 전처리
    for text in texts:
        # 전처리와 morph 수행
        question = clean_and_morph(text, is_question=True)
        questions.append(question)

    # 답변에 대한 전처리
    for pair in pairs:
        # 전처리와 morph 수행
        in_, out_ = clean_and_morph(pair, is_question=False)
        answer_in.append(in_)
        answer_out.append(out_)
    
    return questions, answer_in, answer_out
questions, answer_in, answer_out = preprocess(texts, pairs)
questions[:5]
['12시 땡', '1 지망 학교 떨어졌어', '3 박 4일 놀러 가고 싶다', '3 박 4일 정도 놀러 가고 싶다', '심하네']
answer_in[:5]
['<START> 하루 가 또 가네요',
 '<START> 위로 해 드립니다',
 '<START> 여행 은 언제나 좋죠',
 '<START> 여행 은 언제나 좋죠',
 '<START> 눈살 이 찌푸려지죠']
answer_out[:5]
['하루 가 또 가네요 <END>',
 '위로 해 드립니다 <END>',
 '여행 은 언제나 좋죠 <END>',
 '여행 은 언제나 좋죠 <END>',
 '눈살 이 찌푸려지죠 <END>']
all_sentences = questions + answer_in + answer_out
a = (' '.join(questions) + ' '.join(answer_in) + ' '.join(answer_out)).split()
len(set(a))
12638

토큰화

import numpy as np
import warnings
import tensorflow as tf

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# WARNING 무시
warnings.filterwarnings('ignore')

토큰의 정의

tokenizer = Tokenizer(filters='', lower=False, oov_token='<OOV>')

Tokenizer로 문장에 대한 Word-Index Vocabulary(단어 사전)을 만듭니다.

tokenizer.fit_on_texts(all_sentences)

단어 사전 10개 출력

for word, idx in tokenizer.word_index.items():
    print(f'{word}\t\t => \t{idx}')
    if idx > 10:
        break
<OOV>		 => 	1
<START>		 => 	2
<END>		 => 	3
이		 => 	4
을		 => 	5
거		 => 	6
가		 => 	7
예요		 => 	8
사람		 => 	9
요		 => 	10
에		 => 	11

토큰의 갯수 확인

len(tokenizer.word_index)
12637

치환: 텍스트를 시퀀스로 인코딩 (texts_to_sequences)

question_sequence = tokenizer.texts_to_sequences(questions)
answer_in_sequence = tokenizer.texts_to_sequences(answer_in)
answer_out_sequence = tokenizer.texts_to_sequences(answer_out)

문장의 길이 맞추기 (pad_sequences)

MAX_LENGTH = 30
question_padded = pad_sequences(question_sequence, maxlen=MAX_LENGTH, truncating='post', padding='post')
answer_in_padded = pad_sequences(answer_in_sequence, maxlen=MAX_LENGTH, truncating='post', padding='post')
answer_out_padded = pad_sequences(answer_out_sequence, maxlen=MAX_LENGTH, truncating='post', padding='post')
question_padded.shape
(11823, 30)
answer_in_padded.shape, answer_out_padded.shape
((11823, 30), (11823, 30))

모델

from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Dropout, Attention
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint
from tensorflow.keras.utils import plot_model

학습용 인코더 (Encoder)

class Encoder(tf.keras.Model):
    def __init__(self, units, vocab_size, embedding_dim, time_steps):
        super(Encoder, self).__init__()
        self.embedding = Embedding(vocab_size, embedding_dim, input_length=time_steps, name='Embedding')
        self.dropout = Dropout(0.2, name='Dropout')
        # (attention) return_sequences=True 추가
        self.lstm = LSTM(units, return_state=True, return_sequences=True, name='LSTM')
        
    def call(self, inputs):
        x = self.embedding(inputs)
        x = self.dropout(x)
        x, hidden_state, cell_state = self.lstm(x)
        # (attention) x return 추가
        return x, [hidden_state, cell_state]

학습용 디코더 (Decoder)

Attention Layer

Attention Layer 공식 도큐먼트

Inputs are query tensor of shape [batch_size, Tq, dim], value tensor of shape [batch_size, Tv, dim] and key tensor of shape [batch_size, Tv, dim].

The calculation follows the steps:

  1. Calculate scores with shape [batch_size, Tq, Tv] as a query-key dot product: scores = tf.matmul(query, key, transpose_b=True).
  2. Use scores to calculate a distribution with shape [batch_size, Tq, Tv]: distribution = tf.nn.softmax(scores).
  3. Use distribution to create a linear combination of value with shape [batch_size, Tq, dim]: return tf.matmul(distribution, value).
class Decoder(tf.keras.Model):
    def __init__(self, units, vocab_size, embedding_dim, time_steps):
        super(Decoder, self).__init__()
        self.embedding = Embedding(vocab_size, embedding_dim, input_length=time_steps, name='Embedding')
        self.dropout = Dropout(0.2, name='Dropout')
        self.lstm = LSTM(units, 
                         return_state=True, 
                         return_sequences=True, 
                         name='LSTM'
                        )
        self.attention = Attention(name='Attention')
        self.dense = Dense(VOCAB_SIZE, activation='softmax', name='Dense')
    
    def call(self, inputs, initial_state):
        # (attention) encoder_inputs 추가
        encoder_inputs, decoder_inputs = inputs
        x = self.embedding(decoder_inputs)
        x = self.dropout(x)
        x, hidden_state, cell_state = self.lstm(x, initial_state=initial_state)
        
        # (attention) key_value, attention_matrix 추가
        # 이전 hidden_state의 값을 concat으로 만들어 vector를 생성합니다.        
        key_value = tf.concat([initial_state[0][:, tf.newaxis, :], x[:, :-1, :]], axis=1)        
        # 이전 hidden_state의 값을 concat으로 만든 vector와 encoder에서 나온 출력 값들로 attention을 구합니다.
        attention_matrix = self.attention([key_value, encoder_inputs])
        # 위에서 구한 attention_matrix와 decoder의 출력 값을 concat 합니다.
        x = tf.concat([x, attention_matrix], axis=-1)
        
        x = self.dense(x)
        return x, hidden_state, cell_state

코드 세부설명

STEP 1) 인코더의 최종 hidden_state 값을 디코더 attention key의 첫 번째 state로 활용

  • initial_state[0]: (batch_size, 128)
  • [initial_state[0][:, tf.newaxis, :]: (batch_size, 1, 128)

STEP 2) 디코더의 이전 time_step의 hidden_state 값과 concat

  • x[:, :-1, :]: (batch_size, 29, 128)

병합 후

  • tf.concat([initial_state[0][:, tf.newaxis, :], x[:, :-1, :]], axis=1): (batch_size, 30, 128)

STEP 3) 1번 단계와 2번 단계에서 병합된 Key 값과 Encoder의 LSTM 전체 output value로 attention 계산

attention_matrix = self.attention([key_value, encoder_inputs])

STEP 4)최종 결과 도출 (concat)

tf.concat([x, attention_matrix], axis=-1)

디코더의 LSTM 전체 output과 attention 값을 concat 하여 Dense로 넘깁니다.

모델 결합

class Seq2Seq(tf.keras.Model):
    def __init__(self, units, vocab_size, embedding_dim, time_steps, start_token, end_token):
        super(Seq2Seq, self).__init__()
        self.start_token = start_token
        self.end_token = end_token
        self.time_steps = time_steps
        
        self.encoder = Encoder(units, vocab_size, embedding_dim, time_steps)
        self.decoder = Decoder(units, vocab_size, embedding_dim, time_steps)
        
        
    def call(self, inputs, training=True):
        if training:
            encoder_inputs, decoder_inputs = inputs
            # (attention) encoder 출력 값 수정
            encoder_outputs, context_vector = self.encoder(encoder_inputs)
            # (attention) decoder 입력 값 수정
            decoder_outputs, _, _ = self.decoder((encoder_outputs, decoder_inputs), initial_state=context_vector)
            return decoder_outputs
        else:
            x = inputs
            # (attention) encoder 출력 값 수정
            encoder_outputs, context_vector = self.encoder(x)
            target_seq = tf.constant([[self.start_token]], dtype=tf.float32)
            results = tf.TensorArray(tf.int32, self.time_steps)
            
            for i in tf.range(self.time_steps):
                decoder_output, decoder_hidden, decoder_cell = self.decoder((encoder_outputs, target_seq), initial_state=context_vector)
                decoder_output = tf.cast(tf.argmax(decoder_output, axis=-1), dtype=tf.int32)
                decoder_output = tf.reshape(decoder_output, shape=(1, 1))
                results = results.write(i, decoder_output)
                
                if decoder_output == self.end_token:
                    break
                    
                target_seq = decoder_output
                context_vector = [decoder_hidden, decoder_cell]
                
            return tf.reshape(results.stack(), shape=(1, self.time_steps))

단어별 원핫인코딩 적용

단어별 원핫인코딩을 적용하는 이유는 decoder의 output(출력)을 원핫인코딩 vector로 변환하기 위함

VOCAB_SIZE = len(tokenizer.word_index)+1
def convert_to_one_hot(padded):
    # 원핫인코딩 초기화
    one_hot_vector = np.zeros((len(answer_out_padded), MAX_LENGTH, VOCAB_SIZE))

    # 디코더 목표를 원핫인코딩으로 변환
    # 학습시 입력은 인덱스이지만, 출력은 원핫인코딩 형식임
    for i, sequence in enumerate(answer_out_padded):
        for j, index in enumerate(sequence):
            one_hot_vector[i, j, index] = 1

    return one_hot_vector
answer_in_one_hot = convert_to_one_hot(answer_in_padded)
answer_out_one_hot = convert_to_one_hot(answer_out_padded)
answer_in_one_hot[0].shape, answer_in_one_hot[0].shape
((30, 12638), (30, 12638))

변환된 index를 다시 단어로 변환

def convert_index_to_text(indexs, end_token): 
    
    sentence = ''
    
    # 모든 문장에 대해서 반복
    for index in indexs:
        if index == end_token:
            # 끝 단어이므로 예측 중비
            break;
        # 사전에 존재하는 단어의 경우 단어 추가
        if index > 0 and tokenizer.index_word[index] is not None:
            sentence += tokenizer.index_word[index]
        else:
        # 사전에 없는 인덱스면 빈 문자열 추가
            sentence += ''
            
        # 빈칸 추가
        sentence += ' '
    return sentence

학습 (Training)

하이퍼 파라미터 정의

BUFFER_SIZE = 1000
BATCH_SIZE = 64
EMBEDDING_DIM = 100
TIME_STEPS = MAX_LENGTH
START_TOKEN = tokenizer.word_index['<START>']
END_TOKEN = tokenizer.word_index['<END>']

UNITS = 128

VOCAB_SIZE = len(tokenizer.word_index)+1
DATA_LENGTH = len(questions)
SAMPLE_SIZE = 3

체크포인트 생성

checkpoint_path = 'model/training_checkpoint-6.ckpt'
checkpoint = ModelCheckpoint(filepath=checkpoint_path, 
                             save_weights_only=True,
                             save_best_only=True, 
                             monitor='loss', 
                             verbose=1
                            )

분산환경 설정

strategy = tf.distribute.MirroredStrategy()
strategy.num_replicas_in_sync

모델 생성 & compile

# 분산 환경 적용시
with strategy.scope():
    seq2seq = Seq2Seq(UNITS, VOCAB_SIZE, EMBEDDING_DIM, TIME_STEPS, START_TOKEN, END_TOKEN)
    seq2seq.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'])
seq2seq = Seq2Seq(UNITS, VOCAB_SIZE, EMBEDDING_DIM, TIME_STEPS, START_TOKEN, END_TOKEN)
seq2seq.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'])
# 연속하여 학습시 체크포인트를 로드하여 이어서 학습합니다.
seq2seq.load_weights(checkpoint_path)
<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f56a0067f60>
def make_prediction(model, question_inputs):
    results = model(inputs=question_inputs, training=False)
    # 변환된 인덱스를 문장으로 변환
    results = np.asarray(results).reshape(-1)
    return results
for epoch in range(35):
    seq2seq.fit([question_padded, answer_in_padded],
                answer_out_one_hot,
                epochs=10,
                batch_size=16, 
                callbacks=[checkpoint]
               )
    # 랜덤한 샘플 번호 추출
    samples = np.random.randint(DATA_LENGTH, size=SAMPLE_SIZE)

    # 예측 성능 테스트
    for idx in samples:
        question_inputs = question_padded[idx]
        # 문장 예측
        results = make_prediction(seq2seq, np.expand_dims(question_inputs, 0))
        
        # 변환된 인덱스를 문장으로 변환
        results = convert_index_to_text(results, END_TOKEN)
        
        print(f'Q: {questions[idx]}')
        print(f'A: {results}\n')
        print()

예측

# 자연어 (질문 입력) 대한 전처리 함수
def make_question(sentence):
    sentence = clean_and_morph(sentence)
    question_sequence = tokenizer.texts_to_sequences([sentence])
    question_padded = pad_sequences(question_sequence, maxlen=MAX_LENGTH, truncating='post', padding='post')
    return question_padded
make_question('오늘 날씨가 정말 화창합니다')
array([[ 76, 534,   7, 110,   1,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0]], dtype=int32)
make_question('찐찐찐찐찐이야~ 완전 찐이야~')
array([[  1,   1,   1,   1,   1, 870,   1,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0]], dtype=int32)
def run_chatbot(question):
    question_inputs = make_question(question)
    results = make_prediction(seq2seq, question_inputs)
    results = convert_index_to_text(results, END_TOKEN)
    return results

유저로부터 Text 입력 값을 받아 답변 출력

while True:
    user_input = input('<< 말을 걸어 보세요!\n')
    if user_input == 'q':
        break
    print('>> 챗봇 응답: {}'.format(run_chatbot(user_input)))
<< 말을 걸어 보세요!
커피를 마시고 싶습니다
>> 챗봇 응답: 저 랑 한 잔 해 요 
<< 말을 걸어 보세요!
여행 가고 싶습니다
>> 챗봇 응답: 화장실 가세 요 
<< 말을 걸어 보세요!
살 빼야 합니다
>> 챗봇 응답: 운동 을 시작 해보세요 
<< 말을 걸어 보세요!
다이어트 하고 싶다
>> 챗봇 응답: 기초 대 사량 을 높 여보세요 
<< 말을 걸어 보세요!
q


관련 글 더보기

- 텐서플로 함수형 (Functional) API 모델링

- LSTM과 FinanceDataReader API를 활용한 삼성전자 주가 예측

- TensorFlow LSTM layer 활용법

- 텐서플로우(tensorflow) 파이참(pycharm)에서 dll 오류 해결

- TensorFlow RNN Text 생성 (셰익스피어 글 생성)

데이터 분석, 머신러닝, 딥러닝의 대중화를 꿈 꿉니다.