- 1.Course goal: implement a simplified version of Spark's communication framework with Akka
- 2.Project overview:
At present, most distributed architectures do their low-level communication over RPC, and there are many RPC frameworks — for example the Hadoop RPC framework we studied earlier. But Hadoop was designed from the start to run batch jobs lasting hours, and in some extreme cases task-submission latency is very high, so Hadoop's RPC feels somewhat heavyweight. Spark's RPC is implemented with the Akka library. Akka is written in Scala and built on the Actor concurrency model; it is highly reliable, performant, and scalable, and it makes distributed RPC easy to implement.
- 3.Project implementation:
3.1 Communication workflow
- 1. After the Worker starts, it connects to the Master in its preStart method and sends a registration message, with the Worker's information packaged in a case class.
- 2. When the Master receives the registration message, it stores the Worker's information and replies to the Worker that registration succeeded.
- 3. The Worker sends periodic heartbeats to the Master to report that it is alive.
- 4. The Master periodically removes Workers whose heartbeats have timed out.
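The heartbeat/timeout cycle in steps 3 and 4 can be sketched in plain Scala (no Akka; `HeartBeatSketch` and its method names are hypothetical, and the 15-second interval mirrors the Master's `CHECK_INTERVAL` below):

```scala
// Sketch of the heartbeat bookkeeping from steps 3-4. Plain Scala, no Akka;
// the object and method names are illustrative, not from the project code.
object HeartBeatSketch {
  val checkInterval = 15000L // ms, same as the Master's CHECK_INTERVAL

  // workerId -> timestamp of the last heartbeat received
  val lastHeartBeat = scala.collection.mutable.Map[String, Long]()

  // Step 3: a worker's heartbeat refreshes its timestamp
  def recordHeartBeat(workerId: String, now: Long): Unit =
    lastHeartBeat(workerId) = now

  // Step 4: the master drops workers whose last heartbeat is too old,
  // returning the ids it removed
  def removeTimedOut(now: Long): Seq[String] = {
    val dead = lastHeartBeat.collect {
      case (id, t) if now - t > checkInterval => id
    }.toSeq
    dead.foreach(lastHeartBeat -= _)
    dead
  }
}
```

The real Master below does the same filtering over a `ListBuffer[WorkerInfo]`, driven by a scheduler tick instead of an explicit call.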
3.2 Key classes
ActorSystem: in Akka, an ActorSystem is a heavyweight structure that allocates multiple threads, so in practice an ActorSystem is usually a singleton; one ActorSystem can be used to create many Actors.
Note: - 1. The ActorSystem is the top-level supervisor in a process; it creates and supervises actors
- 2. The ActorSystem is a singleton
- 3. Actors are responsible for communication
Actor: in Akka, Actors handle the communication, and an Actor has several important lifecycle methods.
- 1. preStart(): runs after the Actor's constructor, exactly once in the Actor's lifetime.
- 2. receive(): runs after preStart completes; it receives messages and is invoked repeatedly.
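The call order of these lifecycle methods can be illustrated with a small plain-Scala sketch (this is not real Akka; `LifecycleDemo` is a hypothetical stand-in that mimics the constructor → preStart → receive sequence):

```scala
// Plain-Scala illustration of the Akka lifecycle order described above:
// the constructor body runs first, preStart runs once, receive runs per message.
import scala.collection.mutable.ListBuffer

class LifecycleDemo {
  val calls = ListBuffer[String]()
  calls += "constructor" // constructor body runs when the object is built

  def preStart(): Unit = calls += "preStart" // invoked once, after construction

  def receive(msg: String): Unit = calls += s"receive($msg)" // invoked per message
}

object LifecycleDemo {
  // Drive the lifecycle the way Akka would: build, preStart, then deliver messages
  def run(messages: Seq[String]): List[String] = {
    val a = new LifecycleDemo
    a.preStart()
    messages.foreach(a.receive)
    a.calls.toList
  }
}
```

In real Akka the framework invokes these hooks itself; the sketch only shows the ordering the Worker below relies on when it registers from preStart.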
- 4.Full code:
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.scala</groupId>
<artifactId>yun2-akka-rpc</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.14</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.3.14</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
The Master class:
package cn.akka.spark
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
//todo: a simplified Spark communication framework built on Akka's actor model ------ the Master side
class Master extends Actor {
println("====== Master constructor invoked ======")
//todo: a map holding each worker's info, key: workerId, value: WorkerInfo
private val workersMap = new mutable.HashMap[String, WorkerInfo]()
//todo: a list of all workers' info, so they can later be sorted by resources
private val workersList = new ListBuffer[WorkerInfo]
//todo: how often to run the periodic check, 15 seconds
val CHECK_INTERVAL = 15000
//todo: called after the constructor block, and only once
override def preStart(): Unit = {
println("====== preStart method invoked =======")
//todo: the master periodically removes timed-out workers
//todo: import the implicit ExecutionContext
import context.dispatcher
context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)
}
//todo: called after preStart, then invoked repeatedly in a loop
override def receive: Receive = {
//todo: receive a worker's registration message
case RegisterMessage(workerId, memory, cores) => {
//todo: if this worker has not registered yet, add it to the map
if(!workersMap.contains(workerId)) {
val workerInfo = new WorkerInfo(workerId, memory, cores)
//todo: add the workerInfo to the map
workersMap(workerId) = workerInfo
//todo: add the workerInfo to the list
workersList.append(workerInfo)
}
//todo: reply to the worker that registration succeeded
sender ! RegisteredMessage("success")
}
//todo: receive a worker's heartbeat
case SendHeart(workerId) => {
//todo: check that the worker is registered
if(workersMap.contains(workerId)) {
//todo: get the current system time
val lastTime = System.currentTimeMillis()
//todo: record when this worker last sent a heartbeat
workersMap(workerId).lastHeartBeat = lastTime
}
}
//todo: find timed-out workers
case CheckTimeOutWorker => {
//todo: get the current system time
val nowTime = System.currentTimeMillis()
//todo: nowTime - the worker's last heartbeat > the check interval
val outTimeWorkers: ListBuffer[WorkerInfo] = workersList.filter(x => nowTime - x.lastHeartBeat > CHECK_INTERVAL)
//todo: remove the timed-out workers
for(t <- outTimeWorkers) {
//todo: remove the timed-out worker's info from the map
workersMap -= t.workerId
//todo: remove the timed-out worker's info from the list
workersList -= t
println("Timed-out worker: "+t.workerId)
}
println("Number of live workers: "+workersList.size)
//todo: sort workers by memory in descending order
println(workersList.sortBy(x => x.memory).reverse)
}
}
}
object Master {
def main(args: Array[String]): Unit = {
//todo: the master's IP address
val host = args(0)
//todo: the master's port
val port = args(1)
//todo: build the configuration string
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//todo: parse the string with ConfigFactory to get the configuration
val config = ConfigFactory.parseString(configStr)
//todo: 1. create the ActorSystem, the top-level supervisor of the process; it creates and supervises actors and is a singleton
val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", config)
//todo: 2. use the ActorSystem to create the actor
val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master), "masterActor")
//todo: 3. send a message to the master actor
//masterActor ! "connect"
}
}
The Worker class:
package cn.akka.spark
import java.util.UUID
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
//todo: a simplified Spark communication framework built on Akka's actor model ------ the Worker side
class Worker(memory: Int, cores: Int, masterHost: String, masterPort: String) extends Actor {
println("====== Worker constructor invoked ======")
//todo: the worker's id
private val workerId: String = UUID.randomUUID().toString
//todo: how often to send a heartbeat, 10 seconds
val HEART_BEAT_INTERVAL = 10000
//todo: a handle on the master
var master: ActorSelection = _
override def preStart(): Unit = {
println("====== preStart method invoked ======")
//todo: 1. obtain a reference to the master actor
//todo: 2. call actorSelection on the ActorContext
//todo: 3. this needs: the protocol, the master's ip address and port, the master's ActorSystem name, and the actor path
master = context.actorSelection(s"akka.tcp://masterActorSystem@$masterHost:$masterPort/user/masterActor")
//todo: 4. send the master a registration message carrying the worker's id, memory, and cores in a case class
master ! RegisterMessage(workerId, memory, cores)
}
override def receive: Receive = {
case RegisteredMessage(message) => {
println("======" + message + ": registration successful! ======")
//todo: start sending heartbeats to the master
//todo: import the implicit ExecutionContext
import context.dispatcher
context.system.scheduler.schedule(0 millis, HEART_BEAT_INTERVAL millis, self, HeartBeat)
}
//todo: receive the heartbeat message sent to itself
case HeartBeat => {
master ! SendHeart(workerId)
}
}
}
object Worker {
def main(args: Array[String]): Unit = {
//todo: the worker's IP address
val host = args(0)
//todo: the worker's port
val port = args(1)
//todo: the worker's memory
val memory = args(2).toInt
//todo: the worker's number of cores
val cores = args(3).toInt
//todo: the master's ip address
val masterHost = args(4)
//todo: the master's port
val masterPort = args(5)
//todo: build the configuration string
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//todo: parse the string with ConfigFactory to get the configuration
val config = ConfigFactory.parseString(configStr)
//todo: 1. create the ActorSystem, the top-level supervisor of the process; it creates and supervises actors and is a singleton
val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", config)
//todo: 2. use workerActorSystem to create the Worker actor
val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker(memory, cores, masterHost, masterPort)), "workerActor")
//todo: 3. send a message to workerActor
workerActor ! "connect"
}
}
The WorkerInfo class:
package cn.akka.spark
//todo: holds a worker's resource information
class WorkerInfo(val workerId: String, val memory: Int, val cores: Int) {
//todo: the time of the worker's most recent heartbeat
var lastHeartBeat: Long = _
//todo: override toString
override def toString: String = {
s"workerId:$workerId,memory:$memory,cores:$cores"
}
}
Message case classes:
package cn.akka.spark
//todo: messages are wrapped in case classes
trait RemoteMessage extends Serializable {
}
//worker------>master: the worker registers with the master; it crosses process boundaries, so it must be serializable
case class RegisterMessage(workerId: String, memory: Int, cores: Int) extends RemoteMessage
//master------>worker: the master confirms registration to the worker; it crosses process boundaries, so it must be serializable
case class RegisteredMessage(message: String) extends RemoteMessage
//worker------>worker: the worker sends this to itself; same process, so no serialization needed
case object HeartBeat
//worker------>master: the worker's heartbeat to the master; it crosses process boundaries, so it must be serializable
case class SendHeart(workerId: String) extends RemoteMessage
//master------>master: the master's periodic self-check; same process, so no serialization needed
case object CheckTimeOutWorker
- 5.Sample output:
Worker:
====== Worker constructor invoked ======
====== preStart method invoked ======
======success: registration successful! ======
Master:
====== Master constructor invoked ======
====== preStart method invoked =======
Number of live workers: 0
ListBuffer()
Number of live workers: 1
ListBuffer(workerId:a9a579b6-0b60-4eee-a12b-c05e5492a306,memory:20,cores:16)