In POSIX threads, mutex locks can only be unlocked by the same thread
that locked them. In Pike any thread can unlock a locked mutex.
10.1 Starting a thread
Starting a thread is very easy. You simply call thread_create with a function
pointer and any arguments it needs and that function will be executed in a
separate thread. The function thread_create will return immediately and both
the calling function and the called function will execute at the same time. Example:
This may all seem very simple, but there are a few complications to
watch out for:
// Prints a greeting five times, pausing one second before each one.
// x is the number shown in the greeting, so the output of different
// callers can be told apart.
void foo(int x)
{
  int remaining=5;
  while(remaining--)
  {
    sleep(1);
    write("Hello from thread "+x+".\n");
  }
}
// Runs foo() in two separate threads (as number 2 and 3) and then once
// more in the calling thread (as number 1).
int main()
{
  foreach(({ 2, 3 }), int id)
    thread_create(foo, id);
  foo(1);
}
This is quite harmless as long as it is only used from one thread at a time,
but if two threads call it at the same time, there is a slight chance that
both threads will discover that map[i] is zero and both threads will
then do map[i]=({j}); and one value of j will be lost.
This type of bug can be extremely hard to debug.
The above problem can be solved with the help of Mutexes and Condition
variables. Mutexes are basically a way to keep other threads out while a task
is being performed. Conditions, or condition variables, are used to inform
other threads that they don't have to wait any longer. Pike also provides
two different kinds of pipelines to send data from one thread to another, which
makes it very simple to write threaded programs. Let's look at an example:
// Appends j to the array stored under index i in the mapping m.
// If m has no (or a zero) entry for i, a new one-element array is
// stored there instead.
//
// The test and the update are two separate steps, so this function is
// not safe to call from several threads at once without locking.
void mapadd(mapping m, int i, int j)
{
  if(m[i])
    m[i]+=({j});
  else
    m[i]=({j});
}
This is an example of a simple grep-like program. It looks for the string
given as first argument to the program in the files given as the rest
of the arguments. Don't worry if you do not understand it yet. Read the
descriptions of the functions and classes below and come back and read
this example again.
#!/usr/local/bin/pike
import Thread; // We need fifos
inherit Fifo; // Fifo used to supply workers
inherit Fifo : ended; // Fifo used to wait for workers
// Worker thread. Reads file names from the supply fifo until it reads a
// zero, and prints every line that contains lookfor on the form
// "file:linenum: line". Just before returning it writes a zero to the
// 'ended' fifo so that main() can tell that this worker is done.
void worker(string lookfor)
{
  while(string file=Fifo::read())
  {
    int linenum=1;
    object o=Stdio.FILE();
    // A file that cannot be opened must not take the whole worker down;
    // main() would then wait forever for its zero on 'ended'.
    if(!o->open(file,"r"))
    {
      werror("Could not open "+file+".\n");
      continue;
    }
    while(string line=o->gets())
    {
      // Write through Stdio.stdout explicitly: this program inherits
      // Fifo, so a plain write() would refer to the fifo's write().
      if(search(line, lookfor) >=0)
        Stdio.stdout->write(sprintf("%s:%d: %s\n",file, linenum, line));
      linenum++;
    }
  }
  ended::write(0);
}
// Entry point. argv[1] is the string to look for and argv[2..] are the
// files to search. Starts four workers, feeds them the file names,
// tells each of them to stop and waits until all of them have finished.
int main(int argc, array(string) argv)
{
  if(argc < 2)
  {
    werror("Usage: "+argv[0]+" <string> [files...]\n");
    return 1;
  }
  for(int e=0;e<4;e++) // Start workers, all looking for the same string
    thread_create(worker,argv[1]);
  for(int e=2;e<argc;e++) // Feed workers with the file names
    Fifo::write(argv[e]);
  for(int e=0;e<4;e++) // Tell workers to die
    Fifo::write(0);
  for(int e=0;e<4;e++) // Wait for workers to die
    ended::read();
  exit(0);
}
10.2 Threads reference section
This section describes all thread-related functions and classes.
/* This simple program can be used to exchange data between two
 * programs. It is similar to Thread.Fifo, but can only hold one
 * element of data.
 */
// Mutex that read() locks; it is released by write().
inherit Thread.Mutex : r_mutex;
// Mutex that write() locks; it is released by read().
inherit Thread.Mutex : w_mutex;
// Key for r_mutex. It is taken right away, so the first read() has to
// wait until a write() has destructed it.
object r_lock=r_mutex::lock();
// Key for w_mutex. Set by write(), destructed by read().
object w_lock;
// The single element of data being handed over.
mixed storage;
// Stores data so that it can be picked up by read().
// The statement order matters: the value is stored before r_lock is
// destructed, so read() is not let through until the value is in place.
void write(mixed data)
{
// Take the write mutex; it stays locked until read() destructs w_lock.
w_lock=w_mutex::lock();
storage=data;
// Destructing the key held since the last read() (or since creation)
// lets a pending read() get past its r_mutex::lock().
destruct(r_lock);
}
// Returns the value stored by write() and empties the storage.
// r_mutex is held from creation and after every read(), so this call
// waits in r_mutex::lock() until write() has destructed r_lock.
mixed read()
{
mixed tmp;
r_lock=r_mutex::lock();
tmp=storage;
storage=0;
// Destruct the key taken by write() so that the next write() can
// get past its w_mutex::lock().
destruct(w_lock);
return tmp;
}
// This program implements a fifo that can be used to send
// data between two threads.
// Condition that read() waits on while the fifo is empty; write() signals it.
inherit Thread.Condition : r_cond;
// Condition that write() waits on while the fifo is full; read() signals it.
inherit Thread.Condition: w_cond;
// Mutex locked by both read() and write().
inherit Thread.Mutex: lock;
// Storage for the queued values; it is indexed modulo its size.
array buffer = allocate(128);
// Number of values read and written so far; the difference between
// them is the number of values currently in the fifo.
int r_ptr, w_ptr;
// Returns the number of values that have been written to the fifo
// but not yet read from it.
int query_messages()
{
  int pending=w_ptr - r_ptr;
  return pending;
}
// This function reads one mixed value from the fifo.
// If no values are available it blocks until a write has been done.
// Returns the oldest value that has not been read yet.
mixed read()
{
  // We use this mutex lock to make sure no write() is executed
  // between the query_messages and the wait() call. If it did
  // we would wind up in a deadlock.
  object key=lock::lock();
  while(!query_messages()) r_cond::wait(key);
  int slot=r_ptr++ % sizeof(buffer);
  mixed tmp=buffer[slot];
  // Clear the slot so the fifo does not keep a reference to a value
  // that has already been handed over to the reader.
  buffer[slot]=0;
  // There is room for at least one more value now.
  w_cond::signal();
  return tmp;
}
// Appends the value v to the fifo.
// While every slot in the buffer is occupied, the call waits on w_cond
// until a read() has signalled it. Afterwards r_cond is signalled so
// that a waiting read() can continue.
void write(mixed v)
{
  object guard=lock::lock();
  while(sizeof(buffer) == query_messages())
    w_cond::wait(guard);
  int slot=w_ptr++ % sizeof(buffer);
  buffer[slot]=v;
  r_cond::signal();
}
void wait(object mutex_key);
object(Thread.Fifo) Thread.Fifo();
object(Thread.Fifo) Thread.Fifo(int size);
#!/usr/local/bin/pike
/* A very small threaded httpd capable of fetching files only.
 * Written by Fredrik Hübinette as a demonstration of Pike
 */
import Thread;
inherit Stdio.Port;
/* number of bytes to read for each write */
#define BLOCK 16384
/* Where do we have the html files ? */
#define BASE "/home/hubbe/pike/src/"
/* File to return when we can't find the file requested */
#define NOFILE "/home/hubbe/www/html/nofile.html"
/* Port to open */
#define PORT 1905
/* Number of threads to start */
#define THREADS 5
// There will be one of these for each thread.
//
// Creating a worker never returns: create() loops forever, accepting
// one connection at a time through the function it is given, reading
// one request line and copying the requested file (or NOFILE if it
// cannot be opened) to the connection.
class worker
{
  inherit Stdio.FILE : socket; // For communication with the browser
  inherit Stdio.File : file;   // For reading the file from disc

  // accept is called without arguments to get the next connection.
  // A zero return value from it is ignored and accept is called again.
  void create(function accept)
  {
    while(1)
    {
      // Declared per request so that nothing is left over from the
      // previous connection if the request line cannot be parsed.
      string cmd, input;

      socket::close(); // Close previous connection
      file::close();

      object o=accept(); // Accept a connection
      if(!o) continue;
      socket::assign(o);
      destruct(o);

      // Read request. gets() gives 0 if the peer closed the connection
      // without sending a line, and the line may not have the form
      // "<method> <path>"; neither case may be passed on.
      string request=socket::gets();
      if(!request || sscanf(request,"%s %s%*[\012\015 \t]",cmd, input)<2)
      {
        werror("Malformed or empty request.\n");
        continue;
      }
      if(cmd!="GET")
      {
        werror("Only method GET is supported.\n");
        continue;
      }

      // Open the requested file. Leading slashes are stripped and the
      // path is run through combine_path before BASE is prepended.
      sscanf(input,"%*[/]%s",input);
      input=BASE+combine_path("/",input);
      if(!file::open(input,"r"))
      {
        if(!file::open(NOFILE,"r"))
        {
          werror("Couldn't find default file.\n");
          continue;
        }
      }

      // Copy data to socket, one block at a time, until a write
      // handles less than a full block.
      while(socket::write(file::read(BLOCK))==BLOCK);
    }
  }
};
// Entry point of the httpd. Binds PORT and then runs THREADS workers:
// THREADS-1 of them in threads of their own and the last one in the
// calling thread. Returns 17 if the port could not be bound.
int main(int argc, array(string) argv)
{
  werror("Starting minimal threaded httpd\n");

  // Bind the port, don't set it nonblocking
  if(!bind(PORT))
  {
    werror("Failed to open socket (already bound?)\n");
    return 17;
  }

  // Start worker threads
  int started=1;
  while(started<THREADS)
  {
    thread_create(worker,accept);
    started++;
  }

  // The calling thread acts as the last worker.
  worker(accept);
}
As stated at the beginning of this chapter, Pike threads are only available on some UNIX systems. The above example does not work if your system does not have threads.