本文作者:KTV免费预定

spark的注册服务(spark客服电话)

KTV免费预定 2023年01月14日 16:26:17 2

本文目录一览:

大疆晓Spark如何进行民航局实名登记?

(1)注册并登录登记系统spark的注册服务

(2)点击左侧的“无人机管理”spark的注册服务,“新增品牌无人机”;

(3)按照指引填写无人机序列号(即SN码)spark的注册服务,星号必填,厂家选择:深圳市大疆创新科技有限公司;

(4)填写完成后,在邮箱中接收二维码,并打印处理,贴在机身醒目位置。

spark 怎么启动worker

基于spark1.3.1的源码进行分析

Spark master启动源码分析

1、在start-master.sh调用master的main方法spark的注册服务,main方法调用

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

val args = new MasterArguments(argStrings, conf)

val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)//启动系统和actor

actorSystem.awaitTermination()

}

2、调用startSystemAndActor启动系统和创建actor

def startSystemAndActor(

host: String,

port: Int,

webUiPort: Int,

conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {

val securityMgr = new SecurityManager(conf)

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,

securityManager = securityMgr)

val actor = actorSystem.actorOf(

Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)

val timeout = AkkaUtils.askTimeout(conf)

val portsRequest = actor.ask(BoundPortsRequest)(timeout)

val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]

(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)

3、调用AkkaUtils.createActorSystem来创建ActorSystem

def createActorSystem(

name: String,

host: String,

port: Int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

val startService: Int = (ActorSystem, Int) = { actualPort =

doCreateActorSystem(name, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(port, startService, conf, name)

}

4、调用Utils.startServiceOnPort启动一个端口上的服务spark的注册服务,创建成功后调用doCreateActorSystem创建ActorSystem

5、ActorSystem创建成功后创建Actor

6、调用Master的主构造函数spark的注册服务,执行preStart()

1、start-slaves.sh调用Worker类的main方法

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

val args = new WorkerArguments(argStrings, conf)

val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,

args.memory, args.masters, args.workDir)

actorSystem.awaitTermination()

}

2、调用startSystemAndActor启动系统和创建actor

def startSystemAndActor(

host: String,

port: Int,

webUiPort: Int,

cores: Int,

memory: Int,

masterUrls: Array[String],

workDir: String,

workerNumber: Option[Int] = None,

conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems

val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")

val actorName = "Worker"

val securityMgr = new SecurityManager(conf)

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,

conf = conf, securityManager = securityMgr)

val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))

actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,

masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)

(actorSystem, boundPort)

}

3、调用AkkaUtils的createActorSystem创建ActorSystem

def createActorSystem(

name: String,

host: String,

port: Int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

val startService: Int = (ActorSystem, Int) = { actualPort =

doCreateActorSystem(name, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(port, startService, conf, name)

}

4、创建完ActorSystem后调用Worker的主构造函数spark的注册服务,执行preStart方法

override def preStart() {

assert(!registered)

logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(

host, port, cores, Utils.megabytesToString(memory)))

logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")

logInfo("Spark home: " + sparkHome)

createWorkDir()

context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

shuffleService.startIfEnabled()

webUi = new WorkerWebUI(this, workDir, webUiPort)

webUi.bind()

registerWithMaster()

metricsSystem.registerSource(workerSource)

metricsSystem.start()

// Attach the worker metrics servlet handler to the web ui after the metrics system is started.

metricsSystem.getServletHandlers.foreach(webUi.attachHandler)

}

5、调用registerWithMaster方法向Master注册启动的worker

def registerWithMaster() {

// DisassociatedEvent may be triggered multiple times, so don't attempt registration

// if there are outstanding registration attempts scheduled.

registrationRetryTimer match {

case None =

registered = false

tryRegisterAllMasters()

connectionAttemptCount = 0

registrationRetryTimer = Some {

context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,

INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)

}

case Some(_) =

logInfo("Not spawning another attempt to register with the master, since there is an" +

" attempt scheduled already.")

}

}

6、调用tryRegisterAllMasters向Master发送注册的Worker消息

private def tryRegisterAllMasters() {

for (masterAkkaUrl - masterAkkaUrls) {

logInfo("Connecting to master " + masterAkkaUrl + "...")

val actor = context.actorSelection(masterAkkaUrl)

actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

}

}

7、Master的receiveWithLogging接收到消息执行

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =

{

logInfo("Registering worker %s:%d with %d cores, %s RAM".format(

workerHost, workerPort, cores, Utils.megabytesToString(memory)))

if (state == RecoveryState.STANDBY) {

// ignore, don't send response

} else if (idToWorker.contains(id)) {

sender ! RegisterWorkerFailed("Duplicate worker ID")

} else {

val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,

sender, workerUiPort, publicAddress)

if (registerWorker(worker)) {

persistenceEngine.addWorker(worker)

sender ! RegisteredWorker(masterUrl, masterWebUiUrl)

schedule()

} else {

val workerAddress = worker.actor.path.address

logWarning("Worker registration failed. Attempted to re-register worker at same " +

"address: " + workerAddress)

sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "

+ workerAddress)

}

}

}

8、失败向worker返回失败消息,成功则返回Master的相关信息

9、返回消息后调用schedule,但是因为没有application,所以这时候不会进行资源的分配

至此整个Spark集群就已经启动完成

科普Spark,Spark是什么,如何使用Spark

科普Spark,Spark是什么,如何使用Spark

1.Spark基于什么算法的分布式计算(很简单)

2.Spark与MapReduce不同在什么地方

3.Spark为什么比Hadoop灵活

4.Spark局限是什么

5.什么情况下适合使用Spark

什么是Spark

Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。其架构如下图所示:

Spark与Hadoop的对比

Spark的中间数据放到内存中,对于迭代运算效率更高。

Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。

Spark比Hadoop更通用

Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。

这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。

不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。

容错性

在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。

可用性

Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。

Spark与Hadoop的结合

Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。

Spark的适用场景

Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小(大数据库架构中这是是否考虑使用Spark的重要因素)

由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。总的来说Spark的适用面比较广泛且比较通用。

运行模式

本地模式

Standalone模式

Mesoes模式

yarn模式

Spark生态系统

Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。

Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。

Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。

End.

Spark通信框架Spark Network Common

一直以来spark的注册服务,基于Akka实现的RPC通信框架是Spark引以为豪的主要特性spark的注册服务,也是与Hadoop等分布式计算框架对比过程中一大亮点,但是时代和技术都在演化,从Spark1.3.1版本开始, 为了解决大块数据(如Shuffle)的传输问题 ,Spark引入了Netty通信框架,到了1.6.0版本,Netty完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。

JAVA IO也经历了几次演化,从最早的BIO(阻塞式/非阻塞IO),到1.4版本的NIO(IO复用),到1.7版本的NIO2.0/AIO(异步IO)。

基于早期BIO来实现高并发网络服务器都是依赖多线程来实现,但是线程开销较大,BIO的瓶颈明显,NIO的出现解决了这一大难题, 基于IO复用解决了IO高并发 。

但是NIO有也有几个缺点:

因为这几个原因,促使了很多JAVA-IO通信框架的出现,Netty就是其中一员,它也因为高度的稳定性,功能性,性能等特性,成为Java开发的首选

首先是NIO的上层封装,Netty提供了NioEventLoopGroup / NioSocketChannel / NioServerSocketChannel的组合来完成实际IO操作,继而在此之上实现数据流Pipeline以及EventLoop线程池等功能。

另外它又重写了NIO,JDK-NIO底层是基于Epoll的LT模式来实现,而Netty是基于Epoll的ET模式实现的一组IO操作EpollEventLoopGroup / EpollSocketChannel / EpollServerSocketChannel

Netty对两种实现进行完美的封装,可以根据业务的需求来选择不同的实现

从Akka出现背景来说,它是基于Actor的RPC通信系统,它的核心概念也是Message,它是基于协程的,性能不容置疑;基于scala的偏函数,易用性也没有话说,但是它毕竟只是RPC通信,无法适用大的package/stream的数据传输,这也是Spark早期引入Netty的原因。

首先不容置疑的是Akka可以做到的,Netty也可以做到,但是Netty可以做到,Akka却无法做到。原因是啥?在软件栈中,Akka相比Netty要Higher一点,它专门针对RPC做了很多事情,而Netty相比更加基础一点, 可以为不同的应用层通信协议(RPC,FTP,HTTP等)提供支持 ,在早期的Akka版本,底层的NIO通信就是用的Netty。

其次一个优雅的工程师是不会允许一个系统中容纳两套通信框架spark的注册服务!最后,虽然Netty没有Akka协程级的性能优势,但是Netty内部高效的Reactor线程模型,无锁化的串行设计,高效的序列化,零拷贝,内存池等特性也保证了Netty不会存在性能问题。

那么Spark是怎么用Netty来取代Akka呢?一句话,利用偏函数的特性,基于Netty“仿造”出一个简约版本的Actor模型。

对于Network通信,不管传输的是序列化后的对象还是文件,在网络上表现的都是字节流。在传统IO中,字节流表示为Stream;在NIO中,字节流表示为ByteBuffer;在Netty中字节流表示为ByteBuff或FileRegion;在Spark中,针对Byte也做了一层包装,支持对Byte和文件流进行处理,即ManagedBuffer;

ManagedBuffer包含了三个函数createInputStream(),nioByteBuffer(),convertToNetty()来对Buffer进行“类型转换”,分别获取stream,ByteBuffer,ByteBuff或FileRegion;NioManagedBuffer / NettyManagedBuffer / FileSegmentManagedBuffer也是针对性提供了具体的实现。

更好的理解ManagedBuffer :比如Shuffle BlockManager模块需要在内存中维护本地executor生成的shuffle-map输出的文件引用,从而可以提供给shuffleFetch进行远程读取,此时文件表示为FileSegmentManagedBuffer,shuffleFetch远程调用FileSegmentManagedBuffer.nioByteBuffer / createInputStream函数从文件中读取为Bytes,并进行后面的网络传输。如果已经在内存中bytes就更好理解了,比如将一个字符数组表示为NettyManagedBuffer。

协议是应用层通信的基础,它提供了应用层通信的数据表示,以及编码和解码的能力。在Spark Network Common中,继承AKKA中的定义,将协议命名为Message,它继承Encodable,提供了encode的能力。

Message根据请求响应可以划分为RequestMessage和ResponseMessage两种;对于Response,根据处理结果,可以划分为Failure和Success两种类型;根据功能的不同,主要划分为Stream,ChunkFetch,Rpc。

Server构建在Netty之上,它提供两种模型NIO和Epoll,可以通过参数(spark.[module].io.mode)进行配置,最基础的module就是shuffle,不同的IOMode选型,对应了Netty底层不同的实现,Server的Init过程中,最重要的步骤就是根据不同的IOModel完成EventLoop和Pipeline的构造

其中,MessageEncoder/Decoder针对网络包到Message的编码和解码,而最为核心就TransportRequestHandler,它封装了对所有请求/响应的处理;

TransportChannelHandler内部实现也很简单,它封装了responseHandler和requestHandler,当从Netty中读取一条Message以后,根据判断路由给相应的responseHandler和requestHandler。

Sever提供的RPC,ChunkFecth,Stream的功能都是依赖TransportRequestHandler来实现的;从原理上来说,RPC与ChunkFecth / Stream还是有很大不同的,其中RPC对于TransportRequestHandler来说是功能依赖,而ChunkFecth / Stream对于TransportRequestHandler来说只是数据依赖。

怎么理解?即TransportRequestHandler已经提供了ChunkFecth / Stream的实现,只需要在构造的时候,向TransportRequestHandler提供一个streamManager,告诉RequestHandler从哪里可以读取到Chunk或者Stream。而RPC需要向TransportRequestHandler注册一个rpcHandler,针对每个RPC接口进行功能实现,同时RPC与ChunkFecth / Stream都会有同一个streamManager的依赖,因此注入到TransportRequestHandler中的streamManager也是依赖rpcHandler来实现,即rpcHandler中提供了RPC功能实现和streamManager的数据依赖。

Server是通过监听一个端口,注入rpcHandler和streamManager从而对外提供RPC,ChunkFecth,Stream的服务,而Client即为一个客户端类,通过该类,可以将一个streamId / chunkIndex对应的ChunkFetch请求,streamId对应的Stream请求,以及一个RPC数据包对应的RPC请求发送到服务端,并监听和处理来自服务端的响应;其中最重要的两个类即为TransportClient和TransportResponseHandler分别为上述的“客户端类”和“监听和处理来自服务端的响应"。

那么TransportClient和TransportResponseHandler是怎么配合一起完成Client的工作呢? 由TransportClient将用户的RPC,ChunkFecth,Stream的请求进行打包并发送到Server端,同时将用户提供的回调函数注册到TransportResponseHandler,TransportResponseHandler是TransportChannelHandler的一部分,在TransportChannelHandler接收到数据包,并判断为响应包以后,将包数据路由到TransportResponseHandler中,在TransportResponseHandler中通过注册的回调函数,将响应包的数据返回给客户端

无论是BlockTransfer还是ShuffleFetch都需要跨executor的数据传输,在每一个executor里面都需要运行一个Server线程(后面也会分析到,对于Shuffle也可能是一个独立的ShuffleServer进程存在)来提供对Block数据的远程读写服务

在每个Executor里面,都有一个BlockManager模块,它提供了对当前Executor所有的Block的“本地管理”,并对进程内其他模块暴露getBlockData(blockId: BlockId): ManagedBuffer的Block读取接口,但是这里GetBlockData仅仅是提供本地的管理功能,对于跨远程的Block传输,则由NettyBlockTransferService提供服务。

NettyBlockTransferService本身即是Server,为其他其他远程Executor提供Block的读取功能,同时它即为Client,为本地其他模块暴露fetchBlocks的接口,支持通过host/port拉取任何Executor上的一组的Blocks。

源码位置 spark-core: org.apache.spark.network.netty

NettyBlockTransferService作为一个Server,与Executor或Driver里面其他的服务一样,在进程启动时,由SparkEnv初始化构造并启动服务,在整个运行时的一部分。

一个Server的构造依赖RpcHandler提供RPC的功能注入以及提供streamManager的数据注入。对于NettyBlockTransferService,该RpcHandler即为NettyBlockRpcServer,在构造的过程中,需要与本地的BlockManager进行管理,从而支持对外提供本地BlockMananger中管理的数据

RpcHandler提供RPC的功能注入 在这里还是属于比较“简陋的”,毕竟他是属于数据传输模块,Server中提供的chunkFetch和stream已经足够满足他的功能需要,那现在问题就是怎么从streamManager中读取数据来提供给chunkFetch和stream进行使用呢?

就是NettyBlockRpcServer作为RpcHandler提供的一个Rpc接口之一:OpenBlocks,它接受由Client提供一个Blockids列表,Server根据该BlockIds从BlockManager获取到相应的数据并注册到streamManager中,同时返回一个StreamID,后续Client即可以使用该StreamID发起ChunkFetch的操作。

从NettyBlockTransferService作为一个Server,我们基本可以推测NettyBlockTransferService作为一个Client支持fetchBlocks的功能的基本方法:

同时,为了提高服务端稳定性,针对fetchBlocks操作NettyBlockTransferService提供了非重试版本和重试版本的BlockFetcher,分别为OneForOneBlockFetcher和RetryingBlockFetcher,通过参数(spark.[module].io.maxRetries)进行配置,默认是重试3次

在Spark,Block有各种类型,可以是ShuffleBlock,也可以是BroadcastBlock等等,对于ShuffleBlock的Fetch,除了由Executor内部的NettyBlockTransferService提供服务以外,也可以由外部的ShuffleService来充当Server的功能,并由专门的ExternalShuffleClient来与其进行交互,从而获取到相应Block数据。功能的原理和实现,基本一致,但是问题来了, 为什么需要一个专门的ShuffleService服务呢? 主要原因还是为了做到任务隔离,即减轻因为fetch带来对Executor的压力,让其专心的进行数据的计算。

在目前Spark中,也提供了这样的一个AuxiliaryService:YarnShuffleService,但是对于Spark不是必须的,如果你考虑到需要“ 通过减轻因为fetch带来对Executor的压力 ”,那么就可以尝试尝试。

同时,如果启用了外部的ShuffleService,对于shuffleClient也不是使用上面的NettyBlockTransferService,而是专门的ExternalShuffleClient,功能逻辑基本一致!

Akka的通信模型是基于Actor,一个Actor可以理解为一个Service服务对象,它可以针对相应的RPC请求进行处理,如下所示,定义了一个最为基本的Actor:

Actor内部只有唯一一个变量(当然也可以理解为函数了),即Receive,它为一个偏函数,通过case语句可以针对Any信息可以进行相应的处理,这里Any消息在实际项目中就是消息包。

另外一个很重要的概念就是ActorSystem,它是一个Actor的容器,多个Actor可以通过name-Actor的注册到Actor中,在ActorSystem中可以根据请求不同将请求路由给相应的Actor。ActorSystem和一组Actor构成一个完整的Server端,此时客户端通过host:port与ActorSystem建立连接,通过指定name就可以相应的Actor进行通信,这里客户端就是ActorRef。所有Akka整个RPC通信系列是由Actor,ActorRef,ActorSystem组成。

Spark基于这个思想在上述的Network的基础上实现一套自己的RPC Actor模型,从而取代Akka。其中RpcEndpoint对应Actor,RpcEndpointRef对应ActorRef,RpcEnv即对应了ActorSystem。

RpcEndpoint与Actor一样,不同RPC Server可以根据业务需要指定相应receive/receiveAndReply的实现,在Spark内部现在有N多个这样的Actor,比如Executor就是一个Actor,它处理来自Driver的LaunchTask/KillTask等消息。

RpcEnv相对于ActorSystem:

RpcEndpointRef即为与相应Endpoint通信的引用,它对外暴露了send/ask等接口,实现将一个Message发送到Endpoint中。

这就是新版本的RPC框架的基本功能,它的实现基本上与Akka无缝对接,业务的迁移的功能很小,目前基本上都全部迁移完了。

RpcEnv不仅从外部接口与Akka基本一致,在内部的实现上,也基本差不多,都是按照MailBox的设计思路来实现的;

RpcEnv即充当着Server,同时也为Client内部实现。

当作为Server ,RpcEnv会初始化一个Server,并注册NettyRpcHandler。RpcHandler的receive接口负责对每一个请求进行处理,一般情况下,简单业务可以在RpcHandler直接完成请求的处理,但是考虑一个RpcEnv的Server上会挂载了很多个RpcEndpoint,每个RpcEndpoint的RPC请求频率不可控,因此需要对一定的分发机制和队列来维护这些请求,其中Dispatcher为分发器,InBox即为请求队列;

在将RpcEndpoint注册到RpcEnv过程中,也间接的将RpcEnv注册到Dispatcher分发器中,Dispatcher针对每个RpcEndpoint维护一个InBox,在Dispatcher维持一个线程池(线程池大小默认为系统可用的核数,当然也可以通过spark.rpc.netty.dispatcher.numThreads进行配置),线程针对每个InBox里面的请求进行处理。当然实际的处理过程是由RpcEndpoint来完成。

其次RpcEnv也完成Client的功能实现 ,RpcEndpointRef是以RpcEndpoint为单位,即如果一个进程需要和远程机器上N个RpcEndpoint服务进行通信,就对应N个RpcEndpointRef(后端的实际的网络连接是公用,这个是TransportClient内部提供了连接池来实现的),当调用一个RpcEndpointRef的ask/send等接口时候,会将把“消息内容+RpcEndpointRef+本地地址”一起打包为一个RequestMessage,交由RpcEnv进行发送。注意这里打包的消息里面包括RpcEndpointRef本身是很重要的,从而可以由Server端识别出这个消息对应的是哪一个RpcEndpoint。

和发送端一样,在RpcEnv中,针对每个remote端的host:port维护一个队列,即OutBox,RpcEnv的发送仅仅是把消息放入到相应的队列中,但是和发送端不一样的是:在OutBox中没有维护一个所谓的线程池来定时清理OutBox,而是通过一堆synchronized来实现的,add之后立刻消费。

摘自:Github/ColZer

spark安装与运行模式

Spark spark的注册服务的运行模式有 Local(也称单节点模式)spark的注册服务,Standalone(集群模式),Spark on Yarn(运行在Yarn上),Mesos以及K8s等常用模式,本文介绍前三种模式。

Spark-shell 参数

Spark-shell 是以一种交互式命令行方式将Spark应用程序跑在指定模式上,也可以通过Spark-submit提交指定运用程序,Spark-shell 底层调用的是Spark-submit,二者的使用参数一致的,通过- -help 查看参数:

sparkconf的传入有三种方式:

1.通过在spark应用程序开发的时候用set()方法进行指定

2.通过在spark应用程序提交的时候用过以上参数指定,一般使用此种方式,因为使用较为灵活

3.通过配置spark-default.conf,spark-env.sh文件进行指定,此种方式较shell方式级别低

Local模式

Local 模式是最简单的一种Spark运行方式,它采用单节点多线程(cpu)方式运行,local模式是一种OOTB(开箱即用)的方式,只需要在spark-env.sh导出JAVA_HOME,无需其spark的注册服务他任何配置即可使用,因而常用于开发和学习

方式:./spark-shell - -master local[n] ,n代表线程数

Standalone模式

Spark on Yarn

on Yarn的俩种模式

客户端的Driver将应用提交给Yarn后,Yarn会先后启动ApplicationMaster和excutor,另外ApplicationMaster和executor都装在在container里运行,container默认的内存是1g,ApplicationMaster分配的内存是driver-memory,executor分配的内存是executor-memory.同时,因为Driver在客户端,所以程序的运行结果可以在客户端显示,Driver以进程名为SparkSubmit的形式存在。

Cluster 模式

1.由client向ResourceManager提交请求,并上传Jar到HDFS上

这期间包括四个步骤:

a).连接到RM

b).从RM ASM(applicationsManager)中获得metric,queue和resource等信息。

c).upload app jar and spark-assembly jar

d).设置运行环境和container上下文

2.ResourceManager向NodeManager申请资源,创建Spark ApplicationMaster(每个SparkContext都有一个ApplicationManager)

3.NodeManager启动Spark App Master,并向ResourceManager ASM注册

4.Spark ApplicationMaster从HDFS中找到jar文件,启动DAGScheduler和YARN Cluster Scheduler

5.ResourceManager向ResourceManager ASM注册申请container资源(INFO YarnClientImpl: Submitted application)

6.ResourceManager通知NodeManager分配Container,这是可以收到来自ASM关于container的报告。(每个container的对应一个executor)

7.Spark ApplicationMaster直接和container(executor)进行交互,完成这个分布式任务。

进入spark安装目录下的conf文件夹

[atguigu@hadoop102 module] mv slaves.template slaves

[atguigu@hadoop102 conf] vim slaves

hadoop102

hadoop103

hadoop104

4)修改spark-env.sh文件,添加如下配置:

[atguigu@hadoop102 conf]$ vim spark-env.sh

SPARK_MASTER_HOST=hadoop102

SPARK_MASTER_PORT=7077

5)分发spark包

[atguigu@hadoop102 module] sbin/start-all.sh

注意:如果遇到 “JAVA_HOME not set” 异常,可以在sbin目录下的spark-config.sh 文件中加入如下配置:

export JAVA_HOME=XXXX

官方求PI案例

spark-submit

--class org.apache.spark.examples.SparkPi

--master spark://server-2:7077

--executor-memory 1G

--total-executor-cores 2

/home/xxx/software/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar

100

spark-shell

--master spark://server-2:7077

--executor-memory 1g

--total-executor-cores 2

spark-shell --master spark://server-2:7077 --executor-memory 1g --total-executor-cores 2

参数:--master spark://server-2:7077 指定要连接的集群的master

Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出

yarn-cluster:Driver程序运行在由RM(ResourceManager)启动的AP(APPMaster)适用于生产环境。

安装使用

1)修改hadoop配置文件yarn-site.xml,添加如下内容:

2)修改spark-env.sh,添加如下配置:

[atguigu@hadoop102 conf]$ vi spark-env.sh

YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop

3)分发配置文件

[atguigu@hadoop102 conf] xsync spark-env.sh

4)执行一个程序

spark-submit

--class org.apache.spark.examples.SparkPi

--master yarn

--deploy-mode client

/home/xxx/software/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar

100

注意:在提交任务之前需启动HDFS以及YARN集群。

日志查看

修改配置文件spark-defaults.conf

添加如下内容:

spark.yarn.historyServer.address=server-2:18080

spark.history.ui.port=18080

2)重启spark历史服务

[atguigu@hadoop102 spark] sbin/start-history-server.sh

starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark/logs/spark-atguigu-org.apache.spark.deploy.history.HistoryServer-1-hadoop102.out

3)提交任务到Yarn执行

spark-submit

--class org.apache.spark.examples.SparkPi

--master yarn

--deploy-mode client

/home/xxx/software/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar

100

阅读
分享