spark分布式计算框架
创始人
2024-01-30 21:08:02
0

MapReduce是计算逻辑清晰的,只有两个步骤,任务是JVM进程级别,每执行到什么步骤 去申请具体的资源。

 

而spark根本不知道具体有几个stage,逻辑未知,每个人的job stage等根本不知道。它是默认倾向于抢占资源的,他会在sparkContext()这个函数执行的时候,直接根据下面textFile()代码逻辑抢占所有资源,任务以JVM线程的级别泡在Excutor里面

目前已知的: 每一个Excutor里面的就是一个job的stage,一个excutor跑在某一个节点,等需要shuffle的时候,另一个节点通过他的excutor把刚才的数据拿过来


Spark-Core源码解析:

首先 这些都是分布式计算框架,不同个体对象之间要使用RPC

1. RPC

 下图为Master.scala里面 start-master.sh的具体流程

 RPC也就是发送方与接收方通过传输层进行远程通信连接,来调用对方远程的服务。传输层里面最代表的就是Netty,他把各种IO进行了封装了只提供相应接口。

除此之外,不同主机可能会有多个进程进行传输,所以在通信时可以指定传输目标的IO,防止接收混乱。

分发 与 队列设置也是帮助多个实体进行传输。

内部的inbox是为了暂时存储,方便未来看给自己的哪个函数调用。

除此之外 ,还要设定消息投递规则,如接收不到就重发 或者 只发一次等

什么是 RPC 框架 - jiuchengi - 博客园


 2. start-master首先要创建RpcEnv环境

一共分为两部分 Dispatcher(分发器)和传输服务(Netty)

Dispatcher 就是那个用来分发处理的   postMessage来生成data放入receive队列里面,threadpool线程池来使用死循环处理信息,根据信息具体内容实现不同 分发 操作

传输服务:

 上图是spark-core Master.scala里 RpcEnv(就是包含了分发dispatcher 以及传输层Netty)里的源码 是传输服务这一部分(通过startServer开启),也就是具体的Netty实现

上面的Loop就是一个类似死循环,不能让线程频繁的创建消亡,只开通

下面bootstrap里面的channel可以直接实现接收读取数据或者发送数据,以往写java的时候必须要实现input类和output类,而这里一个channel就可以直接实现读写操作,具体实现在里面的Handler里面,Handler就是实现了区分读取和写数据,然后将数据使用RpcEnv里的postMessage保存起来(应该是对应于那个写进队列操作)。而这个postMessage就是DIspatcher里的方法


3.经过创建RpcEnv环境后(Dispatcher和传输层Netty等之后),开始对Master进行操作。

最终结果就是利用rpcEnv创建EndPoint(Master)和 EndPointRef(对象主机的进程)

首先Master就是继承于 EndPoint     里面重写了receive方法 这个是 大前提

方法中想创建的EndPoint(启动Master角色)返回的是一个引用EndPointRef,EndPoint有receive和receiveAndReplay方法用来接收引用传的数据(send方法和ask方法),根据引用传不同的数据 相应不同的方法(对应于不同的角色)

这里的EndPoint主要是针对不同角色和Master进行交涉的时候,比如Driver想申请资源,创建引用传入message为Driver信息,一般来说一个EndPoint对应多个EndPointRef,

这种思想类似于接口 ref就是接口实现

在rpcenv里面注入master实体

并开始处理各类endpoint请求

val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    // name是别台主机的 endpoint是master def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {val addr = RpcEndpointAddress(nettyEnv.address, name) //别台主机的名字但是放 master的地址val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv) //endpointRef就存着另一台主机端点 他的端口号主机地址等synchronized {if (stopped) {throw new IllegalStateException("RpcEnv has been stopped")}if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")}val data = endpoints.get(name)endpointRefs.put(data.endpoint, data.ref)receivers.offer(data)  // for the OnStart message}endpointRef}

里面会将各类主机信息注册到分发器Dispatcher,同时在填写分发器内部数据EndpointData时候,填的数据里面有个inbox来处理具体东西,他会先有个一个同步操作 即

inbox.synchronized {messages.add(OnStart)
}

这个操作会将启动进程放入message未来的队列里面,未来多线程运行的时候,他就会先启动。标志着start-master的开始。(它是由初始化inbox的时候生成的,所以先于所有其他端点信息进入队列)

后续的队列处理才会处理那些天的endpoints

也就是

val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

这一步 不仅注册了端点信息,还实现了startmaster,都是通过放入dispatcher处理队列里面实现的

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...