view commons/tools/tests/Test_F_LaunchBlasterInParallel.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
line wrap: on
line source

from commons.core.utils.FileUtils import FileUtils
import unittest
import os
import shutil
from commons.tools.LaunchBlasterInParallel import LaunchBlasterInParallel

class Test_F_LaunchBlasterInParallel(unittest.TestCase):
    """Functional tests for LaunchBlasterInParallel.

    The launcher is exercised both as a command-line script (through
    os.system) and through its Python API. All tests require the REPET_DATA
    environment variable; the batch tests read their input and their expected
    output from below that directory.
    """

    # Host name for which _writeConfig() emits "resources: test" in [jobs].
    CLUSTER_HOST = "compute-2-46.local"

    # Two 60-base lines forming the sequence used by the small hand-made test.
    _SEQUENCE_LINES = (
        "GAATTCGCGTCCGCTTACCCATGTGCCTGTGGATGCCGAACAGGAGGCGCCGTTGACGGC",
        "GAATGACTTACTCAAGGGAGTAGCCAATCTGTCGGATACGCCCGGATTGGAGCTGCCCAT",
    )

    def setUp(self):
        """Define the file names shared by the tests.

        The test is skipped (instead of failing with a KeyError) when the
        REPET_DATA environment variable is not set.
        """
        repetData = os.environ.get("REPET_DATA")
        if repetData is None:
            self.skipTest("environment variable REPET_DATA is not set")
        self._repetData = repetData
        self._inFilePath = "%s/Tools/DmelChr4_chunks.fa" % repetData
        self._configFileName = "TE.cfg"
        self._outputFileName = "out.align.not_over"

    def tearDown(self):
        """Remove the output and the configuration files, whichever exist."""
        # Each file is removed on its own so that a missing output file does
        # not prevent the configuration file from being deleted.
        for fileName in (self._outputFileName, self._configFileName):
            self._removeQuietly(fileName)

    def test_run_as_script_1bank_1file_withoutABA(self):
        """Run the script on one query file against a two-sequence subject."""
        self._outputFileName = "out.align"
        queryDir = self._makeQueryDirectory()
        self._writeLines(os.path.join(queryDir, "chunks.fa"),
                         (">chunk1",) + self._SEQUENCE_LINES)

        subjectFileName = "genome.fa"
        self.addCleanup(self._removeQuietly, subjectFileName)
        self._writeLines(subjectFileName,
                         (">chunk1",) + self._SEQUENCE_LINES
                         + (">chunk2",) + self._SEQUENCE_LINES)

        expFileName = "expected"
        self.addCleanup(self._removeQuietly, expFileName)
        self._writeLines(expFileName,
                         ("chunk1\t1\t120\tchunk1\t1\t120\t4e-68\t238\t100",
                          "chunk1\t1\t120\tchunk2\t1\t120\t4e-68\t238\t100"))
        self._writeConfig(0.1)

        cmd = "LaunchBlasterInParallel.py -q %s -s %s/%s -C %s -o %s" % (queryDir, os.getcwd(), subjectFileName, self._configFileName, self._outputFileName)
        os.system(cmd)

        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, self._outputFileName))

    def test_run_as_script_1bank_2_batches(self):
        """Run the script with the "-a" option on the two reference batches."""
        queryDir = self._makeBatchQueryDirectory()
        expFileName = "%s/Tools/DmelChr4.align.not_over" % self._repetData
        self._writeConfig()

        cmd = "LaunchBlasterInParallel.py -q %s -s %s -a -o %s -C %s" % (queryDir, self._inFilePath, self._outputFileName, self._configFileName)
        os.system(cmd)

        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, self._outputFileName))

    def test_run_1bank_1bank_2_batches(self):
        """Run the launcher through its Python API on the two batches."""
        queryDir = self._makeBatchQueryDirectory()
        expFileName = "%s/Tools/DmelChr4.align.not_over" % self._repetData
        self._writeConfig()

        iLaunchBlaster = LaunchBlasterInParallel(configFileName = self._configFileName, outFileName = self._outputFileName)
        iLaunchBlaster.setDoAllByall(True)
        iLaunchBlaster.setVerbosity(4)
        iLaunchBlaster.setQueryDirectory(queryDir)
        iLaunchBlaster.setSubjectFilePath(self._inFilePath)
        iLaunchBlaster.run()

        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, self._outputFileName))

    def _makeQueryDirectory(self):
        """Create the "tmp" query directory and return its absolute path.

        Removing the directory and the files matching "*.param" is registered
        as cleanup, so it is done even when the test fails; otherwise a
        leftover "tmp" would make the next os.mkdir() fail as well.
        """
        queryDir = "%s/tmp" % os.getcwd()
        os.mkdir(queryDir)
        # Cleanups run in reverse order of registration.
        self.addCleanup(shutil.rmtree, queryDir, True)
        self.addCleanup(FileUtils.removeFilesByPattern, "*.param")
        return queryDir

    def _makeBatchQueryDirectory(self):
        """Create the query directory with links to the two batch files.

        Returns the absolute path of the directory. The links are created
        without changing the current working directory.
        """
        queryDir = self._makeQueryDirectory()
        for batchName in ("batch_1.fa", "batch_2.fa"):
            os.symlink("%s/Tools/batches/%s" % (self._repetData, batchName),
                       os.path.join(queryDir, batchName))
        return queryDir

    @staticmethod
    def _writeLines(fileName, lines):
        """Write each item of 'lines' to 'fileName', newline-terminated."""
        with open(fileName, "w") as fh:
            for line in lines:
                fh.write("%s\n" % line)

    @staticmethod
    def _removeQuietly(fileName):
        """Remove 'fileName', ignoring the error raised if it cannot be."""
        try:
            os.remove(fileName)
        except OSError:
            pass

    def _writeConfig(self, eValue = 1e-300):
        """Write the configuration file used by the launcher.

        The file has three sections: [jobs], [prepare_data] and [alignment].
        'eValue' is written as the "Evalue" entry of [alignment]. The
        "resources" entry is "test" only when the HOSTNAME environment
        variable equals CLUSTER_HOST, and empty otherwise.
        """
        if os.getenv("HOSTNAME") == self.CLUSTER_HOST:
            resources = "resources: test\n"
        else:
            resources = "resources:\n"
        lines = [
            "[jobs]\n",
            resources,
            "tmpDir:\n",
            "copy: no\n",
            "clean: yes\n",
            "\n",
            "[prepare_data]\n",
            "chunk_length: 200000\n",
            "chunk_overlap: 10000\n",
            "\n",
            "[alignment]\n",
            "blast: ncbi\n",
            "Evalue: %s\n" % eValue,
            "length: 100\n",
            "identity: 90\n",
        ]
        with open(self._configFileName, "w") as fh:
            fh.writelines(lines)

# Run all tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()