2 /* copyright (c) 1996, 1997, 1998, 1999 William R. Pearson and the
5 /* modified to do more initialization of work_info here, rather than in main() */
7 /* $Name: fa_34_26_5 $ - $Id: pthr_subs2.c,v 1.9 2006/06/22 02:35:05 wrp Exp $ */
9 /* this file isolates the pthreads calls from the main program */
15 #include <sys/types.h>
19 #include "structs.h" /* mngmsg, libstruct */
20 #include "param.h" /* pstruct, thr_str, buf_head, rstruct */
26 #include "pthr_subs.h"
28 extern void work_thread (struct thr_str *);
30 /* start the threads working */
/*
 * init_thr: prepare the per-thread work descriptors, initialise the
 * synchronisation objects, and create the worker threads.
 *
 *   nthreads     - number of worker threads to create; compared against
 *                  MAX_WORKERS before anything else is done
 *   work_info    - array with at least nthreads entries; entry i is filled
 *                  in here and its address is handed to thread i
 *   m_msg        - passed by value; n0, nm0, qframe, qshuffle and max_tot
 *                  are copied into every work_info entry
 *   ppst, aa0    - stored as pointers (not copied) in every entry, so all
 *                  workers share the same objects
 *   max_work_buf - stored in every entry
 *
 * reader_mutex, worker_mutex, start_mutex, the three condition variables
 * and threads[] are not declared in this block; presumably they come from
 * pthr_subs.h -- verify.  check() is likewise defined elsewhere; it is
 * always called with a pthread status code and a message.
 *
 * NOTE(review): this view of the function appears to be missing lines
 * (braces, the declarations of i and status, whatever follows the fprintf
 * in the error branch).  Confirm against the complete file before relying
 * on the comments below.
 */
32 void init_thr(int nthreads, struct thr_str *work_info,
33 struct mngmsg m_msg, struct pstruct *ppst,
34 unsigned char *aa0, int max_work_buf)
37 pthread_attr_t thread_attr;
/* refuse to start more threads than MAX_WORKERS; the action taken after
   the message is printed is not visible in this view */
39 if (nthreads > MAX_WORKERS) {
40 fprintf ( stderr," cannot start %d threads, max: %d\n",
41 nthreads, MAX_WORKERS);
45 /* set up work_info[] structure, set parameters */
47 for (i=0; i<nthreads; i++) {
48 work_info[i].n0 = m_msg.n0;
49 work_info[i].nm0 = m_msg.nm0;
50 work_info[i].qframe = m_msg.qframe;
51 work_info[i].qshuffle = m_msg.qshuffle;
52 work_info[i].ppst = ppst;
53 work_info[i].aa0 = aa0;
54 work_info[i].max_work_buf=max_work_buf;
/* worker records its own index in the work_info[] array */
55 work_info[i].worker=i;
56 work_info[i].max_tot=m_msg.max_tot;
59 /* mutex and condition variable initialisation (default attributes) */
61 status = pthread_mutex_init(&reader_mutex, NULL);
62 check(status,"Reader_mutex init bad status\n");
64 status = pthread_mutex_init(&worker_mutex, NULL);
65 check(status,"Worker_mutex init bad status\n");
67 status = pthread_cond_init(&reader_cond_var, NULL);
68 check(status,"Reader_cond_var init bad status\n");
70 status = pthread_cond_init(&worker_cond_var, NULL);
71 check(status,"Worker_cond_var init bad status\n");
73 status = pthread_mutex_init(&start_mutex, NULL);
74 check(status,"Start_mutex init bad status\n");
76 status = pthread_cond_init(&start_cond_var, NULL);
77 check(status,"Start_cond_var init bad status\n");
79 /* initialise the thread attribute object and set the contention scope; no stack size call is visible in this view */ /***************************/
81 status = pthread_attr_init( &thread_attr );
82 check(status,"attribute create bad status\n");
/* NOTE(review): pthread_attr_setscope() returns an int error number, so
   comparing it with NULL is a type mismatch (it behaves as "!= 0").
   status is only reassigned when that first call reports failure;
   otherwise the check below re-tests the pthread_attr_init status.
   The "IRIX" message suggests this variant and the PTHREAD_SCOPE_SYSTEM
   variant after it are selected by preprocessor guards that are not
   visible here -- confirm. */
85 if (pthread_attr_setscope( &thread_attr, 2) != NULL)
86 status = pthread_attr_setscope( &thread_attr,PTHREAD_SCOPE_PROCESS);
87 check(status,"set scope on IRIX bad status\n");
91 status = pthread_attr_setscope( &thread_attr, PTHREAD_SCOPE_SYSTEM);
92 check(status,"set scope bad status\n");
95 /* start the worker threads */
97 for (i=0; i < nthreads; i++) {
98 /**********************/
/* work_thread is declared as void (struct thr_str *), so it is cast to
   the start-routine type pthread_create expects; thread i receives
   &work_info[i] as its argument.
   NOTE(review): calling a function through an incompatible function
   pointer type is not guaranteed by ISO C -- consider a wrapper. */
99 status=pthread_create(&threads[i],&thread_attr,
100 (void *(*)(void *))&work_thread,&work_info[i]);
101 check(status,"Pthread_create failed\n");
105 /* start_mutex/start_cond_var provides exclusive access to
106 extern int start_thread */
/* NOTE(review): the header of the function containing the statements
   below is not visible in this view.  The visible body lowers the
   start_thread predicate while holding start_mutex and wakes every
   thread waiting on start_cond_var. */
112 /* tell threads to proceed */
114 status = pthread_mutex_lock(&start_mutex);
115 check(status,"Start_mutex lock bad status in main\n");
117 start_thread = 0; /* lower predicate */
/* the broadcast status is overwritten by the unlock on the next line
   before it is ever checked */
119 status = pthread_cond_broadcast(&start_cond_var);
120 status = pthread_mutex_unlock(&start_mutex);
121 check(status,"Start_mutex unlock bad status in main\n");
/*
 * get_rbuf: take the next buffer from the reader_buf[] ring, blocking on
 * reader_cond_var while num_reader_bufs is zero.
 *
 *   cur_buf      - out: receives reader_buf[reader_buf_readp]
 *   max_work_buf - ring size; reader_buf_readp wraps modulo this value
 *
 * All accesses to the ring state happen with reader_mutex held.
 *
 * NOTE(review): no decrement of num_reader_bufs, no declaration of status
 * and no closing braces are visible in this view; lines appear to be
 * missing -- confirm against the complete file.
 */
124 void get_rbuf(struct buf_head **cur_buf, int max_work_buf)
128 status = pthread_mutex_lock(&reader_mutex); /* lock reader_buf structure */
130 check(status,"Reader_mutex lock in master bad status\n");
132 /* no reader bufs: wait for signal to proceed */
/* the predicate is re-tested in a loop after every wake-up; the return
   value of pthread_cond_wait is not captured */
133 while (num_reader_bufs == 0) {
134 pthread_cond_wait(&reader_cond_var,&reader_mutex);
137 *cur_buf = reader_buf[reader_buf_readp]; /* get the buffer address */
138 reader_buf_readp = (reader_buf_readp+1)%(max_work_buf); /* increment index */
141 status = pthread_mutex_unlock(&reader_mutex); /* unlock structure */
142 check(status,"Reader_mutex unlock in master bad status\n");
/*
 * put_rbuf: append a buffer to the worker_buf[] ring and wake one thread
 * waiting on worker_cond_var.
 *
 *   cur_buf      - buffer stored at worker_buf[worker_buf_readp]
 *   max_work_buf - ring size; worker_buf_readp wraps modulo this value
 *
 * The ring slot, the index and num_worker_bufs are all updated with
 * worker_mutex held.
 *
 * NOTE(review): the declaration of status and the braces are not visible
 * in this view.
 */
145 void put_rbuf(struct buf_head *cur_buf, int max_work_buf)
149 /* give the buffer to a thread, and wait for more */
150 status = pthread_mutex_lock(&worker_mutex); /* lock worker_buf_structure */
151 check(status,"Worker_mutex lock in master bad status\n");
153 /* Put buffer onto available for workers list */
154 worker_buf[worker_buf_readp] = cur_buf;
155 worker_buf_readp = (worker_buf_readp+1)%(max_work_buf);
156 num_worker_bufs++; /* increment number of buffers available to workers */
158 /* Signal one worker to wake and start work */
/* NOTE(review): no check of the signal status is visible before it is
   overwritten by the unlock result */
159 status = pthread_cond_signal(&worker_cond_var);
161 status = pthread_mutex_unlock(&worker_mutex);
162 check(status,"Worker_mutex unlock in master bad status\n");
/*
 * put_rbuf_done: append a final buffer to the worker_buf[] ring, wake
 * every thread waiting on worker_cond_var, then join all worker threads.
 *
 *   nthreads     - number of entries of threads[] to join
 *   cur_buf      - buffer stored at worker_buf[worker_buf_readp]
 *   max_work_buf - ring size; worker_buf_readp wraps modulo this value
 *
 * Does not return until pthread_join has been called on threads[0] up to
 * threads[nthreads-1].
 *
 * NOTE(review): the declarations of status, i and exit_value, the braces,
 * and any "reader finished" flag update are not visible in this view --
 * confirm against the complete file.
 */
165 void put_rbuf_done(int nthreads, struct buf_head *cur_buf, int max_work_buf)
170 /* give the buffer to a thread, and wait for more */
171 status = pthread_mutex_lock(&worker_mutex); /* lock worker_buf_structure */
172 check(status,"Worker_mutex lock in master bad status\n");
174 /* Put buffer onto available for workers list */
175 worker_buf[worker_buf_readp] = cur_buf;
176 worker_buf_readp = (worker_buf_readp+1)%(max_work_buf);
177 num_worker_bufs++; /* increment number of buffers available to workers */
179 /* Wake every worker waiting on worker_cond_var (broadcast, not a single signal) */
182 status = pthread_cond_broadcast(&worker_cond_var);
184 status = pthread_mutex_unlock(&worker_mutex);
185 check(status,"Worker_mutex unlock in master bad status\n");
187 /* join every worker thread; each join blocks until that thread has exited */
189 for (i=0; i < nthreads; i++) {
190 status = pthread_join( threads[i], &exit_value);
191 check(status,"Pthread_join bad status\n");
195 /* wait for extern int start_thread == 0 */
/* NOTE(review): the header of the function containing the statements
   below is not visible in this view.  The visible body blocks the calling
   thread on start_cond_var, with start_mutex held, until start_thread is
   zero; the messages indicate it is called from a worker. */
201 /* Wait on master to give start signal */
202 status = pthread_mutex_lock(&start_mutex);
203 check(status,"Start_mutex lock bad status in worker\n");
/* loop so the predicate is re-tested after every wake-up */
205 while (start_thread) {
206 status = pthread_cond_wait(&start_cond_var, &start_mutex);
207 check(status,"Start_cond_wait bad status in worker\n");
210 status = pthread_mutex_unlock(&start_mutex);
211 check(status,"Start_mutex unlock bad status in worker\n");
/*
 * get_wbuf: take the next buffer from the worker_buf[] ring, blocking on
 * worker_cond_var while num_worker_bufs is zero.
 *
 *   cur_buf      - out: receives worker_buf[worker_buf_workp]
 *   max_work_buf - ring size; worker_buf_workp wraps modulo this value
 *
 * Returns an int.
 *
 * NOTE(review): the return statements, the condition guarding the early
 * unlock inside the wait loop, any decrement of num_worker_bufs, the
 * declaration of status and the braces are not visible in this view.
 * Presumably the early unlock is followed by a return that tells the
 * caller the reader has finished -- confirm against the complete file.
 */
214 int get_wbuf(struct buf_head **cur_buf, int max_work_buf)
218 /* get a buffer to work on */
219 status = pthread_mutex_lock(&worker_mutex);
220 check(status,"First worker_mutex lock in worker bad status\n");
222 /* No worker_bufs available: wait for reader to produce some */
223 while (num_worker_bufs == 0) {
224 /* Exit if reader has finished */
/* release the mutex on the exit path; the guarding test is not visible
   here, and the unlock status is not captured */
226 pthread_mutex_unlock(&worker_mutex);
/* return value of pthread_cond_wait is not captured */
229 pthread_cond_wait(&worker_cond_var,&worker_mutex);
232 /* Get the buffer from list */
233 *cur_buf = worker_buf[worker_buf_workp];
234 worker_buf_workp = (worker_buf_workp+1)%(max_work_buf);
237 status = pthread_mutex_unlock(&worker_mutex);
238 check(status,"First worker_mutex unlock in worker bad status\n");
/*
 * put_wbuf: hand a buffer back by storing it in the reader_buf[] ring,
 * signalling reader_cond_var when num_reader_bufs equals 1.
 *
 *   cur_buf      - buffer stored at reader_buf[reader_buf_workp]
 *   max_work_buf - ring size; reader_buf_workp wraps modulo this value
 *
 * The ring slot and index are updated with reader_mutex held.
 *
 * NOTE(review): no increment of num_reader_bufs, no declaration of status
 * and no braces are visible in this view; presumably the counter is
 * incremented just before the "== 1" test -- confirm against the complete
 * file.
 */
242 void put_wbuf(struct buf_head *cur_buf, int max_work_buf)
246 /* put buffer back on list for reader */
247 status = pthread_mutex_lock(&reader_mutex);
248 check(status,"Reader_mutex lock in worker bad status\n");
250 reader_buf[reader_buf_workp] = cur_buf;
251 reader_buf_workp = (reader_buf_workp+1)%(max_work_buf);
254 /* No reader_bufs available: wake reader */
/* signals only when the count is exactly 1; the signal's return value is
   not captured */
255 if (num_reader_bufs == 1) {
256 pthread_cond_signal(&reader_cond_var);
259 status = pthread_mutex_unlock(&reader_mutex);
260 check(status,"Reader_mutex unlock in worker bad status\n");