Mercurial > repos > triasteran > ribogalaxy_umi_processing
comparison UMI_riboseq_tool/UMI.py @ 10:d003917eaf2c draft
Uploaded
| author | jackcurragh |
|---|---|
| date | Fri, 15 Jul 2022 12:43:45 +0000 |
| parents | |
| children | 0b5c3e35cddb |
comparison
equal
deleted
inserted
replaced
| 9:31438c26afec | 10:d003917eaf2c |
|---|---|
| 1 import gzip | |
| 2 from sys import argv, exit | |
| 3 from itertools import zip_longest | |
| 4 from Bio import SeqIO | |
| 5 | |
| 6 def grouper(iterable, n, fillvalue=None): | |
| 7 args = [iter(iterable)] * n | |
| 8 return zip_longest(*args, fillvalue=fillvalue) | |
| 9 | |
| 10 | |
| 11 def is_gz_file(filepath): | |
| 12 with open(filepath, 'rb') as test_f: | |
| 13 return test_f.read(2) == b'\x1f\x8b' | |
| 14 | |
| 15 | |
| 16 def process_fastq_record(record, output, UMI_5_prime_length=2, UMI_3_prime_length=5): | |
| 17 ''' | |
| 18 Write UMI to FASTQ header for given biopython record to given open output | |
| 19 UMI_5_prime_length number of bases to trim from 5' and write to header | |
| 20 UMI_3_prime_length number of bases to trim from 3' and write to header | |
| 21 defaults are for McGlincy Ingolia Protocol | |
| 22 ''' | |
| 23 lines = record.format('fastq').split('\n') # all 4 lines | |
| 24 header = lines[0] | |
| 25 seq = lines[1] | |
| 26 sep = lines[2] | |
| 27 qual = lines[3] | |
| 28 | |
| 29 if (header.startswith('@')): | |
| 30 trimmed_seq = seq[UMI_5_prime_length:-(UMI_3_prime_length + 1)] # fooprint + barcode | |
| 31 UMI = seq[0:UMI_5_prime_length]+seq.rstrip()[-UMI_3_prime_length:len(seq)] #7nt in total; 5'NN and last 3'NNNNN | |
| 32 split_header = header.split(" ") | |
| 33 new_header = split_header[0]+"_"+UMI+" "+split_header[1] | |
| 34 new_qual = qual[UMI_5_prime_length:-(UMI_3_prime_length + 1)] | |
| 35 output.write(new_header+'\n') | |
| 36 output.write(trimmed_seq+'\n') | |
| 37 output.write(sep+'\n') | |
| 38 output.write(new_qual+'\n') | |
| 39 | |
| 40 | |
| 41 | |
| 42 def UMI_processing(pathToFastaFile, output_path, bool_gzip, UMI_5_prime_length, UMI_3_prime_length): | |
| 43 | |
| 44 if bool_gzip: | |
| 45 output = gzip.open(output_path,"wt") | |
| 46 else: | |
| 47 output = open(output_path, 'w') | |
| 48 | |
| 49 if is_gz_file(pathToFastaFile) == True: | |
| 50 print ('file is gzipped fastq') | |
| 51 with gzip.open(pathToFastaFile, "rt") as handle: | |
| 52 for i, record in enumerate(SeqIO.parse(handle, "fastq")): | |
| 53 process_fastq_record(record, output, UMI_5_prime_length=2, UMI_3_prime_length=5) | |
| 54 | |
| 55 else: | |
| 56 for record in SeqIO.parse(pathToFastaFile, 'fastq'): | |
| 57 process_fastq_record(record, output, UMI_5_prime_length, UMI_3_prime_length) | |
| 58 | |
| 59 output.close() | |
| 60 | |
| 61 | |
| 62 | |
| 63 | |
| 64 def main(): | |
| 65 if len(argv) != 6: | |
| 66 exit("Usage: 3 arguments required\n1: Path to fasta file \n2: name of output file\n3: string 'True' or 'False' whether to gzip output") | |
| 67 | |
| 68 # Get paths | |
| 69 pathToFastaFile = argv[1] | |
| 70 output = argv[2] | |
| 71 bool_gzip = True if argv[3].lower().capitalize() == 'True' else False | |
| 72 UMI_5_prime_length = argv[4] | |
| 73 UMI_3_prime_length = argv[5] | |
| 74 UMI_processing(pathToFastaFile, output, bool_gzip, UMI_5_prime_length, UMI_3_prime_length) | |
| 75 | |
| 76 if __name__ == "__main__": | |
| 77 main() |
