diff sequence_format_converter.py @ 0:a8aacccd79a3 draft

planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/sequence_format_converter commit d6ef80f9db43eae4f58b33f58b5ef6f8209907db
author artbio
date Mon, 04 Sep 2017 07:13:28 -0400
parents
children 9ce7ccd468aa
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sequence_format_converter.py	Mon Sep 04 07:13:28 2017 -0400
@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+#
+import argparse
+import logging
+import sys
+from collections import defaultdict
+
+
+def Parser():
+    the_parser = argparse.ArgumentParser()
+    the_parser.add_argument(
+        '--input', action="store", type=str,
+        help="input file, accepted format: fastq, fasta, fasta_weigthed, \
+            tabular")
+    the_parser.add_argument(
+        '--output', action="store", type=str, help="output converted file")
+    the_parser.add_argument(
+        '--format', action="store", type=str,
+        help="select output format (fasta, fasta_weigthed, tabular")
+    args = the_parser.parse_args()
+    return args
+
+
+class Sequencing:
+
+    def __init__(self, input, output, format):
+        self.input = input
+        self.output = open(output, 'w')
+        self.outputformat = format
+        self.inputformat = self.detectformat(self.input)
+        self.seqdic = defaultdict(int)
+        self.read(self.input, self.inputformat)
+        self.write(self.output, self.outputformat)
+
+    def detectformat(self, input):
+        input = open(input, 'r')
+        block = []
+        reference = ['A', 'T', 'G', 'C', 'N']
+        format = ''
+        try:
+            for l in range(4):
+                block.append(input.readline()[:-1])
+        except:
+            logging.info("File hasn't at leat four lines !")
+            sys.exit("File hasn't at leat four lines !")
+        input.close()
+        line1, line2, line3, line4 = block[0], block[1], block[2], block[3]
+        if line1[0] == '>' and line3[0] == '>':
+            logging.info("'>' detected in lines 1 and 3")
+            sequence = ''.join([line2, line4]).upper()
+            nucleotides = set([base for base in sequence])
+            for nucleotide in nucleotides:
+                if nucleotide not in reference:
+                    logging.info("But other nucleotides that A, T, G, C or N")
+                    sys.exit('input appears to be Fasta but with \
+                              unexpected nucleotides')
+            format = 'fasta'
+        elif line1[0] == '>' and line4[0] == '>':
+            logging.info("'>' detected in lines 1 and 4")
+            sequence = ''.join([line2, line3]).upper()
+            nucleotides = set([base for base in sequence])
+            for nucleotide in nucleotides:
+                if nucleotide not in reference:
+                    logging.info("But other nucleotides that A, T, G, C or N")
+                    sys.exit('input appears to be Fasta but with \
+                              unexpected nucleotides')
+            format = 'fasta'
+        elif line1[0] == '>':
+            logging.info("'>' detected in lines 1")
+            sequence = ''.join([line2, line3, line4]).upper()
+            nucleotides = set([base for base in sequence])
+            for nucleotide in nucleotides:
+                if nucleotide not in reference:
+                    logging.info("But other nucleotides that A, T, G, C or N")
+                    sys.exit('input appears to be Fasta but with \
+                              unexpected nucleotides')
+            format = 'fasta'
+        if format == 'fasta':
+            try:
+                for line in block:
+                    if line[0] == '>':
+                        int(line.split('_')[-1])
+                return 'fastaw'
+            except:
+                return 'fasta'
+        if line1[0] == '@' and line3[0] == '+':
+            nucleotides = set([base for base in line2])
+            for nucleotide in nucleotides:
+                if nucleotide not in reference:
+                    logging.info("Looks like fastq input but other nucleotides \
+                                 that A, T, G, C or N")
+                    sys.exit("input appears to be Fastq \
+                             but with unexpected nucleotides")
+            return 'fastq'
+        for line in block:
+            if len(line.split('\t')) != 2:
+                logging.info("No valid format detected")
+                sys.exit('No valid format detected')
+            try:
+                int(line.split('\t')[-1])
+            except:
+                logging.info("No valid format detected")
+                sys.exit('No valid format detected')
+            for nucleotide in line.split('\t')[0]:
+                if nucleotide not in reference:
+                    logging.info("No valid format detected")
+                    sys.exit('No valid format detected')
+        return 'tabular'
+
+    def read(self, input, format):
+        input = open(input, 'r')
+        if format == 'fasta':
+            try:
+                self.readfasta(input)
+            except:
+                logging.info("an error occured while reading fasta")
+        elif format == 'fastaw':
+            try:
+                self.readfastaw(input)
+            except:
+                logging.info("an error occured while reading fastaw")
+        elif format == 'tabular':
+            try:
+                self.readtabular(input)
+            except:
+                logging.info("an error occured while reading tabular")
+        elif format == 'fastq':
+            try:
+                self.readfastq(input)
+            except:
+                logging.info("an error occured while reading fastq")
+        else:
+            logging.info("no valid format detected")
+            sys.exit('No valid format detected')
+
+    def readfastaw(self, input):
+        for line in input:
+            if line[0] == ">":
+                weigth = int(line[:-1].split("_")[-1])
+            else:
+                self.seqdic[line[:-1]] += weigth
+        input.close()
+
+    def readfasta(self, input):
+        ''' this method is able to read multi-line fasta sequence'''
+        for line in input:
+            if line[0] == ">":
+                try:
+                    #  to dump the sequence of the previous item
+                    #  try because of first missing stringlist variable
+                    self.seqdic["".join(stringlist)] += 1
+                except NameError:
+                    pass
+                stringlist = []
+            else:
+                try:
+                    stringlist.append(line[:-1])
+                except UnboundLocalError:
+                    # if file went through filter and contains only empty lines
+                    logging.info("first line is empty.")
+        try:
+            self.seqdic["".join(stringlist)] += 1  # for the last sequence
+        except NameError:
+            logging.info("input file has not fasta sequences.")
+        input.close()
+
+    def readtabular(self, input):
+        for line in input:
+            fields = line[:-1].split('\t')
+            self.seqdic[fields[0]] += int(fields[1])
+        input.close()
+
+    def readfastq(self, input):
+        linecount = 0
+        for line in input:
+            linecount += 1
+            if linecount % 4 == 2:
+                self.seqdic[line[:-1]] += 1
+        input.close()
+
+    def write(self, output, format='fasta'):
+        if format == 'fasta':
+            headercount = 0
+            for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True):
+                for i in range(self.seqdic[seq]):
+                    headercount += 1
+                    output.write('>%s\n%s\n' % (headercount, seq))
+        elif format == 'fastaw':
+            headercount = 0
+            for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True):
+                headercount += 1
+                output.write('>%s_%s\n%s\n' % (headercount,
+                                               self.seqdic[seq], seq))
+        elif format == 'tabular':
+            for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True):
+                output.write('%s\t%s\n' % (seq, self.seqdic[seq]))
+        output.close()
+
+
+def main(input, output, format):
+    Sequencing(input, output, format)
+
+
+if __name__ == "__main__":
+    args = Parser()
+    log = logging.getLogger(__name__)
+    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
+    main(args.input, args.output, args.format)