2 /* copyright (c) 1996, 1997, 1998, 1999 William R. Pearson and the
5 /* $Name: fa_34_26_5 $ - $Id: dec_pthr_subs.c,v 1.2 2006/04/12 18:00:02 wrp Exp $ */
7 /* this file isolates the pthreads calls from the main program */
13 #include <sys/types.h>
22 #include "pthr_subs.h"
24 extern void work_thread (struct thr_str *work_info);
26 /* start the threads working */
28 void init_thr(int nthreads, struct thr_str *work_info)
31 pthread_attr_t thread_attr;
33 if (nthreads > MAX_WORKERS) {
34 fprintf ( stderr," cannot start %d threads, max: %d\n",
35 nthreads, MAX_WORKERS);
39 /* mutex and condition variable initialisation */
41 status = pthread_mutex_init(&reader_mutex, pthread_mutexattr_default);
42 check(status,"Reader_mutex init bad status\n");
44 status = pthread_mutex_init(&worker_mutex, pthread_mutexattr_default);
45 check(status,"Worker_mutex init bad status\n");
47 status = pthread_cond_init(&reader_cond_var, pthread_condattr_default);
48 check(status,"Reader_cond_var init bad status\n");
50 status = pthread_cond_init(&worker_cond_var, pthread_condattr_default);
51 check(status,"Worker_cond_var init bad status\n");
53 status = pthread_mutex_init(&start_mutex, pthread_mutexattr_default);
54 check(status,"Start_mutex init bad status\n");
56 status = pthread_cond_init(&start_cond_var, pthread_condattr_default);
57 check(status,"Start_cond_var init bad status\n");
59 /* change stacksize on threads */ /***************************/
61 status = pthread_attr_create( &thread_attr );
62 check(status,"attribute create bad status\n");
64 status = pthread_attr_setstacksize( &thread_attr, 1000000);
65 check(status,"stacksize change bad status\n");
67 /* start the worker threads */
69 for (work_info->worker=0; work_info->worker < nthreads;
70 work_info->worker++) {
71 /**********************/
72 status=pthread_create(&threads[work_info->worker],thread_attr,
73 (pthread_startroutine_t)&work_thread,
74 (pthread_addr_t)work_info);
75 check(status,"Pthread_create failed\n");
83 /* tell threads to proceed */
85 status = pthread_mutex_lock(&start_mutex);
86 check(status,"Start_mutex lock bad status in main\n");
88 start_thread = 0; /* lower predicate */
90 status = pthread_cond_broadcast(&start_cond_var);
91 status = pthread_mutex_unlock(&start_mutex);
92 check(status,"Start_mutex unlock bad status in main\n");
95 void get_rbuf(struct buf_head **cur_buf, int max_work_buf)
99 status = pthread_mutex_lock(&reader_mutex); /* lock reader_buf structure */
101 check(status,"Reader_mutex lock in master bad status\n");
103 /* no reader bufs: wait for signal to proceed */
104 while (num_reader_bufs == 0) {
105 pthread_cond_wait(&reader_cond_var,&reader_mutex);
108 *cur_buf = reader_buf[reader_buf_readp]; /* get the buffer address */
109 reader_buf_readp = (reader_buf_readp+1)%(max_work_buf); /* increment index */
112 status = pthread_mutex_unlock(&reader_mutex); /* unlock structure */
113 check(status,"Reader_mutex unlock in master bad status\n");
116 void put_rbuf(struct buf_head *cur_buf, int max_work_buf)
120 /* give the buffer to a thread, and wait for more */
121 status = pthread_mutex_lock(&worker_mutex); /* lock worker_buf_structure */
122 check(status,"Worker_mutex lock in master bad status\n");
124 /* Put buffer onto available for workers list */
125 worker_buf[worker_buf_readp] = cur_buf;
126 worker_buf_readp = (worker_buf_readp+1)%(max_work_buf);
127 num_worker_bufs++; /* increment number of buffers available to workers */
129 /* Signal one worker to wake and start work */
130 status = pthread_cond_signal(&worker_cond_var);
132 status = pthread_mutex_unlock(&worker_mutex);
133 check(status,"Worker_mutex unlock in master bad status\n");
136 void put_rbuf_done(int nthreads, struct buf_head *cur_buf, int max_work_buf)
141 /* give the buffer to a thread, and wait for more */
142 status = pthread_mutex_lock(&worker_mutex); /* lock worker_buf_structure */
143 check(status,"Worker_mutex lock in master bad status\n");
145 /* Put buffer onto available for workers list */
146 worker_buf[worker_buf_readp] = cur_buf;
147 worker_buf_readp = (worker_buf_readp+1)%(max_work_buf);
148 num_worker_bufs++; /* increment number of buffers available to workers */
150 /* Signal one worker to wake and start work */
153 status = pthread_cond_broadcast(&worker_cond_var);
155 status = pthread_mutex_unlock(&worker_mutex);
156 check(status,"Worker_mutex unlock in master bad status\n");
158 /* wait for all buffers available (means all do_workers are done) */
160 for (i=0; i < nthreads; i++) {
161 status = pthread_join( threads[i], &exit_value);
162 check(status,"Pthread_join bad status\n");
164 status = pthread_detach( &threads[i]);
165 check(status,"Pthread_detach bad status\n");
173 /* Wait on master to give start signal */
174 status = pthread_mutex_lock(&start_mutex);
175 check(status,"Start_mutex lock bad status in worker\n");
177 while (start_thread) {
178 status = pthread_cond_wait(&start_cond_var, &start_mutex);
179 check(status,"Start_cond_wait bad status in worker\n");
182 status = pthread_mutex_unlock(&start_mutex);
183 check(status,"Start_mutex unlock bad status in worker\n");
186 int get_wbuf(struct buf_head **cur_buf, int max_work_buf)
190 /* get a buffer to work on */
191 status = pthread_mutex_lock(&worker_mutex);
192 check(status,"First worker_mutex lock in worker bad status\n");
194 /* No worker_bufs available: wait for reader to produce some */
195 while (num_worker_bufs == 0) {
196 /* Exit if reader has finished */
198 pthread_mutex_unlock(&worker_mutex);
201 pthread_cond_wait(&worker_cond_var,&worker_mutex);
204 /* Get the buffer from list */
205 *cur_buf = worker_buf[worker_buf_workp];
206 worker_buf_workp = (worker_buf_workp+1)%(max_work_buf);
209 status = pthread_mutex_unlock(&worker_mutex);
210 check(status,"First worker_mutex unlock in worker bad status\n");
214 void put_wbuf(struct buf_head *cur_buf, int max_work_buf)
218 /* put buffer back on list for reader */
219 status = pthread_mutex_lock(&reader_mutex);
220 check(status,"Reader_mutex lock in worker bad status\n");
222 reader_buf[reader_buf_workp] = cur_buf;
223 reader_buf_workp = (reader_buf_workp+1)%(max_work_buf);
226 /* No reader_bufs available: wake reader */
227 if (num_reader_bufs == 1) {
228 pthread_cond_signal(&reader_cond_var);
231 status = pthread_mutex_unlock(&reader_mutex);
232 check(status,"Reader_mutex unlock in worker bad status\n");