[코드구현] Time Series Forecasting - LSTM (seq2seq)

5 minute read

Time Series Forecasting 프로젝트

한 시간 간격으로 측정 되어 있는 한 달치 특정 구간의 평균 속도 데이터를 이용하여 마지막 일주일 간의 평균 속도를 예측하는 task를 수행해 보았다.

Code

 

데이터는 도로교통공사의 오픈 데이터를 직접 가공하였으며 아래에서 다운로드할 수 있다.

csvfile

 

시계열 예측에 많이 사용되는 모델인 Encoder-Decoder LSTM을 직접 모델링한 후 예측을 진행해 보았다.

 

Load Data

import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt

from tqdm import trange
import random
data = pd.read_csv("서인천IC-부평IC 평균속도.csv", encoding='CP949')
plt.figure(figsize=(20,5))
plt.plot(range(len(data)), data["평균속도"])
data.head()
  집계일시 평균속도
0 2021050100 98.63
1 2021050101 100.53
2 2021050102 99.86
3 2021050103 99.34
4 2021050104 93.64

image-20210730150248737

데이터를 보면 년, 월, 일, 시 가 있는 column과 평균속도의 값이 있는 column이 포함되어 있는 744개의 데이터가 있는 것을 확인 할 수 있다.

 

Data Preprocessing

학습을 위해 데이터를 전처리한다.

from sklearn.preprocessing import MinMaxScaler
min_max_scaler = MinMaxScaler()
data["평균속도"] = min_max_scaler.fit_transform(data["평균속도"].to_numpy().reshape(-1,1))

먼저 sklearn의 MinMaxScaler를 사용하여 데이터의 범위를 0~1로 만들어준다.

 

train = data[:-24*7]
train = train["평균속도"].to_numpy()

test = data[-24*7:]
test = test["평균속도"].to_numpy()

마지막 일주일의 데이터를 예측하는 것이 목표이므로 train, test set을 마지막 일주일을 기준으로 나눠준다.

 

Sliding Window Dataset

학습을 위해서는 인풋데이터와 아웃풋 데이터가 필요하다.

시계열 예측을 위해 데이터의 일정한 길이의 input window, output window를 설정하고, 데이터의 처음 부분부터 끝부분까지 sliding 시켜서 데이터셋을 생성한다.

image-20210730172930737

 

torch의 Dataset 클래스를 사용하여 window dataset을 생성하는 클래스를 선언했다.

from torch.utils.data import DataLoader, Dataset

class windowDataset(Dataset):
    def __init__(self, y, input_window, output_window, stride=1):
        #총 데이터의 개수
        L = y.shape[0]
        #stride씩 움직일 때 생기는 총 sample의 개수
        num_samples = (L - input_window - output_window) // stride + 1

        #input과 output : shape = (window 크기, sample 개수)
        X = np.zeros([input_window, num_samples])
        Y = np.zeros([output_window, num_samples])

        for i in np.arange(num_samples):
            start_x = stride*i
            end_x = start_x + input_window
            X[:,i] = y[start_x:end_x]

            start_y = stride*i + input_window
            end_y = start_y + output_window
            Y[:,i] = y[start_y:end_y]

        X = X.reshape(X.shape[0], X.shape[1], 1).transpose((1,0,2))
        Y = Y.reshape(Y.shape[0], Y.shape[1], 1).transpose((1,0,2))
        self.x = X
        self.y = Y
        
        self.len = len(X)
    def __getitem__(self, i):
        return self.x[i], self.y[i]
    def __len__(self):
        return self.len

input window, output window, stride를 입력받고 iw+ow만큼의 길이를 stride간격으로 sliding하면서 데이터셋을 생성한다.

결과의 첫 번째 값으로는 input, 두 번째 값으로는 output이 출력되도록 선언했다.

 

iw = 24*14
ow = 24*7

train_dataset = windowDataset(train, input_window=iw, output_window=ow, stride=1)
train_loader = DataLoader(train_dataset, batch_size=64)

일주일 간의 데이터를 예측해야 하므로 output window의 크기는 24*7로 설정했고 input window의 크기는 그 두배로 설정했다.

dataset을 이용하여 torch 의 DataLoader를 생성해서 배치 학습을 진행 할 수 있도록 했다.

 

Modeling

torch의 nn.Module을 이용하여 encoder, decoder 모델을 만들고 이 둘을 합쳐서 encoder decoder 모델을 만들었다.

각 모델의 역할은 다음과 같다.

  • encoder: input을 통해 decoder에 전달할 hidden state 생성

  • decoder: intput의 마지막 값과 encoder에서 받은 hidden state를 이용하여 한 개의 값을 예측

  • encoder decoder: 위의 두 모델을 합쳐줌. 원하는 길이의 아웃풋이 나올 때까지 decoder를 여러번 실행시켜서 최종 output을 생성. 원활한 학습을 위해 디코더의 인풋으로 실제 값을 넣는 teach forcing을 구현.

 

lstm encoder

input 으로부터 입력을 받고 lstm을 이용하여 디코더에 전달할 hidden state를 생성한다.

class lstm_encoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers = 1):
        super(lstm_encoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size = input_size, hidden_size = hidden_size, num_layers = num_layers, batch_first=True)

    def forward(self, x_input):
        lstm_out, self.hidden = self.lstm(x_input)
        return lstm_out, self.hidden

 

lstm decoder

sequence의 이전값 하나와, 이전 결과의 hidden state를 입력 받아서 다음 값 하나를 예측한다.

마지막에 fc layer를 연결해서 input size와 동일하게 크기를 맞춰준다.

class lstm_decoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers = 1):
        super(lstm_decoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size = input_size, hidden_size = hidden_size,num_layers = num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, input_size)           

    def forward(self, x_input, encoder_hidden_states):
        lstm_out, self.hidden = self.lstm(x_input.unsqueeze(-1), encoder_hidden_states)
        output = self.linear(lstm_out)
        
        return output, self.hidden

 

encoder decoder

위의 두 모델을 합쳐준다.

인코더를 한번 실행시키고 인코더에서 전달받은 hidden state와 input의 마지막값을 decoder에 전달해서 다음 예측값을 구한다. 여기서 나온 값과 hidden state를 반복적으로 사용해서 원하는 길이가 될때 까지 decoder를 실행한다.

decoder의 인풋으로 이전 예측값이 아닌 실제 값을 사용하는 teacher forcing도 구현하였다.

class lstm_encoder_decoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(lstm_encoder_decoder, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size

        self.encoder = lstm_encoder(input_size = input_size, hidden_size = hidden_size)
        self.decoder = lstm_decoder(input_size = input_size, hidden_size = hidden_size)

    def forward(self, inputs, targets, target_len, teacher_forcing_ratio):
        batch_size = inputs.shape[0]
        input_size = inputs.shape[2]

        outputs = torch.zeros(batch_size, target_len, input_size)

        _, hidden = self.encoder(inputs)
        decoder_input = inputs[:,-1, :]
        
        #원하는 길이가 될 때까지 decoder를 실행한다.
        for t in range(target_len): 
            out, hidden = self.decoder(decoder_input, hidden)
            out =  out.squeeze(1)
            
            # teacher forcing을 구현한다.
            # teacher forcing에 해당하면 다음 인풋값으로는 예측한 값이 아니라 실제 값을 사용한다.
            if random.random() < teacher_forcing_ratio:
                decoder_input = targets[:, t, :]
            else:
                decoder_input = out
            outputs[:,t,:] = out

        return outputs
	
    # 편의성을 위해 예측해주는 함수도 생성한다.
    def predict(self, inputs, target_len):
        self.eval()
        inputs = inputs.unsqueeze(0)
        batch_size = inputs.shape[0]
        input_size = inputs.shape[2]
        outputs = torch.zeros(batch_size, target_len, input_size)
        _, hidden = self.encoder(inputs)
        decoder_input = inputs[:,-1, :]
        for t in range(target_len): 
            out, hidden = self.decoder(decoder_input, hidden)
            out =  out.squeeze(1)
            decoder_input = out
            outputs[:,t,:] = out
        return outputs.detach().numpy()[0,:,0]

 

Train

생성한 모델과 데이터를 사용하여 훈련을 시작한다.

model = lstm_encoder_decoder(input_size=1, hidden_size=16).to(device)
learning_rate=0.01
epoch = 3000
optimizer = optim.Adam(model.parameters(), lr = learning_rate)
criterion = nn.MSELoss()
from tqdm import tqdm

model.train()
with tqdm(range(epoch)) as tr:
    for i in tr:
        total_loss = 0.0
        for x,y in train_loader:
            optimizer.zero_grad()
            x = x.to(device).float()
            y = y.to(device).float()
            output = model(x, y, ow, 0.6).to(device)
            loss = criterion(output, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.cpu().item()
        tr.set_postfix(loss="{0:.5f}".format(total_loss/len(train_loader)))
100%|██████████| 3000/3000 [15:08<00:00,  3.30it/s, loss=0.00041]

MSELoss를 사용하고 3000번의 epoch로 학습을 진행했다. teacher forcing 비율은 0.6으로 설정했다.

총 학습 시간은 약 15분 정도 소모되었다.

 

Evaluate

학습된 모델을 사용해서 훈련집합에는 포함되지 않았던 마지막 일주일의 데이터를 예측해 보았다.

predict = model.predict(torch.tensor(train[-24*7*2:]).reshape(-1,1).to(device).float(), target_len=ow)
real = data["평균속도"].to_numpy()

predict = min_max_scaler.inverse_transform(predict.reshape(-1,1))
real = min_max_scaler.inverse_transform(real.reshape(-1,1))
plt.figure(figsize=(20,5))
plt.plot(range(400,744), real[400:], label="real")
plt.plot(range(744-24*7,744), predict[-24*7:], label="predict")

plt.title("Test Set")
plt.legend()
plt.show()

image-20210802160043945

결과 그래프를 확인해 보면 데이터의 패턴을 어느정도 잘 예측한 것을 확인할 수 있다.

 

def MAPEval(y_pred, y_true):
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100
MAPEval(predict[-24*7:],real[-24*7:])
8.985569589218596

결과의 MAPE를 확인해본 결과 약 8.99로 좋은 정확도를 가지고 있는 것을 확인할 수 있었다.

 

Conclusion

Encoder Decoder LSTM을 사용해서 시계열 예측을 진행해본 결과 MAPE 8.99의 나쁘지 않은 성능을 확인할 수 있었다.

데이터의 주기를 따로 입력하지 않고도 좋은 성능을 성능을 얻어 낼 수 있었기 때문에 주기를 알 수 없는 데이터에서 사용하면 좋을것 같다는 생각이 들었다.

훈련시간이 오래걸려서 hyper parameter를 다양하게 실험해보지는 못해서 시간을 더 투자한다면 더 좋은 성능을 얻을 수도 있을거라고 생각된다.

모델링, hyper paramerter tuning, training 과 같은 작업들이 시간이 많이 소요되기 때문에 ML 방법들에 비해 시간이 많이 소요된다.

teacher forcing을 적용하면 결과가 훨씬 더 좋아지는 것을 확인했다.