package compbio.cassandra;

import java.util.Arrays;
import java.util.List;

import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.ComparatorType;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;

// assumed element type of the jnetpred list (provides getId()/getSequence());
// the generic parameter was lost in the original text
import compbio.data.sequence.FastaSequence;

public class CassandraCreate {
    private static Keyspace ksp;
    private static Cluster cluster;
    private static Mutator<Long> mutatorLong;
    private static Mutator<String> mutatorString;
    private static Mutator<String> mutatorLog;

    StringSerializer ss = StringSerializer.get();
    LongSerializer ls = LongSerializer.get();

    /*
     * connect to the cluster and check whether the database already has any data
     */
    public void Connection() {
        cluster = HFactory.getOrCreateCluster("Protein Cluster", "127.0.0.1:9160");
        KeyspaceDefinition keyspaceDef = cluster.describeKeyspace("ProteinKeyspace");
        /*
         * If the keyspace does not exist, the column families do not exist
         * either, so create them.
         */
        if (keyspaceDef == null) {
            // create column families
            System.out.println("ProteinKeyspace does not exist");
            ColumnFamilyDefinition cfProtein = HFactory.createColumnFamilyDefinition("ProteinKeyspace", "ProteinRow",
                    ComparatorType.ASCIITYPE);
            ColumnFamilyDefinition cfLog = HFactory.createColumnFamilyDefinition("ProteinKeyspace", "ProteinLog",
                    ComparatorType.ASCIITYPE);
            ColumnFamilyDefinition cfData = HFactory.createColumnFamilyDefinition("ProteinKeyspace", "ProteinData",
                    ComparatorType.ASCIITYPE);

            KeyspaceDefinition newKeyspace = HFactory.createKeyspaceDefinition("ProteinKeyspace", ThriftKsDef.DEF_STRATEGY_CLASS, 1,
                    Arrays.asList(cfProtein, cfLog, cfData));
            /*
             * Add the schema to the cluster. "true" as the second parameter
             * means that Hector blocks until all nodes see the change.
             */
            cluster.addKeyspace(newKeyspace, true);
            cluster.addColumnFamily(cfProtein, true);
            cluster.addColumnFamily(cfLog, true);
            cluster.addColumnFamily(cfData, true);
        } else {
            System.out.println("Data loaded");
        }
        ksp = HFactory.createKeyspace("ProteinKeyspace", cluster);
        System.out.println("Cassandra has been connected");
    }

    /*
     * parse the data source and fill the database
     */
    public void Parsing(String source) {
        /*
         * CF ProteinRow stores protein and prediction
         */
        mutatorString = HFactory.createMutator(ksp, ss);
        /*
         * CF ProteinLog stores logging info: IP, job id, start date and end date
         */
        mutatorLog = HFactory.createMutator(ksp, ss);
        /*
         * CF ProteinData stores the job id and protein per date
         */
        mutatorLong = HFactory.createMutator(ksp, ls);

        if (source.equals("http")) {
            // get data from the real Jpred production server
            System.out.println("Parsing web data source......");
            String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
            String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
            JpredParserHTTP parser = new JpredParserHTTP(prefix);
            parser.Parsing(datasrc, 4);
            flushData();
        } else if (source.equals("file")) {
            // get artificial data generated for the DB stress tests
            System.out.println("Parsing local file data source......");
            String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
            String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
            JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
            parser.Parsing(datasrc, 365);
            flushData();
        } else {
            System.out.println("Unknown data source......");
        }
    }

    public void flushData() {
        mutatorString.execute();
        mutatorLong.execute();
        mutatorLog.execute();
        // System.out.println("Flush new data...");
    }

    public void Closing() {
        cluster.getConnectionManager().shutdown();
        System.out.println("Cassandra has been shut down");
    }

    /*
     * check whether the job id exists in the DB
     */
    public boolean CheckID(String jobid) {
        SliceQuery<String, String, String> sliceQuery = HFactory.createSliceQuery(ksp, ss, ss, ss);
        sliceQuery.setColumnFamily("ProteinLog").setKey(jobid).setRange("", "", false, 100);
        QueryResult<ColumnSlice<String, String>> result = sliceQuery.execute();
        return result.get().getColumns().size() > 0;
    }

    /*
     * prepare data for insertion into the DB
     */
    public void InsertData(long dataWork, String dataBegin, String dataEnd, String ip, String id, String statusEx, String statusFinal,
            String protein, List<FastaSequence> jnetpred) {
        mutatorLog.addInsertion(id, "ProteinLog", HFactory.createColumn("ip", ip, ss, ss))
                .addInsertion(id, "ProteinLog", HFactory.createColumn("DataBegin", dataBegin, ss, ss))
                .addInsertion(id, "ProteinLog", HFactory.createColumn("DataEnd", dataEnd, ss, ss))
                .addInsertion(id, "ProteinLog", HFactory.createColumn("Status ex", statusEx, ss, ss))
                .addInsertion(id, "ProteinLog", HFactory.createColumn("Status final", statusFinal, ss, ss))
                .addInsertion(id, "ProteinLog", HFactory.createColumn("Protein", protein, ss, ss));
        for (int i = 0; i < jnetpred.size(); i++) {
            String namepred = jnetpred.get(i).getId();
            String pred = jnetpred.get(i).getSequence().replaceAll("\n", "");
            mutatorString.addInsertion(protein, "ProteinRow", HFactory.createColumn(id + ";" + namepred, pred, ss, ss));
        }
        mutatorLong.addInsertion(dataWork, "ProteinData", HFactory.createColumn(id, protein, ss, ss));
    }

    public Keyspace GetKeyspace() {
        return ksp;
    }
}
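
/*
 * Usage sketch (not part of the original source; the call order and the job id
 * below are assumptions based on the methods above). A caller might drive the
 * class roughly like this, where "http" pulls data from the Jpred production
 * server and "file" reads the local stress-test dump:
 *
 *     CassandraCreate cc = new CassandraCreate();
 *     cc.Connection();                            // connect and create the schema if needed
 *     cc.Parsing("http");                         // parse a source and flush the mutators
 *     boolean known = cc.CheckID("jp_example");   // "jp_example" is a hypothetical job id
 *     cc.Closing();                               // shut down the cluster connection
 */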