练习实战:
用actor并发编程写一个单机版的WordCount,将多个文件作为输入,计算完成后将多个任务汇总,得到最终的结果。
大致思想步骤:
1、通过loop +react 方式去不断的接受消息(注意这里的消息就是我们当前的文件名称)
2、利用case class样例类去匹配对应的操作
3、其中scala中提供了文件读取的接口Source,通过 调用其fromFile方法去获取文件内容
4、将每个文件的单词数量进行局部汇总,存放在一个ListBuffer中
5、最后将ListBuffer中的结果进行全局汇总
测试数据格式:
aaa.txt
hello hadoop
hello storm
hello spark
bbb.txt
hadoop storm
storm spark
hello actor
ccc.txt
hadoop hello
storm hello
spark hello
核心代码:
package cn.scala.wordcount
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source
/**
* Created with IntelliJ IDEA
* Created By GuangLing_Lin
* Date: 2017/12/30
* function:利用scala中的actor并发编程实现:多个文件作为输入,先进行局部汇总,最终再进行全部汇总
*/
// 定义样例类
// 提交任务
case class SubmitTask(fileName: String)
// 封装每一个文件单词出现的次数
case class ResultTask(result: Map[String, Int])
class Task extends Actor {
// 1.重写act方法
override def act(): Unit = {
loop {
react {
case SubmitTask(fileName) => {
// 2.读取数据文件,利用scala.io.Source的fromFile方法读取数据
val lines: String = Source.fromFile(fileName).mkString
// 3.按照换行符进行截取,window下的文件换行符是“\r\n” ,Linux是“\n”
val linesArray: Array[String] = lines.split("\r\n")
// 4.按照空格进行切分,并且压平
val words: Array[String] = linesArray.flatMap(_.split(" "))
// 5.每个单词记为1
val wordAndOne: Array[(String, Int)] = words.map(x => (x, 1))
// 6.按照单词进行分组
val wordGroup: Map[String, Array[(String, Int)]] = wordAndOne.groupBy(_._1)
// 7.通过mapValues方法拿到map所有key对应的value
val result: Map[String, Int] = wordGroup.mapValues(_.length)
// 8.把结果返回给发送方
sender ! ResultTask(result)
}
}
}
}
}
object ActorWordCount {
def main(args: Array[String]): Unit = {
// 定义一个Set集合,用于存放每次异步返回的结果
val hashSet = new mutable.HashSet[Future[Any]]()
// 定义List集合,用于存放真正的结果数据
val taskList = new ListBuffer[ResultTask]
// 1.准备数据文件
val files = Array("d://aaa.txt", "d://bbb.txt", "d://ccc.txt")
// 2.遍历数据文件,发送消息
for (fileName <- files) {
// 3.针对每一个文件,创建一个actor实例
val task = new Task
// 4. 启动actor
task.start()
// 5. 向actor提交任务
val result: Future[Any] = task !! SubmitTask(fileName)
// 6.存放异步返回结果到set集合中
hashSet += result
}
// 7.处理hashSet集合中的数据
while (hashSet.size > 0) {
// 8.判断对应真正完成任务的结果
val completedTask: mutable.HashSet[Future[Any]] = hashSet.filter(_.isSet)
// 9.遍历已完成任务的结果
for (c <- completedTask) {
// 10.获取Future中的数据
val data: Any = c.apply()
// 11.将Any类型转化为ResultTask
val task: ResultTask = data.asInstanceOf[ResultTask]
// 12.将真正的结果保存在List集合中
taskList += task
// 13.将处理完成的任务删除
hashSet -= c
}
}
println(taskList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2)))
}
}
测试结果:
Map(actor -> 1, storm -> 4, hadoop -> 3, spark -> 3, hello -> 7)
每一个成功的背后都有无数个无人知晓的黑夜。
因为
夜晚,是超越对手的最佳时机。
===================== 码农1024 =====================#蔺光岭#
还不快抢沙发