Mercurial > repos > triasteran > ribogalaxy_umi_processing
comparison UMI_riboseq_processing/UMI.py @ 9:31438c26afec draft
Uploaded
| author | triasteran |
|---|---|
| date | Tue, 21 Jun 2022 14:41:24 +0000 |
| parents | 701804f5ad4b |
| children |
comparison
equal
deleted
inserted
replaced
| 8:701804f5ad4b | 9:31438c26afec |
|---|---|
| 4 from sys import argv, exit | 4 from sys import argv, exit |
| 5 import itertools | 5 import itertools |
| 6 from itertools import zip_longest | 6 from itertools import zip_longest |
| 7 import subprocess | 7 import subprocess |
| 8 from subprocess import call | 8 from subprocess import call |
| 9 import Bio | |
| 10 from Bio import SeqIO | |
| 9 | 11 |
| 10 def grouper(iterable, n, fillvalue=None): | 12 def grouper(iterable, n, fillvalue=None): |
| 11 args = [iter(iterable)] * n | 13 args = [iter(iterable)] * n |
| 12 return zip_longest(*args, fillvalue=fillvalue) | 14 return zip_longest(*args, fillvalue=fillvalue) |
| 13 | 15 |
| 15 def is_gz_file(filepath): | 17 def is_gz_file(filepath): |
| 16 with open(filepath, 'rb') as test_f: | 18 with open(filepath, 'rb') as test_f: |
| 17 return test_f.read(2) == b'\x1f\x8b' | 19 return test_f.read(2) == b'\x1f\x8b' |
| 18 | 20 |
| 19 | 21 |
| 20 def lines_parse(f, output_path): | 22 def UMI_processing(pathToFastaFile, output_path): |
| 23 | |
| 21 output = open(output_path,"w") | 24 output = open(output_path,"w") |
| 22 for lines in grouper(f, 4, "\n"): | 25 |
| 23 header = lines[0] | 26 if is_gz_file(pathToFastaFile) == True: |
| 24 #print (header) | 27 print ('file is gzipped fastq') |
| 25 seq = lines[1] | 28 with gzip.open(pathToFastaFile, "rt") as handle: |
| 26 sep = lines[2] | 29 for i, record in enumerate(SeqIO.parse(handle, "fastq")): |
| 27 qual = lines[3] | 30 lines = record.format('fastq').split('\n') # all 4 lines |
| 28 # check if header is OK | 31 header = lines[0] |
| 29 if (header.startswith('@')): | 32 if i % 100000 == 0: |
| 30 trimmed_seq = seq[2:-6]+"\n" # fooprint + barcode | 33 print ('read number %s' % i) |
| 31 UMI = seq[0:2]+seq.rstrip()[-5:] #7nt in total; 5'NN and last 3'NNNNN | 34 seq = lines[1] |
| 35 sep = lines[2] | |
| 36 qual = lines[3] | |
| 37 if (header.startswith('@')): | |
| 38 trimmed_seq = seq[2:-6] # fooprint + barcode | |
| 39 UMI = seq[0:2]+seq.rstrip()[-5:len(seq)] #7nt in total; 5'NN and last 3'NNNNN | |
| 40 split_header = header.split(" ") | |
| 41 new_header = split_header[0]+"_"+UMI+" "+split_header[1] | |
| 42 new_qual = qual[2:-6] | |
| 43 output.write(new_header+'\n') | |
| 44 output.write(trimmed_seq+'\n') | |
| 45 output.write(sep+'\n') | |
| 46 output.write(new_qual+'\n') | |
| 47 | |
| 48 else: | |
| 49 for record in SeqIO.parse(pathToFastaFile, 'fastq'): | |
| 50 lines = record.format('fastq').split('\n') # list of each record: id, seq, '+', quality | |
| 51 header = lines[0] | |
| 52 seq = lines[1] | |
| 53 sep = lines[2] | |
| 54 qual = lines[3] | |
| 55 trimmed_seq = seq[2:-6] # fooprint + barcode | |
| 56 UMI = seq[0:2]+seq.rstrip()[-5:len(seq)] #7nt in total; 5'NN and last 3'NNNNN | |
| 32 split_header = header.split(" ") | 57 split_header = header.split(" ") |
| 33 new_header = split_header[0]+"_"+UMI+" "+split_header[1] | 58 new_header = split_header[0]+"_"+UMI+" "+split_header[1] |
| 34 if qual[-1:] == "\n": | 59 new_qual = qual[2:-6] |
| 35 new_qual = qual[2:-6]+"\n" | 60 output.write(new_header+'\n') |
| 36 else: | 61 output.write(trimmed_seq+'\n') |
| 37 new_qual = qual[2:-6] | 62 output.write(sep+'\n') |
| 38 output.write(new_header) | 63 output.write(new_qual+'\n') |
| 39 output.write(trimmed_seq) | 64 |
| 40 output.write(sep) | |
| 41 output.write(new_qual) | |
| 42 output.close() | 65 output.close() |
| 66 | |
| 43 | 67 |
| 44 | |
| 45 def UMI_processing(pathToFastaFile, output_path): | |
| 46 | 68 |
| 47 if is_gz_file(pathToFastaFile) == True: | |
| 48 with gzip.open(pathToFastaFile, 'rb') as file: | |
| 49 f = [x.decode("utf-8") for x in file.readlines()] | |
| 50 | |
| 51 else: | |
| 52 with open(pathToFastaFile, 'r') as file: | |
| 53 f = file.readlines() | |
| 54 | |
| 55 lines_parse(f, output_path) | |
| 56 | 69 |
| 57 def main(): | 70 def main(): |
| 58 if len(argv) != 3: | 71 if len(argv) != 3: |
| 59 exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file") | 72 exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file") |
| 60 | 73 |
