Overview
This is a brief tutorial on how write parallel applications using
POSIX threads. Basically, this document is a short version of DEC's
300+ page manual; it is intended to provide sufficient information to
enable the development of reasonably sophisticated parallel scientific
codes in C.
1 May 1998: |
The code and documentation contained herein now conforms to
POSIX 1003.1c-1995.
The revised code described here has only been tested on Linux systems (LinuxThreads and glibc native threads). |
What are threads?
A thread is a sequence of instructions to be executed within
a program. Normal UNIX processes consist of a single thread of
execution that starts in main(). In other words, each line of your
code is executed in turn, exactly one line at a time. Before threads,
the normal way to achieve multiple instruction sequences (ie, doing
several things at once, in parallel) was to use the fork() and exec()
system calls to create several processes -- each being a single
thread of execution.
In the UNIX operating system, every process has the following:
Earlier this semester, Anu gave a talk on PVM. PVM is based on IPC mechanisms; e.g., it uses sockets to communicate between processes on different machines across a network. On multiprocessor machines, PVM can be configured to use the System V IPCs to achieve communication between processes on different CPUs. One nice side effect of using these standard mechanisms is that PVM code is fairly portable between UNIX platforms.
There are advantages and disadvantages to this method of achieving parallelism. Since you have to arrange for the communication yourself, it is completely clear what data is being shared. On the negative side, IPC mechanisms are implemented as system calls (they require kernel intervention to complete) and involve a fair amount of overhead. It can happen that the communication mechanism itself becomes a processing bottleneck.
In a threaded environment, this model of parallelism is turned on its head. Instead of there being several singly threaded processes, we have one process consisting of multiple threads. In other words, each user process can have several threads of execution -- different parts of your code can be executing simultaneously. The collection of threads form a single process: each thread in the process sees the same virtual address space, files, etc.
This last fact has interesting implications. For example, when one thread opens a file, it is immediately possible for all other threads to access the file. Similarly, if one thread alters the value of a global variable, all other threads instantly see the new value. In a multithreaded process, the cost of communication is negligible. The downside of this feature is that a threaded process cannot exist across machines (because the underlying resources are bound to the local host).
When writing multithreaded applications, you must take the initiative to protect shared resources. For example, consider the code
i = i + 1This statement is well defined for a single thread of execution: get i, add 1, and put the result back. For multiple threads of execution, the statement is ill defined. If 2 or more CPUs attempt to perform this operation simultaneously, the result is indeterminate -- the variable i will get incremented anywhere from one to N times, where N is the number of threads that execute this statement. In a multithreaded environment, it is necessary for the programmer to ensure that access to shared resources is managed in a sane fashion.
Why should you use threads? Well, maybe you shouldn't. Developing a threaded program can take longer than a nonthreaded program. If you don't need the benefits of threads or don't intend to run your code many times, it's probably not worth the trouble. On the other hand, threading your code has several payoffs:
POSIX threads
The pthreads library is a set of C calls that provides a mechanism for
writing multithreaded code. There are three major classes of calls
in this library:
The third class will not be considered in this document. Thread scheduling can not only considerably complicate your algorithm, but it can also get you into trouble when migrating your code from uniprocessor to multiprocessor machines. From what I have seen so far, fiddling with thread priorities isn't worth the trouble in many scientific applications, anyway.
Each thread sees the same global data and files; however, each thread has its own stack and registers (so that variables in the automatic and register storage classes are private).
+-----------------------------------------------------------------------+ | Process | | | | +-------+ +-------------+ +-------------+ +-------------+ | | | Files | | Thread | | Thread | | Thread | | | +-------+ |+-----------+| |+-----------+| |+-----------+| | | || Registers || || Registers || || Registers || | | |+-----------+| |+-----------+| |+-----------+| | | | | | | | | | | .................................................................... | | . | | | | | | . | | . Memory | | | | | | . | | . | +---------+ | | +---------+ | | +---------+ | . | | . +--------+ | | Stack | | | | Stack | | | | Stack | | . | | . | Heap | | | | | | | | | | | | | . | | . +--------+ | | | | | | | | | | | | . | | . | | | | | | | | | | | | . | | . +--------+ | | | | | | | | | | | | . | | . | Static | | | | | | | | | | | | | . | | . +--------+ | | | | | | | | | | | | . | | . | | | | | | | | | | | | . | | . +--------+ | | | | | | | | | | | | . | | . | Code | | +---------+ | | +---------+ | | +---------+ | . | | . +--------+ +-------------+ +-------------+ +-------------+ . | | . . | | .................................................................... | | | +-----------------------------------------------------------------------+ (From the DECthreads manual)
Thread creation and destruction
The first example we'll consider is the usual "Hello, world" program.
The basic flow of the program is as follows:
Time Master Thread | | | | create workers with pthread_create() | | | // \\ workers start up \ / / | | \ | | | | | | | | | workers do their jobs | | | | \ \ / / \\ // workers terminate | | join workers with pthread_join() | Master ThreadStarting from an initial thread in main(), we'll create several worker threads. Each of these will print its message while the original thread waits for them to complete. As each one exits, the master thread will join it and destroy the system resources it uses. Finally, the original thread will call exit() and the whole process will terminate.
1: /****************************************************************** 2: * simple.c -- multithreaded "hello world" 3: * 4: * Author: Mark Hays <hays@math.arizona.edu> 5: */ 6: 7: /* Linux with glibc: 8: * _REENTRANT to grab thread-safe libraries 9: * _POSIX_SOURCE to get POSIX semantics 10: */ 11: #ifdef __linux__ 12: # define _REENTRANT 13: # define _POSIX_SOURCE 14: #endif 15: 16: /* Hack for LinuxThreads */ 17: #ifdef __linux__ 18: # define _P __P 19: #endif 20: 21: #include <pthread.h> 22: 23: #include <string.h> /* for strerror() */ 24: 25: #include <stdio.h> 26: 27: #define NTHREADS 4 28: 29: #define errexit(code,str) \ 30: fprintf(stderr,"%s: %s\n",(str),strerror(code)); \ 31: exit(1); 32: 33: /******** this is the thread code */ 34: void *hola(void * arg) 35: { 36: int myid=*(int *) arg; 37: 38: printf("Hello, world, I'm %d\n",myid); 39: return arg; 40: } 41: 42: /******** this is the main thread's code */ 43: int main(int argc,char *argv[]) 44: { 45: int worker; 46: pthread_t threads[NTHREADS]; /* holds thread info */ 47: int ids[NTHREADS]; /* holds thread args */ 48: int errcode; /* holds pthread error code */ 49: int *status; /* holds return code */ 50: 51: /* create the threads */ 52: for (worker=0; worker<NTHREADS; worker++) { 53: ids[worker]=worker; 54: if (errcode=pthread_create(&threads[worker],/* thread struct */ 55: NULL, /* default thread attributes */ 56: hola, /* start routine */ 57: &ids[worker])) { /* arg to routine */ 58: errexit(errcode,"pthread_create"); 59: } 60: } 61: /* reap the threads as they exit */ 62: for (worker=0; worker<NTHREADS; worker++) { 63: /* wait for thread to terminate */ 64: if (errcode=pthread_join(threads[worker],(void *) &status)) { 65: errexit(errcode,"pthread_join"); 66: } 67: /* check thread's exit status and release its resources */ 68: if (*status != worker) { 69: fprintf(stderr,"thread %d terminated abnormally\n",worker); 70: exit(1); 71: } 72: } 73: return(0); 74: } 75: 76: /* EOF simple.c */
On Linux systems, you compile this program with the command:
prompt$ cc -o simple simple.c -lpthread
Here's a blow-by-blow account of what is going on. The program starts in main() on line 43. It will create NTHREADS threads (NTHREADS is defined on line 27) on line 54, each of which starts executing the function hola() on line 34. The main thread waits for each of the worker threads to terminate by calling pthread_join() for each thread on line 64. Meanwhile, each of the worker threads prints a message and its thread ID and then exits. Once the main thread has reaped all of the workers, the main program exits.
Now for the details. Threads are created with the pthread_create() call. The first argument is a pointer to a pthread_t -- an opaque data structure (i.e., it is a black box) that describes the thread. Each thread must have a unique pthread_t to describe it. In this case, we store the descriptor for each thread in the array threads[] defined on line 46. The second argument to pthread_create() is of type pthread_attr_t and specifies the attributes of the thread being created (scheduling priority, etc). Normally you can use the value NULL for this argument. The third argument is the name of the function in which the thread should start -- this function is expected to return a void *. The last argument is passed untouched to the start routine. In this case, we just pass the thread's number (loop count) to the start routine. In more complicated situations, you can pass a pointer to a block of thread-specific data. pthread_create() returns 0 on success and an error number on failure. If it fails, we print a message with perror() and exit (line 58). As a general rule, you should always check for system and library call errors. When programming with pthreads, it is suicidal not to check for error conditions. Henry Spencer's Sixth Commandment for C programmers sums it up nicely:
If a function be advertised to return an error code in the event of difficulties, thou shalt check for that code, yea, even though the checks triple the size of thy code and produce aches in thy typing fingers, for if thou thinkest "it cannot happen to me," the gods shall surely punish thee for thy arrogance.
The main thread next waits for each of the workers to finish in lines 62-72. It calls pthread_join() to wait for the thread to terminate. The thread we're waiting on is specified in the first argument and its exit code is placed into the second argument (declared on line 49 as type int *). We check the exit code and issue a message (and exit) if it's not what we expect.
The code that each worker thread executes occupies lines 34-40. First, we cast the argument to the start routine to an int (since we passed a pointer to an int to pthread_create()). The thread then prints its message and returns its thread ID. At this point, the thread terminates and is reaped when the master thread calls pthread_join(). You can also have a thread terminate itself with the pthread_exit() call.
The call pthread_join() causes the calling thread to go to sleep until the specified thread terminates. In the examples given in this document, the master thread cannot proceed until all the workers complete their tasks. Once a thread has been joined, its system resources are reclaimed. If you need to terminate a running thread, use pthread_cancel() and pthread_testcancel().
Once you join a thread, you cannot reassign it a new task. The reason is that once a thread has been joined, it has (by definition) terminated. If you need more threads to perform another task, you have to call pthread_create() again.
Aside from returning a thread's exit status, the major function of pthread_join() is to reclaim the system resources used by the terminated thread. If you forget to call pthread_join() in a program that creates many threads, you will run into problems sooner or later (like what happens when you call malloc() many times and never call free()).
Sometimes you do not care about the thread's exit status; in this case, having to call pthread_join() becomes an annoyance. The pthread_detach() function is helpful in such cases. You call pthread_detach() with a pthread_t argument. This argument must be the descriptor for a running thread. After the call completes, it is no longer possible for the rest of the program to reference that thread in any way. The selected thread simply runs to completion, at which point its systems resources are automatically reclaimed. This might be a useful way to implement runtime graphics: you have one of these "free" threads send your data to some graphics program.
The source for simple.c looks a little nastier than it needs to. For example, much of the code is related to error checking of the pthread_xxx calls. To help eliminate some of this clutter, I have written a small library of macros and C code to hide some of these details. The macros and library calls all have the prefix "pt_" to distinguish them from the pthreads calls they wrap. They all have the form:
pt_xxx(object *obj,..., char *msg)The "object *" is always a pointer to a pthreads object (or one of the derived types discussed later). The "char *msg" is a message that is printed in case of an error. When errors occur, your code will exit. Since most pthreads routines only return the EINVAL error, this seems acceptable -- usually it means that you've attempted to operate on an invalid pointer. Let's use these macros to clean things up a bit -- the new code is called simple2.c:
1: /****************************************************************** 2: * simple2.c -- multithreaded "hello world" using pt_xxx macros 3: * 4: * Author: Mark Hays <hays@math.arizona.edu> 5: */ 6: 7: #include "pt.h" 8: 9: #include <stdio.h> 10: 11: #define NTHREADS 4 12: 13: /******** this is the thread code */ 14: pt_addr_t hola(pt_addr_t arg) 15: { 16: printf("Hello, world, I'm %d\n",*(int *) arg); 17: return arg; 18: } 19: 20: /******** this is the main thread's code */ 21: int main(int argc,char *argv[]) 22: { 23: int worker; 24: pthread_t threads[NTHREADS]; /* holds thread info */ 25: int ids[NTHREADS]; /* holds thread id */ 26: int *status; /* holds return code */ 27: 28: /* create the threads */ 29: for (worker=0; worker<NTHREADS; worker++) { 30: ids[worker]=worker; 31: pt_create(&threads[worker], /* thread struct */ 32: hola, /* start routine */ 33: &ids[worker], /* arg to routine */ 34: "pt_create"); /* error message */ 35: } 36: /* reap the threads as they exit */ 37: for (worker=0; worker<NTHREADS; worker++) { 38: 39: /* wait for thread to terminate */ 40: pt_wait(&threads[worker],(pt_addr_t) &status,"pt_wait"); 41: 42: /* check thread's exit status */ 43: if (*status != worker) { 44: fprintf(stderr,"thread %d terminated abnormally\n",worker); 45: exit(1); 46: } 47: } 48: return(0); 49: } 50: 51: /* EOF simple2.c */
What have we done? First, we include pt.h instead of pthread.h on line 7. It is very important to include this file before any other header files. Next, we have changed the declaration of hola() to accept and return an pt_addr_t instead of a void *. The major changes are in main(), though. Instead of calling pthread_create(), we use the macro pt_create(). This is simply a wrapper that handles error checking and typecasting of arguments. Finally, the call to pthread_join() has been replaced with the pt_wait() macro (which does error checking on the underlying pthread_join() call). The following summarizes the usage of these macros:
void pt_create(pthread_t *thread, pt_startroutine_t start, pt_addr_t arg, char *msg) void pt_wait(pthread_t *thread, pt_addr_t *status, char *msg)In pt_wait(), you can pass in NULL for the status argument if you don't care what the thread's return status is.
Not to beat a dead horse, but can we do any better than this? Actually, yes. Having several threads execute the same function in parallel is a very common scenario. The mini-library provides a C function called pt_fork() that combines most of lines 29-47 in a single call. The usage of pt_fork() is:
void pt_fork(int nthreads, pt_startroutine_t start, pt_addr_t arg, pt_addr_t *exitcodes)As in pt_wait(), you can pass in NULL for exitcodes if your threads don't return any useful exit status. Here's pt_fork() in action:
1: /****************************************************************** 2: * simple3.c -- multithreaded "hello world" using pt_fork() 3: * 4: * Author: Mark Hays <hays@math.arizona.edu> 5: */ 6: 7: #include "pt.h" 8: 9: #include <stdio.h> 10: 11: #define NTHREADS 4 12: 13: /******** this is the thread code */ 14: pt_addr_t hola(pt_arg_t *info) 15: { 16: int id=pt_myid(info); 17: 18: printf("Hello, world, I'm %d\n",id); 19: /* stash our id into our slot */ 20: *((int *) pt_data(info) + id)=id; 21: /* no meaningful return value */ 22: return(NULL); 23: } 24: 25: /******** this is the main thread's code */ 26: int main(int argc, char *argv[]) 27: { 28: int i; 29: int retvals[NTHREADS]; 30: 31: /* init retvals */ 32: for (i=0; i<NTHREADS; i++) retvals[i]=-1; 33: 34: /* do it */ 35: pt_fork(NTHREADS, /* # of threads to create */ 36: hola, /* routine they execute */ 37: retvals, /* a piece of global data they all share */ 38: NULL); /* ignore exit codes */ 39: 40: /* check return values */ 41: for (i=0; i<NTHREADS; i++) 42: if (retvals[i]!=i) { 43: fprintf(stderr,"thread %d didn't return a value\n",i); 44: exit(1); 45: } 46: return(0); 47: } 48: 49: /* EOF simple3.c */
The major change in main() is the use of pt_fork() on lines 35-38. It creates NTHREADS threads that each execute hola() and waits for them to complete. Since exitcodes is NULL, the exit values of the worker thread are ignored; instead, each thread places its status directly into the retvals array. This is done to avoid casting the integer status to a void *, which can get hairy on 64 bit machines. The declaration of hola() has also changed. In order to use pt_fork(), the start routine must be declared to accept a single argument -- a pointer to a pt_arg_t structure. Inside the start routine, the following macros operate on this structure (which shouldn't otherwise be manipulated):
To compile simple2 and simple3, type
prompt$ make simple2 simple3They are linked with libpt.a which make automatically builds -- see the Makefile for details. The library source consists of the C files pt.c and ptf77.c.
In the first three examples, there is no shared data; therefore, it can be used as the basis for any communication-free algorithm.
Resource locking
Earlier we noted that the statement i=i+1 did not make
sense in a threaded environment. In order to fix this, it is
necessary to invent some kind of locking mechanism that
serializes access to the variable i. Before getting into this,
let's have a brief look at what sorts of things need to be
locked.
For concreteness, consider the following code fragment and assume that multiple threads will execute func() simultaneously:
int i; /* i is shared */ void func(arg1,arg2) int arg1; /* arg1 is private */ float *arg2; /* arg2 is private, *arg2 is shared */ { int zzz; /* zzz is private (auto storage class) */ static float f; /* f is shared (static storage class) */ }Any thread may freely modify the integers arg1 and zzz because they live on each thread's stack. Threads may freely alter arg2, but not *arg2 or arg2[zzz]. The variables i and f are shared among all threads. Some kind of locking mechanism is needed before i, f, and *arg2 can be accessed in a well-defined way.
Pthreads provides an object called a "mutex" (short for "mutual exclusion") to facilitate this sort of resource locking. Once you create a mutex, there are two things you can do with it:
Let's modify the previous code so that we can properly access i (we'll show how to create mutexes later):
int i; /* i is shared */ pthread_mutex_t ilock; /* controls access to i */ ... void func(arg1,arg2) int arg1; /* arg1 is private */ float *arg2; /* arg2 is private, *arg2 is shared */ { int zzz; /* zzz is private (auto storage class) */ static float f; /* f is shared (static storage class) */ pthread_mutex_lock(&ilock); /* lock i */ something = i++; /* access i */ pthread_mutex_unlock(&ilock); /* unlock i */ }It is up to the programmer to determine exactly what the mutex protects. For example, the mutex ilock could be used to protect i and *arg2. You could have another mutex that serializes access to f. The mutex itself has no idea what its role is in your program -- it is totally up to you to use a mutex to protect shared data.
It is apparent that serializing access to a piece of data can bring all other threads to a screeching halt. Equally apparent is fact that all global data that can be read and written by simultaneously executing threads must be protected by a mutex. In order to minimize the ill effects, it is important to get in and out as quickly as possible. For example, if a thread needs to read some global data, consider copying the data into the thread's private storage instead of keeping a mutex locked for a long period of time. If you need to update a piece of global data, try to calculate the new value with the mutex unlocked -- lock it only when you dump the new value in.
As an example of mutex usage, let's try something more complicated: a parallel matrix multiplication routine. The goal is to multiply an MxN matrix called A by an NxP matrix called B and then store the result into the MxP matrix called C. We could modify the previous example to have each thread do an equal share of the work (this is homework). Instead, we'll have the threads run in a loop that computes single rows of the result, C. This method has certain advantages that will become clear in the discussion below (instead of using the pthread_mutex_xxx routines, we'll use the pt_mutex_xxx wrappers defined in pt.h).
Here's the code:
1: /***************************************************************** 2: * mmult.c -- multithreaded matrix multiplication example 3: * 4: * Author: Mark Hays <hays@math.arizona.edu> 5: */ 6: 7: #include "pt.h" 8: 9: #include <stdio.h> 10: #include <stdlib.h> 11: 12: #define NTHREADS 4 13: 14: /* global data area */ 15: int mwork,nwork,pwork; /* dimensions of matrices */ 16: double **mata,**matb,**matc; /* ptrs to the matrices */ 17: 18: int mdone; /* # of rows already done */ 19: pthread_mutex_t mdone_mutex; /* lock for mdone */ 20: 21: /****************************************************** 22: * return the next row number to do or -1 if we're done 23: */ 24: int nextrow() 25: { 26: int res; 27: 28: pt_mutex_lock(&mdone_mutex,"mutex lck"); /* lock the mutex */ 29: res=mdone++; /* get & incr row cntr */ 30: pt_mutex_unlock(&mdone_mutex,"mutex unlck"); /* unlock the mutex */ 31: return(res<mwork ? res : -1); /* return -1 if done */ 32: } 33: 34: /****************************************************************** 35: * main code for each thread -- it computes rows of the product a*b 36: */ 37: pt_addr_t thread_code(pt_arg_t arg) 38: { 39: int n,p; /* n and p counters */ 40: int m; /* row we're working on */ 41: double sum; /* temporary var */ 42: 43: while ((m=nextrow())!=-1) { /* m=row to do */ 44: for (p=0; p<pwork; p++) { /* p=col to do, do each col */ 45: for (sum=0.0,n=0; n<nwork; n++) { 46: sum += mata[m][n]*matb[n][p]; /* compute the elt */ 47: } 48: matc[m][p]=sum; /* save result in c */ 49: } 50: } 51: return(NULL); 52: } 53: 54: /*********************************** 55: * this is the main multiply routine 56: */ 57: void domult(int m,int n,int p, 58: double **a, /* a: m x n matrix */ 59: double **b, /* b: n x p matrix */ 60: double **c) /* c: m x p matrix=a*b */ 61: { 62: mdone=0; mwork=m; nwork=n; pwork=p; /* init glb state info */ 63: mata=a; matb=b; matc=c; 64: 65: pt_fork(NTHREADS,thread_code,NULL,NULL); /* run code in parallel */ 66: } 67: 68: /************************** 69: * allocate an m x n matrix 70: */ 71: double **newmat(int m,int n) 72: { 73: double **res=(double **) malloc(m*sizeof(double *)); 74: int i; 75: 76: for (i=0; i<m; i++) 77: res[i]=(double *) malloc(n*sizeof(double)); 78: return(res); 79: } 80: 81: /************** 82: * main program 83: */ 84: int main(int argc,char *argv[]) 85: { 86: double **a=newmat(2,2),**b=newmat(2,2),**c=newmat(2,2); 87: 88: a[0][0]=1.0; a[0][1]=2.0; /* initialize a */ 89: a[1][0]=2.0; a[1][1]=1.0; 90: 91: b[0][0]=1.0; b[0][1]=2.0; /* initialize b */ 92: b[1][0]=3.0; b[1][1]=4.0; 93: 94: pt_mutex_init(&mdone_mutex,"init mutex"); /* initialize the mutex */ 95: 96: domult(2,2,2,a,b,c); /* do the mmult: ans is */ 97: printf("%8.4f%8.4f\n",c[0][0],c[0][1]); /* 7.0000 10.0000 */ 98: printf("%8.4f%8.4f\n",c[1][0],c[1][1]); /* 5.0000 8.0000 */ 99: 100: pt_mutex_destroy(&mdone_mutex,"del mutex"); 101: return(0); 102: } 103: 104: /* EOF mmult.c */
The main() routine in this program simply allocates and initializes the matrices a and b. It also creates the result matrix c. On lines 94, a mutex is created. The first argument is a pointer to the mutex descriptor and the second argument is the error message to print in case of difficulties. To deallocate a mutex, you use pt_mutex_destroy() (see line 100). On line 96 we call the parallel matrix multiply routine and pass the matrix dimensions and the three matrices A, B, and C. Once domult() returns, we print out the result and exit.
The domult() routine looks a lot like the main() from the previous example. After initializing some global state information in lines 62-63, domult() creates the worker threads and then reaps them, just as in the previous example. It is OK to use the global variables mwork, mata, etc. in this example because the individual threads only read these values -- the driver routine domult() is the only thing that writes to them. If you need to call a multithreaded code recursively for some reason, you can make the state information local to the driver routine and pass a pointer to the thread code in the pt_fork() call.
The worker threads simply execute the routine thread_code() in lines 37-52. Each thread calls nextrow() to get the index of the next row of c (aka matc) to be computed. It then calculates all elements of the product along this row. When all rows are done, nextrow() returns -1, which causes the calling thread to terminate. The nextrow() routine gets the next row to be done from the counter mdone and then increments mdone. This operation is protected by the mutex mdone_mutex (created in main()).
Why compute a whole row of the product? We could just as easily have a next_element() function that returns the next element to be done. However, experience indicates that the more work a thread can do independently, the less time it sits around waiting to lock mutexes. For an NxN matrix, the modified algorithm would have to lock and unlock the mutex N^2 times, as opposed to N times for the present algorithm. Not only is there more overhead due to making the extra mutex calls, but the number of contentions for access to mdone also increases drastically. Generally, you want to reduce contention wherever possible -- reducing the number of mutex operations is a good place to start.
Why not eliminate the mutex altogether? In fact, if each thread does 1/NTHREADS share of the mwork rows, we don't need a mutex at all and this becomes a contrived example. However, there is some benefit (in the real world) to this methodology. If you set NTHREADS to 4 on a 4 CPU machine and you are the only person using this machine, eliminating the mutex might be satisfactory (if no other jobs were executed while you were doing parallel stuff). That's the perfect world. In the real world, other users will run jobs; for example, emacs, tex, netscape, and doom (during our creative time). If you are lucky, you might get 3 CPUs for most of the time. If you set NTHREADS to 4, the speed of the multiply is bound by the slowest thread (since each must do 1/4 of the total work). Your only option is to recompile your code for different times of day (based on what you think NTHREADS ought to be).
So what does the mutex do for you? Since each thread just asks "what's the next row?", it is no problem if one thread does less work -- the others will pick up the slack. In other words, this "bag of tasks" algorithm has an unintended load-balancing feature. This is Very Useful.
Libraries and global data
We've seen that it is necessary to protect all global data with
mutexes so that our code runs in a deterministic manner. What
does this imply for libraries? Libraries must clearly follow the
same rules; unfortunately, you don't necessarily know whether or
not a given library makes use of global data. The DEC threads
manual says:
Assume that a call is not thread-safe unless the manpage tells you otherwise.
How can you get burned? As an example consider the malloc() call. This call allocates and returns a pointer to a new chunk of storage; the chunk lives in the shared global data area. The data structures that malloc() uses to keep track of free and allocated blocks, while not directly accessible to the programmer, also live in the global data area. Therefore, if two threads call malloc() without using a mutex, chaos can result. You might think that you could simply wrap all calls to malloc() with a mutex lock/unlock sequence. This will not work. Why? Well, suppose that each thread calls printf():
printf("i=%d s=%s\n",anint,astring);The printf() call makes implicit use of malloc() to allocate the buffer that holds the string you are printing. Worse, the printf() call doesn't know about your malloc mutex (and doesn't use it). So, if 2 or more threads call any combination of printf() and malloc(), your code could coredump.
This example illustrates the basic problems you can get into with libraries. Unfortunately, there is no general solution to the problem. The POSIX threads specification does demand that the calls in the C library (including malloc() and printf()) be thread-safe. For other code (that you might download off the 'net), your only recourse may be to "safen" it by hand.
One test for thread-safeness is to ask whether or not the code in question is re-entrant; that is, whether or not it can directly or indirectly call itself recursively. If a piece of code is re-entrant, it is probably thread safe. By the way, this implies that no FORTRAN code is thread-safe (but see below for a counterexample ;-).
Condition variables
So far, our threads have simply run to completion on a given task. In
fact, the only communication we've seen is the message "we're done."
Sometimes this is not sufficient -- synchronization needs to occur
along the way; for example, a calculation might consist of several
parts, each dependent on the result of the previous one. One thought
is to destroy and respawn new threads for each stage of the
computation. This is certainly possible; however, the pthreads library
provides a more efficient solution.
The POSIX threads standard gives you an object that helps meet this goal: the condition variable. Mutexes are intended for fine-grained control (as in the last example); condition variables are intended for coarse grained control (as described in the previous paragraph). The mutex stands by itself: once you create it, you lock and unlock it to your heart's content. The condition variable does not stand by itself. In fact, condition variables are usually used in conjunction with two other things: a mutex and a regular old global variable (called a "predicate"). First, let's look at some typical condition variable code (using "raw" pthreads calls without error checking):
pthread_mutex_lock(&mutex); /* lock mutex */ while (!predicate) { /* check predicate */ pthread_cond_wait(&condvar,&mutex); /* go to sleep - recheck pred on awakening */ } pthread_mutex_unlock(&mutex); /* unlock mutex */This code has two possible behaviors. If predicate is nonzero, the thread locks the mutex, falls through the loop, and unlocks the mutex. In other words, nothing happens. If predicate is zero, the thread locks the mutex, enters the loop, and calls pthread_cond_wait(). pthread_cond_wait() does the following:
pthread_mutex_lock(&mutex); /* lock the mutex */ predicate=1; /* set the predicate */ pthread_cond_broadcast(&condvar); /* wake everyone up */ pthread_mutex_unlock(&mutex); /* unlock the mutex */The important call is pthread_cond_broadcast() -- it is a wakeup call to all threads waiting on a condition variable. The first thread wakes up still in the call to pthread_cond_wait() -- this call completes after it
Here's a brief summary of the roles of the various pieces:
pthread_mutex_lock(&mutex); /* lock mutex */ while (!predicate) { /* check predicate */ pthread_mutex_unlock(&mutex); /* unlock mutex */ pthread_cond_wait(&condvar); /* go to sleep - recheck pred on awakening */ pthread_mutex_lock(&mutex); /* lock mutex */ } pthread_mutex_unlock(&mutex); /* unlock mutex */Besides being longer, it has a serious problem. If the thread enters the loop, there is a moment of indeterminacy just before pthread_cond_wait() is called. It is possible that another thread might acquire the mutex, set the predicate, and issue the broadcast before pthread_cond_wait() gets called. In other words, the thread goes to sleep when it shouldn't. The basic problem is that the mutex unlock and condition variable wait are not guaranteed to be an atomic operation. If, on the other hand, pthread_cond_wait() unlocks the mutex, the UNIX kernel can guarantee that the calling thread will not miss the broadcast -- being the kernel has its privileges.
A useful pthreads structure
Before moving on to a real example that uses condition variables, let's
define a data structure that repackages some of the condition variable's
strangeness. We will need an object that synchronizes N threads
at a given point in the code. The interface should look something
like:
[calculation 1] /* do first computation */ pt_gate_sync(&gate); /* wait for all threads to finish calcn 1 */ [calculation 2] /* proceed */The basic constraint is that calculation 2 cannot begin until calculation 1 is complete. The overall strategy is simple: N-1 threads will call pthread_cond_wait() on a condition variable and go to sleep. The Nth thread will call pthread_cond_broadcast() and wake the other threads up. At this point, the calculation will continue.
First, let's look at the data structure:
231: typedef struct _pt_gate_t_ { 232: int ngate; 233: int nthreads; 234: pthread_mutex_t mutex; 235: pthread_mutex_t block; 236: pthread_cond_t condvar; 237: pthread_cond_t last; 238: } pt_gate_t; 239: 240: extern void pt_gate_init(pt_gate_t *gate,int nthreads); 241: extern void pt_gate_destroy(pt_gate_t *gate); 242: extern void pt_gate_sync(pt_gate_t *gate);The main data type, pt_gate_t, contains several pthreads objects. There are functions to initialize and destroy this compound object, and a function to perform the behavior specified above. The roles of the various structure members will become clear in a moment.
Now let's look at the C source:
68: void pt_gate_init(pt_gate_t *gate,int nthreads) 69: { 70: gate->ngate=0; gate->nthreads=nthreads; 71: pt_mutex_init( &gate->mutex, "gate: init mutex"); 72: pt_mutex_init( &gate->block, "gate: init block"); 73: pt_cond_init (&gate->condvar, "gate: init condvar"); 74: pt_cond_init ( &gate->last, "gate: init last"); 75: } 76: 77: /************************************************* 78: * destroy a gate variable 79: */ 80: void pt_gate_destroy(pt_gate_t *gate) 81: { 82: gate->ngate=gate->nthreads=0; 83: pt_mutex_destroy( &gate->mutex, "gate: destroy mutex"); 84: pt_mutex_destroy( &gate->block, "gate: destroy block"); 85: pt_cond_destroy (&gate->condvar, "gate: destroy condvar"); 86: pt_cond_destroy ( &gate->last, "gate: destroy last"); 87: } 88: 89: /************************************************* 90: * enter the gate 91: */ 92: void pt_gate_sync(pt_gate_t *gate) 93: { 94: if (gate->nthreads<2) return; /* trivial case */ 95: pt_mutex_lock(&gate->block, /* lock the block -- new */ 96: "gate: lock block"); /* threads sleep here */ 97: pt_mutex_lock(&gate->mutex, /* lock the mutex */ 98: "gate: lock mutex"); 99: if (++(gate->ngate) < gate->nthreads) { /* are we the last one in? */ 100: pt_mutex_unlock(&gate->block, /* no, unlock block and */ 101: "gate: unlock block 1"); 102: pt_cond_wait(&gate->condvar, /* go to sleep */ 103: &gate->mutex, 104: "gate: wait condvar"); 105: } else { /* yes, we're last */ 106: pt_cond_broadcast(&gate->condvar, /* wake everyone up and */ 107: "gate: bcast condvar"); 108: pt_cond_wait(&gate->last,&gate->mutex,/* go to sleep til they're */ 109: "gate: wait last"); /* all awake... then */ 110: pt_mutex_unlock(&gate->block, /* release the block */ 111: "gate: unlock block 2"); 112: } 113: if (--(gate->ngate)==1) { /* next to last one out? */ 114: pt_cond_broadcast(&gate->last, /* yes, wake up last one */ 115: "gate: bcast last"); 116: } 117: pt_mutex_unlock(&gate->mutex, /* release the mutex */ 118: "gate: unlock mutex"); 119: }The gate_init() function simply initializes the members of the gate_t structure. It takes two arguments: a pointer to the gate_t being initialized and N, the number of threads the gate is supposed to synchronize.
Gate_destroy() frees the resources associated with a gate_t structure.
The behavior we descibed earlier occurs in pt_gate_sync() and is implemented in lines 92-119. Ignore all statements involving gate->block for a moment. The mutex gate->mutex simply protects access to gate->ngate. The calling thread locks this mutex in line 97. The member gate->nthreads does not need protection: it's value is only updated by the gate_init() call. In line 99, the ngate member is incremented and compared to nthreads. If the calling thread is not the last one, pt_cond_wait is called. This puts the thread to sleep until the last thread calls pt_gate_sync().
The last thread executes the else clause on line 105. First it calls pthread_cond_broadcast() (once this thread releases gate->mutex, the sleeping threads will begin waking up). Next, it goes to sleep on the gate->last condition variable (whose role will become clearer below), which has the side effect of unlocking gate->mutex. At this point, the other threads wake up one at a time (after locking gate->mutex). Each thread decrements gate->ngate, releases gate->mutex, and finally leaves pt_gate_sync(). Except for the last thread to wake up: it wakes up the thread sleeping on gate->last before exiting pt_gate_sync().
In order to understand the role of gate->block, consider the following code fragment:
gate_t gate; while (condition) { [do something] enter_gate(&gate); [do something else] }If the computations in square brackets do not take very long (for example, you're doing a small problem because you're debugging your algorithm), it would be possible for a thread that just woke up to almost instantly re-enter the gate and goof things up for the threads that are still asleep. This happens more often than you would think, and results in your code mysteriously hanging up. The extra mutex prevents this pathology at fairly low cost. In my opinion, tracking down these cases is the most time consuming part of threads programming; such bugs are inherently intermittent and depend on the system load average, disk activity, etc.
Since the threads wake up from pthread_cond_wait() one at a time, it is necessary to restrict access to the gate until all the newly awakened threads actually leave the gate (so that ngate==0 and the incoming thread does the right thing at the if in line 99). The code locks access to the gate via the gate->block mutex; in fact, it is the first thing locked on entry to pt_gate_sync(). If a thread goes to sleep in line 102, it must first release the block so that other threads can get in and open the gate. The last thread wakes the sleeping threads up with the gate blocked. Thus, no other threads can enter the gate while the wakeup call occurs. The last thread to leave the gate unlocks the block mutex, thus allowing other threads in. Why do we need the gate->last condition variable? Because, as far as I can tell, it is illegal for a thread to unlock a mutex held by another thread.
Now that we have the building blocks, let's do an example.
Parallel LU decomposition
Below is the source code (courtesy of Anu Rao) for a parallel LU
decomposition:
1: /* Anu Rao 11/1/94 CSC 652 2: LU decomposition using the incremental method; row parallelized 3: Assumptions: global variables: matrix a */ 4: 5: #include "pt.h" 6: 7: #include <stdio.h> 8: #include <stdlib.h> 9: 10: #define max(a,b) ((a) > (b) ? (a) : (b)) 11: 12: double **a; 13: 14: int asize,nworkers; 15: 16: pt_gate_t gate; 17: 18: void LU(pt_arg_t *info) 19: { 20: int myID=pt_myid(info); 21: int i, j, k; 22: int startrow, endrow, goof; 23: 24: /* define starting & ending points for processor myID */ 25: startrow = (myID * asize)/nworkers; 26: endrow = startrow + (asize/nworkers) - 1; 27: 28: for ( k = 0; k <= asize; k++ ) { 29: goof = max(k+1,startrow); 30: for ( i = goof; i <= endrow; i++ ) 31: a[i][k] /= a[k][k]; 32: pt_gate_sync(&gate); 33: 34: /* update l's and u's by one term */ 35: for ( i = goof; i <= endrow; i++ ) { 36: for ( j = k+1; j < asize; j++ ) 37: a[i][j] -= a[i][k] * a[k][j]; 38: } 39: pt_gate_sync(&gate); 40: } 41: } 42: 43: int main(int argc,char *argv[]) 44: { 45: int i, j; 46: 47: if (argc!=3) { 48: asize=8; 49: nworkers=4; 50: } else { 51: /* read in size of matrix, number of processors */ 52: asize = atoi(argv[1]); 53: nworkers = atoi(argv[2]); 54: } 55: 56: /* allocate a to be an n by n matrix */ 57: a = (double **)malloc(asize*sizeof(double *)); 58: for (i = 0; i < asize; i++) { 59: a[i] = (double *)malloc(asize*sizeof(double)); 60: } 61: 62: /* set up the gate */ 63: pt_gate_init(&gate,nworkers); 64: 65: /* initialize a */ 66: for (i = 0; i < asize; i++) { 67: for (j = 0; j < asize; j++) { 68: a[i][j] = 1.0; 69: if (i == j) a[i][j] = 2.0; 70: } 71: } 72: 73: /* do the decomposition */ 74: pt_fork(nworkers,LU,NULL,NULL); 75: 76: /* print results */ 77: for (i = 0; i < asize; i++) { 78: for (j = 0; j < asize; j++) { 79: printf("%8.3lf ",a[i][j]); 80: } 81: printf("\n"); 82: } 83: 84: return(0); 85: } 86: 87: /* EOF ludcmp.c */
Threads and FORTRAN
It is a sad fact that FORTRAN makes implicit use of global data.
For example, in the code fragment
subroutine sub integer i do i=1,100 ... end dothe temporary loop variable i is probably in the global data segment. The only way this subroutine can run in parallel (using pthreads) is to lock the entire loop with a mutex -- there is no other way to protect the loop counter i. In other words, you cannot run it in parallel using pthreads (this is not quite true: the DEC FORTRAN compiler has an option that will allocate i on the stack; however, this is highly nonportable and probably not worth doing). The fundamental problem is that FORTRAN code is, by definition, non-reentrant; it is illegal for a FORTRAN subroutine to call itself recursively, either directly or indirectly.
The High Performance FORTRAN (HPF) "standard" provides a set of extensions that are intended to get around this. HPF gives you routines that add arrays, compute dot products, etc in parallel. HPF appears to provide calls that do many common tasks (I haven't used it).
The only way I know of to run FORTRAN in parallel on a shared memory multiprocessor is to use a parallelizing compiler or HPF. If the compiler can discover parallelism in your algorithm, this strategy should give acceptable results. I have heard mixed reviews of such compilers: some people swear by them; others say they stink.
If you need to do something nontrivial in parallel, you can always write that part in C using pthreads and have FORTRAN call your C code. However, this reduces the portability of your code.
Although it is not generally possible to have several threads execute the same routine in parallel in FORTRAN, it is sometimes possible to have different routines executing simultaneously. As an example, let's suppose that our FORTRAN code can be broken up into two pieces: one to do the calculations and another to process and write the data to a file. Once a stage of the overall computation is complete, we can hand the results over to a processing and I/O routine. Having done this, our compuational engine can proceed with the next step.
This is much different than the pt_fork routine: in pt_fork, each thread is performing a similar task. In the above example, the threads are performing orthogonal tasks. In FORTRAN, this is necessary: it is the only way to achieve address space separation among the threads.
The source code for this FORTRAN example is in the files rwf.com and rwf.f. It uses support routines from ptf77.c and pt.c. There is a new C data structure defined in pt.c and pt.h that implements a threaded pipeline stage. The above example uses a one stage pipeline. This structure is described in the next section. As always, this code has withstood minutes of testing ;-)
A Pipeline Structure
Here is a depiction of how a pipeline stage works:
main thread worker thread \ / \ \ / \ gate1 | / \ | / \ | setup() | stage() \ / | \ / | gate2 | / \ / / \_______/ | main continuesThe pipeline consists of a worker thread, two gates (each managing two threads), and two functions. The first function, stage(), is the code that is executed by the worker thread. The second function, setup(), contains initialization code that is executed by the master thread. Typically, you'd use it to set up the environment for the stage() function.
Let's run through a pipeline pass. When the pipeline is created, the worker thread is immediately dropped into gate1. Since this gate manages two threads, the worker thread simply blocks.
When it is time for the worker to do its thing, the master thread calls pt_pipeline_execute(). The following happen inside of this call: first, the master thread enters gate1. This has the effect of releasing both of the threads from gate1. The worker thread immediately enters gate2 and blocks. Why? So that the master thread can execute the setup() function without interference. As stated before, setup() will typically initialize global data that is processed (and only processed) by the worker in stage(). You can think of gate2 as a sort of "super mutex". Having executed setup(), the master thread drops into gate2, at which point both threads emerge from gate2. The master thread now returns from pt_pipeline_execute() and goes about its business. Meanwhile, the worker thread executes its code in stage(). Once this is complete, it drops back into gate1 where it sleeps until another thread calls pt_pipeline_execute().
What happens if the master thread calls pt_pipeline_execute() before the previous invocation of stage() completes? Since the master thread enters gate1, it will block until the worker thread finishes stage() and also enters gate1. In other words, the pipeline stage is completely synchronous.
Here's a small C program that implements multithreaded IO using a one stage pipeline:
1: /*********************************************************************** 2: * rw.c -- implement I/O in separate thread 3: * 4: * Author: Mark Hays <hays@math.arizona.edu> 5: * 6: * NOTE: see the notes in rwf.f for warnings/restrictions. 7: * 8: */ 9: 10: #include "pt.h" 11: 12: #include <stdio.h> 13: #include <stdlib.h> 14: 15: /* global data we'll pass around */ 16: typedef struct _rw_info_t { 17: float *buffer,*maindata; 18: int count; 19: } rw_info_t; 20: 21: /* master does this before slave executes */ 22: void *setup(rw_info_t *info) 23: { 24: float *src=info->maindata,*dst=info->buffer; 25: int n=info->count,i; 26: 27: /* copy main program's data info I/O buffer */ 28: for (i=0; i<n; i++) *dst++=*src++; 29: } 30: 31: /* slave runs this */ 32: void *doio(rw_info_t *info) 33: { 34: float *buf=info->buffer; 35: int n=info->count,i; 36: 37: for (i=0; i<n; i++) printf("%12.4f ",*buf++); 38: printf("\n"); 39: } 40: 41: #define N 4 42: 43: int main(int argc,char *argv[]) 44: { 45: int i,j; 46: float *data; 47: rw_info_t info; 48: pt_pipeline_t p; 49: 50: /* initialize info structure */ 51: info.buffer=(float *) malloc(N*sizeof(float)); 52: info.maindata=data=(float *) malloc(N*sizeof(float)); 53: info.count=N; 54: 55: /* initial data */ 56: for (j=0; j<N; j++) data[j]=j+1; 57: 58: /* set up the pipeline */ 59: pt_pipeline_init(&p, /* pt_pipeline_t struct */ 60: &info, /* global (shared) data */ 61: setup, /* stage setup routine */ 62: doio); /* slave's code */ 63: /* main code */ 64: for (i=0; i<5; i++) { 65: for (j=0; j<N; j++) data[j]*=2; 66: pt_pipeline_execute(&p); 67: } 68: /* clean up and exit */ 69: pt_pipeline_destroy(&p); 70: return(0); 71: } 72: 73: /* EOF rw.c */
The important feature is that the main thread copies the data to be written into a buffer that the IO thread uses. This way, the main thread can safely modify its data as the IO thread processes and writes out a copy of the data. For a large enough simulation (or with slow enough IO), this can be a big win in terms of time. The downside is that two copies of the data must be maintained, which can be a problem for large-memory simulations.
It is very important to call pt_pipeline_destroy(). Besides freeing the pthreads structures, it kills the worker thread after making sure that any currently executing stage() function completes. It accomplishes this by entering gate1 before shutting everything down.
Links to the FORTRAN version of this program appear in the previous section. The code is quite similar and demonstrates that it is indeed possible to use pthreads and FORTRAN together.
List of major pthreads routines
Glossary
Below is a somewhat loose glossary of some terms used in this document.