博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
TensorFlow on Kubernetes性能瓶颈定位
阅读量:6240 次
发布时间:2019-06-22

本文共 9224 字,大约阅读时间需要 30 分钟。

当前性能问题描述

  1. 增加worker数,一定范围内能带来较好的性能提升,但是继续增加worker数时,训练性能提升不明显;
  2. 增加ps数,一定范围内能带来较好的性能提升,但是继续增加ps数时,训练性能提升不明显;

可能原因:

  1. 与ps和worker的分布情况强相关:

    • 目前的调度策略,主要根据服务器的cpu和内存使用情况进 行均衡调度,尽量使得集群中每台服务器的cpu和内存使用率相当。这种情况下,ps和worker的调度存在一定程度的随机性。
    • 如果调度时,每台包含worker的服务器都有对应一个ps,那么训练性能会更高?如果有,性能提升多少呢?
  2. K8S中的worker从HDFS集群中读取训练数据时存在IO瓶颈?可能网络上的或者是HDFS本身的配置,需要通过HDFS集群的监控来进一步排查。

下面,是针对第一种“可能原因:与ps和worker的分布情况强相关“ 设计的测试场景和用例:

场景1:将每个worker所在的服务器都有对应的ps。

测试用例

用例ID 服务器数 worker数 ps数 说明
1 1 10 1 一台服务器部署了10个worker和1个ps
2 5 50 5 5台服务器分别部署了10个worker和1个p
3 10 100 10 10台服务器分别部署了10个worker和1个p
4 20 200 20 20台服务器分别部署了10个worker和1个p

TensorFlow tasks调度设计图

输入图片说明

调度实现

  • 场景1的TensorFlow对象模板***scene1.jinja***
# scene1.jinja —— 对象模板{%- set name = "##NAME##" -%}{%- set worker_replicas = ##WN## -%}{%- set ps_replicas = ##PN## -%}{%- set script = "##SCRIPT##" -%}{%- set case = "##CASE##" -%}{%- set port = 2222 -%}{%- set log_host_dir = "/var/log/tensorflow" -%}{%- set log_container_dir = "/var/log" -%}{%- set image = "registry.vivo.xyz:4443/bigdata_release/tensorflow1.3.0" -%}{%- set replicas = {
"worker": worker_replicas, "ps": ps_replicas} -%}{%- macro worker_hosts() -%} {%- for i in range(worker_replicas) -%} {%- if not loop.first -%},{%- endif -%} {
{ name }}-worker-{
{ i }}:{
{ port }} {%- endfor -%}{%- endmacro -%}{%- macro ps_hosts() -%} {%- for i in range(ps_replicas) -%} {%- if not loop.first -%},{%- endif -%} {
{ name }}-ps-{
{ i }}:{
{ port }} {%- endfor -%}{%- endmacro -%}{%- for i in range( begin_index, end_index ) -%}{%- if task_type == "worker" %}---kind: ServiceapiVersion: v1metadata: name: {
{ name }}-{
{ task_type }}-{
{ i }} namespace: {
{ name }}spec: clusterIP: None selector: name: {
{ name }} job: {
{ task_type }} task: "{
{ i }}" ports: - port: {
{ port }} targetPort: 2222---kind: JobapiVersion: batch/v1metadata: name: {
{ name }}-{
{ task_type }}-{
{ i }} namespace: {
{ name }}spec: template: metadata: labels: name: {
{ name }} job: {
{ task_type }} task: "{
{ i }}" spec: imagePullSecrets: - name: harborsecret' affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: "CASE" operator: In values: - "{
{ case }}" - key: "INDEX" operator: In values: - "{
{ i // 10 }}" - key: "SCENCE" operator: In values: - "1" containers: - name: {
{ name }}-{
{ task_type }}-{
{ i }} image: {
{ image }} resources: requests: memory: "4Gi" cpu: "300m" ports: - containerPort: 2222 command: ["/bin/sh", "-c", "export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH -np --cut-dir=1 -R 'index.html*,*gif' {
{ script }}; cd ./{
{ name }}; sh ./run.sh {
{ ps_hosts() }} {
{ worker_hosts() }} {
{ task_type }} {
{ i }} {
{ ps_replicas }} {
{ worker_replicas }}"] restartPolicy: OnFailure{%- endif -%}{%- if task_type == "ps" -%}---kind: ServiceapiVersion: v1metadata: name: {
{ name }}-{
{ task_type }}-{
{ i }} namespace: {
{ name }}spec: clusterIP: None selector: name: {
{ name }} job: {
{ task_type }} task: "{
{ i }}" ports: - port: {
{ port }} targetPort: 2222---kind: DeploymentapiVersion: extensions/v1beta1metadata: name: {
{ name }}-{
{ task_type }}-{
{ i }} namespace: {
{ name }}spec: replicas: 1 template: metadata: labels: name: {
{ name }} job: {
{ task_type }} task: "{
{ i }}" spec: imagePullSecrets: - name: harborsecret affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: "CASE" operator: In values: - "{
{ case }}" - key: "INDEX" operator: In values: - "{
{ i }}" - key: "SCENCE" operator: In values: - "1" containers: - name: {
{ name }}-{
{ task_type }}-{
{ i }} image: {
{ image }} resources: requests: memory: "4Gi" cpu: "2" ports: - containerPort: 2222 command: ["/bin/sh", "-c","export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH -np --cut-dir=1 -R 'index.html*,*gif' {
{ script }}; cd ./{
{ name }}; sh ./run.sh {
{ ps_hosts() }} {
{ worker_hosts() }} {
{ task_type }} {
{ i }} {
{ ps_replicas }} {
{ worker_replicas }}"] restartPolicy: Always{%- endif -%}{%- endfor -%}
  • Label Nodes

选择对应的节点打上对应的Label。

kubectl label node $node_name SCENCE=1 CASE=? INDEX=?

测试结果

用例2的测试截图:

输入图片说明

场景2:将所有ps和所有worker都强制进行物理隔离。

测试用例

用例ID 服务器数 worker数 ps数 说明
1 2 10 1 一台服务器部署10个worker,另外一台部署1个ps
2 10 20 5 5台服务器分别部署10个worker,5台服务器分别部署1个ps
3 20 50 10 10台服务器分别部署10个worker,10台服务器分别部署1个ps
4 40 200 20 20台服务器分别部署10个worker,20台服务器分别部署1个ps

TensorFlow tasks调度设计图

输入图片说明

调度实现

  • 场景2的TensorFlow对象模板***scene2.jinja***
# scene2.jinja —— 对象模板{%- set name = "##NAME##" -%}{%- set worker_replicas = ##WN## -%}{%- set ps_replicas = ##PN## -%}{%- set script = "##SCRIPT##" -%}{%- set case = "##CASE##" -%}{%- set port = 2222 -%}{%- set log_host_dir = "/var/log/tensorflow" -%}{%- set log_container_dir = "/var/log" -%}{%- set image = "registry.vivo.xyz:4443/bigdata_release/tensorflow1.3.0" -%}{%- set replicas = {
"worker": worker_replicas, "ps": ps_replicas} -%}{%- macro worker_hosts() -%} {%- for i in range(worker_replicas) -%} {%- if not loop.first -%},{%- endif -%} {
{ name }}-worker-{
{ i }}:{
{ port }} {%- endfor -%}{%- endmacro -%}{%- macro ps_hosts() -%} {%- for i in range(ps_replicas) -%} {%- if not loop.first -%},{%- endif -%} {
{ name }}-ps-{
{ i }}:{
{ port }} {%- endfor -%}{%- endmacro -%}{%- for i in range( begin_index, end_index ) -%}{%- if task_type == "worker" %}---kind: ServiceapiVersion: v1metadata: name: {
{ name }}-{
{ task_type }}-{
{ i }} namespace: {
{ name }}spec: clusterIP: None selector: name: {
{ name }} job: {
{ task_type }} task: "{
{ i }}" ports: - port: {
{ port }} targetPort: 2222---kind: JobapiVersion: batch/v1metadata: name: {
{ name }}-{
{ task_type }}-{
{ i }} namespace: {
{ name }}spec: template: metadata: labels: name: {
{ name }} job: {
{ task_type }} task: "{
{ i }}" spec: imagePullSecrets: - name: harborsecret' affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: "CASE" operator: In values: - "{
{ case }}" - key: "INDEX" operator: In values: - "{
{ i // 10 }}" - key: "SCENCE" operator: In values: - "2" - key: "TYPE" operator: In values: - "worker" containers: - name: {
{ name }}-{
{ task_type }}-{
{ i }} image: {
{ image }} resources: requests: memory: "4Gi" cpu: "300m" ports: - containerPort: 2222 command: ["/bin/sh", "-c", "export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH -np --cut-dir=1 -R 'index.html*,*gif' {
{ script }}; cd ./{
{ name }}; sh ./run.sh {
{ ps_hosts() }} {
{ worker_hosts() }} {
{ task_type }} {
{ i }} {
{ ps_replicas }} {
{ worker_replicas }}"] restartPolicy: OnFailure{%- endif -%}{%- if task_type == "ps" -%}---kind: ServiceapiVersion: v1metadata: name: {
{ name }}-{
{ task_type }}-{
{ i }} namespace: {
{ name }}spec: clusterIP: None selector: name: {
{ name }} job: {
{ task_type }} task: "{
{ i }}" ports: - port: {
{ port }} targetPort: 2222---kind: DeploymentapiVersion: extensions/v1beta1metadata: name: {
{ name }}-{
{ task_type }}-{
{ i }} namespace: {
{ name }}spec: replicas: 1 template: metadata: labels: name: {
{ name }} job: {
{ task_type }} task: "{
{ i }}" spec: imagePullSecrets: - name: harborsecret affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: "CASE" operator: In values: - "{
{ case }}" - key: "INDEX" operator: In values: - "{
{ i }}" - key: "SCENCE" operator: In values: - "2" - key: "TYPE" operator: In values: - "ps" containers: - name: {
{ name }}-{
{ task_type }}-{
{ i }} image: {
{ image }} resources: requests: memory: "4Gi" cpu: "2" ports: - containerPort: 2222 command: ["/bin/sh", "-c","export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH -np --cut-dir=1 -R 'index.html*,*gif' {
{ script }}; cd ./{
{ name }}; sh ./run.sh {
{ ps_hosts() }} {
{ worker_hosts() }} {
{ task_type }} {
{ i }} {
{ ps_replicas }} {
{ worker_replicas }}"] restartPolicy: Always{%- endif -%}{%- endfor -%}
  • Label Nodes

选择对应的节点打上对应的Label。

kubectl label node $node_name SCENCE=1 CASE=? INDEX=? TYPE=?

测试结果

用例2的测试截图:

输入图片说明

测试结论及思考

对比两种不同场景下用例2(5个ps,50个worker)的监控数据,发现如下现象:

  • 两种场景下,虽然创建了5个ps,但是实际上只有一个ps的负载比较高,其他的ps要么cpu usage在10%以下,要么甚至几乎为0。

  • 两种场景下同样的worker number和ps number,整个tensorflow cluster消耗的cpu和内存差别很小。

测试结论

  • 分布式tensorflow中,每个worker选择哪个ps作为自己的参数服务器跟我们如何强制分布ps和worker的布局无关,由分布式tensorflow内部自己控制(跟tf.train.replica_device_setter()设置的strategy有关)。

问题思考

  • 为什么这个训练中,多个ps中只有一个ps在工作?是算法只有一个Big参数?如果是,那么默认按照Round-Robin策略只会使用一个ps,就能解释这个问题了。这需要算法的兄弟进行确认。

  • 如果将Big参数拆分成众多Small参数,使用RR或LB或Partition策略之一,应该都能利用多个ps进行参数更新明显提升训练性能。

  • 通过这次折腾,也不是一无所获,至少发现我们对于Distributed TensorFlow的内部工作原理还不甚了解,非常有必要深入到源码进行分析。

本文转自掘金-

转载地址:http://bwcia.baihongyu.com/

你可能感兴趣的文章
Scheme来实现八皇后问题(1)
查看>>
pip或者anacnda安装opencv以及opencv-contrib
查看>>
Unity 5 中的全局光照技术详解(建议收藏)
查看>>
python 的矩阵运算——numpy
查看>>
处理handler中的内存泄漏
查看>>
P8 Visible Lattice Points
查看>>
小小不爽一下
查看>>
【转】NuGet学习笔记(1)——初识NuGet及快速安装使用
查看>>
Python学习笔记 - MySql的使用
查看>>
WebApi FormData+文件长传 异步+同步实现
查看>>
Linux文件与目录管理
查看>>
多态的弊端
查看>>
Spring @Import 注解
查看>>
PBOC APDU命令解析【转】
查看>>
封装HttpUrlConnection开箱即用
查看>>
第二天笔记
查看>>
如何在外部终止一个pengding状态的promise对象
查看>>
初级模拟电路:1-5 二极管的其他特性
查看>>
《简明Python教程》Swaroop, C. H. 著 第1章 介绍
查看>>
Chapter 4. Working with Key/Value Pairs
查看>>