X-Git-Url: http://source.jalview.org/gitweb/?a=blobdiff_plain;f=src%2Fuk%2Fac%2Fvamsas%2Fclient%2Fsimpleclient%2FEventGeneratorThread.java;h=47c84ac3efe332dfe02a56adb022cd68deb85165;hb=844ccad5a3fcbedec17b2af66d460f31abc7cff1;hp=4a0b1fb7a6bc71f3a2cd06208ed0a625bf710e43;hpb=b4dc48a6b8be7fa6628bc61599b4678030c4084e;p=vamsas.git diff --git a/src/uk/ac/vamsas/client/simpleclient/EventGeneratorThread.java b/src/uk/ac/vamsas/client/simpleclient/EventGeneratorThread.java index 4a0b1fb..47c84ac 100644 --- a/src/uk/ac/vamsas/client/simpleclient/EventGeneratorThread.java +++ b/src/uk/ac/vamsas/client/simpleclient/EventGeneratorThread.java @@ -1,352 +1,430 @@ -package uk.ac.vamsas.client.simpleclient; - -import java.beans.PropertyChangeListener; -import java.beans.PropertyChangeSupport; -import java.util.Hashtable; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import uk.ac.vamsas.client.Events; - -/** - * monitors watcher objects and generates events. - */ -public class EventGeneratorThread { - private static Log log = LogFactory.getLog(EventGeneratorThread.class); - private SimpleClient client; - private Hashtable handlers; // manager object - private VamsasSession session; - /** - * thread watching all the session's file objects - */ - protected VamsasFileWatcherThread watchThread=null; - /** - * Watcher element for list of all the clientHandles for the session - */ - protected SessionFileWatcherElement clientfile=null; - /** - * the session's vamsasDocument - */ - protected VamsasFileWatcherElement vamsasfile=null; - /** - * written to by client when its app calls storeDocument. - */ - protected SessionFileWatcherElement storeFile=null; - - EventGeneratorThread(VamsasSession s, SimpleClient _client, Hashtable eventhandlers) { - if (eventhandlers==null || s==null || _client==null) - throw new Error("Null arguments to EventGeneratorThread constructor."); - handlers = eventhandlers; - session = s; - client = _client; - log.debug("Creating VamsasFileWatcherThread."); - watchThread = new VamsasFileWatcherThread(this); - initWatchers(); - } - - private void initWatchers() { - if (clientfile==null) { - log.debug("Initializing clientfile Watcher"); - clientfile = session.getClientWatcherElement(); - // handler is set in the Vamsas session -/* clientfile.setHandler(new WatcherCallBack() { - - public boolean handleWatchEvent(WatcherElement watcher, Lock lock) { - return clientListChanged(watcher, lock); - } - });*/ - watchThread.addElement(clientfile); - } - final EventGeneratorThread evgen=this; - - if (vamsasfile ==null) { - log.debug("Initializing VamsasFileWatcher"); - vamsasfile = new VamsasFileWatcherElement(session.vamArchive, - new WatcherCallBack() { - public boolean handleWatchEvent(WatcherElement watcher, Lock lock) { - return evgen.documentChanged(lock); - } - }); - watchThread.addElement(vamsasfile); - } - if (storeFile == null) { - storeFile = new SessionFileWatcherElement(session.getStoreDocFile(), - new WatcherCallBack() { - public boolean handleWatchEvent(WatcherElement watcher, Lock lock) { - return evgen.storeDocRequest(lock); - } - }); - log.debug("Initializing storeDocFile flag watcher"); - } - /* - */ - log.debug("Watchers inited."); - } - /** - * Call registered handlers for a vamsas session event - * @param handlerEvent a named event - * @param property property name to pass to handler - * @param oldval old value of property to pass - * @param newval new value of property to pass - * @return true if event generation did not raise any exceptions. - */ - boolean _raise(String handlerEvent, String property, Object oldval, Object newval) { - PropertyChangeSupport h = (PropertyChangeSupport) handlers.get(handlerEvent); - if (h!=null) { - log.debug("Triggering:"+handlerEvent); - try { - h.firePropertyChange(property, oldval, newval); - } catch (Exception e) { - log.warn("Client Exception during handling of "+handlerEvent, e); - return false; - } - catch (Error e) - { - log.error("Serious! Client Error during handling of "+handlerEvent, e); - return false; - } - log.debug("Finished :"+handlerEvent); - } else - log.debug("No handlers for raised "+handlerEvent); - return true; - } - protected boolean storeDocRequest(Lock lock) { - if (log.isDebugEnabled()) - log.debug("StoreDocRequest on "+(lock==null ? (lock.isLocked() ? "" : "Invalid ") : "Non-")+"Existing lock"); - // TODO: define the storeFile semaphore mechanism : file exists - all clients inform their apps, and then the client that wrote the file should delete the file (it should hold the lock to it). - if (storeFile.getWatcher().exists) { - _raise(Events.DOCUMENT_FINALIZEAPPDATA, client.getSessionUrn(), null, client); - // expect client to write to document so update watcher state on return - vamsasfile.getWatcher().setState(); - lock.release(); - } - return true; - } - - protected boolean documentChanged(Lock doclock) { - boolean continueWatching=true; - if (!block_document_updates) { - session.vamArchive.fileLock=doclock; - if (client.pickmanager!=null) - client.pickmanager.setPassThru(false); - if (log.isDebugEnabled()) { - log.debug("Initiating a documentChanged event. Document is "+(client.cdocument==null ? "closed" : "open")); - } - // TODO: decide if individual object update handlers are called as well as overall event handler - if (!_raise(Events.DOCUMENT_UPDATE, client.getSessionUrn(), null, client)) - { - log.info("Recovering from errors or exceptions generated by client application"); - if (client.cdocument!=null) - { - try { - client.tidyAwaySessionDocumentState(); - } - catch (Exception e) - { - log.warn("Exception generated by vamsas library - when tidying away session document:",e); - } - catch (Error e) - { - log.error("LIBRARY Implementation error - when tidying away session document:",e); - } - } - - } - if (client.pickmanager!=null) - client.pickmanager.setPassThru(true); - if (log.isDebugEnabled()) { - log.debug("Finished handling a documentChanged event. Document is "+(client.cdocument==null ? "closed" : "open")); - } - if (client.cdocument!=null) - { - log.warn("Implementation Error ? ClientDocument instance has not been closed or updated by handler!"); - } - /*try { - client._session.getVamsasDocument().closeArchive(); - } catch (Exception e) {log.warn("Unexpected exception when closing document after update.",e);}; - */ - } else { - // TODO: check documentChanged */ - log.debug("Ignoring documentChanged event for "+client.getSessionUrn()); - } - return continueWatching; - } - boolean ownsf = false; - /** - * Moved to SimpleClientSessionManager - * scans all watchers and fires changeEvents if necessary - * @return number of events generated. - */ - private boolean clientListChanged(WatcherElement clientfile, Lock watchlock) { - log.debug("ClientListChanged handler called for "+clientfile.getWatcher().getSubject()); - // could make this general - but for now keep simple - if (watchlock!=null) { - // TODO: compare new client list to old list version. is it changed ? - // see what happened to the clientfile - compare our internal version with the one in the file, or just send the updated list out...? - // - /** - * Generated when a new vamsas client is attached to a session (Handle is - * passed) Note: the newly created client does not receive the event. - * - public static final String CLIENT_CREATION = "uk.ac.vamsas.client.events.clientCreateEvent"; - */ // as the test - /** - * Generated when a vamsas client leaves a session (Handle is passed to all - * others). - public static final String CLIENT_FINALIZATION = "uk.ac.vamsas.client.events.clientFinalizationEvent"; - */ // again - as the test. - } - return true; - } - /** - * Events raised by IClient and propagated to others in session - */ - - /** - * number of milliseconds between any file state check. - */ - long POLL_UNIT = 20; - protected void wait(int u) { - if (u<=0) - u=1; - long l = System.currentTimeMillis()+POLL_UNIT*u; - while (System.currentTimeMillis(). + */ +package uk.ac.vamsas.client.simpleclient; + +import java.beans.PropertyChangeListener; +import java.beans.PropertyChangeSupport; +import java.util.Hashtable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import uk.ac.vamsas.client.Events; + +/** + * monitors watcher objects and generates events. + */ +public class EventGeneratorThread { + private static Log log = LogFactory.getLog(EventGeneratorThread.class); + + private SimpleClient client; + + private Hashtable handlers; // manager object + + private VamsasSession session; + + /** + * thread watching all the session's file objects + */ + protected VamsasFileWatcherThread watchThread = null; + + /** + * Watcher element for list of all the clientHandles for the session + */ + protected SessionFileWatcherElement clientfile = null; + + /** + * the session's vamsasDocument + */ + protected VamsasFileWatcherElement vamsasfile = null; + + /** + * written to by client when its app calls storeDocument. + */ + protected SessionFileWatcherElement storeFile = null; + + EventGeneratorThread(VamsasSession s, SimpleClient _client, + Hashtable eventhandlers) { + if (eventhandlers == null || s == null || _client == null) + throw new Error("Null arguments to EventGeneratorThread constructor."); + handlers = eventhandlers; + session = s; + client = _client; + log.debug("Creating VamsasFileWatcherThread."); + watchThread = new VamsasFileWatcherThread(this); + initWatchers(); + } + + private void initWatchers() { + if (clientfile == null) { + log.debug("Initializing clientfile Watcher"); + clientfile = session.getClientWatcherElement(); + // handler is set in the Vamsas session + /* + * clientfile.setHandler(new WatcherCallBack() { + * + * public boolean handleWatchEvent(WatcherElement watcher, Lock lock) { + * return clientListChanged(watcher, lock); } }); + */ + watchThread.addElement(clientfile); + } + final EventGeneratorThread evgen = this; + + if (vamsasfile == null) { + log.debug("Initializing VamsasFileWatcher"); + vamsasfile = new VamsasFileWatcherElement(session.vamArchive, + new WatcherCallBack() { + public boolean handleWatchEvent(WatcherElement watcher, Lock lock) { + return evgen.documentChanged(lock); + } + }); + watchThread.addElement(vamsasfile); + } + if (storeFile == null) { + storeFile = new SessionFileWatcherElement(session.getStoreDocFile(), + new WatcherCallBack() { + public boolean handleWatchEvent(WatcherElement watcher, Lock lock) { + return evgen.storeDocRequest(lock); + } + }); + log.debug("Initializing storeDocFile flag watcher"); + } + /* + */ + log.debug("Watchers inited."); + } + + /** + * Call registered handlers for a vamsas session event + * + * @param handlerEvent + * a named event + * @param property + * property name to pass to handler + * @param oldval + * old value of property to pass + * @param newval + * new value of property to pass + * @return true if event generation did not raise any exceptions. + */ + boolean _raise(String handlerEvent, String property, Object oldval, + Object newval) { + PropertyChangeSupport h = (PropertyChangeSupport) handlers + .get(handlerEvent); + if (h != null) { + log.debug("Triggering:" + handlerEvent); + try { + h.firePropertyChange(property, oldval, newval); + } catch (Exception e) { + log.warn("Client Exception during handling of " + handlerEvent, e); + return false; + } catch (Error e) { + log + .error("Serious! Client Error during handling of " + handlerEvent, + e); + return false; + } + log.debug("Finished :" + handlerEvent); + } else + log.debug("No handlers for raised " + handlerEvent); + return true; + } + + protected boolean storeDocRequest(Lock lock) { + if (log.isDebugEnabled()) + log.debug("StoreDocRequest on " + + (lock == null ? (lock.isLocked() ? "" : "Invalid ") : "Non-") + + "Existing lock"); + // TODO: define the storeFile semaphore mechanism : file exists - all + // clients inform their apps, and then the client that wrote the file should + // delete the file (it should hold the lock to it). + if (storeFile.getWatcher().exists) { + _raise(Events.DOCUMENT_FINALIZEAPPDATA, client.getSessionUrn(), null, + client); + // expect client to write to document so update watcher state on return + vamsasfile.getWatcher().setState(); + lock.release(); + } + return true; + } + + protected boolean documentChanged(Lock doclock) { + boolean continueWatching = true; + if (!block_document_updates) { + session.vamArchive.fileLock = doclock; + if (client.pickmanager != null) + client.pickmanager.setPassThru(false); + if (log.isDebugEnabled()) { + log.debug("Initiating a documentChanged event. Document is " + + (client.cdocument == null ? "closed" : "open")); + } + // TODO: decide if individual object update handlers are called as well as + // overall event handler + if (!_raise(Events.DOCUMENT_UPDATE, client.getSessionUrn(), null, client)) { + log + .info("Recovering from errors or exceptions generated by client application"); + if (client.cdocument != null) { + try { + client.tidyAwaySessionDocumentState(); + } catch (Exception e) { + log + .warn( + "Exception generated by vamsas library - when tidying away session document:", + e); + } catch (Error e) { + log + .error( + "LIBRARY Implementation error - when tidying away session document:", + e); + } + } + + } + if (client.pickmanager != null) + client.pickmanager.setPassThru(true); + if (log.isDebugEnabled()) { + log.debug("Finished handling a documentChanged event. Document is " + + (client.cdocument == null ? "closed" : "open")); + } + if (client.cdocument != null) { + log + .warn("Implementation Error ? ClientDocument instance has not been closed or updated by handler!"); + } + /* + * try { client._session.getVamsasDocument().closeArchive(); } catch + * (Exception e) + * {log.warn("Unexpected exception when closing document after update." + * ,e);}; + */ + } else { + // TODO: check documentChanged */ + log.debug("Ignoring documentChanged event for " + client.getSessionUrn()); + } + return continueWatching; + } + + boolean ownsf = false; + + /** + * Moved to SimpleClientSessionManager scans all watchers and fires + * changeEvents if necessary + * + * @return number of events generated. + */ + private boolean clientListChanged(WatcherElement clientfile, Lock watchlock) { + log.debug("ClientListChanged handler called for " + + clientfile.getWatcher().getSubject()); + // could make this general - but for now keep simple + if (watchlock != null) { + // TODO: compare new client list to old list version. is it changed ? + // see what happened to the clientfile - compare our internal version with + // the one in the file, or just send the updated list out...? + // + /** + * Generated when a new vamsas client is attached to a session (Handle is + * passed) Note: the newly created client does not receive the event. + * + * public static final String CLIENT_CREATION = + * "uk.ac.vamsas.client.events.clientCreateEvent"; + */ + // as the test + /** + * Generated when a vamsas client leaves a session (Handle is passed to + * all others). public static final String CLIENT_FINALIZATION = + * "uk.ac.vamsas.client.events.clientFinalizationEvent"; + */ + // again - as the test. + watchlock.release(); + } + return true; + } + + /** + * Events raised by IClient and propagated to others in session + */ + + /** + * number of milliseconds between any file state check. + */ + long POLL_UNIT = 20; + + protected void wait(int u) { + if (u <= 0) + u = 1; + long l = System.currentTimeMillis() + POLL_UNIT * u; + while (System.currentTimeMillis() < l) + ; + } + + private boolean block_document_updates = false; + + int STORE_WAIT = 5; // how many units before we decide all clients have + // finalized their appdatas + + private boolean in_want_to_store_phase = false; + + /** + * client App requests offline storage of vamsas data. Call blocks whilst + * other apps do any appData finalizing and then returns (after locking the + * vamsasDocument in the session) Note - the calling app may also receive + * events through the EventGeneratorThread for document updates. + * + * @return Lock for session.vamArchive + * @param STORE_WAIT + * indicates how lock the call will block for when nothing appears to + * be happening to the session. + */ + protected Lock want_to_store() { + if (in_want_to_store_phase) { + log + .error("client error: want_to_store called again before first call has completed."); + return null; + } + in_want_to_store_phase = true; + // TODO: test the storeDocumentRequest mechanism + /* + * / watchThread.haltWatchers(); + */ + log.debug("Stopping document_update watcher"); + vamsasfile.haltWatch(); + // block_document_updates=true; + log.debug("Cleared flag for ignoring document_update requests"); + + log.debug("Sending Store Document Request"); + try { + session.addStoreDocumentRequest(client.getClientHandle(), client + .getUserHandle()); + } catch (Exception e) { + log.warn("Whilst writing StoreDocumentRequest for " + + client.getClientHandle().getClientUrn() + " " + + client.getUserHandle(), e); + log.info("trying to continue after storeDocumentRequest exception."); + } + log.debug("Waiting for other apps to do FinalizeApp handling."); + // LATER: refine this semaphore process + // to make a robust signalling mechanism: + // app1 requests, app1..n do something (or don't - they may be dead), + // app1 realises all apps have done their thing, it then continues with + // synchronized data. + // this probably needs two files - a request file, + // and a response file which is acknowledged by the app1 requestor for each + // app. + // eventually, no more responses are received for the request, and the app + // can then only continue with its store. + FileWatcher sfwatcher = session.getStoreWatcher(); + FileWatcher vfwatcher = session.getDocWatcher(); + int units = 0; // zero if updates occured over a sleep period + while (units < STORE_WAIT) { + try { + Thread.sleep(watchThread.WATCH_SLEEP); + } catch (InterruptedException e) { + log.debug("interrupted."); + } + if (sfwatcher.hasChanged() || vfwatcher.hasChanged()) { + units = 0; + } else { + units++; + } + } + + block_document_updates = false; + vamsasfile.enableWatch(); + log.debug("Cleared flag for ignoring document_update requests"); + // wait around again (until our own watcher has woken up and synchronized). + while (units < STORE_WAIT) { + try { + Thread.sleep(watchThread.WATCH_SLEEP); + } catch (InterruptedException e) { + log.debug("interrupted."); + } + if (sfwatcher.hasChanged() || vfwatcher.hasChanged()) + units = 0; + else + units++; + } + + log.debug("finished waiting."); + in_want_to_store_phase = false; + return session.vamArchive.getLock(); + } + + /** + * count handlers for a particular vamsas event + * + * @param event + * string enumeration from uk.ac.vamsas.client.Events + * @return -1 for an invalid event, otherwise the number of handlers + */ + protected int countHandlersFor(String event) { + if (handlers.containsKey(event)) { + PropertyChangeSupport handler = (PropertyChangeSupport) handlers + .get(event); + PropertyChangeListener[] listeners; + if (handler != null) + return ((listeners = handler.getPropertyChangeListeners()) == null) ? -1 + : listeners.length; + } + return -1; + } + + public void disableDocumentWatch() { + vamsasfile.haltWatch(); + } + + public boolean isDocumentWatchEnabled() { + return (vamsasfile != null) && vamsasfile.isWatchEnabled(); + } + + public void enableDocumentWatch() { + vamsasfile.enableWatch(); + } + + public boolean isWatcherAlive() { + return watchThread != null && watchThread.running && watchThread.isAlive(); + } + + public void interruptWatching() { + if (watchThread != null && watchThread.isAlive()) { + // TODO: find a way of interrupting watcher in a way that prevents file IO + // being interrupted + watchThread.interrupt(); + } + + } + + /** + * called to start the session watching thread which generates events + */ + public void startWatching() { + enableDocumentWatch(); + watchThread.start(); + while (!watchThread.running && watchThread.isAlive()) + log.debug("Waiting until watcher is really started."); + } + + public void stopWatching() { + interruptWatching(); + watchThread.haltWatchers(); + + } + +}