JAL-961, JAL-1115 - pending test to ensure only one worker waits around for another...
authorjprocter <jprocter@compbio.dundee.ac.uk>
Tue, 12 Jun 2012 10:57:52 +0000 (11:57 +0100)
committerjprocter <jprocter@compbio.dundee.ac.uk>
Tue, 12 Jun 2012 10:57:52 +0000 (11:57 +0100)
src/jalview/api/AlignCalcManagerI.java
src/jalview/workers/AlignCalcManager.java
src/jalview/workers/ConsensusThread.java
src/jalview/workers/ConservationThread.java
src/jalview/workers/StrucConsensusThread.java

index a7b17c6..1eedc74 100644 (file)
@@ -105,4 +105,14 @@ public interface AlignCalcManagerI
    */
   boolean startRegisteredWorkersOfClass(Class workerClass);
 
+  /**
+   * work out if there is an instance of a worker that is *waiting* to start
+   * calculating
+   *
+   * @param workingClass
+   * @return true if workingClass is already waiting to calculate. false if it
+   *         is calculating, or not queued.
+   */
+  boolean isPending(AlignCalcWorkerI workingClass);
+
 }
index 784c64e..c42e424 100644 (file)
@@ -1,6 +1,7 @@
 package jalview.workers;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.List;
@@ -12,42 +13,99 @@ import jalview.datamodel.AlignmentAnnotation;
 
 public class AlignCalcManager implements AlignCalcManagerI
 {
-  private volatile List<AlignCalcWorkerI> restartable = new ArrayList<AlignCalcWorkerI>();
+  private volatile List<AlignCalcWorkerI> restartable = Collections
+          .synchronizedList(new ArrayList<AlignCalcWorkerI>());
 
-  private final List<Class> blackList = new ArrayList<Class>();
+  private volatile List<Class> blackList = Collections
+          .synchronizedList(new ArrayList<Class>());
 
   /**
    * global record of calculations in progress
    */
-  private final Hashtable<Class, AlignCalcWorkerI> inProgress = new Hashtable<Class, AlignCalcWorkerI>();
+  private volatile Map<Class, AlignCalcWorkerI> inProgress = Collections
+          .synchronizedMap(new Hashtable<Class, AlignCalcWorkerI>());
 
   /**
    * record of calculations pending or in progress in the current context
    */
-  private final Map<Class, List<AlignCalcWorkerI>> updating = new Hashtable<Class, List<AlignCalcWorkerI>>();
+  private volatile Map<Class, List<AlignCalcWorkerI>> updating = Collections
+          .synchronizedMap(new Hashtable<Class, List<AlignCalcWorkerI>>());
 
   @Override
   public void notifyStart(AlignCalcWorkerI worker)
   {
-    List<AlignCalcWorkerI> upd = updating.get(worker.getClass());
-    if (upd == null)
+    synchronized (updating)
     {
-      updating.put(worker.getClass(),
-              upd = new ArrayList<AlignCalcWorkerI>());
+      List<AlignCalcWorkerI> upd = updating.get(worker.getClass());
+      if (upd == null)
+      {
+        updating.put(
+                worker.getClass(),
+                upd = Collections
+                        .synchronizedList(new ArrayList<AlignCalcWorkerI>()));
+      }
+      synchronized (upd)
+      {
+        upd.add(worker);
+      }
     }
-    upd.add(worker);
   }
 
   @Override
-  public synchronized boolean alreadyDoing(AlignCalcWorkerI worker)
+  public boolean alreadyDoing(AlignCalcWorkerI worker)
+  {
+    synchronized (inProgress)
+    {
+      return inProgress.containsKey(worker.getClass());
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see jalview.api.AlignCalcManagerI#isPending(jalview.api.AlignCalcWorkerI)
+   */
+  @Override
+  public boolean isPending(AlignCalcWorkerI workingClass)
+  {
+    List<AlignCalcWorkerI> upd;
+    synchronized (updating)
+    {
+      upd = updating.get(workingClass.getClass());
+      if (upd == null)
+      {
+        return false;
+      }
+      synchronized (upd)
+      {
+        if (upd.size() > 1)
+        {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  // TODO make into api method if needed ?
+  public int numberLive(AlignCalcWorkerI worker)
   {
-    return inProgress.containsKey(worker.getClass());
+    synchronized (updating)
+    {
+      List<AlignCalcWorkerI> upd = updating.get(worker.getClass());
+      if (upd == null)
+      {
+        return 0;
+      }
+      ;
+      return upd.size();
+    }
   }
 
   @Override
-  public synchronized boolean notifyWorking(AlignCalcWorkerI worker)
+  public boolean notifyWorking(AlignCalcWorkerI worker)
   {
-    // synchronized (inProgress)
+    synchronized (inProgress)
     {
       // TODO: decide if we should throw exceptions here if multiple workers
       // start to work
@@ -66,92 +124,133 @@ public class AlignCalcManager implements AlignCalcManagerI
     return true;
   }
 
-  private final HashSet<AlignCalcWorkerI> canUpdate=new HashSet<AlignCalcWorkerI>();
+  private final HashSet<AlignCalcWorkerI> canUpdate = new HashSet<AlignCalcWorkerI>();
+
   @Override
-  public synchronized void workerComplete(AlignCalcWorkerI worker)
+  public void workerComplete(AlignCalcWorkerI worker)
   {
-    inProgress.remove(worker.getClass());
-    List<AlignCalcWorkerI> upd = updating.get(worker.getClass());
-    if (upd != null)
+    synchronized (inProgress)
     {
-      upd.remove(worker);
-      canUpdate.add(worker);
+      inProgress.remove(worker.getClass());
+      List<AlignCalcWorkerI> upd = updating.get(worker.getClass());
+      if (upd != null)
+      {
+        synchronized (upd)
+        {
+          upd.remove(worker);
+        }
+        canUpdate.add(worker);
+      }
     }
-
   }
 
   @Override
   public void workerCannotRun(AlignCalcWorkerI worker)
   {
-    blackList.add(worker.getClass());
+    synchronized (blackList)
+    {
+      blackList.add(worker.getClass());
+    }
   }
 
   public boolean isBlackListed(Class workerType)
   {
-    return blackList.contains(workerType);
+    synchronized (blackList)
+    {
+      return blackList.contains(workerType);
+    }
   }
 
   @Override
   public void startWorker(AlignCalcWorkerI worker)
   {
-    new Thread(worker).start();
+    Thread tw = new Thread(worker);
+    tw.setName(worker.getClass().toString());
+    tw.start();
   }
 
   @Override
-  public synchronized boolean isWorking(AlignCalcWorkerI worker)
+  public boolean isWorking(AlignCalcWorkerI worker)
   {
-    // System.err.println("isWorking : worker "+(worker!=null ?
-    // worker.getClass():"null")+ " "+hashCode());
-    return worker != null && inProgress.get(worker.getClass()) == worker;
+    synchronized (inProgress)
+    {// System.err.println("isWorking : worker "+(worker!=null ?
+     // worker.getClass():"null")+ " "+hashCode());
+      return worker != null && inProgress.get(worker.getClass()) == worker;
+    }
   }
 
   @Override
   public boolean isWorking()
   {
-    // System.err.println("isWorking "+hashCode());
-    return inProgress.size() > 0;
+    synchronized (inProgress)
+    {
+      // System.err.println("isWorking "+hashCode());
+      return inProgress.size() > 0;
+    }
   }
 
   @Override
   public void registerWorker(AlignCalcWorkerI worker)
   {
-    if (!restartable.contains(worker))
+    synchronized (restartable)
     {
-      restartable.add(worker);
+      if (!restartable.contains(worker))
+      {
+        restartable.add(worker);
+      }
+      startWorker(worker);
     }
-    startWorker(worker);
   }
 
   @Override
   public void restartWorkers()
   {
-    for (AlignCalcWorkerI worker : restartable)
+    synchronized (restartable)
     {
-      startWorker(worker);
+      for (AlignCalcWorkerI worker : restartable)
+      {
+        startWorker(worker);
+      }
     }
   }
 
   @Override
   public boolean workingInvolvedWith(AlignmentAnnotation alignmentAnnotation)
   {
-    if (isWorking())
+    synchronized (inProgress)
     {
-      for (List<AlignCalcWorkerI> workers: updating.values())
+      for (AlignCalcWorkerI worker : inProgress.values())
       {
-        for (AlignCalcWorkerI worker:workers)
         if (worker.involves(alignmentAnnotation))
         {
           return true;
         }
       }
     }
+    synchronized (updating)
+    {
+      for (List<AlignCalcWorkerI> workers : updating.values())
+      {
+        for (AlignCalcWorkerI worker : workers)
+          if (worker.involves(alignmentAnnotation))
+          {
+            return true;
+          }
+      }
+    }
     return false;
   }
 
   @Override
   public void updateAnnotationFor(Class workerClass)
   {
-    for (AlignCalcWorkerI worker:canUpdate.toArray(new AlignCalcWorkerI[0]))
+
+    AlignCalcWorkerI[] workers;
+    synchronized (canUpdate)
+    {
+      workers = canUpdate.toArray(new AlignCalcWorkerI[0]);
+    }
+    for (AlignCalcWorkerI worker : workers)
     {
       if (workerClass.equals(worker.getClass()))
       {
@@ -164,37 +263,53 @@ public class AlignCalcManager implements AlignCalcManagerI
   public List<AlignCalcWorkerI> getRegisteredWorkersOfClass(
           Class workerClass)
   {
-    List<AlignCalcWorkerI> workingClass=new ArrayList<AlignCalcWorkerI>();
-    for (AlignCalcWorkerI worker:canUpdate.toArray(new AlignCalcWorkerI[0]))
+    List<AlignCalcWorkerI> workingClass = new ArrayList<AlignCalcWorkerI>();
+    AlignCalcWorkerI[] workers;
+    synchronized (canUpdate)
+    {
+      workers = canUpdate.toArray(new AlignCalcWorkerI[0]);
+    }
+    for (AlignCalcWorkerI worker : workers)
     {
       if (workerClass.equals(worker.getClass()))
       {
         workingClass.add(worker);
       }
     }
-    return (workingClass.size()==0) ? null : workingClass;
+    return (workingClass.size() == 0) ? null : workingClass;
   }
 
   @Override
   public boolean startRegisteredWorkersOfClass(Class workerClass)
   {
-         List<AlignCalcWorkerI> workers=getRegisteredWorkersOfClass(workerClass);
-         if (workers==null)
-         {
-                 return false;
-         }
-         for (AlignCalcWorkerI worker: workers) {
-                 startWorker(worker);
-         }
-         return true;
+    List<AlignCalcWorkerI> workers = getRegisteredWorkersOfClass(workerClass);
+    if (workers == null)
+    {
+      return false;
+    }
+    for (AlignCalcWorkerI worker : workers)
+    {
+      if (!isPending(worker))
+      {
+        startWorker(worker);
+      }
+      else
+      {
+        System.err.println("Pending exists for " + workerClass);
+      }
+    }
+    return true;
   }
 
   @Override
   public void workerMayRun(AlignCalcWorkerI worker)
   {
-    if (blackList.contains(worker.getClass()))
+    synchronized (blackList)
     {
-      blackList.remove(worker.getClass());
+      if (blackList.contains(worker.getClass()))
+      {
+        blackList.remove(worker.getClass());
+      }
     }
   }
 }
index 7dd25f8..9bdd714 100644 (file)
@@ -19,30 +19,37 @@ public class ConsensusThread extends AlignCalcWorker implements AlignCalcWorkerI
     super(alignViewport, alignPanel);
   }
 
+  @Override
   public void run()
   {
+    if (calcMan.isPending(this))
+    {
+      return;
+    }
+    calcMan.notifyStart(this);
+    long started=System.currentTimeMillis();
     try
     {
       AlignmentAnnotation consensus = alignViewport.getAlignmentConsensusAnnotation();
-      if (consensus==null) { return; 
+      if (consensus==null || calcMan.isPending(this))  {
+        calcMan.workerComplete(this);
+        return;
       }
-      calcMan.notifyStart(this); 
-      while (!calcMan.notifyWorking(this)) 
+      while (!calcMan.notifyWorking(this))
       {
+        // System.err.println("Thread (Consensus"+Thread.currentThread().getName()+") Waiting around.");
         try
         {
           if (ap != null)
           {
-            ap.paintAlignment(false);
+           ap.paintAlignment(false);
           }
-
           Thread.sleep(200);
         } catch (Exception ex)
         {
           ex.printStackTrace();
         }
       }
-      calcMan.notifyWorking(this);
       if (alignViewport.isClosed())
       {
         abortAndDestroy();
@@ -66,8 +73,17 @@ public class ConsensusThread extends AlignCalcWorker implements AlignCalcWorkerI
       consensus.annotations = new Annotation[aWidth];
       Hashtable[] hconsensus = alignViewport.getSequenceConsensusHash();
       hconsensus = new Hashtable[aWidth];
-      AAFrequency.calculate(alignment.getSequencesArray(), 0,
+      try {
+       AAFrequency.calculate(alignment.getSequencesArray(), 0,
               alignment.getWidth(), hconsensus, true);
+      } catch (ArrayIndexOutOfBoundsException x){
+        // this happens due to a race condition -
+        // alignment was edited at same time as calculation was running
+        //
+//        calcMan.workerCannotRun(this);
+        calcMan.workerComplete(this);
+        return;
+      }
       alignViewport.setSequenceConsensusHash(hconsensus);
       updateResultAnnotation(true);
       ColourSchemeI globalColourScheme = alignViewport
@@ -97,6 +113,7 @@ public class ConsensusThread extends AlignCalcWorker implements AlignCalcWorkerI
    * update the consensus annotation from the sequence profile data using
    * current visualization settings.
    */
+  @Override
   public void updateAnnotation()
   {
     updateResultAnnotation(false);
index 47ce6eb..23d1773 100644 (file)
@@ -54,7 +54,7 @@ public class ConservationThread extends AlignCalcWorker implements AlignCalcWork
         {
           if (ap != null)
           {
-            ap.paintAlignment(false);
+            // ap.paintAlignment(false);
           }
           Thread.sleep(200);
         } catch (Exception ex)
index ea900dc..a7f919b 100644 (file)
@@ -2,7 +2,6 @@ package jalview.workers;
 
 import java.util.Hashtable;
 
-import jalview.analysis.AAFrequency;
 import jalview.analysis.StructureFrequency;
 import jalview.api.AlignCalcWorkerI;
 import jalview.api.AlignViewportI;
@@ -29,6 +28,10 @@ public class StrucConsensusThread extends AlignCalcWorker implements
   {
     try
     {
+      if (calcMan.isPending(this))
+      {
+        return;
+      }
       calcMan.notifyStart(this);
       while (!calcMan.notifyWorking(this))
       {
@@ -36,7 +39,7 @@ public class StrucConsensusThread extends AlignCalcWorker implements
         {
           if (ap != null)
           {
-            ap.paintAlignment(false);
+            // ap.paintAlignment(false);
           }
 
           Thread.sleep(200);
@@ -85,9 +88,15 @@ public class StrucConsensusThread extends AlignCalcWorker implements
         return;
       }
 
-      jalview.analysis.StructureFrequency.calculate(
+      try {
+              jalview.analysis.StructureFrequency.calculate(
               alignment.getSequencesArray(), 0, alignment.getWidth(),
               hStrucConsensus, true, rnaStruc);
+      } catch (ArrayIndexOutOfBoundsException x)
+      {
+        calcMan.workerComplete(this);
+        return;
+      }
       alignViewport.setRnaStructureConsensusHash(hStrucConsensus);
       // TODO AlignmentAnnotation rnaStruc!!!
       updateResultAnnotation(true);
@@ -103,12 +112,13 @@ public class StrucConsensusThread extends AlignCalcWorker implements
       // consensus = null;
       // hconsensus = null;
       ap.raiseOOMWarning("calculating RNA structure consensus", error);
-    }
-
-    calcMan.workerComplete(this);
-    if (ap != null)
+    } finally
     {
-      ap.paintAlignment(true);
+      calcMan.workerComplete(this);
+      if (ap != null)
+      {
+        ap.paintAlignment(true);
+      }
     }
 
   }