Actor并发编程写一个单机版的WordCount

Scala 2017-01-01

练习实战:

  用actor并发编程写一个单机版的WordCount,将多个文件作为输入,计算完成后将多个任务汇总,得到最终的结果。

大致思想步骤:

  1、通过loop +react 方式去不断的接受消息(注意这里的消息就是我们当前的文件名称)
  2、利用case class样例类去匹配对应的操作
  3、其中scala中提供了文件读取的接口Source,通过 调用其fromFile方法去获取文件内容
  4、将每个文件的单词数量进行局部汇总,存放在一个ListBuffer中
  5、最后将ListBuffer中的结果进行全局汇总

测试数据格式:

aaa.txt

hello hadoop
hello storm
hello spark

bbb.txt

hadoop storm
storm spark
hello actor

ccc.txt

hadoop hello
storm hello
spark hello

核心代码:

package cn.scala.wordcount

import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source

/**
  * Created with IntelliJ IDEA
  * Created By GuangLing_Lin
  * Date: 2017/12/30
  * function:利用scala中的actor并发编程实现:多个文件作为输入,先进行局部汇总,最终再进行全部汇总
  */

// 定义样例类
// 提交任务
case class SubmitTask(fileName: String)

// 封装每一个文件单词出现的次数
case class ResultTask(result: Map[String, Int])

class Task extends Actor {
  // 1.重写act方法
  override def act(): Unit = {
    loop {
      react {

        case SubmitTask(fileName) => {
          // 2.读取数据文件,利用scala.io.Source的fromFile方法读取数据
          val lines: String = Source.fromFile(fileName).mkString
          // 3.按照换行符进行截取,window下的文件换行符是“\r\n” ,Linux是“\n”
          val linesArray: Array[String] = lines.split("\r\n")
          // 4.按照空格进行切分,并且压平
          val words: Array[String] = linesArray.flatMap(_.split(" "))
          // 5.每个单词记为1
          val wordAndOne: Array[(String, Int)] = words.map(x => (x, 1))
          // 6.按照单词进行分组
          val wordGroup: Map[String, Array[(String, Int)]] = wordAndOne.groupBy(_._1)
          // 7.通过mapValues方法拿到map所有key对应的value
          val result: Map[String, Int] = wordGroup.mapValues(_.length)
          // 8.把结果返回给发送方
          sender ! ResultTask(result)
        }

      }
    }
  }
}

object ActorWordCount {
  def main(args: Array[String]): Unit = {
    // 定义一个Set集合,用于存放每次异步返回的结果
    val hashSet = new mutable.HashSet[Future[Any]]()
    // 定义List集合,用于存放真正的结果数据
    val taskList = new ListBuffer[ResultTask]

    // 1.准备数据文件
    val files = Array("d://aaa.txt", "d://bbb.txt", "d://ccc.txt")

    // 2.遍历数据文件,发送消息
    for (fileName <- files) {
      // 3.针对每一个文件,创建一个actor实例
      val task = new Task
      // 4. 启动actor
      task.start()
      // 5. 向actor提交任务
      val result: Future[Any] = task !! SubmitTask(fileName)
      // 6.存放异步返回结果到set集合中
      hashSet += result
    }

    // 7.处理hashSet集合中的数据
    while (hashSet.size > 0) {
      // 8.判断对应真正完成任务的结果
      val completedTask: mutable.HashSet[Future[Any]] = hashSet.filter(_.isSet)
      // 9.遍历已完成任务的结果
      for (c <- completedTask) {
        // 10.获取Future中的数据
        val data: Any = c.apply()
        // 11.将Any类型转化为ResultTask
        val task: ResultTask = data.asInstanceOf[ResultTask]
        // 12.将真正的结果保存在List集合中
        taskList += task
        // 13.将处理完成的任务删除
        hashSet -= c
      }
    }

    println(taskList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2)))

  }
}

测试结果:

Map(actor -> 1, storm -> 4, hadoop -> 3, spark -> 3, hello -> 7)

每一个成功的背后都有无数个无人知晓的黑夜。

因为

夜晚,是超越对手的最佳时机。

===================== 码农1024 =====================#蔺光岭#


本文由 蔺光岭 创作,采用 知识共享署名 4.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论