comparison SMART/Java/Python/test/Test_F_GetFlanking.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
comparison
equal deleted inserted replaced
17:b0e8584489e6 18:94ab73e8a190
1 import unittest
2 import os, os.path, glob
3 from SMART.Java.Python.structure.Transcript import Transcript
4 from SMART.Java.Python.GetFlanking import GetFlanking
5 from commons.core.writer.Gff3Writer import Gff3Writer
6 from commons.core.parsing.GffParser import GffParser
7
class Test_F_GetFlanking(unittest.TestCase):
    """Functional tests for GetFlanking on small hand-written GFF3 fixtures.

    Each test writes a query file and a reference file into the current
    working directory, runs GetFlanking on them, and checks the tags found on
    the transcripts read back from the output file.
    """

    # Five tests were switched off in the original file by a bare ``return``
    # as first statement, which made them report success without running.
    # They are kept switched off, but are now reported as skipped.
    _DISABLED_REASON = "disabled in the original test file (body started with a bare 'return')"

    def setUp(self):
        """Define the names of the three files every test works with."""
        self.queryFileName = "testQuery.gff3"
        self.referenceFileName = "testReference.gff3"
        self.outputFileName = "testOutput.gff3"

    def tearDown(self):
        """Delete every file whose name starts with one of the fixture names."""
        for fileRoot in (self.queryFileName, self.referenceFileName, self.outputFileName):
            # The trailing wildcard also matches files named after a fixture
            # with an extra suffix (presumably side files written by the
            # tools -- TODO confirm).
            for path in glob.glob("%s*" % (fileRoot)):
                os.remove(path)

    def test_run_simple(self):
        """Default run: each query is tagged with the nearest fixture reference."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref1"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref2"),
            self._createTranscript("chr1", 1000000, 1200000, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 10000, 10100, "+", "query2"),
        ])
        self._newGetFlanking().run()
        first, second = self._readOutput(2)
        self._checkTranscript(first, "chr1", 100, 200, "+", "query1")
        self.assertEqual(first.getTagValue("flanking"), "ref1")
        self.assertEqual(first.getTagValue("_region_flanking"), "downstream")
        self.assertEqual(first.getTagValue("_sense_flanking"), "collinear")
        self._checkTranscript(second, "chr1", 10000, 10100, "+", "query2")
        self.assertEqual(second.getTagValue("flanking"), "ref2")
        self.assertEqual(second.getTagValue("_region_flanking"), "upstream")
        self.assertEqual(second.getTagValue("_sense_flanking"), "collinear")

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_downstream(self):
        """Downstream only, '+' queries: only the query left of the references is tagged."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 300, 400, "+", "ref1"),
            self._createTranscript("chr1", 1000, 1100, "+", "ref2"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 1200, 1300, "+", "query2"),
            self._createTranscript("chr1", 1400, 1500, "+", "query3"),
        ])
        gf = self._newGetFlanking()
        gf.addDownstreamDirection(True)
        gf.run()
        first, second, third = self._readOutput(3)
        self._checkTranscript(first, "chr1", 100, 200, "+", "query1")
        self.assertEqual(first.getTagValue("flanking_downstream"), "ref1")
        self.assertEqual(first.getTagValue("_region_flanking"), "downstream")
        self.assertEqual(first.getTagValue("_sense_flanking"), "collinear")
        self._checkTranscript(second, "chr1", 1200, 1300, "+", "query2")
        self.assertIsNone(second.getTagValue("flanking_downstream"))
        self._checkTranscript(third, "chr1", 1400, 1500, "+", "query3")
        self.assertIsNone(third.getTagValue("flanking_downstream"))

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_minus_strand_downstream(self):
        """Downstream only, '-' queries: queries right of ref1 are tagged with ref1."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref1"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref2"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "-", "query1"),
            self._createTranscript("chr1", 1200, 1300, "-", "query2"),
            self._createTranscript("chr1", 1400, 1500, "-", "query3"),
        ])
        gf = self._newGetFlanking()
        gf.addDownstreamDirection(True)
        gf.run()
        first, second, third = self._readOutput(3)
        self._checkTranscript(first, "chr1", 100, 200, "-", "query1")
        self.assertIsNone(first.getTagValue("flanking_downstream"))
        self._checkTranscript(second, "chr1", 1200, 1300, "-", "query2")
        self.assertEqual(second.getTagValue("flanking_downstream"), "ref1")
        self._checkTranscript(third, "chr1", 1400, 1500, "-", "query3")
        self.assertEqual(third.getTagValue("flanking_downstream"), "ref1")

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_upstream(self):
        """Upstream only, '+' queries: only the query right of ref2 is tagged."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 500, 600, "+", "ref1"),
            self._createTranscript("chr1", 700, 800, "+", "ref2"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 300, 400, "+", "query2"),
            self._createTranscript("chr1", 1200, 1300, "+", "query3"),
        ])
        gf = self._newGetFlanking()
        gf.addUpstreamDirection(True)
        gf.run()
        first, second, third = self._readOutput(3)
        self._checkTranscript(first, "chr1", 100, 200, "+", "query1")
        self.assertIsNone(first.getTagValue("flanking_upstream"))
        self._checkTranscript(second, "chr1", 300, 400, "+", "query2")
        self.assertIsNone(second.getTagValue("flanking_upstream"))
        self._checkTranscript(third, "chr1", 1200, 1300, "+", "query3")
        self.assertEqual(third.getTagValue("flanking_upstream"), "ref2")

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_colinear(self):
        """Upstream + colinear: the '-' query gets no tag, the '+' query gets ref2."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 100, 200, "+", "ref1"),
            self._createTranscript("chr1", 1000, 1100, "+", "ref2"),
            self._createTranscript("chr1", 1600, 1700, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 1200, 1300, "-", "query1"),
            self._createTranscript("chr1", 1400, 1500, "+", "query2"),
        ])
        gf = self._newGetFlanking()
        gf.addUpstreamDirection(True)
        gf.setColinear(True)
        gf.run()
        first, second = self._readOutput(2)
        self._checkTranscript(first, "chr1", 1200, 1300, "-", "query1")
        self.assertIsNone(first.getTagValue("flanking"))
        # NOTE(review): with the upstream direction set, the other checks in
        # this file read the tag "flanking_upstream"; the "flanking" check
        # above alone may be vacuous, so the directional tag is checked too.
        self.assertIsNone(first.getTagValue("flanking_upstream"))
        self._checkTranscript(second, "chr1", 1400, 1500, "+", "query2")
        self.assertEqual(second.getTagValue("flanking_upstream"), "ref2")

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_max_distance(self):
        """Max distance 100: a reference 900 positions away yields no tag."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 2000, 2100, "-", "query1"),
        ])
        gf = self._newGetFlanking()
        gf.setMaxDistance(100)
        gf.run()
        (only,) = self._readOutput(1)
        self._checkTranscript(only, "chr1", 2000, 2100, "-", "query1")
        self.assertIsNone(only.getTagValue("flanking"))

    def _writeTranscripts(self, fileName, transcripts):
        """Write the given transcripts, in order, to fileName and close the writer."""
        writer = Gff3Writer(fileName, 0)
        for transcript in transcripts:
            writer.addTranscript(transcript)
        writer.close()

    def _newGetFlanking(self):
        """Return a GetFlanking set up with the query, reference and output files.

        The query file is registered under index 0 and the reference file
        under index 1, as every test in this class does. The caller adds any
        option it needs and then calls run().
        """
        gf = GetFlanking(0)
        gf.setInputFile(self.queryFileName, 'gff3', 0)
        gf.setInputFile(self.referenceFileName, 'gff3', 1)
        gf.setOutputFile(self.outputFileName)
        return gf

    def _readOutput(self, expectedCount):
        """Return the output transcripts as a list sorted by start position.

        Fails the test when the parser does not report expectedCount
        transcripts, or when it does not actually yield that many, so that no
        per-transcript check can be silently skipped.
        """
        parser = GffParser(self.outputFileName)
        self.assertEqual(parser.getNbTranscripts(), expectedCount)
        transcripts = sorted(parser.getIterator(), key=lambda t: t.getStart())
        self.assertEqual(len(transcripts), expectedCount)
        return transcripts

    def _createTranscript(self, chromosome, start, end, strand, name):
        """Build a Transcript with the given chromosome, bounds, strand and name."""
        transcript = Transcript()
        transcript.setChromosome(chromosome)
        transcript.setStart(start)
        transcript.setEnd(end)
        transcript.setDirection(strand)
        transcript.setName(name)
        return transcript

    def _checkTranscript(self, transcript, chromosome, start, end, strand, name):
        """Assert that transcript has exactly the given coordinates and name."""
        self.assertEqual(transcript.getChromosome(), chromosome)
        self.assertEqual(transcript.getStart(), start)
        self.assertEqual(transcript.getEnd(), end)
        self.assertEqual(transcript.getStrand(), strand)
        self.assertEqual(transcript.getName(), name)
236
# Run every test of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()