+ super(worker);
+ this.worker = worker;
+ }
+
+ protected void submit()
+ {
+ if (task != null && !(task.isDone() || task.isCancelled()))
+ {
+ throw new IllegalStateException(
+ "Cannot submit new task if the prevoius one is still running");
+ }
+ Cache.log.debug(format("Worker %s queued",
+ worker.getClass().getName()));
+ setState(QUEUED);
+ final var runnable = new Runnable()
+ {
+ private boolean started = false;
+ private boolean completed = false;
+ Future<?> future = null;
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (!started)
+ {
+ Cache.log.debug(format("Worker %s started",
+ worker.getClass().getName()));
+ setState(RUNNING);
+ worker.startUp();
+ started = true;
+ }
+ else if (!completed)
+ {
+ if (worker.poll())
+ {
+ Cache.log.debug(format("Worker %s finished",
+ worker.getClass().getName()));
+ completed = true;
+ setState(IDLE);
+ }
+ }
+ } catch (Throwable th)
+ {
+ Cache.log.debug(format("Worker %s failed",
+ worker.getClass().getName()), th);
+ completed = true;
+ setState(IDLE);
+ }
+ if (completed)
+ {
+ try
+ {
+ future.cancel(false);
+ }
+ catch (NullPointerException ignored)
+ {
+ // extremely unlikely to happen
+ }
+ }
+ }
+ };
+ runnable.future = task = executor.scheduleWithFixedDelay(
+ runnable, 10, 1000, TimeUnit.MILLISECONDS);
+ }
+
+ synchronized protected void cancel()
+ {
+ if (task == null || state == IDLE || state == CANCELLING)
+ {
+ return;
+ }
+ Cache.log.debug(format("Cancelling worker %s",
+ worker.getClass().getName()));
+ setState(CANCELLING);
+ task.cancel(false);
+ if (task.isCancelled())
+ {
+ setState(IDLE);
+ }
+ executor.submit(() -> {
+ worker.cancel();
+ });