d04083815cc579b8cebc567f6bba74a9303cf6a1
[jabaws.git] / engine / compbio / engine / FilePuller.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine;\r
20 \r
21 import java.io.File;\r
22 import java.io.IOException;\r
23 import java.io.UnsupportedEncodingException;\r
24 import java.util.concurrent.Delayed;\r
25 import java.util.concurrent.TimeUnit;\r
26 \r
27 import org.apache.log4j.Logger;\r
28 \r
29 import compbio.engine.client.PathValidator;\r
30 import compbio.metadata.ChunkHolder;\r
31 import compbio.util.FileWatcher;\r
32 \r
33 public class FilePuller implements Delayed {\r
34 \r
35     private static final Logger log = Logger.getLogger(FilePuller.class);\r
36 \r
37     // 5 minutes in nanosec\r
38     private static long defaultDelay = 1000 * 1000 * 1000L * 5 * 60;\r
39     private final File file;\r
40     private long lastAccessTime;\r
41     private FileWatcher watcher;\r
42     final int chunkSize;\r
43 \r
44     // used for testing only\r
45     long delay = 0;\r
46 \r
47     private FilePuller(String file, int size) {\r
48         FileWatcher.validateInput(file, size);\r
49         if (compbio.util.Util.isEmpty(file)) {\r
50             throw new NullPointerException("File name must be provided!");\r
51         }\r
52         // The fact that the file may not exist at a time does not matter here\r
53         if (!PathValidator.isAbsolutePath(file)) {\r
54             throw new IllegalArgumentException("Absolute path to the File "\r
55                     + file + " is expected but not provided!");\r
56         }\r
57         this.file = new File(file);\r
58         this.chunkSize = size;\r
59         this.lastAccessTime = System.nanoTime();\r
60     }\r
61 \r
62     private FilePuller(String file) {\r
63         if (compbio.util.Util.isEmpty(file)) {\r
64             throw new NullPointerException("File name must be provided!");\r
65         }\r
66         // The fact that the file may not exist at a time does not matter here\r
67         if (!PathValidator.isAbsolutePath(file)) {\r
68             throw new IllegalArgumentException("Absolute path to the File "\r
69                     + file + " is expected but not provided!");\r
70         }\r
71         this.file = new File(file);\r
72         this.chunkSize = 3;\r
73         this.lastAccessTime = System.nanoTime();\r
74     }\r
75 \r
76     public static FilePuller newFilePuller(String file, int chunkSize) {\r
77         return new FilePuller(file, chunkSize);\r
78     }\r
79 \r
80     /**\r
81      * Progress Puller is designed to read 3 characters from the beginning of\r
82      * the file, nothing more. Intended to be used in conjunction with a tool\r
83      * which output progress as a percent value from 0 to 100. In any cases\r
84      * progress could not be more than the largest byte value 255.\r
85      * \r
86      * @param file\r
87      * @return\r
88      */\r
89     public static FilePuller newProgressPuller(String file) {\r
90         return new FilePuller(file);\r
91     }\r
92 \r
93     public ChunkHolder pull(long position) throws IOException {\r
94         initPull();\r
95         String valueUTF16 = watcher.pull(position);\r
96         String value = null;\r
97         if (valueUTF16 != null) {\r
98             value = removeInvalidXMLCharacters(valueUTF16);\r
99         }\r
100         return new ChunkHolder(value, watcher.getCursorPosition());\r
101     }\r
102 \r
103     /**\r
104      * This method ensures that the output String has only valid XML unicode\r
105      * characters as specified by the XML 1.0 standard. For reference, please\r
106      * see the standard.\r
107      * \r
108      * @param The\r
109      *            String whose non-valid characters we want to remove.\r
110      * \r
111      * @return The in String, stripped of non-valid characters.\r
112      */\r
113     static String removeInvalidXMLCharacters(String str) {\r
114         assert str != null;\r
115 \r
116         StringBuilder out = new StringBuilder(); // Used to hold the output.\r
117         int codePoint; // Used to reference the current character.\r
118 \r
119         // For test\r
120         // String ss = "\ud801\udc00"; // This is actualy one unicode character,\r
121         // represented by two code units!!!.\r
122         // System.out.println(ss.codePointCount(0, ss.length()));// See: 1\r
123         int i = 0;\r
124         String value = null;\r
125         try {\r
126             // make sure the string contain only UTF-8 characters\r
127             value = new String(str.getBytes("UTF-8"), "UTF-8");\r
128         } catch (UnsupportedEncodingException e) {\r
129             // will not happen\r
130             throw new AssertionError("UTF-8 charset is not supported!!!");\r
131         }\r
132         while (i < value.length()) {\r
133             codePoint = value.codePointAt(i); // This is the unicode code of the\r
134             // character.\r
135             if ((codePoint == 0x9)\r
136                     || // Consider testing larger ranges first to\r
137                     // improve speed.\r
138                     (codePoint == 0xA) || (codePoint == 0xD)\r
139                     || ((codePoint >= 0x20) && (codePoint <= 0xD7FF))\r
140                     || ((codePoint >= 0xE000) && (codePoint <= 0xFFFD))\r
141                     || ((codePoint >= 0x10000) && (codePoint <= 0x10FFFF))) {\r
142 \r
143                 out.append(Character.toChars(codePoint));\r
144             }\r
145 \r
146             i += Character.charCount(codePoint);\r
147             /*\r
148              * Increment with the number of code units(java chars) needed to\r
149              * represent a Unicode char.\r
150              */\r
151         }\r
152         return out.toString();\r
153     }\r
154 \r
155     public void initPull() {\r
156         this.lastAccessTime = System.nanoTime();\r
157         if (!isFileCreated()) {\r
158             throw new IllegalStateException("File " + file.getAbsolutePath()\r
159                     + " has not been created yet! Cannot pull.");\r
160         }\r
161         if (watcher == null) {\r
162             init();\r
163         }\r
164     }\r
165 \r
166     public String getFile() {\r
167         return file.getAbsolutePath();\r
168     }\r
169 \r
170     public boolean isFileCreated() {\r
171         this.lastAccessTime = System.nanoTime();\r
172         return file.exists();\r
173     }\r
174 \r
175     public void waitForFile(long maxWaitSeconds) {\r
176         long waited = 0;\r
177         int step = 500;\r
178         while (true) {\r
179             if (isFileCreated()) {\r
180                 break;\r
181             }\r
182             try {\r
183                 Thread.sleep(step);\r
184                 // TODO is this needed? this.lastAccessTime = System.nanoTime();\r
185             } catch (InterruptedException e) {\r
186                 // Propagate interruption up the stack trace for anyone\r
187                 // interested\r
188                 Thread.currentThread().interrupt();\r
189                 log.debug("Thread interruped during waiting for file "\r
190                         + file.getAbsolutePath() + " to be created. Message: "\r
191                         + e.getMessage());\r
192                 break;\r
193             }\r
194             waited += step;\r
195             if (waited / 1000 >= maxWaitSeconds) {\r
196                 break;\r
197             }\r
198         }\r
199     }\r
200 \r
201     public boolean hasMoreData() throws IOException {\r
202         this.lastAccessTime = System.nanoTime();\r
203         if (!isFileCreated()) {\r
204             throw new IllegalStateException("File " + file.getAbsolutePath()\r
205                     + " has not been created yet! Cannot pull.");\r
206         }\r
207         if (watcher == null) {\r
208             init();\r
209         }\r
210         return watcher.hasMore();\r
211     }\r
212 \r
213     private synchronized void init() {\r
214         // Check watcher==null is duplicated to avoid adding synchronization to\r
215         // access methods\r
216         if (watcher == null) {\r
217             if (chunkSize < FileWatcher.MIN_CHUNK_SIZE_BYTES) {\r
218                 watcher = FileWatcher\r
219                         .newProgressWatcher(file.getAbsolutePath());\r
220                 log.debug("Init Progress watcher with file: "\r
221                         + file.getAbsolutePath());\r
222             } else {\r
223                 watcher = FileWatcher.newFileWatcher(file.getAbsolutePath(),\r
224                         chunkSize);\r
225                 log.debug("Init File watcher with file: "\r
226                         + file.getAbsolutePath());\r
227             }\r
228 \r
229         }\r
230     }\r
231 \r
232     @Override\r
233     public int compareTo(Delayed o) {\r
234         return new Long(this.getDelay(TimeUnit.NANOSECONDS)).compareTo(o\r
235                 .getDelay(TimeUnit.NANOSECONDS));\r
236     }\r
237 \r
238     /*\r
239      * This must return remaining delay associated with the object!\r
240      * (non-Javadoc)\r
241      * \r
242      * @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)\r
243      */\r
244     @Override\r
245     public long getDelay(final TimeUnit unit) {\r
246         long idleTime = System.nanoTime() - lastAccessTime;\r
247         long delayVal = (delay == 0 ? defaultDelay : delay);\r
248         return unit.convert(delayVal - idleTime, TimeUnit.NANOSECONDS);\r
249     }\r
250 \r
251     void setDelay(long delay, TimeUnit unit) {\r
252         assert delay > 0;\r
253         this.delay = TimeUnit.NANOSECONDS.convert(delay, unit);\r
254         assert delay < defaultDelay;\r
255     }\r
256 \r
257     long getDelayValue(TimeUnit unit) {\r
258         return (delay == 0 ? unit.convert(defaultDelay, TimeUnit.NANOSECONDS)\r
259                 : unit.convert(delay, TimeUnit.NANOSECONDS));\r
260     }\r
261 \r
262     public void disconnect() {\r
263         synchronized (this) {\r
264             if (watcher != null) {\r
265                 watcher.disconnect();\r
266                 watcher = null;\r
267             }\r
268         }\r
269     }\r
270 \r
271     boolean disconnected() {\r
272         return watcher == null;\r
273     }\r
274 \r
275     @Override\r
276     public String toString() {\r
277         String value = "File: " + this.file.getAbsolutePath() + "\n";\r
278         value += "Delay (s): " + getDelayValue(TimeUnit.SECONDS) + "\n";\r
279         long exp = getDelay(TimeUnit.MILLISECONDS);\r
280         if (exp > 0) {\r
281             value += "Expire in (ms): " + exp + "\n";\r
282         } else {\r
283             value += "This object has expired" + "\n";\r
284         }\r
285         value += "ChunkSize  " + this.chunkSize + "\n";\r
286         return value;\r
287     }\r
288 \r
289     @Override\r
290     public boolean equals(Object obj) {\r
291         if (obj == null) {\r
292             return false;\r
293         }\r
294         if (!(obj instanceof FilePuller)) {\r
295             return false;\r
296         }\r
297         FilePuller fp = (FilePuller) obj;\r
298         if (!this.file.equals(fp.file)) {\r
299             return false;\r
300         }\r
301         // Other fields does not matter\r
302         return true;\r
303     }\r
304 \r
305     @Override\r
306     public int hashCode() {\r
307         return this.file.hashCode();\r
308     }\r
309 \r
310     public byte getProgress() throws IOException {\r
311         initPull();\r
312         return watcher.getProgress();\r
313     }\r
314 \r
315 }\r