Mercurial > repos > triasteran > ribogalaxy_umi_processing
comparison UMI_riboseq_processing/UMI.py @ 4:a580e700aac3 draft
Uploaded
author | triasteran |
---|---|
date | Tue, 21 Jun 2022 08:32:44 +0000 |
parents | d27375bc4a1c |
children |
comparison
equal
deleted
inserted
replaced
3:d27375bc4a1c | 4:a580e700aac3 |
---|---|
1 import itertools | 1 import gzip |
2 from mimetypes import guess_type | |
3 from functools import partial | |
4 from Bio import SeqIO | |
2 from sys import argv, exit | 5 from sys import argv, exit |
3 from itertools import zip_longest | |
4 | 6 |
5 def grouper(iterable, n, fillvalue=None): | 7 def copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output): |
6 args = [iter(iterable)] * n | 8 # find wheather its plain or gzipped fastq |
7 return zip_longest(*args, fillvalue=fillvalue) | 9 encoding = guess_type(pathToFastaFile)[1] # uses file extension |
8 | 10 _open = partial(gzip.open, mode='rt') if encoding == 'gzip' else open |
9 | 11 # output file will be in gz format |
10 chunk_size=4 | 12 output = gzip.open(output,"wt") |
11 | 13 # open and parse |
12 | 14 with _open(pathToFastaFile) as f: |
13 def trimandpaste(pathToFastaFile, output): | 15 for record in SeqIO.parse(f, 'fastq'): |
14 #filename = pathToFastaFile.split('/')[-1] | 16 lines = record.format('fastq').split('\n') # list of each record: id, seq, '+', quality |
15 output = open(output,"w") | |
16 with open(pathToFastaFile) as f: | |
17 for lines in grouper(f, chunk_size, ""): #for every chunk_sized chunk | |
18 header = lines[0] | 17 header = lines[0] |
19 seq = lines[1] | 18 seq = lines[1] |
20 sep = lines[2] | 19 sep = lines[2] |
21 qual = lines[3] | 20 qual = lines[3] |
22 trimmed_seq = seq[2:-6]+"\n" # fooprint + barcode | 21 trimmed_seq = seq[2:-6]+"\n" # fooprint + barcode |
23 UMI = seq[0:2]+seq.rstrip()[-5:] #7nt in total; 5'NN and last 3'NNNNN | 22 UMI = seq[0:2]+seq.rstrip()[-5:] #7nt in total; 5'NN and last 3'NNNNN |
24 split_header = header.split(" ") | 23 split_header = header.split(" ") |
25 new_header = split_header[0]+"_"+UMI+" "+split_header[1] | 24 new_header = split_header[0]+"_"+UMI+" "+split_header[1] |
26 if qual[-1:] == "\n": | 25 if qual[-1:] == "\n": |
27 new_qual = qual[2:-6]+"\n" | 26 new_qual = qual[2:-6]+"\n" |
28 else: | 27 else: |
29 new_qual = qual[2:-6] | 28 new_qual = qual[2:-6] |
30 output.write(new_header) | 29 output.write(new_header+'\n') |
31 output.write(trimmed_seq) | 30 output.write(trimmed_seq) |
32 output.write(sep) | 31 output.write(sep+'\n') |
33 output.write(new_qual) | 32 output.write(new_qual+'\n') |
34 | 33 |
35 output.close() | 34 output.close() |
36 | 35 |
37 def main(): | 36 def main(): |
38 if len(argv) != 3: | 37 if len(argv) != 3: |
39 exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file") | 38 exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file") |
40 | 39 |
41 # Get paths | 40 # Get paths |
42 pathToFastaFile = argv[1] | 41 pathToFastaFile = argv[1] |
43 output = argv[2] | 42 output = argv[2] |
44 | 43 copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output) |
45 trimandpaste(pathToFastaFile, output) | |
46 | 44 |
47 if __name__ == "__main__": | 45 if __name__ == "__main__": |
48 main() | 46 main() |