随着多核CPU的出现,我们需要能够利用这些额外内核的编程模式,以并发的方式处理任务。 Actor模型就是这样一种模式,它模拟了大量独立的工作,以任何顺序处理,不需要锁同步。 Actor模型的一个非常普遍的用法可以在网络服务器中找到,Java中的Play! Java中的框架就是一个例子。一般来说,任何并发的应用程序都可以建立在Actor模型之上。
在这篇文章中,我将实践如何用Go实现一个原始的actor模型。我们将利用Go为并发提供的工具–goroutines、channels和wait groups。
首先,让我们来看看一个Actor。
一个 Actor 有一个任务队列和一个监听任务队列并执行任务的 goroutine。 这里的A是一个goroutine,它在任务队列上阻塞,并不断执行队列中的任务。 Actor 的interface 定义如下:
type Actor interface {
AddTask(task Task)
Start()
Stop()
}
接下来看看 任务(task)
task是在Actor中执行的。它是对有Execute方法的interface的实现。任何可以通过调用Execute的任何东西。task是我们需要做的工作的业务实现。
type Task interface {
Execute()
}
完整的系统架构图如下:
actor system接口定义如下:
type ActorSystem interface {
Run()
SubmitTask(task Task)
Shutdown(shutdownWG *sync.WaitGroup)
}
调用SubmitTask
方法提交Task
s 到ActorSystem
中。taskAssigner
分配task到Actor
s。 每个Actor
都有一个任务队列, 用于缓冲任务并逐一执行。
接下来我们深入研究一下每一个模块
ActorSystem
type ActorSystem struct {
name string
assigner entities.Actor
wg *sync.WaitGroup
tracker *tracker.Tracker
}
func (system *ActorSystem) Run() {
log.Debug("actor system %s started \n", system.name)
// start the assigner in seprate go routine
go system.assigner.Start()
}
func (system *ActorSystem) SubmitTask(task entities.Task) error {
// adding submitted task to assigner
return system.assigner.AddTask(task)
}
func (system *ActorSystem) Shutdown(wg *sync.WaitGroup) {
defer wg.Done()
system.assigner.Stop()
system.wg.Wait()
system.tracker.Shutdown()
log.Debug("actor system: %s shutdown completed ", system.name)
}
当ActorSystem启动时,它启动了一个taskAssigner
actor, 每一个进入系统的Task
都会通过调用AddTask方法添加到taskAssigner
的actor中。
通过调用SubbmitTask
方法,我们可以提交一个Task
到ActorSystem
中。
在Shutdown时,它将关闭阻止任何新的传入任务的任务通道,等待将所有接收的任务分配给Actors。然后它调用每一个Actor
的Stop
,等待它们完成。
Task Assigner
我们将每个传入的Tasks
放在任务通道,taskAssigner
和Task
在每一个Actor的队列内部。
type AssignerActor struct {
name string
closeSig chan bool
tasks chan entities.Task
assignerIndex int
tracker *tracker.Tracker
scalar *autoScalar
*TaskActorPool
*Config
}
func (assigner *AssignerActor) AddTask(task entities.Task) error{
if len(assigner.tasks) >= assignerQueueSize {
assigner.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Rejected)
return errors.New("task queue is full")
}
// task getting added to assigner actor channel
assigner.tasks <- task
assigner.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Submitted)
return nil
}
taskAssigner
内部处理tasks channel 和分发 task 到某个actor任务池中通过调用AddTask方法。
func (assigner *AssignerActor) Start() {
poolStarted := make(chan bool)
assigner.scalar = GetAutoScaler(assigner, poolStarted)
<- poolStarted
// will loop forever till tasks channel is closed
for task := range assigner.tasks {
for {
assigner.poolLock.Lock()
assigner.assignerIndex = assigner.assignerIndex % len(assigner.pool)
actor := assigner.pool[assigner.assignerIndex]
assigner.assignerIndex += 1
assigner.poolLock.Unlock()
// assigning task to a task actor from pool
err := actor.AddTask(task)
if err == nil {
break
}
}
}
assigner.closeSig <- true
}
autoScalar
保持监听tasks
的数目, 增加或减少 task actor pool
的大小。
// auto scalar is part of task assigner actor
// It scales task actor pool based on queue len size
type autoScalar struct {
*AssignerActor
lastActorId int
closingSig chan bool
closedSig chan bool
}
func(scalar *autoScalar) run(poolStarted chan bool) {
log.Debug("running auto scalar with min actor")
// provision starting actors
scalar.provisionActors(scalar.Config.MinActor)
// waiting for scalar to start task actors
poolStarted <- true
completed := false
for !completed {
select {
case <- scalar.closingSig:
completed = true
case <-time.After(100 * time.Millisecond):
if scalar.QueueSize() > scalar.UpscaleQueueSize && len(scalar.pool) < scalar.MaxActor {
scalar.provisionActors(1)
} else if scalar.QueueSize() < scalar.DownscaleQueueSize && len(scalar.pool) > scalar.MinActor {
scalar.deprovisionActors(1)
}
}
}
// when it comes out, it closes all task actors in pool
scalar.deprovisionActors(len(scalar.pool))
log.Debug("scalar exited")
scalar.closedSig <- true
}
Task Actor
它也是一个actor,它的职责是执行添加到它的channel中的任务
type TaskActor struct {
id int
closeSig chan bool
wg *sync.WaitGroup
tasks chan entities.Task
tracker *tracker.Tracker
}
// add task only if channel has space
func (a *TaskActor) AddTask(task entities.Task) error {
if len(a.tasks) >= taskQueueSize {
return errors.New("filled queue")
}
// task added to channel
a.tasks <- task
return nil
}
func (a *TaskActor) Start() {
defer a.wg.Done()
a.wg.Add(1)
log.Debug("starting actor :%d", a.id)
for task := range a.tasks{
task.Execute()
a.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Completed)
}
log.Debug("stopped actor :%d", a.id)
a.closeSig <- true
}
func (a *TaskActor) Stop() {
// closing task channel
close (a.tasks)
<- a.closeSig
}
Benchmarks
我们模拟了一个Web服务器。
- 100K请求以2毫秒的间隔线性发送
- 当时钟在前30秒的时间和[50-100)持续30秒钟内时钟,每个请求持续[0,50)〜25米秒〜25米秒。
- 这模拟了我们在下游服务中突然变化的情况。我们希望保持我们的吞吐量检查,以便不增加任何任务的等待时间
func TestIOSimulationSystem(t *testing.T) {
// creating a actor system
ioSimSystem := CreateActorSystem("io_sim", &actor.Config{
MinActor: 10,
MaxActor: 100,
AutoScale: actor.AutoScale{
UpscaleQueueSize: 100,
DownscaleQueueSize: 10,
},
})
for i := 0; i < 100000; i += 1 {
ioSimSystem.SubmitTask(CreateNumberPrinterTask(i))
<-time.After(2 * time.Millisecond)
}
shutdown([]*ActorSystem{ioSimSystem})
}
type SimIOTask struct {
num int
}
func (t *SimIOTask) Execute() {
x := 0
if time.Now().Second() > 30 {
x = 100
}
duration := time.Duration(x+rand.Intn(75)) * time.Millisecond
<- time.After(duration)
}
参考链接: 1、https://medium.com/better-programming/implementing-the-actor-model-in-golang-3579c2227b5e