通过tensorflow编写、训练和测试模型,咱们都已经轻车熟路,但是在生产环境部署模型挺不容易。在tensorflow中默认有两种方法保存/载入模型参数:使用Saver方法仅保存变量,在推断的时候需要重新定义Graph;或者使用tf.train.import_meta_graph导入graph信息并创建Saver,再使用Saver restore变量,但是为了找到输入输出的tensor,还得用graph.get_tensor_by_name()来获取,也就是还需要知道在定义模型阶段所赋予这些tensor的名字。
谷歌提供的模型部署组件tensorflow serving,解决了上面的问题。不过目前官网提供的二进制安装包不支持GPU,需要GPU支持的话需要从源码编译,请参考下面的链接。
本文记录通过tensorflow serving部署一组预测模型的完整过程,以及各个接口的编码过程。
Introductory Tutorial to TensorFlow Serving
Tensorflow Serving | model server GPU版本编译
安装tensorflow serving的方法有两种,一种是通过二进制包安装,简单快捷;一种是通过源码安装,编译过程经常出错,不过需要GPU支持的话,需要从源码编译。具体请参考tensorflow serving install
也就是接口变量需要通过tf.saved_model模块来保存
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
tf.reset_default_graph()
dataset = np.loadtxt('/home/zzx/2.txt')
dataset[:10]
dataset = (dataset - dataset.mean())/dataset.std()
dataset[:10]
#设置模型输出的路径
MODELS_OUPUT_DIR='/home/zzx/scenic_prediction/models2'
VERSION='01'
#模型参数
window_size=20
rnn_size=20
batch_size=100
LEARNING_RATE=0.001
NUM_EPOCHS=15
数据预处理
#序列数据的截取函数
def window_transform_series(series,window_size):
X = [series[i:(i+window_size)] for i in range(len(series)-window_size)]
y = [series[i+window_size] for i in range(len(series)-window_size)]
# reshape each
X = np.asarray(X)
X.shape = (np.shape(X)[0:2])
y = np.asarray(y)
y.shape = (len(y),1)
return X,y
#获取输入输出数据
X,y = window_transform_series(series = dataset,window_size = window_size)
#分析训练集和测试集
train_test_split = int(np.ceil(9*len(y)/float(10)))
X_train = X[:train_test_split,:]
y_train = y[:train_test_split]
X_test = X[train_test_split:,:]
y_test = y[train_test_split:]
X_train = np.asarray(np.reshape(X_train, (X_train.shape[0], window_size, 1)))
X_test = np.asarray(np.reshape(X_test, (X_test.shape[0], window_size, 1)))
print(X_train.shape)
print(y_train.shape)
def get_batches(batch_size,x_inputs,y_outputs):
batches_len=int(np.floor(len(x_inputs)/batch_size))
print(batches_len)
x_batches=np.asarray([x_inputs[i*batch_size:(i+1)*batch_size,:] for i in range(0,batches_len)]).astype(np.float64)
y_batches=np.asarray([y_outputs[i*batch_size:(i+1)*batch_size,:] for i in range(0,batches_len)]).astype(np.float64)
return x_batches,y_batches
模型定义
inputs = tf.placeholder(tf.float32, [None, None,1],name='inputs')
targets = tf.placeholder(tf.float32, [None, None],name='targets')
lr = tf.placeholder(tf.float32, name='lr')
def model(inputs,rnn_size,batch_size):
num_layers=1
lstm = tf.contrib.rnn.BasicLSTMCell(rnn_size,state_is_tuple=False,activation=tf.nn.relu)
cell = tf.contrib.rnn.MultiRNNCell([lstm] * num_layers)
inital_state=tf.identity(cell.zero_state(batch_size,tf.float32),name='initial_state')
outputs,final_state=tf.nn.dynamic_rnn(cell=cell,inputs=inputs,dtype=tf.float32)
print(final_state)
#最后的链接层不再需要激活函数
logits=tf.contrib.layers.fully_connected(inputs=outputs[:, -1],num_outputs=1,activation_fn=None)
return logits,inital_state
logits,inital_state = model(inputs,rnn_size,batch_size)
cost = tf.losses.mean_squared_error(labels=targets,predictions=logits)
optimizer = tf.train.AdamOptimizer(lr)
gradients = optimizer.compute_gradients(cost)
capped_gradients = [(tf.clip_by_value(grad, -1., 1.), var) for grad, var in gradients if grad is not None]
train_op = optimizer.apply_gradients(capped_gradients)
init=tf.global_variables_initializer()
x_batches,y_batches=get_batches(batch_size,X_train,y_train)
print(x_batches.shape)
print(y_batches.shape)
losses=[]
with tf.Session() as sess:
sess.run(init)
state=sess.run(inital_state)
for epoch_i in range(NUM_EPOCHS):
for i in range(len(x_batches)):
feed={inputs:x_batches[i],targets:y_batches[i],lr:LEARNING_RATE}
train_loss, _ = sess.run([cost, train_op],feed_dict=feed)
print('Epoch {:>3} Batch {:>4}/{} train_loss={:.3f}'.format(epoch_i,i,len(x_batches),train_loss))
losses.append(train_loss)
#test data
targets_predict=sess.run(logits,feed_dict={inputs:X_test})
print(targets_predict.shape)
OUTPUT_PATH=os.path.join(tf.compat.as_bytes(MODELS_OUPUT_DIR),tf.compat.as_bytes(VERSION))
print('Exporting trained model to' + str(OUTPUT_PATH))
#创建符合tensorflow serving服务器的模型参数保存builder
builder=tf.saved_model.builder.SavedModelBuilder(OUTPUT_PATH)
#获取输入输出的张量信息
inputs_tensor_info=tf.saved_model.utils.build_tensor_info(inputs)
outputs_tensor_info=tf.saved_model.utils.build_tensor_info(logits)
#对输入输出张量签名
predict_signature=tf.saved_model.signature_def_utils.build_signature_def(
inputs={"inputs":inputs_tensor_info},
outputs={"outputs":outputs_tensor_info},
method_name=tf.saved_model.signature_constants.REGRESS_METHOD_NAME
)
#定义预测函数的名字
builder.add_meta_graph_and_variables(sess,[tf.saved_model.tag_constants.SERVING],{"scenic_prediction_2hour":predict_signature})
builder.save()
plt.plot(losses,color='g')
plt.show()
plt.plot(y_test,color = 'k')
plt.plot(targets_predict,color = 'r')
plt.show()
!ls -rtl /home/zzx/scenic_prediction/models2/01
!cat /home/zzx/scenic_prediction/tfserv.conf
执行下面的命令即可把模型部署好:
tensorflow_model_server --port=8500 --model_config_file=./tfserv.conf
因为tensorflow serving 是通过protobuf来实现客户端和服务器之间的连接的,所以可以把protobuf文件编译成支持各种语言的客户端,请参考tobegit3hub,但是使用这样的客户端有点不方便。因为前端的人员还需要研究tensorflow serving的各种接口,代码耦合性很高,并且学习成本很大。本文通过官方支持的tensorflow-serving-api结合python版本的socketio库实现一个中转服务的RESTful接口,方便前端直接调用。本文写作的时候,查看官网发现serving已经官方支持RESTful接口了,所以没必要自己实现,不过由于之前还没有,所以把已经写好的代码还是放到这里来吧。
socketio服务提供的接口,socketio支持各种版本的语言客户端,所以前端可以选择各种语言和本服务通讯。
import numpy as np
import socketio
import eventlet
import eventlet.wsgi
from flask import Flask
from grpc.beta import implementations
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2
from tensorflow.python.framework import dtypes
import json
sio = socketio.Server()
app = Flask(__name__)
HOST="127.0.0.1"
PORT=8500
channel=implementations.insecure_channel(HOST,PORT)
stub=prediction_service_pb2.beta_create_PredictionService_stub(channel=channel)
@sio.on('scenic_prediction_2hour',namespace='/')
def nanshanchi_1hours(sid, data):
if data:
print(data)
inputs=np.array(data.split(',')).astype(np.float64)
request=predict_pb2.PredictRequest()
request.model_spec.name='scenic_prediction_2hour'
request.model_spec.signature_name='scenic_prediction_2hour'
request.inputs['inputs'].CopyFrom(tf.contrib.util.make_tensor_proto(inputs,dtype=dtypes.float64,shape=[1, 20,1]))
response=stub.Predict(request,10.0)
print(response)
results={}
for key in response.outputs:
tensor_proto=response.outputs[key]
nd_array=tf.contrib.util.make_ndarray(tensor_proto)
results[key]=str(nd_array[0][0])
print(results)
return json.dumps(results)
else:
print("no data")
# NOTE: DON'T EDIT THIS.
sio.emit('manual', data={}, skip_sid=True)
@sio.on('connect',namespace='/')
def connect(sid, environ):
print("connect ", sid)
@sio.on('disconnect',namespace='/')
def disconnect(sid):
print(sid)
# wrap Flask application with engineio's middleware
app = socketio.Middleware(sio, app)
# deploy as an eventlet WSGI server
eventlet.wsgi.server(eventlet.listen(('', 4567)), app)