Mercurial > repos > yufei-luo > s_mart
comparison SMART/Java/Python/test/Test_F_GetFlanking.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:20:15 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
17:b0e8584489e6 | 18:94ab73e8a190 |
---|---|
1 import unittest | |
2 import os, os.path, glob | |
3 from SMART.Java.Python.structure.Transcript import Transcript | |
4 from SMART.Java.Python.GetFlanking import GetFlanking | |
5 from commons.core.writer.Gff3Writer import Gff3Writer | |
6 from commons.core.parsing.GffParser import GffParser | |
7 | |
class Test_F_GetFlanking(unittest.TestCase):
    """Functional tests for GetFlanking on small, hand-built GFF3 fixtures.

    Each scenario writes a reference file and a query file in the current
    working directory, runs GetFlanking on them, and then checks the tags
    carried by the transcripts found in the output file.
    """

    # Reason reported for the scenarios that the original file switched off
    # with a bare ``return`` as their first statement.  Skipping them makes
    # the test report show that they do not run, instead of counting them as
    # passing while executing no assertion.
    _DISABLED_REASON = "disabled in the original test file by an early return"

    def setUp(self):
        """Define the names of the fixture and output files used by a test."""
        self.queryFileName = "testQuery.gff3"
        self.referenceFileName = "testReference.gff3"
        self.outputFileName = "testOutput.gff3"

    def tearDown(self):
        """Delete every file whose name starts with one of the fixture names."""
        for fileRoot in (self.queryFileName, self.referenceFileName, self.outputFileName):
            # The trailing wildcard also catches side files created next to
            # the fixtures (anything sharing the same name prefix).
            for fileName in glob.glob("%s*" % (fileRoot)):
                os.remove(fileName)

    def test_run_simple(self):
        """Default run: query1 expects ref1 (downstream), query2 expects ref2 (upstream)."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref1"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref2"),
            self._createTranscript("chr1", 1000000, 1200000, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 10000, 10100, "+", "query2"),
        ])
        gf = self._createGetFlanking()
        gf.run()
        first, second = self._readSortedOutput(2)
        self._checkTranscript(first, "chr1", 100, 200, "+", "query1")
        self.assertEqual(first.getTagValue("flanking"), "ref1")
        self.assertEqual(first.getTagValue("_region_flanking"), "downstream")
        self.assertEqual(first.getTagValue("_sense_flanking"), "collinear")
        self._checkTranscript(second, "chr1", 10000, 10100, "+", "query2")
        self.assertEqual(second.getTagValue("flanking"), "ref2")
        self.assertEqual(second.getTagValue("_region_flanking"), "upstream")
        self.assertEqual(second.getTagValue("_sense_flanking"), "collinear")

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_downstream(self):
        """Downstream only, '+' queries: only query1 expects a flanking reference."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 300, 400, "+", "ref1"),
            self._createTranscript("chr1", 1000, 1100, "+", "ref2"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 1200, 1300, "+", "query2"),
            self._createTranscript("chr1", 1400, 1500, "+", "query3"),
        ])
        gf = self._createGetFlanking()
        gf.addDownstreamDirection(True)
        gf.run()
        first, second, third = self._readSortedOutput(3)
        self._checkTranscript(first, "chr1", 100, 200, "+", "query1")
        self.assertEqual(first.getTagValue("flanking_downstream"), "ref1")
        self.assertEqual(first.getTagValue("_region_flanking"), "downstream")
        self.assertEqual(first.getTagValue("_sense_flanking"), "collinear")
        self._checkTranscript(second, "chr1", 1200, 1300, "+", "query2")
        self.assertIsNone(second.getTagValue("flanking_downstream"))
        self._checkTranscript(third, "chr1", 1400, 1500, "+", "query3")
        self.assertIsNone(third.getTagValue("flanking_downstream"))

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_minus_strand_downstream(self):
        """Downstream only, '-' queries: query2 and query3 expect ref1, query1 nothing."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref1"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref2"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "-", "query1"),
            self._createTranscript("chr1", 1200, 1300, "-", "query2"),
            self._createTranscript("chr1", 1400, 1500, "-", "query3"),
        ])
        gf = self._createGetFlanking()
        gf.addDownstreamDirection(True)
        gf.run()
        first, second, third = self._readSortedOutput(3)
        self._checkTranscript(first, "chr1", 100, 200, "-", "query1")
        self.assertIsNone(first.getTagValue("flanking_downstream"))
        self._checkTranscript(second, "chr1", 1200, 1300, "-", "query2")
        self.assertEqual(second.getTagValue("flanking_downstream"), "ref1")
        self._checkTranscript(third, "chr1", 1400, 1500, "-", "query3")
        self.assertEqual(third.getTagValue("flanking_downstream"), "ref1")

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_upstream(self):
        """Upstream only, '+' queries: only query3 expects a flanking reference (ref2)."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 500, 600, "+", "ref1"),
            self._createTranscript("chr1", 700, 800, "+", "ref2"),
            self._createTranscript("chr1", 2000, 2100, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 100, 200, "+", "query1"),
            self._createTranscript("chr1", 300, 400, "+", "query2"),
            self._createTranscript("chr1", 1200, 1300, "+", "query3"),
        ])
        gf = self._createGetFlanking()
        gf.addUpstreamDirection(True)
        gf.run()
        first, second, third = self._readSortedOutput(3)
        self._checkTranscript(first, "chr1", 100, 200, "+", "query1")
        self.assertIsNone(first.getTagValue("flanking_upstream"))
        self._checkTranscript(second, "chr1", 300, 400, "+", "query2")
        self.assertIsNone(second.getTagValue("flanking_upstream"))
        self._checkTranscript(third, "chr1", 1200, 1300, "+", "query3")
        self.assertEqual(third.getTagValue("flanking_upstream"), "ref2")

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_colinear(self):
        """Upstream + colinear: the '-' query expects nothing, the '+' query expects ref2."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 100, 200, "+", "ref1"),
            self._createTranscript("chr1", 1000, 1100, "+", "ref2"),
            self._createTranscript("chr1", 1600, 1700, "+", "ref3"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 1200, 1300, "-", "query1"),
            self._createTranscript("chr1", 1400, 1500, "+", "query2"),
        ])
        gf = self._createGetFlanking()
        gf.addUpstreamDirection(True)
        gf.setColinear(True)
        gf.run()
        first, second = self._readSortedOutput(2)
        self._checkTranscript(first, "chr1", 1200, 1300, "-", "query1")
        # NOTE(review): the other directional tests read "flanking_upstream" /
        # "flanking_downstream"; checking the plain "flanking" tag here may be
        # weaker than intended -- confirm against GetFlanking before changing.
        self.assertIsNone(first.getTagValue("flanking"))
        self._checkTranscript(second, "chr1", 1400, 1500, "+", "query2")
        self.assertEqual(second.getTagValue("flanking_upstream"), "ref2")

    @unittest.skip(_DISABLED_REASON)
    def test_run_simple_max_distance(self):
        """Max distance 100: a reference 900 bases away must not be reported."""
        self._writeTranscripts(self.referenceFileName, [
            self._createTranscript("chr1", 1000, 1100, "+", "ref"),
        ])
        self._writeTranscripts(self.queryFileName, [
            self._createTranscript("chr1", 2000, 2100, "-", "query1"),
        ])
        gf = self._createGetFlanking()
        gf.setMaxDistance(100)
        gf.run()
        (only,) = self._readSortedOutput(1)
        self._checkTranscript(only, "chr1", 2000, 2100, "-", "query1")
        self.assertIsNone(only.getTagValue("flanking"))

    def _writeTranscripts(self, fileName, transcripts):
        """Write the given transcripts, in order, to a GFF3 file and close it."""
        # Second argument is passed as 0 everywhere in this file; presumably a
        # verbosity level -- TODO confirm against Gff3Writer.
        writer = Gff3Writer(fileName, 0)
        for transcript in transcripts:
            writer.addTranscript(transcript)
        writer.close()

    def _createGetFlanking(self):
        """Return a GetFlanking wired to the query, reference and output files.

        The caller sets any scenario-specific option and then calls ``run()``.
        """
        gf = GetFlanking(0)
        # Input slot 0 receives the query file, slot 1 the reference file.
        gf.setInputFile(self.queryFileName, 'gff3', 0)
        gf.setInputFile(self.referenceFileName, 'gff3', 1)
        gf.setOutputFile(self.outputFileName)
        return gf

    def _readSortedOutput(self, expectedCount):
        """Parse the output file and return its transcripts sorted by start.

        Fails the test unless both the parser's transcript count and the
        number of transcripts actually iterated equal ``expectedCount``, so
        that the per-transcript checks of the caller can never be skipped
        silently.
        """
        parser = GffParser(self.outputFileName)
        self.assertEqual(parser.getNbTranscripts(), expectedCount)
        transcripts = sorted(parser.getIterator(), key = lambda t: t.getStart())
        self.assertEqual(len(transcripts), expectedCount)
        return transcripts

    def _createTranscript(self, chromosome, start, end, strand, name):
        """Build a Transcript with the given chromosome, bounds, strand and name."""
        transcript = Transcript()
        transcript.setChromosome(chromosome)
        transcript.setStart(start)
        transcript.setEnd(end)
        transcript.setDirection(strand)
        transcript.setName(name)
        return transcript

    def _checkTranscript(self, transcript, chromosome, start, end, strand, name):
        """Assert that a transcript has the expected chromosome, bounds, strand and name."""
        self.assertEqual(transcript.getChromosome(), chromosome)
        self.assertEqual(transcript.getStart(), start)
        self.assertEqual(transcript.getEnd(), end)
        self.assertEqual(transcript.getStrand(), strand)
        self.assertEqual(transcript.getName(), name)
235 | |
236 | |
# Launch the unittest runner only when this file is executed as a script,
# not when it is imported by a test collector.
if __name__ == "__main__":
    unittest.main()