用Go实现Actor模型
@ 归零 | 星期二,三月 22 日,2022 年 | 3 分钟阅读 | 更新于 星期二,三月 22 日,2022 年

随着多核CPU的出现,我们需要能够利用这些额外内核的编程模式,以并发的方式处理任务。 Actor模型就是这样一种模式,它模拟了大量独立的工作,以任何顺序处理,不需要锁同步。 Actor模型的一个非常普遍的用法可以在网络服务器中找到,Java中的Play! Java中的框架就是一个例子。一般来说,任何并发的应用程序都可以建立在Actor模型之上。

在这篇文章中,我将实践如何用Go实现一个原始的actor模型。我们将利用Go为并发提供的工具–goroutines、channels和wait groups。

首先,让我们来看看一个Actor。

Actor

一个 Actor 有一个任务队列和一个监听任务队列并执行任务的 goroutine。 这里的A是一个goroutine,它在任务队列上阻塞,并不断执行队列中的任务。 Actor 的interface 定义如下:

type Actor interface {
   AddTask(task Task)
   Start()
   Stop()
}

接下来看看 任务(task)

task是在Actor中执行的。它是对有Execute方法的interface的实现。任何可以通过调用Execute的任何东西。task是我们需要做的工作的业务实现。

type Task interface {
   Execute()
}

完整的系统架构图如下: image

actor system接口定义如下:

type ActorSystem interface {
   Run()
   SubmitTask(task Task) 
   Shutdown(shutdownWG *sync.WaitGroup)
}

调用SubmitTask方法提交Tasks 到ActorSystem中。taskAssigner 分配task到Actors。 每个Actor都有一个任务队列, 用于缓冲任务并逐一执行。

接下来我们深入研究一下每一个模块

ActorSystem

type ActorSystem struct {
	name           string
	assigner 	   entities.Actor
	wg             *sync.WaitGroup
	tracker        *tracker.Tracker
}

func (system *ActorSystem) Run() {
	log.Debug("actor system %s started \n", system.name)
  // start the assigner in seprate go routine
	go system.assigner.Start()
}

func (system *ActorSystem) SubmitTask(task entities.Task) error {
  // adding submitted task to assigner
	return system.assigner.AddTask(task)
}

func (system *ActorSystem) Shutdown(wg *sync.WaitGroup) {
	defer wg.Done()
	system.assigner.Stop()
	system.wg.Wait()
	system.tracker.Shutdown()
	log.Debug("actor system: %s shutdown completed ", system.name)
}

当ActorSystem启动时,它启动了一个taskAssigner actor, 每一个进入系统的Task都会通过调用AddTask方法添加到taskAssigner的actor中。

通过调用SubbmitTask方法,我们可以提交一个TaskActorSystem中。

在Shutdown时,它将关闭阻止任何新的传入任务的任务通道,等待将所有接收的任务分配给Actors。然后它调用每一个ActorStop,等待它们完成。

Task Assigner

我们将每个传入的Tasks放在任务通道,taskAssignerTask在每一个Actor的队列内部。

type AssignerActor struct {
	name     string
	closeSig chan bool
	tasks    chan entities.Task
	assignerIndex int
	tracker *tracker.Tracker
	scalar  *autoScalar
	*TaskActorPool
	*Config
}

func (assigner *AssignerActor) AddTask(task entities.Task) error{
	if len(assigner.tasks) >= assignerQueueSize {
		assigner.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Rejected)
		return errors.New("task queue is full")
	}
	// task getting added to assigner actor channel 
	assigner.tasks <- task
	assigner.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Submitted)
	return nil
}

taskAssigner内部处理tasks channel 和分发 task 到某个actor任务池中通过调用AddTask方法。

func (assigner *AssignerActor) Start() {
	poolStarted := make(chan bool)
	assigner.scalar = GetAutoScaler(assigner, poolStarted)
	<- poolStarted
        // will loop forever till tasks channel is closed
	for task := range assigner.tasks {
		for {
			assigner.poolLock.Lock()
			assigner.assignerIndex = assigner.assignerIndex % len(assigner.pool)
			actor := assigner.pool[assigner.assignerIndex]
			assigner.assignerIndex += 1
			assigner.poolLock.Unlock()
			// assigning task to a task actor from pool
			err := actor.AddTask(task)
			if err == nil {
				break
			}
		}
	}
	assigner.closeSig <- true
}

autoScalar 保持监听tasks的数目, 增加或减少 task actor pool的大小。

// auto scalar is part of task assigner actor
// It scales task actor pool based on queue len size
type autoScalar struct {
	*AssignerActor
	lastActorId int
	closingSig  chan bool
	closedSig   chan bool
}

func(scalar *autoScalar) run(poolStarted chan bool) {
	log.Debug("running auto scalar with min actor")
  // provision starting actors
	scalar.provisionActors(scalar.Config.MinActor)
	// waiting for scalar to start task actors
	poolStarted <- true
	completed := false

for !completed {
		select {
		case <- scalar.closingSig:
			completed = true
		case <-time.After(100 * time.Millisecond):
			if scalar.QueueSize() > scalar.UpscaleQueueSize && len(scalar.pool) < scalar.MaxActor {
				scalar.provisionActors(1)

			} else if scalar.QueueSize() < scalar.DownscaleQueueSize && len(scalar.pool) > scalar.MinActor {
				scalar.deprovisionActors(1)
			}
		}
	}
  // when it comes out, it closes all task actors in pool
	scalar.deprovisionActors(len(scalar.pool))
	log.Debug("scalar exited")
	scalar.closedSig <- true
}

Task Actor

它也是一个actor,它的职责是执行添加到它的channel中的任务

type TaskActor struct {
	id int
	closeSig chan bool
	wg *sync.WaitGroup
	tasks chan entities.Task
	tracker *tracker.Tracker
}

// add task only if channel has space
func (a *TaskActor) AddTask(task entities.Task) error {
	if len(a.tasks) >= taskQueueSize {
		return errors.New("filled queue")
	}
  // task added to channel
	a.tasks <- task
	return nil
}

func (a *TaskActor) Start() {
	defer a.wg.Done()
	a.wg.Add(1)
	log.Debug("starting actor :%d", a.id)

  for task := range a.tasks{
		task.Execute()
		a.tracker.GetTrackerChan() <- tracker.CreateCounterTrack(tracker.Task, tracker.Completed)
	}
	log.Debug("stopped actor :%d", a.id)
	a.closeSig <- true
}

func (a *TaskActor) Stop() {
  // closing task channel
	close (a.tasks)
	<- a.closeSig
}

Benchmarks

我们模拟了一个Web服务器。

  • 100K请求以2毫秒的间隔线性发送
  • 当时钟在前30秒的时间和[50-100)持续30秒钟内时钟,每个请求持续[0,50)〜25米秒〜25米秒。
  • 这模拟了我们在下游服务中突然变化的情况。我们希望保持我们的吞吐量检查,以便不增加任何任务的等待时间
func TestIOSimulationSystem(t *testing.T) {
  // creating a actor system
	ioSimSystem := CreateActorSystem("io_sim", &actor.Config{
		MinActor: 10,
		MaxActor: 100,
		AutoScale: actor.AutoScale{
			UpscaleQueueSize:   100,
			DownscaleQueueSize: 10,
		},
	})

	for i := 0; i < 100000; i += 1 {
		ioSimSystem.SubmitTask(CreateNumberPrinterTask(i))
		<-time.After(2 * time.Millisecond)
	}

	shutdown([]*ActorSystem{ioSimSystem})
}

type SimIOTask struct {
	num int
}

func (t *SimIOTask) Execute() {
	x := 0
	if time.Now().Second() > 30 {
		x = 100
	}
	duration := time.Duration(x+rand.Intn(75)) * time.Millisecond
	<- time.After(duration)
}

参考链接: 1、https://medium.com/better-programming/implementing-the-actor-model-in-golang-3579c2227b5e

© 2014 - 2022 Lionel's Blog

Powered by Hugo with theme Dream.