Mercurial > repos > artbio > repenrich2
comparison RepEnrich2_setup.py @ 0:4905a332a094 draft
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/main/tools/repenrich2 commit 73721d980c1f422dc880d80f61e44d270992e537
author | artbio |
---|---|
date | Sat, 20 Apr 2024 11:56:53 +0000 |
parents | |
children | c5bb2f9af708 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4905a332a094 |
---|---|
1 #!/usr/bin/env python | |
2 import argparse | |
3 import csv | |
4 import os | |
5 import shlex | |
6 import subprocess | |
7 import sys | |
8 from collections import defaultdict | |
9 from concurrent.futures import ProcessPoolExecutor | |
10 | |
11 | |
12 from Bio import SeqIO | |
13 from Bio.Seq import Seq | |
14 from Bio.SeqRecord import SeqRecord | |
15 | |
16 parser = argparse.ArgumentParser(description=''' | |
17 Prepartion of repetive element pseudogenomes bowtie\ | |
18 indexes and annotation files used by RepEnrich.py enrichment.''', | |
19 prog='getargs_genome_maker.py') | |
20 parser.add_argument('--annotation_file', action='store', | |
21 metavar='annotation_file', | |
22 help='''Repeat masker annotation of the genome of\ | |
23 interest. Download from RepeatMasker.org\ | |
24 Example: mm9.fa.out''') | |
25 parser.add_argument('--genomefasta', action='store', metavar='genomefasta', | |
26 help='''Genome of interest in fasta format.\ | |
27 Example: mm9.fa''') | |
28 parser.add_argument('--gaplength', action='store', dest='gaplength', | |
29 metavar='gaplength', default='200', type=int, | |
30 help='''Length of the N-spacer in the\ | |
31 repeat pseudogenomes. Default 200''') | |
32 parser.add_argument('--flankinglength', action='store', dest='flankinglength', | |
33 metavar='flankinglength', default='25', type=int, | |
34 help='''Length of the flanking regions used to build\ | |
35 repeat pseudogenomes. Flanking length should be set\ | |
36 according to the length of your reads.\ | |
37 Default 25, for 50 nt reads''') | |
38 parser.add_argument('--cpus', action='store', dest='cpus', metavar='cpus', | |
39 default="1", type=int, | |
40 help='Number of CPUs. The more cpus the\ | |
41 faster RepEnrich performs. Default: "1"') | |
42 args = parser.parse_args() | |
43 | |
44 # parameters from argsparse | |
45 gapl = args.gaplength | |
46 flankingl = args.flankinglength | |
47 annotation_file = args.annotation_file | |
48 genomefasta = args.genomefasta | |
49 cpus = args.cpus | |
50 | |
51 # check that the programs we need are available | |
52 try: | |
53 subprocess.call(shlex.split("bowtie2 --version"), | |
54 stdout=open(os.devnull, 'wb'), | |
55 stderr=open(os.devnull, 'wb')) | |
56 except OSError: | |
57 print("Error: Bowtie2 not available in the path") | |
58 raise | |
59 | |
60 | |
61 def starts_with_numerical(list): | |
62 try: | |
63 if len(list) == 0: | |
64 return False | |
65 int(list[0]) | |
66 return True | |
67 except ValueError: | |
68 return False | |
69 | |
70 | |
71 # define a text importer for .out/.txt format of repbase | |
72 def import_text(filename, separator): | |
73 csv.field_size_limit(sys.maxsize) | |
74 file = csv.reader(open(filename), delimiter=separator, | |
75 skipinitialspace=True) | |
76 return [line for line in file if starts_with_numerical(line)] | |
77 | |
78 | |
79 # load genome into dictionary and compute length | |
80 g = SeqIO.to_dict(SeqIO.parse(genomefasta, "fasta")) | |
81 genome = defaultdict(dict) | |
82 | |
83 for chr in g.keys(): | |
84 genome[chr]['sequence'] = g[chr].seq | |
85 genome[chr]['length'] = len(g[chr].seq) | |
86 | |
87 # Build a bedfile of repeatcoordinates to use by RepEnrich region_sorter | |
88 repeat_elements = set() | |
89 rep_coords = defaultdict(list) # Merged dictionary for coordinates | |
90 | |
91 with open('repnames.bed', 'w') as fout: | |
92 f_in = import_text(annotation_file, ' ') | |
93 for line in f_in: | |
94 repname = line[9].translate(str.maketrans('()/', '___')) | |
95 repeat_elements.add(repname) | |
96 repchr, repstart, repend = line[4], line[5], line[6] | |
97 fout.write(f"{repchr}\t{repstart}\t{repend}\t{repname}\n") | |
98 rep_coords[repname].extend([repchr, repstart, repend]) | |
99 # repeat_elements now contains the unique repeat names | |
100 # rep_coords is a dictionary where keys are repeat names and values are lists | |
101 # containing chromosome, start, and end coordinates for each repeat instance | |
102 | |
103 # sort repeat_elements and print them in repeatIDs.txt | |
104 with open('repeatIDs.txt', 'w') as fout: | |
105 for i, repeat in enumerate(sorted(repeat_elements)): | |
106 fout.write('\t'.join([repeat, str(i)]) + '\n') | |
107 | |
108 # generate spacer for pseudogenomes | |
109 spacer = ''.join(['N' for i in range(gapl)]) | |
110 | |
111 # generate metagenomes and save them to FASTA files for bowtie build | |
112 for repname in rep_coords: | |
113 metagenome = '' | |
114 # iterating coordinate list by block of 3 (chr, start, end) | |
115 block = 3 | |
116 for i in range(0, len(rep_coords[repname]) - block + 1, block): | |
117 batch = rep_coords[repname][i:i+block] | |
118 chromosome = batch[0] | |
119 start = max(int(batch[1]) - flankingl, 0) | |
120 end = min(int(batch[2]) + flankingl, | |
121 int(genome[chromosome]['length'])-1) + 1 | |
122 metagenome = ( | |
123 f"{metagenome}{spacer}" | |
124 f"{genome[chromosome]['sequence'][start:end]}" | |
125 ) | |
126 | |
127 # Create Fasta of repeat pseudogenome | |
128 fastafilename = f"{repname}.fa" | |
129 record = SeqRecord(Seq(metagenome), id=repname, name='', description='') | |
130 SeqIO.write(record, fastafilename, "fasta") | |
131 | |
132 | |
133 def bowtie_build(args): | |
134 """ | |
135 Function to be executed in parallel by ProcessPoolExecutor. | |
136 """ | |
137 try: | |
138 bowtie_base, fasta = args | |
139 command = shlex.split(f"bowtie2-build -f {fasta} {bowtie_base}") | |
140 squash = subprocess.run(command, capture_output=True, text=True) | |
141 return squash.stdout | |
142 except Exception as e: | |
143 return str(e) | |
144 | |
145 | |
146 args_list = [(name, f"{name}.fa") for name in rep_coords] | |
147 with ProcessPoolExecutor(max_workers=cpus) as executor: | |
148 executor.map(bowtie_build, args_list) |