Mercurial > repos > cpt > cpt_export_seq_unique
comparison gff3_extract_sequence.py @ 1:ac766d7dd641 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
| author | cpt | 
|---|---|
| date | Mon, 05 Jun 2023 02:41:21 +0000 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| 0:aaed5a0c774c | 1:ac766d7dd641 | 
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import sys | |
| 3 import argparse | |
| 4 import logging | |
| 5 import uuid | |
| 6 from CPT_GFFParser import gffParse, gffWrite | |
| 7 from Bio import SeqIO | |
| 8 from Bio.Seq import Seq | |
| 9 from Bio.SeqRecord import SeqRecord | |
| 10 from Bio.SeqFeature import FeatureLocation, CompoundLocation | |
| 11 from gff3 import feature_lambda, feature_test_type, get_id | |
| 12 | |
| 13 logging.basicConfig(level=logging.INFO) | |
| 14 log = logging.getLogger(__name__) | |
| 15 | |
| 16 | |
| 17 def main(fasta, gff3, feature_filter=None, nodesc=False): | |
| 18 if feature_filter == "nice_cds": | |
| 19 from gff2gb import gff3_to_genbank as cpt_Gff2Gbk | |
| 20 | |
| 21 for rec in cpt_Gff2Gbk(gff3, fasta, 11): | |
| 22 seenList = {} | |
| 23 if rec.seq[0] == "?": | |
| 24 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
| 25 exit(1) | |
| 26 for feat in sorted(rec.features, key=lambda x: x.location.start): | |
| 27 if feat.type != "CDS": | |
| 28 continue | |
| 29 | |
| 30 ind = 0 | |
| 31 if ( | |
| 32 str( | |
| 33 feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-") | |
| 34 ) | |
| 35 in seenList.keys() | |
| 36 ): | |
| 37 seenList[ | |
| 38 str( | |
| 39 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
| 40 " ", "-" | |
| 41 ) | |
| 42 ) | |
| 43 ] += 1 | |
| 44 ind = seenList[ | |
| 45 str( | |
| 46 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
| 47 " ", "-" | |
| 48 ) | |
| 49 ) | |
| 50 ] | |
| 51 else: | |
| 52 seenList[ | |
| 53 str( | |
| 54 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
| 55 " ", "-" | |
| 56 ) | |
| 57 ) | |
| 58 ] = 1 | |
| 59 append = "" | |
| 60 if ind != 0: | |
| 61 append = "_" + str(ind) | |
| 62 | |
| 63 if nodesc: | |
| 64 description = "" | |
| 65 else: | |
| 66 feat.qualifiers["ID"] = [feat._ID] | |
| 67 product = feat.qualifiers.get("product", "") | |
| 68 description = ( | |
| 69 "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format( | |
| 70 feat, product | |
| 71 ) | |
| 72 ) | |
| 73 yield [ | |
| 74 SeqRecord( | |
| 75 feat.extract(rec).seq, | |
| 76 id=str( | |
| 77 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
| 78 " ", "-" | |
| 79 ) | |
| 80 ) | |
| 81 + append, | |
| 82 description=description, | |
| 83 ) | |
| 84 ] | |
| 85 | |
| 86 elif feature_filter == "unique_cds": | |
| 87 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
| 88 seen_ids = {} | |
| 89 | |
| 90 for rec in gffParse(gff3, base_dict=seq_dict): | |
| 91 noMatch = True | |
| 92 if "Alias" in rec.features[0].qualifiers.keys(): | |
| 93 lColumn = rec.features[0].qualifiers["Alias"][0] | |
| 94 else: | |
| 95 lColumn = "" | |
| 96 for x in seq_dict: | |
| 97 if x == rec.id or x == lColumn: | |
| 98 noMatch = False | |
| 99 if noMatch: | |
| 100 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
| 101 exit(1) | |
| 102 newfeats = [] | |
| 103 for feat in sorted( | |
| 104 feature_lambda( | |
| 105 rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False | |
| 106 ), | |
| 107 key=lambda f: f.location.start, | |
| 108 ): | |
| 109 nid = rec.id + "____" + feat.id | |
| 110 if nid in seen_ids: | |
| 111 nid = nid + "__" + uuid.uuid4().hex | |
| 112 feat.qualifiers["ID"] = [nid] | |
| 113 newfeats.append(feat) | |
| 114 seen_ids[nid] = True | |
| 115 | |
| 116 if nodesc: | |
| 117 description = "" | |
| 118 else: | |
| 119 if feat.strand == -1: | |
| 120 important_data = { | |
| 121 "Location": FeatureLocation( | |
| 122 feat.location.start + 1, | |
| 123 feat.location.end - feat.phase, | |
| 124 feat.strand, | |
| 125 ) | |
| 126 } | |
| 127 else: | |
| 128 important_data = { | |
| 129 "Location": FeatureLocation( | |
| 130 feat.location.start + 1 + feat.phase, | |
| 131 feat.location.end, | |
| 132 feat.strand, | |
| 133 ) | |
| 134 } | |
| 135 if "Name" in feat.qualifiers: | |
| 136 important_data["Name"] = feat.qualifiers.get("Name", [""])[0] | |
| 137 | |
| 138 description = "[{}]".format( | |
| 139 ";".join( | |
| 140 [ | |
| 141 "{key}={value}".format(key=k, value=v) | |
| 142 for (k, v) in important_data.items() | |
| 143 ] | |
| 144 ) | |
| 145 ) | |
| 146 # if feat.id == "CPT_Privateer_006.p01": | |
| 147 # print(feat) | |
| 148 # exit() | |
| 149 | |
| 150 if isinstance(feat.location, CompoundLocation): | |
| 151 finSeq = "" | |
| 152 if feat.strand == -1: | |
| 153 for x in feat.location.parts: | |
| 154 finSeq += str( | |
| 155 ( | |
| 156 rec.seq[ | |
| 157 feat.location.start : feat.location.end | |
| 158 - feat.phase | |
| 159 ] | |
| 160 ).reverse_complement() | |
| 161 ) | |
| 162 else: | |
| 163 for x in feat.location.parts: | |
| 164 finSeq += str( | |
| 165 rec.seq[ | |
| 166 feat.location.start + feat.phase : feat.location.end | |
| 167 ] | |
| 168 ) | |
| 169 yield [ | |
| 170 SeqRecord( | |
| 171 finSeq, | |
| 172 id=nid.replace(" ", "-"), | |
| 173 description=description, | |
| 174 ) | |
| 175 ] | |
| 176 elif feat.strand == -1: | |
| 177 yield [ | |
| 178 SeqRecord( | |
| 179 ( | |
| 180 rec.seq[ | |
| 181 feat.location.start : feat.location.end - feat.phase | |
| 182 ] | |
| 183 ).reverse_complement(), | |
| 184 id=nid.replace(" ", "-"), | |
| 185 description=description, | |
| 186 ) | |
| 187 ] | |
| 188 else: | |
| 189 yield [ | |
| 190 SeqRecord( | |
| 191 # feat.extract(rec).seq, | |
| 192 rec.seq[ | |
| 193 feat.location.start + feat.phase : feat.location.end | |
| 194 ], | |
| 195 id=nid.replace(" ", "-"), | |
| 196 description=description, | |
| 197 ) | |
| 198 ] | |
| 199 rec.features = newfeats | |
| 200 rec.annotations = {} | |
| 201 # gffWrite([rec], sys.stdout) | |
| 202 else: | |
| 203 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
| 204 | |
| 205 for rec in gffParse(gff3, base_dict=seq_dict): | |
| 206 noMatch = True | |
| 207 if "Alias" in rec.features[0].qualifiers.keys(): | |
| 208 lColumn = rec.features[0].qualifiers["Alias"][0] | |
| 209 else: | |
| 210 lColumn = "" | |
| 211 for x in seq_dict: | |
| 212 if x == rec.id or x == lColumn: | |
| 213 noMatch = False | |
| 214 if noMatch: | |
| 215 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
| 216 exit(1) | |
| 217 for feat in sorted( | |
| 218 feature_lambda( | |
| 219 rec.features, | |
| 220 feature_test_type, | |
| 221 {"type": feature_filter}, | |
| 222 subfeatures=True, | |
| 223 ), | |
| 224 key=lambda f: f.location.start, | |
| 225 ): | |
| 226 id = feat.id | |
| 227 if len(id) == 0: | |
| 228 id = get_id(feat) | |
| 229 | |
| 230 if nodesc: | |
| 231 description = "" | |
| 232 else: | |
| 233 if feat.strand == -1: | |
| 234 important_data = { | |
| 235 "Location": FeatureLocation( | |
| 236 feat.location.start + 1, | |
| 237 feat.location.end - feat.phase, | |
| 238 feat.strand, | |
| 239 ) | |
| 240 } | |
| 241 else: | |
| 242 important_data = { | |
| 243 "Location": FeatureLocation( | |
| 244 feat.location.start + 1 + feat.phase, | |
| 245 feat.location.end, | |
| 246 feat.strand, | |
| 247 ) | |
| 248 } | |
| 249 if "Name" in feat.qualifiers: | |
| 250 important_data["Name"] = feat.qualifiers.get("Name", [""])[0] | |
| 251 | |
| 252 description = "[{}]".format( | |
| 253 ";".join( | |
| 254 [ | |
| 255 "{key}={value}".format(key=k, value=v) | |
| 256 for (k, v) in important_data.items() | |
| 257 ] | |
| 258 ) | |
| 259 ) | |
| 260 | |
| 261 if isinstance(feat.location, CompoundLocation): | |
| 262 finSeq = "" | |
| 263 if feat.strand == -1: | |
| 264 for x in feat.location.parts: | |
| 265 finSeq += str( | |
| 266 ( | |
| 267 rec.seq[x.start : x.end - feat.phase] | |
| 268 ).reverse_complement() | |
| 269 ) | |
| 270 else: | |
| 271 for x in feat.location.parts: | |
| 272 finSeq += str(rec.seq[x.start + feat.phase : x.end]) | |
| 273 yield [ | |
| 274 SeqRecord( | |
| 275 Seq(finSeq), | |
| 276 id=id.replace(" ", "-"), | |
| 277 description=description, | |
| 278 ) | |
| 279 ] | |
| 280 | |
| 281 else: | |
| 282 | |
| 283 if feat.strand == -1: | |
| 284 yield [ | |
| 285 SeqRecord( | |
| 286 seq=Seq( | |
| 287 str( | |
| 288 rec.seq[ | |
| 289 feat.location.start : feat.location.end | |
| 290 - feat.phase | |
| 291 ] | |
| 292 ) | |
| 293 ).reverse_complement(), | |
| 294 id=id.replace(" ", "-"), | |
| 295 description=description, | |
| 296 ) | |
| 297 ] | |
| 298 else: | |
| 299 yield [ | |
| 300 SeqRecord( | |
| 301 # feat.extract(rec).seq, | |
| 302 seq=Seq( | |
| 303 str( | |
| 304 rec.seq[ | |
| 305 feat.location.start | |
| 306 + feat.phase : feat.location.end | |
| 307 ] | |
| 308 ) | |
| 309 ), | |
| 310 id=id.replace(" ", "-"), | |
| 311 description=description, | |
| 312 ) | |
| 313 ] | |
| 314 | |
| 315 | |
| 316 if __name__ == "__main__": | |
| 317 parser = argparse.ArgumentParser( | |
| 318 description="Export corresponding sequence in genome from GFF3", epilog="" | |
| 319 ) | |
| 320 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") | |
| 321 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File") | |
| 322 parser.add_argument( | |
| 323 "--feature_filter", default=None, help="Filter for specific feature types" | |
| 324 ) | |
| 325 parser.add_argument( | |
| 326 "--nodesc", action="store_true", help="Strip description field off" | |
| 327 ) | |
| 328 args = parser.parse_args() | |
| 329 for seq in main(**vars(args)): | |
| 330 # if isinstance(seq, list): | |
| 331 # for x in seq: | |
| 332 # print(type(x.seq)) | |
| 333 # SeqIO.write(x, sys.stdout, "fasta") | |
| 334 # else: | |
| 335 SeqIO.write(seq, sys.stdout, "fasta") | 
