
Threading
Overview
The sys::Thread
class is used to manage a thread of execution. Under the covers Fan threads are mapped to Java or .NET threads, which in turn are theoretically mapped to operating system threads. Fan threads are based on a concurrency model with no shared mutable state.
Current Thread
You can get a reference the currently running thread using the Thread.current
method. For example to dump the stack trace of the current thread:
Thread.current.trace
Thread Lifecycle
The thread lifecycle is composed of three states:
- New: a thread has been created via
Thread.make
, but not yet started - Running: a thread is actively running
- Dead: the thread has been stopped
The first two states new and running are the active state which means the thread:
- Is globally mapped by
Thread.name
Thread.find
will find itThread.list
will list it
Once the thread enters the dead state it is no longer mapped by name.
A thread is created with an optional name and an optional run method. If name is provided then it must be unique to the VM and that name can be used to lookup the thread via Thread.find
. If name is not provided, then a name is auto-generated. If run is provided, then it is the function invoked to execute the thread - otherwise you must subclass Thread
and override the run()
method. If a function is provided then it must be immutable (it cannot capture any state from the creating thread).
A thread is started with the Thread.start
method. A thread runs until either it's run function terminates or someone calls the Thread.stop
method. The stop
method only sets a flag, it does not immediately stop the thread. The thread doesn't actually stop until it comes to an interruptible point and rechecks the flag.
Join
The simplest way to communicate with a thread is to use Thread.join
to block until the thread terminates. The join
method will return the result of the thread's run method. This object can be mutable if desired, but only the first joined thread can access the result. A simple example:
t := Thread.make(null) |->Obj| { Thread.sleep(5sec); return "hi!" } t.start echo("Waiting...") echo(t.join)
The code above will spawn a thread which sleeps for 5sec then returns a string message. The calling thread will block for 5sec, then print the string "hi" to the console.
Message Passing
A more sophisticated way to communicate state is via messaging. Every thread has a message queue which may be used to send and receive messages. The messages themselves must be either immutable or serializable to ensure thread safety.
You can send an async message via the Thread.sendAsync
method. An async message is placed on the thread's message queue and immediately returns. It is a "fire and forget" message. If the function processing the message returns a result or throws an exception it is ignored.
You can send a sync message via the Thread.sendSync
method. A sync message will block the caller until the thread processes the message. The result is returned to the caller, or if an exception was raised processing the message, then that exception is raised to the caller.
Threads which call other threads via sendSync
should never receive messages themselves as this might lead to deadlocks. Best practice is to design service threads using strictly asynchronous messaging, and keep synchronous messaging on client threads which don't service requests themselves.
Processing Messages
Threads which wish to process their message queue, must provide a run function which enters into a main loop via the Thread.loop
method. The loop
method takes a receive callback function, which is invoked whenever a message is ready to be processed. A simple messaging example:
static Void main() { r := Thread.make("reflector") |Thread t| { t.loop |Obj msg->Obj| { echo("reflector received $msg") return msg } } r.start for (i:=0; i<5; ++i) { Thread.sleep(500ms) echo("ping -> " + r.sendSync(i)) } }
Let's dissect this code. The reflector thread is our "server thread" which accepts messages, prints them to the console, then returns them back to the caller. The thread is created using two nested closures. The top closure is the run method itself, which does only one thing - it enters the main loop. The function passed to the main loop is the inner closure which echoes and returns the message. If you find the closures confusing, we could write that same functionality using two additional methods:
static Void run(Thread t) { t.loop(&process) } static Obj process(Obj msg) { echo("reflector received $msg"); return msg } static Void main() { reflector := Thread.make("reflector", &run) reflector.start ...
Flow Control
There are various approaches to flow control, but for now Fan uses a simple mechanism. Every thread has a max queue size. If an attempt is made to post a message via either sendSync
or sendAsync
to a full queue, then the calling thread is blocked until there is space on the queue. In the future we probably want a more sophisticated approach to flow control - maybe with some deadlock detection.
Timers
The sendLater
method can be used to setup a timer. Timers post back to the message queue when they expire. You can use sendLater
to setup a one-shot timer or a repeating timer. Example:
x := Thread.make(null) |Thread t| { t.sendLater(1sec, "1sec one-shot") t.sendLater(2sec, "3sec one-shot") t.sendLater(1sec, "1sec repeating", true) t.loop |Obj msg| { echo(msg) } } x.start.join
The sendLater
method returns an object called the ticket which can be pass to cancelLater
method to cancel the timer:
ticket := sendLater(10sec, "hi") ... cancelLater(ticket)
Thread Locals
Every thread has thread locals - a string/object map for storing thread global variables. Thread locals are accessed using the Thread.locals
method. To prevent naming collisions, you should prefix your map keys with your pod name:
// store a thread local Thread.locals["acme.global"] = "hum bug" // get a thread local Thread.locals["acme.global"]