Threading

Overview

The sys::Thread class is used to manage a thread of execution. Under the covers, Fan threads are mapped to Java or .NET threads, which in turn are typically mapped to operating system threads. Fan threads are based on a concurrency model with no shared mutable state.

Current Thread

You can get a reference to the currently running thread using the Thread.current method. For example, to dump the stack trace of the current thread:

Thread.current.trace

Thread Lifecycle

The thread lifecycle is composed of three states:

  • New: a thread has been created via Thread.make, but not yet started
  • Running: a thread is actively running
  • Dead: the thread has been stopped

The first two states, new and running, are the active states, which means the thread is mapped by name and can be looked up via Thread.find.

Once the thread enters the dead state it is no longer mapped by name.

A thread is created with an optional name and an optional run method. If a name is provided then it must be unique to the VM, and that name can be used to look up the thread via Thread.find. If a name is not provided, then one is auto-generated. If run is provided, then it is the function invoked to execute the thread; otherwise you must subclass Thread and override the run() method. If a function is provided then it must be immutable (it cannot capture any state from the creating thread).
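
The naming rules above can be pictured as a small registry. The following is a minimal sketch in Java (one of the platforms Fan threads map to), not Fan's implementation; the class NameRegistry, its methods, and the "thread-N" naming scheme are all hypothetical and chosen only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of VM-wide unique thread names; not Fan's actual code.
final class NameRegistry {
    private final ConcurrentHashMap<String, Object> byName = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    // Registers an active (new or running) thread; a null name is auto-generated.
    String register(String name, Object thread) {
        if (name == null) {
            String generated;
            // Retry until an unused generated name is found.
            do {
                generated = "thread-" + counter.incrementAndGet();
            } while (byName.putIfAbsent(generated, thread) != null);
            return generated;
        }
        // An explicit name must be unique among active threads.
        if (byName.putIfAbsent(name, thread) != null) {
            throw new IllegalArgumentException("Duplicate thread name: " + name);
        }
        return name;
    }

    // Returns the active thread mapped to the name, or null if there is none.
    Object find(String name) {
        return byName.get(name);
    }

    // Called when a thread enters the dead state: the name is no longer mapped.
    void unregister(String name) {
        byName.remove(name);
    }
}
```

In this sketch a name becomes available again once its thread is unregistered, which mirrors the statement that dead threads are no longer mapped by name.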

A thread is started with the Thread.start method. A thread runs until either its run function terminates or someone calls the Thread.stop method. The stop method only sets a flag; it does not immediately stop the thread. The thread doesn't actually stop until it comes to an interruptible point and rechecks the flag.
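
To make the flag-based stop concrete, here is a minimal Java sketch of the same idea. It is an analogue under stated assumptions, not Fan's implementation: StoppableWorker and its members are hypothetical names, and the "interruptible point" is modeled as the top of a work loop.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical Java analogue of a flag-based stop; not Fan's implementation.
final class StoppableWorker {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicLong unitsOfWork = new AtomicLong();
    private final Thread thread = new Thread(this::run);

    private void run() {
        // The flag is rechecked only here, between units of work.
        while (!stopRequested.get()) {
            unitsOfWork.incrementAndGet(); // stand-in for one unit of real work
            Thread.yield();
        }
    }

    void start() {
        thread.start();
    }

    // Only sets the flag; returns without waiting for the thread to finish.
    void stop() {
        stopRequested.set(true);
    }

    // Blocks until the worker has observed the flag and left its loop.
    void join() {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    boolean isAlive() {
        return thread.isAlive();
    }
}
```

A caller that needs to know the worker has really finished calls join after stop, since stop by itself returns before the loop has noticed the flag.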

Join

The simplest way to communicate with a thread is to use Thread.join to block until the thread terminates. The join method returns the result of the thread's run method. This object can be mutable if desired, but only the first thread to call join can access the result. A simple example:

t := Thread.make(null) |->Obj| { Thread.sleep(5sec); return "hi!" }
t.start
echo("Waiting...")
echo(t.join)

The code above spawns a thread which sleeps for 5sec and then returns a string message. The calling thread prints "Waiting...", blocks in join for about 5sec, then prints the string "hi!" to the console.

Message Passing

A more sophisticated way to communicate state is via messaging. Every thread has a message queue which may be used to send and receive messages. The messages themselves must be either immutable or serializable to ensure thread safety.

You can send an async message via the Thread.sendAsync method. The message is placed on the thread's message queue and sendAsync returns immediately. It is a "fire and forget" message: if the function processing the message returns a result or throws an exception, it is ignored.

You can send a sync message via the Thread.sendSync method. A sync message will block the caller until the thread processes the message. The result is returned to the caller, or if an exception was raised processing the message, then that exception is raised to the caller.
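
The difference between the two send styles can be sketched in Java as a queue of envelopes, where only synchronous sends carry a reply slot. This is a rough analogue, not Fan's implementation; Mailbox, Envelope, and the use of CompletableFuture are assumptions made for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical Java analogue of a thread with a message queue; not Fan's implementation.
final class Mailbox {
    // A queued message plus an optional reply slot (null for async sends).
    private static final class Envelope {
        final Object msg;
        final CompletableFuture<Object> reply;

        Envelope(Object msg, CompletableFuture<Object> reply) {
            this.msg = msg;
            this.reply = reply;
        }
    }

    private final BlockingQueue<Envelope> queue = new LinkedBlockingQueue<>();

    Mailbox(Function<Object, Object> receive) {
        Thread service = new Thread(() -> {
            try {
                while (true) {
                    Envelope e = queue.take();
                    try {
                        Object result = receive.apply(e.msg);
                        if (e.reply != null) e.reply.complete(result);
                    } catch (RuntimeException ex) {
                        // Async: the exception is dropped. Sync: it is handed to the caller.
                        if (e.reply != null) e.reply.completeExceptionally(ex);
                    }
                }
            } catch (InterruptedException ignored) {
                // Interrupted while waiting for a message: let the service thread end.
            }
        });
        service.setDaemon(true);
        service.start();
    }

    // Fire and forget: enqueue and return without waiting for processing.
    void sendAsync(Object msg) {
        queue.add(new Envelope(msg, null));
    }

    // Enqueue, then block until the service thread has processed the message.
    Object sendSync(Object msg) {
        CompletableFuture<Object> reply = new CompletableFuture<>();
        queue.add(new Envelope(msg, reply));
        try {
            return reply.get();
        } catch (ExecutionException ex) {
            throw (RuntimeException) ex.getCause(); // re-raise the handler's exception
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }
}
```

In this sketch a handler that throws on an async message has no visible effect on the sender, while the same failure on a sync message surfaces as an exception at the call site.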

Threads which call other threads via sendSync should never receive messages themselves as this might lead to deadlocks. Best practice is to design service threads using strictly asynchronous messaging, and keep synchronous messaging on client threads which don't service requests themselves.

Processing Messages

Threads which wish to process their message queue must provide a run function which enters into a main loop via the Thread.loop method. The loop method takes a receive callback function, which is invoked whenever a message is ready to be processed. A simple messaging example:

static Void main()
{
  r := Thread.make("reflector") |Thread t|
  {
    t.loop |Obj msg->Obj|
    {
      echo("reflector received $msg")
      return msg
    }
  }
  r.start

  for (i:=0; i<5; ++i)
  {
    Thread.sleep(500ms)
    echo("ping -> " + r.sendSync(i))
  }
}

Let's dissect this code. The reflector thread is our "server thread" which accepts messages, prints them to the console, then returns them back to the caller. The thread is created using two nested closures. The top closure is the run method itself, which does only one thing - it enters the main loop. The function passed to the main loop is the inner closure which echoes and returns the message. If you find the closures confusing, we could write that same functionality using two additional methods:

static Void run(Thread t) { t.loop(&process) }
static Obj process(Obj msg) { echo("reflector received $msg"); return msg }
static Void main()
{
  reflector := Thread.make("reflector", &run)
  reflector.start
  ...

Flow Control

There are various approaches to flow control, but for now Fan uses a simple mechanism. Every thread has a max queue size. If an attempt is made to post a message via either sendSync or sendAsync to a full queue, then the calling thread is blocked until there is space on the queue. In the future we probably want a more sophisticated approach to flow control - maybe with some deadlock detection.
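
The blocking behavior of a full queue can be illustrated with a bounded queue in Java. This is a sketch of the general technique, not Fan's implementation; BoundedMailbox and its methods are hypothetical names, and java.util.concurrent.ArrayBlockingQueue stands in for the thread's message queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical Java analogue of a message queue with a max size; not Fan's implementation.
final class BoundedMailbox {
    private final BlockingQueue<Object> queue;

    BoundedMailbox(int maxQueueSize) {
        this.queue = new ArrayBlockingQueue<>(maxQueueSize);
    }

    // Blocks the calling thread while the queue is full.
    void send(Object msg) {
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Blocks the receiving thread while the queue is empty.
    Object receive() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Number of messages currently waiting to be processed.
    int pending() {
        return queue.size();
    }
}
```

With a max size of 2, a sender posting five messages can never get more than two ahead of the receiver; it simply waits in send until space frees up.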

Timers

The sendLater method can be used to set up a timer. Timers post back to the message queue when they expire. You can use sendLater to set up a one-shot timer or a repeating timer. Example:

x := Thread.make(null) |Thread t|
{
  t.sendLater(1sec, "1sec one-shot")
  t.sendLater(3sec, "3sec one-shot")
  t.sendLater(1sec, "1sec repeating", true)
  t.loop |Obj msg| { echo(msg) }
}
x.start.join

The sendLater method returns an object called the ticket, which can be passed to the cancelLater method to cancel the timer:

ticket := sendLater(10sec, "hi")
...
cancelLater(ticket)
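
The ticket pattern has a close counterpart in Java's scheduled executors, where the returned future acts as the ticket. The sketch below is an analogue only, not Fan's implementation; TimerQueue is a hypothetical class, and its sendLater and cancelLater methods merely borrow the names used above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical Java analogue of timers that post to a message queue; not Fan's implementation.
final class TimerQueue {
    final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // do not keep the JVM alive for pending timers
            return t;
        });

    // Posts msg to the queue after the delay, and again every delay if repeat is true.
    // The returned future plays the role of the ticket.
    ScheduledFuture<?> sendLater(long delayMillis, Object msg, boolean repeat) {
        Runnable post = () -> queue.add(msg);
        if (repeat) {
            return timer.scheduleAtFixedRate(post, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        }
        return timer.schedule(post, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Cancels a pending or repeating timer; returns false if it could not be cancelled.
    boolean cancelLater(ScheduledFuture<?> ticket) {
        return ticket.cancel(false);
    }
}
```

Cancelling a one-shot timer before it expires means its message is never posted; cancelling a repeating timer stops future posts but does not remove messages already on the queue.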

Thread Locals

Every thread has thread locals - a string/object map for storing thread global variables. Thread locals are accessed using the Thread.locals method. To prevent naming collisions, you should prefix your map keys with your pod name:

// store a thread local
Thread.locals["acme.global"] = "hum bug"

// get a thread local
Thread.locals["acme.global"]
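
For readers who know the underlying platform, a per-thread string/object map can be approximated in Java with ThreadLocal. This is a sketch of the concept, not Fan's implementation; the Locals class and its readFromOtherThread helper are hypothetical and exist only to show that each thread sees its own map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Java analogue of a per-thread string/object map; not Fan's implementation.
final class Locals {
    private static final ThreadLocal<Map<String, Object>> LOCALS =
        ThreadLocal.withInitial(HashMap::new);

    // Returns the map that belongs to the calling thread only.
    static Map<String, Object> locals() {
        return LOCALS.get();
    }

    // Reads a key from a freshly started thread, which has its own empty map.
    static Object readFromOtherThread(String key) {
        Object[] seen = new Object[1];
        Thread t = new Thread(() -> seen[0] = locals().get(key));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

A value stored under "acme.global" by one thread is visible only to that thread; the helper thread in the sketch finds nothing under the same key.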