From: jprocter Date: Mon, 14 Aug 2006 15:23:07 +0000 (+0000) Subject: refactored to create generic polling thread and subjobs X-Git-Tag: Release_2_1~67 X-Git-Url: http://source.jalview.org/gitweb/?a=commitdiff_plain;h=d61c45c28dce0e7b39597b4b6dda0f66cfd6cfab;p=jalview.git refactored to create generic polling thread and subjobs --- diff --git a/src/jalview/ws/MsaWSThread.java b/src/jalview/ws/MsaWSThread.java index 803bf44..1382a23 100644 --- a/src/jalview/ws/MsaWSThread.java +++ b/src/jalview/ws/MsaWSThread.java @@ -32,16 +32,8 @@ import vamsas.objects.simple.MsaResult; * @version 1.0 */ class MsaWSThread - extends Thread implements WSClientI + extends WSThread implements WSClientI { - jalview.gui.AlignFrame alignFrame; - - WebserviceInfo wsInfo = null; - - String WebServiceName = null; - - String OutputHeader; - AlignmentView input; boolean submitGaps = false; // pass sequences including gaps to alignment // service @@ -50,14 +42,9 @@ class MsaWSThread // order - class MsaWSJob + class MsaWSJob extends WSThread.WSJob { - int jobnum = 0; // WebServiceInfo pane for this job - - String jobId; // ws job ticket - - vamsas.objects.simple.MsaResult result = null; - + // hold special input for this vamsas.objects.simple.SequenceSet seqs = new vamsas.objects.simple. SequenceSet(); @@ -83,12 +70,6 @@ class MsaWSThread } - int allowedServerExceptions = 3; // thread dies if too many - - // exceptions. - boolean submitted = false; - boolean subjobComplete = false; - Hashtable SeqNames = new Hashtable(); Vector emptySeqs = new Vector(); /** @@ -160,7 +141,7 @@ class MsaWSThread public boolean hasResults() { if (subjobComplete && result != null && result.isFinished() - && result.getMsa() != null && result.getMsa().getSeqs() != null) + && ((MsaResult) result).getMsa() != null && ((MsaResult) result).getMsa().getSeqs() != null) { return true; } @@ -175,10 +156,10 @@ class MsaWSThread SequenceI[] alseqs = null; char alseq_gapchar = '-'; int alseq_l = 0; - if (result.getMsa() != null) + if (((MsaResult) result).getMsa() != null) { - alseqs = getVamsasAlignment(result.getMsa()); - alseq_gapchar = result.getMsa().getGapchar().charAt(0); + alseqs = getVamsasAlignment(((MsaResult) result).getMsa()); + alseq_gapchar = ((MsaResult) result).getMsa().getGapchar().charAt(0); alseq_l = alseqs.length; } if (emptySeqs.size() > 0) @@ -269,8 +250,6 @@ class MsaWSThread } return null; } - - boolean cancelled = false; /** * mark subjob as cancelled and set result object appropriatly */ @@ -279,22 +258,24 @@ class MsaWSThread subjobComplete = true; result = null; } + /** + * + * @return boolean true if job can be submitted. + */ + boolean hasValidInput() { + if (seqs.getSeqs()!=null) + return true; + return false; + } } - MsaWSJob jobs[] = null; String alTitle; // name which will be used to form new alignment window. - - boolean jobComplete = false; - Alignment dataset; // dataset to which the new alignment will be // associated. ext.vamsas.MuscleWS server = null; - - String WsUrl = null; - /** * set basic options for this (group) of Msa jobs * @@ -446,7 +427,7 @@ class MsaWSThread { // CANCELLED_JOB cancelledMessage = "Job cancelled."; - jobs[job].cancel(); + ((MsaWSJob) jobs[job]).cancel(); wsInfo.setStatus(jobs[job].jobnum, WebserviceInfo.STATE_CANCELLED_OK); } @@ -495,205 +476,15 @@ class MsaWSThread } } } - - class JobStateSummary { - int running = 0; - int queuing = 0; - int finished = 0; - int error = 0; - int serror = 0; - int cancelled=0; - int results=0; - void updateJobPanelState(WebserviceInfo wsInfo, MsaWSJob j) { - if (j.result != null) - { - String progheader = ""; - // Parse state of job[j] - if (j.result.isRunning()) - { - running++; - wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_RUNNING); - } - else if (j.result.isQueued()) - { - queuing++; - wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_QUEUING); - } - else if (j.result.isFinished()) - { - finished++; - j.subjobComplete = true; - if (j.hasResults()) - results++; - wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_OK); - } - else if (j.result.isFailed()) - { - progheader += "Job failed.\n"; - j.subjobComplete = true; - wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_ERROR); - error++; - } - else if (j.result.isServerError()) - { - serror++; - j.subjobComplete = true; - wsInfo.setStatus(j.jobnum, - WebserviceInfo.STATE_STOPPED_SERVERERROR); - } - else if (j.result.isBroken() || j.result.isFailed()) - { - error++; - j.subjobComplete = true; - wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_ERROR); - } - // and pass on any sub-job messages to the user - wsInfo.setProgressText(j.jobnum, OutputHeader); - wsInfo.appendProgressText(j.jobnum, progheader); - if (j.result.getStatus() != null) - { - wsInfo.appendProgressText(j.jobnum, j.result.getStatus()); - } - } - else - { - if (j.submitted && j.subjobComplete) - { - if (j.allowedServerExceptions == 0) - { - serror++; - } - else if (j.result == null) - { - error++; - } - } - } - } + void pollJob(WSJob job) throws Exception { + ((MsaWSJob) job).result = server.getResult(((MsaWSJob) job).jobId); } - public void run() + void StartJob(WSJob job) { - JobStateSummary jstate=null; - while (!jobComplete) - { - jstate = new JobStateSummary(); - for (int j = 0; j < jobs.length; j++) - { - - if (!jobs[j].submitted && jobs[j].seqs.getSeqs() != null) - { - StartJob(jobs[j]); - } - - if (jobs[j].submitted && !jobs[j].subjobComplete) - { - try - { - if ( (jobs[j].result = server.getResult(jobs[j].jobId)) == null) - { - throw (new Exception( - "Timed out when communicating with server\nTry again later.\n")); - } - jalview.bin.Cache.log.debug("Job " + j + " Result state " + - jobs[j].result.getState() - + "(ServerError=" + - jobs[j].result.isServerError() + ")"); - } - catch (Exception ex) - { - // Deal with Transaction exceptions - wsInfo.appendProgressText(jobs[j].jobnum, "\n" + WebServiceName - + " Server exception!\n" + ex.getMessage()); - Cache.log.warn(WebServiceName + " job(" + jobs[j].jobnum - + ") Server exception: " + ex.getMessage()); - - if (jobs[j].allowedServerExceptions > 0) - { - jobs[j].allowedServerExceptions--; - Cache.log.debug("Sleeping after a server exception."); - try - { - Thread.sleep(5000); - } - catch (InterruptedException ex1) - { - } - } - else - { - Cache.log.warn("Dropping job " + j + " " + jobs[j].jobId); - jobs[j].subjobComplete = true; - wsInfo.setStatus(jobs[j].jobnum, - WebserviceInfo.STATE_STOPPED_SERVERERROR); - } - } - catch (OutOfMemoryError er) - { - jobComplete = true; - jobs[j].subjobComplete = true; - jobs[j].result = null; // may contain out of date result object - wsInfo.setStatus(jobs[j].jobnum, - WebserviceInfo.STATE_STOPPED_ERROR); - JOptionPane - .showInternalMessageDialog( - Desktop.desktop, - "Out of memory handling result for job !!" - + - "\nSee help files for increasing Java Virtual Machine memory.", - "Out of memory", JOptionPane.WARNING_MESSAGE); - Cache.log.error("Out of memory when retrieving Job " + j + " id:" + - WsUrl + "/" + jobs[j].jobId, er); - System.gc(); - } - } - jstate.updateJobPanelState(wsInfo, jobs[j]); - } - // Decide on overall state based on collected jobs[] states - if (jstate.running > 0) - { - wsInfo.setStatus(WebserviceInfo.STATE_RUNNING); - } - else if (jstate.queuing > 0) - { - wsInfo.setStatus(WebserviceInfo.STATE_QUEUING); - } - else - { - jobComplete = true; - if (jstate.finished > 0) - { - wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_OK); - } - else if (jstate.error > 0) - { - wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_ERROR); - } - else if (jstate.serror > 0) - { - wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_SERVERERROR); - } - } - if (!jobComplete) - { - try - { - Thread.sleep(5000); - } - catch (InterruptedException e) - { - Cache.log.debug("Interrupted sleep waiting for next job poll.", e); - } - // System.out.println("I'm alive "+alTitle); - } + if (!(job instanceof MsaWSJob)) { + throw new Error("StartJob(MsaWSJob) called on a WSJobInstance "+job.getClass()); } - if (jobComplete) - { - parseResult(); // tidy up and make results available to user - } - } - - void StartJob(MsaWSJob j) - { + MsaWSJob j = (MsaWSJob) job; if (j.submitted) { if (Cache.log.isDebugEnabled()) @@ -709,7 +500,7 @@ class MsaWSThread j.result = new MsaResult(); j.result.setFinished(true); j.result.setStatus("Empty Alignment Job"); - j.result.setMsa(null); + ((MsaResult) j.result).setMsa(null); } try { @@ -783,11 +574,11 @@ class MsaWSThread { for (int j = 0; j < jobs.length; j++) { - finalState.updateJobPanelState(wsInfo, jobs[j]); + finalState.updateJobPanelState(wsInfo, OutputHeader, jobs[j]); if (jobs[j].submitted && jobs[j].subjobComplete && jobs[j].hasResults()) { results++; - vamsas.objects.simple.Alignment valign = jobs[j].result.getMsa(); + vamsas.objects.simple.Alignment valign = ((MsaResult) jobs[j].result).getMsa(); if (valign != null) { wsInfo.appendProgressText(jobs[j].jobnum, @@ -832,7 +623,8 @@ class MsaWSThread } }); wsInfo.setResultsReady(); - } + } else + wsInfo.setFinishedNoResults(); } void displayResults(boolean newFrame) @@ -859,7 +651,7 @@ class MsaWSThread int width = 0; // subalignment width if (jobs[j].hasResults()) { - Object[] subalg = jobs[j++].getAlignment(); + Object[] subalg = ((MsaWSJob) jobs[j++]).getAlignment(); alorders.add(subalg[1]); SequenceI mseq[] = (SequenceI[]) subalg[0]; width = mseq[0].getLength(); @@ -947,7 +739,7 @@ class MsaWSThread int width = 0; if (jobs[j].hasResults()) { - Object[] subalg = jobs[j].getAlignment(); + Object[] subalg = ((MsaWSJob) jobs[j]).getAlignment(); alorders.add(subalg[1]); SequenceI mseq[] = (SequenceI[]) subalg[0]; width = mseq[0].getLength(); @@ -1014,7 +806,7 @@ class MsaWSThread { if (jobs[0].hasResults()) { - Object[] alg = jobs[0].getAlignment(); + Object[] alg = ((MsaWSJob) jobs[0]).getAlignment(); alignment = (SequenceI[]) alg[0]; alorders.add(alg[1]); } diff --git a/src/jalview/ws/WSThread.java b/src/jalview/ws/WSThread.java new file mode 100644 index 0000000..4c29537 --- /dev/null +++ b/src/jalview/ws/WSThread.java @@ -0,0 +1,249 @@ +package jalview.ws; + +import jalview.gui.AlignFrame; +import jalview.gui.WebserviceInfo; +import jalview.datamodel.AlignmentView; +import jalview.gui.Desktop; +import javax.swing.JOptionPane; +import jalview.bin.Cache; + +public abstract class WSThread extends Thread +{ + /** + * Generic properties for Web Service Client threads. + */ + AlignFrame alignFrame; + WebserviceInfo wsInfo = null; + AlignmentView input; + boolean jobComplete = false; + abstract class WSJob { + /** + * Generic properties for an individual job within a Web Service Client thread + */ + int jobnum = 0; // WebServiceInfo pane for this job + String jobId; // ws job ticket + boolean cancelled = false; + int allowedServerExceptions = 3; // job dies if too many exceptions. + boolean submitted = false; + boolean subjobComplete = false; + /** + * + * @return true if job has completed and valid results are available + */ + abstract boolean hasResults(); + /** + * + * @return boolean true if job can be submitted. + */ + abstract boolean hasValidInput(); + vamsas.objects.simple.Result result; + } + class JobStateSummary { + int running = 0; + int queuing = 0; + int finished = 0; + int error = 0; + int serror = 0; + int cancelled = 0; + int results = 0; + void updateJobPanelState(WebserviceInfo wsInfo, String OutputHeader, + WSJob j) + { + if (j.result != null) + { + String progheader = ""; + // Parse state of job[j] + if (j.result.isRunning()) + { + running++; + wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_RUNNING); + } + else if (j.result.isQueued()) + { + queuing++; + wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_QUEUING); + } + else if (j.result.isFinished()) + { + finished++; + j.subjobComplete = true; + if (j.hasResults()) + results++; + wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_OK); + } + else if (j.result.isFailed()) + { + progheader += "Job failed.\n"; + j.subjobComplete = true; + wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_ERROR); + error++; + } + else if (j.result.isServerError()) + { + serror++; + j.subjobComplete = true; + wsInfo.setStatus(j.jobnum, + WebserviceInfo.STATE_STOPPED_SERVERERROR); + } + else if (j.result.isBroken() || j.result.isFailed()) + { + error++; + j.subjobComplete = true; + wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_ERROR); + } + // and pass on any sub-job messages to the user + wsInfo.setProgressText(j.jobnum, OutputHeader); + wsInfo.appendProgressText(j.jobnum, progheader); + if (j.result.getStatus() != null) + { + wsInfo.appendProgressText(j.jobnum, j.result.getStatus()); + } + } + else + { + if (j.submitted && j.subjobComplete) + { + if (j.allowedServerExceptions == 0) + { + serror++; + } + else if (j.result == null) + { + error++; + } + } + } + } + } + + WSJob jobs[] = null; + String WebServiceName = null; + String OutputHeader; + String WsUrl = null; + abstract void pollJob(WSJob job) throws Exception; + public void run() + { + JobStateSummary jstate=null; + if (jobs==null) + jobComplete=true; + while (!jobComplete) + { + jstate = new JobStateSummary(); + for (int j = 0; j < jobs.length; j++) + { + + if (!jobs[j].submitted && jobs[j].hasValidInput()) + { + StartJob(jobs[j]); + } + + if (jobs[j].submitted && !jobs[j].subjobComplete) + { + try + { + pollJob(jobs[j]); + if (jobs[j].result == null) + { + throw (new Exception( + "Timed out when communicating with server\nTry again later.\n")); + } + jalview.bin.Cache.log.debug("Job " + j + " Result state " + + jobs[j].result.getState() + + "(ServerError=" + + jobs[j].result.isServerError() + ")"); + } + catch (Exception ex) + { + // Deal with Transaction exceptions + wsInfo.appendProgressText(jobs[j].jobnum, "\n" + WebServiceName + + " Server exception!\n" + ex.getMessage()); + Cache.log.warn(WebServiceName + " job(" + jobs[j].jobnum + + ") Server exception: " + ex.getMessage()); + + if (jobs[j].allowedServerExceptions > 0) + { + jobs[j].allowedServerExceptions--; + Cache.log.debug("Sleeping after a server exception."); + try + { + Thread.sleep(5000); + } + catch (InterruptedException ex1) + { + } + } + else + { + Cache.log.warn("Dropping job " + j + " " + jobs[j].jobId); + jobs[j].subjobComplete = true; + wsInfo.setStatus(jobs[j].jobnum, + WebserviceInfo.STATE_STOPPED_SERVERERROR); + } + } + catch (OutOfMemoryError er) + { + jobComplete = true; + jobs[j].subjobComplete = true; + jobs[j].result = null; // may contain out of date result object + wsInfo.setStatus(jobs[j].jobnum, + WebserviceInfo.STATE_STOPPED_ERROR); + JOptionPane + .showInternalMessageDialog( + Desktop.desktop, + "Out of memory handling result for job !!" + + + "\nSee help files for increasing Java Virtual Machine memory.", + "Out of memory", JOptionPane.WARNING_MESSAGE); + Cache.log.error("Out of memory when retrieving Job " + j + " id:" + + WsUrl + "/" + jobs[j].jobId, er); + System.gc(); + } + } + jstate.updateJobPanelState(wsInfo, OutputHeader, jobs[j]); + } + // Decide on overall state based on collected jobs[] states + if (jstate.running > 0) + { + wsInfo.setStatus(WebserviceInfo.STATE_RUNNING); + } + else if (jstate.queuing > 0) + { + wsInfo.setStatus(WebserviceInfo.STATE_QUEUING); + } + else + { + jobComplete = true; + if (jstate.finished > 0) + { + wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_OK); + } + else if (jstate.error > 0) + { + wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_ERROR); + } + else if (jstate.serror > 0) + { + wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_SERVERERROR); + } + } + if (!jobComplete) + { + try + { + Thread.sleep(5000); + } + catch (InterruptedException e) + { + Cache.log.debug("Interrupted sleep waiting for next job poll.", e); + } + // System.out.println("I'm alive "+alTitle); + } + } + if (jobComplete) + { + parseResult(); // tidy up and make results available to user + } +} +abstract void StartJob(WSJob job); +abstract void parseResult(); +}