From 95bef5c43ee5a194f5e74312463d1346aedb19c6 Mon Sep 17 00:00:00 2001 From: Mateusz Warowny Date: Mon, 20 Sep 2021 18:21:01 +0200 Subject: [PATCH] JAL-3878 Add web service executor class and instantiate for each alignment viewport. --- src/jalview/viewmodel/AlignmentViewport.java | 10 ++ src/jalview/ws2/WebServiceExecutor.java | 136 ++++++++++++++++---------- 2 files changed, 96 insertions(+), 50 deletions(-) diff --git a/src/jalview/viewmodel/AlignmentViewport.java b/src/jalview/viewmodel/AlignmentViewport.java index 1a08b1b..179bfa0 100644 --- a/src/jalview/viewmodel/AlignmentViewport.java +++ b/src/jalview/viewmodel/AlignmentViewport.java @@ -62,6 +62,7 @@ import jalview.workers.ComplementConsensusThread; import jalview.workers.ConsensusThread; import jalview.workers.InformationThread; import jalview.workers.StrucConsensusThread; +import jalview.ws2.WebServiceExecutor; import java.awt.Color; import java.beans.PropertyChangeSupport; @@ -995,6 +996,13 @@ public abstract class AlignmentViewport return false; } + private WebServiceExecutor wsExecutor = new WebServiceExecutor(); + + public WebServiceExecutor getWSExecutor() + { + return wsExecutor; + } + public void setAlignment(AlignmentI align) { this.alignment = align; @@ -1024,6 +1032,8 @@ public abstract class AlignmentViewport gapcounts = null; calculator.shutdown(); calculator = null; + wsExecutor.shutdown(); + wsExecutor = null; residueShading = null; // may hold a reference to Consensus changeSupport = null; ranges = null; diff --git a/src/jalview/ws2/WebServiceExecutor.java b/src/jalview/ws2/WebServiceExecutor.java index 8959dd6..453e6ce 100644 --- a/src/jalview/ws2/WebServiceExecutor.java +++ b/src/jalview/ws2/WebServiceExecutor.java @@ -10,76 +10,112 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import jalview.bin.Cache; -import jalview.ws2.WebServiceWorkerI.WSJob; public class WebServiceExecutor { private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - + public void submit(final WebServiceWorkerI worker) { - for (var job : worker.getJobs()) { - executor.submit(() -> submitJob(worker, job)); - executor.schedule(() -> pollJob(worker, job), 1, TimeUnit.SECONDS); - } - } - - private void submitJob(WebServiceWorkerI worker, WSJob job) { - try { - job.setJobID(worker.startJob(job).getJobID()); - job.resetAllowedExceptions(); - executor.schedule(() -> pollJob(worker, job), 1, TimeUnit.SECONDS); - } - catch (IOException e) { - Cache.log.error("Exception occurred during job submission", e); - if (!job.deductAllowedExceptions()) { - job.setState(WSJobState.SERVER_ERROR); + executor.submit(() -> { + try { + worker.startJobs(); + wsThreadSupport.submitted(worker); + } + catch (Exception e) { + Cache.log.error("Failed to submit web service jobs.", e); + wsThreadSupport.submissionFailed(worker, e); + return; + } + executor.schedule(() -> pollJobs(worker), 1, TimeUnit.SECONDS); } - } - if (!job.getState().isSubmitted()) { - executor.schedule(() -> submitJob(worker, job), 5, TimeUnit.SECONDS); - } - } - - private void pollJob(WebServiceWorkerI worker, WSJob job) { + ); + executor.schedule(() -> pollJobs(worker), 1, TimeUnit.SECONDS); +} + + + private void pollJobs(WebServiceWorkerI worker) { try { - worker.pollJob(job); - job.resetAllowedExceptions(); + worker.pollJobs(); } - catch (IOException e) { - Cache.log.error("Exception occurred duringn job pollign", e); - if (!job.deductAllowedExceptions()) { - job.setState(WSJobState.SERVER_ERROR); + catch (Exception e) { + Cache.log.error("Failed to poll web service jobs.", e); + for (WSJob job : worker.getJobs()) { + job.setStatus(WSJobStatus.SERVER_ERROR); } + wsThreadSupport.pollFailed(worker, e); + return; } - if (!job.getState().isDone()) { - executor.schedule(() -> pollJob(worker, job), 1, TimeUnit.SECONDS); + if (!worker.isDone()) { + executor.schedule(() -> pollJobs(worker), 1, TimeUnit.SECONDS); } + else { + worker.done(); + wsThreadSupport.done(worker); + } + } + + private WebServiceThreadSupport wsThreadSupport = new WebServiceThreadSupport(); + + public void addThreadListener(WebServiceThreadListenerI listener) + { + wsThreadSupport.addListener(listener); } - - public static interface WebServiceThreadListenerI + + public void removeThreadListener(WebServiceThreadListenerI listener) + { + wsThreadSupport.removeListener(listener); + } + + + public void shutdown() { - public void threadSubmitted(WebServiceWorkerI thread); - public void threadStarted(WebServiceWorkerI thread); - public void stateChanged(WebServiceWorkerI thread, WSJobState oldState, - WSJobState newState); - public void logAppended(WebServiceWorkerI thread, String text); - public void errorLogAppended(WebServiceWorkerI thread, String text); - public void cancelled(WebServiceWorkerI thread); + executor.shutdownNow(); } - - + +} + + +class WebServiceThreadSupport implements WebServiceThreadListenerI { List listeners = new CopyOnWriteArrayList<>(); - - public void addServiceListener(WebServiceThreadListenerI listener) + + @Override + public void submitted(WebServiceWorkerI thread) { - if (!listeners.contains(listener)) - listeners.add(listener); + for (var listener : listeners) listener.submitted(thread); } - - public void removeServiceListener(WebServiceThreadListenerI listener) + + @Override + public void submissionFailed(WebServiceWorkerI thread, Exception e) { + for (var listener : listeners) listener.submissionFailed(thread, e); + } + + @Override + public void pollFailed(WebServiceWorkerI thread, Exception e) { + for (var listener : listeners) listener.pollFailed(thread, e); + } + + @Override + public void cancelled(WebServiceWorkerI thread) + { + for (var listener : listeners) listener.cancelled(thread); + } + + @Override + public void done(WebServiceWorkerI thread) + { + for (var listener : listeners) listener.done(thread); + } + + public void addListener(WebServiceThreadListenerI listener) { + if (!listeners.contains(listener)) { + listeners.add(listener); + } + } + + public void removeListener(WebServiceThreadListenerI listener) { listeners.remove(listener); } } -- 1.7.10.2