本文作者:KTV免费预定

怎样开启spark服务(如何启动spark集群)

KTV免费预定 2022年12月05日 18:50:04 1

本文目录一览:

cdh 安装的spark服务怎么用

Shark为了实现Hive兼容怎样开启spark服务,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);

同时还依赖Hive Metastore和Hive SerDe(用于兼容现有的各种Hive存储格式)。这一策略导致了两个问题,

第一是执行计划优化完全依赖于Hive,不方便添加新的优化策略;

二是因为MR是进程级并行,写代码的时候不是很注意线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支(至于为何相关修改没有合并到Hive主线,怎样开启spark服务我也不太清楚)。

Spark SQL解决了这两个问题。第一,Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe。也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。执行计划生成和优化都由Catalyst负责。借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行计划优化策略比Hive要简洁得多。

spark 怎么启动worker

基于spark1.3.1的源码进行分析

Spark master启动源码分析

1、在start-master.sh调用master的main方法,main方法调用

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

val args = new MasterArguments(argStrings, conf)

val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)//启动系统和actor

actorSystem.awaitTermination()

}

2、调用startSystemAndActor启动系统和创建actor

def startSystemAndActor(

host: String,

port: Int,

webUiPort: Int,

conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {

val securityMgr = new SecurityManager(conf)

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,

securityManager = securityMgr)

val actor = actorSystem.actorOf(

Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)

val timeout = AkkaUtils.askTimeout(conf)

val portsRequest = actor.ask(BoundPortsRequest)(timeout)

val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]

(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)

3、调用AkkaUtils.createActorSystem来创建ActorSystem

def createActorSystem(

name: String,

host: String,

port: Int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

val startService: Int = (ActorSystem, Int) = { actualPort =

doCreateActorSystem(name, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(port, startService, conf, name)

}

4、调用Utils.startServiceOnPort启动一个端口上的服务,创建成功后调用doCreateActorSystem创建ActorSystem

5、ActorSystem创建成功后创建Actor

6、调用Master的主构造函数,执行preStart()

1、start-slaves.sh调用Worker类的main方法

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

val args = new WorkerArguments(argStrings, conf)

val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,

args.memory, args.masters, args.workDir)

actorSystem.awaitTermination()

}

2、调用startSystemAndActor启动系统和创建actor

def startSystemAndActor(

host: String,

port: Int,

webUiPort: Int,

cores: Int,

memory: Int,

masterUrls: Array[String],

workDir: String,

workerNumber: Option[Int] = None,

conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems

val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")

val actorName = "Worker"

val securityMgr = new SecurityManager(conf)

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,

conf = conf, securityManager = securityMgr)

val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))

actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,

masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)

(actorSystem, boundPort)

}

3、调用AkkaUtils的createActorSystem创建ActorSystem

def createActorSystem(

name: String,

host: String,

port: Int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

val startService: Int = (ActorSystem, Int) = { actualPort =

doCreateActorSystem(name, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(port, startService, conf, name)

}

4、创建完ActorSystem后调用Worker的主构造函数,执行preStart方法

override def preStart() {

assert(!registered)

logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(

host, port, cores, Utils.megabytesToString(memory)))

logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")

logInfo("Spark home: " + sparkHome)

createWorkDir()

context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

shuffleService.startIfEnabled()

webUi = new WorkerWebUI(this, workDir, webUiPort)

webUi.bind()

registerWithMaster()

metricsSystem.registerSource(workerSource)

metricsSystem.start()

// Attach the worker metrics servlet handler to the web ui after the metrics system is started.

metricsSystem.getServletHandlers.foreach(webUi.attachHandler)

}

5、调用registerWithMaster方法向Master注册启动的worker

def registerWithMaster() {

// DisassociatedEvent may be triggered multiple times, so don't attempt registration

// if there are outstanding registration attempts scheduled.

registrationRetryTimer match {

case None =

registered = false

tryRegisterAllMasters()

connectionAttemptCount = 0

registrationRetryTimer = Some {

context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,

INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)

}

case Some(_) =

logInfo("Not spawning another attempt to register with the master, since there is an" +

" attempt scheduled already.")

}

}

6、调用tryRegisterAllMasters向Master发送注册的Worker消息

private def tryRegisterAllMasters() {

for (masterAkkaUrl - masterAkkaUrls) {

logInfo("Connecting to master " + masterAkkaUrl + "...")

val actor = context.actorSelection(masterAkkaUrl)

actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

}

}

7、Master的receiveWithLogging接收到消息执行

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =

{

logInfo("Registering worker %s:%d with %d cores, %s RAM".format(

workerHost, workerPort, cores, Utils.megabytesToString(memory)))

if (state == RecoveryState.STANDBY) {

// ignore, don't send response

} else if (idToWorker.contains(id)) {

sender ! RegisterWorkerFailed("Duplicate worker ID")

} else {

val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,

sender, workerUiPort, publicAddress)

if (registerWorker(worker)) {

persistenceEngine.addWorker(worker)

sender ! RegisteredWorker(masterUrl, masterWebUiUrl)

schedule()

} else {

val workerAddress = worker.actor.path.address

logWarning("Worker registration failed. Attempted to re-register worker at same " +

"address: " + workerAddress)

sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "

+ workerAddress)

}

}

}

8、失败向worker返回失败消息,成功则返回Master的相关信息

9、返回消息后调用schedule,但是因为没有application,所以这时候不会进行资源的分配

至此整个Spark集群就已经启动完成

如何在eclipse运行spark

设置环境变量:HADOOP_HOME,在windows下调试Eclipse时,并不需要在windows下安装hadoop,只需要配置一下环境变量就好了,然后HADOOP_HOME执行的位置的bin下要有winUtils.exe,设置环境变量的方式不用讲了吧!

在Eclipse项目中添加spark-assembly-1.4.1-hadoop2.4.0.jar,这里添加你需要的版本。

注意,不要使用spark的分散的jar包,这一点主要是maven时需要注意,因为maven时它会下载好多jar,这里会引进第二个问题,序列化问题,应该是scala的问题。

设置环境变量SPARK_CLASSPATH,这个环境变量不是必须的,你看一下你的服务器是不是设置了。

其实这一步和下面的一步本质作用是一样的,都是告诉spark依赖的jar,而告诉spark依赖jar的方法有两种,一种是设置环境变量,另一种是addJar方式。作者是这样认为,如果公用的jar,那么就通过设置环境变量的方式,如果是算法私有的jar,就通过addJar方式。

注意,环境变量SPAKR_CLASSPATH的值是unix方式,就是跟服务器设置的一样,不要用win的方式“;”,为啥呢?作者认为,通过设置环境变量的方式时,在初始化spark上下文时,并没有分发jar到各个worker,所以需要指定jar在服务器的位置。(有问题的话,请留言,嘴拙)

addJar方式,上面讲过了。

与环境变量的方式区别在于,驱动程序启动,初始化上下文时,需要分发jar到各个worker,所以针对私有的jar,最好使用addJar方式。

到了这里,应该全部配置完成了,如果我讲述的比较清晰的话,你应该可以随意调试了。

如果存在问题,请指正。如果有疑问,请留言。

如何在本地安装运行Spark?

2.1.2 在Windows上安装与配置Spark

本节介绍在Windows系统上安装Spark的过程。在Windows环境下需要安装Cygwin模拟Linux的命令行环境来安装Spark。

(1)安装JDK

相对于Linux、Windows的JDK安装更加自动化,用户可以下载安装Oracle JDK或者OpenJDK。只安装JRE是不够的,用户应该下载整个JDK。

安装过程十分简单,运行二进制可执行文件即可,程序会自动配置环境变量。

(2)安装Cygwin

Cygwin是在Windows平台下模拟Linux环境的一个非常有用的工具,只有通过它才可以在Windows环境下安装Hadoop和Spark。具体安装步骤如下。

1)运行安装程序,选择install from internet。

2)选择网络最好的下载源进行下载。

3)进入Select Packages界面(见图2-2),然后进入Net,选择openssl及openssh。因为之后还是会用到ssh无密钥登录的。

另外应该安装“Editors Category”下面的“vim”。这样就可以在Cygwin上方便地修改配置文件。

最后需要配置环境变量,依次选择“我的电脑”→“属性”→“高级系统设置”→“环境变量”命令,更新环境变量中的path设置,在其后添加Cygwin的bin目录和Cygwin的usr\bin两个目录。

(3)安装sshd并配置免密码登录

1)双击桌面上的Cygwin图标,启动Cygwin,执行ssh-host-config -y命令,出现如图2-3所示的界面。

2)执行后,提示输入密码,否则会退出该配置,此时输入密码和确认密码,按回车键。最后出现Host configuration finished.Have fun!表示安装成功。

3)输入net start sshd,启动服务。或者在系统的服务中找到并启动Cygwin sshd服务。

注意,如果是Windows 8操作系统,启动Cygwin时,需要以管理员身份运行(右击图标,选择以管理员身份运行),否则会因为权限问题,提示“发生系统错误5”。

(4)配置SSH免密码登录

1)执行ssh-keygen命令生成密钥文件,如图2-4所示。

2)执行此命令后,在你的Cygwin\home\用户名路径下面会生成.ssh文件夹,可以通过命令ls -a /home/用户名 查看,通过ssh -version命令查看版本。

3)执行完ssh-keygen命令后,再执行下面命令,生成authorized_keys文件。

cd ~/.ssh/

cp id_dsa.pub authorized_keys

这样就配置好了sshd服务。

(5)配置Hadoop

修改和配置相关文件与Linux的配置一致,读者可以参照上文Linux中的配置方式,这里不再赘述。

(6)配置Spark

修改和配置相关文件与Linux的配置一致,读者可以参照上文Linux中的配置方式,这里不再赘述。

(7)运行Spark

1)Spark的启动与关闭

①在Spark根目录启动Spark。

./sbin/start-all.sh

②关闭Spark。

./sbin/stop-all.sh

2)Hadoop的启动与关闭

①在Hadoop根目录启动Hadoop。

./sbin/start-all.sh

②关闭Hadoop。

./sbin/stop-all.sh

3)检测是否安装成功

正常状态下会出现如下内容。

-bash-4.1# jps

23526 Jps

2127 Master

7396 NameNode

7594 SecondaryNameNode

7681 ResourceManager

1053 DataNode

31935 NodeManager

1405 Worker

如缺少进程请到logs文件夹下查看相应日志,针对具体问题进行解决。

科普Spark,Spark是什么,如何使用Spark

科普Spark怎样开启spark服务,Spark是什么,如何使用Spark

1.Spark基于什么算法怎样开启spark服务的分布式计算(很简单)

2.Spark与MapReduce不同在什么地方

3.Spark为什么比Hadoop灵活

4.Spark局限是什么

5.什么情况下适合使用Spark

什么是Spark

Spark是UC Berkeley AMP lab所开源怎样开启spark服务的类Hadoop MapReduce怎样开启spark服务的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。其架构如下图所示:

Spark与Hadoop的对比

Spark的中间数据放到内存中,对于迭代运算效率更高。

Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。

Spark比Hadoop更通用

Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。

这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。

不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。

容错性

在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。

可用性

Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。

Spark与Hadoop的结合

Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。

Spark的适用场景

Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小(大数据库架构中这是是否考虑使用Spark的重要因素)

由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。总的来说Spark的适用面比较广泛且比较通用。

运行模式

本地模式

Standalone模式

Mesoes模式

yarn模式

Spark生态系统

Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。

Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。

Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。

End.

阅读
分享