相关文章推荐
大数据文摘 大数据文摘 转载

手把手:我的深度学习模型训练好了,然后要做啥?

姜范波、云舟
编译

本文讲的是如何快速而不求完美地部署一个训练好的 机器学习 模型并应用到实际中。如果你已经成功地使用诸如 Tensor flow或Caffe这样的框架训练好了一个 机器学习 模型,现在你正在试图让这个模型能够快速的演示,那么读这篇文章就对了。

阅读时长: 10-15分钟

使用前检查清单

  • 检查tensorflow的安装

  • 从 stdin 运行在线分类

  • 在本地运行分类

  • 把分类器放到硬编码(hardcoded)的代理

  • 把分类器放到有服务发现(service discovery)的代理

  • 用一个伪DNS调用分类器

机器学习 的实际应用

当我们第一次进入Hive的 机器学习 空间时,针对我们的实际应用场景,我们已经拥有了数百万张准确标记的图像, 这些图像使我们能够在一周之内,从头开始训练最先进的深度卷积 神经网络 图像分类模型 (即随机 权重 )。然而,在更典型的应用场景中,图像的数量级通常只有数百幅,这种情况下,我建议微调现有的模型。比如,https://www.tensorflow.org/tutorials/image_retraining有一个关于如何微调Imagenet模型(在1.2M图像上训练1000个类别)以对花进行分类的样本数据集(3647个图像, 5个类别)。

上面的 Tensor flow教程简要而言,是在安装bazel和tensorflow之后,需要运行以下代码,用大约30分钟的来建模,5分钟来训练:

(
cd "$HOME" && \
curl -O http://download.tensorflow.org/example_images/flower_photos.tgz && \
tar xzf flower_photos.tgz ;
bazel build tensorflow/examples/image_retraining:retrain \
          tensorflow/examples/image_retraining:label_image \
bazel-bin/tensorflow/examples/image_retraining/retrain \
  --image_dir "$HOME"/flower_photos \
  --how_many_training_steps=200
bazel-bin/tensorflow/examples/image_retraining/label_image \
  --graph=/tmp/output_graph.pb \
  --labels=/tmp/output_labels.txt \
  --output_layer=final_result:0 \
  --image=$HOME/flower_photos/daisy/21652746_cc379e0eea_m.jpg

或者,如果你安装了Docker,则可以使用以下预构建的Docker镜像:

sudo docker run -it --net=host liubowei/simple-ml-serving:latest /bin/bash
>>> cat test.sh && bash test.sh

这将进入容器内部的交互式shell中并运行上述命令; 如果你愿意的话,也可以按照容器内的其余部分进行操作。

现在,tensorflow已经将模型信息保存到/tmp/output_graph.pb和/tmp/output_labels.txt中,这些作为命令行 参数 传递给label_image.py脚本。Google的image_recognition教程也链接到另一个脚本,但是这里我们仍将使用label_image.py。

将本地运行转换为在线运行( Tensor flow)

如果我们只想接受来自标准输入的文件名,每行一个,我们就可以很容易地进行“在线”运行:

while read line ; do
bazel-bin/tensorflow/examples/image_retraining/label_image \
--graph=/tmp/output_graph.pb --labels=/tmp/output_labels.txt \
--output_layer=final_result:0 \
--image="$line" ;
done

然而,从性能的角度来看这样糟糕透了—— 每一个输入都要重新加载 神经网络 权重 ,整个 Tensor flow框架和python本身!

当然可以改进。先修改label_image.py 脚本。对我而言,这个脚本的位置在:

in bazel-bin/tensorflow/examples/image_retraining/label_image.runfiles/org_tensorflow/tensorflow/examples/image_retraining/label_image.py.

修改如下:

141:  run_graph(image_data, labels, FLAGS.input_layer, FLAGS.output_layer,
142:        FLAGS.num_top_predictions)141:  for line in sys.stdin:

修改后马上快了很多,但这还不是最好。

141:  for line in sys.stdin:
142:    run_graph(load_image(line), labels, FLAGS.input_layer, FLAGS.output_layer,
142:        FLAGS.num_top_predictions)

原因在于用with tf.Session()构建对话。 Tensor flow本质上是在每次调用run_graph时将所有的计算加载到内存中。一旦开始尝试在GPU上进行运算,这一点就会变得很明显—— 可以看到GPU内存使用随着 Tensor flow加载和卸载GPU的模型 参数 而上下波动。 据我所知,这种结构并不存在于Caffe或Pytorch框架中。

解决方法是把with命令去掉,传递一个sess变量到run_graph:

def run_graph(image_data, labels, input_layer_name, output_layer_name,
              num_top_predictions, sess):
    # Feed the image_data as input to the graph.
    #   predictions will contain a two-dimensional array, where one
    #   dimension represents the input image count, and the other has
    #   predictions per class
    softmax_tensor = sess.graph.get_tensor_by_name(output_layer_name)
    predictions, = sess.run(softmax_tensor, {input_layer_name: image_data})
    # Sort to show labels in order of confidence
    top_k = predictions.argsort()[-num_top_predictions:][::-1]
    for node_id in top_k:
      human_string = labels[node_id]
      score = predictions[node_id]
      print('%s (score = %.5f)' % (human_string, score))
    return [ (labels[node_id], predictions[node_id].item()) for node_id in top_k ] # numpy floats are not json serializable, have to run item
  with tf.Session() as sess:
    for line in sys.stdin:
      run_graph(load_image(line), labels, FLAGS.input_layer, FLAGS.output_layer,
          FLAGS.num_top_predictions, sess)

如果你运行完这一段,你会发现每张图只需要大约0.1秒,对于在线应用来说已经够快了。

将本地运行转换为在线运行(其他ML框架)

Caffe使用net.forward代码,很容易被放入一个可调用的框架中:see http://nbviewer.jupyter.org/github/BVLC/caffe/blob/master/examples/00-classification.ipynb

Mxnet也是非常独特的:它实际上已经准备好了面向大众的服务器代码。

部署

我们的计划是,将这些代码包装到一个Flask应用程序中。如果你没有听说Flask,简单解释一下, Flask是一个非常轻量级的Python Web框架,它允许你以最少的工作启动一个http api服务器。

作为一个快速参考,这里是一个Flask应用程序,它接收包含多部分表单数据的POST请求:

#!/usr/bin/env python
# usage: python echo.py to launch the server ; and then in another session, do
# curl -v -XPOST 127.0.0.1:12480 -F "data=@./image.jpg"
from flask import Flask, request
app = Flask(__name__)
@app.route('/', methods=['POST'])
def classify():
        data = request.files.get('data').read()
        print repr(data)[:1000]
        return data, 200
    except Exception as e:
        return repr(e), 500
app.run(host='127.0.0.1',port=12480)

这里是如何将相应的FLASK应用程序连接到上面的run_graph:

And here is the corresponding flask app hooked up to run_graph above:
#!/usr/bin/env python
# usage: bash tf_classify_server.sh
from flask import Flask, request
import tensorflow as tf
import label_image as tf_classify
import json
app = Flask(__name__)
FLAGS, unparsed = tf_classify.parser.parse_known_args()
labels = tf_classify.load_labels(FLAGS.labels)
tf_classify.load_graph(FLAGS.graph)
sess = tf.Session()
@app.route('/', methods=['POST'])
def classify():
        data = request.files.get('data').read()
        result = tf_classify.run_graph(data, labels, FLAGS.input_layer, FLAGS.output_layer, FLAGS.num_top_predictions, sess)
        return json.dumps(result), 200
    except Exception as e:
        return repr(e), 500
app.run(host='127.0.0.1',port=12480)

模型部署至此看起来还是相当不错的。除了一点——需要FlASK和 Tensor flow完全同步——Flask按照接收的顺序一次处理一个请求,并且 Tensor flow在进行图像分类时完全占用线程。

速度瓶颈可能还是在实际的计算工作中,所以升级Flask包装代码没有太多的意义。现在,也许这个代码足以处理你的负载。

有两种显而易见的方法可以扩大请求的通量:通过增加工人数量来横向放大,这在下一节将会介绍,或者通过使用GPU和批处理 逻辑 来纵向扩展。实现后者需要一个能够一次处理多个待处理请求的web服务器,并决定是否继续等待更大的批处理或将其发送到 Tensor flow图形线程进行分类,对于这个Flask应用程序是非常不适合的。有两种可能性:使用Twisted + Klein来保留Python代码,或者如果你更喜欢一流的事件循环支持,并且能够连接到非Python ML框架(如Torch),则可以使用Node.js + ZeroMQ。

扩展:负载平衡和服务发现

那么,假设现在你只有一台服务器来部署模型,由于它太慢了,或者我们的负载变得太高了,此时你想要启动更多服务器——如何在每个服务器上分配请求?

常规的方法是添加一个代理层,也许是haproxy或nginx, 它能够平衡后端服务器之间的负载,同时向客户端呈现一个统一的接口。 为了在本节稍后使用,以下是运行基本Node.js负载均衡器http代理的一些示例代码:

// Usage : node basic_proxy.js WORKER_PORT_0,WORKER_PORT_1,...
const worker_ports = process.argv[2].split(',')
if (worker_ports.length === 0) { console.err('missing worker ports') ; process.exit(1) }
const proxy = require('http-proxy').createProxyServer({})
proxy.on('error', () => console.log('proxy error'))
let i = 0
require('http').createServer((req, res) => {
  proxy.web(req,res, {target: 'http://localhost:' + worker_ports[ (i++) % worker_ports.length ]})
}).listen(12480)
console.log(`Proxying localhost:${12480} to [${worker_ports.toString()}]`)
// spin up the ML workers
const { exec } = require('child_process')
worker_ports.map(port => exec(`/bin/bash ./tf_classify_server.sh ${port}`))

为了自动检测后端服务器的数量和位置, 人们通常使用“服务发现”工具,该工具可能与负载平衡器捆绑在一起,或者是分开的。一些知名例子的是Consul和Zookeeper。 设置和学习使用它们不在本文的讨论范围之内,所以我使用了一个非常基本的,通过node.js服务发现包seport实现的代理。

Proxy代码:

// Usage : node seaport_proxy.js
const seaportServer = require('seaport').createServer()
seaportServer.listen(12481)
const proxy = require('http-proxy').createProxyServer({})
proxy.on('error', () => console.log('proxy error'))
let i = 0
require('http').createServer((req, res) => {
  seaportServer.get('tf_classify_server', worker_ports => {
    const this_port = worker_ports[ (i++) % worker_ports.length ].port
    proxy.web(req,res, {target: 'http://localhost:' + this_port })
}).listen(12480)
console.log(`Seaport proxy listening on ${12480} to '${'tf_classify_server'}' servers registered to ${12481}`)

Worker代码:

// Usage : node tf_classify_server.js
const port = require('seaport').connect(12481).register('tf_classify_server')
console.log(`Launching tf classify worker on ${port}`)
require('child_process').exec(`/bin/bash ./tf_classify_server.sh ${port}`)

然而,当应用于 机器学习 时,这个设置遇到了带宽问题。

每秒几十到几百张图像,这个系统就会成为网络带宽的瓶颈。在目前的设置中,所有的数据都必须通过我们的单个seaport 主节点,这也是呈现给客户端的端点。

为了解决这个问题,我们需要我们的客户端不要访问http://127.0.0.1:12480这个端点,而是要在后端服务器之间通过自动轮换来访问。如果你懂网络,一定会想:这不就是DNS干的活嘛!

但是,设置自定义的DNS服务器已经超出了本文的范围。相反,通过更改客户端以遵循两步“手动DNS”协议,我们可以重新使用我们的基础版的seaport 代理来实现客户端直接连接到其服务器的“点对点”协议:

Proxy代码:

// Usage : node p2p_proxy.js
const seaportServer = require('seaport').createServer()
seaportServer.listen(12481)
let i = 0
require('http').createServer((req, res) => {
  seaportServer.get('tf_classify_server', worker_ports => {
    const this_port = worker_ports[ (i++) % worker_ports.length ].port
 
推荐文章