311c825fb07c2e43c49611f36f06c3214f347dd5
[vamsas.git] / src / uk / ac / vamsas / client / simpleclient / EventGeneratorThread.java
1 package uk.ac.vamsas.client.simpleclient;
2
3 import java.beans.PropertyChangeListener;
4 import java.beans.PropertyChangeSupport;
5 import java.util.Hashtable;
6
7 import org.apache.commons.logging.Log;
8 import org.apache.commons.logging.LogFactory;
9
10 import uk.ac.vamsas.client.Events;
11
12 /**
13  * monitors watcher objects and generates events.
14  */
15 public class EventGeneratorThread {
16   private static Log log = LogFactory.getLog(EventGeneratorThread.class);
17   private SimpleClient client;
18   private Hashtable handlers; // manager object
19   private VamsasSession session;
20   /**
21    * thread watching all the session's file objects
22    */
23   protected VamsasFileWatcherThread watchThread=null;
24   /** 
25    * Watcher element for list of all the clientHandles for the session
26    */
27   protected SessionFileWatcherElement clientfile=null;
28   /**
29    * the session's vamsasDocument
30    */
31   protected VamsasFileWatcherElement vamsasfile=null;
32   /**
33    * written to by client when its app calls storeDocument.
34    */
35   protected SessionFileWatcherElement storeFile=null;
36   
37   EventGeneratorThread(VamsasSession s, SimpleClient _client, Hashtable eventhandlers) {
38     if (eventhandlers==null || s==null || _client==null)
39       throw new Error("Null arguments to EventGeneratorThread constructor.");
40     handlers = eventhandlers;
41     session = s;
42     client = _client;
43     log.debug("Creating VamsasFileWatcherThread.");
44     watchThread = new VamsasFileWatcherThread(this);
45     initWatchers();
46   }
47   
48   private void initWatchers() {
49     if (clientfile==null) {
50       log.debug("Initializing clientfile Watcher");
51       clientfile = session.getClientWatcherElement();
52      // handler is set in the Vamsas session
53 /* clientfile.setHandler(new WatcherCallBack() {
54
55             public boolean handleWatchEvent(WatcherElement watcher, Lock lock) {
56               return clientListChanged(watcher, lock);
57             }        
58       });*/
59       watchThread.addElement(clientfile);
60     }
61     final EventGeneratorThread evgen=this;
62     
63     if (vamsasfile ==null) {
64       log.debug("Initializing VamsasFileWatcher");
65       vamsasfile = new VamsasFileWatcherElement(session.vamArchive,
66           new WatcherCallBack() {
67          public boolean handleWatchEvent(WatcherElement watcher, Lock lock) {
68            return evgen.documentChanged(lock);
69          }
70       });
71       watchThread.addElement(vamsasfile);
72     }
73     if (storeFile == null) {
74       storeFile = new SessionFileWatcherElement(session.getStoreDocFile(),
75           new WatcherCallBack() {
76         public boolean handleWatchEvent(WatcherElement watcher, Lock lock) {
77           return evgen.storeDocRequest(lock);
78         }
79       });
80       log.debug("Initializing storeDocFile flag watcher");
81     }
82     /*
83     */
84     log.debug("Watchers inited.");
85   }
86   void _raise(String handlerEvent, String property, Object oldval, Object newval) {
87     PropertyChangeSupport h = (PropertyChangeSupport) handlers.get(handlerEvent);
88     if (h!=null) {
89       log.debug("Triggering:"+handlerEvent);
90       try {
91         h.firePropertyChange(property, oldval, newval);
92       } catch (Exception e) {
93         log.warn("Client Exception during handling of "+handlerEvent, e);
94       }
95       log.debug("Finished  :"+handlerEvent);
96     } else
97       log.debug("No handlers for raised "+handlerEvent);
98   }
99   protected boolean storeDocRequest(Lock lock) {
100     if (log.isDebugEnabled())
101       log.debug("StoreDocRequest on "+(lock==null ? (lock.isLocked() ? "" : "Invalid ") : "Non-")+"Existing lock");
102     // TODO: define the storeFile semaphore mechanism : file exists - all clients inform their apps, and then the client that wrote the file should delete the file (it should hold the lock to it).
103     if (storeFile.getWatcher().exists) {
104       _raise(Events.DOCUMENT_FINALIZEAPPDATA, client.getSessionUrn(), null, client);
105       // expect client to write to document so update watcher state on return
106       vamsasfile.getWatcher().setState();
107       lock.release();
108     }
109     return true;
110   }
111
112   protected boolean documentChanged(Lock doclock) {
113     boolean continueWatching=true;
114     if (!block_document_updates) {
115       session.vamArchive.fileLock=doclock;
116       if (client.pickmanager!=null)
117         client.pickmanager.setPassThru(false);
118       // TODO: decide if individual object update handlers are called as well as overall event handler
119       _raise(Events.DOCUMENT_UPDATE, client.getSessionUrn(), null, client);
120       if (client.pickmanager!=null)
121         client.pickmanager.setPassThru(true);
122       if (log.isDebugEnabled()) {
123         log.debug("Finished handling a documentChanged event. Document is "+(client.cdocument==null ? "closed" : "open"));
124       }
125       /*try {
126         client._session.getVamsasDocument().closeArchive();
127       } catch (Exception e) {log.warn("Unexpected exception when closing document after update.",e);};
128       */
129     } else {
130       // TODO: check documentChanged */
131       log.debug("Ignoring documentChanged event for "+client.getSessionUrn());
132     }
133     return continueWatching;
134   }
135   boolean ownsf = false;
136   /**
137    * Moved to SimpleClientSessionManager
138    * scans all watchers and fires changeEvents if necessary
139    * @return number of events generated.
140    */
141   private boolean clientListChanged(WatcherElement clientfile, Lock watchlock) {
142     log.debug("ClientListChanged handler called for "+clientfile.getWatcher().getSubject());
143     // could make this general - but for now keep simple
144     if (watchlock!=null) {
145       // TODO: compare new client list to old list version. is it changed ?
146       // see what happened to the clientfile - compare our internal version with the one in the file, or just send the updated list out...?
147       //
148       /**
149        * Generated when a new vamsas client is attached to a session (Handle is
150        * passed) Note: the newly created client does not receive the event.
151        *
152       public static final String CLIENT_CREATION = "uk.ac.vamsas.client.events.clientCreateEvent";
153   */ // as the test
154       /**
155        * Generated when a vamsas client leaves a session (Handle is passed to all
156        * others).
157       public static final String CLIENT_FINALIZATION = "uk.ac.vamsas.client.events.clientFinalizationEvent";
158        */ // again - as the test.
159     }
160     return true;
161   }
162   /**
163    * Events raised by IClient and propagated to others in session
164    */
165   
166   /**
167    * number of milliseconds between any file state check.
168    */
169   long POLL_UNIT = 20;
170   protected void wait(int u) {
171     if (u<=0)
172       u=1;
173     long l = System.currentTimeMillis()+POLL_UNIT*u;
174       while (System.currentTimeMillis()<l)
175         ;
176   }
177     
178   
179   private boolean block_document_updates=false;
180   int STORE_WAIT=5; // how many units before we decide all clients have finalized their appdatas
181   private boolean in_want_to_store_phase=false;
182   /**
183    * client App requests offline storage of vamsas data. 
184    * Call blocks whilst other apps do any appData finalizing
185    * and then returns (after locking the vamsasDocument in the session)
186    * Note - the calling app may also receive events through the EventGeneratorThread for document updates.
187    * 
188    * @return Lock for session.vamArchive 
189    * @param STORE_WAIT indicates how lock the call will block for when nothing appears to be happening to the session.
190    */
191   protected Lock want_to_store() {
192     if (in_want_to_store_phase) {
193       log.error("client error: want_to_store called again before first call has completed.");
194       return null;
195     }
196     in_want_to_store_phase=true;
197     // TODO: test the storeDocumentRequest mechanism
198     /*/ watchThread.haltWatchers();
199      */
200     log.debug("Stopping document_update watcher");
201     vamsasfile.haltWatch();
202     // block_document_updates=true;
203     log.debug("Cleared flag for ignoring document_update requests");
204
205     log.debug("Sending Store Document Request");
206     try {
207       session.addStoreDocumentRequest(client.getClientHandle(), client.getUserHandle());
208     } catch (Exception e) {
209       log.warn("Whilst writing StoreDocumentRequest for "+client.getClientHandle().getClientUrn()+" "+client.getUserHandle(),
210           e);
211       log.info("trying to continue after storeDocumentRequest exception.");
212     }
213     log.debug("Waiting for other apps to do FinalizeApp handling.");
214     // LATER: refine this semaphore process 
215     // to make a robust signalling mechanism:
216     // app1 requests, app1..n do something (or don't - they may be dead), 
217     // app1 realises all apps have done their thing, it then continues with synchronized data.
218     // this probably needs two files - a request file, 
219     //  and a response file which is acknowledged by the app1 requestor for each app.
220     //  eventually, no more responses are received for the request, and the app can then only continue with its store.
221     FileWatcher sfwatcher=session.getStoreWatcher();
222     FileWatcher vfwatcher=session.getDocWatcher();
223     int units = 0; // zero if updates occured over a sleep period
224     while (units<STORE_WAIT) {
225       try {
226       Thread.sleep(watchThread.WATCH_SLEEP); 
227       } catch (InterruptedException e) {
228         log.debug("interrupted.");
229       }
230       if (sfwatcher.hasChanged() || vfwatcher.hasChanged()) {
231         units=0;
232       } else {
233          units++;
234       }
235     }
236     
237     block_document_updates=false;
238     vamsasfile.enableWatch();
239     log.debug("Cleared flag for ignoring document_update requests");
240     // wait around again (until our own watcher has woken up and synchronized).
241     while (units<STORE_WAIT) {
242       try {
243         Thread.sleep(watchThread.WATCH_SLEEP); 
244         } catch (InterruptedException e) {
245           log.debug("interrupted.");
246         }
247       if (sfwatcher.hasChanged() || vfwatcher.hasChanged())
248         units=0;
249       else
250         units++;
251     }
252     
253     
254     log.debug("finished waiting.");
255     in_want_to_store_phase=false;
256     return session.vamArchive.getLock();
257   }
258   /**
259    * count handlers for a particular vamsas event 
260    * @param event string enumeration from uk.ac.vamsas.client.Events
261    * @return -1 for an invalid event, otherwise the number of handlers
262    */
263   protected int countHandlersFor(String event) {
264     if (handlers.containsKey(event)) {
265       PropertyChangeSupport handler = (PropertyChangeSupport) handlers.get(event);
266       PropertyChangeListener[] listeners;
267       if (handler!=null)
268         return ((listeners=handler.getPropertyChangeListeners())==null) 
269         ? -1 : listeners.length;
270     }
271     return -1;
272   }
273
274   public void disableDocumentWatch() {
275     vamsasfile.haltWatch();
276   }
277
278   public boolean isDocumentWatchEnabled() {
279     return (vamsasfile!=null) && vamsasfile.isWatchEnabled();
280   }
281
282   public void enableDocumentWatch() {
283     vamsasfile.enableWatch();
284   }
285
286   public boolean isWatcherAlive() {
287     return watchThread!=null && watchThread.running && watchThread.isAlive();
288   }
289
290   public void interruptWatching() {
291     if (watchThread!=null && watchThread.isAlive())
292       watchThread.interrupt();
293     
294   }
295   /**
296    * called to start the session watching thread which generates events
297    */
298   public void startWatching() {
299     enableDocumentWatch();
300     watchThread.start();
301     while (!watchThread.running && watchThread.isAlive())
302       log.debug("Waiting until watcher is really started.");
303   }
304
305   public void stopWatching() {
306     interruptWatching();
307     watchThread.haltWatchers();
308     
309   }
310   
311
312 }