Akka Dispatchers和Routers
Akka Dispatcher是维持Akka Actor动作的核心组件,是整个Akka框架的引擎。它是基于Java的Executor框架来实现的。Dispatcher控制和协调消息并将其分发给运行在底层线程上的Actor,由它来负责调度资源的优化,并保证任务以最快的速度执行。
Akka的高稳定性是建立在“Let It Crash”模型之上的,该模型是基于Supervision和Monitoring实现的。通过定义Supervision和监管策略,实现系统异常处理。
Akka为了保证事务的一致,引入了STM的概念。STM使用的是“乐观锁”,执行临界区代码后,会检测是否产生冲突,如果产生冲突,将回滚修改,重新执行临界区代码。
Akka中,Dispatcher基于Java Executor框架来实现,提供了异步执行任务的能力。Executor是基于生产者——消费者模型来构建的。这意味着任务的提交和任务的执行是在不同的线程中隔离执行的,即提交任务的线程与执行任务的线程是不同的。
Executor框架有两个重要实现:
ThreadPoolExecutor:该实现从预定义的线程池中选取线程来执行任务。
ForkJoinPool:使用相同的线程池模型,提供了工作窃取的支持。
Dispatcher运行在线程之上,负责分发其邮箱里面的Actors和Messages到executor中的线程上运行。在Akka中,提供了4种类型的Dispatcher:
- Dispatcher
- Pinned Dispatcher
- Balancing Dispatcher
- Calling Thread Dispatcher
对应的也有4种默认的邮箱: - Unbounded mailbox
- Bounded mailbox
- Unbounded priority mailbox
- Bounded priority mailbox
为Actor指定派发器
一般Actor都会有缺省的派发器,如果要指定派发器,要做两件事:
1)在实例化Actor时,指定派发器:
val myActor = context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"),"myActor")
- 1
2)创建Actor时,使用withDispatcher指定派发器,如my-dispatcher,然后在applicaction.conf配置文件中配置派发器
使用Dispatcher派发器
my-dispatcher{
# Dispatcher是基于事件的派发器名称
type = Dispatcher
# 使用何种ExecutionService executor = "fork-join-executor"
# 配置fork join池
fork-join-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10
}
# throughput定义了线程切换到另一个Actor之前处理的消息数上限
# 设置为1表示尽可能公平
throughput = 100
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
使用PinnedDispatcher派发器
my-dispatcher{
# Dispatcher是基于事件的派发器名称
type = PinnedDispatcher
# 使用何种ExecutionService
executor = "thread-pool-executor"
# 配置fork join池
thread-pool-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10
}
# throughput定义了线程切换到另一个Actor之前处理的消息数上限
# 设置为1表示尽可能公平
throughput = 100
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
不同派发器的介绍
-
Dispatcher
Dispatcher是Akka中默认的派发器,它是基于事件的分发器,该派发器绑定一组Actor到线程池中。该派发器有如下特点:
1)每一个Actor都有自己的邮箱
2)该派发器都可以被任意数量的Actor共享
3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持
4)该派发器是非阻塞的。 -
Balancing Dispatcher
该派发器是基于事件的分发器,它会将任务比较多的Actor的任务重新分发到比较闲的Actor上运行。该派发器有如下特点:
1)所有Actor共用一个邮箱
2)该派发器只能被同一种类型的Actor共享
3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持 -
Pinned Dispatcher
该派发器为每一个Actor提供一个单一的、专用的线程。这种做法在I/O操作或者长时间运行的计算中很有用。该派发器有如下特点:
1)每一个Actor都有自己的邮箱
2)每一个Actor都有专用的线程,该线程不能和其他Actor共享
3)该派发器有一个Executor线程池
4)该派发器在阻塞上进行了优化,如:如果程序正在进行I/O操作,那么这个Actor将会等到任务执行完成。这种阻塞型的操作在性能上要比默认的Dispatcher要好。 -
Calling Thread Dispatcher
该派发器主要用于测试,并且在当前线程运行任务,不会创建新线程,该派发器有如下特点:
1)每一个Actor都有自己的邮箱
2)该派发器都可以被任意数量的Actor共享
3)该派发器由调用线程支持
邮箱
邮箱用于保存接收的消息,在Akka中除使用BalancingDispather分发器的Actor以外,每个Actor都拥有自己的邮箱。使用同一个BalancingDispather的所有Actor共享同一个邮箱实例。
邮箱是基于Java concurrent中的队列来实现的,它有如下特点:
1)阻塞队列,直到队列空间可用,或者队列中有可用元素
2)有界队列,它的大小是被限制的
缺省的邮箱实现
- UnboundedMailbox
底层是一个java.util.concurrent.ConcurrentLinkedQueue
是否阻塞:No
是否有界:No - BoundedMailbox
底层是一个java.util.concurrent.LinkedBlockingQueue
是否阻塞:Yes
是否有界:Yes - UnboundedPriorityMailbox
底层是一个java.util.concurrent.PriorityBlockingQueue
是否阻塞:Yes
是否有界:No - BoundedPriorityMailbox
底层是一个java.util.PriorityBlockingQueue
是否阻塞:Yes
是否有界:Yes
还有一些缺省的持久邮箱。
Router
当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees
Akka定义好的一些Router:
- akka.routing.RoundRobinRouter:轮转路由器将消息按照轮转顺序发送给routers
- akka.routing.RandomRouter:随机路由器随机选择一个router,并将消息发送给这个router
- akka.routing.SmallestMailboxRouter:最小邮箱路由器会在routers中选择邮箱里信息最少的router,然后把消息发送给它。
- akka.routing.BroadcastRouter:广播路由器将相同的消息发送给所有的routers
- akka.routing.ScatterGatherFirstCompletedRouter:敏捷路由器先将消息广播到所有routers,返回最先完成任务的router的结果给调用者。
路由器的使用
- RoundRobinPool 和 RoundRobinGroupRouter对routees使用轮询机制
- RandomPool 和 RandomGroupRouter随机选择routees发送消息
- BalancingPool尝试从繁忙的routee重新分配任务到空闲routee,所有的routee共享一个mailbox
- SmallestMailboxPoolRouter创建的所有routees中谁邮箱中的消息最少发给谁
- BroadcastPool 和 BroadcastGroup广播的路由器将接收到的消息转发到它所有的routee。
- ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup将消息发送给所有的routees,然后等待到收到第一个回复,将结果发送回原始发送者。其他的回复将被丢弃
- TailChoppingPool 和 TailChoppingGroup将首先发送消息到一个随机挑取的routee,短暂的延迟后发给第二个routee(从剩余的routee中随机挑选),以此类推。它等待第一个答复,并将它转回给原始发送者。其他答复将被丢弃。此Router的目标是通过查询到多个routee来减少延迟,假设其他的actor可能比第一个actor更快响应。
- ConsistentHashingPool 和 ConsistentHashingGroup对消息使用一致性哈希(consistent hashing)选择routee
有三种方式定义哪些数据作为一致性哈希键
定义路由的hashMapping,将传入的消息映射到它们一致哈希键。这使决策对发送者透明。·
这些消息可能会实现ConsistentHashable。键是消息的一部分,并很方便地与消息定义一起定义。·
消息可以被包装在一个ConsistentHashableEnvelope中,来定义哪些数据可以用来做一致性哈希。发送者知道要使用的键。
路由器的使用要先创建路由器后使用。 AKKA的路由由router和众多的routees组成,router和routees都是actor.Router即路由,是负责负载均衡和路由的抽象,有两种方法来创建router:
1.Actor Group
2.Actor Pool
当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees
根据不同的情况需要,Akka提供了几种路由策略。当然也可以创建自己的路由及策略。Akka提供的路由策略如下:
- akka.routing.RoundRobinRoutingLogic 轮询
- akka.routing.RandomRoutingLogic 随机
- akka.routing.SmallestMailboxRoutingLogic 空闲
- akka.routing.BroadcastRoutingLogic 广播
- akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集
- akka.routing.TailChoppingRoutingLogic 尾部断续
- akka.routing.ConsistentHashingRoutingLogic 一致性哈希
创建Router Actor
创建router actor 有两种方式:
- Pool(池)——routees都是router 的子actor,如果routees终止,router将把它们移除
- Group(群组)——routees都创建在router的外部,router通过使用actor来选择将消息发送到指定路径,但不监管routees是否终止。Router actor 向 routees 发送消息,与向普通actor发送消息一样通过其ActorRef。Router actor 不会改变消息的发送人,routees 回复消息时发送回原始发件人,而不是Router actor。
Pool(池)可以通过配置并使用代码在配置中获取的方法来实现 (例如创建一个轮询Router向5个routees发送消息)
Group(群组)有时我们需要单独地创建routees,然后提供一个Router来供其使用。可以通过将routees的路径传递给Router的配置,消息将通过ActorSelection来发送到这些路径。
有两种方式创建路由器:
Pool(池)
import akka.actor._
import akka.routing.{ActorRefRoutee, FromConfig, RoundRobinGroup, RoundRobinPool, RoundRobinRoutingLogic, Router}
object HelloScala {
def main(args: Array[String]): Unit = { // 创建router val _system = ActorSystem("testRouter") // 通知代码来实现路由器 val hahaRouter = _system.actorOf(RoundRobinPool(5).props(Props[WorkerRoutee]),"router111") hahaRouter ! RouteeMsg(333) val myRouter = _system.actorOf(Props[WorkerRoutee].withRouter(RoundRobinPool(nrOfInstances = 5))) myRouter ! RouteeMsg(22) val masterRouter = _system.actorOf(Props[MasterRouter],"masterRouter") masterRouter ! RouteeMsg(100)
}
}
class MasterRouter extends Actor{
var masterRouter = { val routees = Vector.fill(3){ val r = context.actorOf(Props[WorkerRoutee]) context watch r ActorRefRoutee(r) } Router(RoundRobinRoutingLogic(),routees)
}
override def receive: Receive = { case w: RouteeMsg => masterRouter.route(w,sender()) case Terminated(a) => masterRouter = masterRouter.removeRoutee(a) val r = context.actorOf(Props[WorkerRoutee]) context watch r masterRouter = masterRouter.addRoutee(r) }
}
// 定义routee对应的actor类型
case class RouteeMsg(s: Int)
class WorkerRoutee extends Actor{
override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}")
}
}
class WorkerRoutee2 extends Actor{
override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#@@@@@$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}")
}
}
class Cale extends Actor{
override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}")
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
Group(群组)
import akka.actor._
import akka.routing.{ RoundRobinGroup}
object HelloScala {
def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! RouteeMsg(13333)
}
}
class TestActor extends Actor{
val routee1 = context.actorOf(Props[WorkerRoutee],"w1")
val routee2 = context.actorOf(Props[WorkerRoutee],"w2")
val routee3 = context.actorOf(Props[WorkerRoutee],"w3")
val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter")
override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case _ =>
}
}
// 定义routee对应的actor类型
case class RouteeMsg(s: Int)
class WorkerRoutee extends Actor{
override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}")
}
}
class Cale extends Actor{
override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}")
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
特殊消息
Broadcast消息用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。
PoisonPill消息无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。但仍然能影响到routees,因为Router停止时它的子actor也会停止,就可能会造成消息未处理。因此我们可以将PoisonPill包装到Broadcast消息中。这样Router所管理的所有routees将会处理完消息后再处理PoisonPill并停止。
Kill消息当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。Router将抛出ActorKilledException并失败,然后Router根据监管的策略,被恢复、重启或终止。Router的子routee也将被暂停,也受Router监管的影响,但是独立在Router外部创建的routee将不会被影响。
import akka.actor._
import akka.routing.{Broadcast, RoundRobinGroup}
object HelloScala {
def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! PoisonPill
}
} class TestActor extends Actor{
val routee1 = context.actorOf(Props[WorkerRoutee],"w1")
val routee2 = context.actorOf(Props[WorkerRoutee],"w2")
val routee3 = context.actorOf(Props[WorkerRoutee],"w3")
val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter")
override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case RouteeBroadcast => testRouter ! Broadcast // 用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。 case Broadcast => println("TestActor receive a broadcast message") case Kill => testRouter ! Kill// 当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。 case PoisonPill => testRouter ! PoisonPill // 无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。 case _ =>
}
}
// 定义routee对应的actor类型
case class RouteeMsg(s: Int)
// 定义广播信息
case object RouteeBroadcast
class WorkerRoutee extends Actor{
override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}")
}
}
class Cale extends Actor{
override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}")
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
远程部署Router
既可以创建本地actor来作为Router,也可以命令Router在任一远程主机上部署子actor。需要将路由配置放在RemoteRouterConfig下,在远程部署的路径类中要添加akka-remote模块:
import akka.actor._
import akka.remote.routing.{RemoteRouterConfig}
import akka.routing.{Broadcast,RoundRobinPool}
object HelloScala {
def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val addresses = Seq( Address("akka.tcp","remotesys","otherhost",6666), AddressFromURIString("akka.tcp://othersys@anotherhost:6666") ) // WorkerRoutee 路由部署到远程的主机上 val routerRemote = _system.actorOf(RemoteRouterConfig(RoundRobinPool(5),addresses).props(Props[WorkerRoutee]))
}
}
// 定义routee对应的actor类型
case class RouteeMsg(s: Int)
class WorkerRoutee extends Actor{
override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}")
}
}
class Cale extends Actor{
override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}")
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
文章来源: blog.csdn.net,作者:WongKyunban,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/weixin_40763897/article/details/93516694
- 点赞
- 收藏
- 关注作者
评论(0)