18
|
1 import unittest
|
|
2 import os, os.path, glob
|
|
3 from SMART.Java.Python.structure.Transcript import Transcript
|
|
4 from SMART.Java.Python.GetFlanking import GetFlanking
|
|
5 from commons.core.writer.Gff3Writer import Gff3Writer
|
|
6 from commons.core.parsing.GffParser import GffParser
|
|
7
|
|
class Test_F_GetFlanking(unittest.TestCase):
    """Functional tests for GetFlanking on small GFF3 fixtures.

    Each test writes a query file and a reference file, runs GetFlanking on
    them and checks the tags carried by the transcripts of the output file.
    """

    # Reason reported for the tests that used to be switched off with a bare
    # ``return`` as first statement (which made them count as passing while
    # asserting nothing).  Their expectations have not been re-validated.
    _DISABLED = "disabled in the original suite; expectations not re-validated"

    def setUp(self):
        """Set the names of the files each test reads and writes."""
        self.queryFileName = "testQuery.gff3"
        self.referenceFileName = "testReference.gff3"
        self.outputFileName = "testOutput.gff3"

    def tearDown(self):
        """Remove every file whose name starts with one of the fixture names."""
        for fileRoot in (self.queryFileName, self.referenceFileName, self.outputFileName):
            # The trailing wildcard also matches files that merely share the
            # prefix -- presumably auxiliary files created next to the fixtures.
            for path in glob.glob("%s*" % (fileRoot)):
                os.remove(path)

    def test_run_simple(self):
        """Run with default options and check the three flanking tags."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref1"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref2"),
            self._createTranscript("chr1", 1000000, 1200000, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 10000, 10100, "+", "query2"),
        ])
        self._newGetFlanking().run()
        self._assertOutput([
            (("chr1", 100, 200, "+", "query1"), [
                ("flanking", "ref1"),
                ("_region_flanking", "downstream"),
                ("_sense_flanking", "collinear"),
            ]),
            (("chr1", 10000, 10100, "+", "query2"), [
                ("flanking", "ref2"),
                ("_region_flanking", "upstream"),
                ("_sense_flanking", "collinear"),
            ]),
        ])

    @unittest.skip(_DISABLED)
    def test_run_simple_downstream(self):
        """Run with the downstream direction enabled on "+" strand queries."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 300, 400, "+", "ref1"),
            self._createTranscript("chr1", 1000, 1100, "+", "ref2"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 1200, 1300, "+", "query2"),
            self._createTranscript("chr1", 1400, 1500, "+", "query3"),
        ])
        gf = self._newGetFlanking()
        gf.addDownstreamDirection(True)
        gf.run()
        self._assertOutput([
            (("chr1", 100, 200, "+", "query1"), [
                ("flanking_downstream", "ref1"),
                ("_region_flanking", "downstream"),
                ("_sense_flanking", "collinear"),
            ]),
            (("chr1", 1200, 1300, "+", "query2"), [
                ("flanking_downstream", None),
            ]),
            (("chr1", 1400, 1500, "+", "query3"), [
                ("flanking_downstream", None),
            ]),
        ])

    @unittest.skip(_DISABLED)
    def test_run_simple_minus_strand_downstream(self):
        """Run with the downstream direction enabled on "-" strand queries."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref1"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref2"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "-", "query1"),
            self._createTranscript("chr1", 1200, 1300, "-", "query2"),
            self._createTranscript("chr1", 1400, 1500, "-", "query3"),
        ])
        gf = self._newGetFlanking()
        gf.addDownstreamDirection(True)
        gf.run()
        self._assertOutput([
            (("chr1", 100, 200, "-", "query1"), [
                ("flanking_downstream", None),
            ]),
            (("chr1", 1200, 1300, "-", "query2"), [
                ("flanking_downstream", "ref1"),
            ]),
            (("chr1", 1400, 1500, "-", "query3"), [
                ("flanking_downstream", "ref1"),
            ]),
        ])

    @unittest.skip(_DISABLED)
    def test_run_simple_upstream(self):
        """Run with the upstream direction enabled on "+" strand queries."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 500, 600, "+", "ref1"),
            self._createTranscript("chr1", 700, 800, "+", "ref2"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 300, 400, "+", "query2"),
            self._createTranscript("chr1", 1200, 1300, "+", "query3"),
        ])
        gf = self._newGetFlanking()
        gf.addUpstreamDirection(True)
        gf.run()
        self._assertOutput([
            (("chr1", 100, 200, "+", "query1"), [
                ("flanking_upstream", None),
            ]),
            (("chr1", 300, 400, "+", "query2"), [
                ("flanking_upstream", None),
            ]),
            (("chr1", 1200, 1300, "+", "query3"), [
                ("flanking_upstream", "ref2"),
            ]),
        ])

    @unittest.skip(_DISABLED)
    def test_run_simple_colinear(self):
        """Run with the upstream direction and the colinear option enabled."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 100, 200, "+", "ref1"),
            self._createTranscript("chr1", 1000, 1100, "+", "ref2"),
            self._createTranscript("chr1", 1600, 1700, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 1200, 1300, "-", "query1"),
            self._createTranscript("chr1", 1400, 1500, "+", "query2"),
        ])
        gf = self._newGetFlanking()
        gf.addUpstreamDirection(True)
        gf.setColinear(True)
        gf.run()
        self._assertOutput([
            (("chr1", 1200, 1300, "-", "query1"), [
                ("flanking", None),
            ]),
            (("chr1", 1400, 1500, "+", "query2"), [
                ("flanking_upstream", "ref2"),
            ]),
        ])

    @unittest.skip(_DISABLED)
    def test_run_simple_max_distance(self):
        """Run with a maximum distance of 100 and a reference 900 bases away."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 2000, 2100, "-", "query1"),
        ])
        gf = self._newGetFlanking()
        gf.setMaxDistance(100)
        gf.run()
        self._assertOutput([
            (("chr1", 2000, 2100, "-", "query1"), [
                ("flanking", None),
            ]),
        ])

    def _writeTranscripts(self, fileName, transcripts):
        """Write the given transcripts, in order, to a GFF3 file."""
        # The second constructor argument is kept at 0 as in every original
        # call -- presumably a verbosity level; confirm against Gff3Writer.
        writer = Gff3Writer(fileName, 0)
        for transcript in transcripts:
            writer.addTranscript(transcript)
        writer.close()

    def _newGetFlanking(self):
        """Return a GetFlanking instance wired to the fixture files.

        The query file is registered under index 0 and the reference file
        under index 1; the caller adds its own options and calls run().
        """
        gf = GetFlanking(0)
        gf.setInputFile(self.queryFileName, 'gff3', 0)
        gf.setInputFile(self.referenceFileName, 'gff3', 1)
        gf.setOutputFile(self.outputFileName)
        return gf

    def _assertOutput(self, expected):
        """Compare the output file with the expected transcripts.

        expected is a list, ordered by start position, of pairs
        (coordinates, tags): coordinates is the argument tuple given to
        _checkTranscript after the transcript, and tags is a list of
        (tagName, value) pairs where a value of None means the tag lookup
        must return None.
        """
        parser = GffParser(self.outputFileName)
        self.assertEqual(parser.getNbTranscripts(), len(expected))
        transcripts = sorted(parser.getIterator(), key=lambda t: t.getStart())
        # zip() stops at the shorter sequence, so make sure nothing can be
        # dropped without an assertion having looked at it.
        self.assertEqual(len(transcripts), len(expected))
        for transcript, (coordinates, tags) in zip(transcripts, expected):
            self._checkTranscript(transcript, *coordinates)
            for tagName, value in tags:
                if value is None:
                    self.assertIsNone(transcript.getTagValue(tagName))
                else:
                    self.assertEqual(transcript.getTagValue(tagName), value)

    def _createTranscript(self, chromosome, start, end, strand, name):
        """Build a Transcript with the given location, strand and name."""
        transcript = Transcript()
        transcript.setChromosome(chromosome)
        transcript.setStart(start)
        transcript.setEnd(end)
        transcript.setDirection(strand)
        transcript.setName(name)
        return transcript

    def _checkTranscript(self, transcript, chromosome, start, end, strand, name):
        """Assert that a transcript has the given location, strand and name."""
        self.assertEqual(transcript.getChromosome(), chromosome)
        self.assertEqual(transcript.getStart(), start)
        self.assertEqual(transcript.getEnd(), end)
        self.assertEqual(transcript.getStrand(), strand)
        self.assertEqual(transcript.getName(), name)
|
|
235
|
|
236
|
|
# Launch the unittest runner only when this file is executed as a script,
# not when it is imported by a test collector.
if __name__ == "__main__":
    unittest.main()
|