view butina_clustering.py @ 12:3b14765c22ee draft default tip

"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/chemicaltoolbox/chemfp commit 7fb96a3844b4771084f18de2346ed6d5e241d839"
author bgruening
date Sat, 25 Sep 2021 19:07:44 +0000
parents 70b071de9bee
children
line wrap: on
line source

#!/usr/bin/env python
"""
    Modified version of code examples from the chemfp project.
    http://code.google.com/p/chem-fingerprints/
    Thanks to Andrew Dalke of Andrew Dalke Scientific!
"""

import argparse
import os
import subprocess
import sys
import tempfile

import chemfp
from chemfp import search


def unix_sort(results):
    temp_unsorted = tempfile.NamedTemporaryFile(delete=False)
    for (i, indices) in enumerate(results.iter_indices()):
        temp_unsorted.write("%s %s\n" % (len(indices), i))
    temp_unsorted.close()
    temp_sorted = tempfile.NamedTemporaryFile(delete=False)
    temp_sorted.close()
    p = subprocess.Popen(
        ["sort", "-n", "-r", "-k", "1,1"],
        stdin=open(temp_unsorted.name),
        stdout=open(temp_sorted.name, "w+"),
    )
    stdout, stderr = p.communicate()
    return_code = p.returncode

    if return_code:
        sys.stdout.write(stdout)
        sys.stderr.write(stderr)
        sys.stderr.write("Return error code %i from command:\n" % return_code)
    temp_sorted.close()
    os.remove(temp_unsorted.name)

    for line in open(temp_sorted.name):
        size, fp_idx = line.strip().split()
        yield (int(size), int(fp_idx))

    os.remove(temp_sorted.name)


def butina(args):
    """
    Taylor-Butina clustering from the chemfp help.
    """
    out = args.output_path
    targets = chemfp.open(args.input_path, format="fps")
    arena = chemfp.load_fingerprints(targets)

    chemfp.set_num_threads(args.processors)
    results = search.threshold_tanimoto_search_symmetric(
        arena, threshold=args.tanimoto_threshold
    )
    results.reorder_all("move-closest-first")

    sorted_ids = unix_sort(results)

    # Determine the true/false singletons and the clusters
    true_singletons = []
    false_singletons = []
    clusters = []

    seen = set()
    # for (size, fp_idx, members) in results:
    for (size, fp_idx) in sorted_ids:
        members = results[fp_idx].get_indices()
        # print arena.ids[ fp_idx ], [arena.ids[ m ] for m in members]
        if fp_idx in seen:
            # Can't use a centroid which is already assigned
            continue
        seen.add(fp_idx)

        if size == 0:
            # The only fingerprint in the exclusion sphere is itself
            true_singletons.append(fp_idx)
            continue

        # Figure out which ones haven't yet been assigned
        unassigned = set(members) - seen

        if not unassigned:
            false_singletons.append(fp_idx)
            continue

        # this is a new cluster
        clusters.append((fp_idx, unassigned))
        seen.update(unassigned)

    len_cluster = len(clusters)
    # out.write( "#%s true singletons: %s\n" % ( len(true_singletons), " ".join(sorted(arena.ids[idx] for idx in true_singletons)) ) )
    # out.write( "#%s false singletons: %s\n" % ( len(false_singletons), " ".join(sorted(arena.ids[idx] for idx in false_singletons)) ) )

    out.write("#%s true singletons\n" % len(true_singletons))
    out.write("#%s false singletons\n" % len(false_singletons))
    out.write("#clusters: %s\n" % len_cluster)

    # Sort so the cluster with the most compounds comes first,
    # then by alphabetically smallest id
    def cluster_sort_key(cluster):
        centroid_idx, members = cluster
        return -len(members), arena.ids[centroid_idx]

    clusters.sort(key=cluster_sort_key)

    for centroid_idx, members in clusters:
        centroid_name = arena.ids[centroid_idx]
        out.write(
            "%s\t%s\t%s\n"
            % (centroid_name, len(members), " ".join(arena.ids[idx] for idx in members))
        )
        # ToDo: len(members) need to be some biggest top 90% or something ...

    for idx in true_singletons:
        out.write("%s\t%s\n" % (arena.ids[idx], 0))

    out.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="""Taylor-Butina clustering for fps files.
For more details please see the original publication or the chemfp documentation:
http://www.chemomine.co.uk/dbclus-paper.pdf
https://chemfp.readthedocs.org
"""
    )

    parser.add_argument(
        "-i",
        "--input",
        dest="input_path",
        required=True,
        help="Path to the input file.",
    )

    parser.add_argument(
        "-o",
        "--output",
        dest="output_path",
        type=argparse.FileType("w"),
        default=sys.stdout,
        help="Path to the output file.",
    )

    parser.add_argument(
        "-t",
        "--threshold",
        dest="tanimoto_threshold",
        type=float,
        default=0.8,
        help="Tanimoto threshold [0.8]",
    )

    parser.add_argument("-p", "--processors", type=int, default=4)

    options = parser.parse_args()
    butina(options)