Mercurial > repos > yufei-luo > s_mart
comparison SMART/Java/Python/test/Test_F_GetFlanking.py @ 18:94ab73e8a190
Uploaded
| author | m-zytnicki |
|---|---|
| date | Mon, 29 Apr 2013 03:20:15 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 17:b0e8584489e6 | 18:94ab73e8a190 |
|---|---|
| 1 import unittest | |
| 2 import os, os.path, glob | |
| 3 from SMART.Java.Python.structure.Transcript import Transcript | |
| 4 from SMART.Java.Python.GetFlanking import GetFlanking | |
| 5 from commons.core.writer.Gff3Writer import Gff3Writer | |
| 6 from commons.core.parsing.GffParser import GffParser | |
| 7 | |
class Test_F_GetFlanking(unittest.TestCase):
    """Functional tests for GetFlanking, driven through temporary GFF3 files.

    Each test writes a reference file and a query file into the current
    working directory, runs GetFlanking on them, and checks the tags carried
    by the transcripts read back from the output file.
    """

    # Reason reported for the tests that the original source switched off with
    # a bare leading ``return``.  Why they were switched off is not recorded in
    # this file; marking them as skipped keeps them out of the run while making
    # that fact visible in the test report instead of counting them as passed.
    _DISABLED = "disabled in the original test suite (reason not recorded)"

    def setUp(self):
        """Choose the names of the query, reference and output files."""
        self.queryFileName = "testQuery.gff3"
        self.referenceFileName = "testReference.gff3"
        self.outputFileName = "testOutput.gff3"

    def tearDown(self):
        """Remove every file whose name starts with one of the test file names."""
        for fileRoot in (self.queryFileName, self.referenceFileName, self.outputFileName):
            # The trailing wildcard also catches any side files created next to
            # the three main files.
            for path in glob.glob("%s*" % (fileRoot)):
                os.remove(path)

    def test_run_simple(self):
        """Default run: query1 expects ref1 (downstream), query2 expects ref2 (upstream)."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref1"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref2"),
            self._createTranscript("chr1", 1000000, 1200000, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 10000, 10100, "+", "query2"),
        ])
        gf = self._createGetFlanking()
        gf.run()
        transcripts = self._readOutput(2)
        self._checkTranscript(transcripts[0], "chr1", 100, 200, "+", "query1")
        self.assertEqual(transcripts[0].getTagValue("flanking"), "ref1")
        self.assertEqual(transcripts[0].getTagValue("_region_flanking"), "downstream")
        self.assertEqual(transcripts[0].getTagValue("_sense_flanking"), "collinear")
        self._checkTranscript(transcripts[1], "chr1", 10000, 10100, "+", "query2")
        self.assertEqual(transcripts[1].getTagValue("flanking"), "ref2")
        self.assertEqual(transcripts[1].getTagValue("_region_flanking"), "upstream")
        self.assertEqual(transcripts[1].getTagValue("_sense_flanking"), "collinear")

    @unittest.skip(_DISABLED)
    def test_run_simple_downstream(self):
        """Downstream only: query1 expects ref1, query2 and query3 expect no tag."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 300, 400, "+", "ref1"),
            self._createTranscript("chr1", 1000, 1100, "+", "ref2"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 1200, 1300, "+", "query2"),
            self._createTranscript("chr1", 1400, 1500, "+", "query3"),
        ])
        gf = self._createGetFlanking()
        gf.addDownstreamDirection(True)
        gf.run()
        transcripts = self._readOutput(3)
        self._checkTranscript(transcripts[0], "chr1", 100, 200, "+", "query1")
        self.assertEqual(transcripts[0].getTagValue("flanking_downstream"), "ref1")
        self.assertEqual(transcripts[0].getTagValue("_region_flanking"), "downstream")
        self.assertEqual(transcripts[0].getTagValue("_sense_flanking"), "collinear")
        self._checkTranscript(transcripts[1], "chr1", 1200, 1300, "+", "query2")
        self.assertIsNone(transcripts[1].getTagValue("flanking_downstream"))
        self._checkTranscript(transcripts[2], "chr1", 1400, 1500, "+", "query3")
        self.assertIsNone(transcripts[2].getTagValue("flanking_downstream"))

    @unittest.skip(_DISABLED)
    def test_run_simple_minus_strand_downstream(self):
        """Downstream with "-" queries: query1 expects no tag, query2 and query3 expect ref1."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref1"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref2"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "-", "query1"),
            self._createTranscript("chr1", 1200, 1300, "-", "query2"),
            self._createTranscript("chr1", 1400, 1500, "-", "query3"),
        ])
        gf = self._createGetFlanking()
        gf.addDownstreamDirection(True)
        gf.run()
        transcripts = self._readOutput(3)
        self._checkTranscript(transcripts[0], "chr1", 100, 200, "-", "query1")
        self.assertIsNone(transcripts[0].getTagValue("flanking_downstream"))
        self._checkTranscript(transcripts[1], "chr1", 1200, 1300, "-", "query2")
        self.assertEqual(transcripts[1].getTagValue("flanking_downstream"), "ref1")
        self._checkTranscript(transcripts[2], "chr1", 1400, 1500, "-", "query3")
        self.assertEqual(transcripts[2].getTagValue("flanking_downstream"), "ref1")

    @unittest.skip(_DISABLED)
    def test_run_simple_upstream(self):
        """Upstream only: query1 and query2 expect no tag, query3 expects ref2."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 500, 600, "+", "ref1"),
            self._createTranscript("chr1", 700, 800, "+", "ref2"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 300, 400, "+", "query2"),
            self._createTranscript("chr1", 1200, 1300, "+", "query3"),
        ])
        gf = self._createGetFlanking()
        gf.addUpstreamDirection(True)
        gf.run()
        transcripts = self._readOutput(3)
        self._checkTranscript(transcripts[0], "chr1", 100, 200, "+", "query1")
        self.assertIsNone(transcripts[0].getTagValue("flanking_upstream"))
        self._checkTranscript(transcripts[1], "chr1", 300, 400, "+", "query2")
        self.assertIsNone(transcripts[1].getTagValue("flanking_upstream"))
        self._checkTranscript(transcripts[2], "chr1", 1200, 1300, "+", "query3")
        self.assertEqual(transcripts[2].getTagValue("flanking_upstream"), "ref2")

    @unittest.skip(_DISABLED)
    def test_run_simple_colinear(self):
        """Upstream + colinear: the "-" query expects no tag, the "+" query expects ref2."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 100, 200, "+", "ref1"),
            self._createTranscript("chr1", 1000, 1100, "+", "ref2"),
            self._createTranscript("chr1", 1600, 1700, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 1200, 1300, "-", "query1"),
            self._createTranscript("chr1", 1400, 1500, "+", "query2"),
        ])
        gf = self._createGetFlanking()
        gf.addUpstreamDirection(True)
        gf.setColinear(True)
        gf.run()
        transcripts = self._readOutput(2)
        self._checkTranscript(transcripts[0], "chr1", 1200, 1300, "-", "query1")
        self.assertIsNone(transcripts[0].getTagValue("flanking"))
        self._checkTranscript(transcripts[1], "chr1", 1400, 1500, "+", "query2")
        self.assertEqual(transcripts[1].getTagValue("flanking_upstream"), "ref2")

    @unittest.skip(_DISABLED)
    def test_run_simple_max_distance(self):
        """Max distance 100: the query at 2000-2100 expects no tag from the reference at 1000-1100."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 2000, 2100, "-", "query1"),
        ])
        gf = self._createGetFlanking()
        gf.setMaxDistance(100)
        gf.run()
        transcripts = self._readOutput(1)
        self._checkTranscript(transcripts[0], "chr1", 2000, 2100, "-", "query1")
        self.assertIsNone(transcripts[0].getTagValue("flanking"))

    def _writeTranscripts(self, fileName, transcripts):
        """Write the given transcripts, in order, to a GFF3 file and close it."""
        # The second argument is 0 in every call of the original tests;
        # presumably a verbosity level -- TODO confirm against Gff3Writer.
        writer = Gff3Writer(fileName, 0)
        for transcript in transcripts:
            writer.addTranscript(transcript)
        writer.close()

    def _createGetFlanking(self):
        """Return a GetFlanking set up with the query, reference and output files.

        The query file is registered under index 0 and the reference file
        under index 1; callers add their own options and then call run().
        """
        # Constructor argument 0 as in the original tests; presumably a
        # verbosity level -- TODO confirm against GetFlanking.
        gf = GetFlanking(0)
        gf.setInputFile(self.queryFileName, 'gff3', 0)
        gf.setInputFile(self.referenceFileName, 'gff3', 1)
        gf.setOutputFile(self.outputFileName)
        return gf

    def _readOutput(self, expectedNb):
        """Check the transcript count of the output file and return its content.

        Asserts that the parser reports expectedNb transcripts, then returns
        the parsed transcripts as a list sorted by start position.
        """
        parser = GffParser(self.outputFileName)
        self.assertEqual(parser.getNbTranscripts(), expectedNb)
        return sorted(parser.getIterator(), key=lambda t: t.getStart())

    def _createTranscript(self, chromosome, start, end, strand, name):
        """Build a Transcript with the given chromosome, bounds, direction and name."""
        transcript = Transcript()
        transcript.setChromosome(chromosome)
        transcript.setStart(start)
        transcript.setEnd(end)
        transcript.setDirection(strand)
        transcript.setName(name)
        return transcript

    def _checkTranscript(self, transcript, chromosome, start, end, strand, name):
        """Assert that the transcript has the given chromosome, bounds, strand and name."""
        self.assertEqual(transcript.getChromosome(), chromosome)
        self.assertEqual(transcript.getStart(), start)
        self.assertEqual(transcript.getEnd(), end)
        self.assertEqual(transcript.getStrand(), strand)
        self.assertEqual(transcript.getName(), name)
| 235 | |
| 236 | |
# Run the whole test suite when this file is executed directly as a script;
# importing the module (e.g. from a test runner) does not trigger a run.
if __name__ == "__main__":
    unittest.main()
