comparison RepEnrich2_setup.py @ 0:4905a332a094 draft

planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/main/tools/repenrich2 commit 73721d980c1f422dc880d80f61e44d270992e537
author artbio
date Sat, 20 Apr 2024 11:56:53 +0000
parents
children c5bb2f9af708
comparison
equal deleted inserted replaced
-1:000000000000 0:4905a332a094
1 #!/usr/bin/env python
2 import argparse
3 import csv
4 import os
5 import shlex
6 import subprocess
7 import sys
8 from collections import defaultdict
9 from concurrent.futures import ProcessPoolExecutor
10
11
12 from Bio import SeqIO
13 from Bio.Seq import Seq
14 from Bio.SeqRecord import SeqRecord
15
16 parser = argparse.ArgumentParser(description='''
17 Prepartion of repetive element pseudogenomes bowtie\
18 indexes and annotation files used by RepEnrich.py enrichment.''',
19 prog='getargs_genome_maker.py')
20 parser.add_argument('--annotation_file', action='store',
21 metavar='annotation_file',
22 help='''Repeat masker annotation of the genome of\
23 interest. Download from RepeatMasker.org\
24 Example: mm9.fa.out''')
25 parser.add_argument('--genomefasta', action='store', metavar='genomefasta',
26 help='''Genome of interest in fasta format.\
27 Example: mm9.fa''')
28 parser.add_argument('--gaplength', action='store', dest='gaplength',
29 metavar='gaplength', default='200', type=int,
30 help='''Length of the N-spacer in the\
31 repeat pseudogenomes. Default 200''')
32 parser.add_argument('--flankinglength', action='store', dest='flankinglength',
33 metavar='flankinglength', default='25', type=int,
34 help='''Length of the flanking regions used to build\
35 repeat pseudogenomes. Flanking length should be set\
36 according to the length of your reads.\
37 Default 25, for 50 nt reads''')
38 parser.add_argument('--cpus', action='store', dest='cpus', metavar='cpus',
39 default="1", type=int,
40 help='Number of CPUs. The more cpus the\
41 faster RepEnrich performs. Default: "1"')
42 args = parser.parse_args()
43
44 # parameters from argsparse
45 gapl = args.gaplength
46 flankingl = args.flankinglength
47 annotation_file = args.annotation_file
48 genomefasta = args.genomefasta
49 cpus = args.cpus
50
51 # check that the programs we need are available
52 try:
53 subprocess.call(shlex.split("bowtie2 --version"),
54 stdout=open(os.devnull, 'wb'),
55 stderr=open(os.devnull, 'wb'))
56 except OSError:
57 print("Error: Bowtie2 not available in the path")
58 raise
59
60
61 def starts_with_numerical(list):
62 try:
63 if len(list) == 0:
64 return False
65 int(list[0])
66 return True
67 except ValueError:
68 return False
69
70
71 # define a text importer for .out/.txt format of repbase
72 def import_text(filename, separator):
73 csv.field_size_limit(sys.maxsize)
74 file = csv.reader(open(filename), delimiter=separator,
75 skipinitialspace=True)
76 return [line for line in file if starts_with_numerical(line)]
77
78
79 # load genome into dictionary and compute length
80 g = SeqIO.to_dict(SeqIO.parse(genomefasta, "fasta"))
81 genome = defaultdict(dict)
82
83 for chr in g.keys():
84 genome[chr]['sequence'] = g[chr].seq
85 genome[chr]['length'] = len(g[chr].seq)
86
87 # Build a bedfile of repeatcoordinates to use by RepEnrich region_sorter
88 repeat_elements = set()
89 rep_coords = defaultdict(list) # Merged dictionary for coordinates
90
91 with open('repnames.bed', 'w') as fout:
92 f_in = import_text(annotation_file, ' ')
93 for line in f_in:
94 repname = line[9].translate(str.maketrans('()/', '___'))
95 repeat_elements.add(repname)
96 repchr, repstart, repend = line[4], line[5], line[6]
97 fout.write(f"{repchr}\t{repstart}\t{repend}\t{repname}\n")
98 rep_coords[repname].extend([repchr, repstart, repend])
99 # repeat_elements now contains the unique repeat names
100 # rep_coords is a dictionary where keys are repeat names and values are lists
101 # containing chromosome, start, and end coordinates for each repeat instance
102
103 # sort repeat_elements and print them in repeatIDs.txt
104 with open('repeatIDs.txt', 'w') as fout:
105 for i, repeat in enumerate(sorted(repeat_elements)):
106 fout.write('\t'.join([repeat, str(i)]) + '\n')
107
108 # generate spacer for pseudogenomes
109 spacer = ''.join(['N' for i in range(gapl)])
110
111 # generate metagenomes and save them to FASTA files for bowtie build
112 for repname in rep_coords:
113 metagenome = ''
114 # iterating coordinate list by block of 3 (chr, start, end)
115 block = 3
116 for i in range(0, len(rep_coords[repname]) - block + 1, block):
117 batch = rep_coords[repname][i:i+block]
118 chromosome = batch[0]
119 start = max(int(batch[1]) - flankingl, 0)
120 end = min(int(batch[2]) + flankingl,
121 int(genome[chromosome]['length'])-1) + 1
122 metagenome = (
123 f"{metagenome}{spacer}"
124 f"{genome[chromosome]['sequence'][start:end]}"
125 )
126
127 # Create Fasta of repeat pseudogenome
128 fastafilename = f"{repname}.fa"
129 record = SeqRecord(Seq(metagenome), id=repname, name='', description='')
130 SeqIO.write(record, fastafilename, "fasta")
131
132
133 def bowtie_build(args):
134 """
135 Function to be executed in parallel by ProcessPoolExecutor.
136 """
137 try:
138 bowtie_base, fasta = args
139 command = shlex.split(f"bowtie2-build -f {fasta} {bowtie_base}")
140 squash = subprocess.run(command, capture_output=True, text=True)
141 return squash.stdout
142 except Exception as e:
143 return str(e)
144
145
146 args_list = [(name, f"{name}.fa") for name in rep_coords]
147 with ProcessPoolExecutor(max_workers=cpus) as executor:
148 executor.map(bowtie_build, args_list)