请教mesos,k8s,spark,马拉松,swarm,zookeeper,map-spark reducebywindow

《一》Docker Swarm、mesos与k8s的区别
Docker Swarm与mesos的区别
DockerSwarm 是目前 Docker 社区原生支持的集群工具,它通过扩展 DockerAPI 力图让用户像使用单机 Docker API 一样来驱动整个集群;相较于 Mesos 而言, DockerSwarm 对集群的侵入性更小,从而资源损耗也更低;
而 Mesos 是 Apache 基金会下的集群资源管理工具,它通过抽象主机的 CPU、内存、存储等计算资源来搭建一套高效、容错、弹性的分布式系统。
1.安装配置方面
DockerSwarm 要比 Mesos简单的多。K8s不同的操作系统上安装都不同。每个操作系统都有自己的独立安装指令。
使用 Docker Swarm 搭建一套集群 。
1&.通过 shell 命令&swarmcreate&生成一个集群的
2&.借助这个 token 将想要添加到集群的主机广播到 DockerHub 集群发现 的公共服务(HostedDiscovery with Docker Hub)上,这样我们就把主机添加集群中了;
3&.接下来通过命令&swarmmanage&以及相应的 token 我们就可以在任何一台连接到互联网的主机上管理我们的集群了。
第1步使用staticfile、consul、etcd 或者ZooKeeper 中的任意一个,甚至是静态的IP列表来做集群发现即可。
与 DockerSwarm 不同,我们必须保证 Mesos 的管理节点直正常运行,然后才能向集群中添加agent/slave节点;另外向集群中添加节点时还需要配置resource,containers 等基本参数;最后只搭建好了 Mesos 集群是无法方便的使用集群资源的,我们需要Marathon、Chronos、Spark 等调度器去调度资源,才能真正使用起这套东西。主要也是由 Mesos 需要支撑多种资源调度导致的。
2.集群高可用/容错
DockerSwarm 与 Mesos 都可以通过一致性中间件构造高可用集群。Mesos 的 Master 节点一般通过ZooKeeper 保证高可用,而 Docker Swarm 的 manager节点可以通过 consul、etcd 或ZooKeeper 中的任意一个来保证高可用。&K8s专注容器的高可用性,集群的精细管理,复杂的网络场景
3.基本的健康检查
DockerSwarm 没有提供对其部署的容器进行健康检查的功能,所以需要容器部署方来进行相应的容器的健康检查以及异常重启等;而 Mesos 的调度器Marathon 是支持健康检查的,它可以每隔一段时间扫描一次应用的绑定端口,并在容忍3次或者几次失败后将应用重启,目前支持 HTTP、TCP协议,当然,这都需要应用提供 health 的接口。
4.可扩展性
由于 DockerSwarm 使用标准的 Docker API,从而任何使用 DockerAPI 与 Docker 进行通讯的工具都可以无缝地和 Docker Swarm 协同工作&专注&
Mesos 的可扩展性首先在于它可以承接各种调度器,Spark、Hadoop、Kafka、Cassandra、Marathon、Chronos 等等都可以拿 Mesos 来做资源池;其次,Mesos 可以与Mesos-DNS 结合来实现内部的服务发现/负载均衡。
Docker Swarm与k8s的区别
1、出生不同,Google根据其在Linux上容器管理经验,改造到docker管理上,就是kubernetes。他的在许多方面表现良好。最重要的是构造于Google多年的宝贵经验只上。kubernetes并不是为了docker写的,kubernetes把集群带到了一个全新的高度,代价是学习曲线比较陡。docker-swarm使用了一个不同的方式。它是docker原生的集群工具。最方便的部分是它暴露了docker标准的编程接口,意味着你之前一直在使用的任何与docker沟通的工具(docker命令行接口,dockercompose,dokku,krane等等),都可以无缝的在docker
swarm上使用。
2、安装配置不同,安装设置swarm非常简单,简单明了并且很灵活。我们需要做的就是安装一个服务发现工具,然后在所有的节点上安装swarm容器。相比较而言,kubernetes的安装就有点复杂。不同的操作系统上安装都不同。每个操作系统都有自己的独立安装指令。
3、运行方式不同,使用Swarm和使用容器没有什么不同。比如,你习惯于使用DockerCLI(命令行接口),你可以继续使用几乎相同的命令。如果你习惯于使用Docker Componse来运行容器,你可以继续在Swarm集群中使用。不管你之前习惯于怎么使用容器,你仍旧可以使用,只是在更大级别的集群中使用。Kubernetes要求你去学习它自己的CLI(命令行接口)和配置。你不能使用你之前创建的docker-compose.yml配置,你必须要去新建与Kubernetes对应的配置。你也不能使用之前学习的DockerCLI(命令行接口)。你必须要去学习
Kubernetes CLI(命令行接口)
从易用性角度来看,建议采用DockerSwarm。从稳定性和可扩展性来看,建议选择Mesos;从网络场景的复杂度与精细化来看,建议选用k8s
《二》Docker的网络实现原理
基于对net namespace的控制,docker可以为在容器创建隔离的网络环境,在隔离的网络环境下,容器具有完全独立的网络栈,与宿主机隔离,也可以使容器共享主机或者其他容器的网络命名空间
docker容器的网络有五种模式:
l& bridge:docker默认的网络模式,为容器创建独立的网络命名空间,容器具有独立的网卡等所有单独的网络栈,是最常用的使用方式。
l& host:直接使用容器宿主机的网络命名空间。
l& none:为容器创建独立网络命名空间,但不为它做任何网络配置,容器中只有lo,用户可以在此基础上,对容器网络做任意定制。
l& 其他容器:与host模式类似,只是容器将与指定的容器共享网络命名空间。
l& 用户自定义:docker1.9版本以后新增的特性,允许容器使用第三方的网络实现或者创建单独的bridge网络,提供网络隔离能力。
bridge模式是docker默认的,也是开发者最常使用的网络模式。在这种模式下,docker为容器创建独立的网络栈,保证容器内的进程使用独立的网络环境,实现容器之间、容器与宿主机之间的网络栈隔离。同时,通过宿主机上的docker0网桥,容器可以与宿主机乃至外界进行网络通信。
从docker0子网中分配一个IP给容器使用,并设置docker0的IP地址为容器的默认网关。在主机上创建一对虚拟网卡veth pair设备(虚拟网卡),Docker将veth pair设备的一端放在新创建的容器中,并命名为eth0(容器的网卡),另一端放在主机中,以vethxxx这样类似的名字命名,并将这个网络设备加入到docker0网桥中。
同一宿主机上,容器之间都是连接到docker0这个网桥上的,它可以作为虚拟交换机使容器可以相互通信。由于宿主机的IP地址与容器veth
pair的 IP地址均不在同一个网段,故仅仅依靠veth pair和namespace的技术,还不足以使宿主机以外的网络主动发现容器的存在。为了使外界可以访问容器中的进程,docker采用了端口绑定的方式,也就是通过iptables的NAT,将宿主机上的端口端口流量转发到容器内的端口上。
本文已收录于以下专栏:
相关文章推荐
还记得在十多年前,SaaS鼻祖SalesForce喊出的口号“No Software”吗?SalesForce在这个口号声中开创了SaaS行业,并成为当今市值460亿美元的SaaS之王。今天谈谈“No...
无数的文章、社交媒体在探讨Docker、Kubernetes、Mesos三者之间孰优孰劣。如果你听信了某些一知半解者的言论,你可能会认为这三个开源项目正在为争夺容器霸权而殊死战斗。同时,你也会相信,在...
导读:容器技术已经成了很多公司基础架构一部分,架构师是否已经清楚了解 Docker, Swarm, K8S, Mesos,虚拟化等相关技术的未来走向?
王渊命,技术极客,曾任新浪微博架构...
1. Docker默认自动创建的三种网络
当安装完Docker时,Docker默认自动创建了三种网络:bridege, none和host. 使用docker network list命令可以查看。
上一篇文章[Kubernetes]Docker的网络模型较详细介绍了Docker里的bridge网络模型。本篇介绍Docker的overlay网络模型。Docker内置overlay网路模式驱动lib...
接上一篇文章:http://blog.csdn.net/rapheler/article/details/
咱们来看一下Marathon跑一个Docker任务
他的最新文章
讲师:王哲涵
讲师:韦玮
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)当前位置: >>
spark开发指南
1、重定向方式读入 scala 文件(执行 scala 脚本) /usr/local/spark-1.2.0-bin-hadoop2.4/bin/spark-shell &test_scala01.scalaSpark 运行调试方法与学习资源汇总http://blog.csdn.net/melodyishere/article/detai
ls/ 最近,在学习和使用 Spark 的过程中,遇到了一些莫名其妙的错误和问题,在逐个解决的过 程中, 体会到有必要对解决上述问题的方法进行总结, 以便能够在短时间内尽快发现问题来 源并解决问题,现与各位看官探讨学习如下: 解决 spark 运行调试问题的四把“尖刀”: 1、Log 包括控制台日志、 主从节点日志、 HDFS 日志等。 许多错误可以通过日志,直接对错误类型、 错误来源进行准确定位,因此,学会读取和分析 Log 是解决问题的第一步。 2、Google 确定错误类型和原因后,就可以使用 Google 在 Spark User List、Google Group 等各类 Spark 学习社区和论坛进行求助,而大部分问题都是可以找到答案的。 3、官网配置文档 大部分问题都是配置问题引起的,如何准确配置,需要结合官方说明文档进行配置,而其中 的 Tuning Spark 内容,也是每个 Spark 实践者必须了解和掌握的,推荐精读: Tuning Spark(内存调优): http://spark.apache.org/docs/1.0.0/tuning.html Spark Configuration(基本配置): http://spark.apache.org/docs/1.0.0/configuration.html 4、官网 Example 各个子项目都有对应的 Example 和源码,可以从源码的 src 子文件夹里找到,而在每个子项 目的官网说明上也有相应的代码示例,也可以作为参考。这 4 个方法是解决问题的第一位要素,特别地,要会通过 Log 反推错误来源,定位故障原 因,然后第一时间想到谷歌,把错误关键词粘贴查找寻求解答。而大部分人的问题其实主要 都是配置问题,1 和 2 其实就能解决了。3 和 4 主要用来模仿学习,照猫画虎,学以致用, 不成功便成仁。 交流和互动也很重要,把握国内 Spark 研究的前沿,就要善于利用和挖掘各类互联网资源, 这里,推荐一些博客、微博、QQ 学群供大家参考学习: 知名博主: 徽沪一郎:/hseagle/category/569175.html fxjwind:/fxjwind/category/518904.html 张包峰:http://blog.csdn.net/zbf8441372/article/category/1556747 高彦杰:http://blog.csdn.net/gaoyanjie55 saisai_shao:http://jerryshao.me/微博大牛: hashjoin, Databricks 大数据公司创始人之一, UC Berkeley AMPLab: /hashjoin Andrew-Xia:/u/ CrazyJvm:/ 明风 Andy,淘宝技术部,数据挖掘与计算团队负责人:/mingfengandy saisai_shao: /u/ 连城:/lianchengzju 张包峰:/pelickzhang 王联辉:/u/ 徽沪一郎:/eagleonlineSpark 学习资料 Fast Data Processing with Spark,/s/1bnnJHlP Scala 学习资料: 快学 Scala,/s/1gdJzElt Scala Cookbook,/s/1jGn5zPc Scala 编程,/s/1pJ0SzgvSpark 论文: NSDI-2012,/s/1jGifNMm [博士论文]Matei Zaharia,/s/1nt1C2BRQQ 群: Spark 零基础学习@Spark 快速理解http://blog.csdn.net/colorant/article/details/8255958 BLOG:http://blog.csdn.net/colorant/ 更多云计算相关项目快速理解文档 http://blog.csdn.net/colorant/article/details/8255910 ==是什么 == 目标 Scope(解决什么问题) 在大规模的特定数据集上的迭代运算或重复查询检索 官方定义 aMapReduce-like cluster computing framework designed for low-latency iterativejobs and interactive use from an interpreter 个人理解 首先,MapReduce-like 是说架构上和多数分布式计算框架类似,Spark 有分配任务的主节点 (Driver)和执行计算的工作节点(Worker)其次,Low-latency 基本上应该是源于 Worker 进程较长的生命周期,可以在一个 Job 过程中 长驻内存执行 Task,减少额外的开销 然后对 interative 重复迭代类查询运算的高效支持,是 Spark 的出发点了。最后它提供了一 个基于 Scala 的 Shell 方便交互式的解释执行任务 ==如何实现 == 核心思路,架构 RDD:Spark 的核心概念是 RDD (resilientdistributed dataset),指的是一个只读的,可分区的 分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。 Lineage:利用内存加快数据加载在众多的其它的 In-Memory 类数据库或 Cache 类系统中也 有实现, Spark 的主要区别在于它处理分布式运算环境下的数据容错性 (节点实效/数据丢失) 问题时采用的方案。为了保证 RDD 中数据的鲁棒性, RDD 数据集通过所谓的血统关系 (Lineage)记住了它是如何从其它 RDD 中演变过来的。相比其它系统的细颗粒度的内存数据 更新级别的备份或者 LOG 机制, RDD 的 Lineage 记录的是粗颗粒度的特定数据变换 (Transformation)操作(filter, map, join etc.)行为。当这个 RDD 的部分分区数据丢失时,它 可以通过 Lineage 获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模 型,限制了 Spark 的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。 总之,Spark 的核心思路就是将数据集缓存在内存中加快读取速度,同时用 lineage 关联的 RDD 以较小的性能代价保证数据的鲁棒性。 适用领域 正如其目标 scope,Spark 适用于需要多次操作特定数据集的应用场合。需要反复操作的次 数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就 相对较小。 细节 使用内存缓存数据集快在哪里?主要是几个方面:首先是磁盘 IO,其次数据的序列化和反 序列化的开销也节省了, 最后相对其它内存数据库系统, 粗颗粒度的内存管理机制减小了数 据容错的代价(如典型的数据备份复制机制) ==相关项目 == 上下游项目 Discretized Streams (Spark streaming) 构建在 Spark 上处理 Stream 数据的框架, 基本的原理是将 Stream 数据分成小的时间片断 (几 秒),以类似 batch 批量处理的方式来处理这小部分数据。个人理解构建在 Spark 上的原因 大概是因为 Spark 的低延迟执行引擎(100ms+)勉强可以用于实时处理,而 Spark 的核心理 念数据重用和流式数据处理本身并没有直接的交集, 相反个人感觉流式数据的无穷连续性的 特性一定程度上和数据重用是冲突的。相比基于 Record 的其它处理框架(如 Storm),RDD 数据集更容易做高效的容错处理。 此外小批量处理的方式使得它可以同时兼容批量和实时数 据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。 Shark ( Hive on Spark) Shark 基本上就是在 Spark 的框架基础上提供和 Hive 一样的 H iveQL 命令接口, 为了最大程 度的保持和 Hive 的兼容性,Shark 使用了 Hive 的 API 来实现 query Parsing 和 Logic Plan generation,最后的 PhysicalPlan execution 阶段用 Spark 代替 Hadoop MapReduce。通过配 置 Shark 参数,Shark 可以自动在内存中缓存特定的 RDD,实现数据重用,进而加快特定数 据集的检索。同时,Shark 通过 UDF 用户自定义函数实现特定的数据分析学习算法,使得 SQL 数据查询和运算分析能结合在一起,最大化 RDD 的重复使用。 类似项目 Twister :http://www.iterativemapreduce.org 大概的意思也是通过 Cache 数据,实现迭代的 MapReduce 过程中的数据重用,不过它的模型貌似相对简单些,大致是通过拓展 MapReduce API, 分发管理缓存数据, 然后通过自己的 Daemon 进程管理和分配 MapReduce Task 到 Cache 对应的节点上,容错性和计算模型方面没有 Shark 的 RDD 来得精巧和通用。 Haloop:和 Twister 类似,修改扩展了 MapReduce 框架,增加了循环逻辑和 Data Caching ==相关文献 == http://spark-project.org/项目首页 http://shark.cs.berkeley.edu/ shark 项目主页 Paper 论文 http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf ==其它 == 主要实现和编程接口基于 Scala Spark 的 YARN 模式部署/art/669.htmSpark 可以通过 SBT(Scala Build Tool)或者 Maven 来编译,官方提供的二进制安装文件 是用 Maven 编译。 1:Spark 的编译 Spark 可以通过 SBT(Scala Build Tool)或者 Maven 来编译,官方提供的二进制安装 文件是用 Maven 编译,如果是要在 YARN 集群上运行的话,还需要再用 SBT 编译一下,生成 YARN client 端使用的 jar 包;最好是直接对源码使用 SBT 进行编译而生成 YARN client 端 使用的 jar 包。笔者在测试过程中,对 Maven 编译过的 Spark 进行 SBT 二次编译后,在运行 部分例子的时候有错误发生。 A:Maven 编译 笔者使用的环境曾经编译过 Hadoop2.2.0(参见 hadoop2.2.0 源码编译(CentOS6.4)), 所以不敢确定 Maven 编译过程中, Spark 是不是需要编译 Hadoop2.2.0 中使用的部分底层软 件(看官方资料是需要 Protobuf2.5)。除了网络下载不给力而不断的中止、然后重新编译 而花费近 1 天的时间外,编译过程还是挺顺利的。 maven 编译时,首先要进行设置 Maven 使用的内存项配置:export MAVEN_OPTS=&-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m&然后用 Maven 编译:mvn -Pnew-yarn -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -DskipTestspac kage参考文档:Building Spark with Maven B:SBT 编译 Spark 源码和二进制安装包都绑定了 SBT。 值得注意的是, 如果要使用 Scala 进行 Spark 应用开发, 必须使用和 Spark 版本相对应版本的 Scala, 如: Spark0.8.1 对应的 Scala2.9.3。 对于不匹配的 Scala 应用开发可能会不能正常工作。 SBT 编译命令:SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true ./sbt/sbt assembly二种编译都是在 Spark 根目录下运行。 在 SBT 编译过程中如果网络不给力, 手工中断编 译(ctrl+z)后要用 kill-9 将相应的进程杀死后,然后再重新编译,不然会被之前的 sbt 进程锁住而不能重新编译。 2:Spark 运行 Spark 可以单独运行, 也可以在已有的集群上运行, 如 Amazon EC2、 Apache Mesos、 Hadoop YARN。下面用 Spark 自带的例程进行测试,运行的时候都是在 Spark 的根目录下进行。如果 需要知道运行更详细的信息,可以使用 log4j,只要在根目录下运行:cp conf/log4j.properties.template conf/log4j.propertiesA:本地运行./run-example org.apache.spark.examples.SparkPi local也可以多线程方式运行,下面的命令就是开 4 个线程。./run-example org.apache.spark.examples.SparkPi local[4] B:YARN 集群 启动 Hadoop2.2.0 集群 确保环境变量 HADOOP_CONF_DIR 或 YARN_CONF_DIR 已经设置 在 YARN 集群中运行 Spark 应用程序的命令:SPARK_JAR=&SPARK_ASSEMBLY_JAR_FILE& ./spark-classorg.apache.spark.deploy.yarn.Cl ient \ --jar &YOUR_APP_JAR_FILE& \ --class &APP_MAIN_CLASS& \ --args &APP_MAIN_ARGUMENTS& \ --num-workers &NUMBER_OF_WORKER_MACHINES& \ --master-class &ApplicationMaster_CLASS& --master-memory &MEMORY_FOR_MASTER& \ --worker-memory &MEMORY_PER_WORKER& \ --worker-cores &CORES_PER_WORKER& \ --name &application_name& \ --queue &queue_name& \ --addJars &any_local_files_used_in_SparkContext.addJar& \ --files &files_for_distributed_cache& \ --archives &archives_for_distributed_cache&例 1 计算 PI,可以看出程序运行时是先将运行文件上传到 Hadoop 集群的,所以客户端 最好是和 Hadoop 集群在一个局域网里。SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2. 2.0.jar \ ./spark-class org.apache.spark.deploy.yarn.Client \ --jar examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating .jar \ --class org.apache.spark.examples.SparkPi \ --args yarn-standalone \ --num-workers 3 \ --master-memory 2g \ --worker-memory 2g \ --worker-cores 1例 2 计算 TCSPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2. 2.0.jar \ ./spark-class org.apache.spark.deploy.yarn.Client \ --jar examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating .jar \ --class org.apache.spark.examples.SparkTC \ --args yarn-standalone \ --num-workers 3 \ --master-memory 2g \ --worker-memory 2g \ --worker-cores 1点击 Tracking UI 中的相应链接可以查看 Spark 的运行信息:
Spark on YARN 客户端模式作业运行全过程分析/jiagoucunchu/Hadoop/419994.html 博客地址: / 文章标题: 《Spark on YARN 客户端模式作业运行全过程分析》 本文链接: /archives/1191 Hadoop 、 Hive、 Hbase、 Flume 等 QQ 交流群:
(已满) , 请加入新群:
《Spark on YARN 集群模式作业运行全过程分析》 《Spark on YARN 客户端模式作业运行全过程分析》 《 Spark Standalone 模式作业运行全过程分析》(未发布) 在前篇文章中我介绍了 Spark on YARN 集群模式(yarn-cluster)作业从提交到运行 整个过程的情况(详情见 《Spark on YARN 集群模式作业运行全过程分析》 ),我们知 道 Spark on yarn 有两种模式:yarn-cluster 和 yarn-client。这两种模式作业虽然都是在 yarn 上面 运行,但是其中的运行方式很不一样,今天我就来谈谈 Spark on YARN yarn-client 模式 作业从提交到运行的过程剖析。 和 yarn-cluster 模式一样,整个程序也是通过 spark-submit 脚本提交的。但是 yarn-client 作业程序的运行不需要通过 Client 类来封装启动,而是直接通过反射机制调用 作业的 main 函数。下面就来分析: 1、通过 SparkSubmit 类的 launch 的函数直接调用作业的 main 函数(通过反射机制 实现),如果是集群模式就会调用 Client 的 main 函数。 2、而应用程序的 main 函数一定都有个 SparkContent,并对其进行初始化; 3、在 SparkContent 初始化中将会依次做如下的事情:设置相关的配置、注册 MapOutputTracker、BlockManagerMaster、BlockManager,创建 taskScheduler 和 dagScheduler;其中比较重要的是创建 taskScheduler 和 dagScheduler。在创建 taskScheduler 的时候会根据我们传进来的 master 来选择 Scheduler 和 SchedulerBackend。由于我们选择的是 yarn-client 模式,程序会选 择 YarnClientClusterScheduler 和 Yarn ClientSchedulerBackend,并将 YarnClientSchedulerBackend 的实例初始化 YarnClientClusterScheduler,上面两个实例的获取都是通过反射机制实现的, YarnClientSchedulerBackend 类是 CoarseGrainedSchedulerBackend 类的子类, YarnClientClusterScheduler 是 TaskSchedulerImpl 的子类,仅仅重写了 TaskSchedulerImpl 中的 getRackForHost 方法。 4、 初始化完 taskScheduler 后, 将创建 dagScheduler, 然后通过 taskScheduler.start() 启动 taskScheduler,而在 taskScheduler 启动的过程中也会调用 SchedulerBackend 的 start 方法。在 SchedulerBackend 启动的过程中将会初始化一些参数,封装在 ClientArguments 中, 并将封装好的 ClientArguments 传进 Client 类中, 并 client.runApp() 方法获取 Application ID。 5、client.runApp 里面的做是和前面 客户端进行操作 那节类似,不同的是在里面启动 是 ExecutorLauncher(yarn-cluster 模式启动的是 ApplicationMaster)。 6、 在 ExecutorLauncher 里面会初始化并启动 amClient, 然后向 ApplicationMaster 注册该 Application。注册完之后将会等待 driver 的启动,当 driver 启动完之后,会创建 一个 MonitorActor 对象用于和 CoarseGrainedSchedulerBackend 进行通信(只有事件 AddWebUIFilter 他们之间才通信,Task 的运行状况不是通过它和 CoarseGrainedSchedulerBackend 通信的)。然后就是设置 addAmIpFilter,当作业完 成的时候,ExecutorLauncher 将通过 amClient 设置 Application 的状态为 FinalApplicationStatus.SUCCEEDED。 7、分配 Executors,这里面的分配逻辑和 yarn-cluster 里面类似,就不再说了。 8、最后,Task 将在 CoarseGrainedExecutorBackend 里面运行,然后运行状况会通 过 Akka 通知 CoarseGrainedScheduler,直到作业运行完成。 9、在作业运行的时候,YarnClientSchedulerBackend 会每隔 1 秒通过 client 获取到 作业的运行状况, 并打印出相应的运行信息, 当 Application 的状态是 FINISHED、 FAILED 和 KILLED 中的一种,那么程序将退出等待。 10、最后有个线程会再次确认 Application 的状态,当 Application 的状态是 FINISHED、FAILED 和 KILLED 中的一种,程序就运行完成,并停止 SparkContext。整个 过程就结束了。 本博客文章除特别声明,全部都是原创! 尊重原创,转载请注明: 转载自 过往记忆(/) 本文链接地址: 《Spark on YARN 客户端模式作业运行全过程分析》 (/archives/1191)第一个 Spark On Yarn 程序http://blog.csdn.net/li/article/details/环境hadoop 2.2.0 + Scala 2.10.3 + Spark 0.9 + Idea 13 单机伪分布式的 Yarn Idea SBT 插件使用:建立 SBT 项目,然后在 Setting 中设置 SBT autoimport 和 auto 创 建目录结构,ok 后 refresh build.sbt[plain] view plaincopyprint?1. 2. 3. 4. 5. 6. 7.g&name := &WordCount& version := &1.0& scalaVersion := &2.10.3& libraryDependencies += &org.apache.spark& %% &spark-core& % &0.9.0-incubatin8. 9. 10. 11.bating&resolvers += &Akka Repository& at &http://repo.akka.io/releases/& libraryDependencies += &org.apache.spark& % &spark-bagel_2.10& % &0.9.0-incu12. 13.bating&libraryDependencies += &org.apache.spark& % &spark-mllib_2.10& % &0.9.0-incu14. 15. 16. 17.libraryDependencies += &org.apache.spark& % &spark-graphx_2.10& % &0.9.0-inc ubating&libraryDependencies += &org.apache.spark& % &spark-streaming_2.10& % &0.9.0incubating&后面的那些依赖可以不要,Spark 自带的其他的 exapmle 中会用到CODE[html] view plaincopyprint?1. 2. 3.package myclass import org.apache.spark._ 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25.import SparkContext._ /** * Created by jack on 2/22/14. */ object MyWordCount { def main(args: Array[String]) { /* 本地版本 val spark = new SparkContext(&local&,&my word count&,System.getenv(& SPARK_HOME&),SparkContext.jarOfClass(this.getClass))val file = spark.textFile(&src/main/resources/abc&) val counts = file.flatMap(line =& line.split(& &)) .map(word =& (word, 1)) .reduceByKey(_ + _) counts.saveAsTextFile(&src/main/resources/out&) spark.stop()*/ // HDFS 版本 val spark = new SparkContext(&yarn-standalone&,&my word count&,Syste m.getenv(&SPARK_HOME&),SparkContext.jarOfClass(this.getClass))val file = spark.textFile(&hdfs://127.0.0.1:9000/user/jack/input&) val wordcounts = file.flatMap(line =& line.split(& &)).map(word =& ( word,1)).reduceByKey(_+_) wordcounts.saveAsTextFile(&hdfs://127.0.0.1:9000/user/jack/myWordCount Output&) spark.stop() } }26. 27. 28.第二个 import 至关重要,不然会出现 reduceByKey 方法无法使用的情况 单机版本可以直接在 Idea 下跑和调试,HDFS 版要打包为 jar 后在命令行执行run_spark_shell.sh执行脚本 跑起1. 2. 3.#!/bin/sh SPARK_JAR=$SPARK_HOME/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-i ncubating-hadoop2.2.0.jar \ $SPARK_HOME/bin/spark-class org.apache.spark.deploy.yarn.Client \ 4. 5. 6. 7. 8. 9.--jar /home/jack/IdeaProjects/Spark/WordCount/out/artifacts/mywordcount/mywo rdcount.jar \ --class myclass.MyWordCount \ --num-workers 1 \ --master-memory 512m \ --worker-memory 1g \ --worker-cores 1 \KMeans on Sparkhttp://blog.csdn.net/li/article/details/思路: 1.随机生成数据 2.随机生成 K 个聚类中心 3.计算每个点所属的类别 4.计算新的聚类中心 5.比较聚类中心的变化情况,大于阈值跳转至 3;小于阈值停止。[plain] view plaincopyprint?1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15.package myclass import java.util.Random import org.apache.spark.SparkContext import SparkContext._ import org.apache.spark.util.Vector /** * Created by jack on 2/26/14. */ object MyKMeans { val N = 1000 val R = 1000 val D = 10 val K = 10 //随机数范围 //点空间纬度 //聚类中心个数 0-1 * Rval rand = new Random(42) //随机种子 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. 36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53. 54. 55. 56. 57. 58.val convergeDist = 0.01 /**//迭代收敛条件* 将 p 分配到当前所有聚类中心的最短距离的类中 * */ def closestPoint(p:Vector,centers: Array[Vector]): Int = { var bestIndex = 0 var closest = Double.PositiveInfinity for (i &- 0 until centers.length) { val tempDist = p.squaredDist(centers(i)) if(tempDist & closest) { closest = tempDist bestIndex = i } } bestIndex } /** * 产生 N 个 D 维(每一维取值 0-1000)随机的点 * */ def generateData = { def generatePoint(i: Int) = { Vector(D,_ =& rand.nextDouble * R) } Array.tabulate(N)(generatePoint) } def main(args: Array[String]) { val sc = new SparkContext(&local&,&My KMeans&,System.getenv(&SPARK_H OME&),SparkContext.jarOfClass(this.getClass)) val data = sc.parallelize(generateData).cache() //随机初始化 K 个聚类中心 val kPoints = data.takeSample(false,K,42).toArray var tempDist = 1.0 while(tempDist & convergeDist) { //closest 为(类别, (点,1)),1 是用来后续统计各个类中点的数量 count val closest = data.map(p =& (closestPoint(p,kPoints),(p,1))) //按类别, 计算点的坐标和, 以及该类别中节点总数 (类别, (点向量和, 点数) ) val pointStats = closest.reduceByKey{ 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71. 72. 73. 74. 75. 76. 77. 78. 79. 80.case ((x1,y1),(x2,y2)) =& (x1+x2,y1+y2) } //生成新的聚类中心的 Map(类别,新聚类中心) val newPoints = pointStats.map{ pair =& (pair._1, pair._2._1 / pair._2._2) }.collectAsMap() tempDist = 0.0 for (i &- 0 until K) { tempDist += kPoints(i).squaredDist(newPoints(i)) } //更新聚类中心到 kPoint for (newP &- newPoints) { kPoints(newP._1) = newP._2 } println(&Finished iteration(delta = &+ tempDist + &)&) } println(&Final centers:&) kPoints.foreach(println) System.exit(0) } }切换到/usr/local/spark 目录 1)、本地测试: ./run-example org.apache.spark.examples.SparkPi local 2)、普通集群模式 ./run-example org.apache.spark.examples.SparkPi spark://namenode1:7077 ./run-example org.apache.spark.examples.SparkLR spark://namenode1:7077 ./run-example org.apache.spark.examples.SparkKMeans spark://namenode1:7077 file:/usr/local/spark/kmeans_data.txt 2 1 注:此处不能用 namenode1,要用具体 IP 地址 结合 HDFS 的集群模式 hadoop fs -put README.md / /user/rocketzhang(不能用相对路径,无法识 别) MASTER=spark://10.168.9.240:7077 ./spark-shell scala& val file = sc.textFile(&hdfs://namenode1:9000/user/root/README.md&) scala& val count = file.flatMap(line =& line.split(& &)).map(word =& (word, 1)).reduce ByKey(_+_) scala& count.collect() scala& :quitSpark1.0.x 入门指南(spark on yarn, standalone)http://blog.csdn.net/can007/article/details/1 节点说明IPRole192.168.1.111ActiveNameNode192.168.1.112StandbyNameNode,Master,Worker192.168.1.113DataNode,Master,Worker192.168.1.114DataNode,WorkerHDFS 集群和 Spark 集群之间节点共用。2 安装 HDFS见 HDFS2.X 和 Hive 的安装部署文档: /Scott007/p/3614960.html3 Spark 部署Spark 常用的安装部署模式有 Spark On Yarn 和 Standalone,可以同时使用。 3.1 Spark on Yarn这种模式,借助 Yarn 资源分配的功能,使用 Spark 客户端来向 Yarn 提交任务运行。 只需将 Spark 的部署包放置到 Yarn 集群的某个节点上即可(或者是 Yarn 的客户端,能读 取到 Yarn 集群的配置文件即可)。 Spark 本身的 Worker 节点、 Master 节点不需要启动。 但是,Spark 的部署包须是基于对应的 Yarn 版本正确编译后的,否则会出现 Spark 和 Yarn 的兼容性问题。 on Yarn 的两种运行方式,其运行结束后的日志不能在 Yarn 的 Application 管理界面 看到,目前只能在客户端通过:yarn logs -applicationId &applicationId& 命令查看每个 Application 的日志。 3.1.1 配置 部署这种模式, 需要修改 conf 目录下的 spark-env.sh 文件。 在其中新增如下配置选 项:export HADOOP_HOME= /home/hadoop/hadoop-2.0.0-cdh4.5.0 export HADOOP_CONF_DIR= $HADOOP_HOME/etc/hadoop SPARK_EXECUTOR_INSTANCES=2 SPARK_EXECUTOR_CORES=1 SPARK_EXECUTOR_MEMORY=400M SPARK_DRIVER_MEMORY=400M SPARK_YARN_APP_NAME=&Spark 1.0.0& 其中:(1) HADOOP_HOME:当前节点中 HDFS 的部署路径,因为 Spark 需要和 HDFS 中的节 点在一起;(2) HADOOP_CONF_DIR:HDFS 节点中的 conf 配置文件路径,正常情况下此目录为 $HADOOP_HOME/etc/hadoop; (3) SPARK_EXECUTOR_INSTANCES:在 Yarn 集群中启动的 Worker 的数目, 默认为 2 个; (4) SPARK_EXECUTOR_CORES:每个 Worker 所占用的 CPU 核的数目; (5) SPARK_EXECUTOR_MEMORY:每个 Worker 所占用的内存大小; (6) SPARK_DRIVER_MEMORY:Spark 应用程序 Application 所占的内存大小,这里的 Driver 对应 Yarn 中的 ApplicationMaster; (7) SPARK_YARN_APP_NAME:Spark Application 在 Yarn 中的名字; 配置完成后,将 Spark 部署文件放置到 Yarn 的节点中即可。这里,将 spark-1.0.0 整个目录放到 Yarn 集群的一个节点 192.168.1.112 的/home/hadoop(设为 spark 的安装路径的父目录)路径下。3.1.2 测试 在 Spark 的部署路径的 bin 路径下,执行 spark-submit 脚本来运行 spark-examples 包中的例子。执行如下:./bin/spark-submit --master yarn \ --class org.apache.spark.examples.JavaWordCount \ --executor-memory 400M \ --driver-memory 400M \ /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spar k-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar ./hdfs-site.xm l这个例子是计算 WordCount 的,例子被打包在 /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0. 0-hadoop2.0.0-cdh4.5.0.jar 包中,对应的 Class 为 org.apache.spark.examples.JavaWordCount, ./hdfs-site.xml 是 HDFS 中指定路径 下的一个文件, WordCount 就是针对它来做的。 而--master yarn 就是指定运行在 Yarn 集群中,以 yarn 模式运行。 Spark On Yarn 有两种运行模式,一种是 Yarn Cluster 方式,一种是 Yarn Client 方 式。 (1) Yarn Cluster: Spark Driver 程序将作为一个 ApplicationMaster 在 YARN 集群中 先启动,然后再由 ApplicationMaster 向 RM 申请资源启动 executor 以运行 Task。因为 Driver 程序在 Yarn 中运行,所以程序的运行结果不能在客户端显示,所以最好将结果保 存在 HDFS 上,客户端的终端显示的是作为 Yarn 的 job 的运行情况。 (2) Yarn Client: Spark Driver 程序在客户端上运行,然后向 Yarn 申请运行 exeutor 以运行 Task,本地程序负责最后的结果汇总等。客户端的 Driver 将应用提交给 Yarn 后, Yarn 会先后启动 ApplicationMaster 和 executor, 另外 ApplicationMaster 和 executor 都是装载在 container 里运行,container 默认的内存是 1G,ApplicationMaster 分配的 内存是 driver-memory, executor 分配的内存是 executor-memory。 同时, 因为 Driver 在客户端,所以程序的运行结果可以在客户端显示,Driver 以进程名为 SparkSubmit 的形式存在。上面命令中的提交方式“yarn”就是默认按照“Yarn Client”方式运行。用户可自定义 运行方式,通过“--master”指定程序以 yarn、yarn-cluster 或者 yarn-client 中的一 种方式运行。需要重点说明的是最后文件的路径, 是相当于 HDFS 中的/user/hadoop 而言, hadoop 是当前命令的用户。“./hdfs-site.xml”在 HDFS 中的全路径为 “hdfs://namespace/user/hadoop/hdfs-site.xml”,其中 hadoop 是当前的用 户,namespace 是 HDFS 的命名空间;如果写成“/hdfs-site.xml”则在 HDFS 中指的是 “hdfs://namespace/hdfs-site.xml”;当然也可以直接传入 “hdfs://namespace/user/hadoop/hdfs-site.xml”用于指定在 HDFS 中的要进 行 WordCount 计算的文件。 另外,Spark 应用程序需要的 CPU Core 数目和内存,需要根据当前 Yarn 的 NodeManager 的硬件条件相应设置,不能超过 NodeManager 的硬件条件。./bin/spark-submit --master yarn \ --class org.apache.spark.examples.JavaWordCount \ --executor-memory 400M \ --driver-memory 400M \ /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spar k-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar hdfs://namespa ce/user/hadoop/hdfs-site.xml在 Yarn 的 ResourceManager 对应的 Web 界面中查看启动的 Application。Running:Success: 同时可以在启动脚本的客户端看到 WordCount 的运行结果: 3.2 Spark Standalone这种模式,就是把 Spark 单独作为一个集群来进行部署。集群中有两种节点,一种是 Master,另一种是 Worker 节点。Master 负责分配任务给 Worker 节点来执行,并负 责最后的结果合并,Worker 节点负责具体的任务执行。 3.2.1 配置 所需修改的配置文件除了 spark-env.sh 文件以外,还有 slave 文件,都位于 conf 目 录中。 slave 文件中保存的是 worker 节点 host 或者 IP,此处的配置为: 192.168.1.112 192.168.1.113 192.168.1.114 至于 spark-env.sh 文件,可以配置如下属性: (1) SPARK_MASTER_PORT:Master 服务端口,默认为 7077; (2) SPARK_WORKER_CORES:每个 Worker 进程所需要的 CPU 核的数目; (3) SPARK_WORKER_MEMORY:每个 Worker 进程所需要的内存大小; (4) SPARK_WORKER_INSTANCES:每个 Worker 节点上运行 Worker 进程的数目; (5) SPARK_MASTER_WEBUI_PORT:Master 节点对应 Web 服务的端口;(6)export SPARK_DAEMON_JAVA_OPTS=&-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:.1.118:8.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark&:用于指定 Master 的 HA, 依赖于 zookeeper 集群; (7) export SPARK_JAVA_OPTS=&-Dspark.cores.max=4&:用于限定每个提交的 Spark Application 的使用的 CPU 核的数目,因为缺省情况下提交的 Application 会使 用所有集群中剩余的 CPU Core。 注意在 Worker 进程的 CPU 个数和内存大小的时候,要结合机器的实际硬件条件,如 果一个 Worker 节点上的所有 Worker 进程需要的 CPU 总数目或者内存大小超过当前 Worker 节点的硬件条件,则 Worker 进程会启动失败。 将配置好的 Spark 文件拷贝至每个 Spark 集群的节点上的相同路径中。为方便使用 spark-shell,可以在环境变量中配置上 SPARK_HOME。 3.2.2 启动 配置结束后,就该启动集群了。这里使用 Master 的 HA 方式,选取 192.168.1.112、 192.168.1.113 节点作为 Master, 192.168.1.112、 192.168.1.113、 192.168.1.114 节点上运行两个 Worker 进程。 首先在 192.168.1.113 节点上做此操作:启动之后,可以查看当前节点的进程:另外,为了保证 Master 的 HA,在 192.168.1.112 节点上只启动 Master: 192.168.1.112 节点的进程为:启动过后,通过 Web 页面查看集群的情况,这里访问的是:http://192.168.1.113:8090/ 再看 standby 节点 192.168.1.112 的 web 界面 http://192.168.1.112:8090/ 3.2.3 测试 Spark 的 bin 子目录中的 spark-submit 脚本是用于提交程序到集群中运行的工具, 我 们使用此工具做一个关于 pi 的计算。命令如下:./bin/spark-submit --master spark://spark113:7077 \ --class org.apache.spark.examples.SparkPi \ --name Spark-Pi --executor-memory 400M \ --driver-memory 512M \ /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spar k-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar 其中--master 参数用于指定 Master 节点的 URI,但是这里填的是 Host,不是 IP! 任务启动之后,在 Spark 的 Master 的 Web 界面可以看到运行中的 Application。任务运行结束之后,在 Web 界面中 Completed Applications 表格中会看到对应的结 果。同时,命令行中会打印出来运行的结果,如下所示:4 spark-submit 工具 上面测试程序的提交都是使用的 spark-submit 脚本,其位于$SPARK_HOME/bin 目 录中,执行时需要传入的参数说明如下:Usage: spark-submit [options] &app jar | python file& [app options]参数名称含义可以是 spark://host:port, mesos://host:port, yarn, --master MASTER_URL yarn-cluster,yarn-client, local--deploy-mode DEPLOY_MODEDriver 程序运行的地方,client 或者 cluster--class CLASS_NAME主类名称,含包名--name NAMEApplication 名称--jars JARSDriver 依赖的第三方 jar 包用逗号隔开的放置在 Python 应用程序 PYTHONPATH 上的.zip, .egg, .py --py-files PY_FILES 文件列表--files FILES用逗号隔开的要放置在每个 executor 工作目录的文件列表--properties-file FILE设置应用程序属性的文件路径,默认是 conf/spark-defaults.conf--driver-memory MEMDriver 程序使用内存大小--driver-java-options--driver-library-pathDriver 程序的库路径--driver-class-pathDriver 程序的类路径--executor-memory MEMexecutor 内存大小,默认 1G --driver-cores NUMDriver 程序的使用 CPU 个数,仅限于 Spark Alone 模式--supervise失败后是否重启 Driver,仅限于 Spark Alone 模式--total-executor-cores NUMexecutor 使用的总核数,仅限于 Spark Alone、Spark on Mesos 模式--executor-cores NUM每个 executor 使用的内核数,默认为 1,仅限于 Spark on Yarn 模式提交应用程序给哪个 YARN 的队列, 默认是 default 队列, 仅限于 Spark on --queue QUEUE_NAME Yarn 模式--num-executors NUM启动的 executor 数量,默认是 2 个,仅限于 Spark on Yarn 模式--archives ARCHIVES仅限于 Spark on Yarn 模式另外,在执行 spark-submit.sh 工具进行提交应用之前,可以使用如下方式提前定义 好当前 Spark Application 所使用的 CPU Core 数目和内存大小:SPARK_JAVA_OPTS=&-Dspark.cores.max=2 -Dspark.executor.me mory=600m& \ ./bin/spark-submit --master spark://update113:7077 \ --class org.apache.spark.examples.SparkPi \ ? ?5 Spark HistoryServer 类似于 Mapreduce 的 JobHistoryServer,Spark 也有一个服务可以保存历史 Application 的运行记录。 修改$SPARK_HOME/conf 下的 spark-defaults.conf 文件(注意,修改后的配置文件 在每个节点都要有),其中可修改的配置属性为:属性名称默认值含义spark.history.updateInterval10以秒为单位,更新日志相关信息的时间间隔保存 Application 历史记录的个数,如果超过这个值,旧的应 spark.history.retainedApplications 250 用程序信息将被删除spark.history.ui.port18080HistoryServer 的 web 端口是否使用 kerberos 方式登录访问 HistoryServer,对于持久 spark.history.kerberos.enabled False 层位于安全集群的 HDFS 上是有用的,如果设置为 true,就要 配置下面的两个属性spark.history.kerberos.principal用于 HistoryServer 的 kerberos 主体名称spark.history.kerberos.keytab用于 HistoryServer 的 kerberos keytab 文件位置授权用户查看应用程序信息的时候是否检查 acl。如果启用,只 spark.history.ui.acls.enable False 有应用程序所有者和 spark.ui.view.acls 指定的用户可以查看 应用程序信息;否则,不做任何检查spark.eventLog.enabledFalse是否记录 Spark 事件保存日志相关信息的路径,可以是 hdfs://开头的 HDFS 路径, spark.eventLog.dir 也可以是 file://开头的本地路径,都需要提前创建spark.yarn.historyServer.addressServer 端的 URL:Ip:port 或者 host:port 此处的设置如下:spark.eventLog.enabled spark.eventLog.dirtrue hdfs://yh/user/hadoop/sparklogs update113:18080spark.yarn.historyServer.address设置完文件之后,进入 sbin 目录启动服务:运行完成的 Application 历史记录可以通过访问上面指定的 HistoryServer 地址查看, 这里是 http://192.168.1.113:18080/。无论运行时是本地模式,还是 yarn-client、yarn-cluster,运行记录均可在此页面查 看。并且程序运行时的环境变量、系统参数、各个阶段的耗时均可在此查看,很强大!6 Spark 可配置参数 Spark 参数的配置可通过三种方式:SparkConf 方式 & 命令行参数方式 &文件配置方 式。6.1 应用属性属性名默认值含义spark.app.name应用程序名称spark.master要连接的 Spark 集群 Master 的 URLspark.executor.memory512 m每个 executor 使用的内存大小序列化方式,官方建议使用 org.apache.spark spark.serializer .serializer.JavaSerializer 以任意是定义为 org.apache.spark.Serializer 子类的序 化器 org.apache.spark.serializer.KryoSerializer,当然也可如果要使用 Kryo 序化器,需要创建一个继承 spark.kryo.registrator KryoRegistrator 的类并设置系统属性 spark.kryo.registrator 指向该类用于保存 map 输出文件或者转储 RDD。可以多个目录,之 间以逗号分隔。在 Spark 1.0 及更高版本此属性会被环境 spark.local.dir /tmp 变量 SPARK_LOCAL_DIRS (Standalone、 Mesos) 或 LOCAL_DIRS (YARN) 代替spark.logConfFalseSparkContext 启动时是否记录有效 SparkConf 信息 6.2 运行环境变量属性名默认值含义传递给 executor 的额外 JVM 选项,但是不能使用它来 spark.executor.extraJavaOptions 设置 Spark 属性或堆空间大小spark.executor.extraClassPath追加到 executor 类路径中的附加类路径spark.executor.extraLibraryPath启动 executor JVM 时要用到的特殊库路径executor 在加载类的时候是否优先使用用户自定义的 spark.files.userClassPathFirst False JAR 包,而不是 Spark 带有的 JAR 包,目前,该属性只 是一项试验功能6.3 Shuffle 操作相关属性属性名 默认值 含义如果为 true,在 shuffle 时就合并中间文件,对于有大量 Reduce 任务的 shuffle 来说, 合并文件可以提高文件系统性 spark.shuffle.consolidateFiles False 能, 如果使用的是 ext4 或 xfs 文件系统, 建议设置为 true; 对于 ext3,由于文件系统的限制,设置为 true 反而会使内 核&8 的机器降低性能如果为 true,在 shuffle 期间通过溢出数据到磁盘来降低了 spark.shuffle.spill True 内存使用总量,溢出阈值是由 spark.shuffle.memoryFraction 指定的spark.pressTrue是否压缩在 shuffle 期间溢出的数据,如果压缩将使用 pression.codec。是否压缩 map 输出文件,压缩将使用 press True pression.codec。每个 shuffle 的文件输出流内存缓冲区的大小, 以 KB 为单位。 spark.shuffle.file.buffer.kb 100 这些缓冲区可以减少磁盘寻道的次数,也减少创建 shuffle 中间文件时的系统调用每个 reduce 任务同时获取 map 输出的最大大小 (以兆字 节为单位) 。 由于每个 map 输出都需要一个缓冲区来接收它, spark.reducer.maxMbInFlight 48 这代表着每个 reduce 任务有固定的内存开销,所以要设置 小点,除非有很大内存6.4 SparkUI 相关属性属性名 默认值 含义spark.ui.port4040应用程序 webUI 的端口spark.ui.retainedStages1000在 GC 之前保留的 stage 数量spark.ui.killEnabledTrue允许在 webUI 将 stage 和相应的 job 杀死是否记录 Spark 事件,用于应用程序在完成后重构 spark.eventLog.enabled False webUI是否压缩记录 Spark 事件,前提 press False spark.eventLog.enabled 为 true如果 spark.eventLog.enabled 为 true,该属性为记 spark.eventLog.dir file:///tmp/spark-events 录 spark 事件的根目录。在此根目录中,Spark 为每个 应用程序创建分目录,并将应用程序的事件记录到在此 目录中。 可以将此属性设置为 HDFS 目录, 以便 history server 读取历史记录文件6.5 压缩和序列化相关属性属性名 默认值 含义pressTrue是否在发送之前压缩广播变量pressFalse是否压缩 RDD 分区用于压缩内部数据如 RDD 分区和 shuffle 输出的编码 解码器, org.apache.spark.io. pression.codec LZFCompressionCodec org.apache.spark.io.SnappyCompressionCodec 。其中,Snappy 提供更快速的压缩和解压缩,而 LZF 提供了更好的压缩比 org.apache.spark.io.LZFCompressionCodec 和pression.snappy 32768 .block.size使用 Snappy 编码解码器时, 编码解码器使用的块大小 (以字节为单位)org.apache.spark.serialize spark.closure.serializer r. JavaSerializer 用于闭包的序化器,目前只有支持 Java 序化器org.apache.spark.serializer.JavaSerializer 序列 化时,会缓存对象以防止写入冗余数据,此时会停止这 spark.serializer. 10000 objectStreamReset 就可以收集旧对象。 若要关闭这重定期重置功能将其设 置为& = 0 。默认情况下每 10000 个对象将重置序化 器 些对象的垃圾收集。通过调用重置序化器,刷新该信息 当使用 Kryo 序化数据时, 是否跟踪对同一对象的引用。 spark.kryo.referenceTracking True 如果你的对象图有回路或者同一对象有多个副本, 有必 要设置为 true;其他情况下可以禁用以提高性能在 Kryo 里允许的最大对象大小(Kryo 会创建一个缓 spark.kryoserializer.buffer.m 2 b 冲区,至少和序化的最大单个对象一样大)。每个 worker 的每个 core 只有一个缓冲区6.6 执行时相关属性属性名 默认值 含义本地模式:机器核数 如果用户不设置,系统使用集群中运行 spark.default.parallelism Mesos:8 其他:max(executor 的 core,2) shuffle 操作的默认任务数 (groupByKey、 reduceByKey 等)org.apache.spark.broadcast. spark.broadcast.factory HttpBroadcastFactory 广播的实现类TorrentBroadcastFactory 块大小(以 spark.broadcast.blockSize 4096 kb 为单位)。过大会降低广播速度;过小 会使印象 BlockManager 性能通过 SparkContext.addFile() 添加的 spark.files.overwrite Fale 文件在目标中已经存在并且内容不匹配 时,是否覆盖目标文件在获取由 driver 通过 spark.files.fetchTimeout False SparkContext.addFile() 添加的文件 时,是否使用通信时间超时 spark.storage.memoryFraction0.6Java 堆用于 cache 的比例用于存储 RDD 的 techyon 目录, tachyon 文件系统的 URL 由 spark.tachyonStore.baseDir System.getProperty(&java.io.tmpdir&) spark.tachyonStore.url 设置,也可以 是逗号分隔的多个 techyon 目录以字节为单位的块大小,用于磁盘读取一 spark.storage. 8192 memoryMapThreshold 个块大小时进行内存映射。这可以防止 Spark 在内存映射时使用很小块,一般情 况下,对块进行内存映射的开销接近或低 于操作系统的页大小spark.tachyonStore.urltachyon://localhost:19998基于 techyon 文件的 URLspark 记录任何元数据(stages 生成、 task 生成等)的持续时间。定期清理可以 确保将超期的元数据丢弃,这在运行长时 spark.cleaner.ttl 间任务是很有用的,如运行 7*24 的 sparkstreaming 任务。 RDD 持久化在内 存中的超期数据也会被清理6.7 网络相关属性属性名 默认值 含义spark.driver.host运行 driver 的主机名或 IP 地址spark.driver.port随机driver 侦听的端口以 MB 为单位的 driver 和 executor 之间通信信息的大小, spark.akka.frameSize 10 设置值越大,driver 可以接受更大的计算结果 用于通信的 actor 线程数,在大型集群中拥有更多 CPU 内 spark.akka.threads 4 核的 driver 可以增加 actor 线程数spark.akka.timeout100以秒为单位的 Spark 节点之间超时时间下面 3 个参数是用于设置 Akka 自带的故障探测器。 启用的 话,以秒为单位设置如下这三个参数,有助于对恶意的 executor 的定位,而对于由于 GC 暂停或网络滞后引起的 spark.akka.heartbeat.pauses 600 情况下,不需要开启故障探测器;另外故障探测器的开启会 导致由于心跳信息的频繁交换而引起的网络泛滥。本参数是设置可接受的心跳停顿时间对应 Akka 的 spark.akka.failure-detector.threshold 300.0 akka.remote.transport-failure-detector.thresholdspark.akka.heartbeat.interval1000心跳间隔时间6.8 调度相关属性属性名 默认值 含义spark.task.cpus1为每个任务分配的内核数spark.task.maxFailures4Task 的最大重试次数spark.scheduler.modeFIFOSpark 的任务调度模式,还有一种 Fair 模式当应用程序运行在 Standalone 集群或者粗粒度共享模 spark.cores.max 式 Mesos 集群时,应用程序向集群请求的最大 CPU 内 核总数(不是指每台机器,而是整个集群)。如果不设 置,对于 Standalone 集群将使用 spark.deploy.defaultCores 中数值,而 Mesos 将使 用集群中可用的内核如果设置为 true,在 Mesos 集群中运行时使用粗粒度 spark.mesos.coarse False 共享模式以下几个参数是关于 Spark 推测执行机制的相关参数。 此参数设定是否使用推测执行机制, 如果设置为 true 则 spark.speculation False spark 使用推测执行机制, 对于 Stage 中拖后腿的 Task 在其他节点中重新启动,并将最先完成的 Task 的计算 结果最为最终结果Spark 多长时间进行检查 task 运行状态用以推测,以 spark.speculation.interval 100 毫秒为单位spark.speculation.quantile0.75推测启动前,Stage 必须要完成总 Task 的百分比spark.speculation.multiplier1.5比已完成 Task 的运行速度中位数慢多少倍才启用推测以下几个参数是关于 Spark 数据本地性的。本参数是以 毫秒为单位启动本地数据 task 的等待时间, 如果超出就 启动下一本地优先级别的 task。该设置同样可以应用到 spark.locality.wait 3000 各优先级别的本地性之间(本地进程 -& 本地节点 -& 本地机架 -& 任意节点 ),当然,也可以通过 spark.locality.wait.node 等参数设置不同优先级别的 本地性spark.locality.wait.processspark.locality.wait本地进程级别的本地等待时间spark.locality.wait.nodespark.locality.wait本地节点级别的本地等待时间spark.locality.wait.rackspark.locality.wait本地机架级别的本地等待时间spark.scheduler.revive.interval1000复活重新获取资源的 Task 的最长时间间隔(毫秒), 发生在 Task 因为本地资源不足而将资源分配给其他 Task 运行后进入等待时间, 如果这个等待时间内重新获 取足够的资源就继续计算6.9 安全相关属性属性名 默认值 含义spark.authenticateFalse是否启用内部身份验证设置组件之间进行身份验证的密钥。如果不是 YARN 上运行并且 spark.authenticate.secret spark.authenticate 为 true 时,需要设置密钥spark.core.connection. 30 auth.wait.timeout 进行身份认证的超时时间Spark web UI 要使用的以逗号分隔的筛选器名称列表。筛选器要符合 javax servlet Filter 标准,每个筛选器的参数可以通过设置 java 系统属 性来指定:spark.&class name of spark.ui.filters filter&.params='param1=value1,param2=value2' 例如:-Dspark.ui.filters=com.test.filter1 -.test.filter1.params='param1=foo,param2=testing'Spark webUI 存取权限是否启用。如果启用,在用户浏览 web 界面的时 spark.ui.acls.enable False 候会检查用户是否有访问权限以逗号分隔 Spark webUI 访问用户的列表。默认情况下只有启动 Spark spark.ui.view.acls job 的用户才有访问权限 6.10 SparkStreaming 相关属性属性名 默认值 含义Spark Streaming 接收器将接收数据合并成数据块并存储 spark.streaming.blockInterval 200 在 Spark 里的时间间隔,毫秒如果设置为 true,强迫将 SparkStreaming 持久化的 RDD 数据从 Spark 内存中清理,同样的,SparkStreaming 接收 spark.streaming.unpersist True 的原始输入数据也会自动被清理;如果设置为 false,则允许 原始输入数据和持久化的 RDD 数据可被外部的 Streaming 应用程序访问,因为这些数据不会自动清理6.11 Standalone 模式特有属性可以在文件 conf/spark-env.sh 中来设置此模式的特有相关属性: (1)SPARK_MASTER_OPTS:配置 master 使用的属性 (2)SPARK_WORKER_OPTS:配置 worker 使用的属性 (3)SPARK_DAEMON_JAVA_OPTS:配置 master 和 work 都使用的属性配置的时候,使用类似的语句:export SPARK_MASTER_OPTS=&-Dx1=y1 -Dx2=y2& 其中 x 代表属性,y 代表属性值。 SPARK_MASTER_OPTS 所支持的属性有:属性名默认值含义 Standalone 集群管理器是否自由选择节点还是固定到尽可能少 spark.deploy.spreadOut True 的节点,前者会有更好的数据本地性,后者对于计算密集型工作负 载更有效spark.worker.timeout60master 因为没有收到心跳信息而认为 worker 丢失的时间(秒)如果没有设置 spark.cores.max,该参数设置 Standalone 集群 分配给应用程序的最大内核数,如果不设置,应用程序获取所有的 spark.deploy.defaultCores 有效内核。注意在一个共享的集群中,设置一个低值防止攫取了所 有的内核,影响他人的使用SPARK_WORKER_OPTS 所支持的属性有属性名默认值含义是否定期清理 worker 的应用程序工作目录, 只适用于 spark.worker.cleanup.enabled False Standalone 模式, 清理的时候将无视应用程序是否在 运行清理 worker 本地过期的应用程序工作目录的时间间 spark.worker.cleanup.interval 1800 隔(秒)worker 保留应用程序工作目录的有效时间。该时间由 spark.worker.cleanup.appDataTtl 7*24*3600 磁盘空间、应用程序日志、应用程序的 jar 包以及应用 程序的提交频率来设定SPARK_DAEMON_JAVA_OPTS 所支持的属性有:属性名含义 下面 3 个参数是用于配置 zookeeper 模式的 master HA。设置为 spark.deploy.recoveryMode ZOOKEEPER 表示启用 master 备用恢复模式,默认为 NONEspark.deploy.zookeeper.urlzookeeper 集群 URLspark.deploy.zookeeper.dirzooKeeper 保存恢复状态的目录,缺省为/sparkspark.deploy.recoveryMode设成 FILESYSTEM 启用 master 单节点恢复模式,缺省值为 NONEspark.deploy.recoveryDirectorySpark 保存恢复状态的目录6.12 Spark on Yarn 特有属性属性名 默认值 含义RM 等待 Spark AppMaster 启动重试次 spark.yarn.applicationMaster.waitTries 10 数,也就是 SparkContext 初始化次数。 超过这个数值,启动失败spark.yarn.submit.file.replication3应用程序上传到 HDFS 的文件的副本数若为 true, 在 job 结束后, 将 stage 相关 spark.yarn.preserve.staging.files False 的文件保留而不是删除Spark AppMaster 发送心跳信息给 spark.yarn.scheduler.heartbeat.interval-ms 5000 YARN RM 的时间间隔2 倍于 spark.yarn.max.executor.failures executor 数导致应用程序宣告失败的最大 executor 失败次数spark.yarn.historyServer.addressSpark history server 的地址(不要加 http://)。这个地址会在 Spark 应用程 序完成后提交给 YARN RM,然后 RM 将 信息从 RM UI 写到 history server UI 上。7 示例配置主要的配置文件均位于$SPARK_HOME/conf 中,包括 slave、spark-env.sh、 spark-defaults.conf 文件等。7.1 slave 文件192.168.1.112 192.168.1.113 192.168.1.1147.2 spark-env.sh 文件export JAVA_HOME=&/export/servers/jdk1.6.0_25& #yarn export HADOOP_HOME=/home/hadoop/hadoop-2.0.0-cdh4.5.0 export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop SPARK_EXECUTOR_INSTANCES=2 SPARK_EXECUTOR_CORES=1 SPARK_EXECUTOR_MEMORY=400M SPARK_DRIVER_MEMORY=400M SPARK_YARN_APP_NAME=&Spark 1.0.0&#alone SPARK_MASTER_WEBUI_PORT=8090 SPARK_WORKER_MEMORY=400M SPARK_WORKER_CORES=1 SPARK_WORKER_INSTANCES=2 #Master HA export SPARK_DAEMON_JAVA_OPTS=&-Dspark.deploy.recoveryMo de=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117: .1.118:.1.119:2181 -Dspark.deplo y.zookeeper.dir=/spark&7.3 spark-defaults.conf 文件#history server spark.eventLog.enabled spark.eventLog.dir klogs spark.yarn.historyServer.address spark113:18080 true hdfs://namespace/user/hadoop/spar #shuffle spark.shuffle.consolidateFiles true #task spark.task.cpus 1 spark.task.maxFailures 3 #scheduler type spark.scheduler.mode FAIR #security park.authenticate true spark.authenticate.secret hadoop spark.core.connection.auth.wait.timeout 1500 spark.ui.acls.enable true spark.ui.view.acls root,hadoop #each executor used max memory spark.executor.memory 400m #spark on yarn spark.yarn.applicationMaster.waitTries 5 spark.yarn.submit.file.replication 3 spark.yarn.preserve.staging.files false spark.yarn.scheduler.heartbeat.interval-ms 5000 #park standalone and on mesos spark.cores.max 48 Spark SQLSpark 支持 Scala、Python 等语言写的脚本直接在 Spark 环境执行,更重要的是支持 对 Hive 语句进行包装后在 Spark 上运行。这就是 Spark SQL。8.1 相关配置配置的步骤比较简单, 把 Hive 的配置文件 hive-site.xml 直接放置到$SPARK_HOME 的 conf 路径下即可。如果是想在 Spark 集群本地执行 SQL 的话,每个对应的节点都要做 同样的配置。8.2 运行 SQL启动 bin 目录下的 spark-shell 脚本,依次执行如下语句:val sc: SparkContext val hiveContext = new org.apache.spark.sql.hive.HiveCont ext(sc) import hiveContext._ hql(&CREATE TABLE IF NOT EXISTS src (key INT, value STRIN G)&) hql(&LOAD DATA LOCAL INPATH '/examples/data.txt' INTO TAB LE src&) hql(&FROM src SELECT key, value&).collect().foreach(prin tln)上面的命令,分别是声明 SparkContext 对象,利用 hql 方法执行 Hive 的 SQL 语句, 在执行 SQL 语句的过程中,可以通过 Hive 的 Cli 客户端进行查看相应操作的结果。8.3 on yarn 模式由于 spark-shell 脚本是在本地执行的,如果想放到 Yarn 上去执行的话,可以使用上 面第 4 节中的 spark-submit 工具,这时候需要对需要输入的 sql 语句进行包装,将包装 类打包成 jar 文件,再提交。包装类的代码如下:1 23 5import java.util.L 4import org.apache.spark.SparkC 6 import org.apache.spark.api.java.JavaSparkC 7 import org.apache.spark.sql.api.java.R 8 import org.apache.spark.sql.hive.api.java.JavaHiveC 9 10 /**11* Description:12 */15* Author: 13* Date: 4public class SparkSQL { public static void main(String[] args) { if(args.length != 2){System.out.println(&usage: &applicationName& &sql statme nts&&); System.exit(1);21 licationName = args[0];24 26 licationName);27 rkContext(conf);28 ew JavaHiveContext(sc);29 Context.hql(sql).collect();30 31 }22 23 String appString sql = args[1];25SparkConf conf = new SparkConf().setAppName(app JavaSparkContext sc = new JavaSpa JavaHiveContext hiveContext = n List&Row& results = hive System.out.println(&Sql is:& + sql + &, has been executed over.&);32 System.out.println(&The result size is & + results.siz e() + &, they are:&);33 e(); i++){34 toString());35 xecute over ...&);38 }36 37 for(int i=0; i&results.siz System.out.println(results.get(i). System.out.println(&E System.ou }41 42 } sc.stop();39t.println(&Stop over ...&);40将其打包成 jar 文件 spark-0.0.1-SNAPSHOT.jar,再使用 spark-submit 工具进行 任务的提交,命令如下:./spark-submit \ --class spark.SparkSQL \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 400m --executor-memory 400m --executor-c ores 1 \ --jars /home/hadoop/spark-1.0.0/examples/libs/spark-core _2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/examples/libs/s park-hive_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/lib_ma naged/jars/datanucleus-api-jdo-3.2.1.jar,/home/hadoop/sp ark-1.0.0/lib_managed/jars/datanucleus-core-3.2.2.jar,/h ome/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-rdbm s-3.2.1.jar,/home/hadoop/hive-0.12.0/lib/mysql-connector -java-5.1.27-bin.jar --files /home/hadoop/spark-1.0.0/conf/hive-site.xml \ /home/hadoop/spark-1.0.0/examples/libs/spark-0.0.1-SNAPS HOT.jar &hiveTest& &CREATE TABLE IF NOT EXISTS test4 (key INT, value STRING)&其中,--master 参数指定的是 yarn-cluster 模式,当然也可以使用 yarn-client 模式, 至于区别,已经在上文说了;--class 指定的是我们包装类的主类,见上文源码;--jars 是 依赖的四个 jar 包;--files 是指定的 hive-site.xml 配置文件,提交到 Yarn 中的 Application 在执行的时候, 需要把此配置文件分发到每个 Executor 上; 最后的两个参数, 一个是 Application 的名称,一个是运行的 SQL 语句。 运行结束后,可以到 Spark HistoryServer 中查看运行结果。 出自:/Linux/304.htm Spark 源码系列(七)Spark on yarn 具 体实现本来不打算写的了, 但是真的是闲来无事, 整天看美剧也没啥意思。 这一章打算讲一下 Spark on yarn 的实现,1.0.0 里面已经是一个 stable 的版本了,可是 1.0.1 也出来了,离 1.0.0 发布才一个 月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲 1.0.0 的代码,所以各位朋友也不要再问我 讲的是哪个版本,目前为止发布的文章都是基于 1.0.0 的代码。 在第一章《spark-submit 提交作业过程》的时候,我们讲过 Spark on yarn 的在 cluster 模式 下它的 main class 是 org.apache.spark.deploy.yarn.Client。okay,这个就是我们的头号目标。提交作业找到 main 函数,里面调用了 run 方法,我们直接看 run 方法。val appId = runApp() monitorApplication(appId) System.exit(0)运行 App,跟踪 App,最后退出。我们先看 runApp 吧。def runApp(): ApplicationId = { // 校验参数,内存不能小于 384Mb,Executor 的数量不能少于 1 个。 validateArgs() // 这两个是父类的方法,初始化并且启动 Client init(yarnConf) start()// 记录集群的信息(e.g, NodeManagers 的数量,队列的信息). logClusterResourceDetails() // 准备提交请求到 ResourcManager (specifically its ApplicationsManager (ASM)// Get a new client application. val newApp = super.createApplication() val newAppResponse = newApp.getNewApplicationResponse() val appId = newAppResponse.getApplicationId() // 检查集群的内存是否满足当前的作业需求 verifyClusterResources(newAppResponse) // 准备资源和环境变量. //1.获得工作目录的具体地址: /.sparkStaging/appId/ val appStagingDir = getAppStagingDir(appId) //2.创建工作目录,设置工作目录权限,上传运行时所需要的 jar 包 val localResources = prepareLocalResources(appStagingDir) //3.设置运行时需要的环境变量 val launchEnv = setupLaunchEnv(localResources, appStagingDir) //4.设置运行时 JVM 参数,设置 SPARK_USE_CONC_INCR_GC 为 true 的话,就使用 CMS 的垃圾回收机制 val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv) // 设置 application submission context. val appContext = newApp.getApplicationSubmissionContext() appContext.setApplicationName(args.appName) appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) appContext.setApplicationType(&SPARK&) // 设置 ApplicationMaster 的内存,Resource 是表示资源的类,目前有 CPU 和内存两种. val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) appContext.setResource(memoryResource) // 提交 Application. submitApp(appContext) appId }monitorApplication 就不说了,不停的调用 getApplicationReport 方法获得最新的 Report, 然后调用 getYarnApplicationState 获取当前状态, 如果状态为 FINISHED、 FAILED、 KILLED 就退出。 说到这里,顺便把跟 yarn 相关的参数也贴出来一下,大家一看就清楚了。def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse for (r &- bySize if r.partitioner.isDefined) { return r.partitioner.get } if (rdd.context.conf.contains(&spark.default.parallelism&)) { new HashPartitioner(rdd.context.defaultParallelism) } else { new HashPartitioner(bySize.head.partitions.size) } } ApplicationMaster直接看 run 方法就可以了,main 函数就干了那么一件事...def run() { // 设置本地目录,默认是先使用 yarn 的 YARN_LOCAL_DIRS 目录,再到 LOCAL_DIRS System.setProperty(&spark.local.dir&, getLocalDirs())// set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box System.setProperty(&spark.ui.port&, &0&) // when running the AM, the Spark master is always &yarn-cluster& System.setProperty(&spark.master&, &yarn-cluster&) // 设置优先级为 30,和 mapreduce 的优先级一样。它比 HDFS 的优先级高,因为它的操作是清理该作业在 hdfs 上 面的 Staging 目录 ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId() // 通过 yarn.resourcemanager.am.max-attempts 来设置,默认是 2 // 目前发现它只在清理 Staging 目录的时候用 isLastAMRetry = appAttemptId.getAttemptId() &= maxAppAttempts amClient = AMRMClient.createAMRMClient() amClient.init(yarnConf) amClient.start() // setup AmIpFilter for the SparkUI - do this before we start the UI // 方法的介绍说是 yarn 用来保护 ui 界面的,我感觉是设置 ip 代理的 addAmIpFilter() // 注册 ApplicationMaster 到内部的列表里 ApplicationMaster.register(this) // 安全认证相关的东西,默认是不开启的,省得给自己找事 val securityMgr = new SecurityManager(sparkConf) // 启动 driver 程序 userThread = startUserClass() // 等待 SparkContext 被实例化,主要是等待 spark.driver.port property 被使用 // 等待结束之后,实例化一个 YarnAllocationHandler waitForSparkContextInitialized() // Do this after Spark master is up and SparkContext is created so that we can register UI Url. // 向 yarn 注册当前的 ApplicationMaster, 这个时候 isFinished 不能为 true,是 true 就说明程序失败了 synchronized { if (!isFinished) { registerApplicationMaster() registered = true } } // 申请 Container 来启动 Executor allocateExecutors() // 等待程序运行结束 userThread.join() System.exit(0) }run 方法里面主要干了 5 项工作:1、初始化工作 2、启动 driver 程序 3、注册 ApplicationMaster 4、分配 Executors 5、等待程序运行结束 我们重点看分配 Executor 方法。private def allocateExecutors() { try { logInfo(&Allocating & + args.numExecutors + & executors.&) // 分 host、rack、任意机器三种类型向 ResourceManager 提交 ContainerRequest // 请求的 Container 数量可能大于需要的数量 yarnAllocator.addResourceRequests(args.numExecutors) // Exits the loop if the user thread exits. while (yarnAllocator.getNumExecutorsRunning & args.numExecutors && userThread.isAlive) { if (yarnAllocator.getNumExecutorsFailed &= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, &max number of executor failures reached&) } // 把请求回来的资源进行分配,并释放掉多余的资源 yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } } finally { // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } logInfo(&All executors have launched.&)// 启动一个线程来状态报告 if (userThread.isAlive) { // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = sparkConf.getLong(&spark.yarn.scheduler.heartbeat.interval-ms&, 5000) // must be &= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) launchReporterThread(interval) } }这里面我们只需要看 addResourceRequests 和 allocateResources 方法即可。 先说 addResourceRequests 方法,代码就不贴了。 Client 向 ResourceManager 提交 Container 的请求, 分三种类型: 优先选择机器、同一个 rack 的机器、任意机器。 优先选择机器是在 RDD 里面的 getPreferredLocations 获得的机器位置,如果没有优先选择机 器,也就没有同一个 rack 之说了,可以是任意机器。下面我们接着看 allocateResources 方法。 1、把从 ResourceManager 中获得的 Container 进行选择,选择顺序是按照前面的介绍的三种 类别依次进行,优先选择机器 & 同一个 rack 的机器 & 任意机器。 2、 选择了 Container 之后, 给每一个 Container 都启动一个 ExecutorRunner 一对一贴身服务, 给它发送运行 CoarseGrainedExecutorBackend 的命令。 3、ExecutorRunner 通过 NMClient 来向 NodeManager 发送请求。总结:把作业发布到 yarn 上面去执行这块涉及到的类不多,主要是涉及到 Client、 ApplicationMaster、YarnAllocationHandler、ExecutorRunner 这四个类。 1、Client 作为 Yarn 的客户端,负责向 Yarn 发送启动 ApplicationMaster 的命令。 2、ApplicationMaster 就像项目经理一样负责整个项目所需要的工作,包括请求资源,分配资 源,启动 Driver 和 Executor,Executor 启动失败的错误处理。 3、ApplicationMaster 的请求、分配资源是通过 YarnAllocationHandler 来进行的。 4、Container 选择的顺序是:优先选择机器 & 同一个 rack 的机器 & 任意机器。 5、ExecutorRunner 只负责向 Container 发送启动 CoarseGrainedExecutorBackend 的命令。 6、Executor 的错误处理是在 ApplicationMaster 的 launchReporterThread 方法里面,它启动 的线程除了报告运行状态,还会监控 Executor 的运行,一旦发现有丢失的 Executor 就重新请求。 7、在 yarn 目录下看到的名称里面带有 YarnClient 的是属于 yarn-client 模式的类,实现和 前面的也差不多。Apache Spark 探秘:三种分布式部署方 式比较址:http://dongxicheng.org/framework-on-yarn/apache-spark-comparing-th ree-deploying-ways/ 本博客的文章集合:http://dongxicheng.org/recommend/ 目前 Apache Spark 支持三种分布式部署方式, 分别是 standalone、 spark on mesos 和 spark on YARN,其中,第一种类似于 MapReduce 1.0 所采用的模式,内部实现了 容错性和资源管理, 后两种则是未来发展的趋势, 部分容错性和资源管理交由统一的资源管 理系统完成:让 Spark 运行在一个通用的资源管理系统之上,这样可以与其他计算框架, 比如 MapReduce,公用一个集群资源,最大的好处是降低运维成本和提高资源利用率(资 源按需分配)。本文将介绍这三种部署方式,并比较其优缺点。 standalone 模式,即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖 任何其他资源管理系统。从一定程度上说,该模式是其他两种的基础。借鉴 Spark 开发模 式,我们可以得到一种开发新型计算框架的一般思路:先设计出它的 standalone 模式,为 了快速开发,起初不需要考虑服务(比如 master/slave)的容错性,之后再开发相应的 wrapper,将 stanlone 模式下的服务原封不动的部署到资源管理系统 yarn 或者 mesos 上,由资源管理系统负责服务本身的容错。目前 Spark 在 standalone 模式下是没有任何 单点故障问题的,这是借助 zookeeper 实现的,思想类似于 Hbase master 单点故障解 决方案。将 Spark standalone 与 MapReduce 比较,会发现它们两个在架构上是完全一 致的:1) 都是由 master/slaves 服务组成的,且起初 master 均存在单点故障,后来均通过 zookeeper 解决(Apache MRv1 的 JobTracker 仍存在单点问题,但 CDH 版本得到了 解决);2) 各个节点上的资源被抽象成粗粒度的 slot,有多少 slot 就能同时运行多少 task。不 同的是,MapReduce 将 slot 分为 map slot 和 reduce slot,它们分别只能供 Map Task 和 Reduce Task 使用, 而不能共享, 这是 MapReduce 资源利率低效的原因之一, 而 Spark 则更优化一些,它不区分 slot 类型,只有一种 slot,可以供各种类型的 Task 使用,这种 方式可以提高资源利用率,但是不够灵活,不能为不同类型的 Task 定制 slot 资源。总之, 这两种方式各有优缺点。Spark On Mesos 模式。这是很多公司采用的模式,官方推荐这种模式(当然,原因之 一是血缘关系) 。 正是由于 Spark 开发之初就考虑到支持 Mesos, 因此, 目前而言, Spark 运行在 Mesos 上会比运行在 YARN 上更加灵活,更加自然。目前在 Spark On Mesos 环 境中, 用户可选择两种调度模式之一运行自己的应用程序 (可参考 Andrew Xia 的 “Mesos Scheduling Mode on Spark”): 1) 粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个 Dirver 和若干个 Executor 组成, 其中, 每个 Executor 占用若干资源, 内部可运行多个 Task (对 应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申 请好, 且运行过程中要一直占用这些资源, 即使不用, 最后程序运行结束后, 回收这些资源。 举个例子,比如你提交应用程序时,指定使用 5 个 executor 运行你的应用程序,每个 executor 占用 5GB 内存和 5 个 CPU,每个 executor 内部设置了 5 个 slot,则 Mesos 需要先为 executor 分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中, mesos 的 master 和 slave 并不知道 executor 内部各个 task 的运行情况,executor 直 接将任务状态通过内部的通信机制汇报给 Driver,从一定程度上可以认为,每个应用程序 利用 mesos 搭建了一个虚拟集群自己使用。 2) 细粒度模式 (Fine-grained Mode) : 鉴于粗粒度模式会造成大量资源浪费, Spark On Mesos 还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思 想是按需分配。 与粗粒度模式一样, 应用程序启动时, 先会启动 executor, 但每个 executor 占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos 会为 每个 executor 动态分配资源,每分配一些,便可以运行一个新任务,单个 Task 运行完之 后可以马上释放对应的资源。每个 Task 会汇报状态给 Mesos slave 和 Mesos Master, 便于更加细粒度管理和容错,这种调度模式类似于 MapReduce 调度模式,每个 Task 完 全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。Spark On YARN 模式。这是一种最有前景的部署模式。但限于 YARN 自身的发展,目 前仅支持粗粒度模式(Coarse-grained Mode)。这是由于 YARN 上的 Container 资源 是不可以动态伸缩的,一旦 Container 启动之后,可使用的资源不能再发生变化,不过这 个已经在 YARN 计划(具体参考: https://issues.apache.org/jira/browse/YARN-1197)中了。总之,这三种分布式部署方式各有利弊,通常需要根据公司情况决定采用哪种方案。进 行方案选择时,往往要考虑公司的技术路线(采用 Hadoop 生态系统还是其他生态系统)、 服务器资源(资源有限的话就不要考虑 standalone 模式了

我要回帖

更多关于 spark map reduce 的文章

 

随机推荐