import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.WeakHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
private abstract class WorkerManager
{
protected volatile boolean enabled = true;
-
- protected final AlignCalcWorkerI worker;
-
+
+ protected AlignCalcWorkerI worker;
+
WorkerManager(AlignCalcWorkerI worker)
{
this.worker = worker;
}
-
- AlignCalcWorkerI getWorker()
+
+ protected AlignCalcWorkerI getWorker()
{
return worker;
}
-
+
boolean isEnabled()
{
return enabled;
}
-
+
void setEnabled(boolean enabled)
{
this.enabled = enabled;
}
-
+
synchronized void restart()
{
if (!isEnabled())
{
return;
}
+ if (!isRestartable())
+ {
+ setEnabled(false);
+ }
if (isWorking())
{
cancel();
}
submit();
}
-
+
+ protected boolean isRestartable()
+ {
+ return registered.containsKey(getWorker());
+ }
+
abstract boolean isWorking();
-
+
protected abstract void submit();
-
+
abstract void cancel();
}
-
-
+
private class SimpleWorkerManager extends WorkerManager
{
private Future<?> task = null;
-
+
SimpleWorkerManager(AlignCalcWorkerI worker)
{
super(worker);
}
-
+
@Override
boolean isWorking()
{
if (task != null && !(task.isDone() || task.isCancelled()))
{
throw new IllegalStateException(
- "Cannot submit new task if the prevoius one is still running");
+ "Cannot submit new task if the prevoius one is still running");
}
- Cache.log.debug(format("Worker %s queued",
- worker.getClass().getName()));
+ Cache.log.debug(
+ format("Worker %s queued", getWorker().getClass().getName()));
task = executor.submit(() -> {
try
{
Cache.log.debug(format("Worker %s started",
- worker.getClass().getName()));
- worker.run();
+ getWorker().getClass().getName()));
+ getWorker().run();
Cache.log.debug(format("Worker %s finished",
- worker.getClass().getName()));
- }
- catch (InterruptedException e)
+ getWorker().getClass().getName()));
+ } catch (InterruptedException e)
{
Cache.log.debug(format("Worker %s interrupted",
- worker.getClass().getName()));
- }
- catch (Throwable th)
+ getWorker().getClass().getName()));
+ } catch (Throwable th)
{
Cache.log.debug(format("Worker %s failed",
- worker.getClass().getName()), th);
- }
- finally
+ getWorker().getClass().getName()), th);
+ } finally
{
+ if (!isRestartable())
+ {
+ // delete worker reference so garbage collector can remove it
+ worker = null;
+ }
}
});
}
return;
}
Cache.log.debug(format("Cancelling worker %s",
- worker.getClass().getName()));
+ getWorker().getClass().getName()));
task.cancel(true);
}
}
-
-
+
private class PollableWorkerManager extends WorkerManager
{
- private final PollableAlignCalcWorkerI worker;
private Future<?> task = null;
-
+
PollableWorkerManager(PollableAlignCalcWorkerI worker)
{
super(worker);
- this.worker = worker;
}
-
+
+ @Override
+ protected PollableAlignCalcWorkerI getWorker()
+ {
+ return (PollableAlignCalcWorkerI) super.getWorker();
+ }
+
@Override
boolean isWorking()
{
return task != null && !task.isDone();
}
-
+
protected void submit()
{
if (task != null && !(task.isDone() || task.isCancelled()))
{
throw new IllegalStateException(
- "Cannot submit new task if the prevoius one is still running");
+ "Cannot submit new task if the prevoius one is still running");
}
- Cache.log.debug(format("Worker %s queued",
- worker.getClass().getName()));
+ Cache.log.debug(
+ format("Worker %s queued", getWorker().getClass().getName()));
final var runnable = new Runnable()
{
private boolean started = false;
+
private boolean completed = false;
+
Future<?> future = null;
-
+
@Override
public void run()
{
if (!started)
{
Cache.log.debug(format("Worker %s started",
- worker.getClass().getName()));
- worker.startUp();
+ getWorker().getClass().getName()));
+ getWorker().startUp();
started = true;
}
else if (!completed)
{
Cache.log.debug(format("Polling worker %s",
- worker.getClass().getName()));
- if (worker.poll())
+ getWorker().getClass().getName()));
+ if (getWorker().poll())
{
Cache.log.debug(format("Worker %s finished",
- worker.getClass().getName()));
+ getWorker().getClass().getName()));
completed = true;
}
}
} catch (Throwable th)
{
Cache.log.debug(format("Worker %s failed",
- worker.getClass().getName()), th);
+ getWorker().getClass().getName()), th);
completed = true;
}
if (completed)
{
+ final var worker = getWorker();
+ if (!isRestartable())
+ PollableWorkerManager.super.worker = null;
Cache.log.debug(format("Finalizing completed worker %s",
worker.getClass().getName()));
worker.done();
}
}
};
- runnable.future = task = executor.scheduleWithFixedDelay(
- runnable, 10, 1000, TimeUnit.MILLISECONDS);
+ runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
+ 1000, TimeUnit.MILLISECONDS);
}
-
+
synchronized protected void cancel()
{
if (!isWorking())
return;
}
Cache.log.debug(format("Cancelling worker %s",
- worker.getClass().getName()));
+ getWorker().getClass().getName()));
task.cancel(false);
executor.submit(() -> {
- worker.cancel();
- Cache.log.debug(format("Finalizing cancelled worker %s",
- worker.getClass().getName()));
- worker.done();
+ final var worker = getWorker();
+ if (!isRestartable())
+ PollableWorkerManager.super.worker = null;
+ if (worker != null)
+ {
+ worker.cancel();
+ Cache.log.debug(format("Finalizing cancelled worker %s",
+ worker.getClass().getName()));
+ worker.done();
+ }
});
}
}
-
-
- private final ScheduledExecutorService executor =
- Executors.newSingleThreadScheduledExecutor();
- private final Map<AlignCalcWorkerI, WorkerManager> registered =
- synchronizedMap(new HashMap<>());
-
- private final List<AlignCalcListener> listeners =
- new CopyOnWriteArrayList<>();
-
- private WorkerManager createManager(AlignCalcWorkerI worker) {
+
+ private final ScheduledExecutorService executor = Executors
+ .newSingleThreadScheduledExecutor();
+
+ private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
+ new HashMap<>());
+
+ private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
+ new WeakHashMap<>());
+
+ private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
+
+ private WorkerManager createManager(AlignCalcWorkerI worker)
+ {
if (worker instanceof PollableAlignCalcWorkerI)
+ {
return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
+ }
else
+ {
return new SimpleWorkerManager(worker);
+ }
}
-
+
@Override
public void registerWorker(AlignCalcWorkerI worker)
{
@Override
public boolean isWorking(AlignCalcWorkerI worker)
{
- if (!registered.containsKey(worker))
- {
+ var manager = registered.get(worker);
+ if (manager == null)
+ manager = oneshot.get(worker);
+ if (manager == null)
return false;
- }
else
- {
- return registered.get(worker).isWorking();
- }
+ return manager.isWorking();
}
@Override
synchronized (registered)
{
for (var entry : registered.entrySet())
- {
- if (entry.getKey().involves(annot) &&
- entry.getValue().isWorking())
- {
+ if (entry.getKey().involves(annot) && entry.getValue().isWorking())
+ return true;
+ }
+ synchronized (oneshot)
+ {
+ for (var entry : registered.entrySet())
+ if (entry.getKey().involves(annot) && entry.getValue().isWorking())
return true;
- }
- }
}
return false;
}
synchronized (registered)
{
for (var manager : registered.values())
- {
if (manager.isWorking())
- {
return true;
- }
- }
+ }
+ synchronized (oneshot)
+ {
+ for (var manager : oneshot.values())
+ if (manager.isWorking())
+ return true;
}
return false;
}
{
Objects.requireNonNull(worker);
var manager = registered.get(worker);
- if (manager == null)
+ if (manager == null)
{
Cache.log.warn("Starting unregistered worker " + worker);
manager = createManager(worker);
+ oneshot.put(worker, manager);
}
manager.restart();
}
@Override
public void cancelWorker(AlignCalcWorkerI worker)
- {
+ {
Objects.requireNonNull(worker);
var manager = registered.get(worker);
- if (manager == null)
+ if (manager == null)
+ manager = oneshot.get(worker);
+ if (manager == null)
{
throw new NoSuchElementException();
}
manager.cancel();
}
-
+
private void notifyQueued(AlignCalcWorkerI worker)
{
for (AlignCalcListener listener : listeners)