Implementing a Simplified Spark Communication Framework with Akka

Scala 2017-01-02

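  • 1. Goal: use Akka to implement a simplified version of Spark's communication framework
  • 2. Overview:
      Most distributed architectures today rely on RPC for their low-level communication, and there are many RPC frameworks to choose from, such as the one in Hadoop that we studied earlier. Hadoop, however, was designed to run batch jobs that last for hours; in some extreme cases task submission latency is high, so Hadoop's RPC feels somewhat heavyweight. Early versions of Spark implemented RPC on top of the Akka library (later releases replaced it with a Netty-based implementation). Akka is written in Scala and built on the Actor concurrency model; it is highly reliable, performant, and scalable, and it makes distributed RPC easy to implement.
  • 3. Implementation: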

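    • 3.1 Communication logic

      • 1. After a Worker starts, its preStart method obtains a reference to the Master and sends a registration message; the Worker's details are wrapped in a case class.
      • 2. When the Master receives the registration message, it stores the Worker's details and replies that registration succeeded.
      • 3. The Worker sends heartbeats to the Master at a fixed interval to signal that it is still alive.
      • 4. The Master periodically removes Workers whose heartbeats have timed out.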
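The four steps above can be modelled without Akka as plain state transitions, which makes the bookkeeping easier to reason about. The following is only a minimal sketch: `WorkerRecord` and `MasterState` are hypothetical names that do not appear in the article's code, timestamps are passed in explicitly as milliseconds, and the sketch assumes that the moment of registration counts as the first heartbeat.

```scala
// Hypothetical model of the Master's bookkeeping; not the article's actor code.
final case class WorkerRecord(workerId: String, memory: Int, cores: Int, lastHeartBeat: Long)

final case class MasterState(workers: Map[String, WorkerRecord] = Map.empty) {

  // Step 2: store a newly registered worker; a repeated registration is ignored.
  // Assumption: the registration time is recorded as the first heartbeat.
  def register(id: String, memory: Int, cores: Int, now: Long): MasterState =
    if (workers.contains(id)) this
    else copy(workers = workers + (id -> WorkerRecord(id, memory, cores, now)))

  // Step 3: refresh the timestamp of a known worker; unknown ids are ignored.
  def heartbeat(id: String, now: Long): MasterState =
    workers.get(id) match {
      case Some(w) => copy(workers = workers.updated(id, w.copy(lastHeartBeat = now)))
      case None    => this
    }

  // Step 4: drop every worker whose last heartbeat is older than timeoutMs.
  def prune(now: Long, timeoutMs: Long): MasterState =
    copy(workers = workers.filter { case (_, w) => now - w.lastHeartBeat <= timeoutMs })
}
```

For example, with a 15000 ms timeout, a worker that registered at time 0 and never sent a heartbeat is dropped by `prune(20000L, 15000L)`, while a worker whose last heartbeat arrived at time 10000 is kept.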
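    • 3.2 Important classes

      • ActorSystem: in Akka, an ActorSystem is a heavyweight structure that allocates multiple threads, so in practice an application usually creates a single ActorSystem and uses it to create many Actors.
        Notes:

        • 1. The ActorSystem is the top-level authority in a process: it creates and supervises actors.
        • 2. An ActorSystem is normally created only once per process and used like a singleton.
        • 3. Actors handle the communication.
      • Actor: in Akka, Actors do the communicating. An Actor has several important lifecycle methods.

        • 1. preStart(): runs once after the Actor's constructor has finished, when the Actor is started (by default it also runs again if the Actor is restarted).
        • 2. receive: becomes active after preStart has completed; it defines how messages are handled and is applied to every incoming message.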
  • 4. Code:

pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.scala</groupId>
    <artifactId>yun2-akka-rpc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Master class:

package cn.akka.spark

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

// Simplified Spark communication framework on Akka's actor model: Master side
class Master extends Actor {

  println("====== Master constructor invoked ======")

  // Registered workers, keyed by workerId
  private val workersMap = new mutable.HashMap[String, WorkerInfo]()

  // All registered workers as a list, so they can later be sorted by resources
  private val workersList = new ListBuffer[WorkerInfo]

  // Interval between timeout checks, also used as the heartbeat timeout: 15 seconds
  val CHECK_INTERVAL = 15000

  // Called once after the constructor has run
  override def preStart(): Unit = {
    println("====== preStart method invoked =======")

    // Periodically send CheckTimeOutWorker to self so timed-out workers get purged.
    // The scheduler needs an implicit ExecutionContext, which this import provides.
    import context.dispatcher
    context.system.scheduler.schedule(0.millis, CHECK_INTERVAL.millis, self, CheckTimeOutWorker)
  }

  // Applied to every incoming message once preStart has completed
  override def receive: Receive = {
    // Registration request from a worker
    case RegisterMessage(workerId, memory, cores) => {
      // Only add the worker if it has not registered before
      if (!workersMap.contains(workerId)) {
        val workerInfo = new WorkerInfo(workerId, memory, cores)
        // Count registration as the first heartbeat; otherwise lastHeartBeat stays 0
        // and a timeout check that fires before the first heartbeat removes the worker
        workerInfo.lastHeartBeat = System.currentTimeMillis()
        // Store the worker in the map
        workersMap(workerId) = workerInfo
        // Store the worker in the list
        workersList.append(workerInfo)
      }

      // Tell the worker that registration succeeded
      sender() ! RegisteredMessage("success")
    }

    // Heartbeat from a worker
    case SendHeart(workerId) => {
      // Ignore heartbeats from workers that are not registered
      if (workersMap.contains(workerId)) {
        // Record the current time as this worker's latest heartbeat
        val lastTime = System.currentTimeMillis()
        workersMap(workerId).lastHeartBeat = lastTime
      }
    }

    // Periodic check for timed-out workers
    case CheckTimeOutWorker => {
      // Current system time
      val nowTime = System.currentTimeMillis()
      // A worker has timed out when nowTime - its last heartbeat time > check interval
      val outTimeWorkers: ListBuffer[WorkerInfo] = workersList.filter(x => nowTime - x.lastHeartBeat > CHECK_INTERVAL)
      // Remove the timed-out workers
      for (t <- outTimeWorkers) {
        // Remove from the map
        workersMap -= t.workerId

        // Remove from the list
        workersList -= t

        println("Timed-out worker: " + t.workerId)
      }

      println("Number of live workers: " + workersList.size)

      // Print the live workers sorted by memory, largest first
      println(workersList.sortBy(x => x.memory).reverse)
    }

  }

}

object Master {

  def main(args: Array[String]): Unit = {
    // IP address the master binds to
    val host = args(0)

    // Port the master listens on
    val port = args(1)

    // Configuration for Akka remoting
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    // Parse the string into a Config object with ConfigFactory
    val config = ConfigFactory.parseString(configStr)

    // 1. Create the ActorSystem, which creates and supervises the actors of this process;
    //    it is created only once
    val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", config)

    // 2. Use the ActorSystem to create the master actor
    val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master), "masterActor")

  }
}
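Because the Master only looks for dead workers on a fixed tick, a failed worker is not noticed immediately. The sketch below works through that arithmetic under simplifying assumptions: checks fire at exact multiples of the check interval counted from the Master's start, and a worker is removed when the time since its last heartbeat is strictly greater than the timeout. `TimeoutMath` and `detectionTime` are hypothetical names introduced only for this illustration.

```scala
// Hypothetical helper illustrating the detection delay of a periodic timeout check.
object TimeoutMath {

  // Returns the time (ms since the Master started) of the first periodic check
  // that declares a worker dead, given the time of its last heartbeat.
  def detectionTime(lastHeartBeat: Long, checkInterval: Long, timeout: Long): Long = {
    // A check at time c removes the worker when c - lastHeartBeat > timeout,
    // so the earliest instant that qualifies is one millisecond past the timeout.
    val firstDeadInstant = lastHeartBeat + timeout + 1
    // Checks fire at 0, checkInterval, 2 * checkInterval, ...; round up to the next tick.
    ((firstDeadInstant + checkInterval - 1) / checkInterval) * checkInterval
  }
}
```

With a 15000 ms interval and timeout, a worker whose last heartbeat arrived at 10000 ms is removed by the check at 30000 ms, 20 seconds later. Under these assumptions the delay always falls between just over 15 seconds and 30 seconds.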

Worker class:

package cn.akka.spark

import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

// Simplified Spark communication framework on Akka's actor model: Worker side
class Worker(memory: Int, cores: Int, masterHost: String, masterPort: String) extends Actor {

  println("====== Worker constructor invoked ======")

  // Unique id of this worker
  private val workerId: String = UUID.randomUUID().toString

  // Interval between heartbeats: 10 seconds
  val HEART_BEAT_INTERVAL = 10000

  // Reference to the master, resolved in preStart
  var master: ActorSelection = _

  override def preStart(): Unit = {
    println("====== preStart method invoked ======")
    // 1. Obtain a reference to the master actor
    // 2. by calling actorSelection on the ActorContext.
    // 3. The path needs: protocol, master IP, master port, the master's ActorSystem name,
    //    and the actor's position in the hierarchy
    master = context.actorSelection(s"akka.tcp://masterActorSystem@$masterHost:$masterPort/user/masterActor")

    // 4. Send the registration message (worker id, memory, cores) wrapped in a case class
    master ! RegisterMessage(workerId, memory, cores)

  }

  override def receive: Receive = {

    case RegisteredMessage(message) => {
      println("======" + message + ": registered successfully! ======")

      // Start sending heartbeats: schedule a HeartBeat message to self at a fixed interval.
      // The scheduler needs an implicit ExecutionContext, which this import provides.
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, HEART_BEAT_INTERVAL.millis, self, HeartBeat)
    }

    // Timer tick sent to self: forward a heartbeat to the master
    case HeartBeat => {
      master ! SendHeart(workerId)
    }

  }

}

object Worker {

  def main(args: Array[String]): Unit = {
    // IP address of this worker
    val host = args(0)

    // Port of this worker
    val port = args(1)

    // Memory of this worker
    val memory = args(2).toInt

    // Number of cores of this worker
    val cores = args(3).toInt

    // IP address of the master
    val masterHost = args(4)

    // Port of the master
    val masterPort = args(5)

    // Configuration for Akka remoting
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin
    // Parse the string into a Config object with ConfigFactory
    val config = ConfigFactory.parseString(configStr)

    // 1. Create the ActorSystem, which creates and supervises the actors of this process;
    //    it is created only once
    val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", config)

    // 2. Use workerActorSystem to create the Worker actor
    val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker(memory, cores, masterHost, masterPort)), "workerActor")

    // 3. No explicit message is needed here: the Worker registers with the master in preStart,
    //    and its receive method does not handle plain strings such as "connect"

  }

}

WorkerInfo class:

package cn.akka.spark

// Holds the resource information of a worker
class WorkerInfo(val workerId: String, val memory: Int, val cores: Int) {
  // Time of the most recent heartbeat received from this worker
  var lastHeartBeat: Long = _

  // Readable representation for logging
  override def toString: String = {
    s"workerId:$workerId,memory:$memory,cores:$cores"
  }

}

Case classes:

package cn.akka.spark

// Messages are wrapped in case classes
trait RemoteMessage extends Serializable {

}

// worker -> master: registration request; it crosses process boundaries, so it must be serializable
case class RegisterMessage(workerId: String, memory: Int, cores: Int) extends RemoteMessage

// master -> worker: registration confirmation; it crosses process boundaries, so it must be serializable
case class RegisteredMessage(message: String) extends RemoteMessage

// worker -> worker: timer tick the worker sends to itself; it stays in one process, so no serialization is needed
case object HeartBeat

// worker -> master: heartbeat; it crosses process boundaries, so it must be serializable
case class SendHeart(workerId: String) extends RemoteMessage

// master -> master: periodic timeout check; it stays in one process, so no serialization is needed
case object CheckTimeOutWorker
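To see what "must be serializable" means for the remote messages, the sketch below round-trips a message through plain Java serialization. It assumes Akka remoting is left on its default Java serializer; the message types are redeclared inside a hypothetical `SerializationSketch` object so that the example is self-contained, and `roundTrip` is a helper invented for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical, self-contained illustration; not part of the article's project.
object SerializationSketch {
  // Same shape as the article's messages, redeclared here for the sketch.
  trait RemoteMessage extends Serializable
  final case class RegisterMessage(workerId: String, memory: Int, cores: Int) extends RemoteMessage
  case object HeartBeat

  // Write a value to a byte array with Java serialization and read it back.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Scala case classes and case objects are already Serializable, so the RemoteMessage trait acts mainly as a marker for messages that are meant to leave the process. A copy produced by `roundTrip` compares equal to the original message, and a case object such as HeartBeat deserializes back to the same singleton instance.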
  • 5. Output:

Worker:

====== Worker constructor invoked ======
====== preStart method invoked ======
======success: registered successfully! ======

Master:

====== Master constructor invoked ======
====== preStart method invoked =======
Number of live workers: 0
ListBuffer()
Number of live workers: 1
ListBuffer(workerId:a9a579b6-0b60-4eee-a12b-c05e5492a306,memory:20,cores:16)



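This article was written by 蔺光岭 and is licensed under Creative Commons Attribution 4.0; it may be freely reposted and quoted, provided the author is credited and the source is cited.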
