initial commit
[jalview.git] / forester / archive / RIO / others / hmmer / src / hmmcalibrate.c
1 /************************************************************
2  * HMMER - Biological sequence analysis with profile HMMs
3  * Copyright (C) 1992-1999 Washington University School of Medicine
4  * All Rights Reserved
5  * 
6  *     This source code is distributed under the terms of the
7  *     GNU General Public License. See the files COPYING and LICENSE
8  *     for details.
9  ************************************************************/
10
11 /* hmmcalibrate.c
12  * SRE, Fri Oct 31 09:25:21 1997 [St. Louis]
13  * 
14  * Score an HMM against random sequence data sets;
15  * set histogram fitting parameters.
16  * 
17  * CVS $Id: hmmcalibrate.c,v 1.1.1.1 2005/03/22 08:34:03 cmzmasek Exp $
18  */
19
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <signal.h>
24 #include <math.h>
25 #include <time.h>
26 #include <limits.h>
27 #include <float.h>
28 #ifdef HMMER_THREADS
29 #include <pthread.h>
30 #endif
31 #ifdef HMMER_PVM
32 #include <pvm3.h>
33 #endif
34
35 #include "squid.h"              /* general sequence analysis library    */
36 #include "config.h"             /* compile-time configuration constants */
37 #include "structs.h"            /* data structures, macros, #define's   */
38 #include "funcs.h"              /* function declarations                */
39 #include "globals.h"            /* alphabet global variables            */
40 #include "version.h"            /* release version info                 */
41 #include "stopwatch.h"          /* process timings                      */
42
43 static char banner[] = "hmmcalibrate -- calibrate HMM search statistics";
44
45 static char usage[] = "\
46 Usage: hmmcalibrate [-options] <hmmfile>\n\
47 Available options are:\n\
48   -h             : print short usage and version info, then exit\n\
49 ";
50
51 static char experts[] = "\
52   --cpu <n>      : run <n> threads in parallel (if threaded)\n\
53   --fixed <n>    : fix random sequence length at <n>\n\
54   --histfile <f> : save histogram(s) to file <f>\n\
55   --mean <x>     : set random seq length mean at <x> [350]\n\
56   --num <n>      : set number of sampled seqs to <n> [5000]\n\
57   --pvm          : run on a Parallel Virtual Machine (PVM)\n\
58   --sd <x>       : set random seq length std. dev to <x> [350]\n\
59   --seed <n>     : set random seed to <n> [time()]\n\
60 ";
61
62 static struct opt_s OPTIONS[] = {
63    { "-h",         TRUE,  sqdARG_NONE  },
64    { "--cpu",      FALSE, sqdARG_INT },
65    { "--fixed",    FALSE, sqdARG_INT   },
66    { "--histfile", FALSE, sqdARG_STRING },
67    { "--mean",     FALSE, sqdARG_FLOAT },
68    { "--num",      FALSE, sqdARG_INT   },
69    { "--pvm",      FALSE, sqdARG_NONE  },
70    { "--sd",       FALSE, sqdARG_FLOAT },   
71    { "--seed",     FALSE, sqdARG_INT}, 
72 };
73 #define NOPTIONS (sizeof(OPTIONS) / sizeof(struct opt_s))
74
75
76 static void main_loop_serial(struct plan7_s *hmm, int seed, int nsample,
77                              float lenmean, float lensd, int fixedlen,
78                              struct histogram_s **ret_hist, float *ret_max);
79
80 #ifdef HMMER_THREADS
81 /* A structure of this type is shared by worker threads in the POSIX
82  * threads parallel version.
83  */
84 struct workpool_s {
85   /* Static configuration:
86    */
87   struct plan7_s  *hmm;         /* ptr to single HMM to search with    */
88   int    fixedlen;              /* if >0, fix random seq len to this   */
89   float  lenmean;               /* mean of Gaussian for random seq len */
90   float  lensd;                 /* s.d. of Gaussian for random seq len */
91   float *randomseq;             /* 0..Alphabet_size-1 i.i.d. probs     */
92   int    nsample;               /* number of random seqs to do         */
93
94   /* Shared (mutex-protected) input:
95    */
96   int    nseq;                  /* current number of seqs searched     */
97
98   /* Shared (mutex-protected) output:
99    */
100   struct histogram_s *hist;     /* histogram          */
101   float          max_score;     /* maximum score seen */
102   Stopwatch_t    watch;         /* Timings accumulated for threads */
103
104   /* Thread pool information:
105    */
106   pthread_t      *thread;       /* our pool of threads */
107   int             num_threads;  /* number of threads   */
108   pthread_mutex_t input_lock;   /* a mutex protecting input fields */
109   pthread_mutex_t output_lock;  /* a mutex protecting output fields */
110 };
111 static void main_loop_threaded(struct plan7_s *hmm, int seed, int nsample, 
112                                float lenmean, float lensd, int fixedlen,
113                                int nthreads,
114                                struct histogram_s **ret_hist, float *ret_max,
115                                Stopwatch_t *twatch);
116 static struct workpool_s *workpool_start(struct plan7_s *hmm, 
117                                  float lenmean, float lensd, int fixedlen,
118                                  float *randomseq, int nsample, 
119                                  struct histogram_s *hist, 
120                                  int num_threads);
121 static void  workpool_stop(struct workpool_s *wpool);
122 static void  workpool_free(struct workpool_s *wpool);
123 static void *worker_thread(void *ptr);
124 #endif /* HMMER_THREADS */
125
126 #ifdef HMMER_PVM
127 static void main_loop_pvm(struct plan7_s *hmm, int seed, int nsample, 
128                           int lumpsize,
129                           float lenmean, float lensd, int fixedlen,
130                           struct histogram_s **ret_hist, float *ret_max, 
131                           Stopwatch_t *extrawatch, int *ret_nslaves);
132 #endif /* HMMER_PVM */
133
134
135 int
136 main(int argc, char **argv)
137 {
138   char    *hmmfile;             /* HMM file to open                */
139   char    *tmpfile;             /* temporary calibrated HMM file   */
140   HMMFILE *hmmfp;               /* opened hmm file pointer         */
141   FILE    *outfp;               /* for writing HMM(s) into tmpfile */
142   char    *mode;                /* write mode, "w" or "wb"         */
143   struct plan7_s *hmm;          /* the hidden Markov model         */
144   int     idx;                  /* counter over sequences          */
145   sigset_t blocksigs;           /* list of signals to protect from */
146   int     nhmm;                 /* number of HMMs calibrated       */
147
148   struct histogram_s *hist;     /* a resulting histogram           */
149   float   max;                  /* maximum score from an HMM       */
150   char   *histfile;             /* histogram save file             */
151   FILE   *hfp;                  /* open file pointer for histfile  */
152
153   Stopwatch_t stopwatch;        /* main stopwatch for process       */
154   Stopwatch_t extrawatch;       /* stopwatch for threads/PVM slaves */
155
156   float  *mu;                   /* array of EVD mu's for HMMs      */
157   float  *lambda;               /* array of EVD lambda's for HMMs  */
158   int     mu_lumpsize;          /* allocation lumpsize for mu, lambda */
159
160   int     nsample;              /* number of random seqs to sample */
161   int     seed;                 /* random number seed              */
162   int     fixedlen;             /* fixed length, or 0 if unused    */
163   float   lenmean;              /* mean of length distribution     */
164   float   lensd;                /* std dev of length distribution  */
165   int     do_pvm;               /* TRUE to use PVM                 */
166   int     pvm_lumpsize;         /* # of seqs to do per PVM slave exchange */
167   int     pvm_nslaves;          /* number of slaves used in the PVM */
168
169
170   char *optname;                /* name of option found by Getopt() */
171   char *optarg;                 /* argument found by Getopt()       */
172   int   optind;                 /* index in argv[]                  */
173
174   int   num_threads;            /* number of worker threads */   
175
176
177   /***********************************************
178    * Parse the command line
179    ***********************************************/
180   StopwatchStart(&stopwatch);
181   StopwatchZero(&extrawatch);
182
183   nsample      = 5000;
184   fixedlen     = 0;
185   lenmean      = 325.;
186   lensd        = 200.;
187   seed         = (int) time ((time_t *) NULL);
188   histfile     = NULL;
189   do_pvm       = FALSE;
190   pvm_lumpsize = 20;            /* 20 seqs/PVM exchange: sets granularity */
191   mu_lumpsize  = 100;
192 #ifdef HMMER_THREADS
193   num_threads  = ThreadNumber(); /* only matters if we're threaded */
194 #else
195   num_threads  = 0;
196 #endif
197
198   while (Getopt(argc, argv, OPTIONS, NOPTIONS, usage,
199                 &optind, &optname, &optarg))
200     {
201       if      (strcmp(optname, "--cpu")      == 0) num_threads  = atoi(optarg);
202       else if (strcmp(optname, "--fixed")    == 0) fixedlen = atoi(optarg);
203       else if (strcmp(optname, "--histfile") == 0) histfile = optarg;
204       else if (strcmp(optname, "--mean")     == 0) lenmean  = atof(optarg); 
205       else if (strcmp(optname, "--num")      == 0) nsample  = atoi(optarg); 
206       else if (strcmp(optname, "--pvm")      == 0) do_pvm   = TRUE;
207       else if (strcmp(optname, "--sd")       == 0) lensd    = atof(optarg); 
208       else if (strcmp(optname, "--seed")     == 0) seed     = atoi(optarg);
209       else if (strcmp(optname, "-h") == 0)
210         {
211           Banner(stdout, banner);
212           puts(usage);
213           puts(experts);
214           exit(0);
215         }
216     }
217
218   if (argc - optind != 1) Die("Incorrect number of arguments.\n%s\n", usage);
219   hmmfile = argv[optind++];
220
221 #ifndef HMMER_PVM
222   if (do_pvm) Die("PVM support is not compiled into HMMER; --pvm doesn't work.");
223 #endif
224 #ifndef HMMER_THREADS
225   if (num_threads) Die("Posix threads support is not compiled into HMMER; --cpu doesn't have any effect");
226 #endif
227
228   /***********************************************
229    * Open our i/o file pointers, make sure all is well
230    ***********************************************/
231
232                                 /* HMM file */
233   if ((hmmfp = HMMFileOpen(hmmfile, NULL)) == NULL)
234     Die("failed to open HMM file %s for reading.", hmmfile);
235
236                                 /* histogram file */
237   hfp = NULL;
238   if (histfile != NULL) {
239     if ((hfp = fopen(histfile, "w")) == NULL)
240       Die("Failed to open histogram save file %s for writing\n", histfile);
241   }
242
243   /* Generate calibrated HMM(s) in a tmp file in the current
244    * directory. When we're finished, we delete the original
245    * HMM file and rename() this one. That way, the worst
246    * effect of a catastrophic failure should be that we
247    * leave a tmp file lying around, but the original HMM
248    * file remains uncorrupted. tmpnam() doesn't work portably here,
249    * because it'll put the file in /tmp and we won't
250    * necessarily be able to rename() it from there.
251    */
252   tmpfile = MallocOrDie(strlen(hmmfile) + 5);
253   strcpy(tmpfile, hmmfile);
254   strcat(tmpfile, ".xxx");      /* could be more inventive here... */
255   if (FileExists(tmpfile))
256     Die("temporary file %s already exists; please delete it first", tmpfile);
257   if (hmmfp->is_binary) mode = "wb";
258   else                  mode = "w"; 
259
260   /*********************************************** 
261    * Show the banner
262    ***********************************************/
263
264   Banner(stdout, banner);
265   printf("HMM file:                 %s\n", hmmfile);
266   if (fixedlen) 
267     printf("Length fixed to:          %d\n", fixedlen);
268   else {
269     printf("Length distribution mean: %.0f\n", lenmean);
270     printf("Length distribution s.d.: %.0f\n", lensd);
271   }
272   printf("Number of samples:        %d\n", nsample);
273   printf("random seed:              %d\n", seed);
274   printf("histogram(s) saved to:    %s\n",
275          histfile != NULL ? histfile : "[not saved]");
276   if (do_pvm)
277     printf("PVM:                      ACTIVE\n");
278   else if (num_threads > 0)
279     printf("POSIX threads:            %d\n", num_threads);
280   printf("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n\n");
281
282   /***********************************************
283    * Read the HMMs one at a time, and send them off
284    * in probability form to one of the main loops.
285    * The main loop functions are responsible for 
286    * synthesizing random sequences and returning
287    * a score histogram for each HMM.
288    ***********************************************/
289
290   nhmm = 0;
291   mu     = MallocOrDie(sizeof(float) * mu_lumpsize);
292   lambda = MallocOrDie(sizeof(float) * mu_lumpsize);
293
294   while (HMMFileRead(hmmfp, &hmm))
295     {
296       if (hmm == NULL)
297         Die("HMM file may be corrupt or in incorrect format; parse failed");
298
299       if (! do_pvm && num_threads == 0)
300         main_loop_serial(hmm, seed, nsample, lenmean, lensd, fixedlen, 
301                          &hist, &max);
302 #ifdef HMMER_PVM
303       else if (do_pvm) {
304         pvm_nslaves = 0;        /* solely to silence compiler warnings */
305         main_loop_pvm(hmm, seed, nsample, pvm_lumpsize, 
306                       lenmean, lensd, fixedlen, 
307                       &hist, &max, &extrawatch, &pvm_nslaves);
308       }
309 #endif 
310 #ifdef HMMER_THREADS
311       else if (num_threads > 0)
312         main_loop_threaded(hmm, seed, nsample, lenmean, lensd, fixedlen,
313                            num_threads, &hist, &max, &extrawatch);
314 #endif
315       else 
316         Die("wait. that can't happen. I didn't do anything.");
317
318
319       /* Fit an EVD to the observed histogram.
320        * The TRUE left-censors and fits only the right slope of the histogram.
321        * The 9999. is an arbitrary high number that means we won't trim
322        * outliers on the right.
323        */
324       if (! ExtremeValueFitHistogram(hist, TRUE, 9999.))
325         Die("fit failed; -n may be set too small?\n");
326       
327       mu[nhmm]     = hist->param[EVD_MU];
328       lambda[nhmm] = hist->param[EVD_LAMBDA];
329       nhmm++;
330       if (nhmm % 100 == 0) {
331         mu     = ReallocOrDie(mu,     sizeof(float) * (nhmm+mu_lumpsize));
332         lambda = ReallocOrDie(lambda, sizeof(float) * (nhmm+mu_lumpsize));
333       }      
334
335       /* Output
336        */
337       printf("HMM    : %s\n",   hmm->name);
338       printf("mu     : %12f\n", hist->param[EVD_MU]);
339       printf("lambda : %12f\n", hist->param[EVD_LAMBDA]);
340       printf("max    : %12f\n", max);
341       printf("//\n");
342
343       if (hfp != NULL) 
344         {
345           fprintf(hfp, "HMM: %s\n", hmm->name);
346           PrintASCIIHistogram(hfp, hist);
347           fprintf(hfp, "//\n");
348         }
349
350       FreeHistogram(hist);
351     }
352   SQD_DPRINTF1(("Main body believes it has calibrations for %d HMMs\n", nhmm));
353
354   /*****************************************************************
355    * Rewind the HMM file for a second pass.
356    * Write a temporary HMM file with new mu, lambda values in it
357    *****************************************************************/
358
359   HMMFileRewind(hmmfp);
360   if (FileExists(tmpfile))
361     Die("Ouch. Temporary file %s appeared during the run.", tmpfile);
362   if ((outfp = fopen(tmpfile, mode)) == NULL)
363     Die("Ouch. Temporary file %s couldn't be opened for writing.", tmpfile); 
364   
365   for (idx = 0; idx < nhmm; idx++)
366     {
367       /* Sanity checks 
368        */
369       if (!HMMFileRead(hmmfp, &hmm))
370         Die("Ran out of HMMs too early in pass 2");
371       if (hmm == NULL) 
372         Die("HMM file %s was corrupted? Parse failed in pass 2", hmmfile);
373
374       /* Put results in HMM
375        */
376       hmm->mu     = mu[idx];
377       hmm->lambda = lambda[idx];
378       hmm->flags |= PLAN7_STATS;
379       Plan7ComlogAppend(hmm, argc, argv);
380
381       /* Save HMM to tmpfile
382        */
383       if (hmmfp->is_binary) WriteBinHMM(outfp, hmm);
384       else                  WriteAscHMM(outfp, hmm); 
385
386       FreePlan7(hmm);
387     }
388   
389   /*****************************************************************
390    * Now, carefully remove original file and replace it
391    * with the tmpfile. Note the protection from signals;
392    * we wouldn't want a user to ctrl-C just as we've deleted
393    * their HMM file but before the new one is moved.
394    *****************************************************************/
395
396   HMMFileClose(hmmfp);
397   if (fclose(outfp)   != 0) PANIC;
398
399   if (sigemptyset(&blocksigs) != 0) PANIC;
400   if (sigaddset(&blocksigs, SIGINT) != 0) PANIC;
401   if (sigprocmask(SIG_BLOCK, &blocksigs, NULL) != 0)   PANIC;
402   if (remove(hmmfile) != 0)                            PANIC;
403   if (rename(tmpfile, hmmfile) != 0)                   PANIC;
404   if (sigprocmask(SIG_UNBLOCK, &blocksigs, NULL) != 0) PANIC;
405
406   /***********************************************
407    * Exit
408    ***********************************************/
409
410   StopwatchStop(&stopwatch);
411   if (do_pvm > 0) {
412     printf("PVM processors used: %d\n", pvm_nslaves);
413     StopwatchInclude(&stopwatch, &extrawatch);
414   }
415 #ifdef PTHREAD_TIMES_HACK
416   else if (num_threads > 0) StopwatchInclude(&stopwatch, &extrawatch);
417 #endif
418
419   /*  StopwatchDisplay(stdout, "CPU Time: ", &stopwatch); */
420
421   free(mu);
422   free(lambda);
423   free(tmpfile);
424   if (hfp != NULL) fclose(hfp);
425   SqdClean();
426   return 0;
427 }
428
429 /* Function: main_loop_serial()
430  * Date:     SRE, Tue Aug 18 16:18:28 1998 [St. Louis]
431  *
432  * Purpose:  Given an HMM and parameters for synthesizing random
433  *           sequences; return a histogram of scores.
434  *           (Serial version)  
435  *
436  * Args:     hmm      - an HMM to calibrate.
437  *           seed     - random number seed
438  *           nsample  - number of seqs to synthesize
439  *           lenmean  - mean length of random sequence
440  *           lensd    - std dev of random seq length
441  *           fixedlen - if nonzero, override lenmean, always this len
442  *           ret_hist - RETURN: the score histogram 
443  *           ret_max  - RETURN: highest score seen in simulation
444  *
445  * Returns:  (void)
446  *           hist is alloc'ed here, and must be free'd by caller.
447  */
448 static void
449 main_loop_serial(struct plan7_s *hmm, int seed, int nsample, 
450                  float lenmean, float lensd, int fixedlen,
451                  struct histogram_s **ret_hist, float *ret_max)
452 {
453   struct histogram_s *hist;
454   float  randomseq[MAXABET];
455   float  p1;
456   float  max;
457   char  *seq;
458   char  *dsq;
459   float  score;
460   int    sqlen;
461   int    idx;
462   
463   /* Initialize.
464    * We assume we've already set the alphabet (safe, because
465    * HMM input sets the alphabet).
466    */
467   sre_srandom(seed);
468   P7Logoddsify(hmm, TRUE);
469   P7DefaultNullModel(randomseq, &p1);
470   hist = AllocHistogram(-200, 200, 100);
471   max = -FLT_MAX;
472
473   for (idx = 0; idx < nsample; idx++)
474     {
475                                 /* choose length of random sequence */
476       if (fixedlen) sqlen = fixedlen;
477       else do sqlen = (int) Gaussrandom(lenmean, lensd); while (sqlen < 1);
478                                 /* generate it */
479       seq = RandomSequence(Alphabet, randomseq, Alphabet_size, sqlen);
480       dsq = DigitizeSequence(seq, sqlen);
481
482       if (P7ViterbiSize(sqlen, hmm->M) <= RAMLIMIT)
483         score = P7Viterbi(dsq, sqlen, hmm, NULL);
484       else
485         score = P7SmallViterbi(dsq, sqlen, hmm, NULL);
486
487       AddToHistogram(hist, score);
488       if (score > max) max = score;
489
490       free(dsq); 
491       free(seq);
492     }
493
494   *ret_hist   = hist;
495   *ret_max    = max;
496   return;
497 }
498
499
500 #ifdef HMMER_THREADS
501 /* Function: main_loop_threaded()
502  * Date:     SRE, Wed Dec  1 12:43:09 1999 [St. Louis]
503  *
504  * Purpose:  Given an HMM and parameters for synthesizing random
505  *           sequences; return a histogram of scores.
506  *           (Threaded version.)  
507  *
508  * Args:     hmm      - an HMM to calibrate.
509  *           seed     - random number seed
510  *           nsample  - number of seqs to synthesize
511  *           lenmean  - mean length of random sequence
512  *           lensd    - std dev of random seq length
513  *           fixedlen - if nonzero, override lenmean, always this len
514  *           nthreads - number of threads to start
515  *           ret_hist - RETURN: the score histogram 
516  *           ret_max  - RETURN: highest score seen in simulation
517  *           twatch   - RETURN: accumulation of thread times
518  *
519  * Returns:  (void)
520  *           hist is alloc'ed here, and must be free'd by caller.
521  */
522 static void
523 main_loop_threaded(struct plan7_s *hmm, int seed, int nsample, 
524                    float lenmean, float lensd, int fixedlen,
525                    int nthreads,
526                    struct histogram_s **ret_hist, float *ret_max,
527                    Stopwatch_t *twatch)
528 {
529   struct histogram_s *hist;
530   float  randomseq[MAXABET];
531   float  p1;
532   struct workpool_s *wpool;     /* pool of worker threads  */
533   
534   /* Initialize.
535    * We assume we've already set the alphabet (safe, because
536    * HMM input sets the alphabet).
537    */
538   sre_srandom(seed);
539   P7Logoddsify(hmm, TRUE);
540   P7DefaultNullModel(randomseq, &p1);
541   hist = AllocHistogram(-200, 200, 100);
542
543   wpool = workpool_start(hmm, lenmean, lensd, fixedlen, randomseq, nsample,
544                          hist, nthreads);
545   workpool_stop(wpool);
546
547   *ret_hist = hist;
548   *ret_max  = wpool->max_score;
549   StopwatchInclude(twatch, &(wpool->watch));
550
551   workpool_free(wpool);
552   return;
553 }
554
555 /*****************************************************************
556  * POSIX threads implementation.
557  * API:
558  *      workpool_start()   (makes a workpool_s structure. Starts calculations.)
559  *      workpool_stop()    (waits for threads to finish.)
560  *      [process histogram]
561  *      workpool_free()    (destroys the structure)
562  *      
563  * Threads:
564  *      worker_thread()    (the actual parallelized worker thread).
565  *****************************************************************/
566
567 /* Function: workpool_start()
568  * Date:     SRE, Thu Jul 16 11:09:05 1998 [St. Louis]
569  *
570  * Purpose:  Initialize a workpool_s structure, and return it.
571  *
572  * Args:     hmm      - the HMM to calibrate
573  *           fixedlen - 0, or a fixed length for seqs (bypass of Gaussian)
574  *           lenmean  - mean sequence length 
575  *           lensd    - std. dev. for sequence length
576  *           randomseq- i.i.d. frequencies for residues, 0..Alphabet_size-1
577  *           nsample  - how many seqs to calibrate on
578  *           hist     - histogram structure for storing results
579  *           num_threads - how many processors to run on
580  *
581  * Returns:  ptr to struct workpool_s.
582  *           Caller must wait for threads to finish with workpool_stop(),
583  *           then free the structure with workpool_free().
584  */
585 static struct workpool_s *
586 workpool_start(struct plan7_s *hmm, float lenmean, float lensd, int fixedlen,
587                float *randomseq, int nsample, struct histogram_s *hist, 
588                int num_threads)
589 {
590   struct workpool_s *wpool;
591   pthread_attr_t    attr;
592   int i;
593   int rtn;
594
595   wpool         = MallocOrDie(sizeof(struct workpool_s));
596   wpool->thread = MallocOrDie(num_threads * sizeof(pthread_t));
597   wpool->hmm        = hmm;
598   wpool->fixedlen   = fixedlen;
599   wpool->lenmean    = lenmean;
600   wpool->lensd      = lensd;
601   wpool->randomseq  = randomseq;
602   wpool->nsample    = nsample;
603   
604   wpool->nseq       = 0;
605   wpool->hist       = hist;
606   wpool->max_score  = -FLT_MAX;
607   wpool->num_threads= num_threads;
608
609   StopwatchZero(&(wpool->watch));
610   
611   if ((rtn = pthread_mutex_init(&(wpool->input_lock), NULL)) != 0)
612     Die("pthread_mutex_init FAILED; %s\n", strerror(rtn));
613   if ((rtn = pthread_mutex_init(&(wpool->output_lock), NULL)) != 0)
614     Die("pthread_mutex_init FAILED; %s\n", strerror(rtn));
615
616   /* Create slave threads.
617    * Note the crazy machinations we have to go through to achieve concurrency.
618    * You'd think that POSIX threads were portable... ha.
619    * On IRIX 6.5, system scope threads are only available to root, or if
620    *   /etc/capability has been configured specially, so to avoid strange
621    *   permissions errors we can't set PTHREAD_SCOPE_SYSTEM for IRIX.
622    * On IRIX pre-6.5, we can't get good concurrency, period. As of 6.5,
623    *   SGI provides the nonportable pthread_setconcurrency() call.
624    * On FreeBSD (3.0 snapshots), the pthread_attr_setscope() call isn't
625    *   even provided, apparently on grounds of "if it doesn't do anything,
626    *   why provide it?" Hello? POSIX compliance, perhaps?
627    * On Sun Solaris, we need to set system scope to achieve concurrency.
628    * Linux and DEC Digital UNIX seem to work fine in either process scope
629    *   or system scope, without a pthread_setconcurrency call.
630    */
631   pthread_attr_init(&attr);
632 #ifndef __sgi
633 #ifdef HAVE_PTHREAD_ATTR_SETSCOPE
634   pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
635 #endif
636 #endif
637 #ifdef HAVE_PTHREAD_SETCONCURRENCY
638   pthread_setconcurrency(num_threads+1);
639 #endif
640   for (i = 0; i < num_threads; i++)
641     if ((rtn = pthread_create(&(wpool->thread[i]), &attr,
642                               worker_thread , (void *) wpool)) != 0)
643       Die("Failed to create thread %d; return code %d\n", i, rtn);
644
645   pthread_attr_destroy(&attr);
646
647   return wpool;
648 }
649
650 /* Function: workpool_stop()
651  * Date:     SRE, Thu Jul 16 11:20:16 1998 [St. Louis]
652  *
653  * Purpose:  Waits for threads in a workpool to finish.
654  *
655  * Args:     wpool -- ptr to the workpool structure
656  *
657  * Returns:  (void)
658  */
659 static void
660 workpool_stop(struct workpool_s *wpool)
661 {
662   int i;
663                                 /* wait for threads to stop */
664   for (i = 0; i < wpool->num_threads; i++)
665     if (pthread_join(wpool->thread[i],NULL) != 0)
666       Die("pthread_join failed");
667   return;
668 }
669
670 /* Function: workpool_free()
671  * Date:     SRE, Thu Jul 16 11:26:27 1998 [St. Louis]
672  *
673  * Purpose:  Free a workpool_s structure, after the threads
674  *           have finished.
675  *
676  * Args:     wpool -- ptr to the workpool.
677  *
678  * Returns:  (void)
679  */
680 static void
681 workpool_free(struct workpool_s *wpool)
682 {
683   free(wpool->thread);
684   free(wpool);
685   return;
686 }
687
688 /* Function: worker_thread()
689  * Date:     SRE, Thu Jul 16 10:41:02 1998 [St. Louis]
690  *
691  * Purpose:  The procedure executed by the worker threads.
692  *
693  * Args:     ptr  - (void *) that is recast to a pointer to
694  *                  the workpool.
695  *
696  * Returns:  (void *)
697  */
698 void *
699 worker_thread(void *ptr)
700 {
701   struct plan7_s    *hmm;
702   struct workpool_s *wpool;
703   char       *seq;
704   char       *dsq;
705   int         len;
706   float       sc;
707   int         rtn;
708   Stopwatch_t thread_watch;
709
710   StopwatchStart(&thread_watch);
711   wpool = (struct workpool_s *) ptr;
712   hmm   = wpool->hmm;
713   for (;;)
714     {
715       /* 1. Synthesize a random sequence. 
716        *    The input sequence number is a shared resource,
717        *    and sre_random() isn't thread-safe, so protect
718        *    the whole section with mutex.
719        */
720                                 /* acquire a lock */
721       if ((rtn = pthread_mutex_lock(&(wpool->input_lock))) != 0)
722         Die("pthread_mutex_lock failure: %s\n", strerror(rtn));
723                                 /* generate a sequence */
724       wpool->nseq++;
725       if (wpool->nseq > wpool->nsample) 
726         { /* we're done; release input lock, break loop */
727           if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0)
728             Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));
729           break;
730         }
731       if (wpool->fixedlen) len = wpool->fixedlen;
732       else do len = (int) Gaussrandom(wpool->lenmean, wpool->lensd); while (len < 1);
733       seq = RandomSequence(Alphabet, wpool->randomseq, Alphabet_size, len);
734
735                                 /* release the lock */
736       if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0)
737         Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));
738
739       /* 2. Score the sequence against the model.
740        */
741       dsq = DigitizeSequence(seq, len);
742       
743       if (P7ViterbiSize(len, hmm->M) <= RAMLIMIT)
744         sc = P7Viterbi(dsq, len, hmm, NULL);
745       else
746         sc = P7SmallViterbi(dsq, len, hmm, NULL);
747       free(dsq); 
748       free(seq);
749       
750       /* 3. Save the output; hist and max_score are shared,
751        *    so protect this section with the output mutex.
752        */
753                                 /* acquire lock on the output queue */
754       if ((rtn = pthread_mutex_lock(&(wpool->output_lock))) != 0)
755         Die("pthread_mutex_lock failure: %s\n", strerror(rtn));
756                                 /* save output */
757       AddToHistogram(wpool->hist, sc);
758       if (sc > wpool->max_score) wpool->max_score = sc;
759                                 /* release our lock */
760       if ((rtn = pthread_mutex_unlock(&(wpool->output_lock))) != 0)
761         Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));
762     }
763
764   StopwatchStop(&thread_watch);
765                                 /* acquire lock on the output queue */
766   if ((rtn = pthread_mutex_lock(&(wpool->output_lock))) != 0)
767     Die("pthread_mutex_lock failure: %s\n", strerror(rtn));
768                                 /* accumulate cpu time into main stopwatch */
769   StopwatchInclude(&(wpool->watch), &thread_watch);
770                                 /* release our lock */
771   if ((rtn = pthread_mutex_unlock(&(wpool->output_lock))) != 0)
772     Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));
773
774   pthread_exit(NULL);
775   return NULL; /* solely to silence compiler warnings */
776 }
777 #endif /* HMMER_THREADS */
778
779
780
781 #ifdef HMMER_PVM
782 /* Function: main_loop_pvm()
783  * Date:     SRE, Wed Aug 19 13:59:54 1998 [St. Louis]
784  *
785  * Purpose:  Given an HMM and parameters for synthesizing random
786  *           sequences; return a histogram of scores.
787  *           (PVM version)  
788  *
789  * Args:     hmm     - an HMM to calibrate.
790  *           seed    - random number seed
791  *           nsample - number of seqs to synthesize
792  *           lumpsize- # of seqs per slave exchange; controls granularity
793  *           lenmean - mean length of random sequence
794  *           lensd   - std dev of random seq length
795  *           fixedlen- if nonzero, override lenmean, always this len
796  *           hist       - RETURN: the score histogram 
797  *           ret_max    - RETURN: highest score seen in simulation
798  *           extrawatch - RETURN: total CPU time spend in slaves.
799  *           ret_nslaves- RETURN: number of PVM slaves run.
800  *
801  * Returns:  (void)
802  *           hist is alloc'ed here, and must be free'd by caller.
803  */
804 static void
805 main_loop_pvm(struct plan7_s *hmm, int seed, int nsample, int lumpsize,
806               float lenmean, float lensd, int fixedlen,
807               struct histogram_s **ret_hist, float *ret_max, 
808               Stopwatch_t *extrawatch, int *ret_nslaves)
809 {
810   struct histogram_s *hist;
811   int                 master_tid;
812   int                *slave_tid;
813   int                 nslaves;
814   int                 nsent;    /* # of seqs we've asked for so far       */
815   int                 ndone;    /* # of seqs we've got results for so far */
816   int                 packet;   /* # of seqs to have a slave do           */
817   float               max;
818   int                 slaveidx; /* id of a slave */
819   float              *sc;        /* scores returned by a slave */
820   Stopwatch_t         slavewatch;
821   int                 i;
822   
823   StopwatchZero(extrawatch);
824   hist = AllocHistogram(-200, 200, 100);
825   max  = -FLT_MAX;
826
827   /* Initialize PVM
828    */
829   if ((master_tid = pvm_mytid()) < 0)
830     Die("pvmd not responding -- do you have PVM running?");
831 #if DEBUGLEVEL >= 1
832   pvm_catchout(stderr);         /* catch output for debugging */
833 #endif
834   PVMSpawnSlaves("hmmcalibrate-pvm", &slave_tid, &nslaves);
835
836   /* Initialize the slaves
837    */
838   pvm_initsend(PvmDataDefault);
839   pvm_pkfloat(&lenmean,       1, 1);
840   pvm_pkfloat(&lensd,         1, 1);
841   pvm_pkint(  &fixedlen,      1, 1);
842   pvm_pkint(  &Alphabet_type, 1, 1);
843   pvm_pkint(  &seed,          1, 1);
844   if (! PVMPackHMM(hmm)) Die("Failed to pack the HMM");
845   pvm_mcast(slave_tid, nslaves, HMMPVM_INIT);
846   SQD_DPRINTF1(("Initialized %d slaves\n", nslaves));
847
848   /* Confirm slaves' OK status.
849    */
850   PVMConfirmSlaves(slave_tid, nslaves);
851   SQD_DPRINTF1(("Slaves confirm that they're ok...\n"));
852  
853   /* Load the slaves
854    */
855   nsent = ndone = 0;
856   for (slaveidx = 0; slaveidx < nslaves; slaveidx++)
857     {
858       packet    = (nsample - nsent > lumpsize ? lumpsize : nsample - nsent);
859
860       pvm_initsend(PvmDataDefault);
861       pvm_pkint(&packet,    1, 1);
862       pvm_pkint(&slaveidx,  1, 1);
863       pvm_send(slave_tid[slaveidx], HMMPVM_WORK);
864       nsent += packet;
865     }
866   SQD_DPRINTF1(("Loaded %d slaves\n", nslaves));
867
868   /* Receive/send loop
869    */
870   sc = MallocOrDie(sizeof(float) * lumpsize);
871   while (nsent < nsample)
872     {
873                                 /* integrity check of slaves */
874       PVMCheckSlaves(slave_tid, nslaves);
875
876                                 /* receive results */
877       SQD_DPRINTF2(("Waiting for results...\n"));
878       pvm_recv(-1, HMMPVM_RESULTS);
879       pvm_upkint(&slaveidx,   1, 1);
880       pvm_upkint(&packet,     1, 1);
881       pvm_upkfloat(sc,   packet, 1);
882       SQD_DPRINTF2(("Got results.\n"));
883       ndone += packet;
884
885                                 /* store results */
886       for (i = 0; i < packet; i++) {
887         AddToHistogram(hist, sc[i]);
888         if (sc[i] > max) max = sc[i];
889       }
890                                 /* send new work */
891       packet    = (nsample - nsent > lumpsize ? lumpsize : nsample - nsent);
892
893       pvm_initsend(PvmDataDefault);
894       pvm_pkint(&packet,    1, 1);
895       pvm_pkint(&slaveidx,  1, 1);
896       pvm_send(slave_tid[slaveidx], HMMPVM_WORK);
897       SQD_DPRINTF2(("Told slave %d to do %d more seqs.\n", slaveidx, packet));
898       nsent += packet;
899     }
900       
901   /* Wait for the last output to come in.
902    */
903   while (ndone < nsample)
904     {
905                                 /* integrity check of slaves */
906       PVMCheckSlaves(slave_tid, nslaves);
907
908                                 /* receive results */
909       SQD_DPRINTF1(("Waiting for final results...\n"));
910       pvm_recv(-1, HMMPVM_RESULTS);
911       pvm_upkint(&slaveidx, 1, 1);
912       pvm_upkint(&packet,   1, 1);
913       pvm_upkfloat(sc, packet, 1);
914       SQD_DPRINTF2(("Got some final results.\n"));
915       ndone += packet;
916                                 /* store results */
917       for (i = 0; i < packet; i++) {
918         AddToHistogram(hist, sc[i]);
919         if (sc[i] > max) max = sc[i];
920       }
921     }
922
923   /* Shut down the slaves: send -1,-1,-1.
924    */
925   pvm_initsend(PvmDataDefault);
926   packet = -1;
927   pvm_pkint(&packet, 1, 1);
928   pvm_pkint(&packet, 1, 1);
929   pvm_pkint(&packet, 1, 1);
930   pvm_mcast(slave_tid, nslaves, HMMPVM_WORK);
931
932   /* Collect stopwatch results; quit the VM; return.
933    */
934   for (i = 0; i < nslaves; i++)
935     {
936       pvm_recv(-1, HMMPVM_RESULTS);
937       pvm_upkint(&slaveidx, 1, 1);
938       StopwatchPVMUnpack(&slavewatch);
939
940       SQD_DPRINTF1(("Slave %d finished; says it used %.2f cpu, %.2f sys\n",
941                     slaveidx, slavewatch.user, slavewatch.sys));
942
943       StopwatchInclude(extrawatch, &slavewatch);
944     }
945
946   free(slave_tid);
947   free(sc);
948   pvm_exit();
949   *ret_hist    = hist;
950   *ret_max     = max;
951   *ret_nslaves = nslaves;
952   return;
953 }
954 #endif /* HMMER_PVM */
955
956
957