TechnicalWriting

Scott Ralph's Personal Blog about Scala, Spark, Big Data, ML, and AI


Project maintained by scottralph Hosted on GitHub Pages — Theme by mattgraham

Dec 03, 2020, A Simple Scala Akka Job Scheduling Actor

The Problem

I wanted to write a Scala Akka hello-world example that was simple but not trivial, and that showed at least the skeleton of something useful.

The example I settled on is a simple job scheduler that creates a new worker actor, sends it a message to start work, and receives messages back about the status of that work.

For completeness, the project is built with the following sbt file:

name := "JobScheduler"
version := "0.1"

scalaVersion := "2.13.2"

val AkkaTypedVersion = "2.6.10"
val AkkaHttpVersion = "10.2.1"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % AkkaTypedVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaTypedVersion,
)

The code in this post only uses akka-actor-typed, so you can omit the HTTP library, but it is quite useful for other things too.

The main app is relatively simple:

package jobcoordinator

import akka.actor.typed.ActorSystem

object Main extends App {
  println("Creating Actor System...")
  val actor = ActorSystem[TaskSchedulerMessage](TaskScheduler(), "task-scheduler")
  actor ! TaskOneRequest("a-task-id")
}

This creates the actor system, whose top-level actor accepts messages of type TaskSchedulerMessage. It is a common pattern to define a trait covering all the messages that an actor is able to receive.

The first set of messages are requests for two abstract tasks that I will call “task1” and “task2”.

These messages are:

sealed trait TaskSchedulerMessage
case class TaskOneRequest(taskId: String) extends TaskSchedulerMessage
case class TaskTwoRequest(taskId: String) extends TaskSchedulerMessage

The line

actor ! TaskOneRequest("a-task-id")

simply sends a TaskOneRequest message.

Each request carries a taskId as a descriptor, which can be used to keep track of the outstanding tasks. Note that the messages are all case classes, which are immutable by default and extend Serializable.

The TaskScheduler must also receive status messages back, so these messages also inherit from that same trait.

case class TaskStartedResponse(taskId: String) extends TaskSchedulerMessage
case class TaskCompletedResponse(taskId: String) extends TaskSchedulerMessage
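One practical benefit of putting every message under a single sealed trait is that the compiler can check pattern matches for completeness. The following is a minimal sketch of that idea in plain Scala, with no Akka involved. The wrapper object `ProtocolSketch` and the helper `describe` are made up for illustration and are not part of the project; the message types are repeated only so that the block compiles on its own.

```scala
object ProtocolSketch {
  // Same message protocol as in the post, repeated so the sketch is self-contained
  sealed trait TaskSchedulerMessage
  final case class TaskOneRequest(taskId: String) extends TaskSchedulerMessage
  final case class TaskTwoRequest(taskId: String) extends TaskSchedulerMessage
  final case class TaskStartedResponse(taskId: String) extends TaskSchedulerMessage
  final case class TaskCompletedResponse(taskId: String) extends TaskSchedulerMessage

  // Hypothetical helper: because the trait is sealed, the compiler can warn
  // when one of the four cases below is left out of the match
  def describe(message: TaskSchedulerMessage): String = message match {
    case TaskOneRequest(id)        => s"request task1 [$id]"
    case TaskTwoRequest(id)        => s"request task2 [$id]"
    case TaskStartedResponse(id)   => s"started [$id]"
    case TaskCompletedResponse(id) => s"completed [$id]"
  }
}
```

For example, `ProtocolSketch.describe(ProtocolSketch.TaskOneRequest("a-task-id"))` returns `request task1 [a-task-id]`.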

Now we can introduce the TaskScheduler.

Task Scheduler

The imports are simple:

package jobcoordinator

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import jobcoordinator.TaskOneWorker.Start

The last import brings in the message used to start the task.

Creating a TaskScheduler is done with the following object:

object TaskScheduler {
  def apply(): Behavior[TaskSchedulerMessage] =
    Behaviors.setup[TaskSchedulerMessage](context =>
      new TaskScheduler(context).waitForMessage)
}

Behaviors.setup gives us access to the ActorContext when the actor starts. The behavior itself is defined in the class:

class TaskScheduler(context: ActorContext[TaskSchedulerMessage]) {
  val waitForMessage: Behavior[TaskSchedulerMessage] =
    Behaviors.receiveMessage[TaskSchedulerMessage] {
      case TaskOneRequest(taskId) =>
        println("Am in task-scheduler waitForMessage")
        // Child names must be unique, so a second request would need a different name
        val worker = context.spawn(TaskOneWorker(taskId), "task-1")
        // Pass our own reference so the worker knows where to report back
        worker ! Start(context.self, taskId)
        Behaviors.same
      case TaskStartedResponse(_) =>
        println("Got a task started")
        Behaviors.same
      case TaskCompletedResponse(_) =>
        println("Got a task completed")
        Behaviors.same
      // TaskTwoRequest is deliberately not handled in this example
    }
}

The scheduler behavior is called waitForMessage because all it does is define how incoming messages are handled. For brevity, only tasks of type one are supported, but adding more is simple.

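The first case defines what happens when a TaskOneRequest is received: the scheduler spawns a TaskOneWorker child and sends it a Start message containing the taskId and a reference to the scheduler itself, so that the worker can reply.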

The other two messages are simple responses to notifications about the starting and completion of the task.
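The scheduler in this post only prints the notifications it receives. If you wanted it to actually keep track of outstanding tasks by taskId, one option is to carry the state as a parameter of the behavior and return a new behavior for each message. The following is a minimal sketch under that assumption, not the code used in this project: `TrackingSketch`, `update`, and `scheduler` are names invented for illustration, the protocol is a reduced copy of the one above, and spawning the worker is left out to keep it short.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object TrackingSketch {
  // Reduced copy of the post's protocol so the sketch compiles on its own
  sealed trait TaskSchedulerMessage
  final case class TaskOneRequest(taskId: String) extends TaskSchedulerMessage
  final case class TaskStartedResponse(taskId: String) extends TaskSchedulerMessage
  final case class TaskCompletedResponse(taskId: String) extends TaskSchedulerMessage

  // Hypothetical helper: pure state transition for the set of outstanding task ids
  def update(outstanding: Set[String], message: TaskSchedulerMessage): Set[String] =
    message match {
      case TaskOneRequest(id)        => outstanding + id
      case TaskStartedResponse(_)    => outstanding
      case TaskCompletedResponse(id) => outstanding - id
    }

  // The state lives in the parameter; each message yields the next behavior
  def scheduler(outstanding: Set[String] = Set.empty): Behavior[TaskSchedulerMessage] =
    Behaviors.receiveMessage[TaskSchedulerMessage] { message =>
      val next = update(outstanding, message)
      println("Outstanding tasks: " + next.mkString(", "))
      scheduler(next)
    }
}
```

Keeping the transition in a pure function like `update` means the bookkeeping can be checked without starting an actor system.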

Each of the message cases returns

Behaviors.same

indicating that the behavior of the actor continues to be the same upon returning control to the ActorSystem. If we wanted to stop the TaskScheduler, we could return

Behaviors.stopped

perhaps as a response to a shutdown message. As shown, the program will not exit, because nothing ever stops the actor system.
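To make the idea of a shutdown message concrete, here is a minimal sketch. It assumes a `Shutdown` message that does not exist in the project, and it uses a cut-down protocol inside a made-up `ShutdownSketch` object so that it runs on its own. It relies on the fact that an Akka Typed ActorSystem terminates when its top-level (guardian) actor stops.

```scala
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.Await
import scala.concurrent.duration._

object ShutdownSketch {
  sealed trait TaskSchedulerMessage
  final case class TaskOneRequest(taskId: String) extends TaskSchedulerMessage
  // Hypothetical message, not part of the original protocol
  case object Shutdown extends TaskSchedulerMessage

  def apply(): Behavior[TaskSchedulerMessage] =
    Behaviors.receiveMessage[TaskSchedulerMessage] {
      case TaskOneRequest(taskId) =>
        println(s"Would start task $taskId here")
        Behaviors.same
      case Shutdown =>
        println("Shutting down")
        // Stopping the guardian actor also terminates the actor system
        Behaviors.stopped
    }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem(ShutdownSketch(), "shutdown-sketch")
    system ! TaskOneRequest("a-task-id")
    system ! Shutdown
    // Wait until the system has fully terminated before leaving main
    Await.result(system.whenTerminated, 10.seconds)
  }
}
```

Messages sent from the same sender are delivered in order, so the request is handled before the shutdown.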

Now let’s look at the worker.

TaskOneWorker

package jobcoordinator

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import jobcoordinator.TaskOneWorker.TaskWorkerCommand

object TaskOneWorker {

  sealed trait TaskWorkerCommand
  case class Start(replyTo : ActorRef[TaskSchedulerMessage],
    args: String) extends TaskWorkerCommand

  def apply(taskId: String): Behavior[TaskWorkerCommand] =
    Behaviors.setup[TaskWorkerCommand](context =>
      new TaskOneWorker(context, taskId).waitForMessage)
}

The first part just defines the messages that the worker can handle. The Start message carries an ActorRef, replyTo, which tells the worker where to send its status messages. The args field could be anything needed for running the task; here we are just passing the taskId.

class TaskOneWorker(context : ActorContext[TaskWorkerCommand],
    taskId: String) {

  import TaskOneWorker._

  def run(args: String): Unit = {
    // Placeholder: the actual work of the task would go here
  }

  val waitForMessage: Behavior[TaskWorkerCommand] =
    Behaviors.receiveMessage[TaskWorkerCommand] {
      case Start(replyTo, args) => {
        replyTo ! TaskStartedResponse(taskId)
        this.run(args)
        replyTo ! TaskCompletedResponse(taskId)
        Behaviors.same
      }
    }
}

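The behavior of the worker is simple: when it receives Start, it sends a TaskStartedResponse to replyTo, runs the task, and then sends a TaskCompletedResponse.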
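Note that the worker above returns Behaviors.same after finishing, so it stays alive even though it has nothing left to do. A possible variation is a one-shot worker that stops itself once the task has been reported. The sketch below illustrates that under stated assumptions: `OneShotWorkerSketch`, the `Status` replies, and the `work` parameter are all hypothetical stand-ins and not part of the project.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object OneShotWorkerSketch {
  // Hypothetical reply protocol standing in for the scheduler's status messages
  sealed trait Status
  final case class Started(taskId: String) extends Status
  final case class Completed(taskId: String) extends Status

  final case class Start(replyTo: ActorRef[Status], args: String)

  // `work` is a placeholder for the real task body
  def apply(taskId: String)(work: String => Unit): Behavior[Start] =
    Behaviors.receiveMessage[Start] { start =>
      start.replyTo ! Started(taskId)
      work(start.args) // runs inside the message handler, so it should not block for long
      start.replyTo ! Completed(taskId)
      Behaviors.stopped // one-shot: stop once the task has been reported
    }
}
```

A parent could spawn it with something like `context.spawn(OneShotWorkerSketch(taskId)(args => println(args)), "task-1")`, where the name still has to be unique among the parent's live children.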

And that’s it! The output demonstrates that the actors are communicating correctly:

Creating Actor System...
Am in task-scheduler waitForMessage
Got a task started
Got a task completed