/* Copyright (c) 2009 Peter Troshin * Copyright (c) 2013 Alexander Sherstnev * * JAva Bioinformatics Analysis Web Services (JABAWS) * @version: 2.5 * * This library is free software; you can redistribute it and/or modify it under the terms of the * Apache License version 2 as published by the Apache Software Foundation * * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache * License for more details. * * A copy of the license is in apache_license.txt. It is also available here: * @see: http://www.apache.org/licenses/LICENSE-2.0.txt * * Any republication or derived work distributed in source code form * must include this copyright and license notice. */ package compbio.runner.msa; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.text.ParseException; import java.util.Collections; import java.util.List; import javax.xml.bind.JAXBException; import org.ggf.drmaa.DrmaaException; import org.ggf.drmaa.JobInfo; import org.testng.annotations.Test; import compbio.engine.AsyncExecutor; import compbio.engine.Configurator; import compbio.engine.FilePuller; import compbio.engine.SyncExecutor; import compbio.engine.client.Util; import compbio.engine.client.ConfExecutable; import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.Executable.ExecProvider; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterUtil; import compbio.engine.cluster.drmaa.JobRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.conf.RunnerConfigMarshaller; import compbio.engine.local.AsyncLocalRunner; import compbio.engine.local.LocalExecutorService; import compbio.engine.local.LocalRunner; import compbio.metadata.AllTestSuit; import compbio.metadata.ChunkHolder; import compbio.metadata.JobExecutionException; import compbio.metadata.JobStatus; import compbio.metadata.JobSubmissionException; import compbio.metadata.LimitsManager; import compbio.metadata.ResultNotAvailableException; import compbio.metadata.RunnerConfig; import compbio.runner.OptionCombinator; import compbio.util.FileWatcher; import compbio.util.SysPrefs; public class ClustalOTester { static final String clustalConfigFile = AllTestSuit.TEST_DATA_PATH + "ClustaloParameters.xml"; public static String test_outfile = "TO1381.clustalo.out"; public static String cluster_test_outfile = "TO1381.clustalo.cluster.out"; @Test(groups = {AllTestSuit.test_group_runner}) public void RunLocally() { ClustalO clustal = new ClustalO(); clustal.setInput(AllTestSuit.test_input).setOutput(test_outfile); try { // For local execution use relavive ConfiguredExecutable confClustal = Configurator.configureExecutable(clustal, Executable.ExecProvider.Local); LocalRunner lr = new LocalRunner(confClustal); lr.executeJob(); confClustal = (ConfiguredExecutable) lr.waitForResult(); assertNotNull(confClustal.getResults()); } catch (JobSubmissionException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } catch (JobExecutionException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } catch (ResultNotAvailableException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } } @Test(groups = {AllTestSuit.test_group_runner}) public void ConfigurationLoading() { try { RunnerConfig clustalConfig = ConfExecutable.getRunnerOptions(ClustalO.class); assertNotNull(clustalConfig); assertTrue(clustalConfig.getArguments().size() > 0); LimitsManager clustalLimits = ConfExecutable.getRunnerLimits(ClustalO.class); assertNotNull(clustalLimits); assertTrue(clustalLimits.getLimits().size() > 0); } catch (FileNotFoundException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } catch (IOException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } } @Test(groups = {AllTestSuit.test_group_runner}) public void OptionsLocally() { try { RunnerConfigMarshaller clustalmarsh = new RunnerConfigMarshaller(RunnerConfig.class); RunnerConfig clustalConfig = clustalmarsh.read(new FileInputStream(new File(clustalConfigFile)), RunnerConfig.class); OptionCombinator clustalOpc = new OptionCombinator(clustalConfig); List options = clustalOpc.getOptionsAtRandom(); for (int i = 0; i < options.size(); i++) { System.out.println("Using options: " + options); ClustalO clustal = new ClustalO(); clustal.setInput(AllTestSuit.test_input); clustal.setOutput(test_outfile); // For local execution use relavive ConfiguredExecutable confClustal = Configurator.configureExecutable(clustal, ExecProvider.Local); // Add options to the executable confClustal.addParameters(options); LocalRunner lr = new LocalRunner(confClustal); lr.executeJob(); confClustal = (ConfiguredExecutable) lr.waitForResult(); assertNotNull(confClustal.getResults()); Collections.shuffle(options); } } catch (JobSubmissionException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } catch (JobExecutionException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } catch (JAXBException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } catch (ResultNotAvailableException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } catch (FileNotFoundException e) { e.printStackTrace(); fail(e.getLocalizedMessage()); } } public static final void main(String[] args) throws JobSubmissionException, JobExecutionException, InterruptedException { ClustalO clustal = new ClustalO(); clustal.setInput(AllTestSuit.test_input).setOutput(test_outfile); // For local execution use relavive ConfiguredExecutable confClustal = Configurator.configureExecutable(clustal); AsyncExecutor lr = new AsyncLocalRunner(); lr.submitJob(confClustal); Thread.sleep(3000); LocalExecutorService.shutDown(); } @Test(groups = {AllTestSuit.test_group_runner}) public void Persistance() { try { ClustalO clustal = new ClustalO(); clustal.setError("errrr.txt"); clustal.setInput(AllTestSuit.test_input); clustal.setOutput("outtt.txt"); assertEquals(clustal.getInput(), AllTestSuit.test_input); assertEquals(clustal.getError(), "errrr.txt"); assertEquals(clustal.getOutput(), "outtt.txt"); ConfiguredExecutable cClustal = Configurator.configureExecutable(clustal, Executable.ExecProvider.Local); SyncExecutor sexec = Configurator.getSyncEngine(cClustal); sexec.executeJob(); cClustal = (ConfiguredExecutable) sexec.waitForResult(); assertNotNull(cClustal.getResults()); // Save run configuration assertTrue(cClustal.saveRunConfiguration()); // See if loaded configuration is the same as saved RunConfiguration loadedRun = RunConfiguration.load(new FileInputStream(new File(cClustal.getWorkDirectory(), RunConfiguration.rconfigFile))); assertTrue(((ConfExecutable) cClustal) .getRunConfiguration().equals(loadedRun)); // Load run configuration as ConfExecutable ConfiguredExecutable resurrectedCclustal = (ConfiguredExecutable) cClustal .loadRunConfiguration(new FileInputStream(new File(cClustal .getWorkDirectory(), RunConfiguration.rconfigFile))); assertNotNull(resurrectedCclustal); // See in details whether executables are the same assertEquals(resurrectedCclustal.getExecutable(), clustal); // Finally rerun the job in the new task directory ConfiguredExecutable resclustal = Configurator .configureExecutable(resurrectedCclustal.getExecutable(), Executable.ExecProvider.Local); sexec = Configurator.getSyncEngine(resclustal, Executable.ExecProvider.Local); sexec.executeJob(); cClustal = (ConfiguredExecutable) sexec.waitForResult(); assertNotNull(cClustal.getResults()); } catch (JobSubmissionException e) { e.printStackTrace(); fail(e.getMessage()); } catch (JobExecutionException e) { e.printStackTrace(); fail(e.getMessage()); } catch (FileNotFoundException e) { e.printStackTrace(); fail(e.getMessage()); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); } catch (ResultNotAvailableException e) { e.printStackTrace(); fail(e.getMessage()); } } @Test(groups = {AllTestSuit.test_group_runner}) public void readStatistics() throws InterruptedException { try { ClustalO al = new ClustalO(); al.setInput(AllTestSuit.test_input); al.setOutput(test_outfile); ConfiguredExecutable confal = Configurator.configureExecutable(al, Executable.ExecProvider.Local); AsyncExecutor sexec = Configurator.getAsyncEngine(confal); String jobId = sexec.submitJob(confal); String file = Util.getFullPath(confal.getWorkDirectory(), ClustalW.getStatFile()); FilePuller fw = FilePuller.newFilePuller(file, FileWatcher.MIN_CHUNK_SIZE_BYTES); int count = 0; long position = 0; fw.waitForFile(4); JobStatus status = sexec.getJobStatus(jobId); while (status != JobStatus.FINISHED || fw.hasMoreData()) { if (fw.hasMoreData()) { ChunkHolder ch = fw.pull(position); String chunk = ch.getChunk(); position = ch.getNextPosition(); } count++; if ((status == JobStatus.UNDEFINED || status == JobStatus.FAILED)) { fail("job " + jobId +" failed!"); break; } Thread.sleep(200); status = sexec.getJobStatus(jobId); System.out.println("CLustalO: Job status = " + status + ", file status = " + fw.hasMoreData()); } assertTrue(count > 1); ConfiguredExecutable al2 = sexec.getResults(jobId); assertNotNull(al2.getResults()); } catch (JobSubmissionException e) { e.printStackTrace(); fail(e.getMessage()); } catch (ResultNotAvailableException e) { e.printStackTrace(); fail(e.getMessage()); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); } } @Test(groups = {AllTestSuit.test_group_cluster, AllTestSuit.test_group_runner}) public void RunOnCluster() { ClustalO clustal = new ClustalO(); assertFalse(SysPrefs.isWindows, "Cluster execution can only be in unix environment"); clustal.setInput(AllTestSuit.test_input).setOutput(cluster_test_outfile); try { ConfiguredExecutable confClustal = Configurator.configureExecutable(clustal); JobRunner runner = JobRunner.getInstance(confClustal); // ClusterSession csession = JobRunner.getSession(); assertNotNull(runner); runner.executeJob(); // assertNotNull("JobId is null", jobId1); JobStatus status = runner.getJobStatus(); assertTrue(status == JobStatus.PENDING || status == JobStatus.RUNNING); JobInfo info = runner.getJobInfo(); assertNotNull(info); StatisticManager sm = new StatisticManager(info); assertNotNull(sm); try { String exits = sm.getExitStatus(); assertNotNull("Exit status is null", exits); // cut 4 trailing zeros from the number int exitsInt = ClusterUtil.CLUSTER_STAT_IN_SEC.parse(exits).intValue(); assertEquals(0, exitsInt); System.out.println(sm.getAllStats()); } catch (ParseException e) { e.printStackTrace(); fail("Parse Exception: " + e.getMessage()); } // At present the task directory could not be completely removed // @see JobRunner.cleanup() assertFalse(runner.cleanup(), "Could not remove some files whilst cleaning up "); assertTrue(sm.hasExited()); assertFalse(sm.wasAborted()); assertFalse(sm.hasDump()); assertFalse(sm.hasSignaled()); } catch (JobSubmissionException e) { e.printStackTrace(); fail("DrmaaException caught:" + e.getMessage()); } catch (JobExecutionException e) { e.printStackTrace(); fail("DrmaaException caught:" + e.getMessage()); } catch (DrmaaException e) { e.printStackTrace(); fail("DrmaaException caught:" + e.getMessage()); } } @Test(groups = {AllTestSuit.test_group_cluster, AllTestSuit.test_group_runner}) public void readStatisticsClusterExecution() { try { ClustalO clustal = new ClustalO().setInput(AllTestSuit.test_input).setOutput(test_outfile); ConfiguredExecutable confClustal = Configurator.configureExecutable(clustal, Executable.ExecProvider.Cluster); AsyncExecutor sexec = Configurator.getAsyncEngine(confClustal); String jobId = sexec.submitJob(confClustal); String file = Util.getFullPath(confClustal.getWorkDirectory(), ClustalW.getStatFile()); FilePuller fw = FilePuller.newFilePuller(file, FileWatcher.MIN_CHUNK_SIZE_BYTES); int count = 0; long position = 0; int maxloopcount = 108000; // max waiting time = 6h*60m*60s/0.2(one loop sleep) fw.waitForFile(200); /* * Under certain circumstances DRMAA could report the status wrongly * thus this loop never ends. maxloopcount ensures hard stop of the loop... */ while (!( sexec.getJobStatus(jobId) == JobStatus.FINISHED || sexec.getJobStatus(jobId) == JobStatus.FAILED) || count < maxloopcount || fw.hasMoreData()) { ChunkHolder ch = fw.pull(position); String chunk = ch.getChunk(); position = ch.getNextPosition(); System.out.print(chunk); count++; Thread.sleep(200); if (sexec.getJobStatus(jobId) == JobStatus.UNDEFINED) { System.out.println("DRMAA reported wrong status for job + " + jobId + " continue anyway!"); break; } } assertTrue(count > 1); ConfiguredExecutable al = sexec.getResults(jobId); assertNotNull(al.getResults()); } catch (JobSubmissionException e) { e.printStackTrace(); fail(e.getMessage()); } catch (ResultNotAvailableException e) { e.printStackTrace(); fail(e.getMessage()); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); } catch (InterruptedException e) { e.printStackTrace(); fail(e.getMessage()); } } }