- AsyncExecutor sexec = Configurator.getAsyncEngine(confJronn);\r
- String jobId = sexec.submitJob(confJronn);\r
- FilePuller fw = FilePuller.newFilePuller(confJronn\r
- .getWorkDirectory()\r
- + File.separator + Jronn.getStatFile(),\r
- FileWatcher.MIN_CHUNK_SIZE_BYTES);\r
- int count = 0;\r
- long position = 0;\r
- fw.waitForFile(4);\r
- JobStatus status = sexec.getJobStatus(jobId);\r
- while (status != JobStatus.FINISHED\r
- || fw.hasMoreData()) {\r
- ChunkHolder ch = fw.pull(position);\r
- String chunk = ch.getChunk();\r
- position = ch.getNextPosition();\r
- System.out.print("C: "+chunk);\r
- count++;\r
- // Make sure the loop is terminated if the job fails\r
- if((status == JobStatus.UNDEFINED || status== JobStatus.FAILED)) {\r
- break;\r
- }\r
- status = sexec.getJobStatus(jobId);\r
- }\r
- System.out.println("\nOut of the LOOP!! ");\r
- assertTrue(count > 1);\r
- ConfiguredExecutable<?> al = sexec.getResults(jobId);\r
- assertNotNull(al.getResults());\r
- } catch (JobSubmissionException e) {\r
- e.printStackTrace();\r
- fail(e.getMessage());\r
- } catch (ResultNotAvailableException e) {\r
- e.printStackTrace();\r
- fail(e.getMessage());\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- fail(e.getMessage());\r
+ AsyncExecutor sexec = Configurator.getAsyncEngine(confJronn);\r
+ String jobId = sexec.submitJob(confJronn);\r
+ FilePuller fw = FilePuller.newFilePuller(confJronn\r
+ .getWorkDirectory()\r
+ + File.separator + Jronn.getStatFile(),\r
+ FileWatcher.MIN_CHUNK_SIZE_BYTES);\r
+ int count = 0;\r
+ long position = 0;\r
+ fw.waitForFile(4);\r
+ JobStatus status = sexec.getJobStatus(jobId);\r
+ while (status != JobStatus.FINISHED) {\r
+ System.out.println((status != JobStatus.FINISHED) + " d: "\r
+ + fw.hasMoreData());\r
+ if (fw.hasMoreData()) {\r
+ ChunkHolder ch = fw.pull(position);\r
+ System.out.println("p:" + position);\r
+ String chunk = ch.getChunk();\r
+ position = ch.getNextPosition();\r
+ System.out.println("np " + position + " c: " + chunk);\r