Next version of JABA
[jabaws.git] / binaries / src / fasta34 / pthr_subs2.c
1
2 /* copyright (c) 1996, 1997, 1998, 1999 William R. Pearson and the
3    U. of Virginia */
4
5 /* modified to do more initialization of work_info here, rather than in main() */
6
7 /* $Name: fa_34_26_5 $ - $Id: pthr_subs2.c,v 1.9 2006/06/22 02:35:05 wrp Exp $ */
8
9 /* this file isolates the pthreads calls from the main program */
10
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <time.h>
15 #include <sys/types.h>
16 #include <signal.h>
17
18 #include "defs.h"
19 #include "structs.h"            /* mngmsg, libstruct */
20 #include "param.h"              /* pstruct, thr_str, buf_head, rstruct */
21
22 #include <pthread.h>
23 #define XTERNAL
24 #include "thr.h"
25 #undef XTERNAL
26 #include "pthr_subs.h"
27
28 extern void work_thread (struct thr_str *);
29
30 /* start the threads working */
31
32 void init_thr(int nthreads, struct thr_str *work_info,
33               struct mngmsg m_msg, struct pstruct *ppst,
34               unsigned char *aa0, int max_work_buf)
35 {
36   int status, i;
37   pthread_attr_t thread_attr;
38
39   if (nthreads > MAX_WORKERS) {
40     fprintf ( stderr," cannot start %d threads, max: %d\n",
41               nthreads, MAX_WORKERS);
42     exit(1);
43   }
44
45   /* set up work_info[] structure, set parameters */
46
47   for (i=0; i<nthreads; i++) {
48     work_info[i].n0 = m_msg.n0;
49     work_info[i].nm0 = m_msg.nm0;
50     work_info[i].qframe = m_msg.qframe;
51     work_info[i].qshuffle = m_msg.qshuffle;
52     work_info[i].ppst = ppst;
53     work_info[i].aa0 = aa0;
54     work_info[i].max_work_buf=max_work_buf;
55     work_info[i].worker=i;
56     work_info[i].max_tot=m_msg.max_tot;
57   }
58
59   /* mutex and condition variable initialisation */
60
61   status = pthread_mutex_init(&reader_mutex, NULL);
62   check(status,"Reader_mutex init bad status\n");
63    
64   status = pthread_mutex_init(&worker_mutex, NULL);
65   check(status,"Worker_mutex init bad status\n");
66
67   status = pthread_cond_init(&reader_cond_var, NULL);
68   check(status,"Reader_cond_var init bad status\n");
69
70   status = pthread_cond_init(&worker_cond_var, NULL);
71   check(status,"Worker_cond_var init bad status\n");
72
73   status = pthread_mutex_init(&start_mutex, NULL);
74   check(status,"Start_mutex init bad status\n");
75
76   status = pthread_cond_init(&start_cond_var, NULL);
77   check(status,"Start_cond_var init bad status\n");
78
79   /* change stacksize on threads */    /***************************/
80
81   status = pthread_attr_init( &thread_attr );
82   check(status,"attribute create bad status\n");
83
84 #ifdef IRIX
85   if (pthread_attr_setscope( &thread_attr, 2) != NULL) 
86     status = pthread_attr_setscope( &thread_attr,PTHREAD_SCOPE_PROCESS);
87   check(status,"set scope on IRIX bad status\n");
88 #endif
89
90 #ifdef FASTA_setscope
91   status = pthread_attr_setscope( &thread_attr, PTHREAD_SCOPE_SYSTEM);
92   check(status,"set scope bad status\n");
93 #endif
94
95   /* start the worker threads */
96
97   for (i=0; i < nthreads; i++) {
98     /**********************/
99     status=pthread_create(&threads[i],&thread_attr,
100                           (void *(*)(void *))&work_thread,&work_info[i]);
101     check(status,"Pthread_create failed\n");
102   }
103 }
104
105 /* start_mutex/start_cont_var provides exclusive access to 
106    extern int start_thread */
107
108 void start_thr()
109 {
110   int status;
111
112   /* tell threads to proceed */
113
114   status = pthread_mutex_lock(&start_mutex);
115   check(status,"Start_mutex lock bad status in main\n");
116
117   start_thread = 0;  /* lower predicate */
118
119   status = pthread_cond_broadcast(&start_cond_var);
120   status = pthread_mutex_unlock(&start_mutex);
121   check(status,"Start_mutex unlock bad status in main\n");
122 }
123
124 void get_rbuf(struct buf_head **cur_buf, int max_work_buf)
125 {
126   int status;
127
128   status = pthread_mutex_lock(&reader_mutex);  /* lock reader_buf structure */
129
130   check(status,"Reader_mutex lock in master bad status\n");
131
132   /* no reader bufs:  wait for signal to proceed */
133   while (num_reader_bufs == 0) {
134     pthread_cond_wait(&reader_cond_var,&reader_mutex);
135   }
136
137   *cur_buf = reader_buf[reader_buf_readp];  /* get the buffer address */
138   reader_buf_readp = (reader_buf_readp+1)%(max_work_buf);  /* increment index */
139   num_reader_bufs--;
140
141   status = pthread_mutex_unlock(&reader_mutex);  /* unlock structure */
142   check(status,"Reader_mutex unlock in master bad status\n");
143 }
144
145 void put_rbuf(struct buf_head *cur_buf, int max_work_buf)
146 {
147   int status;
148
149   /* give the buffer to a thread, and wait for more */
150   status = pthread_mutex_lock(&worker_mutex);  /* lock worker_buf_structure */
151   check(status,"Worker_mutex lock in master bad status\n");
152
153   /*  Put buffer onto available for workers list */
154   worker_buf[worker_buf_readp] = cur_buf;
155   worker_buf_readp = (worker_buf_readp+1)%(max_work_buf);
156   num_worker_bufs++;   /* increment number of buffers available to workers */
157
158   /*  Signal one worker to wake and start work */
159   status = pthread_cond_signal(&worker_cond_var);
160
161   status = pthread_mutex_unlock(&worker_mutex);
162   check(status,"Worker_mutex unlock in master bad status\n"); 
163 }
164
165 void put_rbuf_done(int nthreads, struct buf_head *cur_buf, int max_work_buf)
166 {
167   int status, i;
168   void *exit_value;
169
170   /* give the buffer to a thread, and wait for more */
171   status = pthread_mutex_lock(&worker_mutex);  /* lock worker_buf_structure */
172   check(status,"Worker_mutex lock in master bad status\n");
173
174   /*  Put buffer onto available for workers list */
175   worker_buf[worker_buf_readp] = cur_buf;
176   worker_buf_readp = (worker_buf_readp+1)%(max_work_buf);
177   num_worker_bufs++;   /* increment number of buffers available to workers */
178
179   /*  Signal one worker to wake and start work */
180
181   reader_done = 1;
182   status = pthread_cond_broadcast(&worker_cond_var);
183
184   status = pthread_mutex_unlock(&worker_mutex);
185   check(status,"Worker_mutex unlock in master bad status\n"); 
186
187   /* wait for all buffers available (means all do_workers are done) */
188  
189   for (i=0; i < nthreads; i++) {
190     status = pthread_join( threads[i], &exit_value);
191     check(status,"Pthread_join bad status\n");
192   } 
193 }
194
195 /* wait for extern int start_thread == 0 */
196
197 void wait_thr()
198 {
199   int status;
200
201   /* Wait on master to give start signal */
202   status = pthread_mutex_lock(&start_mutex);
203   check(status,"Start_mutex lock bad status in worker\n");
204
205   while (start_thread) {
206          status = pthread_cond_wait(&start_cond_var, &start_mutex);
207          check(status,"Start_cond_wait bad status in worker\n");
208   }
209
210   status = pthread_mutex_unlock(&start_mutex);
211   check(status,"Start_mutex unlock bad status in worker\n");
212 }
213
214 int get_wbuf(struct buf_head **cur_buf, int max_work_buf)
215 {
216   int status;
217
218   /* get a buffer to work on */
219   status = pthread_mutex_lock(&worker_mutex);
220   check(status,"First worker_mutex lock in worker bad status\n");
221
222   /*  No worker_bufs available:  wait for reader to produce some */
223   while (num_worker_bufs == 0) {
224     /*  Exit if reader has finished */
225     if (reader_done) {
226       pthread_mutex_unlock(&worker_mutex);
227       return 0;
228     }
229     pthread_cond_wait(&worker_cond_var,&worker_mutex);
230   } /* end while */
231
232   /*  Get the buffer from list */
233   *cur_buf = worker_buf[worker_buf_workp];
234   worker_buf_workp = (worker_buf_workp+1)%(max_work_buf);
235   num_worker_bufs--;
236
237   status = pthread_mutex_unlock(&worker_mutex);
238   check(status,"First worker_mutex unlock in worker bad status\n");
239   return 1;
240 }
241
242 void put_wbuf(struct buf_head *cur_buf, int max_work_buf)
243 {
244   int status;
245
246   /* put buffer back on list for reader */
247   status = pthread_mutex_lock(&reader_mutex);
248   check(status,"Reader_mutex lock in worker bad status\n");
249     
250   reader_buf[reader_buf_workp] = cur_buf;
251   reader_buf_workp = (reader_buf_workp+1)%(max_work_buf);
252   num_reader_bufs++;
253
254   /* No reader_bufs available:  wake reader */
255   if (num_reader_bufs == 1) {
256     pthread_cond_signal(&reader_cond_var);
257   }
258
259   status = pthread_mutex_unlock(&reader_mutex);
260   check(status,"Reader_mutex unlock in worker bad status\n");
261 }