SparkHA的部署方案

本篇内容主要讲解“Spark HA的部署方案”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Spark HA的部署方案”吧!

创新互联是一家专业提供荣昌企业网站建设,专注与成都网站设计、网站建设、成都h5网站建设、小程序制作等业务。10年已为荣昌众多企业、政府机构等服务。创新互联专业网站建设公司优惠进行中。

目录

一、 准备服务器环境

1.1、服务器规划

1.2、软件版本

二、安装ZooKeeper Cluster

三、安装Hadoop2 HA Cluster

四、安装HBase HA Cluster

五、安装Spark HA Cluster步骤

5.1、初始化配置

5.2、安装Spark

5.3配置环境变量(需要切换Root命令执行,执行完毕后一定记着切换到普通用户)

5.3.1修改系统环境变量(追加)

5.3.2修改spark的环境变量

5.3.3修改从节点

5.4安装其他集群机器

5.5启动spark

5.6启动HA方式

5.7检查是否启动 

5.8发布停止Dirver

5.9、启动关闭Driver的历史监控

5.10、hive on Spark配置

六、相关参数说明

***********************************************************************************

一、准备服务器环境

1.1、服务器规划

ZooKeeper Cluster
HostIP
zookeeper1192.168.50.228
zookeeper2192.168.50.229
zookeeper3192.168.50.230
Spark HA Cluster


HOSTIPMasterWorker
nn1192.168.50.221YN
nn2192.168.50.222YN
dn1192.168.50.225NY
dn2192.168.50.226NY
dn3192.168.50.227NY

1.2、软件版本

Linux: CentOS2.6.32-431.el6.x86_64

Hadoop:2.6.0

ZooKeeper:3.4.6

JDK/JRE: 1.7.0_75 (64bit)

spark-1.3.0-bin-hadoop2.4

二、安装ZooKeeper Cluster参考《Zookeeper部署文档_V1.0》

Spark依赖ZooKeeper做选举,故需要先部署ZooKeeper

三、安装Hadoop2 HA Cluster参考《Hadoop2 HA集群部署V1.0》

Spark独立模式,可以不使用HDFS,如果有yarn的集群分发则需要部署

四、安装HBase HA Cluster参考《HBase HA部署文档V1.0》

Spark如果不存储数据到HBase,可以不做部署

5.1、初始化配置

1、修改主机名称第一步临时修改

#hostname     nn1

第二步修改永久修改,防止下次重启后被重置

修改/etc/sysconfig/network中的hostname

NETWORKING=yes

HOSTNAME= nn1

第三步做DNS映射,可以直接访问主机名则访问的部本机IP地址

修改/etc/hosts文件

在最后增加一行,如

192.168.50.221    nn1

第四步重启电脑

重启后,ping nn1

如果能ping成功则配置完成

其他机器依次配置即可

2、关闭防火墙命令:service iptables stop

同时关闭防火墙自启动服务:chkconfig iptables off

查看是否关闭了自启动:chkconfig --list | grep iptables ,如果全部是off则表示都关闭了

查看状态:service iptables status

# service iptables status

Firewall is stopped.

3、创建应用帐户和组(可选,建议新建一个专门的用户)

为了系统安全建议每个对外应用都创建单独的帐户和组,具体创建方法请网上搜索。

#新建组

[root@nn2  ~]# groupadd  bdata

#添加用户和分组

[root@nn2  ~]# useradd -g bdata bdata

#设置密码

[root@nn2  ~]# passwd bdata

Changing password for user bdata.

New password:

BAD PASSWORD: it does not contain enough DIFFERENT characters

BAD PASSWORD: is a palindrome

Retype new password:

passwd: all authentication tokens updated successfully.

4、设置ssh

修改/etc/ssh/sshd_config root帐户修改

#vim /etc/ssh/sshd_config

取消如下注释

RSAAuthentication yes

PubkeyAuthentication yes

AuthorizedKeysFile .ssh/authorized_keys

修改后,重启ssh: service sshd restart

切换到普通用户下进行设置

生成SSH通讯密钥,有这个证书后,hadoop启动时就可以免密码登录了

a、先在每台机器上面执行ssh-keygen -t rsa -P "",密码都为空,主要是每次hadoop启动时需要认证,如果有密码每次都需要输入,实在麻烦,所以密码留空,回车即可,执行后,将产生两个文件,位于~/.ssh文件夹中

b、然后相互执行 ssh-copy-id userName@machineName,此功能可以快速把自己的公钥发给对方,并自动追加

[root@nn1  ~]# ssh nn1

进入后,exit退出SSH即可

这里注意,有时候还是需要你输入密码,这里有可能是authorized_keys的权限问题,我们进一步设置权限,chmod 600 authorized_keys即可

如果对于其他计算机则需要把你的公钥发送给对方,你就可以无密码登录了,认证了

5、安装JDK/JRE

Spark是使用Scala语言开发的软件,所以必须要安装JRE或JDK才可以运行,为了测试方便建议安装JDK(生产环境安装JRE即可),JDK安装步骤(略)

5.2、安装Spark下载地址:

http://mirrors.cnnic.cn/apache/spark/spark-1.3.0/spark-1.3.0-bin-hadoop2.4.tgz

由于spark基于scale写的,故需要scale库

下载地址:

http://downloads.typesafe.com/scala/2.10.5/scala-2.10.5.tgz?_ga=1.126203213.1556481652.1427182777

5.3、配置环境变量(需要切换Root命令执行,执行完毕后一定记着切换到普通用户)

5.3.1、修改系统环境变量(追加)

#vim /etc/profile

export JAVA_HOME=/home/utoken/software/jdk1.7.0_75

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export SCALA_HOME=/home/utoken/software/scala-2.10.5

export SPARK_HOME=/home/utoken/software/spark-1.3.0-bin-hadoop2.4

export PATH=$PATH:$JAVA_HOME/bin: $SCALA_HOME/bin:$SPARK_HOME/bin

5.3.2、修改spark的环境变量

5.3.2.1、修改Spark的环境变量配置文件

#vim spark-env.sh

#导入spark运行的环境变量

export JAVA_HOME=/home/utoken/software/jdk1.7.0_75

#关于这里为什么需要配置环境变量的解释:虽然我们在所有机器都配置了环境变量(~/.bash_profile),但start-all.sh是通过ssh登录slave机器,然后启动spark worker进程的,所以~/.base_profile必须是用户登录后才会执行,ssh登录是non-login登录不会触发.base_profile的执行,所以启动时worker机器会找不到JAVA_HOME,解决办法:把环境变更在复制一份到.baserc配置中,这个环境变量是在启动shell脚本时执行的

#如果是多Master的情况下,不能定义Spark_Master_IP的属性,否则无法启动多个Master,这个属性的定义可以在Application中定义

#export SPARK_MASTER_IP=nn1

#指定每个Worker需要的内存大小(全局)

export SPARK_WORKER_MEMORY=5g

#Spark的一直执行任务的文件

export SPARK_WORK_DIR=/home/utoken/datadir/spark/work

#Spark进行shuffle等的一些小文件,临时文件,可以观察打开的句柄数

export SPARK_LOCAL_DIRS=/home/utoken/datadir/spark/tmp

#采用Zookeeper保证HA,导入相应的环境变量

export SPARK_DAEMON_JAVA_OPTS="-Dsun.io.serialization.extendedDebugInfo=true -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181 -Dspark.deploy.zookeeper.dir=/spark"

或者采用另一种导入方式

#指定Spark恢复模式,这里采用Zookeeper模式,默认为NONE

export  -Dspark.deploy.recoveryMode=ZOOKEEPER

export  -Dspark.deploy.zookeeper.url=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181

export  -Dspark.deploy.zookeeper.dir=/spark

选项:

spark.deploy.recoveryMode       NONE   恢复模式(Master重新启动的模式),有三种:1, ZooKeeper, 2, FileSystem, 3 NONE

spark.deploy.zookeeper.url  ZooKeeper的Server地址

spark.deploy.zookeeper.dir  /spark ZooKeeper 保存集群元数据信息的文件目录,包括Worker,Driver和Application。

#下面是结合Spark On Yarn方式的集群模式需要配置的,独立集群模式不需要配置

export HADOOP_HOME=/home/utoken/software/hadoop-2.5.2

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

export YARN_CONF_DIR=$HADOOP_HOME/etc/Hadoop

5.3.2.2、修改Spark针对每个Driver应用程序的全局配置

#vim spark-defaults.conf(针对每个Driver有效)

spark-defaults.conf文件说明

影响范围:编辑driver所在机器上的spark-defaults.conf,该文件会影响到driver所提交运行的application,及专门为该application提供计算资源的executor的启动参数。因此只需要在driver所在的机器上编辑该文件,不需要在worker或master所运行的机器上编辑该文件。

          配置举例:

spark.executor.extraJavaOptions    -XX:MaxPermSize=1000m

spark.executor.memory   1g

spark.serializer org.apache.spark.serializer.KryoSerializer

spark.cores.max  32

spark.shuffle.manager   SORT

spark.driver.memory  2g

spark.shuffle.consolidateFiles true

详细说明:

spark.executor.extraJavaOptions 设置扩展Java选项,这里指定永久带最大内存1000m

spark.executor.memory  指定该application提供计算资源的executor启动时, heap memory需要有1g(占用的Worker内存),默认512m

spark.serializer    指定采用序列化的类,这里采用org.apache.spark.serializer.KryoSerializer

spark.cores.max         指定每个Driver运行的最大核心数,这里指定3G,默认系统拥有的所有核心数

spark.shuffle.manager    指定在shuffle的时候,采用的管理方式,这里指定SORT方式,进行排序可以减少临时文件的产生,但是性能稍微有些消耗

spark.driver.memory       指定Driver自身占用的内存大小,这里2G

以上属性也可以单独在SparkConf中进行配置,并根据每个机器性能进行优化指定

spark.shuffle.consolidateFiles 指定合并小文件,在map的时候会有很多小文件,在reduce后,其实只有一个结果,而小文件会留下,设置下面的就是在map后的小文件合并成一个文件了

关于Spark属性的优先权为:SparkConf方式 > 命令行参数方式 >文件配置方式

5.3.3、修改从节点

#vim slaves

dn1

dn2

dn3

5.4、安装其他集群机器

 Scala快速分发:

scp -r /home/utoken/software/scala-2.10.5 utoken@dn1:/home/utoken/software/

spark快速分发:

scp -r spark-1.3.0-bin-hadoop2.4 utoken@dn1:/home/utoken/software/

5.5、启动/关闭spark启动和关闭集群

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ ./sbin/start-all.sh

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ ./sbin/stop-all.sh

单独启动和关闭Master

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ ./sbin/start-master.sh

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ ./sbin/stop-master.sh

单独启动Slaves

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ ./sbin/start-slaves.sh

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ ./sbin/stop-slaves.sh

单独启动Slaves中的某个Worker,同时指定端口(不存在端口冲突的情况下不需要指定)

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$./bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077

指定某个端口

-webui-port 8082

5.6、启动HA方式第一步,启动整个集群:

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ ./sbin/start-all.sh

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ ./sbin/stop-all.sh

第二步,启动Standby,Spark

[utoken@nn2 spark-1.3.0-bin-hadoop2.4]$ ./sbin/start-master.sh

[utoken@nn2 spark-1.3.0-bin-hadoop2.4]$ ./sbin/stop-master.sh

5.7、检查是否启动主节点

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ jps

31246 Master

Slaves节点

[utoken@dn2 software]$ jps

7734 Worker

5.8、发布/停止Driver应用5.8.1、发布应用命令:

./bin/spark-submit \

  --class

  --master \

  --deploy-mode \

  --conf = \

  ... # other options

  \

  [application-arguments]

参数说明:

--class指定你需要执行的主类,

如--class com.pride.market.spark.streaming.Bootstrap

--master指定master地址,如:spark://nn1:7077

--deploy-mode 指定Driver运行的模式,默认是Client方式,有如下两个选项:client、cluster

--supervise       失败后是否重启Driver,仅限于Spark StandAlong模式

--jar 表示你的应用程序的依赖包,多个依赖包逗号分割即可

application-jar表示你的应用程序的jar包,该jar包必须在所有节点都可见,你可以上传到HDFS文件系统中,采用hdfs://......jar方式,特别是采用cluster模式,就必须全局可见;如果采用的是client,只需要在命令最后跟上jar即可自动上传到各个worker节点上去

application-arguments表示你的主类需要传递的参数main(args),

如果需要查看配置选项是从哪里来的,可以用打开--verbose选项来生成更详细的运行信息以做参考

5.8.2、停止任务(Application/Driver)

1、Client方式发布的应用停止方式:

[utoken@nn1 ~]$ jps

18376 SparkSubmit

[utoken@nn1 ~]$kill -9 18376

2、Cluster模式发布的应用停止方式:

./bin/spark-class org.apache.spark.deploy.Client kill

其中Master URL表示集群Master地址,如spark://nn1:7077

Driver ID需要通过Master的8080端口查看,地址:http://nn1:8080

在Running Drivers中找到这里的Submission ID。

如:

[utoken@nn2 sbin]$ spark-class org.apache.spark.deploy.Client kill spark://nn1:7077 driver-20150505151651-0014

查看yarn任务列表

#yarn application -list

[bdata@nn1 ~]$ yarn application -kill applicationId

详细发布命令及示例可见官网地址:

http://spark.apache.org/docs/latest/submitting-applications.html

5.9、启动/关闭Driver的历史监控

每一个SparkContext启动一个web UI,默认情况下在端口4040, 显示关于应用程序的有用信息,包括:

调度器阶段和任务的列表

RDD大小和内存使用的概览

环境信息。

关于运行中执行器的信息

5.9.1、修改spark-default.conf配置(如果没有做下配置,日志将不会持久化,一旦运行完毕后,无法查看日志情况)

在最后增加如下选项

#是否启用事件日志记录,这里启用

spark.eventLog.enabled  true

#Driver任务运行的日志生成目录

spark.eventLog.dir      hdfs://dfscluster/sparktst/eventslog

#监控页面需要监控的目录,需要先启用和指定事件日志目录,配合上面两项使用

spark.history.fs.logDirectory   hdfs://dfscluster/sparktst/eventslog

特殊说明:hdfs://dfscluster/sparktst/eventslog,该目录为HDFS的目录,请提前创建好,同时这里用到了HADOOP HA模式的集群名称dfscluster,所以我们需要把HADOOP的配置文件hdfs-site.xml复制到Spark的conf目录下,这样就不会报集群名字dfscluster找不到的问题,详见问题12

更为详细的文档请查看官方文档:

http://spark.apache.org/docs/latest/monitoring.html

5.9.2、启动/关闭历史记录服务

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ sbin/start-history-server.sh

[utoken@nn1 spark-1.3.0-bin-hadoop2.4]$ sbin/stop-history-server.sh

5.10、hive on spark的配置

5.10.1、根据《Hive安装配置》文档,进行Hive的安装配置

5.10.2、复制Hive的配置文件hive-site.xml到Spark的配置目录conf下,然后分发到Spark的整个集群

[bdata@bdata4 hadoop]$ for i in {34,35,36,37,38};do scp hive-site.xml 192.168.10.$i:/u01/spark-1.5.1/conf/ ; done

5.10.3、启动整个集群,即可

5.10.4、测试

#采用集群方式启动spark-shell

[bdata@bdata4 sbin]$ spark-shell --master spark://bdata4:7077

#构建hiveContext

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

16/01/13 17:30:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

sqlContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2f6c736

#进行sql的查询操作,这里的表YHJHK_IY02,是提前在hive中创建好的,并已经加载了数据,故这里可以直接查询

scala> sqlContext.sql("select * from YHJHK_IY02 where AAC002 = '510922197303151981'").collect().foreach(println) 

数据略

其他启动方式

直接基于spark-sql方式启动

[bdata@bdata4 sbin]$./spark-sql --master spark://bdata4:7077

基于YARN模式启动,与Hive整合只支持yarn-client模式,不支持yarn-cluster

[bdata@bdata4 sbin]$./spark-sql --master yarn-client

上面两种方式都可以指定执行参数,如后面跟参数(注意有些参数在各个集群下生效情况是不一样的哈)

--driver-memory 1G --driver-cores 2 --executor-memory 4G 或者 --driver-memory 1G --driver-cores 2 --executor-cores 4 --num-executors 8 --executor-memory 4G

注意:如果发布任务的总内存超过了物理机器的总内存,该任务将不会执行,因此,一定注意核算集群的内存总大小剩余,于系统核心数无关

六、相关参数说明

详见官网配置表:

http://spark.apache.org/docs/latest/configuration.html

Driver Application设置(该内容也可以设置在spark-default.conf):

#允许通许的消息大小,由于默认消息太小导致通讯失败,故可以设置更大的值(该值表示Spark使用AKKA框架中的Actor通信消息的最大容量(如任务task的输出结果,因为整个spark集群的消息传递都是通过Actor进行的,默认10M,在处理大规模数据时,可以调整该值))

spark.akka.frameSize=10

#分区数量(可理解为并行线程数),建议设置成内核数的2-3倍

spark.default.parallelism=9

#是否自动清除不需要再次使用的RDD(优化内存存储)

spark.streaming.unpersist=true

#是否将多个suffle小文件合并成一个大文件(优化磁盘存储)

spark.shuffle.consolidateFiles=true

#把指定毫秒内接受到的流数据放入一个block文件中(优化磁盘存储)

spark.streaming.blockInterval=1000

#spark存储模式设为序列化存储是才生效,序列化类

spark.serializer=org.apache.spark.serializer.KryoSerializer

spark-env.conf

# -SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)

# -SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).

# -SPARK_WORKER_CORES, to set the number of cores to use on this machine

# -SPARK_WORKER_INSTANCES, to set the number of worker processes per node

性能优化

1、spark中有两个性能优化比较重要的指标,数据压缩和序列化

数据压缩:

工具:目前一般采用LZF和Snappy两种方式,LZF压缩比高,Snappy压缩时间花费长

配置:

spark-env.sh

export SPARK_JAVA_OPTS="-Dspark.broadcast.compress"

程序配置:conf.set("spark.broadcast.compress",true)

配置压缩方式:spark.io.compression.codec

序列化:

目的:进程间通讯、数据持久化到磁盘

工具:Java自身序列化和Kyro序列化,Kyro拥有紧凑、快速、轻量、扩展性好、自定义强等有点

发布到yarn上面的应用需要设置的几个特殊参数

--num-executors  控制为这个应用分配多少个Executor , 仅限于Spark on Yarn模式

--executor-memory  控制应用被分配到的每个Executor的内存大小,默认1G 支持Spark standalone、Yarn、Mesos

--executor-cores 控制应用被分配到的每个Executor的CPU核心数量

通过上面这些参数可以限制用户提交的应用不会过多的占用系统资源

其他

--supervise 失败后是否重启Driver,仅限于Spark  standalone模式

--driver-cores NUM Driver程序的使用CPU个数,仅限于Spark standalone模式

--total-executor-cores NUM executor使用的总核数,仅限于Spark standalone、Spark on Mesos模式

2、lineage(血统)

Narrow依赖:一个父分区对应一个子分区或多个父分区只对应一个子分区

Wide依赖:一个父分区对应多个子分区或多个父分区对应多个子分区

这里我们优先选择使用Narrow Dependency,向上会查重算性能消耗比较低,而Wide方式会出现计算过度,由于依赖的父子分区较多,链条复杂,故计算消耗的性能也比较多,同时由于需要通过父分区重算数据,那么也会重新计算已经计算过未丢失的子分区数据,所以导致大量冗余数据重算开销。

3、Checkpoint

用于设置检查点数据的存储,进而将数据存储备份,是对lineage做容错的辅助,lineage过长会导致容错成本过高,那么我们可以使用检查点方式进行数据恢复,在中间结果进行检查点容错,若出现分区丢失,可以直接通过检查点还原了,在做检查点时,记住需要对数据进行一个Cache,也就是最好是内存中已经缓存的RDD,否则会造成重新计算问题,消耗性能。

4、Shuffle

本意为洗牌、混洗,把一组有一定规则的数据进行打散重新组合转换为一组无规则随机数据分区。而在Spark中Shuffle则表示把一组无规则的数据尽量转换成一组具有一定规则的数据,和它本意正好相反。

5、减少重复Jar包复制分发

由于spark每次提交任务的时候,都会把相应的jar包提交到任务所在的机器中,这样同一个任务多次提交后,会出现jar包重复提交占用磁盘空间,为了减少jar包重复提交,我们需要把应用任务需要使用到的jar包上传到HDFS中,进行地址引用即可,这样就不会出现每次发布任务都上次一次jar包的情况了,达到了多次提交都引用同一个地址的jar包目的,减少了系统磁盘的占用。

6、监控java虚拟机(Linux)的性能

推荐采用:yourkit,一个比较友好的Java Profile工具

特殊说明(Spark、Flink):

1、流式计算:Flink、Storm支持毫秒级计算。Spark目前(V1.5.1)只支持秒级计算,并可运行在100个节点以上的集群。Storm目前最小的延迟是100ms左右。

Flink是一行一行处理,而Spark是基于数据片集合(RDD)进行小批量处理,所以Spark在流式处理方面,不可避免增加一些延时。

2、HADOOP兼容:Flink对Hadoop有着更好的兼容,如可以支持原生HBase的TableMapper和TableReducer,唯一不足是现在只支持老版本的MapReduce方法,新版本的MapReduce方法无法得到支持,Spark则不支持TableMapper和TableReducer这些方法。

3、SQL支持:Spark对SQL的支持比Flink支持的范围要大一些,另外Spark支持对SQL的优化,而Flink支持主要是对API级的优化。

4、Flink支持自动优化迭代式计算

5、Spark后续优势Spark SQL

6、Spark Streaming的Lineage容错方式,数据都是多分冗余容错的,如果数据来至与HDFS,那么默认一般HDFS是三份备份,如果数据来自于网络,那么它会对每一个数据流复制两份到其他的机器,冗余容错。

到此,相信大家对“Spark HA的部署方案”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


文章名称:SparkHA的部署方案
路径分享:http://azwzsj.com/article/piiijg.html