Mercurial > repos > yufei-luo > s_mart
view commons/core/parsing/test/Test_Multifasta2SNPFileWriter.py @ 14:c79b9ae3f65f
Deleted selected files
author | m-zytnicki |
---|---|
date | Fri, 19 Apr 2013 10:13:11 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
from commons.core.utils.FileUtils import FileUtils
from commons.core.seq.BioseqDB import BioseqDB
from commons.core.seq.Bioseq import Bioseq
from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFileWriter
from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFile
from commons.core.parsing.Multifasta2SNPFile import ReferenceBioseqAndLinesBioseqDBWrapper
from commons.core.LoggerFactory import LoggerFactory
import os
import logging
import unittest


class Test_Multifasta2SNPFileWriter(unittest.TestCase):
    """Tests for Multifasta2SNPFileWriter.

    Each test writes an "expected" file by hand, asks the writer to produce
    the matching "observed" file, and checks that both files are identical.
    All files are created in the current working directory and removed in
    tearDown().
    """

    def setUp(self):
        """Define the observed/expected file names and build the fixtures."""
        self._obsSubSNPFile = "SubSNP.csv"
        self._expSubSNPFile = "ExpSubSNP.csv"

        self._obsAlleleFile = "Allele.csv"
        self._expAlleleFile = "ExpAllele.csv"

        self._obsIndividualFile = "Individual.csv"
        self._expIndividualFile = "ExpIndividual.csv"

        self._obsSequenceFSAFile = "Sequences.fsa"
        self._expSequenceFSAFile = "ExpSequences.fsa"

        self._obsSequenceCSVFile = "Sequences.csv"
        self._expSequenceCSVFile = "ExpSequences.csv"

        self._obsBatchFile = "Batch.txt"
        self._expBatchFile = "ExpBatch.txt"

        self._obsBatchLineFile = "BatchLine.csv"
        self._expBatchLineFile = "ExpBatchLine.csv"

        self._logFileName = "Test_Multifasta2SNPWriter.log"
        self._inputFileName = "multifasta.fsa"

        self._lSNPResult = []
        self._dAlleleResult = {}
        self._lIndividualResult = []
        self._refSeq = Bioseq()
        self._seqDb = BioseqDB()

        self._logFile = LoggerFactory.createLogger(self._logFileName, logging.INFO, "%(asctime)s %(levelname)s: %(message)s")
        self._lSequenceWrapper = ReferenceBioseqAndLinesBioseqDBWrapper(self._refSeq, self._seqDb, self._logFile, self._inputFileName)
        self._lBatchLineResults = []

        self._Multifasta2SNPFileWriter = Multifasta2SNPFileWriter()

        self._inFileName = "multifasta.txt"
        self._taxon = "Arabidopsis thaliana"

    def tearDown(self):
        """Remove every file a test may have left behind, if it exists."""
        lFilesToRemove = [
            self._inFileName,
            "multifasta2SNP.log",
            self._logFileName,
            self._obsSubSNPFile,
            self._expSubSNPFile,
            self._obsAlleleFile,
            self._expAlleleFile,
            self._obsIndividualFile,
            self._expIndividualFile,
            self._obsSequenceFSAFile,
            self._expSequenceFSAFile,
            self._obsSequenceCSVFile,
            self._expSequenceCSVFile,
        ]
        for fileName in lFilesToRemove:
            if FileUtils.isRessourceExists(fileName):
                os.remove(fileName)

        # The batch files are removed through FileUtils.removeFilesByPattern
        # rather than os.remove; the original distinction is kept as is.
        lFilesToRemoveByPattern = [
            self._obsBatchFile,
            self._expBatchFile,
            self._obsBatchLineFile,
            self._expBatchLineFile,
        ]
        for fileName in lFilesToRemoveByPattern:
            if FileUtils.isRessourceExists(fileName):
                FileUtils.removeFilesByPattern(fileName)

    def test_writeSubSNPFileWithSubSNPList(self):
        """_writeSubSNPFile() writes the SubSNP list as the expected CSV."""
        self._lSNPResult = self._createSubSNPResults()
        self._writeExpSubSNPFile()

        self._Multifasta2SNPFileWriter._writeSubSNPFile(self._obsSubSNPFile, self._lSNPResult)

        self._assertObsFileMatchesExpFile(self._expSubSNPFile, self._obsSubSNPFile)

    def test_writeAlleleFileWithAlleleDict(self):
        """_writeAlleleFile() writes the allele dictionary as the expected CSV."""
        self._dAlleleResult['A'] = 1
        self._dAlleleResult['C'] = 2
        self._dAlleleResult['T'] = 3
        self._writeExpAlleleFile()

        self._Multifasta2SNPFileWriter._writeAlleleFile(self._obsAlleleFile, self._dAlleleResult)

        self._assertObsFileMatchesExpFile(self._expAlleleFile, self._obsAlleleFile)

    def test_writeIndividualFileWithIndivList(self):
        """_writeIndividualFile() writes the individual list as the expected CSV."""
        self._lIndividualResult = self._createIndividualResults()
        self._writeExpIndividualFile()

        self._Multifasta2SNPFileWriter._writeIndividualFile(self._obsIndividualFile, self._lIndividualResult)

        self._assertObsFileMatchesExpFile(self._expIndividualFile, self._obsIndividualFile)

    def test_writeSequenceFilesWithSequenceWrapper(self):
        """_writeSequenceFiles() writes the reference sequence as FASTA and CSV."""
        self._writeInputFile()
        self._writeExpSequenceFiles()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        # NOTE(review): the constructor is called with (batchName, gene, taxon)
        # here but with (taxon, batchName, gene) in test_write -- confirm
        # which order matches Multifasta2SNPFile.__init__.
        multifasta2SNPFile = Multifasta2SNPFile(batchName, gene, taxon)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)
        lRefseq = [self._lSequenceWrapper._iReferenceBioseq]

        self._Multifasta2SNPFileWriter._writeSequenceFiles(self._obsSequenceFSAFile, self._obsSequenceCSVFile, lRefseq, taxon)

        self._assertObsFileMatchesExpFile(self._expSequenceFSAFile, self._obsSequenceFSAFile)
        self._assertObsFileMatchesExpFile(self._expSequenceCSVFile, self._obsSequenceCSVFile)

    def test_writeBatchFile(self):
        """_writeBatchFile() writes one batch record in the expected text layout."""
        self._dBatchResults = {'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}
        lBatchResults = [self._dBatchResults]
        self._writeExpBatchFile()

        self._Multifasta2SNPFileWriter._writeBatchFile(self._obsBatchFile, lBatchResults)

        self._assertObsFileMatchesExpFile(self._expBatchFile, self._obsBatchFile)

    def test_writeBatchLineFile(self):
        """_writeBatchLineFile() writes the batch-line list as the expected CSV."""
        self._lBatchLineResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
                                   {'IndividualNumber': "2", 'BatchNumber': "1"}]
        self._writeExpBatchLineFile()

        self._Multifasta2SNPFileWriter._writeBatchLineFile(self._obsBatchLineFile, self._lBatchLineResults)

        self._assertObsFileMatchesExpFile(self._expBatchLineFile, self._obsBatchLineFile)

    def test_sortAlleleResultByAlleleNumber(self):
        """sortAlleleResultByAlleleNumber() returns (allele, number) pairs ordered by number."""
        dAlleleResults = {'A': 3, 'G': 1, 'C': 2}
        lExpAlleleSortedList = [('G', 1), ('C', 2), ('A', 3)]

        lObsAlleleSortedList = self._Multifasta2SNPFileWriter.sortAlleleResultByAlleleNumber(dAlleleResults)

        # assertEqual replaces the deprecated alias assertEquals.
        self.assertEqual(lExpAlleleSortedList, lObsAlleleSortedList)

    def test_write(self):
        """write() produces every output file from a populated Multifasta2SNPFile."""
        self._writeInputFile()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        # NOTE(review): argument order differs from the call in
        # test_writeSequenceFilesWithSequenceWrapper -- confirm against
        # Multifasta2SNPFile.__init__.
        multifasta2SNPFile = Multifasta2SNPFile(taxon, batchName, gene)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)

        # Fill the result containers directly so only the writer is exercised.
        multifasta2SNPFile._lSubSNPFileResults = self._createSubSNPResults()
        multifasta2SNPFile._dAlleleFileResults['A'] = 1
        multifasta2SNPFile._dAlleleFileResults['C'] = 2
        multifasta2SNPFile._dAlleleFileResults['T'] = 3
        multifasta2SNPFile._lIndividualFileResults = self._createIndividualResults()
        multifasta2SNPFile._lBatchFileResults = [{'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}]
        multifasta2SNPFile._lBatchLineFileResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
                                                     {'IndividualNumber': "2", 'BatchNumber': "1"}]

        self._writeExpSubSNPFile()
        self._writeExpAlleleFile()
        self._writeExpIndividualFile()
        self._writeExpSequenceFiles()
        self._writeExpBatchFile()
        self._writeExpBatchLineFile()

        # NOTE(review): no output names are passed to write(); the observed
        # file names checked below are presumably the writer's defaults.
        self._Multifasta2SNPFileWriter.write(multifasta2SNPFile)

        self._assertObsFileMatchesExpFile(self._expSubSNPFile, self._obsSubSNPFile)
        self._assertObsFileMatchesExpFile(self._expAlleleFile, self._obsAlleleFile)
        self._assertObsFileMatchesExpFile(self._expIndividualFile, self._obsIndividualFile)
        self._assertObsFileMatchesExpFile(self._expSequenceFSAFile, self._obsSequenceFSAFile)
        self._assertObsFileMatchesExpFile(self._expSequenceCSVFile, self._obsSequenceCSVFile)
        self._assertObsFileMatchesExpFile(self._expBatchFile, self._obsBatchFile)
        self._assertObsFileMatchesExpFile(self._expBatchLineFile, self._obsBatchLineFile)

    def _assertObsFileMatchesExpFile(self, expFileName, obsFileName):
        """Assert that the observed file exists and is identical to the expected one."""
        self.assertTrue(FileUtils.isRessourceExists(obsFileName))
        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))

    def _createSubSNPResults(self):
        """Return a fresh list of the three SubSNP records used as writer input."""
        return [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue': "A", 'type': "SNP", 'length': 1},
                {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue': "A", 'type': "SNP", 'length': 1},
                {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue': "A", 'type': "SNP", 'length': 1}]

    def _createIndividualResults(self):
        """Return a fresh list of the two individual records used as writer input."""
        return [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
                {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]

    def _writeLines(self, fileName, lLines):
        """Write the given strings to fileName, closing the file even on error."""
        with open(fileName, "w") as fileHandle:
            fileHandle.writelines(lLines)

    def _writeExpSubSNPFile(self):
        """Write the expected SubSNP CSV file."""
        self._writeLines(self._expSubSNPFile, [
            "SubSNPName;ConfidenceValue;Type;Position;5flank;3flank;Length;BatchNumber;IndividualNumber;PrimerType;PrimerNumber;Forward_or_Reverse;AlleleNumber\n",
            "SubSNP1;A;SNP;1;A;T;1;1;1;Sequence;;;1\n",
            "SubSNP2;A;SNP;10;T;A;1;1;1;Sequence;;;2\n",
            "SubSNP3;A;SNP;20;T;A;1;1;2;Sequence;;;3\n",
        ])

    def _writeExpAlleleFile(self):
        """Write the expected allele CSV file."""
        self._writeLines(self._expAlleleFile, [
            "AlleleNumber;Value;Motif;NbCopy;Comment\n",
            "1;A;;;\n",
            "2;C;;;\n",
            "3;T;;;\n",
        ])

    def _writeExpIndividualFile(self):
        """Write the expected individual CSV file."""
        self._writeLines(self._expIndividualFile, [
            "IndividualNumber;IndividualName;Description;AberrAneuploide;FractionLength;DeletionLineSynthesis;UrlEarImage;TypeLine;ChromNumber;ArmChrom;DeletionBin;ScientificName;local_germplasm_name;submitter_code;local_institute;donor_institute;donor_acc_id\n",
            "1;Individual1;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
            "2;Individual2;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
        ])

    def _writeInputFile(self):
        """Write the multifasta input: one reference sequence and two lines."""
        self._writeLines(self._inFileName, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
            ">Line1\n",
            "CCTTAGCCATTGCTTGGTGACTATGAAGGCAGTAGGCAAACCTCCACAATC\n",
            ">Line2\n",
            # The last sequence is written without a trailing newline.
            "CCTAAGCCATTGCTTGGTGACTATCAAGGCAGTAGCCAAACCTCCACAATA",
        ])

    def _writeExpSequenceFiles(self):
        """Write the expected reference-sequence FASTA and CSV files."""
        self._writeLines(self._expSequenceFSAFile, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
        ])
        self._writeLines(self._expSequenceCSVFile, [
            "SequenceName;SeqType;BankName;BankVersion;ACNumber;Locus;ScientificName\n",
            "Sequence_de_Reference;Reference;;;;;Arabidopsis thaliana\n",
        ])

    def _writeExpBatchFile(self):
        """Write the expected batch description file."""
        self._writeLines(self._expBatchFile, [
            "BatchNumber: 1\n",
            "BatchName: batch1\n",
            "GeneName: gene1\n",
            "Description: \n",
            "ContactNumber: \n",
            "ProtocolNumber: \n",
            "ThematicNumber: \n",
            "RefSeqName: Sequence de Reference\n",
            "AlignmentFileName: \n",
            "SeqName: \n",
            "//\n",
        ])

    def _writeExpBatchLineFile(self):
        """Write the expected batch-line CSV file."""
        self._writeLines(self._expBatchLineFile, [
            "IndividualNumber;Pos5;Pos3;BatchNumber;Sequence\n",
            "1;;;1;\n",
            "2;;;1;\n",
        ])


if __name__ == "__main__":
    unittest.main()