🔥알림🔥
① 테디노트 유튜브 -
구경하러 가기!
② LangChain 한국어 튜토리얼
바로가기 👀
③ 랭체인 노트 무료 전자책(wikidocs)
바로가기 🙌
④ RAG 비법노트 LangChain 강의오픈
바로가기 🙌
⑤ 서울대 PyTorch 딥러닝 강의
바로가기 🙌
LSTM과 FinanceDataReader API를 활용한 삼성전자 주가 예측
이번 튜토리얼에서는 딥러닝 모델을 활용하여 삼성전자 주가 예측을 진행해 보겠습니다.
이전에도 한 번 공유해 드린 적이 있고, 다시 내용을 작성하는 이유는 이번에는 FinanceDataReader 파이썬 패키지를 활용하여 진행해볼 예정입니다.
FinanceDataReader를 활용하면 더이상 파일데이터로 주가 데이터를 수집해올 필요가 없으며, 가장 최신의 업데이트된 데이터를 API 콜을 통해 DataFrame으로 받아올 수 있습니다.
또한 TensorFlow에서 윈도우 데이터셋으로 구성하여 코드가 보다 간결해 졌습니다.
이전 블로그 글과 비교해보실 분들은 아래 링크를 참고해 주세요.
LSTM을 활용한 주가 예측 모델
이번 튜토리얼 에서는 다음과 같은 프로세스 파이프라인으로 주가 예측을 진행합니다.
- FinanceDataReader를 활용하여 주가 데이터 받아오기
- TensorFlow Dataset 클래스를 활용하여 주가 데이터 구축
- LSTM 을 활용한 주가 예측 모델 구축
필요한 모듈 import
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import os
%matplotlib inline
warnings.filterwarnings('ignore')
plt.rcParams['font.family'] = 'NanumGothic'
데이터 (FinanceDataReader)
FinanceDataReader는 주가 데이터를 편리하게 가져올 수 있는 파이썬 패키지입니다.
FinanceDataReader가 아직 설치 되지 않으신 분들은 아래의 주석을 해제한 후 명령어로 설치해 주시기 바랍니다.
# !pip install finance-datareader
import FinanceDataReader as fdr
# 삼성전자(005930) 전체 (1996-11-05 ~ 현재)
samsung = fdr.DataReader('005930')
매우 편리하게 삼성전자 주가 데이터를 DataFrame
형식으로 받아옵니다.
기본 오름차순 정렬이 된 데이터임을 알 수 있습니다.
컬럼 설명
Open
: 시가High
: 고가Low
: 저가Close
: 종가Volume
: 거래량Change
: 대비
samsung.tail()
미국 주식 데이터도 가져올 수 있습니다.
# Apple(AAPL), 애플
apple = fdr.DataReader('AAPL')
apple.tail()
다음과 같이 2017
을 같이 넘겨주면, 해당 시점 이후의 주식 데이터를 가져옵니다.
# Apple(AAPL), 애플
apple = fdr.DataReader('AAPL', '2017')
apple.head()
시작과 끝 날짜를 지정하여 범위 데이터를 가져올 수 있습니다.
# Ford(F), 1980-01-01 ~ 2019-12-30 (40년 데이터)
ford = fdr.DataReader('F', '1980-01-01', '2019-12-30')
ford.head()
ford.tail()
그 밖에 금, 은과 같은 현물, 달러와 같은 화폐 데이터도 가져올 수 있습니다.
더욱 자세한 내용은 GitHub 페이지 링크를 참고해 보시기 바랍니다.
주가데이터 가져오기
지난 주식 예측 튜토리얼에서는 삼성전자 주가데이터로 실습을 해봤으니,
이번에는 글로벌 증시의 대장주인 Apple사의 주가데이터를 가져와서 예측해 보도록 하겠습니다.
# 삼성전자 주식코드: 005930
STOCK_CODE = '005930'
stock = fdr.DataReader(STOCK_CODE)
stock.head()
stock.tail()
stock.index
위에서 보시는 바와 같이 index가 DatetimeIndex
로 지정되어 있습니다.
DatetimeIndex
로 정의되어 있다면, 아래와 같이 연도, 월, 일을 쪼갤 수 있으며, 월별, 연도별 피벗데이터를 만들때 유용하게 활용할 수 있습니다.
stock['Year'] = stock.index.year
stock['Month'] = stock.index.month
stock['Day'] = stock.index.day
stock.head()
시각화
plt.figure(figsize=(16, 9))
sns.lineplot(y=stock['Close'], x=stock.index)
plt.xlabel('time')
plt.ylabel('price')
time_steps = [['1990', '2000'],
['2000', '2010'],
['2010', '2015'],
['2015', '2020']]
fig, axes = plt.subplots(2, 2)
fig.set_size_inches(16, 9)
for i in range(4):
ax = axes[i//2, i%2]
df = stock.loc[(stock.index > time_steps[i][0]) & (stock.index < time_steps[i][1])]
sns.lineplot(y=df['Close'], x=df.index, ax=ax)
ax.set_title(f'{time_steps[i][0]}~{time_steps[i][1]}')
ax.set_xlabel('time')
ax.set_ylabel('price')
plt.tight_layout()
plt.show()
데이터 전처리
주가 데이터에 대하여 딥러닝 모델이 더 잘 학습할 수 있도록 정규화(Normalization)를 해주도록 하겠습니다.
표준화 (Standardization)와 정규화(Normalization)에 대한 내용은 아래 링크에서 더 자세히 다루니, 참고해 보시기 바랍니다.
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
# 스케일을 적용할 column을 정의합니다.
scale_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
# 스케일 후 columns
scaled = scaler.fit_transform(stock[scale_cols])
scaled
스케일이 완료된 column으로 새로운 데이터프레임을 생성합니다.
시간 순으로 정렬되어 있으며, datetime index는 제외했습니다.
6,000개의 row, 5개 column으로 이루어진 데이터셋이 DataFrame으로 정리되었습니다.
df = pd.DataFrame(scaled, columns=scale_cols)
train / test 분할
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(df.drop('Close', 1), df['Close'], test_size=0.2, random_state=0, shuffle=False)
x_train.shape, y_train.shape
x_test.shape, y_test.shape
x_train
TensroFlow Dataset을 활용한 시퀀스 데이터셋 구성
import tensorflow as tf
def windowed_dataset(series, window_size, batch_size, shuffle):
series = tf.expand_dims(series, axis=-1)
ds = tf.data.Dataset.from_tensor_slices(series)
ds = ds.window(window_size + 1, shift=1, drop_remainder=True)
ds = ds.flat_map(lambda w: w.batch(window_size + 1))
if shuffle:
ds = ds.shuffle(1000)
ds = ds.map(lambda w: (w[:-1], w[-1]))
return ds.batch(batch_size).prefetch(1)
Hyperparameter를 정의합니다.
WINDOW_SIZE=20
BATCH_SIZE=32
# trian_data는 학습용 데이터셋, test_data는 검증용 데이터셋 입니다.
train_data = windowed_dataset(y_train, WINDOW_SIZE, BATCH_SIZE, True)
test_data = windowed_dataset(y_test, WINDOW_SIZE, BATCH_SIZE, False)
# 아래의 코드로 데이터셋의 구성을 확인해 볼 수 있습니다.
# X: (batch_size, window_size, feature)
# Y: (batch_size, feature)
for data in train_data.take(1):
print(f'데이터셋(X) 구성(batch_size, window_size, feature갯수): {data[0].shape}')
print(f'데이터셋(Y) 구성(batch_size, window_size, feature갯수): {data[1].shape}')
모델
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Conv1D, Lambda
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
model = Sequential([
# 1차원 feature map 생성
Conv1D(filters=32, kernel_size=5,
padding="causal",
activation="relu",
input_shape=[WINDOW_SIZE, 1]),
# LSTM
LSTM(16, activation='tanh'),
Dense(16, activation="relu"),
Dense(1),
])
# Sequence 학습에 비교적 좋은 퍼포먼스를 내는 Huber()를 사용합니다.
loss = Huber()
optimizer = Adam(0.0005)
model.compile(loss=Huber(), optimizer=optimizer, metrics=['mse'])
# earlystopping은 10번 epoch통안 val_loss 개선이 없다면 학습을 멈춥니다.
earlystopping = EarlyStopping(monitor='val_loss', patience=10)
# val_loss 기준 체크포인터도 생성합니다.
filename = os.path.join('tmp', 'ckeckpointer.ckpt')
checkpoint = ModelCheckpoint(filename,
save_weights_only=True,
save_best_only=True,
monitor='val_loss',
verbose=1)
history = model.fit(train_data,
validation_data=(test_data),
epochs=50,
callbacks=[checkpoint, earlystopping])
저장한 ModelCheckpoint 를 로드합니다.
model.load_weights(filename)
test_data
를 활용하여 예측을 진행합니다.
pred = model.predict(test_data)
pred.shape
예측 데이터 시각화
아래 시각화 코드중 y_test 데이터에 [20:]으로 슬라이싱을 한 이유는
예측 데이터에서 20일치의 데이터로 21일치를 예측해야하기 때문에 test_data로 예측 시 앞의 20일은 예측하지 않습니다.
따라서, 20번 째 index와 비교하면 더욱 정확합니다.
plt.figure(figsize=(12, 9))
plt.plot(np.asarray(y_test)[20:], label='actual')
plt.plot(pred, label='prediction')
plt.legend()
plt.show()
댓글남기기