딥러닝을 통해 과연 주가예측가능할까요?
제가 생각하기에는 ...yes라고 대답하기 애매한 yes라고 대답하고 싶습니다.
우선 주식예측에 사용되는 대표적인 인공지능 기술들은 NN, CNN, RNN등이 있지만
그중에서 RNN이 근소한차이로 타 인공신경망에 비해 성능이 좀더 좋다고 합니다.
인공신경망 : 인간의 뇌구조에서 영감을 얻은 학습기법 ... (생략)
1. LSTM 이란?
Long - Short - Term - Memory (LSTM) 네트워크는 시간경과에 따른 Backpropagation 을 사용하여
훈련되는 인공신경망네트워크입니다. 이 인공신경망 네트워크는 기계학습에서 어려운문제를
해결하고 최첨단 결과를 달성하는데 사용될수있다고합니다. LSTM 네트워크는 레이어를 통해 연결된 메모리 블록들을 가지고있다.
LSTM 은 RNN 의 한 종류인데 다양한 종류의 업무에 잘 통하는 것으로 알려져 있습니다. 공부하다보니 RNN 설계구조의 종류가 무려 10,000 개 넘게 있다고 합니다.
LSTM은 Recurrent Neural Networks의 한 종류로써, 순서가 중요한 feature의 요소일 때 흔히 적용하는 RNN의 한 종류로써, 쉬운 예를 들어 설명하자면, 문장과 같은 단어가 문장 안에서의 순서가 중요한 경우나, 주가와 같은 시계열 데이터셋에서 효과적인 모델입니다. 특히, 최근에는 자연어 처리 분야에서 활발히 사용되어지는 알고리즘입니다.
LSTM의 가장 중요한 특징은 앞서 말한바와 같이, squence를 저장하고 이를 학습에 활용한다는 점입니다.
2. RNN 이란?
RNN은 이름에서 알 수 있는 것처럼 신경망(Neural Network)의 일종입니다. 일반적인 신경망을 Feed-forward neural networks(FFNets)라고 하는데요. FFNets에서는 데이터를 트레이닝 셋과 테스트 셋으로 나누어서 관리를 하고 트레이닝 셋을 통해서 신경망의 가중치를 학습시키고 이 결과를 테스트셋을 통해서 확인을 하는 방식입니다.
FFNets에서는 데이터를 입력하면 입력층에서 은닉층까지 연산이 차근차근 진행되고 출력이 나가게 됩니다. 이 과정에서 입력 데이터는 모든 노드를 딱 한 번씩만 지나가게 됩니다. 데이터가 노드를 한 번만 지나가게 된다는 것은 데이터의 순서 즉 시간적인 측면을 고려하지 않는 구조라는 의미입니다. 데이터들의 시간 순서를 무시하고 현재 주어진 데이터를 통해서 독립적으로 학습을 합니다.
하지만 RNN의 경우는 은닉층의 결과가 다시 같은 은닉층의 입력으로 들어가도록 연결되어 있습니다. 이런 특성이 RNN이 순서 또는 시간이라는 측면을 고려할 수 있는 특징을 가져다주게 됩니다. RNN에서 Recurrent는 '반복되는, 되풀이되는'이라는 의미를 가지고 있는데 이 이름은 은닉층의 결과가 다시 은닉층으로 들어가게 되는 특성에서 나온 듯 합니다.
일반 인공네트워크신경망
RNN 인공네트워크신경망
좀더 자세히 알아보면
RNN 은 이러한 Backpropagation 구조가 여러개로 연결되어있습니다.
실제로 시간축으로나타내면 이런구조입니다.
개발환경 : 파이썬 (3.7) , tensorflow (1.15.2) numpy(1.19.2) conda(4.9.1)
개발은 jupyter notebook 에서 개발되었습니다.
과거 아마존의 주가 데이터는 finance.yahoo.com/quote/AMZN/history?p=AMZN 에서 가져왔습니다.
1. 모듈임포트
import numpy as np
import pandas as pd
import datetime
import matplotlib.pyplot as plt
import tensorflow as tf
2. 필요한 함수구현
# 표준화
def data_standardization(x):
x_np = np.asarray(x)
return (x_np - x_np.mean()) / x_np.std()
# 너무 작거나 너무 큰 값이 학습을 방해하는 것을 방지하고자 정규화한다
# x가 양수라는 가정하에 최소값과 최대값을 이용하여 0~1사이의 값으로 변환
# Min-Max scaling
def min_max_scaling(x):
x_np = np.asarray(x)
return (x_np - x_np.min()) / (x_np.max() - x_np.min() + 1e-7) # 1e-7은 0으로 나누는 오류 예방차원
# 정규화된 값을 원래의 값으로 되돌린다
# 정규화하기 이전의 org_x값과 되돌리고 싶은 x를 입력하면 역정규화된 값을 리턴한다
def reverse_min_max_scaling(org_x, x):
org_x_np = np.asarray(org_x)
x_np = np.asarray(x)
return (x_np * (org_x_np.max() - org_x_np.min() + 1e-7)) + org_x_np.min()
np.mean -> 평균
np.std -> 표준편차
np.asarry(value , [dtype = ????] )
# 하이퍼파라미터
input_data_column_cnt = 6 # 입력데이터의 컬럼 개수(Variable 개수)
output_data_column_cnt = 1 # 결과데이터의 컬럼 개수
seq_length = 28 # 1개 시퀀스의 길이(시계열데이터 입력 개수)
rnn_cell_hidden_dim = 20 # 각 셀의 (hidden)출력 크기
forget_bias = 1.0 # 망각편향(기본값 1.0)
num_stacked_layers = 1 # stacked LSTM layers 개수
keep_prob = 1.0 # dropout할 때 keep할 비율
epoch_num = 1000 # 에폭 횟수(학습용전체데이터를 몇 회 반복해서 학습할 것인가 입력)
learning_rate = 0.01 # 학습률 (적당히 설정해놔야됨 여기서 적당히란...? )
# 데이터를 로딩한다.
stock_file_name = 'TSLAmax.csv' # 주가데이터 파일
encoding = 'euc-kr' # 문자 인코딩
names = ['Date','Open','High','Low','Close','Adj Close','Volume']
raw_dataframe = pd.read_csv(stock_file_name, names=names, encoding=encoding) #판다스이용 csv파일 로딩
raw_dataframe.info() # 데이터 정보 출력
# raw_dataframe.drop('Date', axis=1, inplace=True) # 시간열을 제거하고 dataframe 재생성하지 않기
del raw_dataframe['Date'] # 위 줄과 같은 효과
stock_info = raw_dataframe.values[1:].astype(np.float) # 금액&거래량 문자열을 부동소수점형으로 변환한다
print("stock_info.shape: ", stock_info.shape)
print("stock_info[0]: ", stock_info[0])
print('date 값 삭제 후 rea_dateframe.info')
raw_dataframe.info() # 데이터 정보 출력
# 데이터 전처리
# 가격과 거래량 수치의 차이가 많아나서 각각 별도로 정규화한다
# 가격형태 데이터들을 정규화한다
# ['Open','High','Low','Close','Adj Close','Volume']에서 'Adj Close'까지 취함
# 곧, 마지막 열 Volume를 제외한 모든 열
price = stock_info[:,:-1]
norm_price = min_max_scaling(price) # 가격형태 데이터 정규화 처리
print("price.shape: ", price.shape)
print("price[0]: ", price[0])
print("norm_price[0]: ", norm_price[0])
print("="*100) # 화면상 구분용
# 거래량형태 데이터를 정규화한다
# ['Open','High','Low','Close','Adj Close','Volume']에서 마지막 'Volume'만 취함
# [:,-1]이 아닌 [:,-1:]이므로 주의하자! 스칼라가아닌 벡터값 산출해야만 쉽게 병합 가능
volume = stock_info[:,-1:]
norm_volume = min_max_scaling(volume) # 거래량형태 데이터 정규화 처리
print("volume.shape: ", volume.shape)
print("volume[0]: ", volume[0])
print("norm_volume[0]: ", norm_volume[0])
print("="*100) # 화면상 구분용
# 행은 그대로 두고 열을 우측에 붙여 합친다
x = np.concatenate((norm_price, norm_volume), axis=1) # axis=1, 세로로 합친다
print("x.shape: ", x.shape)
print("x[0]: ", x[0]) # x의 첫 값
print("x[-1]: ", x[-1]) # x의 마지막 값
print("="*100) # 화면상 구분용
y = x[:, [-2]] # 타켓은 주식 종가이다
print("y[0]: ",y[0]) # y의 첫 값
print("y[-1]: ",y[-1]) # y의 마지막 값
dataX = [] # 입력으로 사용될 Sequence Data
dataY = [] # 출력(타켓)으로 사용
for i in range(0, len(y) - seq_length):
_x = x[i : i+seq_length]
_y = y[i + seq_length] # 다음 나타날 주가(정답)
if i is 0:
print(_x, "->", _y) # 첫번째 행만 출력해 봄
dataX.append(_x) # dataX 리스트에 추가
dataY.append(_y) # dataY 리스트에 추가
# 학습용/테스트용 데이터 생성
# 전체 70%를 학습용 데이터로 사용
train_size = int(len(dataY) * 0.7)
# 나머지(30%)를 테스트용 데이터로 사용
test_size = len(dataY) - train_size
# 데이터를 잘라 학습용 데이터 생성
trainX = np.array(dataX[0:train_size])
trainY = np.array(dataY[0:train_size])
# 데이터를 잘라 테스트용 데이터 생성
testX = np.array(dataX[train_size:len(dataX)])
testY = np.array(dataY[train_size:len(dataY)])
# 텐서플로우 플레이스홀더 생성
# 입력 X, 출력 Y를 생성한다
X = tf.placeholder(tf.float32, [None, seq_length, input_data_column_cnt])
print("X: ", X)
Y = tf.placeholder(tf.float32, [None, 1])
print("Y: ", Y)
# 검증용 측정지표를 산출하기 위한 targets, predictions를 생성한다
targets = tf.placeholder(tf.float32, [None, 1])
print("targets: ", targets)
predictions = tf.placeholder(tf.float32, [None, 1])
print("predictions: ", predictions)
# 모델(LSTM 네트워크) 생성
def lstm_cell():
# LSTM셀을 생성
# num_units: 각 Cell 출력 크기
# forget_bias: to the biases of the forget gate
# (default: 1) in order to reduce the scale of forgetting in the beginning of the training.
# state_is_tuple: True ==> accepted and returned states are 2-tuples of the c_state and m_state.
# state_is_tuple: False ==> they are concatenated along the column axis.
cell = tf.contrib.rnn.BasicLSTMCell(num_units=rnn_cell_hidden_dim,
forget_bias=forget_bias, state_is_tuple=True, activation=tf.nn.softsign)
if keep_prob < 1.0:
cell = tf.contrib.rnn.DropoutWrapper(cell, output_keep_prob=keep_prob)
return cell
# num_stacked_layers개의 층으로 쌓인 Stacked RNNs 생성
stackedRNNs = [lstm_cell() for _ in range(num_stacked_layers)]
multi_cells = tf.contrib.rnn.MultiRNNCell(stackedRNNs, state_is_tuple=True) if num_stacked_layers > 1 else lstm_cell()
# RNN Cell(여기서는 LSTM셀임)들을 연결
hypothesis, _states = tf.nn.dynamic_rnn(multi_cells, X, dtype=tf.float32)
print("hypothesis: ", hypothesis)
# [:, -1]를 잘 살펴보자. LSTM RNN의 마지막 (hidden)출력만을 사용했다.
# 과거 여러 거래일의 주가를 이용해서 다음날의 주가 1개를 예측하기때문에 MANY-TO-ONE형태이다
hypothesis = tf.contrib.layers.fully_connected(hypothesis[:, -1], output_data_column_cnt, activation_fn=tf.identity)
# 손실함수로 평균제곱오차를 사용한다
loss = tf.reduce_sum(tf.square(hypothesis - Y))
# 최적화함수로 AdamOptimizer를 사용한다
optimizer = tf.train.AdamOptimizer(learning_rate)
# optimizer = tf.train.RMSPropOptimizer(learning_rate) # LSTM과 궁합 별로임
train = optimizer.minimize(loss)
# RMSE(Root Mean Square Error)
# 제곱오차의 평균을 구하고 다시 제곱근을 구하면 평균 오차가 나온다
# rmse = tf.sqrt(tf.reduce_mean(tf.square(targets-predictions))) # 아래 코드와 같다
rmse = tf.sqrt(tf.reduce_mean(tf.squared_difference(targets, predictions)))
train_error_summary = [] # 학습용 데이터의 오류를 중간 중간 기록한다
test_error_summary = [] # 테스트용 데이터의 오류를 중간 중간 기록한다
test_predict = '' # 테스트용데이터로 예측한 결과
sess = tf.Session()
# 학습한다
start_time = datetime.datetime.now() # 시작시간을 기록한다
print('학습을 시작합니다...')
for epoch in range(epoch_num):
_, _loss = sess.run([train, loss], feed_dict={X: trainX, Y: trainY})
if ((epoch+1) % 100 == 0) or (epoch == epoch_num-1): # 100번째마다 또는 마지막 epoch인 경우
# 학습용데이터로 rmse오차를 구한다
train_predict = sess.run(hypothesis, feed_dict={X: trainX})
train_error = sess.run(rmse, feed_dict={targets: trainY, predictions: train_predict})
# 테스트용데이터로 rmse오차를 구한다
test_predict = sess.run(hypothesis, feed_dict={X: testX})
test_error = sess.run(rmse, feed_dict={targets: testY, predictions: test_predict})
# 현재 오류를 출력한다
print("epoch: {}, train_error(A): {}, test_error(B): {}, B-A: {}".format(epoch+1, train_error, test_error, test_error-train_error))
end_time = datetime.datetime.now() # 종료시간을 기록한다
elapsed_time = end_time - start_time # 경과시간을 구한다
print('elapsed_time per epoch:',elapsed_time/epoch_num)
# 하이퍼파라미터 출력
print('input_data_column_cnt:', input_data_column_cnt, end='')
print(',output_data_column_cnt:', output_data_column_cnt, end='')
print(',seq_length:', seq_length, end='')
print(',rnn_cell_hidden_dim:', rnn_cell_hidden_dim, end='')
print(',forget_bias:', forget_bias, end='')
print(',num_stacked_layers:', num_stacked_layers, end='')
print(',keep_prob:', keep_prob, end='')
print(',epoch_num:', epoch_num, end='')
print(',learning_rate:', learning_rate, end='')
print(',train_error:', train_error_summary[-1], end='')
print(',test_error:', test_error_summary[-1], end='')
print(',min_test_error:', np.min(test_error_summary))
# 결과 그래프 출력
plt.plot(train_error_summary, 'gold')
plt.plot(test_error_summary, 'b')
plt.ylabel('Root Mean Square Error')
plt.plot(testY, 'r')
plt.plot(test_predict, 'b')
plt.xlabel('Time Period')
plt.ylabel('Stock Price')
# sequence length만큼의 가장 최근 데이터를 슬라이싱한다
recent_data = np.array([x[len(x)-seq_length : ]])
print("recent_data.shape:", recent_data.shape)
print("recent_data:", recent_data)
# 내일 종가를 예측해본다
test_predict = sess.run(hypothesis, feed_dict={X: recent_data})
print("test_predict", test_predict[0])
test_predict = reverse_min_max_scaling(price,test_predict) # 금액데이터 역정규화한다
print("Tomorrow's stock price", test_predict[0]) # 예측한 주가를 출력한다
결과(아마존 1997~ 2017년 까지 데이터를통해 학습했을때의 결과)
결과2. 아마존 최근5년데이터를 가지고 학습했을떄의 결과
80% 의 정확도, 그리고 현재 주가가 아마 3060달러쯤 될텐데
내일의 아마존주가는 2998달러라고한다...