diff landmark_registration.py @ 4:aee73493bf53 draft

planemo upload for repository https://github.com/BMCV/galaxy-image-analysis/tools/landmark_registration/ commit ba383a4f617791d0c84737a179e5b4b66cecc334
author imgteam
date Mon, 18 Jul 2022 18:41:19 +0000
parents 4e089a0983b1
children 12997d4c5b00
line wrap: on
line diff
--- a/landmark_registration.py	Sat Feb 26 17:14:05 2022 +0000
+++ b/landmark_registration.py	Mon Jul 18 18:41:19 2022 +0000
@@ -10,12 +10,39 @@
 
 import numpy as np
 import pandas as pd
+from scipy import spatial
 from scipy.linalg import lstsq
 from skimage.measure import ransac
 from skimage.transform import AffineTransform
 
 
-def landmark_registration(pts_f1, pts_f2, out_f, res_th=None, max_ite=None, delimiter="\t"):
+class pwlTransform(object):
+
+    def __init__(self):
+        self.triangulation = None
+        self.affines = None
+
+    def estimate(self, src, dst):
+        self.triangulation = spatial.Delaunay(src)
+        success = True
+        self.affines = []
+        for tri in self.triangulation.simplices:
+            affine = AffineTransform()
+            success &= affine.estimate(src[tri, :], dst[tri, :])
+            self.affines.append(affine)
+        return success
+
+    def __call__(self, coords):
+        simplex = self.triangulation.find_simplex(coords)
+        simplex[simplex == -1] = 0    # todo: dealing with points outside the triangulation
+        out = np.empty_like(coords, np.float64)
+        for i in range(len(self.triangulation.simplices)):
+            idx = simplex == i
+            out[idx, :] = self.affines[i](coords[idx, :])
+        return out
+
+
+def landmark_registration(pts_f1, pts_f2, out_f, pts_f=None, res_th=None, max_ite=None, delimiter="\t"):
 
     points1 = pd.read_csv(pts_f1, delimiter=delimiter)
     points2 = pd.read_csv(pts_f2, delimiter=delimiter)
@@ -31,6 +58,21 @@
         model_robust, inliers = ransac((src, dst), AffineTransform, min_samples=3, residual_threshold=res_th, max_trials=max_ite)
         pd.DataFrame(model_robust.params).to_csv(out_f, header=None, index=False, sep="\t")
 
+    elif pts_f is not None:
+        pwlt = pwlTransform()
+        pwlt.estimate(src, dst)
+
+        pts_df = pd.read_csv(pts_f, delimiter=delimiter)
+        pts = np.concatenate([np.array(pts_df['x']).reshape([-1, 1]),
+                              np.array(pts_df['y']).reshape([-1, 1])],
+                             axis=-1)
+        pts_pwlt = pwlt(pts)
+
+        df = pd.DataFrame()
+        df['x'] = pts_pwlt[:, 0]
+        df['y'] = pts_pwlt[:, 1]
+        df.to_csv(out_f, index=False, sep="\t", float_format='%.1f')
+
     else:
         A = np.zeros((src.size, 6))
         A[0:src.shape[0], [0, 1]] = src
@@ -47,14 +89,18 @@
 
 
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="Estimate affine transformation matrix based on landmark coordinates")
-    parser.add_argument("fn_pts1", help="Coordinates of SRC landmarks (tsv file)")
-    parser.add_argument("fn_pts2", help="Coordinates of DST landmarks (tsv file)")
-    parser.add_argument("fn_tmat", help="Path the output (transformation matrix)")
+    parser = argparse.ArgumentParser(description="estimates the affine transformation matrix or performs piecewiese affine transformation based on landmarks")
+    parser.add_argument("fn_lmkmov", help="Coordinates of moving landmarks (tsv file)")
+    parser.add_argument("fn_lmkfix", help="Coordinates of fixed landmarks (tsv file)")
+    parser.add_argument("fn_out", help="Path to the output")
+    parser.add_argument("--pwlt", dest="fn_ptsmov", help="Coordinates of points to be transformed (tsv file)")
     parser.add_argument("--res_th", dest="res_th", type=float, help="Maximum distance for a data point to be classified as an inlier")
     parser.add_argument("--max_ite", dest="max_ite", type=int, help="Maximum number of iterations for random sample selection")
     args = parser.parse_args()
 
+    fn_ptsmov = None
+    if args.fn_ptsmov:
+        fn_ptsmov = args.fn_ptsmov
     res_th = None
     if args.res_th:
         res_th = args.res_th
@@ -62,4 +108,4 @@
     if args.max_ite:
         max_ite = args.max_ite
 
-    landmark_registration(args.fn_pts1, args.fn_pts2, args.fn_tmat, res_th=res_th, max_ite=max_ite)
+    landmark_registration(args.fn_lmkmov, args.fn_lmkfix, args.fn_out, pts_f=fn_ptsmov, res_th=res_th, max_ite=max_ite)