RNN, TENSORFLOW, CLOUD ML 엔진을 사용하여 시계열 예측을 수행하는 방법

tf.contrib.learn의 Estimators API는 TensorFlow를 사용하여 시작하는 매우 편리한 방법입니다. Estimators API에 대한 내 견해에서 정말 멋진 점은 분산된 TensorFlow 모델을 만드는 것은 매우 쉬운 방법이라는 것입니다. 인터넷에서 떠 다니는 TensorFlow 샘플은 분산되어 있지 않습니다. 단일 시스템에서 코드를 실행한다고 가정합니다. 사람들은 그런 코드로 시작하여 낮은 수준의 TensorFlow 코드가 실제로 완전한 데이터 세트에서 작동하지 않는다는 것을 알게 됩니다. 그런 다음 원래의 샘플 주위에 분산된 학습 코드를 추가하고 다른 사람의 코드를 편집하기 위해 많은 작업을 해야합니다.

Estimators API를 사용하지 않는 TensorFlow 샘플이 있으면 그것을 무시하십시오. 프로덕션 (읽기 : 대용량) 데이터 세트에서 작동하게 하려면 많은 작업이 필요합니다. 모니터, 코디네이터, 파라메터 서버 및 모든 종류의 시스템 프로그래밍이 가능합니다. Estimator API로 시작하고 Experiment 클래스를 사용합니다.

시계열 예측에는 커스텀 Estimator가 필요합니다.

Estimators API에는 Deep Neural Network 분류기와 회귀 분석기가 함께 제공됩니다. 일반적인 구조화된 데이터가있는 경우 링크된 자습서인 Google Cloud의 교육 과정 (곧 Coursera에서 사용 가능)을 수강하면 실생활의 대규모 데이터 세트에서 작동하는 머신러닝 모델을 만들 수 있습니다. 구조화된 데이터의 경우에는 관계형 데이터웨어 하우스에 저장하십시오. 그러나 일반적으로 구조화되지 않은 데이터 문제의 경우에는 커스텀 Estimator를 만들어야합니다.

머신러닝을 수행하고자 하는 일반적인 유형의 데이터는 시계열 데이터입니다. 본질적으로 입력은 숫자 세트이며 그 순서에서 다음 숫자를 예측하려고합니다. 이 포스트에서는 좀 더 일반적인 것으로 만들고 시퀀스의 마지막 두 숫자 를 예측한다고 가정합니다 . 컴퓨터 과학의 속담에 두 가지를 할 수 있다면, N개를 할 수 있다고 합니다.

시퀀스 간 예측에 사용되는 전통적인 신경 네트워크 아키텍처를 RNN (Recurrent Neural Network)이라고합니다. RNN을 구현하는 방법을 알 필요가 없으므로 포스트가 원하는 것보다 더 깊이 들어가는 경우에는 종료하십시오.

이 글을 따라 가기 위해 Jupyter Notebook을 다른 브라우저 창에서 열어 두십시오 . 여기에는 핵심 코드만 표시하고 있습니다. Jupyter Notebook과  GitHub 폴더에는 모든 코드가 들어 있습니다.

시계열 데이터 일부 시뮬레이션

원하는 만큼 생성할 수 있는 작은 샘플 데이터 세트로 배우는 것이 일반적으로 쉽습니다. 실제 데이터에는 고유 한 단점이 있습니다! 그래서 일련의 시계열 데이터를 생성 해 봅시다. 각 시퀀스는 10개의 숫자로 구성됩니다. 우리는 첫 8개를 입력으로 사용하고 마지막 두개를 레이블로 사용합니다 (예 : 예측할 대상).

numpy (np)를 사용하여 이러한 시계열을 생성하는 코드는 다음과 같습니다.

SEQ_LEN = 10

  def create_time_series():

  freq = (np.random.random()*0.5) + 0.1 # 0.1 to 0.6

  ampl = np.random.random() + 0.5 # 0.5 to 1.5

  x = np.sin(np.arange(0,SEQ_LEN) * freq) * ampl

  return x

이러한 시계열 시퀀스를 CSV 파일 (train.csv 및 valid.csv)에 작성하면 우리는 이제 시작하게 되는 것입니다.

입력 기능

TensorFlow의 Estimators API가 작동하는 방식은 데이터를 읽기 위해 input_fn을 제공해야 합니다. x와 y 값을 제공하지 않습니다. 대신 입력 및 레이블을 반환하는 함수를 제공합니다. 입력은 모든 입력 (텐서에 대한 입력 이름)의 사전이고 레이블은 텐서입니다.

이 경우 CSV 파일은 단순히 10개의 부동 소수점 숫자로 구성됩니다. DEFAULTS는 텐서의 데이터 유형을 지정하는 역할을 합니다. 우리는 한 번에 20라인 씩 데이터를 읽고 싶습니다. 그것은 BATCH_SIZE입니다. 일괄 처리는 경사 하강이 수행되는 샘플 수입니다. 이 숫자로 실험해야 합니다. 너무 큰 경우 훈련이 느리고 너무 작으면 훈련이 수렴하지 않을 것입니다. 우리는 입력만 했으므로 입력한 이름은 중요하지 않습니다. 우리는 그것을 rawdata라고 부를 것입니다.

DEFAULTS = [[0.0] for x in xrange(0, SEQ_LEN)]

BATCH_SIZE = 20

TIMESERIES_COL = ‘rawdata’

N_OUTPUTS = 2      # in each sequence, 1-8 are features, and 9-10 is label

N_INPUTS = SEQ_LEN – N_OUTPUTS

Estimators API가 원하는 input_fn은 파라메터를 사용하지 않아야 합니다. 그러나 우리는 명령줄에서 읽을 파일 이름을 제공 할 수 있기를 원합니다. 자, input_fn을 반환하는 read_dataset () 함수를 작성해 보겠습니다.

# read data and convert to needed format

def read_dataset(filename, mode=tf.contrib.learn.ModeKeys.TRAIN):

    def _input_fn():

         num_epochs = 100 if mode == tf.contrib.learn.ModeKeys.TRAIN else 1

우리가 하는 첫번째 일은 에포크의 수를 결정하는 것입니다. 이것은 우리가 데이터 세트를 거쳐야하는 횟수입니다. 교육을 받는 경우 100번, 데이터를 평가하는 경우 한 번만 데이터 세트를 살펴 보겠습니다.

그런 다음 와일드 카드 확장을 수행합니다. 많은 경우 Big Data 프로그램은 train.csv-0001-of-0036과 같은 공유 파일을 생성하므로 train.csv *를 입력으로 제공하기만 합니다. 이것을 사용하여 파일 이름 대기열을 채운 다음 TextLineReader를 사용하여 데이터를 읽습니다.

# could be a path to one file or a file pattern.

input_file_names = tf.train.match_filenames_once(filename)

filename_queue = tf.train.string_input_producer(

         input_file_names, num_epochs=num_epochs, shuffle=True)

reader = tf.TextLineReader()

    _, value = reader.read_up_to(filename_queue, num_records=BATCH_SIZE)

value_column = tf.expand_dims(value, -1)

그런 다음 처음 8개의 숫자를 입력으로 처리하고 마지막 2개를 레이블로 처리하여 데이터를 디코딩합니다. 우리가 그것을 읽을 때, 입력은 batchsize x 1 인 8개의 텐서 리스트입니다. tf.concat을 사용하면 8x batchsize의 단일 텐서가됩니다. Estimators API가 목록이 아닌 텐서를 원하기 때문에 이것은 중요합니다.

# all_data is a list of tensors

all_data = tf.decode_csv(value_column, record_defaults=DEFAULTS)

inputs = all_data[:len(all_data)-N_OUTPUTS] # first few values

label = all_data[len(all_data)-N_OUTPUTS : ] # last few values

# from list of tensors to tensor with one more dimension

inputs = tf.concat(inputs, axis=1)

label = tf.concat(label, axis=1)

print ‘inputs={}’.format(inputs)

return {TIMESERIES_COL: inputs}, label # dict of features, label

RNN 정의

우리가 LinearRegressor, DNNRegressor, DNNLinearCombinedRegressor 등을 사용했다면 우리는 단순히 기존 클래스를 사용할 수 있었습니다. 그러나 sequence-to-sequence 예측을 하기 때문에 우리는 우리 자신의 모델 함수를 작성해야합니다. 적어도 당분간 Estimators API에는 즉시 사용 가능한 RNNRegressor가 제공되지 않습니다. 따라서 저수준 TensorFlow 기능을 사용하여 자체 RNN 모델을 작성해 봅시다.

LSTM_SIZE = 3  # number of hidden layers in each of the LSTM cells

# create the inference model
def simple_rnn(features, targets, mode):
# 0. Reformat input shape to become a sequence
x = tf.split(features[TIMESERIES_COL], N_INPUTS, 1)
#print ‘x={}’.format(x)

# 1. configure the RNN
lstm_cell = rnn.BasicLSTMCell(LSTM_SIZE, forget_bias=1.0)
outputs, _ = rnn.static_rnn(lstm_cell, x, dtype=tf.float32)

  # slice to keep only the last cell of the RNN
outputs = outputs[-1]
#print ‘last outputs={}’.format(outputs)

# output is result of linear activation of last layer of RNN
weight = tf.Variable(tf.random_normal([LSTM_SIZE, N_OUTPUTS]))
bias = tf.Variable(tf.random_normal([N_OUTPUTS]))
predictions = tf.matmul(outputs, weight) + bias

# 2. Define the loss function for training/evaluation
#print ‘targets={}’.format(targets)
#print ‘preds={}’.format(predictions)
loss = tf.losses.mean_squared_error(targets, predictions)
eval_metric_ops = {
“rmse”: tf.metrics.root_mean_squared_error(targets, predictions)
}

# 3. Define the training operation/optimizer
train_op = tf.contrib.layers.optimize_loss(
loss=loss,
global_step=tf.contrib.framework.get_global_step(),
learning_rate=0.01,
optimizer=”SGD”)

  # 4. Create predictions
predictions_dict = {“predicted”: predictions}

# 5. return ModelFnOps
return tflearn.ModelFnOps(
mode=mode,
predictions=predictions_dict,
loss=loss,
train_op=train_op,
eval_metric_ops=eval_metric_ops)

input_fn에서 피쳐로 전달하기 위해 입력을 단일 텐서로 묶어야한다는 것을 상기하십시오. 단계 0은 그 과정을 단순히 뒤집어서 텐서리스트를 되찾습니다.

Recurrent Neural Network는 사용자가 입력을 전달하는 BasicLSTMLCell로 구성됩니다. 출력과 상태를 다시 얻습니다. 슬라이스하여 RNN의 마지막 셀만 유지합니다. 이전 상태를 사용하지 않습니다. 다른 아키텍처도 가능합니다. 예를 들어 네트워크가 항상 하나의 출력만 갖고 롤링된 창을 사용하도록 훈련 할 수 있었습니다. 이 푸스트의 끝 부분에서 예제를 수정하여 이 작업을 수행하는 방법에 대해 설명하겠습니다.

위 코드의 주석은 다른 단계와 관련하여 매우 분명합니다. 우리는 그곳에서 놀라운 일을하지 않습니다. 이것은 회귀 문제이므로 RMSE를 사용하고 있습니다.

실험 만들기

Experiment 클래스는 Estimators API의 스마트 클래스입니다. 이것은 모델 함수를 취하는 방법, 훈련 및 검증을위한 함수를 입력하고 배급, 조기 정지 등과 같은 합리적인 것들을 알고 있습니다.

def get_train():
return read_dataset(‘train.csv’, mode=tf.contrib.learn.ModeKeys.TRAIN)

def get_valid():
return read_dataset(‘valid.csv’, mode=tf.contrib.learn.ModeKeys.EVAL)

def experiment_fn(output_dir):
# run experiment
return tflearn.Experiment(
tflearn.Estimator(model_fn=simple_rnn, model_dir=output_dir),
train_input_fn=get_train(),
eval_input_fn=get_valid(),
eval_metrics={
‘rmse’: tflearn.MetricSpec(
metric_fn=metrics.streaming_root_mean_squared_error
)
}
)

shutil.rmtree(‘outputdir’, ignore_errors=True) # start fresh each time
learn_runner.run(experiment_fn, ‘outputdir’)

클라우드 교육

위의 코드는 단일 시스템에서 작동합니다. Python 모듈에 패키지를 작성하면 CloudML Engine에 제출하여 서버가 없는 방식으로 학습 할 수 있습니다.

OUTDIR=gs://${BUCKET}/simplernn/model_trained
JOBNAME=simplernn_$(date -u +%y%m%d_%H%M%S)
REGION=us-central1
gsutil -m rm -rf $OUTDIR
gcloud ml-engine jobs submit training $JOBNAME \
–region=$REGION \
–module-name=trainer.task \
–package-path=${REPO}/simplernn/trainer \
–job-dir=$OUTDIR \
–staging-bucket=gs://$BUCKET \
–scale-tier=BASIC \
–runtime-version=1.0 \
— \
–train_data_paths=”gs://${BUCKET}/train.csv*” \
–eval_data_paths=”gs://${BUCKET}/valid.csv*”  \
–output_dir=$OUTDIR \
–num_epochs=100

일반적인 변형 : 매우 긴 시계열

이 글에서는 수천개의 짧은 (10개의 요소) 시퀀스가 ​​있다고 가정했습니다. 매우 긴 서열을 갖는다면 어떨까요? 예를 들어 주식 가격이나 센서에서 읽는 온도를 가질 수 있습니다. 그러한 경우, 할 수 있는 일은 긴 시퀀스를 고정된 길이의 롤링 시퀀스로 분해하는 것입니다. 이 길이는 분명히 임의적이지만 “look-back” 인터벌 RNN으로 생각하십시오. 긴 시퀀스를 취하여 고정 길이의 더 작고 겹치는 시퀀스로 나누는 TensorFlow 코드는 다음과 같습니다.

import tensorflow as tf
import numpy as np
def breakup(sess, x, lookback_len):
N = sess.run(tf.size(x))
windows = [tf.slice(x, [b], [lookback_len]) for b in xrange(0, N-lookback_len)]
windows = tf.stack(windows)
return windows
For example:

x = tf.constant(np.arange(1,11, dtype=np.float32))
with tf.Session() as sess:
print ‘input=’, x.eval()
seqx = breakup(sess, x, 5)
print ‘output=’, seqx.eval()
will result in:

input= [  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
output= [[ 1.  2.  3.  4.  5.]
[ 2.  3.  4.  5.  6.]
[ 3.  4.  5.  6.  7.]
[ 4.  5.  6.  7.  8.]
[ 5.  6.  7.  8.  9.]]

이러한 고정 길이 시퀀스가 ​​있으면 모든 것이 이전과 동일합니다.

원문 : http://dataconomy.com/2017/05/how-to-do-time-series-prediction-using-rnns-tensorflow-and-cloud-ml-engine/