1 /************************************************************
2 * HMMER - Biological sequence analysis with profile HMMs
3 * Copyright (C) 1992-1999 Washington University School of Medicine
6 * This source code is distributed under the terms of the
7 * GNU General Public License. See the files COPYING and LICENSE
9 ************************************************************/
12 * SRE, Fri Oct 31 09:25:21 1997 [St. Louis]
14 * Score an HMM against random sequence data sets;
15 * set histogram fitting parameters.
17 * CVS $Id: hmmcalibrate.c,v 1.1.1.1 2005/03/22 08:34:03 cmzmasek Exp $
35 #include "squid.h" /* general sequence analysis library */
36 #include "config.h" /* compile-time configuration constants */
37 #include "structs.h" /* data structures, macros, #define's */
38 #include "funcs.h" /* function declarations */
39 #include "globals.h" /* alphabet global variables */
40 #include "version.h" /* release version info */
41 #include "stopwatch.h" /* process timings */
43 static char banner[] = "hmmcalibrate -- calibrate HMM search statistics";
45 static char usage[] = "\
46 Usage: hmmcalibrate [-options] <hmmfile>\n\
47 Available options are:\n\
48 -h : print short usage and version info, then exit\n\
51 static char experts[] = "\
52 --cpu <n> : run <n> threads in parallel (if threaded)\n\
53 --fixed <n> : fix random sequence length at <n>\n\
54 --histfile <f> : save histogram(s) to file <f>\n\
55 --mean <x> : set random seq length mean at <x> [350]\n\
56 --num <n> : set number of sampled seqs to <n> [5000]\n\
57 --pvm : run on a Parallel Virtual Machine (PVM)\n\
58 --sd <x> : set random seq length std. dev to <x> [350]\n\
59 --seed <n> : set random seed to <n> [time()]\n\
62 static struct opt_s OPTIONS[] = {
63 { "-h", TRUE, sqdARG_NONE },
64 { "--cpu", FALSE, sqdARG_INT },
65 { "--fixed", FALSE, sqdARG_INT },
66 { "--histfile", FALSE, sqdARG_STRING },
67 { "--mean", FALSE, sqdARG_FLOAT },
68 { "--num", FALSE, sqdARG_INT },
69 { "--pvm", FALSE, sqdARG_NONE },
70 { "--sd", FALSE, sqdARG_FLOAT },
71 { "--seed", FALSE, sqdARG_INT},
73 #define NOPTIONS (sizeof(OPTIONS) / sizeof(struct opt_s))
76 static void main_loop_serial(struct plan7_s *hmm, int seed, int nsample,
77 float lenmean, float lensd, int fixedlen,
78 struct histogram_s **ret_hist, float *ret_max);
81 /* A structure of this type is shared by worker threads in the POSIX
82 * threads parallel version.
85 /* Static configuration:
87 struct plan7_s *hmm; /* ptr to single HMM to search with */
88 int fixedlen; /* if >0, fix random seq len to this */
89 float lenmean; /* mean of Gaussian for random seq len */
90 float lensd; /* s.d. of Gaussian for random seq len */
91 float *randomseq; /* 0..Alphabet_size-1 i.i.d. probs */
92 int nsample; /* number of random seqs to do */
94 /* Shared (mutex-protected) input:
96 int nseq; /* current number of seqs searched */
98 /* Shared (mutex-protected) output:
100 struct histogram_s *hist; /* histogram */
101 float max_score; /* maximum score seen */
102 Stopwatch_t watch; /* Timings accumulated for threads */
104 /* Thread pool information:
106 pthread_t *thread; /* our pool of threads */
107 int num_threads; /* number of threads */
108 pthread_mutex_t input_lock; /* a mutex protecting input fields */
109 pthread_mutex_t output_lock; /* a mutex protecting output fields */
111 static void main_loop_threaded(struct plan7_s *hmm, int seed, int nsample,
112 float lenmean, float lensd, int fixedlen,
114 struct histogram_s **ret_hist, float *ret_max,
115 Stopwatch_t *twatch);
116 static struct workpool_s *workpool_start(struct plan7_s *hmm,
117 float lenmean, float lensd, int fixedlen,
118 float *randomseq, int nsample,
119 struct histogram_s *hist,
121 static void workpool_stop(struct workpool_s *wpool);
122 static void workpool_free(struct workpool_s *wpool);
123 static void *worker_thread(void *ptr);
124 #endif /* HMMER_THREADS */
127 static void main_loop_pvm(struct plan7_s *hmm, int seed, int nsample,
129 float lenmean, float lensd, int fixedlen,
130 struct histogram_s **ret_hist, float *ret_max,
131 Stopwatch_t *extrawatch, int *ret_nslaves);
132 #endif /* HMMER_PVM */
136 main(int argc, char **argv)
138 char *hmmfile; /* HMM file to open */
139 char *tmpfile; /* temporary calibrated HMM file */
140 HMMFILE *hmmfp; /* opened hmm file pointer */
141 FILE *outfp; /* for writing HMM(s) into tmpfile */
142 char *mode; /* write mode, "w" or "wb" */
143 struct plan7_s *hmm; /* the hidden Markov model */
144 int idx; /* counter over sequences */
145 sigset_t blocksigs; /* list of signals to protect from */
146 int nhmm; /* number of HMMs calibrated */
148 struct histogram_s *hist; /* a resulting histogram */
149 float max; /* maximum score from an HMM */
150 char *histfile; /* histogram save file */
151 FILE *hfp; /* open file pointer for histfile */
153 Stopwatch_t stopwatch; /* main stopwatch for process */
154 Stopwatch_t extrawatch; /* stopwatch for threads/PVM slaves */
156 float *mu; /* array of EVD mu's for HMMs */
157 float *lambda; /* array of EVD lambda's for HMMs */
158 int mu_lumpsize; /* allocation lumpsize for mu, lambda */
160 int nsample; /* number of random seqs to sample */
161 int seed; /* random number seed */
162 int fixedlen; /* fixed length, or 0 if unused */
163 float lenmean; /* mean of length distribution */
164 float lensd; /* std dev of length distribution */
165 int do_pvm; /* TRUE to use PVM */
166 int pvm_lumpsize; /* # of seqs to do per PVM slave exchange */
167 int pvm_nslaves; /* number of slaves used in the PVM */
170 char *optname; /* name of option found by Getopt() */
171 char *optarg; /* argument found by Getopt() */
172 int optind; /* index in argv[] */
174 int num_threads; /* number of worker threads */
177 /***********************************************
178 * Parse the command line
179 ***********************************************/
180 StopwatchStart(&stopwatch);
181 StopwatchZero(&extrawatch);
187 seed = (int) time ((time_t *) NULL);
190 pvm_lumpsize = 20; /* 20 seqs/PVM exchange: sets granularity */
193 num_threads = ThreadNumber(); /* only matters if we're threaded */
198 while (Getopt(argc, argv, OPTIONS, NOPTIONS, usage,
199 &optind, &optname, &optarg))
201 if (strcmp(optname, "--cpu") == 0) num_threads = atoi(optarg);
202 else if (strcmp(optname, "--fixed") == 0) fixedlen = atoi(optarg);
203 else if (strcmp(optname, "--histfile") == 0) histfile = optarg;
204 else if (strcmp(optname, "--mean") == 0) lenmean = atof(optarg);
205 else if (strcmp(optname, "--num") == 0) nsample = atoi(optarg);
206 else if (strcmp(optname, "--pvm") == 0) do_pvm = TRUE;
207 else if (strcmp(optname, "--sd") == 0) lensd = atof(optarg);
208 else if (strcmp(optname, "--seed") == 0) seed = atoi(optarg);
209 else if (strcmp(optname, "-h") == 0)
211 Banner(stdout, banner);
218 if (argc - optind != 1) Die("Incorrect number of arguments.\n%s\n", usage);
219 hmmfile = argv[optind++];
222 if (do_pvm) Die("PVM support is not compiled into HMMER; --pvm doesn't work.");
224 #ifndef HMMER_THREADS
225 if (num_threads) Die("Posix threads support is not compiled into HMMER; --cpu doesn't have any effect");
228 /***********************************************
229 * Open our i/o file pointers, make sure all is well
230 ***********************************************/
233 if ((hmmfp = HMMFileOpen(hmmfile, NULL)) == NULL)
234 Die("failed to open HMM file %s for reading.", hmmfile);
238 if (histfile != NULL) {
239 if ((hfp = fopen(histfile, "w")) == NULL)
240 Die("Failed to open histogram save file %s for writing\n", histfile);
243 /* Generate calibrated HMM(s) in a tmp file in the current
244 * directory. When we're finished, we delete the original
245 * HMM file and rename() this one. That way, the worst
246 * effect of a catastrophic failure should be that we
247 * leave a tmp file lying around, but the original HMM
248 * file remains uncorrupted. tmpnam() doesn't work portably here,
249 * because it'll put the file in /tmp and we won't
250 * necessarily be able to rename() it from there.
252 tmpfile = MallocOrDie(strlen(hmmfile) + 5);
253 strcpy(tmpfile, hmmfile);
254 strcat(tmpfile, ".xxx"); /* could be more inventive here... */
255 if (FileExists(tmpfile))
256 Die("temporary file %s already exists; please delete it first", tmpfile);
257 if (hmmfp->is_binary) mode = "wb";
260 /***********************************************
262 ***********************************************/
264 Banner(stdout, banner);
265 printf("HMM file: %s\n", hmmfile);
267 printf("Length fixed to: %d\n", fixedlen);
269 printf("Length distribution mean: %.0f\n", lenmean);
270 printf("Length distribution s.d.: %.0f\n", lensd);
272 printf("Number of samples: %d\n", nsample);
273 printf("random seed: %d\n", seed);
274 printf("histogram(s) saved to: %s\n",
275 histfile != NULL ? histfile : "[not saved]");
277 printf("PVM: ACTIVE\n");
278 else if (num_threads > 0)
279 printf("POSIX threads: %d\n", num_threads);
280 printf("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n\n");
282 /***********************************************
283 * Read the HMMs one at a time, and send them off
284 * in probability form to one of the main loops.
285 * The main loop functions are responsible for
286 * synthesizing random sequences and returning
287 * a score histogram for each HMM.
288 ***********************************************/
291 mu = MallocOrDie(sizeof(float) * mu_lumpsize);
292 lambda = MallocOrDie(sizeof(float) * mu_lumpsize);
294 while (HMMFileRead(hmmfp, &hmm))
297 Die("HMM file may be corrupt or in incorrect format; parse failed");
299 if (! do_pvm && num_threads == 0)
300 main_loop_serial(hmm, seed, nsample, lenmean, lensd, fixedlen,
304 pvm_nslaves = 0; /* solely to silence compiler warnings */
305 main_loop_pvm(hmm, seed, nsample, pvm_lumpsize,
306 lenmean, lensd, fixedlen,
307 &hist, &max, &extrawatch, &pvm_nslaves);
311 else if (num_threads > 0)
312 main_loop_threaded(hmm, seed, nsample, lenmean, lensd, fixedlen,
313 num_threads, &hist, &max, &extrawatch);
316 Die("wait. that can't happen. I didn't do anything.");
319 /* Fit an EVD to the observed histogram.
320 * The TRUE left-censors and fits only the right slope of the histogram.
321 * The 9999. is an arbitrary high number that means we won't trim
322 * outliers on the right.
324 if (! ExtremeValueFitHistogram(hist, TRUE, 9999.))
325 Die("fit failed; -n may be set too small?\n");
327 mu[nhmm] = hist->param[EVD_MU];
328 lambda[nhmm] = hist->param[EVD_LAMBDA];
330 if (nhmm % 100 == 0) {
331 mu = ReallocOrDie(mu, sizeof(float) * (nhmm+mu_lumpsize));
332 lambda = ReallocOrDie(lambda, sizeof(float) * (nhmm+mu_lumpsize));
337 printf("HMM : %s\n", hmm->name);
338 printf("mu : %12f\n", hist->param[EVD_MU]);
339 printf("lambda : %12f\n", hist->param[EVD_LAMBDA]);
340 printf("max : %12f\n", max);
345 fprintf(hfp, "HMM: %s\n", hmm->name);
346 PrintASCIIHistogram(hfp, hist);
347 fprintf(hfp, "//\n");
352 SQD_DPRINTF1(("Main body believes it has calibrations for %d HMMs\n", nhmm));
354 /*****************************************************************
355 * Rewind the HMM file for a second pass.
356 * Write a temporary HMM file with new mu, lambda values in it
357 *****************************************************************/
359 HMMFileRewind(hmmfp);
360 if (FileExists(tmpfile))
361 Die("Ouch. Temporary file %s appeared during the run.", tmpfile);
362 if ((outfp = fopen(tmpfile, mode)) == NULL)
363 Die("Ouch. Temporary file %s couldn't be opened for writing.", tmpfile);
365 for (idx = 0; idx < nhmm; idx++)
369 if (!HMMFileRead(hmmfp, &hmm))
370 Die("Ran out of HMMs too early in pass 2");
372 Die("HMM file %s was corrupted? Parse failed in pass 2", hmmfile);
374 /* Put results in HMM
377 hmm->lambda = lambda[idx];
378 hmm->flags |= PLAN7_STATS;
379 Plan7ComlogAppend(hmm, argc, argv);
381 /* Save HMM to tmpfile
383 if (hmmfp->is_binary) WriteBinHMM(outfp, hmm);
384 else WriteAscHMM(outfp, hmm);
389 /*****************************************************************
390 * Now, carefully remove original file and replace it
391 * with the tmpfile. Note the protection from signals;
392 * we wouldn't want a user to ctrl-C just as we've deleted
393 * their HMM file but before the new one is moved.
394 *****************************************************************/
397 if (fclose(outfp) != 0) PANIC;
399 if (sigemptyset(&blocksigs) != 0) PANIC;
400 if (sigaddset(&blocksigs, SIGINT) != 0) PANIC;
401 if (sigprocmask(SIG_BLOCK, &blocksigs, NULL) != 0) PANIC;
402 if (remove(hmmfile) != 0) PANIC;
403 if (rename(tmpfile, hmmfile) != 0) PANIC;
404 if (sigprocmask(SIG_UNBLOCK, &blocksigs, NULL) != 0) PANIC;
406 /***********************************************
408 ***********************************************/
410 StopwatchStop(&stopwatch);
412 printf("PVM processors used: %d\n", pvm_nslaves);
413 StopwatchInclude(&stopwatch, &extrawatch);
415 #ifdef PTHREAD_TIMES_HACK
416 else if (num_threads > 0) StopwatchInclude(&stopwatch, &extrawatch);
419 /* StopwatchDisplay(stdout, "CPU Time: ", &stopwatch); */
424 if (hfp != NULL) fclose(hfp);
429 /* Function: main_loop_serial()
430 * Date: SRE, Tue Aug 18 16:18:28 1998 [St. Louis]
432 * Purpose: Given an HMM and parameters for synthesizing random
433 * sequences; return a histogram of scores.
436 * Args: hmm - an HMM to calibrate.
437 * seed - random number seed
438 * nsample - number of seqs to synthesize
439 * lenmean - mean length of random sequence
440 * lensd - std dev of random seq length
441 * fixedlen - if nonzero, override lenmean, always this len
442 * ret_hist - RETURN: the score histogram
443 * ret_max - RETURN: highest score seen in simulation
446 * hist is alloc'ed here, and must be free'd by caller.
449 main_loop_serial(struct plan7_s *hmm, int seed, int nsample,
450 float lenmean, float lensd, int fixedlen,
451 struct histogram_s **ret_hist, float *ret_max)
453 struct histogram_s *hist;
454 float randomseq[MAXABET];
464 * We assume we've already set the alphabet (safe, because
465 * HMM input sets the alphabet).
468 P7Logoddsify(hmm, TRUE);
469 P7DefaultNullModel(randomseq, &p1);
470 hist = AllocHistogram(-200, 200, 100);
473 for (idx = 0; idx < nsample; idx++)
475 /* choose length of random sequence */
476 if (fixedlen) sqlen = fixedlen;
477 else do sqlen = (int) Gaussrandom(lenmean, lensd); while (sqlen < 1);
479 seq = RandomSequence(Alphabet, randomseq, Alphabet_size, sqlen);
480 dsq = DigitizeSequence(seq, sqlen);
482 if (P7ViterbiSize(sqlen, hmm->M) <= RAMLIMIT)
483 score = P7Viterbi(dsq, sqlen, hmm, NULL);
485 score = P7SmallViterbi(dsq, sqlen, hmm, NULL);
487 AddToHistogram(hist, score);
488 if (score > max) max = score;
501 /* Function: main_loop_threaded()
502 * Date: SRE, Wed Dec 1 12:43:09 1999 [St. Louis]
504 * Purpose: Given an HMM and parameters for synthesizing random
505 * sequences; return a histogram of scores.
506 * (Threaded version.)
508 * Args: hmm - an HMM to calibrate.
509 * seed - random number seed
510 * nsample - number of seqs to synthesize
511 * lenmean - mean length of random sequence
512 * lensd - std dev of random seq length
513 * fixedlen - if nonzero, override lenmean, always this len
514 * nthreads - number of threads to start
515 * ret_hist - RETURN: the score histogram
516 * ret_max - RETURN: highest score seen in simulation
517 * twatch - RETURN: accumulation of thread times
520 * hist is alloc'ed here, and must be free'd by caller.
523 main_loop_threaded(struct plan7_s *hmm, int seed, int nsample,
524 float lenmean, float lensd, int fixedlen,
526 struct histogram_s **ret_hist, float *ret_max,
529 struct histogram_s *hist;
530 float randomseq[MAXABET];
532 struct workpool_s *wpool; /* pool of worker threads */
535 * We assume we've already set the alphabet (safe, because
536 * HMM input sets the alphabet).
539 P7Logoddsify(hmm, TRUE);
540 P7DefaultNullModel(randomseq, &p1);
541 hist = AllocHistogram(-200, 200, 100);
543 wpool = workpool_start(hmm, lenmean, lensd, fixedlen, randomseq, nsample,
545 workpool_stop(wpool);
548 *ret_max = wpool->max_score;
549 StopwatchInclude(twatch, &(wpool->watch));
551 workpool_free(wpool);
555 /*****************************************************************
556 * POSIX threads implementation.
558 * workpool_start() (makes a workpool_s structure. Starts calculations.)
559 * workpool_stop() (waits for threads to finish.)
560 * [process histogram]
561 * workpool_free() (destroys the structure)
564 * worker_thread() (the actual parallelized worker thread).
565 *****************************************************************/
567 /* Function: workpool_start()
568 * Date: SRE, Thu Jul 16 11:09:05 1998 [St. Louis]
570 * Purpose: Initialize a workpool_s structure, and return it.
572 * Args: hmm - the HMM to calibrate
573 * fixedlen - 0, or a fixed length for seqs (bypass of Gaussian)
574 * lenmean - mean sequence length
575 * lensd - std. dev. for sequence length
576 * randomseq- i.i.d. frequencies for residues, 0..Alphabet_size-1
577 * nsample - how many seqs to calibrate on
578 * hist - histogram structure for storing results
579 * num_threads - how many processors to run on
581 * Returns: ptr to struct workpool_s.
582 * Caller must wait for threads to finish with workpool_stop(),
583 * then free the structure with workpool_free().
585 static struct workpool_s *
586 workpool_start(struct plan7_s *hmm, float lenmean, float lensd, int fixedlen,
587 float *randomseq, int nsample, struct histogram_s *hist,
590 struct workpool_s *wpool;
595 wpool = MallocOrDie(sizeof(struct workpool_s));
596 wpool->thread = MallocOrDie(num_threads * sizeof(pthread_t));
598 wpool->fixedlen = fixedlen;
599 wpool->lenmean = lenmean;
600 wpool->lensd = lensd;
601 wpool->randomseq = randomseq;
602 wpool->nsample = nsample;
606 wpool->max_score = -FLT_MAX;
607 wpool->num_threads= num_threads;
609 StopwatchZero(&(wpool->watch));
611 if ((rtn = pthread_mutex_init(&(wpool->input_lock), NULL)) != 0)
612 Die("pthread_mutex_init FAILED; %s\n", strerror(rtn));
613 if ((rtn = pthread_mutex_init(&(wpool->output_lock), NULL)) != 0)
614 Die("pthread_mutex_init FAILED; %s\n", strerror(rtn));
616 /* Create slave threads.
617 * Note the crazy machinations we have to go through to achieve concurrency.
618 * You'd think that POSIX threads were portable... ha.
619 * On IRIX 6.5, system scope threads are only available to root, or if
620 * /etc/capability has been configured specially, so to avoid strange
621 * permissions errors we can't set PTHREAD_SCOPE_SYSTEM for IRIX.
622 * On IRIX pre-6.5, we can't get good concurrency, period. As of 6.5,
623 * SGI provides the nonportable pthread_setconcurrency() call.
624 * On FreeBSD (3.0 snapshots), the pthread_attr_setscope() call isn't
625 * even provided, apparently on grounds of "if it doesn't do anything,
626 * why provide it?" Hello? POSIX compliance, perhaps?
627 * On Sun Solaris, we need to set system scope to achieve concurrency.
628 * Linux and DEC Digital UNIX seem to work fine in either process scope
629 * or system scope, without a pthread_setconcurrency call.
631 pthread_attr_init(&attr);
633 #ifdef HAVE_PTHREAD_ATTR_SETSCOPE
634 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
637 #ifdef HAVE_PTHREAD_SETCONCURRENCY
638 pthread_setconcurrency(num_threads+1);
640 for (i = 0; i < num_threads; i++)
641 if ((rtn = pthread_create(&(wpool->thread[i]), &attr,
642 worker_thread , (void *) wpool)) != 0)
643 Die("Failed to create thread %d; return code %d\n", i, rtn);
645 pthread_attr_destroy(&attr);
650 /* Function: workpool_stop()
651 * Date: SRE, Thu Jul 16 11:20:16 1998 [St. Louis]
653 * Purpose: Waits for threads in a workpool to finish.
655 * Args: wpool -- ptr to the workpool structure
660 workpool_stop(struct workpool_s *wpool)
663 /* wait for threads to stop */
664 for (i = 0; i < wpool->num_threads; i++)
665 if (pthread_join(wpool->thread[i],NULL) != 0)
666 Die("pthread_join failed");
670 /* Function: workpool_free()
671 * Date: SRE, Thu Jul 16 11:26:27 1998 [St. Louis]
673 * Purpose: Free a workpool_s structure, after the threads
676 * Args: wpool -- ptr to the workpool.
681 workpool_free(struct workpool_s *wpool)
688 /* Function: worker_thread()
689 * Date: SRE, Thu Jul 16 10:41:02 1998 [St. Louis]
691 * Purpose: The procedure executed by the worker threads.
693 * Args: ptr - (void *) that is recast to a pointer to
699 worker_thread(void *ptr)
702 struct workpool_s *wpool;
708 Stopwatch_t thread_watch;
710 StopwatchStart(&thread_watch);
711 wpool = (struct workpool_s *) ptr;
715 /* 1. Synthesize a random sequence.
716 * The input sequence number is a shared resource,
717 * and sre_random() isn't thread-safe, so protect
718 * the whole section with mutex.
721 if ((rtn = pthread_mutex_lock(&(wpool->input_lock))) != 0)
722 Die("pthread_mutex_lock failure: %s\n", strerror(rtn));
723 /* generate a sequence */
725 if (wpool->nseq > wpool->nsample)
726 { /* we're done; release input lock, break loop */
727 if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0)
728 Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));
731 if (wpool->fixedlen) len = wpool->fixedlen;
732 else do len = (int) Gaussrandom(wpool->lenmean, wpool->lensd); while (len < 1);
733 seq = RandomSequence(Alphabet, wpool->randomseq, Alphabet_size, len);
735 /* release the lock */
736 if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0)
737 Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));
739 /* 2. Score the sequence against the model.
741 dsq = DigitizeSequence(seq, len);
743 if (P7ViterbiSize(len, hmm->M) <= RAMLIMIT)
744 sc = P7Viterbi(dsq, len, hmm, NULL);
746 sc = P7SmallViterbi(dsq, len, hmm, NULL);
750 /* 3. Save the output; hist and max_score are shared,
751 * so protect this section with the output mutex.
753 /* acquire lock on the output queue */
754 if ((rtn = pthread_mutex_lock(&(wpool->output_lock))) != 0)
755 Die("pthread_mutex_lock failure: %s\n", strerror(rtn));
757 AddToHistogram(wpool->hist, sc);
758 if (sc > wpool->max_score) wpool->max_score = sc;
759 /* release our lock */
760 if ((rtn = pthread_mutex_unlock(&(wpool->output_lock))) != 0)
761 Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));
764 StopwatchStop(&thread_watch);
765 /* acquire lock on the output queue */
766 if ((rtn = pthread_mutex_lock(&(wpool->output_lock))) != 0)
767 Die("pthread_mutex_lock failure: %s\n", strerror(rtn));
768 /* accumulate cpu time into main stopwatch */
769 StopwatchInclude(&(wpool->watch), &thread_watch);
770 /* release our lock */
771 if ((rtn = pthread_mutex_unlock(&(wpool->output_lock))) != 0)
772 Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));
775 return NULL; /* solely to silence compiler warnings */
777 #endif /* HMMER_THREADS */
782 /* Function: main_loop_pvm()
783 * Date: SRE, Wed Aug 19 13:59:54 1998 [St. Louis]
785 * Purpose: Given an HMM and parameters for synthesizing random
786 * sequences; return a histogram of scores.
789 * Args: hmm - an HMM to calibrate.
790 * seed - random number seed
791 * nsample - number of seqs to synthesize
792 * lumpsize- # of seqs per slave exchange; controls granularity
793 * lenmean - mean length of random sequence
794 * lensd - std dev of random seq length
795 * fixedlen- if nonzero, override lenmean, always this len
796 * hist - RETURN: the score histogram
797 * ret_max - RETURN: highest score seen in simulation
798 * extrawatch - RETURN: total CPU time spend in slaves.
799 * ret_nslaves- RETURN: number of PVM slaves run.
802 * hist is alloc'ed here, and must be free'd by caller.
805 main_loop_pvm(struct plan7_s *hmm, int seed, int nsample, int lumpsize,
806 float lenmean, float lensd, int fixedlen,
807 struct histogram_s **ret_hist, float *ret_max,
808 Stopwatch_t *extrawatch, int *ret_nslaves)
810 struct histogram_s *hist;
814 int nsent; /* # of seqs we've asked for so far */
815 int ndone; /* # of seqs we've got results for so far */
816 int packet; /* # of seqs to have a slave do */
818 int slaveidx; /* id of a slave */
819 float *sc; /* scores returned by a slave */
820 Stopwatch_t slavewatch;
823 StopwatchZero(extrawatch);
824 hist = AllocHistogram(-200, 200, 100);
829 if ((master_tid = pvm_mytid()) < 0)
830 Die("pvmd not responding -- do you have PVM running?");
832 pvm_catchout(stderr); /* catch output for debugging */
834 PVMSpawnSlaves("hmmcalibrate-pvm", &slave_tid, &nslaves);
836 /* Initialize the slaves
838 pvm_initsend(PvmDataDefault);
839 pvm_pkfloat(&lenmean, 1, 1);
840 pvm_pkfloat(&lensd, 1, 1);
841 pvm_pkint( &fixedlen, 1, 1);
842 pvm_pkint( &Alphabet_type, 1, 1);
843 pvm_pkint( &seed, 1, 1);
844 if (! PVMPackHMM(hmm)) Die("Failed to pack the HMM");
845 pvm_mcast(slave_tid, nslaves, HMMPVM_INIT);
846 SQD_DPRINTF1(("Initialized %d slaves\n", nslaves));
848 /* Confirm slaves' OK status.
850 PVMConfirmSlaves(slave_tid, nslaves);
851 SQD_DPRINTF1(("Slaves confirm that they're ok...\n"));
856 for (slaveidx = 0; slaveidx < nslaves; slaveidx++)
858 packet = (nsample - nsent > lumpsize ? lumpsize : nsample - nsent);
860 pvm_initsend(PvmDataDefault);
861 pvm_pkint(&packet, 1, 1);
862 pvm_pkint(&slaveidx, 1, 1);
863 pvm_send(slave_tid[slaveidx], HMMPVM_WORK);
866 SQD_DPRINTF1(("Loaded %d slaves\n", nslaves));
870 sc = MallocOrDie(sizeof(float) * lumpsize);
871 while (nsent < nsample)
873 /* integrity check of slaves */
874 PVMCheckSlaves(slave_tid, nslaves);
876 /* receive results */
877 SQD_DPRINTF2(("Waiting for results...\n"));
878 pvm_recv(-1, HMMPVM_RESULTS);
879 pvm_upkint(&slaveidx, 1, 1);
880 pvm_upkint(&packet, 1, 1);
881 pvm_upkfloat(sc, packet, 1);
882 SQD_DPRINTF2(("Got results.\n"));
886 for (i = 0; i < packet; i++) {
887 AddToHistogram(hist, sc[i]);
888 if (sc[i] > max) max = sc[i];
891 packet = (nsample - nsent > lumpsize ? lumpsize : nsample - nsent);
893 pvm_initsend(PvmDataDefault);
894 pvm_pkint(&packet, 1, 1);
895 pvm_pkint(&slaveidx, 1, 1);
896 pvm_send(slave_tid[slaveidx], HMMPVM_WORK);
897 SQD_DPRINTF2(("Told slave %d to do %d more seqs.\n", slaveidx, packet));
901 /* Wait for the last output to come in.
903 while (ndone < nsample)
905 /* integrity check of slaves */
906 PVMCheckSlaves(slave_tid, nslaves);
908 /* receive results */
909 SQD_DPRINTF1(("Waiting for final results...\n"));
910 pvm_recv(-1, HMMPVM_RESULTS);
911 pvm_upkint(&slaveidx, 1, 1);
912 pvm_upkint(&packet, 1, 1);
913 pvm_upkfloat(sc, packet, 1);
914 SQD_DPRINTF2(("Got some final results.\n"));
917 for (i = 0; i < packet; i++) {
918 AddToHistogram(hist, sc[i]);
919 if (sc[i] > max) max = sc[i];
923 /* Shut down the slaves: send -1,-1,-1.
925 pvm_initsend(PvmDataDefault);
927 pvm_pkint(&packet, 1, 1);
928 pvm_pkint(&packet, 1, 1);
929 pvm_pkint(&packet, 1, 1);
930 pvm_mcast(slave_tid, nslaves, HMMPVM_WORK);
932 /* Collect stopwatch results; quit the VM; return.
934 for (i = 0; i < nslaves; i++)
936 pvm_recv(-1, HMMPVM_RESULTS);
937 pvm_upkint(&slaveidx, 1, 1);
938 StopwatchPVMUnpack(&slavewatch);
940 SQD_DPRINTF1(("Slave %d finished; says it used %.2f cpu, %.2f sys\n",
941 slaveidx, slavewatch.user, slavewatch.sys));
943 StopwatchInclude(extrawatch, &slavewatch);
951 *ret_nslaves = nslaves;
954 #endif /* HMMER_PVM */