Next version of JABA
[jabaws.git] / binaries / src / fasta34 / dec_pthr_subs.c
1
2 /* copyright (c) 1996, 1997, 1998, 1999 William R. Pearson and the
3    U. of Virginia */
4
5 /* $Name: fa_34_26_5 $ - $Id: dec_pthr_subs.c,v 1.2 2006/04/12 18:00:02 wrp Exp $ */
6
7 /* this file isolates the pthreads calls from the main program */
8
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <time.h>
13 #include <sys/types.h>
14 #include <signal.h>
15
16 #include "param.h"
17
18 #include <pthread.h>
19 #define XTERNAL
20 #include "thr.h"
21 #undef XTERNAL
22 #include "pthr_subs.h"
23
24 extern void work_thread (struct thr_str *work_info);
25
26 /* start the threads working */
27
28 void init_thr(int nthreads, struct thr_str *work_info)
29 {
30   int status, i;
31   pthread_attr_t thread_attr;
32
33   if (nthreads > MAX_WORKERS) {
34     fprintf ( stderr," cannot start %d threads, max: %d\n",
35               nthreads, MAX_WORKERS);
36     exit(1);
37   }
38
39   /* mutex and condition variable initialisation */
40
41   status = pthread_mutex_init(&reader_mutex, pthread_mutexattr_default);
42   check(status,"Reader_mutex init bad status\n");
43    
44   status = pthread_mutex_init(&worker_mutex, pthread_mutexattr_default);
45   check(status,"Worker_mutex init bad status\n");
46
47   status = pthread_cond_init(&reader_cond_var, pthread_condattr_default);
48   check(status,"Reader_cond_var init bad status\n");
49
50   status = pthread_cond_init(&worker_cond_var, pthread_condattr_default);
51   check(status,"Worker_cond_var init bad status\n");
52
53   status = pthread_mutex_init(&start_mutex, pthread_mutexattr_default);
54   check(status,"Start_mutex init bad status\n");
55
56   status = pthread_cond_init(&start_cond_var, pthread_condattr_default);
57   check(status,"Start_cond_var init bad status\n");
58
59   /* change stacksize on threads */    /***************************/
60
61   status = pthread_attr_create( &thread_attr );
62   check(status,"attribute create bad status\n");
63
64   status = pthread_attr_setstacksize( &thread_attr, 1000000);
65   check(status,"stacksize change bad status\n");
66
67   /* start the worker threads */
68
69   for (work_info->worker=0; work_info->worker < nthreads;
70        work_info->worker++) {
71     /**********************/
72     status=pthread_create(&threads[work_info->worker],thread_attr,
73                           (pthread_startroutine_t)&work_thread,
74                           (pthread_addr_t)work_info);
75     check(status,"Pthread_create failed\n");
76   }
77 }
78
79 void start_thr()
80 {
81   int status;
82
83   /* tell threads to proceed */
84
85   status = pthread_mutex_lock(&start_mutex);
86   check(status,"Start_mutex lock bad status in main\n");
87
88   start_thread = 0;  /* lower predicate */
89
90   status = pthread_cond_broadcast(&start_cond_var);
91   status = pthread_mutex_unlock(&start_mutex);
92   check(status,"Start_mutex unlock bad status in main\n");
93 }
94
95 void get_rbuf(struct buf_head **cur_buf, int max_work_buf)
96 {
97   int status;
98
99   status = pthread_mutex_lock(&reader_mutex);  /* lock reader_buf structure */
100
101   check(status,"Reader_mutex lock in master bad status\n");
102
103   /* no reader bufs:  wait for signal to proceed */
104   while (num_reader_bufs == 0) {
105     pthread_cond_wait(&reader_cond_var,&reader_mutex);
106   }
107
108   *cur_buf = reader_buf[reader_buf_readp];  /* get the buffer address */
109   reader_buf_readp = (reader_buf_readp+1)%(max_work_buf);  /* increment index */
110   num_reader_bufs--;
111
112   status = pthread_mutex_unlock(&reader_mutex);  /* unlock structure */
113   check(status,"Reader_mutex unlock in master bad status\n");
114 }
115
116 void put_rbuf(struct buf_head *cur_buf, int max_work_buf)
117 {
118   int status;
119
120   /* give the buffer to a thread, and wait for more */
121   status = pthread_mutex_lock(&worker_mutex);  /* lock worker_buf_structure */
122   check(status,"Worker_mutex lock in master bad status\n");
123
124   /*  Put buffer onto available for workers list */
125   worker_buf[worker_buf_readp] = cur_buf;
126   worker_buf_readp = (worker_buf_readp+1)%(max_work_buf);
127   num_worker_bufs++;   /* increment number of buffers available to workers */
128
129   /*  Signal one worker to wake and start work */
130   status = pthread_cond_signal(&worker_cond_var);
131
132   status = pthread_mutex_unlock(&worker_mutex);
133   check(status,"Worker_mutex unlock in master bad status\n"); 
134 }
135
136 void put_rbuf_done(int nthreads, struct buf_head *cur_buf, int max_work_buf)
137 {
138   int status, i;
139   void *exit_value;
140
141   /* give the buffer to a thread, and wait for more */
142   status = pthread_mutex_lock(&worker_mutex);  /* lock worker_buf_structure */
143   check(status,"Worker_mutex lock in master bad status\n");
144
145   /*  Put buffer onto available for workers list */
146   worker_buf[worker_buf_readp] = cur_buf;
147   worker_buf_readp = (worker_buf_readp+1)%(max_work_buf);
148   num_worker_bufs++;   /* increment number of buffers available to workers */
149
150   /*  Signal one worker to wake and start work */
151
152   reader_done = 1;
153   status = pthread_cond_broadcast(&worker_cond_var);
154
155   status = pthread_mutex_unlock(&worker_mutex);
156   check(status,"Worker_mutex unlock in master bad status\n"); 
157
158   /* wait for all buffers available (means all do_workers are done) */
159  
160   for (i=0; i < nthreads; i++) {
161     status = pthread_join( threads[i], &exit_value);
162     check(status,"Pthread_join bad status\n");
163
164     status = pthread_detach( &threads[i]);
165     check(status,"Pthread_detach bad status\n");
166   } 
167 }
168
169 void wait_thr()
170 {
171   int status;
172
173   /* Wait on master to give start signal */
174   status = pthread_mutex_lock(&start_mutex);
175   check(status,"Start_mutex lock bad status in worker\n");
176
177   while (start_thread) {
178          status = pthread_cond_wait(&start_cond_var, &start_mutex);
179          check(status,"Start_cond_wait bad status in worker\n");
180   }
181
182   status = pthread_mutex_unlock(&start_mutex);
183   check(status,"Start_mutex unlock bad status in worker\n");
184 }
185
186 int get_wbuf(struct buf_head **cur_buf, int max_work_buf)
187 {
188   int status;
189
190   /* get a buffer to work on */
191   status = pthread_mutex_lock(&worker_mutex);
192   check(status,"First worker_mutex lock in worker bad status\n");
193
194   /*  No worker_bufs available:  wait for reader to produce some */
195   while (num_worker_bufs == 0) {
196     /*  Exit if reader has finished */
197     if (reader_done) {
198       pthread_mutex_unlock(&worker_mutex);
199       return 0;
200     }
201     pthread_cond_wait(&worker_cond_var,&worker_mutex);
202   } /* end while */
203
204   /*  Get the buffer from list */
205   *cur_buf = worker_buf[worker_buf_workp];
206   worker_buf_workp = (worker_buf_workp+1)%(max_work_buf);
207   num_worker_bufs--;
208
209   status = pthread_mutex_unlock(&worker_mutex);
210   check(status,"First worker_mutex unlock in worker bad status\n");
211   return 1;
212 }
213
214 void put_wbuf(struct buf_head *cur_buf, int max_work_buf)
215 {
216   int status;
217
218   /* put buffer back on list for reader */
219   status = pthread_mutex_lock(&reader_mutex);
220   check(status,"Reader_mutex lock in worker bad status\n");
221     
222   reader_buf[reader_buf_workp] = cur_buf;
223   reader_buf_workp = (reader_buf_workp+1)%(max_work_buf);
224   num_reader_bufs++;
225
226      /* No reader_bufs available:  wake reader */
227   if (num_reader_bufs == 1) {
228     pthread_cond_signal(&reader_cond_var);
229   }
230
231   status = pthread_mutex_unlock(&reader_mutex);
232   check(status,"Reader_mutex unlock in worker bad status\n");
233 }