Chapter 10, Threads

Threads are used to run several Pike functions at the same time without having to start several Pike processes. Using threads often simplifies coding, and because the threads are within the same process, data can be shared or sent to other threads very quickly. Threads are not supported on all systems; you can test for thread support with the preprocessor construction #if constant(thread_create). Pike needs POSIX or UNIX thread support when compiled to support threads.
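As a minimal sketch of the preprocessor test mentioned above, a program can choose a code path when it is compiled. The helper name have_threads is made up for this illustration and is not part of Pike:

```pike
// Hypothetical helper: returns 1 if this Pike was built with thread support.
int have_threads()
{
#if constant(thread_create)
    return 1;
#else
    return 0;
#endif
}

int main()
{
    if(have_threads())
        write("This Pike has thread support.\n");
    else
        write("This Pike was compiled without thread support.\n");
    return 0;
}
```

The same #if block can wrap whole functions, so that a program falls back to single-threaded code where threads are missing.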

10.1 Starting a thread

Starting a thread is very easy. You simply call thread_create with a function pointer and any arguments it needs and that function will be executed in a separate thread. The function thread_create will return immediately and both the calling function and the called function will execute at the same time. Example:
void foo(int x)
{
    for(int e=0;e<5;e++)
    {
        sleep(1);
        write("Hello from thread "+x+".\n");
    }
}

int main()
{
    thread_create(foo, 2);
    thread_create(foo, 3);
    foo(1);
}
This may all seem very simple, but there are a few complications to watch out for:
Accessing unlocked data
Look at this code:
void mapadd(mapping m, int i, int j)
{
    if(m[i])
        m[i]+=({j});
    else
        m[i]=({j});
}
This is quite harmless as long as it is only used from one thread at a time, but if two threads call it at the same time, there is a slight chance that both threads will discover that m[i] is zero. Both threads will then do m[i]=({j}); and one value of j will be lost. This type of bug can be extremely hard to track down.
The above problem can be solved with the help of mutexes and condition variables. Mutexes are basically a way to keep other threads out while a task is being performed. Conditions, or condition variables, are used to inform other threads that they don't have to wait any longer. Pike also provides two different kinds of pipelines (Thread.Fifo and Thread.Queue) to send data from one thread to another, which makes it very simple to write threaded programs. Let's look at an example:
#!/usr/local/bin/pike
import Thread; // We need fifos
inherit Fifo; // Fifo used to supply workers
inherit Fifo : ended; // Fifo used to wait for workers

void worker(string lookfor)
{
    while(string file=Fifo::read())
    {
        int linenum=1;
        object o=Stdio.FILE(file,"r");
        while(string line=o->gets())
        {
            if(search(line, lookfor) >=0)
                write(sprintf("%s:%d: %s\n",file, linenum, line));

            linenum++;
        }
    }
    ended::write(0);
}

int main(int argc, array(string) argv)
{
    for(int e=0;e<4;e++) // Start workers, each looking for argv[1]
        thread_create(worker,argv[1]);
    for(int e=2;e<argc;e++) // Feed workers with file names
        Fifo::write(argv[e]);
    for(int e=0;e<4;e++) // Tell workers to die
        Fifo::write(0);
    for(int e=0;e<4;e++) // Wait for workers to die
        ended::read();
    exit(0);
}
This is an example of a simple grep-like program. It looks for the string given as first argument to the program in the files given as the rest of the arguments. Don't worry if you do not understand it yet. Read the descriptions of the functions and classes below and come back and read this example again.
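Returning to mapadd, the lost-update problem can also be avoided by guarding the mapping with a mutex. The following is only a sketch under stated assumptions: the names map_mutex, done, adder and run_adders are invented for this illustration, and a Thread.Queue is used merely as a convenient way to wait for the threads to finish.

```pike
// Sketch only: map_mutex, done, adder and run_adders are hypothetical names.
object map_mutex = Thread.Mutex();   // guards every update made by mapadd()
object done = Thread.Queue();        // used to wait for the adder threads

void mapadd(mapping m, int i, int j)
{
    object key = map_mutex->lock();  // other callers sleep here until we unlock
    if(m[i])
        m[i] += ({ j });
    else
        m[i] = ({ j });
    destruct(key);                   // unlock explicitly
}

void adder(mapping m, int base)
{
    for(int e=0; e<100; e++)
        mapadd(m, e % 10, base + e);
    done->write(1);                  // tell the caller that we are finished
}

// Starts four adders on a fresh mapping and counts the stored values.
int run_adders()
{
    mapping m = ([]);
    for(int t=0; t<4; t++) thread_create(adder, m, t*1000);
    for(int t=0; t<4; t++) done->read();
    int total = 0;
    foreach(values(m), array a) total += sizeof(a);
    return total;
}

int main()
{
    write("Stored " + run_adders() + " values.\n");
    return 0;
}
```

Because the test of m[i] and the assignment now happen while the mutex is held, each of the 400 calls should leave exactly one value in the mapping.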
Deadlocks
Deadlocks arise when two threads are waiting for each other to do something. This bug can often arise when several threads need access to a number of resources such as files or other I/O devices. What may happen is that one thread has locked device #1 and is trying to lock device #2 while another thread has locked device #2 and is trying to lock device #1. This type of bug is generally easier to find, but may require a lot of work to fix.
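One common way to avoid the situation described above is to make every thread take the locks in the same order. The sketch below assumes two mutexes standing in for the two devices; all names in it (device1, device2, use_both, run_pair) are hypothetical.

```pike
// Sketch only: all names here are hypothetical.
object device1 = Thread.Mutex();
object device2 = Thread.Mutex();
object done = Thread.Queue();

// Every thread takes the locks in the same order: device1, then device2.
void use_both(int id)
{
    object key1 = device1->lock();
    object key2 = device2->lock();
    // ... work with both devices here ...
    destruct(key2);
    destruct(key1);
    done->write(id);
}

// Runs two competing threads and returns how many of them finished.
int run_pair()
{
    int finished = 0;
    thread_create(use_both, 1);
    thread_create(use_both, 2);
    for(int e=0; e<2; e++) { done->read(); finished++; }
    return finished;
}

int main()
{
    write(run_pair() + " threads finished without deadlock.\n");
    return 0;
}
```

With a fixed order, a thread that holds device1 can always go on to obtain device2 eventually, so the circular wait cannot form.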

10.2 Threads reference section

This section describes all thread-related functions and classes.

FUNCTION
Thread.thread_create - create a thread

SYNTAX
object thread_create(function f, mixed ... args);

DESCRIPTION
This function creates a new thread which will run concurrently with the rest of the program. The new thread will call the function f with the arguments args. When f returns, the thread will cease to exist. All Pike functions are 'thread safe', meaning that running a function at the same time from different threads will not corrupt any internal data in the Pike process. The returned value will be the same as the return value of this_thread() for the new thread.

NOTE
This function is only available on systems with POSIX or UNIX threads support.

SEE ALSO
Thread.Mutex, Thread.Condition and Thread.this_thread

FUNCTION
Thread.this_thread - return thread id

SYNTAX
object this_thread();

DESCRIPTION
This function returns the object that identifies this thread.

SEE ALSO
Thread.thread_create

FUNCTION
Thread.all_threads - return all thread ids

SYNTAX
array(object) all_threads();

DESCRIPTION
This function returns an array with the thread ids of all threads.

SEE ALSO
Thread.thread_create
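The three functions above can be tried together in a small sketch. The names ids, report and same_object are invented for this illustration, and Thread.Queue (described later in this section) is used only to hand the thread id back to the caller.

```pike
// Sketch only: ids, report and same_object are hypothetical names.
object ids = Thread.Queue();

void report()
{
    ids->write(this_thread());   // the new thread sends its own id back
}

// Returns 1 if thread_create() returned the object that the new thread
// itself gets from this_thread().
int same_object()
{
    object created = thread_create(report);
    object seen = ids->read();
    return created == seen;
}

int main()
{
    if(same_object())
        write("thread_create() returned the new thread's id.\n");
    else
        write("The two objects differ.\n");
    write("Threads known to Pike right now: " + sizeof(all_threads()) + "\n");
    return 0;
}
```

The count printed on the last line depends on timing, since the reporting thread may or may not have ceased to exist by then.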

CLASS
Thread.Mutex - mutex locks

DESCRIPTION
Thread.Mutex is a pre-compiled Pike program that implements mutual exclusion locks. Mutex locks are used to prevent multiple threads from simultaneously executing sections of code which access or change shared data. The basic operations for a mutex are locking and unlocking. If a thread attempts to lock an already locked mutex, the thread will sleep until the mutex is unlocked.

NOTE
Mutex locks are only available on systems with POSIX or UNIX threads support.

In POSIX threads, mutex locks can only be unlocked by the same thread that locked them. In Pike any thread can unlock a locked mutex.

EXAMPLE
/* This simple program can be used to exchange data between two
 * threads. It is similar to Thread.Fifo, but can only hold one
 * element of data.
 */
inherit Thread.Mutex : r_mutex;
inherit Thread.Mutex : w_mutex;
object r_lock=r_mutex::lock();
object w_lock;
mixed storage;

void write(mixed data)
{
    w_lock=w_mutex::lock();
    storage=data;
    destruct(r_lock);
}

mixed read()
{
    mixed tmp;
    r_lock=r_mutex::lock();
    tmp=storage;
    storage=0;
    destruct(w_lock);
    return tmp;
}

METHOD
Thread.Mutex.lock - lock the mutex

SYNTAX
object lock();

DESCRIPTION
This function attempts to lock the mutex. If the mutex is already locked, the current thread will sleep until the lock is unlocked by some other thread. The value returned is the 'key' to the lock. When the key is destructed or has no more references, the lock will automatically be unlocked. The key will also be destructed if the lock is destructed.

METHOD
Thread.Mutex.trylock - try to lock the mutex

SYNTAX
object trylock();

DESCRIPTION
This function performs the same operation as lock(), but if the mutex is already locked zero will be returned instead of sleeping until the lock is unlocked.
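The difference between lock() and trylock() can be seen in a short sketch. The names m, results, prober and probe_twice are made up for this example. The probing is done from a second thread on purpose, because some Pike versions treat an attempt to lock a mutex twice from the same thread as an error.

```pike
// Sketch only: m, results, prober and probe_twice are hypothetical names.
object m = Thread.Mutex();
object results = Thread.Queue();

void prober()
{
    object key = m->trylock();             // never sleeps
    results->write(key ? "got the lock" : "mutex busy");
    if(key) destruct(key);                 // unlock again if we got it
}

// Probes once while the mutex is held and once after it has been released.
array(string) probe_twice()
{
    object key = m->lock();
    thread_create(prober);
    string while_locked = results->read();
    destruct(key);                         // unlock
    thread_create(prober);
    string after_unlock = results->read();
    return ({ while_locked, after_unlock });
}

int main()
{
    array(string) r = probe_twice();
    write("While locked: " + r[0] + "\n");
    write("After unlock: " + r[1] + "\n");
    return 0;
}
```

The first probe should report a busy mutex and the second should obtain the lock, since nothing else holds it by then.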

CLASS
Thread.Condition - condition variables

DESCRIPTION
Thread.Condition is a pre-compiled Pike program that implements condition variables. Condition variables are used by threaded programs to wait for events happening in other threads.

NOTE
Condition variables are only available on systems with POSIX or UNIX threads support.

EXAMPLE
// This program implements a fifo that can be used to send
// data between two threads.
inherit Thread.Condition : r_cond;
inherit Thread.Condition : w_cond;
inherit Thread.Mutex : lock;

array buffer = allocate(128);
int r_ptr, w_ptr;

int query_messages() { return w_ptr - r_ptr; }

// This function reads one mixed value from the fifo.
// If no values are available it blocks until a write has been done.
mixed read()
{
    mixed tmp;
    // We use this mutex lock to make sure no write() is executed
    // between the query_messages and the wait() call. If it did
    // we would wind up in a deadlock.
    object key=lock::lock();
    while(!query_messages()) r_cond::wait(key);
    tmp=buffer[r_ptr++ % sizeof(buffer)];
    w_cond::signal();
    return tmp;
}

// This function pushes one mixed value on the fifo.
// If the fifo is full it blocks until a value has been read.
void write(mixed v)
{
    object key=lock::lock();
    while(query_messages() == sizeof(buffer)) w_cond::wait(key);
    buffer[w_ptr++ % sizeof(buffer)]=v;
    r_cond::signal();
}

SEE ALSO
Thread.Mutex

METHOD
Thread.Condition.wait - wait for condition

SYNTAX
void wait();
void wait(object mutex_key);

DESCRIPTION
This function makes the current thread sleep until the condition variable is signalled. The optional argument should be the 'key' to a mutex lock. If present the mutex lock will be unlocked before waiting for the condition in one atomic operation. After waiting for the condition the mutex referenced by mutex_key will be re-locked.

SEE ALSO
Thread.Mutex->lock

METHOD
Thread.Condition.signal - signal a condition variable

SYNTAX
void signal();

DESCRIPTION
Signal wakes up one of the threads currently waiting for the condition.

BUGS
It sometimes wakes up more than one thread.

METHOD
Thread.Condition.broadcast - signal all waiting threads

SYNTAX
void broadcast();

DESCRIPTION
This function wakes up all threads currently waiting for this condition.
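As a sketch of how wait() and broadcast() fit together, the following example lets several threads sleep until a shared flag is set. The names go, start_lock, start_cond, done, waiter and release_all are assumptions made for this illustration. The flag is re-checked in a loop after every wake-up, which also keeps the code correct if signal() wakes more threads than expected.

```pike
// Sketch only: go, start_lock, start_cond, done, waiter and release_all
// are hypothetical names.
object start_lock = Thread.Mutex();
object start_cond = Thread.Condition();
object done = Thread.Queue();
int go;   // shared flag, only read or written while start_lock is held

void waiter(int n)
{
    object key = start_lock->lock();
    while(!go)                  // re-check the flag after every wake-up
        start_cond->wait(key);  // unlocks while sleeping, re-locks afterwards
    destruct(key);
    done->write(n);
}

// Starts count waiters, releases them all and returns how many reported back.
int release_all(int count)
{
    for(int n=0; n<count; n++) thread_create(waiter, n);
    object key = start_lock->lock();
    go = 1;
    start_cond->broadcast();    // wake every thread that is already waiting
    destruct(key);
    int released = 0;
    for(int n=0; n<count; n++) { done->read(); released++; }
    return released;
}

int main()
{
    write(release_all(3) + " waiters released.\n");
    return 0;
}
```

A waiter that starts late simply finds the flag already set and never sleeps, so the result does not depend on which thread runs first.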

CLASS
Thread.Fifo - first in, first out object

DESCRIPTION
Thread.Fifo implements a fixed length fifo. A fifo is a queue of values and is often used as a stream of data between two threads.

NOTE
Fifos are only available on systems with POSIX or UNIX threads support.

SEE ALSO
Thread.Queue

METHOD
Thread.Fifo.create - initialize the fifo

SYNTAX
void create(int size);
object(Thread.Fifo) Thread.Fifo();
object(Thread.Fifo) Thread.Fifo(int size);

DESCRIPTION
The function create() is called when the fifo is cloned. If the optional size argument is present, it sets how many values can be written to the fifo without blocking. The default size is 128.

METHOD
Thread.Fifo.write - queue a value

SYNTAX
void write(mixed value);

DESCRIPTION
This function puts a value last in the fifo. If there is no more room in the fifo the current thread will sleep until space is available.

METHOD
Thread.Fifo.read - read a value from the fifo

SYNTAX
mixed read();

DESCRIPTION
This function retrieves a value from the fifo. Values will be returned in the order they were written. If there are no values present in the fifo the current thread will sleep until some other thread writes a value to the fifo.

METHOD
Thread.Fifo.size - return number of values in fifo

SYNTAX
int size();

DESCRIPTION
This function returns how many values are currently in the fifo.
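A small sketch of a fifo with room for only two values is shown below. The names fifo, producer and consume are hypothetical, and a zero is used as an end marker in the same way as in the grep example earlier in this chapter.

```pike
// Sketch only: fifo, producer and consume are hypothetical names.
object fifo = Thread.Fifo(2);   // room for two values

void producer()
{
    for(int e=1; e<=5; e++)
        fifo->write(e);         // sleeps whenever two values are already waiting
    fifo->write(0);             // zero marks the end of the data
}

// Reads values until the end marker arrives and returns their sum.
int consume()
{
    int v, sum = 0;
    thread_create(producer);
    while((v = fifo->read()))   // sleeps while the fifo is empty
        sum += v;
    return sum;
}

int main()
{
    write("Sum of received values: " + consume() + "\n");
    return 0;
}
```

The small size keeps the producer from running far ahead of the reader, which is the main reason to prefer a fifo over an unbounded queue.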

CLASS
Thread.Queue - a queue of values

DESCRIPTION
Thread.Queue implements a queue, or a pipeline. The main difference between Thread.Queue and Thread.Fifo is that queues will never block in write(), only allocate more memory.

NOTE
Queues are only available on systems with POSIX or UNIX threads support.

SEE ALSO
Thread.Fifo

METHOD
Thread.Queue.write - queue a value

SYNTAX
void write(mixed value);

DESCRIPTION
This function puts a value last in the queue. If the queue is too small to hold the value the queue will be expanded to make room for it.

METHOD
Thread.Queue.read - read a value from the queue

SYNTAX
mixed read();

DESCRIPTION
This function retrieves a value from the queue. Values will be returned in the order they were written. If there are no values present in the queue the current thread will sleep until some other thread writes a value to the queue.

METHOD
Thread.Queue.size - return number of values in queue

SYNTAX
int size();

DESCRIPTION
This function returns how many values are currently in the queue.
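The non-blocking write() can be illustrated with a single-threaded sketch. The names q and fill are assumptions made for this example. All values are written before anything is read, which with a Thread.Fifo of the default size would block once 128 values were waiting.

```pike
// Sketch only: q and fill are hypothetical names.
object q = Thread.Queue();

// Writes count values without any reader and returns the queue size.
int fill(int count)
{
    for(int e=0; e<count; e++)
        q->write(e);            // never sleeps, the queue grows as needed
    return q->size();
}

int main()
{
    write("Values queued: " + fill(1000) + "\n");
    write("First value read back: " + q->read() + "\n");
    return 0;
}
```

The price of never blocking is that a slow reader lets the queue grow without limit, so memory use is bounded only by the writers.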

CLASS
Thread.thread_local - Thread local variable class

DESCRIPTION
This class allows you to have variables which are separate for each thread that uses it. It has two methods: get and set. A value stored in an instance of thread_local can only be retrieved by that same thread.

METHOD
Thread.thread_local.set - set the thread_local value

SYNTAX
mixed set(mixed value);

DESCRIPTION
This sets the value returned by the get method. Note that this value can only be retrieved by the same thread. Calling this method does not affect the value returned by get when called by another thread.

METHOD
Thread.thread_local.get - get the thread_local value

SYNTAX
mixed get();

DESCRIPTION
This returns the value previously stored in the thread_local by the set method by this thread.
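The following sketch shows that each thread sees only its own value. The names slot, results, checker and check_threads are invented for this illustration. It uses the class name given in this section; newer Pike releases may expose the same class under another name such as Thread.Local, in which case the first line needs adjusting.

```pike
// Sketch only: slot, results, checker and check_threads are hypothetical
// names. The class name follows this section and may differ in newer Pikes.
object slot = Thread.thread_local();
object results = Thread.Queue();

void checker(int n)
{
    slot->set(n);                       // visible to this thread only
    results->write(slot->get() == n);   // 1 if we read back our own value
}

// Returns how many threads read back exactly the value they stored.
int check_threads(int count)
{
    int ok = 0;
    for(int n=1; n<=count; n++) thread_create(checker, n);
    for(int n=1; n<=count; n++) ok += results->read();
    return ok;
}

int main()
{
    slot->set("main");
    write(check_threads(3) + " threads saw their own value.\n");
    write("The main thread still sees: " + slot->get() + "\n");
    return 0;
}
```

Although all threads share the single object in slot, the value set by the main thread is not disturbed by the three calls to set() made elsewhere.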

10.3 Threads example

Let's look at an example of how to work with threads. This program is the same minimal WWW server as in chapter 9 "File I/O", but it has been rewritten to use threads. As you can see, it is a lot smaller this way, because we can use blocking I/O operations instead of non-blocking I/O and callbacks. This also makes the program much easier to follow:
#!/usr/local/bin/pike

/* A very small threaded httpd capable of fetching files only.
 * Written by Fredrik Hübinette as a demonstration of Pike
 */

import Thread;
inherit Stdio.Port;

/* number of bytes to read for each write */
#define BLOCK 16384

/* Where do we have the html files ? */
#define BASE "/home/hubbe/pike/src/"

/* File to return when we can't find the file requested */
#define NOFILE "/home/hubbe/www/html/nofile.html"

/* Port to open */
#define PORT 1905

/* Number of threads to start */
#define THREADS 5

// There will be one of these for each thread
class worker
{
    inherit Stdio.FILE : socket; // For communication with the browser
    inherit Stdio.File : file; // For reading the file from disc

    void create(function accept)
    {
        string cmd, input, tmp;

        while(1)
        {
            socket::close(); // Close previous connection
            file::close();

            object o=accept(); // Accept a connection
            if(!o) continue;
            socket::assign(o);
            destruct(o);

            // Read request
            sscanf(socket::gets(),"%s %s%*[\012\015 \t]",cmd, input);
            if(cmd!="GET")
            {
                werror("Only method GET is supported.\n");
                continue;
            }

            // Open the requested file
            sscanf(input,"%*[/]%s",input);
            input=BASE+combine_path("/",input);
            
            if(!file::open(input,"r"))
            {
                if(!file::open(NOFILE,"r"))
                {
                    werror("Couldn't find default file.\n");
                    continue;
                }
            }

            // Copy data to socket
            while(socket::write(file::read(BLOCK))==BLOCK);
        }
    }
};

int main(int argc, array(string) argv)
{
    werror("Starting minimal threaded httpd\n");

    // Bind the port, don't set it nonblocking
    if(!bind(PORT))
    {
        werror("Failed to open socket (already bound?)\n");
        return 17;
    }

    // Start worker threads
    for(int e=1;e<THREADS;e++) thread_create(worker,accept);
    worker(accept);
}

As stated at the beginning of this chapter, Pike threads are only available on some UNIX systems. The above example does not work if your system does not have threads.

