2 /* copyright (c) 1996, 1997, 1998, 1999 William R. Pearson and the
5 /* $Name: fa_34_26_5 $ - $Id: p2_workcomp.c,v 1.49 2007/01/02 17:24:36 wrp Exp $ */
7 /* This version is modifed to read all files, query and database,
8 through the manager process. Workers will now receive their
9 database from the manager, rather than reading it themselves. This
10 cuts down considerably on NFS traffic, simplifies searches of
11 multiple files, and allows use of clusters of slave nodes that do
12 not have NFS access */
14 /* September, 1994 - this version has been modified to do two kinds of
15 searches, a general library search, or list of library sequences search.
16 The latter would be used to generate optimized scores for fasta and
17 to produce alignments */
19 /* modified July, 2002, to provide query shuffle */
21 /* modified October, 2005, to support struct a_res_str a_res -
22 coordinates of alignment in aa0[], aa1[]. Future modifications
23 will cause do_walign to be run only once - subsequent calls for
24 seqc[0,1] can be filled using a_res, by adding a_res to the
27 19-March-2006 - modifications to call do_walign() only once, and
28 use the resulting a_res structure for subsequent calls to calc_id,
29 calcons, calcons_a, have been implemented. Also, the -V option is
30 now valid with the parallel programs.
32 31-May-2006 - some functions (e.g. dropfs and dropff do not store
33 complete information in a_res - thus they cannot use this shortcut
53 #define PvmDataDefault 0
54 #define PvmTaskDefault 0
70 int nnodes, pinums[MAXNOD];
73 #include "drop_func.h"
75 extern void alloc_pam (int d1, int d2, struct pstruct *ppst); /* allocate ppst->pam12,pam12x */
76 extern int **alloc_pam2p (int len, int nsq);
77 extern void w_init ();
78 extern void irand(int);
79 extern void revcomp(unsigned char *, int, int *);
83 extern void initseq(char **seqc0, char **seqc0a, char **seqc1, char **seqca, int seqsiz);
84 extern void freeseq(char **seqc0, char **seqc0a, char **seqc1, char **seqca);
86 void send_bestr(int, int, struct comstr *, int, int);
87 void send_bestr2(int, struct comstr2 *, int);
88 void send_code(int, char *, int);
90 extern void get_param (struct pstruct *ppst, char *pstring2, char *pstring3);
91 extern void update_param(struct qmng_str *qm_msg, struct mngmsg *m_msg,
92 struct pstruct *ppst);
93 extern int shuffle(unsigned char *, unsigned char *, int);
94 extern int wshuffle(unsigned char *, unsigned char *, int, int, int *);
96 extern char err_str[];
98 /* local function declarations */
99 void free_ares(struct sqs2 *, int itt, int *, int walign_cnt, int worker);
106 fprintf (stderr, " %s %s\n", p, p1);
112 MPI_Abort(MPI_COMM_WORLD,1);
124 unsigned char *aa0[6], *aa1s, *aa0s; /* Query and library sequences */
125 struct mngmsg m_msg; /* start message from manager to worker 1 */
126 struct qmng_str qm_msg; /* updated for each query */
127 int last_msg_b[10]; /* last set of numbers */
128 struct sqs2 *seqpt; /* sequence pointers for chunk */
129 int seqbuf_n,seqbuf_s; /* number of sequences, length of sequences */
130 int max_sql; /* maximum number of sequences/node */
131 int *n1_arr; /* array of sequence lengths in buffer */
132 int *m_seqnm_arr; /* array of sequence numbers in buffer */
133 int *aa1i_arr; /* array of offsets into the buffer */
134 unsigned char *seq_buf; /* space for sequence data */
136 int nsq; /* effective alphabet size */
137 long curtype = ONETYPE; /* current send message type */
138 int ieven=0; /* flag for window shuffle */
140 int n1, n1over; /* length of query, library sequences */
141 struct comstr bestr[BFR+1]; /* best structures */
142 struct comstr2 bestr2[BFR2+1]; /* best structures */
143 struct a_struct aln, *aln_dp;
144 int qres_bufsize; /* results buffer size */
145 int bestcnt = 0; /* how many best structures are full */
146 char gstring2[MAX_STR]; /* parameter string for manager */
147 char gstring3[MAX_STR]; /* parameter string for manager */
148 struct pstruct pst; /* parameter structure */
149 struct rstruct rst, qrst, rrst; /* results structure */
150 void *f_str[6], *qf_str;
152 int lcnt, count, seqnm; /* counters */
153 int *walign_done[2], walign_cnt[2]; /* index of current valid a_res in seqpt */
155 int *tres; /* allocated storage for seqpt[].a_res[].res */
156 int lend; /*global library sequence number information */
157 int lsn; /* library sequence number */
158 struct stage2_str *liblist=NULL; /* list of sequences to search */
159 int i, j; /* my turn to send sequence descriptions */
164 char *seqc0, *seqc0a, *seqc1, *seqca;
165 char *seqc, *seqc_buff;
166 int seqc_buff_cnt, seqc_buff_len, seqc_flag;
167 int maxc, lc, nc, nident, ngap, aln_code_n;
168 float percent, gpercent;
169 int old_shuffle=0; /* did a qshuffle last time */
174 MPI_Status mpi_status;
179 hosttid = pvm_parent();
182 w_init(); /* sets up default sascii, hsq, sq */
184 /* Allocate space for the query sequence */
185 if ((aa0[0] = (unsigned char *) malloc ((MAXTST+2+SEQ_PAD)*sizeof (char))) == NULL) {
186 w_abort ("Unable to allocate sequence array[0] - exiting!","");
191 /* initial messages set up various parameter structures:
205 pvm_setopt(PvmRoute,PvmRouteDirect);
207 /* get number of nodes, pinums */
208 bufid = pvm_recv(hosttid,STARTTYPE0);
209 pvm_upkint(&nnodes,1,1);
210 pvm_upkint(pinums,nnodes,1);
211 pvm_upkbyte((char *)&m_msg,(int)sizeof(m_msg),1);
212 worker = tidtonode(mytid);
216 sprintf(worker_str,"@%d",worker);
219 MPI_Recv(&m_msg,sizeof(m_msg),MPI_BYTE,hosttid,STARTTYPE0,MPI_COMM_WORLD,
223 /* the aln structure needs some information from m_msg0.aln */
224 memcpy(&aln,&m_msg.aln,sizeof(struct a_struct));
227 fprintf(stderr,"d1: %d d2: %d\n",m_msg.pamd1,m_msg.pamd2);
232 bufid = pvm_recv(hosttid,STARTTYPE1);
233 pvm_upkbyte((char *)&pst,(int)sizeof(pst),1);
234 /* 31t nsq = pst.nsq; */
235 pvm_upkbyte((char *)pascii,(int)sizeof(aascii),1);
239 MPI_Recv(&pst,(int)sizeof(pst),MPI_BYTE,hosttid,STARTTYPE1,MPI_COMM_WORLD,
242 MPI_Recv(pascii,(int)sizeof(aascii)/sizeof(int),MPI_INT,hosttid,STARTTYPE1,MPI_COMM_WORLD,
246 if (pst.ext_sq_set) { nsq = pst.nsqx;}
247 else { nsq = pst.nsq;}
249 aa0[5] = aa0[4] = aa0[3] = aa0[2] = aa0[1] = aa0[0];
250 if (m_msg.qframe == 2) {
251 if ((aa0[1]=(unsigned char *)malloc((MAXTST+2)*sizeof (char)))==NULL)
252 w_abort ("Unable to allocate sequence[1] array - exiting!","");
257 if ((aa1s=(unsigned char *)malloc((m_msg.max_tot+1)*sizeof (char)))==NULL)
258 w_abort ("Unable to allocate shuffled library sequence", "");
262 irand(0); /* necessary for shuffled sequences */
264 /* this function allocates pam12, pam12x
265 assigns pst.pam[0][0]=pam12, pst.pam[1][0] = pam12x
266 and sets up the correct pst.pam[0][0][0] pointers */
268 alloc_pam(m_msg.pamd1,m_msg.pamd2,&pst);
271 bufid = pvm_recv(hosttid,STARTTYPE2);
272 pvm_upkint(pam12,m_msg.pamd1*m_msg.pamd2,1);
275 bufid = pvm_recv(hosttid,STARTTYPE3);
276 pvm_upkint(pam12x,m_msg.pamd1*m_msg.pamd2,1);
281 if (worker==FIRSTNODE) {
282 fprintf(stderr,"ext?: %d\tnsq: %d\tnsqx: %d\n",pst.ext_sq_set,pst.nsq, pst.nsqx);
283 for (i=1; i<5; i++) {
284 for (j=1; j <= i; j++) fprintf(stderr," %c,%c:%2d",pst.sq[i],pst.sq[j],pst.pam2[0][i][j]);
285 fprintf(stderr,"\n");
287 for (i=pst.nsq+1; i<pst.nsq+5; i++) {
288 for (j=pst.nsq+1; j <= i; j++) fprintf(stderr," %c,%c:%2d",pst.sqx[i],pst.sqx[j],pst.pam2[0][i][j]);
289 fprintf(stderr,"\n");
292 for (i=1; i<5; i++) {
293 for (j=1; j <= i; j++) fprintf(stderr," %c,%c:%2d",pst.sqx[i],pst.sqx[j],pst.pam2[1][i][j]);
294 fprintf(stderr,"\n");
296 for (i=pst.nsq+1; i<pst.nsq+5; i++) {
297 for (j=pst.nsq+1; j <= i; j++) fprintf(stderr," %c,%c:%2d",pst.sqx[i],pst.sqx[j],pst.pam2[1][i][j]);
298 fprintf(stderr,"\n");
304 MPI_Recv(pam12,m_msg.pamd1*m_msg.pamd2,MPI_INT,hosttid,STARTTYPE2,
305 MPI_COMM_WORLD,&mpi_status);
307 MPI_Recv(pam12x,m_msg.pamd1*m_msg.pamd2,MPI_INT,hosttid,STARTTYPE3,
308 MPI_COMM_WORLD,&mpi_status);
312 We have the PAM matrices - get the library sequences
315 /* Allocate space for the sequences */
318 if ((seqpt=(struct sqs2 *)calloc(max_sql,sizeof(struct sqs2)))==NULL)
319 w_abort("cannot allocate seqpt(sqs2)","");
321 if ((n1_arr=(int *)calloc(m_msg.pbuf_siz+1,sizeof(int)))==NULL)
322 w_abort("cannot allocate n1_arr","");
324 if ((aa1i_arr=(int *)calloc(m_msg.pbuf_siz+1,sizeof(int)))==NULL)
325 w_abort("cannot allocate n1_arr","");
327 if ((m_seqnm_arr=(int *)calloc(m_msg.pbuf_siz+1,sizeof(int)))==NULL)
328 w_abort("cannot allocate m_seqnm_arr","");
330 /*****************************************************************/
331 /* This section gets all the database sequences from the manager */
332 /*****************************************************************/
337 /* get the number of sequences, sequence lengths */
338 bufid = pvm_recv(hosttid,STARTTYPE4);
339 pvm_upkint(&seqbuf_n,1,1); /* number of sequences */
340 pvm_upkint(&seqbuf_s,1,1); /* size of sequence buffer */
341 pvm_upkint(n1_arr,seqbuf_n,1); /* length of each sequence in buffer */
342 pvm_upkint(aa1i_arr,seqbuf_n,1); /* indexes for each sequence */
343 pvm_upkint(m_seqnm_arr,seqbuf_n,1); /* number of each library sequence */
347 MPI_Recv(&seqbuf_n,1,MPI_INT,hosttid,STARTTYPE4,MPI_COMM_WORLD,
349 MPI_Recv(&seqbuf_s,1,MPI_INT,hosttid,STARTTYPE4,MPI_COMM_WORLD,
351 MPI_Recv(n1_arr,seqbuf_n,MPI_INT,hosttid,STARTTYPE4,MPI_COMM_WORLD,
353 MPI_Recv(aa1i_arr,seqbuf_n,MPI_INT,hosttid,STARTTYPE4,MPI_COMM_WORLD,
355 MPI_Recv(m_seqnm_arr,seqbuf_n,MPI_INT,hosttid,STARTTYPE4,MPI_COMM_WORLD,
359 if (seqbuf_n <= 0) break;
362 fprintf(stderr,"[%d] seqbuf_n: %d seqbuf_s: %d\n",
363 worker,seqbuf_n,seqbuf_s);
364 fprintf(stderr,"[%d] lcnt: %d n1: %d seqnm %d\n",
365 worker,0,n1_arr[0],m_seqnm_arr[0]);
366 fprintf(stderr,"[%d] lcnt: %d n1: %d seqnm %d\n",
367 worker,1,n1_arr[1],m_seqnm_arr[1]);
371 /* allocate space for sequences */
372 if ((seq_buf = (unsigned char *)calloc((size_t)seqbuf_s+1,sizeof(char)))
374 w_abort("cannot allocate tmp_seq","");
376 seq_buf++; /* leave a '\0' at the start */
378 /* get the sequence buffer */
380 bufid = pvm_recv(hosttid,STARTTYPE5);
381 pvm_upkbyte((char *)seq_buf,seqbuf_s,1);
385 MPI_Recv(seq_buf,seqbuf_s,MPI_BYTE,hosttid,STARTTYPE5,MPI_COMM_WORLD,
389 /* now we have everything - update the pointers */
390 if (lcnt+seqbuf_n >= max_sql) {
391 max_sql += max(MAXSQL/2,seqbuf_n);
392 if ((seqpt=(struct sqs2 *)realloc(seqpt,max_sql*sizeof(struct sqs2)))
394 w_abort("cannot allocate seqpt(sqs2)","");
397 /* convert from offsets to pointers into buffer */
399 for (i=0; i<seqbuf_n; i++,lcnt++) {
400 seqpt[lcnt].n1 = n1_arr[i];
401 seqpt[lcnt].m_seqnm = m_seqnm_arr[i];
402 seqpt[lcnt].aa1 = &seq_buf[aa1i_arr[i]];
403 /* ntx += n1_arr[i]+1 + SEQ_PAD */
406 /* must have null's at both ends of sequence */
407 if (seqpt[lcnt].aa1[-1]!= '\0') {
408 fprintf(stderr,"Missing null at start: %d %d\n",
409 lcnt,seqpt[lcnt].aa1[-1]);
410 seqpt[lcnt].aa1[-1]='\0';
412 if (seqpt[lcnt].aa1[seqpt[lcnt].n1]!= '\0') {
413 fprintf(stderr,"Missing null at end: %d %d\n",
414 lcnt,seqpt[lcnt].aa1[seqpt[lcnt].n1]);
415 seqpt[lcnt].aa1[seqpt[lcnt].n1]='\0';
420 /* all done - lcnt has the total number of library sequences */
424 for (i=0; i<10; i++) {
425 for (j=0; j<10; j++) libstr[j]=pst.sq[seqpt[i].aa1[j]];
427 fprintf(stderr,"[%d] n1: %d seqnm: %d aa1: %s\n",
428 worker,seqpt[i].n1,seqpt[i].m_seqnm,libstr);
432 /* send back the number of descriptions received */
435 pvm_initsend(PvmDataRaw);
436 pvm_pkint(&lcnt,1,1);
437 pvm_send(hosttid,STARTTYPE0);
440 /* p4_dprintf(" have %d descriptions to send\n",lcnt); */
441 MPI_Send(&lcnt,1,MPI_INT,hosttid,STARTTYPE0,MPI_COMM_WORLD);
444 /*****************************************************************/
445 /* Library reads are finished, get ready to do searches */
446 /*****************************************************************/
448 /* get last set of numbers */
450 bufid = pvm_recv(hosttid,STARTTYPE0);
451 pvm_upkint(last_msg_b,2,1);
455 MPI_Recv(last_msg_b, 2, MPI_INT, hosttid, STARTTYPE0, MPI_COMM_WORLD,
458 m_msg.nbr_seq = last_msg_b[0];
459 qres_bufsize = last_msg_b[1];
463 fprintf(stderr,"[%d] have nbr_seq %d qres_bufsize %d\n",worker,
464 m_msg.nbr_seq, qres_bufsize);
467 /* p4_dprintf("[%d] have nbr_seq %d qres_bufsize %d\n",worker,
468 m_msg.nbr_seq, qres_bufsize);
472 /* If self search, receive sequence numbering data */
475 bufid = pvm_recv(hosttid,STARTTYPE1);
476 pvm_upkint(&lend,1,1);
480 MPI_Recv(&lend,1,MPI_INT,hosttid,STARTTYPE1,MPI_COMM_WORLD,&mpi_status);
484 /* allocate space for a_res flag array */
486 if ((walign_done[0] = (int *)calloc(lcnt,sizeof(int)))==NULL) {
487 w_abort("cannot allocate walign_done");
491 if ((walign_done[1] = (int *)calloc(lcnt,sizeof(int)))==NULL) {
492 w_abort("cannot allocate walign_done");
496 /* was commented in for only FASTX/TFASTX, but do it always to
498 aainit(pst.tr_type, pst.debug_lib);
499 pst.maxlen = m_msg.maxn;
501 /*****************************************************************/
502 /* Main search loop, which calles do_work() repeatedly */
503 /*****************************************************************/
510 fprintf(stderr," W: %d waiting MSEQTYPE\n",worker);
513 p4_dprintf(" W: %d waiting MSEQTYPE\n",worker);
518 /*****************************************************************/
519 /* Wait for a query sequence from the manager */
520 /*****************************************************************/
523 bufid = pvm_recv(hosttid,MSEQTYPE);
524 pvm_upkbyte((char *)&qm_msg,sizeof(qm_msg),1);
527 MPI_Recv(&qm_msg,sizeof(struct mngmsg),MPI_BYTE,hosttid,MSEQTYPE,
528 MPI_COMM_WORLD,&mpi_status);
531 fprintf(stderr,"[%d] have MSEQTYPE n0: %d s_func: %d slist: %d qf: %d\n",
532 worker,qm_msg.n0,qm_msg.s_func,qm_msg.slist,qm_msg.qshuffle);
535 /*****************************************************************/
536 /* New query sequence indicated by qm_msg.slist=0 */
537 /*****************************************************************/
539 if (qm_msg.n0 > 0 && qm_msg.slist == 0) {
543 /*****************************************************************/
544 /* free everything associated with previous search */
545 /*****************************************************************/
547 close_work (aa0[0], cur_n0, &pst, &f_str[0]);
548 free_ares(seqpt, 0, walign_done[0], walign_cnt[0], worker);
550 if (m_msg.ann_flg) free(m_msg.aa0a);
553 if (m_msg.qframe == 2) {
554 close_work(aa0[1], cur_n0, &pst, &f_str[1]);
555 free_ares(seqpt, 1, walign_done[1], walign_cnt[1], worker);
559 close_work(aa0s,cur_n0, &pst, &qf_str);
565 free_pam2p(pst.pam2p[0]);
566 free_pam2p(pst.pam2p[1]);
570 /*****************************************************************/
571 /* Start allocating things for the next search */
572 /*****************************************************************/
574 pst.pam_pssm = qm_msg.pam_pssm;
577 if ((m_msg.aa0a = calloc(qm_msg.n0+1,sizeof(char)))==NULL) {
578 w_abort(" cannot allocate aa0a");
582 /*****************************************************************/
583 /* Get the next query sequence */
584 /*****************************************************************/
587 pvm_upkbyte((char *)aa0[0],qm_msg.n0+1+SEQ_PAD,1);
589 pvm_upkbyte((char *)m_msg.aa0a,qm_msg.n0+1,1);
593 MPI_Recv(aa0[0],qm_msg.n0+1+SEQ_PAD,MPI_BYTE,hosttid,
594 MSEQTYPE1,MPI_COMM_WORLD, &mpi_status);
596 MPI_Recv(m_msg.aa0a,qm_msg.n0+1,MPI_BYTE,hosttid,
597 MSEQTYPE2,MPI_COMM_WORLD, &mpi_status);
602 /* must have null's at both ends of sequence */
603 if (aa0[0][-1]!= '\0') {
604 fprintf(stderr,"Missing null at start: %s %d\n",
605 qm_msg.libstr,aa0[0][-1]);
608 if (aa0[0][qm_msg.n0]!= '\0') {
609 fprintf(stderr,"Missing null at end: %s %d\n",
610 qm_msg.libstr,aa0[0][qm_msg.n0]);
614 /* This discovers most reasons for core dumps */
616 for (j=0; j<qm_msg.n0; j++)
617 if (aa0[0][j]>pst.nsq) {
619 "seq: %s residue[%d/%d] %d range (%d)\n",
620 qm_msg.libstr,j,qm_msg.n0,aa0[0][j],pst.nsq);
626 update_params(&qm_msg,&m_msg,&pst);
629 /*****************************************************************/
630 /* End of free()'s/ initialization for new sequence */
631 /*****************************************************************/
637 if (qm_msg.n0 == -1) {
639 /*****************************************************************/
640 /* All done with searches */
641 /*****************************************************************/
642 /* printf(" %d: got n0 == -1\n",worker); */
646 /* p4_dprintf(" W:%d n0:%d slist:%d s_func:%d (%d)\n",worker,qm_msg.n0,qm_msg.slist,qm_msg.s_func,qres_bufsize); */
648 /*****************************************************************/
649 /* if qm_msg.slist > 0, search specific sequences, to be sent */
650 /*****************************************************************/
652 if (qm_msg.slist > 0) { /* list search, not library search */
653 if (liblist != NULL) free(liblist);
655 /* get the list of sequences */
656 if ((liblist=(struct stage2_str *)
657 calloc(qm_msg.slist,sizeof(struct stage2_str)))==NULL) {
658 sprintf(errstr,"sequence list %d",qm_msg.slist);
659 w_abort (errstr, "");
663 bufid = pvm_recv(hosttid,LISTTYPE);
664 pvm_upkbyte((char *)liblist,qm_msg.slist*sizeof(struct stage2_str),1);
668 MPI_Recv(liblist,qm_msg.slist*sizeof(struct stage2_str),MPI_BYTE,
669 hosttid,LISTTYPE,MPI_COMM_WORLD, &mpi_status);
673 /*****************************************************************/
674 /* have list of sequences to be compared/aligned */
675 /*****************************************************************/
678 if (qm_msg.slist == 0) {
679 /*****************************************************************/
680 /* New query - set up matrices and init_work() */
681 /*****************************************************************/
684 fprintf(stderr,"n1: %d\t",qm_msg.n0);
685 for (i=0; i<10; i++) fprintf(stderr,"%c",nt[aa0[0][i]]);
686 fprintf(stderr,"\n");
690 pst.pam2p[0] = alloc_pam2p(qm_msg.n0,nsq);
691 pst.pam2p[1] = alloc_pam2p(qm_msg.n0,nsq);
694 init_work (aa0[0], qm_msg.n0, &pst, &f_str[0]);
695 f_str[5]=f_str[4]=f_str[3]=f_str[2]=f_str[1]=f_str[0];
697 if (qm_msg.qshuffle) {
698 if ((aa0s=(unsigned char *)malloc((qm_msg.n0+2)*sizeof (char)))==NULL)
699 w_abort ("Unable to allocate aa0s array - exiting!","");
703 memcpy(aa0s,aa0[0],qm_msg.n0+1);
704 qshuffle(aa0s,qm_msg.n0,qm_msg.nm0);
706 fprintf(stderr,"[%d] shuffle: %d\n",worker,qm_msg.n0);
708 for (i=0; i<5; i++) {fprintf(stderr,"%c",pst.sq[aa0s[i]]);}
712 init_work (aa0s, qm_msg.n0, &pst, &qf_str);
716 if (m_msg.qframe == 2) {
717 memcpy(aa0[1],aa0[0],qm_msg.n0+1);
718 revcomp(aa0[1],qm_msg.n0,&pst.c_nt[0]);
719 init_work (aa0[1], qm_msg.n0, &pst, &f_str[1]);
723 fprintf(stderr,"[%d] init_work qf: %d nf: %d\n",worker,m_msg.qframe,m_msg.nframe);
728 /*****************************************************************/
729 /* Finished with initialization, */
730 /* start doing comparisons or alignments */
731 /*****************************************************************/
734 if (qm_msg.slist == 0) { /* library search */
736 /*****************************************************************/
737 /* Start library search */
738 /*****************************************************************/
740 for (count=0; count < lcnt; count++) {
742 for (itt=m_msg.revcomp; itt<=m_msg.nitt1; itt++) {
744 rst.score[0] = rst.score[1] = rst.score[2] = 0;
747 if ((qm_msg.seqnm > lsn) && (((qm_msg.seqnm + lsn) % 2) != 0)) {
748 do_work (aa0[itt], qm_msg.n0,seqpt[count].aa1, seqpt[count].n1,
749 itt, &pst, f_str[itt], 0, &rst);
751 else if ((qm_msg.seqnm <= lsn) && (((qm_msg.seqnm+lsn)%2) == 0)) {
752 do_work (aa0[itt], qm_msg.n0, seqpt[count].aa1, seqpt[count].n1,
753 itt, &pst, f_str[itt], 0, &rst);
758 do_work (aa0[itt], qm_msg.n0, seqpt[count].aa1, seqpt[count].n1,
759 itt, &pst, f_str[itt], 0, &rst);
760 if (qm_msg.qshuffle) {
761 do_work (aa0s, qm_msg.n0, seqpt[count].aa1, seqpt[count].n1,
762 itt, &pst, qf_str, 1, &qrst);
767 if (count < 10 || (count % 200 == 199)) {
768 fprintf(stderr,"[node %d] itt:%d/%d (%d) %3d %3d %3d - %d/%d\n",
769 worker,itt,m_msg.nitt1,count,
770 rst.score[0],rst.score[1],rst.score[2],
771 seqpt[count].m_seqnm,seqpt[count].n1);
777 bestr[bestcnt].seqnm = count;
778 bestr[bestcnt].m_seqnm = seqpt[count].m_seqnm;
779 bestr[bestcnt].score[0] = rst.score[0];
780 bestr[bestcnt].score[1] = rst.score[1];
781 bestr[bestcnt].score[2] = rst.score[2];
782 bestr[bestcnt].escore = rst.escore;
783 bestr[bestcnt].segnum = rst.segnum;
784 bestr[bestcnt].seglen = rst.seglen;
785 bestr[bestcnt].frame = itt;
786 bestr[bestcnt].comp = rst.comp;
787 bestr[bestcnt].H = rst.H;
789 bestr[bestcnt].qr_score = qrst.score[pst.score_ix];
790 bestr[bestcnt].qr_escore = qrst.escore;
792 if (pst.zsflag >= 10) {
794 wshuffle(seqpt[count].aa1, aa1s,seqpt[count].n1,pst.zs_win,&ieven);
796 shuffle(seqpt[count].aa1, aa1s,seqpt[count].n1);
798 do_work(aa0[itt],qm_msg.n0,aa1s,seqpt[count].n1,itt, &pst,
799 f_str[itt], 0, &rst);
800 bestr[bestcnt].r_score = rst.score[pst.score_ix];
804 if (bestcnt >= qres_bufsize) {
806 fprintf(stderr," worker: %d sending %d results\n",worker,qres_bufsize);
808 send_bestr(hosttid,curtype,bestr,qres_bufsize,bestcnt);
812 } /* END - for count loop */
813 send_bestr(hosttid, curtype, bestr,qres_bufsize, (bestcnt | FINISHED));
816 /*****************************************************************/
817 /* End of library search section */
818 /*****************************************************************/
820 /*****************************************************************/
821 /* Do do_opt() from list s_func=DO_CALC_FLG */
822 /*****************************************************************/
824 else if (qm_msg.s_func== DO_CALC_FLG) { /* qm_msg.slist > 0 */
827 for (count=0; count < qm_msg.slist; count++) {
828 rst.score[0] = rst.score[1] = rst.score[2] = 0;
829 itt = liblist[count].frame;
830 seqnm = bestr2[bestcnt].seqnm = liblist[count].seqnm;
831 bestr2[bestcnt].m_seqnm = seqpt[seqnm].m_seqnm;
833 do_opt (aa0[itt], qm_msg.n0, seqpt[seqnm].aa1,
834 seqpt[seqnm].n1, itt,
835 &pst, f_str[itt], &rst);
837 bestr2[bestcnt].score[0] = rst.score[0];
838 bestr2[bestcnt].score[1] = rst.score[1];
839 bestr2[bestcnt].score[2] = rst.score[2];
840 bestr2[bestcnt].escore = rst.escore;
841 bestr2[bestcnt].segnum = rst.segnum;
842 bestr2[bestcnt].seglen = rst.seglen;
843 bestr2[bestcnt].aln_code_n = 0;
846 if (bestcnt >= BFR2) {
847 send_bestr2(hosttid,bestr2,bestcnt);
850 } /* END - for count loop */
852 send_bestr2(hosttid,bestr2,(bestcnt|FINISHED));
855 /*****************************************************************/
856 /* s_func=DO_OPT_FLG */
859 /* if (m_msg.stages > 1) do_opt() */
861 /* calc_id or calc_code, no calcons */
862 /*****************************************************************/
864 /* s_func == 1 means do_opt if necessary */
865 else if (qm_msg.s_func== DO_OPT_FLG) { /* qm_msg.slist > 0 */
867 fprintf(stderr," [%d] starting s_func:1 slist: %d\n",
868 worker,qm_msg.slist);
870 /* get the buffer once - re-use it for the entire slist */
871 if (m_msg.show_code == SHOW_CODE_ALIGN) {
872 seqc_buff_len = (BFR2+5)*256;
873 seqc = seqc_buff = (char *)calloc(seqc_buff_len,sizeof(char));
875 if (seqc_buff == NULL) {
876 seqc_buff_cnt = seqc_buff_len = 0;
881 for (count=0; count < qm_msg.slist; count++) {
882 rst.score[0] = rst.score[1] = rst.score[2] = 0;
883 itt = liblist[count].frame;
884 seqnm = liblist[count].seqnm;
886 bestr2[bestcnt].seqnm = seqnm;
887 bestr2[bestcnt].m_seqnm = seqpt[seqnm].m_seqnm;
888 if (m_msg.stages > 1) {
889 do_opt (aa0[itt], qm_msg.n0, seqpt[seqnm].aa1,
890 seqpt[seqnm].n1, itt,
891 &pst, f_str[itt], &rst);
893 bestr2[bestcnt].score[0] = rst.score[0];
894 bestr2[bestcnt].score[1] = rst.score[1];
895 bestr2[bestcnt].score[2] = rst.score[2];
898 if (m_msg.markx & MX_M9SUMM) {
900 fprintf(stderr," [%d] starting do_walign seqnm: %d n1: %d\n",
901 worker,seqnm,seqpt[seqnm].n1);
903 aln_dp = &bestr2[bestcnt].aln_d;
904 memcpy(aln_dp, &aln,sizeof(struct a_struct));
906 sw_score = do_walign(aa0[itt], qm_msg.n0,
907 seqpt[seqnm].aa1, seqpt[seqnm].n1,
908 itt, &pst, f_str[itt],
909 &seqpt[seqnm].a_res[itt],
911 seqpt[seqnm].sw_score[itt] = sw_score;
913 /* the a_res[itt] provided by do_walign is re-used - so it
914 must be copied to a valid location */
917 if ((tres = calloc(seqpt[seqnm].a_res[itt].nres+1,sizeof(int)))==NULL) {
918 w_abort(" cannot allocate tres");
921 memcpy(tres,seqpt[seqnm].a_res[itt].res,sizeof(int)*seqpt[seqnm].a_res[itt].nres);
922 seqpt[seqnm].a_res[itt].res = tres;
924 fprintf(stderr, " [%d] saving %d:%d[%d]:%o\n", worker,
925 walign_cnt[itt],seqnm,itt, seqpt[seqnm].a_res[itt].res);
927 if (walign_cnt[itt] < lcnt) walign_done[itt][walign_cnt[itt]++] = seqnm;
928 else w_abort(" walign_cnt overrun");
929 seqpt[seqnm].walign_dflg[itt] = 1;
932 aln_func_vals(itt, aln_dp);
935 fprintf(stderr," [%d] starting calc_id sw_score: %d\n",
937 fprintf(stderr,"bi: %d seqc_buff_cnt: %d - seqc_buff_len: %d\n",
938 bestcnt, seqc_buff_cnt, seqc_buff_len);
940 aln_code_n = 0; /* must be set in case no seqc_code */
941 if (m_msg.show_code == SHOW_CODE_ALIGN) {
942 if (seqc_buff_cnt < seqc_buff_len - 256) {
943 lc=calc_code(aa0[itt],qm_msg.n0,
944 seqpt[seqnm].aa1, seqpt[seqnm].n1,
945 aln_dp,seqpt[seqnm].a_res[itt],pst,
946 seqc,seqc_buff_len-seqc_buff_cnt-10,
948 aln_code_n = strlen(seqc);
949 seqc_buff_cnt += aln_code_n + 1;
951 fprintf(stderr,"%d:%d:%d: %d/%d - [%d] %s\n",
952 worker,seqnm,bestcnt,aln_code_n,seqc_buff_cnt, seqc-seqc_buff,seqc);
959 lc=calc_id(aa0[itt],qm_msg.n0,
960 seqpt[seqnm].aa1, seqpt[seqnm].n1,
961 aln_dp,seqpt[seqnm].a_res[itt],pst,f_str[itt]);
964 nident = aln_dp->nident;
967 if (lc > 0) percent = (100.0*(float)nident)/(float)lc;
970 ngap = aln_dp->ngap_q + aln_dp->ngap_l;
972 if (lc-ngap > 0) gpercent = (100.0*(float)nident)/(float)(lc-ngap);
974 if (lc > 0) gpercent =(100.0*(float)aln_dp->nsim)/(float)lc;
976 else gpercent = -1.0;
978 bestr2[bestcnt].sw_score = sw_score;
979 bestr2[bestcnt].percent = percent;
980 bestr2[bestcnt].gpercent = gpercent;
981 bestr2[bestcnt].aln_code_n = aln_code_n;
985 if (bestcnt >= BFR2) {
986 send_bestr2(hosttid,bestr2,bestcnt);
987 if (m_msg.show_code == SHOW_CODE_ALIGN) {
988 send_code(hosttid,seqc_buff,seqc_buff_cnt);
989 memset(seqc_buff,0,seqc_buff_len);
995 } /* END - for count loop */
997 send_bestr2(hosttid,bestr2,(bestcnt|FINISHED));
998 if (m_msg.show_code == SHOW_CODE_ALIGN) {
999 send_code(hosttid,seqc_buff,seqc_buff_cnt);
1000 if (seqc_buff) free(seqc_buff);
1003 /* get alignments */
1005 /*****************************************************************/
1007 /* s_func=DO_ALIGN_FLG */
1010 /* do_walign() if not done already */
1012 /*****************************************************************/
1014 else if (qm_msg.s_func==DO_ALIGN_FLG) {
1015 for (count=0; count < qm_msg.slist; count++) {
1016 itt = liblist[count].frame;
1017 seqnm = liblist[count].seqnm;
1019 fprintf(stderr,"worker: %d; %s, frame: %d\n",worker,qm_msg.libstr,itt);
1021 if (!seqpt[seqnm].walign_dflg[itt]) {
1022 seqpt[seqnm].sw_score[itt] =
1023 sw_score = do_walign (aa0[itt], qm_msg.n0,seqpt[seqnm].aa1,
1024 seqpt[seqnm].n1, itt,
1026 &seqpt[seqnm].a_res[itt],
1030 sw_score = seqpt[seqnm].sw_score[itt];
1031 pre_cons(seqpt[seqnm].aa1,seqpt[seqnm].n1,itt,f_str[itt]);
1034 aln_func_vals(itt, &aln);
1037 maxc = seqpt[seqnm].a_res[itt].nres + max(seqpt[seqnm].a_res[itt].min0,seqpt[seqnm].a_res[itt].min1)+
1038 max((qm_msg.n0-seqpt[seqnm].a_res[itt].max0),
1039 (seqpt[seqnm].n1-seqpt[seqnm].a_res[itt].max1))+4;
1040 else maxc = seqpt[seqnm].a_res[itt].nres + 4*aln.llen+4;
1042 initseq(&seqc0, &seqc0a, &seqc1, &seqca, maxc);
1044 if (!m_msg.ann_flg) {
1045 nc=calcons(aa0[itt],qm_msg.n0,
1046 seqpt[seqnm].aa1, seqpt[seqnm].n1,
1047 &lc,&aln,seqpt[seqnm].a_res[itt],pst,
1048 seqc0,seqc1,seqca,f_str[itt]);
1049 memset(seqc0a,' ',nc);
1053 nc=calcons_a(aa0[itt],m_msg.aa0a,qm_msg.n0,
1054 seqpt[seqnm].aa1, seqpt[seqnm].n1,
1055 &lc,&aln,seqpt[seqnm].a_res[itt],pst,
1056 seqc0,seqc0a,seqc1,seqca,
1057 m_msg.ann_arr,f_str[itt]);
1061 fprintf(stderr,"[%d] nident: %d nsim: %d lc: %d\n",aln.nident, aln.nsim, lc);
1064 maxc = max(strlen(seqc0),strlen(seqc1))+1;
1065 nident = aln.nident;
1066 percent = (100.0*(float)nident)/(float)lc;
1067 ngap = aln.ngap_q+aln.ngap_l;
1069 if (lc-ngap > 0) gpercent = (100.0*(float)nident)/(float)(lc-ngap);
1071 if (lc > 0) gpercent = (100.0*(float)aln.nsim)/(float)lc;
1073 else gpercent = -1.0;
1076 pvm_initsend(PvmDataRaw);
1079 pvm_pkint(&maxc,1,1);
1080 pvm_pkfloat(&percent,1,1);
1081 pvm_pkfloat(&gpercent,1,1);
1082 pvm_pkint(&sw_score,1,1);
1083 pvm_pkbyte((char *)&aln,sizeof(struct a_struct),1);
1084 pvm_send(hosttid,ALN1TYPE);
1086 fprintf(stderr,"[%d] ALN1TYPE sent: %d\n",worker,qm_msg.n0);
1088 pvm_initsend(PvmDataRaw);
1089 pvm_pkbyte(seqc0,maxc,1);
1090 if (m_msg.ann_flg) pvm_pkbyte(seqc0a,maxc,1);
1091 pvm_pkbyte(seqc1,maxc,1);
1092 pvm_pkbyte(seqca,maxc,1);
1093 pvm_send(hosttid,ALN2TYPE);
1099 last_msg_b[3]=sw_score;
1100 MPI_Send(last_msg_b,4,MPI_INT,hosttid,ALN1TYPE,MPI_COMM_WORLD);
1101 MPI_Send(&percent,1,MPI_FLOAT,hosttid,ALN2TYPE,MPI_COMM_WORLD);
1102 MPI_Send(&gpercent,1,MPI_FLOAT,hosttid,ALN2TYPE,MPI_COMM_WORLD);
1104 /* p4_dprintf("[%d] sending aln\n",worker); */
1105 MPI_Send(&aln,sizeof(struct a_struct),MPI_BYTE,hosttid,
1106 ALN3TYPE,MPI_COMM_WORLD);
1108 MPI_Send(seqc0,maxc,MPI_BYTE,hosttid,ALN2TYPE,MPI_COMM_WORLD);
1109 if (m_msg.ann_flg) MPI_Send(seqc0a,maxc,MPI_BYTE,hosttid,ALN2TYPE,MPI_COMM_WORLD);
1110 MPI_Send(seqc1,maxc,MPI_BYTE,hosttid,ALN3TYPE,MPI_COMM_WORLD);
1111 MPI_Send(seqca,maxc,MPI_BYTE,hosttid,ALN3TYPE,MPI_COMM_WORLD);
1113 freeseq(&seqc0,&seqc0a,&seqc1,&seqca);
1117 /* send back parameter settings */
1118 if (worker==FIRSTWORK && qm_msg.slist==0) {
1119 get_param(&pst, gstring2,gstring3);
1121 pvm_initsend(PvmDataRaw);
1122 pvm_pkbyte(gstring2,sizeof(gstring2),1);
1123 pvm_pkbyte(gstring3,sizeof(gstring3),1);
1124 pvm_send(hosttid,PARAMTYPE);
1127 MPI_Send(gstring2,sizeof(gstring2),MPI_BYTE,
1128 hosttid,PARAMTYPE,MPI_COMM_WORLD);
1129 MPI_Send(gstring3,sizeof(gstring3),MPI_BYTE,
1130 hosttid,PARAMTYPE,MPI_COMM_WORLD);
1134 if (qm_msg.slist==0) {
1135 if (curtype == ONETYPE) curtype = TWOTYPE;
1136 else curtype = ONETYPE;
1138 } /* END - while (1) loop */
1143 /* MPI_Finalize(); */
1148 send_bestr(int hosttid, int curtype,
1149 struct comstr *bestr, int buf_size, int lastcnt) {
1151 bestr[buf_size].seqnm = lastcnt;
1153 pvm_initsend(PvmDataRaw);
1154 pvm_pkbyte((char *)&bestr[0],sizeof(struct comstr)*(buf_size+1),1);
1155 pvm_send(hosttid,curtype);
1158 MPI_Send(bestr,sizeof(struct comstr)*(buf_size+1),MPI_BYTE,
1159 hosttid,curtype,MPI_COMM_WORLD);
1164 send_bestr2(int hosttid, struct comstr2 *bestr2,
1167 bestr2[BFR2].seqnm = lastcnt;
1169 pvm_initsend(PvmDataRaw);
1170 pvm_pkbyte((char *)&bestr2[0],sizeof(struct comstr2)*(BFR2+1),1);
1171 pvm_send(hosttid,LISTRTYPE);
1174 MPI_Send(&bestr2[0],sizeof(struct comstr2)*(BFR2+1),MPI_BYTE,
1175 hosttid,LISTRTYPE,MPI_COMM_WORLD);
1180 send_code(int hosttid, char *seqc_buff, int seqc_buff_len) {
1183 pvm_initsend(PvmDataRaw);
1184 pvm_pkint(&seqc_buff_len,1,1);
1185 if (seqc_buff_len > 0) pvm_pkbyte(seqc_buff,seqc_buff_len,1);
1186 pvm_send(hosttid,CODERTYPE);
1189 MPI_Send(&seqc_buff_len,1,MPI_INT,
1190 hosttid,CODERTYPE,MPI_COMM_WORLD);
1191 if (seqc_buff_len>0) MPI_Send(seqc_buff,seqc_buff_len,MPI_BYTE,
1192 hosttid,CODERTYPE,MPI_COMM_WORLD);
1201 for (i=FIRSTNODE; i< nnodes; i++) if (tid==pinums[i]) return i;
1202 fprintf(stderr," cannot find tid %d\n",tid);
1208 free_ares(struct sqs2 *seqpt, int itt, int *walign_done, int walign_cnt, int worker) {
1212 for (i=0; i< walign_cnt; i++) {
1213 seqnm = walign_done[i];
1215 if (seqpt[seqnm].walign_dflg[itt]) {
1216 if (seqpt[seqnm].a_res[itt].nres > 0 ) {
1218 fprintf(stderr, "[%d] freeing %d:%d[%d]:%o\n",
1219 worker,i,seqnm,itt,seqpt[seqnm].a_res[itt].res);
1221 seqpt[seqnm].a_res[itt].nres = 0;
1222 free(seqpt[seqnm].a_res[itt].res);
1226 w_abort(" have walign_done but no walign_dflag");
1228 seqpt[seqnm].walign_dflg[itt] = 0;