diff nextgen_iedb_api.py @ 0:88e44dab2988 draft default tip
planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/iedb_netmhcpan commit 0ac7534c8d9f5bfea21b998286f822784e62da08
author:    galaxyp
date:      Wed, 09 Jul 2025 12:56:30 +0000
parents:   (none)
children:  (none)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/nextgen_iedb_api.py	Wed Jul 09 12:56:30 2025 +0000
@@ -0,0 +1,372 @@
#!/usr/bin/env python

"""
This file was adapted from the iedb_api.py file of the iedb_api tool.
It uses the newer "Next-Generation" IEDB API, and is constrained to
the mhci and mhcii tool groups.
"""

import argparse
import json
import os.path
import re
import sys
import time
import urllib.request
from urllib.error import HTTPError

# IEDB tool groups and predictor methods
mhci_methods = ['netmhcpan_el', 'netmhcpan_ba']
mhcii_methods = ['netmhciipan_el', 'netmhciipan_ba']
tool_group_methods = {'mhci': mhci_methods,
                      'mhcii': mhcii_methods}
all_methods = set(mhci_methods + mhcii_methods)

# Values for polling backoff
max_backoff_count = 25
init_poll_sleep = 10
poll_retries = 50
requests_before_backoff = 5


def parse_alleles(allelefile):
    """Returns a list of alleles read from the input file."""
    alleles = []
    with open(allelefile, 'r') as fh:
        for line in fh:
            allele = line.strip()
            alleles.append(allele)
    return alleles


def parse_sequence_column(sequence_file_lines, col):
    """Sequences may come from a specific column in a TSV file.

    Parse these sequences out while checking each against a regex for validity.
    """
    aapat = '^[ABCDEFGHIKLMNPQRSTVWY]+$'
    sequences = []
    for i, line in enumerate(sequence_file_lines):
        fields = line.split('\t')
        if len(fields) > col:
            seq = re.sub('[_*]', '', fields[col].strip())
            if not re.match(aapat, seq):
                warn_err(f'Line {i}: not a peptide: {seq}')
            else:
                sequences.append(seq)
        else:
            warn_err('Invalid value for -c/--column')
            break
    return sequences


def iedb_request(req, timeout, retries, error_retry_sleep, response_fn=None, req_data=None):
    """Handles the HTTP request and exceptions. Allows for a callback to parse the IEDB response."""
    for retry in range(1, retries + 1):
        response = None
        try:
            response = urllib.request.urlopen(req, req_data, timeout=timeout)
        except HTTPError as e:
            warn_err(f'{retry} of {retries}: Error connecting to IEDB server. '
                     f'HTTP status code: {e.code}')
            time.sleep(error_retry_sleep)
        except Exception as e:
            warn_err(f'Error getting results from IEDB server: {e}')
            return None

        if response and response.getcode() == 200:
            # If no callback, return results
            if not response_fn:
                response_string = response.read().decode('utf-8')
                response_json = json.loads(response_string)
                return response_json

            # Retry if the response_fn callback deems it necessary, i.e. results from the job are not ready.
            response_json = response_fn(response, retry)
            if response_json:
                return response_json
        else:
            code = response.getcode() if response else 1
            warn_err(f'Error connecting to IEDB server. HTTP status code: {code}')

    warn_err(f'No successful response from IEDB in {retries} retries')
    return None
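# Illustrative behaviour of the parsers above (hypothetical inputs, not taken from IEDB data):
#   parse_alleles() on a file containing the lines 'HLA-A*02:01' and 'HLA-A*01:01'
#   returns ['HLA-A*02:01', 'HLA-A*01:01'].
#   parse_sequence_column(['pep1\tSIINF_EKL*\t42'], 1) strips '_' and '*' from
#   column 1 and returns ['SIINFEKL']; a value that still fails the amino-acid
#   pattern is reported via warn_err() and skipped.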
def pipeline_request(url, tool_group, sequence_text, alleles, length_range,
                     methods, peptide_shift, timeout=300, retries=3, error_retry_sleep=300):
    """Submits a job to the IEDB pipeline and polls the API until results are ready.

    Returns the response JSON from IEDB.
    """

    # Set up input parameters for the IEDB NetMHCPan or NetMHCIIPan job
    input_parameters = {
        'alleles': alleles,
        'peptide_length_range': length_range,
        'predictors': [{'type': 'binding', 'method': m} for m in methods]
    }

    # Only include the peptide shift when one was given
    if peptide_shift:
        input_parameters['peptide_shift'] = peptide_shift

    stage = {
        'stage_number': 1,
        'tool_group': tool_group,
        'input_sequence_text': sequence_text,
        'input_parameters': input_parameters
    }

    params = {
        'pipeline_id': "",
        'run_stage_range': [1, 1],
        'stages': [stage]
    }

    req = urllib.request.Request(url, method='POST')
    req_data = json.dumps(params).encode('utf-8')
    req.add_header('Content-Type', 'application/json; charset=utf-8')
    req.add_header('Content-Length', len(req_data))

    # Make an initial request to submit the job
    response_json = iedb_request(req, timeout, retries, error_retry_sleep, req_data=req_data)
    if not response_json:
        warn_err('Initial request failed.')
        return None

    # Check the response from job submission
    warnings = response_json.get('warnings')
    if warnings and len(warnings) > 0:
        invalid_alleles = False
        for warning in warnings:
            if 'cannot predict binding for allele' in warning:
                warn_err(f"Error: Bad allele input. {warning}")
                invalid_alleles = True
        if invalid_alleles:
            return None

        warn_err(f'Warnings from IEDB: {warnings}')

    errors = response_json.get('errors')
    if errors:
        warn_err(f'Errors from IEDB: {errors}')
        return None

    results_uri = response_json.get('results_uri')
    if not results_uri:
        warn_err('No results URI provided by IEDB.')
        return None

    # Callback function to rate-limit poll requests
    def poll_response_fn(response, retry):
        response_string = response.read().decode('utf-8')
        response_json = json.loads(response_string)
        if response_json['status'] != 'done':
            if retry == poll_retries:
                warn_err('Job not finished in the maximum allowed time.')
            backoff_count = min(retry, max_backoff_count)

            # Double the sleep every requests_before_backoff requests
            sleep_duration = init_poll_sleep * 2 ** (backoff_count // requests_before_backoff)
            time.sleep(sleep_duration)
            return None
        return response_json

    # Poll for results
    response_json = iedb_request(results_uri, timeout, poll_retries,
                                 error_retry_sleep, response_fn=poll_response_fn)
    if not response_json:
        warn_err('Retrieving results failed.')
    return response_json


def warn_err(msg, exit_code=None):
    sys.stderr.write(f"{msg}\n")
    sys.stderr.flush()
    if exit_code:
        sys.exit(exit_code)
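# For reference, a sketch (with assumed sample values, not captured IEDB traffic)
# of the JSON document pipeline_request() builds and POSTs to the pipeline endpoint:
#
#   {
#     "pipeline_id": "",
#     "run_stage_range": [1, 1],
#     "stages": [{
#       "stage_number": 1,
#       "tool_group": "mhci",
#       "input_sequence_text": ">pep1\nSIINFEKL",
#       "input_parameters": {
#         "alleles": "HLA-A*02:01,HLA-A*01:01",
#         "peptide_length_range": [8, 11],
#         "predictors": [{"type": "binding", "method": "netmhcpan_el"}]
#       }
#     }]
#   }
#
# With the module-level defaults above, polling for this job sleeps 10 s per poll
# for the first few attempts, doubles every 5 polls, and caps at 320 s.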
def add_reversed_sequences(file_lines, file_format):
    """Adds a reversed sequence after each input sequence.

    Takes a plain list of sequences or a FASTA file. Each reversed FASTA sequence gets
    the same header prefixed with 'reversed_'.
    """
    sequences_with_reversed = []
    if file_format == 'fasta':
        i = 0
        while i < len(file_lines):

            # Validate the header of the next sequence
            seq_header = file_lines[i]
            if seq_header[0] != '>':
                print('Invalid FASTA. Exiting.', file=sys.stderr)
                sys.exit(1)

            # Aggregate the sequence into a single line
            j = i + 1
            seq = ''
            while j < len(file_lines):
                next_line = file_lines[j]
                if next_line[0] == '>':
                    break
                seq = seq + file_lines[j]
                j += 1

            # Add the non-reversed sequence
            sequences_with_reversed.append(seq_header)
            sequences_with_reversed.append(seq)

            # Add the reversed header and sequence
            rev_header = seq_header.replace('>', '>reversed_')
            sequences_with_reversed.append(rev_header)

            rev_seq = seq[::-1]
            sequences_with_reversed.append(rev_seq)

            # Advance the index to what should be the next sequence header
            i = j

    # If not FASTA, this should be a simple list of peptides to reverse sequentially
    else:
        for seq in file_lines:

            # Reverse the sequence
            rev_seq = seq[::-1]

            # Add the original and reversed sequences
            sequences_with_reversed.append(seq)
            sequences_with_reversed.append(rev_seq)

    return sequences_with_reversed
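# Illustrative behaviour of add_reversed_sequences() on hypothetical inputs:
#   FASTA lines:  ['>pep1', 'SIINFEKL']
#                 -> ['>pep1', 'SIINFEKL', '>reversed_pep1', 'LKEFNIIS']
#   peptide list: ['SIINFEKL'] -> ['SIINFEKL', 'LKEFNIIS']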
def __main__():
    # Parse the command line
    parser = argparse.ArgumentParser()
    parser.add_argument('-T', '--tool-group',
                        dest='tool_group',
                        default='mhci',
                        choices=tool_group_methods.keys(),
                        help='IEDB API tool group')
    parser.add_argument('-m', '--method',
                        action="append",
                        required=True,
                        choices=all_methods,
                        help='Prediction method')
    parser.add_argument('-A', '--allelefile',
                        required=True,
                        help='File of HLA alleles')
    parser.add_argument('-l', '--lengthrange',
                        help='Length range for which to make predictions for the alleles')
    parser.add_argument('-P', '--peptide_shift',
                        type=int,
                        default=None,
                        help='Peptide shift')
    parser.add_argument('-i', '--input',
                        required=True,
                        help='Input file for peptide sequences ' +
                             '(fasta or tabular)')
    parser.add_argument('-c', '--column',
                        default=None,
                        help='Zero-indexed peptide column in a tabular input file')
    parser.add_argument('-o', '--output',
                        required=True,
                        help='Output file for query results')
    parser.add_argument('-t', '--timeout',
                        type=int,
                        default=600,
                        help='Seconds to wait for a server response')
    parser.add_argument('-r', '--retries',
                        type=int,
                        default=5,
                        help='Number of times to retry a failed server query')
    parser.add_argument('-S', '--sleep',
                        type=int,
                        default=300,
                        help='Seconds to wait between failed server query retries')
    parser.add_argument('-R', '--add-reversed',
                        dest='add_reversed',
                        action='store_true',
                        help='Add a reversed copy of every input sequence and identify it in the output')
    args = parser.parse_args()

    allele_string = ','.join(parse_alleles(args.allelefile))

    length_range = [int(i) for i in args.lengthrange.split(',')]

    pipeline_url = 'https://api-nextgen-tools.iedb.org/api/v1/pipeline'

    # If sequences were submitted as a file, parse out the sequences.
    try:
        with open(args.input) as inf:
            sequence_file_contents = inf.read()
    except Exception as e:
        warn_err(f'Unable to open input file: {e}', exit_code=1)

    sequence_file_lines = sequence_file_contents.splitlines()

    # Pick out sequences if the input file has multiple columns,
    # otherwise submit the list of sequences as-is.
    if not args.column:
        # IEDB may take FASTA files directly, so submit the file contents as-is
        if args.add_reversed:
            sequence_text = '\n'.join(add_reversed_sequences(sequence_file_lines, 'fasta'))
        else:
            sequence_text = sequence_file_contents
    else:
        sequences = parse_sequence_column(sequence_file_lines, int(args.column))
        if args.add_reversed:
            sequence_text = '\n'.join(add_reversed_sequences(sequences, 'tsv'))
        else:
            sequence_text = '\n'.join(sequences)

    if len(sequence_text) == 0:
        warn_err('Error parsing sequences', exit_code=1)

    # Submit the job and collect the results
    results = pipeline_request(pipeline_url, args.tool_group, sequence_text,
                               allele_string, length_range, args.method,
                               peptide_shift=args.peptide_shift, timeout=args.timeout,
                               retries=args.retries, error_retry_sleep=args.sleep)
    if not results:
        warn_err('Job failed. Exiting.', exit_code=1)

    try:
        peptide_table = [t for t in results['data']['results'] if t['type'] == 'peptide_table'][0]
        peptide_table_data = peptide_table['table_data']
        peptide_table_columns = peptide_table['table_columns']

        # If we reversed peptides prior to IEDB input, find the column index of the
        # sequence number so we can identify which rows come from reversed input.
        if args.add_reversed:
            for i, column in enumerate(peptide_table_columns):
                if column['display_name'] == 'seq #':
                    seq_num_index = i
                    break
    except (KeyError, IndexError) as e:
        warn_err(f'Error parsing IEDB results: {e}', exit_code=1)

    output_path = os.path.abspath(args.output)
    with open(output_path, 'w') as output_file:
        # Write the column names
        display_names = '\t'.join([c['display_name'] for c in peptide_table_columns] + ['reversed'])
        print(display_names, file=output_file)

        # Write the data
        for values in peptide_table_data:
            if args.add_reversed:
                seq_number = values[seq_num_index]
                # Every original input sequence is followed by its reversed sequence,
                # so even sequence numbers are reversed.
                reversed_val = str(seq_number % 2 == 0).lower()
            else:
                reversed_val = 'false'
            values = '\t'.join([str(v) for v in values] + [reversed_val])
            print(values, file=output_file)


if __name__ == "__main__":
    __main__()
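# Example invocation (illustrative; file names and parameter values are assumed):
#   python nextgen_iedb_api.py -T mhci -m netmhcpan_el -A alleles.txt \
#       -l 8,11 -i peptides.fasta -o predictions.tsv
# Tabular input would add e.g. '-c 1' to select the peptide column; '-R' adds a
# reversed copy of each sequence and flags those rows in the 'reversed' output column.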