view proteinpilot_wrapper.py @ 3:4afba45f01e8

Improved some datatype handling
author galaxyp
date Thu, 20 Jun 2013 11:05:37 -0400
parents 790d80981060
children
line wrap: on
line source

#!/usr/bin/env python
import optparse
import os
import sys
import tempfile
import subprocess
import time
import shutil
import logging
from xml.sax.saxutils import escape

log = logging.getLogger(__name__)

DEBUG = True

working_directory = os.getcwd()
tmp_stderr_name = tempfile.NamedTemporaryFile(dir=working_directory, suffix='.stderr').name
tmp_stdout_name = tempfile.NamedTemporaryFile(dir=working_directory, suffix='.stdout').name


def stop_err(msg):
    sys.stderr.write("%s\n" % msg)
    sys.exit()


def read_stderr():
    stderr = ''
    if(os.path.exists(tmp_stderr_name)):
        with open(tmp_stderr_name, 'rb') as tmp_stderr:
            buffsize = 1048576
            try:
                while True:
                    stderr += tmp_stderr.read(buffsize)
                    if not stderr or len(stderr) % buffsize != 0:
                        break
            except OverflowError:
                pass
    return stderr


def execute(command, stdin=None):
    try:
        with open(tmp_stderr_name, 'wb') as tmp_stderr:
            with open(tmp_stdout_name, 'wb') as tmp_stdout:
                proc = subprocess.Popen(args=command, shell=True, stderr=tmp_stderr.fileno(), stdout=tmp_stdout.fileno(), stdin=stdin, env=os.environ)
                returncode = proc.wait()
                if returncode != 0:
                    raise Exception("Program returned with non-zero exit code %d. stderr: %s" % (returncode, read_stderr()))
    finally:
        print open(tmp_stderr_name, "r").read(64000)
        print open(tmp_stdout_name, "r").read(64000)


def delete_file(path):
    if os.path.exists(path):
        try:
            os.remove(path)
        except:
            pass

def delete_directory(directory):
    if os.path.exists(directory):
        try:
            shutil.rmtree(directory)
        except:
            pass

def symlink(source, link_name):
    import platform
    if platform.system() == 'Windows':
        try:
            import win32file
            win32file.CreateSymbolicLink(source, link_name, 1)
        except:
            shutil.copy(source, link_name)
    else:
        os.symlink(source, link_name)


def copy_to_working_directory(data_file, relative_path):
    if os.path.abspath(data_file) != os.path.abspath(relative_path):
        shutil.copy(data_file, relative_path)
    return relative_path

def __main__():
    run_script()

#ENDTEMPLATE

from string import Template

METHOD_TEMPLATE = """<UISETTINGS>
<UI_SAMPLE_TYPE>$sample_type</UI_SAMPLE_TYPE>
<UI_QUANT_TYPE>$quant_type</UI_QUANT_TYPE>
<UI_BACKGROUND_CORRECTION>$background_correction</UI_BACKGROUND_CORRECTION>
<UI_BIAS_CORRECTION>$bias_correction</UI_BIAS_CORRECTION>
<UI_CYS_ALKYLATION>$cys_alkylation</UI_CYS_ALKYLATION>
<UI_DIGESTION>$digestion</UI_DIGESTION>
<UI_SPECIAL_FACTOR>$special_factors</UI_SPECIAL_FACTOR>
<UI_INSTRUMENT>$instrument</UI_INSTRUMENT>
<UI_SPECIES></UI_SPECIES>
<UI_USER_NAME></UI_USER_NAME>
<UI_MACHINE_NAME></UI_MACHINE_NAME>
<UI_START_TIME></UI_START_TIME>
<UI_SEARCH_ID></UI_SEARCH_ID>
<UI_ID_FOCUS>$search_foci</UI_ID_FOCUS>
<UI_SEARCH_EFFORT>$search_effort</UI_SEARCH_EFFORT>
<UI_SEARCH_RESOURCE>$database_name</UI_SEARCH_RESOURCE>
<UI_MIN_UNUSED_PROTSCORE>$min_unused_protscore</UI_MIN_UNUSED_PROTSCORE>
<UI_PSPEP>$pspep</UI_PSPEP>
<UI_MAX_QUANT_LABELS>$max_quant_labels</UI_MAX_QUANT_LABELS>
$quant_labels
</UISETTINGS>
"""

quant_special_cases = {
    "iTRAQ 4plex (Peptide Labeled)": "iTRAQ4PLEX",
    "iTRAQ 4plex (Protein Labeled)": "iTRAQ4PLEX",
    "iTRAQ 8plex (Peptide Labeled)": "iTRAQ8PLEX",
    "iTRAQ 8plex (Protein Labeled)": "iTRAQ8PLEX",
    "mTRAQ (Peptide Labeled - M00, M04)": "mTRAQ_0-4",
    "mTRAQ (Peptide Labeled - M00, M08)": "mTRAQ_0-8",
    "mTRAQ (Peptide Labeled - M04, M08)": "mTRAQ_4-8",
    "mTRAQ (Peptide Labeled - M00, M04, M08)": "mTRAQ_0-4-8",
    "Proteolytic O-18 labeling": "Proteolytic O-18 v O-16",
    "Cleavable ICAT": "ICAT9",
    "ICPL Light, Heavy (Peptide Labeled)": "ICPL peptide",
    "ICPL Light, Heavy (Protein Labeled)": "ICPL protein",
}


def parse_groups(inputs_file, group_parts=["group"], input_parts=["name", "path"]):
    inputs_lines = [line.strip() for line in open(inputs_file, "r").readlines()]
    inputs_lines = [line for line in inputs_lines if line and not line.startswith("#")]
    cur_group = None
    i = 0
    group_prefixes = ["%s:" % group_part  for group_part in group_parts]
    input_prefixes = ["%s:" % input_part for input_part in input_parts]
    groups = {}
    while i < len(inputs_lines):
        line = inputs_lines[i]
        if line.startswith(group_prefixes[0]):
            # Start new group
            cur_group = line[len(group_prefixes[0]):]
            group_data = {}
            for j, group_prefix in enumerate(group_prefixes):
                group_line = inputs_lines[i + j]
                group_data[group_parts[j]] = group_line[len(group_prefix):]
            i += len(group_prefixes)
        elif line.startswith(input_prefixes[0]):
            input = []
            for j, input_prefix in enumerate(input_prefixes):
                part_line = inputs_lines[i + j]
                part = part_line[len(input_prefixes[j]):]
                input.append(part)
            if cur_group not in groups:
                groups[cur_group] = {"group_data": group_data, "inputs": []}
            groups[cur_group]["inputs"].append(input)
            i += len(input_prefixes)
        else:
            # Skip empty line
            i += 1
    return groups


def get_env_property(name, default):
    if name in os.environ:
        return os.environ[name]
    else:
        return default


def build_quant_label(reagent, quant_type="Not Used", treatment="", minus2="0", minus1="0", plus1="0", plus2="0"):
    return {
    "reagent": reagent,
    "type": quant_type,
    "treatment": treatment,
    "minus2": minus2,
    "minus1": minus1,
    "plus1": plus1,
    "plus2": plus2,
    }


def build_quant_labels(options, quant_type):
    if quant_type == "iTRAQ8PLEX":
        return [
            build_quant_label("iTRAQ113", plus1="6.89", plus2="0.24"),
            build_quant_label("iTRAQ114", minus1="0.94", plus1="5.9", plus2="0.16"),
            build_quant_label("iTRAQ115", minus1="1.88", plus1="4.9", plus2="0.1"),
            build_quant_label("iTRAQ116", minus1="2.82", plus1="3.9", plus2="0.07"),
            build_quant_label("iTRAQ117", minus2="0.06", minus1="3.77", plus1="2.88"),
            build_quant_label("iTRAQ118", minus2="0.09", minus1="4.71", plus1="1.91"),
            build_quant_label("iTRAQ119", minus2="0.14", minus1="5.66", plus1="0.87"),
            build_quant_label("iTRAQ121", minus2="0.27", minus1="7.44", plus1="0.18"),
        ]
    elif quant_type == "iTRAQ4PLEX":
        return [
            build_quant_label("iTRAQ114", minus1="1.00", plus1="5.9", plus2="0.20"),
            build_quant_label("iTRAQ115", minus1="2.00", plus1="5.6", plus2="0.1"),
            build_quant_label("iTRAQ116", minus1="3.00", plus1="4.5", plus2="0.1"),
            build_quant_label("iTRAQ117", minus2="0.10", minus1="4.00", plus1="3.50", plus2="0.1"),
        ]
    else:
        return []


def join_quant_labels(labels):
    template = '<QUANT_LABEL_SETTING reagent="$reagent" type="$type" treatment="$treatment" minus2="$minus2" minus1="$minus1" plus1="$plus1" plus2="$plus2"/>'
    return "\n".join([Template(template).substitute(quant_label) for quant_label in labels])


def handle_sample_type(options, parameter_dict):
    sample_type = options.sample_type
    if sample_type in quant_special_cases:
        quant_type = quant_special_cases[sample_type]
    else:
        quant_type = sample_type
    if options.quantitative.upper() != "TRUE":
        quant_type = ""
    parameter_dict["sample_type"] = sample_type
    parameter_dict["quant_type"] = quant_type
    parameter_dict["quant_labels"] = join_quant_labels(build_quant_labels(options, quant_type))


def setup_database(options):
    PROTEINPILOT_DATABASE_DIR = get_env_property("PROTEIN_PILOT_DATABASE_FOLDER", "C:\\AB SCIEX\\ProteinPilot Data\\SearchDatabases")
    database_path = options.database
    database_name = options.database_name
    database_name = database_name.replace(" ", "_")
    (database_basename, extension) = os.path.splitext(database_name)
    base = os.path.join(PROTEINPILOT_DATABASE_DIR, "gx_%s" % database_basename)
    database_destination = get_unique_path(base, ".fasta")
    symlink(database_path, database_destination)
    return (database_destination, os.path.basename(os.path.splitext(database_destination)[0]))


def extract_list(parameter):
    if parameter == None or parameter == "None":
        parameter = ""
    return parameter.replace(",", ";")


def setup_methods(options):
    ## Setup methods file
    (database_path, database_name) = setup_database(options)
    special_factors = extract_list(options.special_factors)
    search_foci = extract_list(options.search_foci)
    method_parameters = {
        "background_correction": options.background_correction,
        "bias_correction": options.bias_correction,
        "cys_alkylation": options.cys_alkylation,
        "digestion": options.digestion,
        "instrument": options.instrument,
        "search_effort": options.search_effort,
        "search_foci": search_foci,
        "pspep": options.pspep,
        "min_unused_protscore": options.min_unused_protscore,
        "max_quant_labels": "3",
        "database_name": database_name,
        "quantitative": options.quantitative,
        "special_factors": special_factors
    }
    handle_sample_type(options, method_parameters)
    method_contents = Template(METHOD_TEMPLATE).substitute(method_parameters)
    PROTEINPILOT_METHODS_DIR = get_env_property("PROTEIN_PILOT_METHODS_FOLDER", "C:\\ProgramData\\AB SCIEX\\ProteinPilot\\ParagonMethods\\")
    methods_name = "gx_%s" % os.path.split(os.getcwd())[-1]
    methods_path = os.path.join(PROTEINPILOT_METHODS_DIR, "%s.xml" % methods_name)
    open(methods_path, "w").write(method_contents)
    return (methods_name, methods_path, database_path)


def setup_inputs(inputs):
    links = []
    for input_data in inputs:
        input_name = input_data[0]
        input = input_data[1]
        if DEBUG:
            print "Processing input %s with name %s and size %d" % (input, input_name, os.stat(input).st_size)
        if not input_name.upper().endswith(".MGF"):
            input_name = "%s.mgf" % input_name
        link_path = os.path.abspath(input_name)
        symlink(input, link_path)
        links.append(link_path)
    return ",".join(["<DATA type=\"MGF\" filename=\"%s\" />" % escape(link) for link in links])


def get_unique_path(base, extension):
    """
    """
    return "%s_%d%s" % (base, int(time.time() * 1000), extension)


def move_pspep_output(options, destination, suffix):
    if destination:
        source = "%s__FalsePositiveAnalysis__%s.csv" % (options.output, suffix)
        shutil.move(source, destination)


def run_script():
    parser = optparse.OptionParser()
    parser.add_option("--input_config")
    parser.add_option("--database")
    parser.add_option("--database_name")
    parser.add_option("--instrument")
    parser.add_option("--sample_type")  # TODO: Restrict values
    parser.add_option("--bias_correction", default="False")
    parser.add_option("--background_correction", default="False")
    parser.add_option("--cys_alkylation", default="None")
    parser.add_option("--digestion", default="Trypsin")
    parser.add_option("--special_factors", default="")
    parser.add_option("--search_foci", default="")
    parser.add_option("--search_effort", default="Rapid")
    parser.add_option("--min_unused_protscore", default="3")
    parser.add_option("--quantitative", default="False")
    parser.add_option("--pspep", default="TRUE")
    parser.add_option("--output")
    parser.add_option("--output_methods")
    #parser.add_option("--output_pspep_peptide", default="")
    #parser.add_option("--output_pspep_protein", default="")
    #parser.add_option("--output_pspep_spectra", default="")
    parser.add_option("--output_pspep_report", default="")
    (options, args) = parser.parse_args()

    (methods_name, methods_path, database_path) = setup_methods(options)
    try:
        group_file = "%s.group" % options.output
        input_contents_template = """<PROTEINPILOTPARAMETERS>
    <METHOD name="$methods_name" />
    $inputs
    <RESULT filename="$output" />
</PROTEINPILOTPARAMETERS>"""
        input_config = options.input_config
        group_data = parse_groups(input_config)
        group_values = group_data.values()
        # Not using groups right now.
        assert len(group_values) == 1, len(group_values)
        inputs = group_data.values()[0]["inputs"]
        input_parameters = {
            "inputs": setup_inputs(inputs),
            "output": group_file,
            "methods_name": methods_name
        }

        input_contents = Template(input_contents_template).substitute(input_parameters)
        open("input.xml", "w").write(input_contents)

        protein_pilot_path = get_env_property("PROTEIN_PILOT_PATH", "")
        if protein_pilot_path and not protein_pilot_path.endswith("\\"):
            protein_pilot_path = "%s" % protein_pilot_path
        execute("%sProteinPilot.exe input.xml" % protein_pilot_path)
        shutil.move(group_file, options.output)
        #move_pspep_output(options, options.output_pspep_spectra, "SpectralLevelData")
        #move_pspep_output(options, options.output_pspep_peptide, "DistinctPeptideLevelData")
        #move_pspep_output(options, options.output_pspep_protein, "ProteinLevelData")
        if options.output_pspep_report:
            source = "%s__FDR.xlsx" % (options.output)
            shutil.move(source, options.output_pspep_report)
        shutil.move(methods_path, options.output_methods)
    finally:
        delete_file(database_path)
        delete_file(methods_path)

if __name__ == '__main__':
    __main__()