refactored to create generic polling thread and subjobs
authorjprocter <Jim Procter>
Mon, 14 Aug 2006 15:23:07 +0000 (15:23 +0000)
committerjprocter <Jim Procter>
Mon, 14 Aug 2006 15:23:07 +0000 (15:23 +0000)
src/jalview/ws/MsaWSThread.java
src/jalview/ws/WSThread.java [new file with mode: 0644]

index 803bf44..1382a23 100644 (file)
@@ -32,16 +32,8 @@ import vamsas.objects.simple.MsaResult;
  * @version 1.0
  */
 class MsaWSThread
-    extends Thread implements WSClientI
+    extends WSThread implements WSClientI
 {
-  jalview.gui.AlignFrame alignFrame;
-
-  WebserviceInfo wsInfo = null;
-
-  String WebServiceName = null;
-
-  String OutputHeader;
-  AlignmentView input;
   boolean submitGaps = false; // pass sequences including gaps to alignment
 
   // service
@@ -50,14 +42,9 @@ class MsaWSThread
 
   // order
 
-  class MsaWSJob
+  class MsaWSJob extends WSThread.WSJob
   {
-    int jobnum = 0; // WebServiceInfo pane for this job
-
-    String jobId; // ws job ticket
-
-    vamsas.objects.simple.MsaResult result = null;
-
+    // hold special input for this
     vamsas.objects.simple.SequenceSet seqs = new vamsas.objects.simple.
         SequenceSet();
 
@@ -83,12 +70,6 @@ class MsaWSThread
 
     }
 
-    int allowedServerExceptions = 3; // thread dies if too many
-
-    // exceptions.
-    boolean submitted = false;
-    boolean subjobComplete = false;
-
     Hashtable SeqNames = new Hashtable();
     Vector emptySeqs = new Vector();
     /**
@@ -160,7 +141,7 @@ class MsaWSThread
     public boolean hasResults()
     {
       if (subjobComplete && result != null && result.isFinished()
-          && result.getMsa() != null && result.getMsa().getSeqs() != null)
+          && ((MsaResult) result).getMsa() != null && ((MsaResult) result).getMsa().getSeqs() != null)
       {
         return true;
       }
@@ -175,10 +156,10 @@ class MsaWSThread
         SequenceI[] alseqs = null;
         char alseq_gapchar = '-';
         int alseq_l = 0;
-        if (result.getMsa() != null)
+        if (((MsaResult) result).getMsa() != null)
         {
-          alseqs = getVamsasAlignment(result.getMsa());
-          alseq_gapchar = result.getMsa().getGapchar().charAt(0);
+          alseqs = getVamsasAlignment(((MsaResult) result).getMsa());
+          alseq_gapchar = ((MsaResult) result).getMsa().getGapchar().charAt(0);
           alseq_l = alseqs.length;
         }
         if (emptySeqs.size() > 0)
@@ -269,8 +250,6 @@ class MsaWSThread
       }
       return null;
     }
-
-    boolean cancelled = false;
     /**
      * mark subjob as cancelled and set result object appropriatly
      */
@@ -279,22 +258,24 @@ class MsaWSThread
       subjobComplete = true;
       result = null;
     }
+    /**
+     *
+     * @return boolean true if job can be submitted.
+     */
+    boolean hasValidInput() {
+      if (seqs.getSeqs()!=null)
+        return true;
+      return false;
+    }
   }
 
-  MsaWSJob jobs[] = null;
 
   String alTitle; // name which will be used to form new alignment window.
-
-  boolean jobComplete = false;
-
   Alignment dataset; // dataset to which the new alignment will be
 
   // associated.
 
   ext.vamsas.MuscleWS server = null;
-
-  String WsUrl = null;
-
   /**
    * set basic options for this (group) of Msa jobs
    *
@@ -446,7 +427,7 @@ class MsaWSThread
             {
               // CANCELLED_JOB
               cancelledMessage = "Job cancelled.";
-              jobs[job].cancel();
+              ((MsaWSJob) jobs[job]).cancel();
               wsInfo.setStatus(jobs[job].jobnum,
                                WebserviceInfo.STATE_CANCELLED_OK);
             }
@@ -495,205 +476,15 @@ class MsaWSThread
       }
     }
   }
-
-  class JobStateSummary {
-    int running = 0;
-    int queuing = 0;
-    int finished = 0;
-    int error = 0;
-    int serror = 0;
-    int cancelled=0;
-    int results=0;
-    void updateJobPanelState(WebserviceInfo wsInfo, MsaWSJob j) {
-      if (j.result != null)
-      {
-        String progheader = "";
-        // Parse state of job[j]
-        if (j.result.isRunning())
-        {
-          running++;
-          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_RUNNING);
-        }
-        else if (j.result.isQueued())
-        {
-          queuing++;
-          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_QUEUING);
-        }
-        else if (j.result.isFinished())
-        {
-          finished++;
-          j.subjobComplete = true;
-          if (j.hasResults())
-            results++;
-          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_OK);
-        }
-        else if (j.result.isFailed())
-        {
-          progheader += "Job failed.\n";
-          j.subjobComplete = true;
-          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_ERROR);
-          error++;
-        }
-        else if (j.result.isServerError())
-        {
-          serror++;
-          j.subjobComplete = true;
-          wsInfo.setStatus(j.jobnum,
-                           WebserviceInfo.STATE_STOPPED_SERVERERROR);
-        }
-        else if (j.result.isBroken() || j.result.isFailed())
-        {
-          error++;
-          j.subjobComplete = true;
-          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_ERROR);
-        }
-        // and pass on any sub-job messages to the user
-        wsInfo.setProgressText(j.jobnum, OutputHeader);
-        wsInfo.appendProgressText(j.jobnum, progheader);
-        if (j.result.getStatus() != null)
-        {
-          wsInfo.appendProgressText(j.jobnum, j.result.getStatus());
-        }
-      }
-      else
-      {
-        if (j.submitted && j.subjobComplete)
-        {
-          if (j.allowedServerExceptions == 0)
-          {
-            serror++;
-          }
-          else if (j.result == null)
-          {
-            error++;
-          }
-        }
-      }
-    }
+  void pollJob(WSJob job) throws Exception {
+    ((MsaWSJob) job).result = server.getResult(((MsaWSJob) job).jobId);
   }
-  public void run()
+  void StartJob(WSJob job)
   {
-    JobStateSummary jstate=null;
-    while (!jobComplete)
-    {
-      jstate = new JobStateSummary();
-      for (int j = 0; j < jobs.length; j++)
-      {
-
-        if (!jobs[j].submitted && jobs[j].seqs.getSeqs() != null)
-        {
-          StartJob(jobs[j]);
-        }
-
-        if (jobs[j].submitted && !jobs[j].subjobComplete)
-        {
-          try
-          {
-            if ( (jobs[j].result = server.getResult(jobs[j].jobId)) == null)
-            {
-              throw (new Exception(
-                  "Timed out when communicating with server\nTry again later.\n"));
-            }
-            jalview.bin.Cache.log.debug("Job " + j + " Result state " +
-                                        jobs[j].result.getState()
-                                        + "(ServerError=" +
-                                        jobs[j].result.isServerError() + ")");
-          }
-          catch (Exception ex)
-          {
-            // Deal with Transaction exceptions
-            wsInfo.appendProgressText(jobs[j].jobnum, "\n" + WebServiceName
-                                      + " Server exception!\n" + ex.getMessage());
-            Cache.log.warn(WebServiceName + " job(" + jobs[j].jobnum
-                           + ") Server exception: " + ex.getMessage());
-
-            if (jobs[j].allowedServerExceptions > 0)
-            {
-              jobs[j].allowedServerExceptions--;
-              Cache.log.debug("Sleeping after a server exception.");
-              try
-              {
-                Thread.sleep(5000);
-              }
-              catch (InterruptedException ex1)
-              {
-              }
-            }
-            else
-            {
-              Cache.log.warn("Dropping job " + j + " " + jobs[j].jobId);
-              jobs[j].subjobComplete = true;
-              wsInfo.setStatus(jobs[j].jobnum,
-                               WebserviceInfo.STATE_STOPPED_SERVERERROR);
-            }
-          }
-          catch (OutOfMemoryError er)
-          {
-            jobComplete = true;
-            jobs[j].subjobComplete = true;
-            jobs[j].result = null; // may contain out of date result object
-            wsInfo.setStatus(jobs[j].jobnum,
-                             WebserviceInfo.STATE_STOPPED_ERROR);
-            JOptionPane
-                .showInternalMessageDialog(
-                    Desktop.desktop,
-                    "Out of memory handling result for job !!"
-                    +
-                "\nSee help files for increasing Java Virtual Machine memory.",
-                    "Out of memory", JOptionPane.WARNING_MESSAGE);
-            Cache.log.error("Out of memory when retrieving Job " + j + " id:" +
-                            WsUrl + "/" + jobs[j].jobId, er);
-            System.gc();
-          }
-        }
-        jstate.updateJobPanelState(wsInfo, jobs[j]);
-      }
-      // Decide on overall state based on collected jobs[] states
-      if (jstate.running > 0)
-      {
-        wsInfo.setStatus(WebserviceInfo.STATE_RUNNING);
-      }
-      else if (jstate.queuing > 0)
-      {
-        wsInfo.setStatus(WebserviceInfo.STATE_QUEUING);
-      }
-      else
-      {
-        jobComplete = true;
-        if (jstate.finished > 0)
-        {
-          wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_OK);
-        }
-        else if (jstate.error > 0)
-        {
-          wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_ERROR);
-        }
-        else if (jstate.serror > 0)
-        {
-          wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_SERVERERROR);
-        }
-      }
-      if (!jobComplete)
-      {
-        try
-        {
-          Thread.sleep(5000);
-        }
-        catch (InterruptedException e)
-        {
-          Cache.log.debug("Interrupted sleep waiting for next job poll.", e);
-        }
-        // System.out.println("I'm alive "+alTitle);
-      }
+    if (!(job instanceof MsaWSJob)) {
+      throw new Error("StartJob(MsaWSJob) called on a WSJobInstance "+job.getClass());
     }
-    if (jobComplete)
-    {
-      parseResult(); // tidy up and make results available to user
-    }
-  }
-
-  void StartJob(MsaWSJob j)
-  {
+    MsaWSJob j = (MsaWSJob) job;
     if (j.submitted)
     {
       if (Cache.log.isDebugEnabled())
@@ -709,7 +500,7 @@ class MsaWSThread
       j.result = new MsaResult();
       j.result.setFinished(true);
       j.result.setStatus("Empty Alignment Job");
-      j.result.setMsa(null);
+      ((MsaResult) j.result).setMsa(null);
     }
     try
     {
@@ -783,11 +574,11 @@ class MsaWSThread
     {
       for (int j = 0; j < jobs.length; j++)
       {
-        finalState.updateJobPanelState(wsInfo, jobs[j]);
+        finalState.updateJobPanelState(wsInfo, OutputHeader, jobs[j]);
         if (jobs[j].submitted && jobs[j].subjobComplete && jobs[j].hasResults())
         {
           results++;
-          vamsas.objects.simple.Alignment valign = jobs[j].result.getMsa();
+          vamsas.objects.simple.Alignment valign = ((MsaResult) jobs[j].result).getMsa();
           if (valign != null)
           {
             wsInfo.appendProgressText(jobs[j].jobnum,
@@ -832,7 +623,8 @@ class MsaWSThread
         }
       });
       wsInfo.setResultsReady();
-    }
+    } else
+      wsInfo.setFinishedNoResults();
   }
 
   void displayResults(boolean newFrame)
@@ -859,7 +651,7 @@ class MsaWSThread
           int width = 0; // subalignment width
           if (jobs[j].hasResults())
           {
-            Object[] subalg = jobs[j++].getAlignment();
+            Object[] subalg = ((MsaWSJob) jobs[j++]).getAlignment();
             alorders.add(subalg[1]);
             SequenceI mseq[] = (SequenceI[]) subalg[0];
             width = mseq[0].getLength();
@@ -947,7 +739,7 @@ class MsaWSThread
         int width = 0;
         if (jobs[j].hasResults())
         {
-          Object[] subalg = jobs[j].getAlignment();
+          Object[] subalg = ((MsaWSJob) jobs[j]).getAlignment();
           alorders.add(subalg[1]);
           SequenceI mseq[] = (SequenceI[]) subalg[0];
           width = mseq[0].getLength();
@@ -1014,7 +806,7 @@ class MsaWSThread
     {
       if (jobs[0].hasResults())
       {
-        Object[] alg = jobs[0].getAlignment();
+        Object[] alg = ((MsaWSJob) jobs[0]).getAlignment();
         alignment = (SequenceI[]) alg[0];
         alorders.add(alg[1]);
       }
diff --git a/src/jalview/ws/WSThread.java b/src/jalview/ws/WSThread.java
new file mode 100644 (file)
index 0000000..4c29537
--- /dev/null
@@ -0,0 +1,249 @@
+package jalview.ws;
+
+import jalview.gui.AlignFrame;
+import jalview.gui.WebserviceInfo;
+import jalview.datamodel.AlignmentView;
+import jalview.gui.Desktop;
+import javax.swing.JOptionPane;
+import jalview.bin.Cache;
+
+public abstract class WSThread extends Thread
+{
+    /**
+     * Generic properties for Web Service Client threads.
+     */
+    AlignFrame alignFrame;
+  WebserviceInfo wsInfo = null;
+  AlignmentView input;
+  boolean jobComplete = false;
+  abstract class WSJob {
+      /**
+       * Generic properties for an individual job within a Web Service Client thread
+       */
+    int jobnum = 0; // WebServiceInfo pane for this job
+    String jobId; // ws job ticket
+    boolean cancelled = false;
+    int allowedServerExceptions = 3; // job dies if too many exceptions.
+    boolean submitted = false;
+    boolean subjobComplete = false;
+    /**
+     *
+     * @return true if job has completed and valid results are available
+     */
+    abstract boolean hasResults();
+    /**
+     *
+     * @return boolean true if job can be submitted.
+     */
+    abstract boolean hasValidInput();
+    vamsas.objects.simple.Result result;
+  }
+  class JobStateSummary {
+    int running = 0;
+    int queuing = 0;
+    int finished = 0;
+    int error = 0;
+    int serror = 0;
+    int cancelled = 0;
+    int results = 0;
+    void updateJobPanelState(WebserviceInfo wsInfo, String OutputHeader,
+                             WSJob j)
+    {
+      if (j.result != null)
+      {
+        String progheader = "";
+        // Parse state of job[j]
+        if (j.result.isRunning())
+        {
+          running++;
+          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_RUNNING);
+        }
+        else if (j.result.isQueued())
+        {
+          queuing++;
+          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_QUEUING);
+        }
+        else if (j.result.isFinished())
+        {
+          finished++;
+          j.subjobComplete = true;
+          if (j.hasResults())
+            results++;
+          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_OK);
+        }
+        else if (j.result.isFailed())
+        {
+          progheader += "Job failed.\n";
+          j.subjobComplete = true;
+          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_ERROR);
+          error++;
+        }
+        else if (j.result.isServerError())
+        {
+          serror++;
+          j.subjobComplete = true;
+          wsInfo.setStatus(j.jobnum,
+                           WebserviceInfo.STATE_STOPPED_SERVERERROR);
+        }
+        else if (j.result.isBroken() || j.result.isFailed())
+        {
+          error++;
+          j.subjobComplete = true;
+          wsInfo.setStatus(j.jobnum, WebserviceInfo.STATE_STOPPED_ERROR);
+        }
+        // and pass on any sub-job messages to the user
+        wsInfo.setProgressText(j.jobnum, OutputHeader);
+        wsInfo.appendProgressText(j.jobnum, progheader);
+        if (j.result.getStatus() != null)
+        {
+          wsInfo.appendProgressText(j.jobnum, j.result.getStatus());
+        }
+      }
+      else
+      {
+        if (j.submitted && j.subjobComplete)
+        {
+          if (j.allowedServerExceptions == 0)
+          {
+            serror++;
+          }
+          else if (j.result == null)
+          {
+            error++;
+          }
+        }
+      }
+    }
+  }
+
+  WSJob jobs[] = null;
+  String WebServiceName = null;
+  String OutputHeader;
+  String WsUrl = null;
+  abstract void pollJob(WSJob job) throws Exception;
+  public void run()
+  {
+  JobStateSummary jstate=null;
+  if (jobs==null)
+    jobComplete=true;
+  while (!jobComplete)
+  {
+    jstate = new JobStateSummary();
+    for (int j = 0; j < jobs.length; j++)
+    {
+
+      if (!jobs[j].submitted && jobs[j].hasValidInput())
+      {
+        StartJob(jobs[j]);
+      }
+
+      if (jobs[j].submitted && !jobs[j].subjobComplete)
+      {
+        try
+        {
+          pollJob(jobs[j]);
+          if (jobs[j].result == null)
+          {
+            throw (new Exception(
+                "Timed out when communicating with server\nTry again later.\n"));
+          }
+          jalview.bin.Cache.log.debug("Job " + j + " Result state " +
+                                      jobs[j].result.getState()
+                                      + "(ServerError=" +
+                                      jobs[j].result.isServerError() + ")");
+        }
+        catch (Exception ex)
+        {
+          // Deal with Transaction exceptions
+          wsInfo.appendProgressText(jobs[j].jobnum, "\n" + WebServiceName
+                                    + " Server exception!\n" + ex.getMessage());
+          Cache.log.warn(WebServiceName + " job(" + jobs[j].jobnum
+                         + ") Server exception: " + ex.getMessage());
+
+          if (jobs[j].allowedServerExceptions > 0)
+          {
+            jobs[j].allowedServerExceptions--;
+            Cache.log.debug("Sleeping after a server exception.");
+            try
+            {
+              Thread.sleep(5000);
+            }
+            catch (InterruptedException ex1)
+            {
+            }
+          }
+          else
+          {
+            Cache.log.warn("Dropping job " + j + " " + jobs[j].jobId);
+            jobs[j].subjobComplete = true;
+            wsInfo.setStatus(jobs[j].jobnum,
+                             WebserviceInfo.STATE_STOPPED_SERVERERROR);
+          }
+        }
+        catch (OutOfMemoryError er)
+        {
+          jobComplete = true;
+          jobs[j].subjobComplete = true;
+          jobs[j].result = null; // may contain out of date result object
+          wsInfo.setStatus(jobs[j].jobnum,
+                           WebserviceInfo.STATE_STOPPED_ERROR);
+          JOptionPane
+              .showInternalMessageDialog(
+                  Desktop.desktop,
+                  "Out of memory handling result for job !!"
+                  +
+              "\nSee help files for increasing Java Virtual Machine memory.",
+                  "Out of memory", JOptionPane.WARNING_MESSAGE);
+          Cache.log.error("Out of memory when retrieving Job " + j + " id:" +
+                          WsUrl + "/" + jobs[j].jobId, er);
+          System.gc();
+        }
+      }
+      jstate.updateJobPanelState(wsInfo, OutputHeader, jobs[j]);
+    }
+    // Decide on overall state based on collected jobs[] states
+    if (jstate.running > 0)
+    {
+      wsInfo.setStatus(WebserviceInfo.STATE_RUNNING);
+    }
+    else if (jstate.queuing > 0)
+    {
+      wsInfo.setStatus(WebserviceInfo.STATE_QUEUING);
+    }
+    else
+    {
+      jobComplete = true;
+      if (jstate.finished > 0)
+      {
+        wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_OK);
+      }
+      else if (jstate.error > 0)
+      {
+        wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_ERROR);
+      }
+      else if (jstate.serror > 0)
+      {
+        wsInfo.setStatus(WebserviceInfo.STATE_STOPPED_SERVERERROR);
+      }
+    }
+    if (!jobComplete)
+    {
+      try
+      {
+        Thread.sleep(5000);
+      }
+      catch (InterruptedException e)
+      {
+        Cache.log.debug("Interrupted sleep waiting for next job poll.", e);
+      }
+      // System.out.println("I'm alive "+alTitle);
+    }
+  }
+  if (jobComplete)
+  {
+    parseResult(); // tidy up and make results available to user
+  }
+}
+abstract void StartJob(WSJob job);
+abstract void parseResult();
+}