Mercurial > repos > imgteam > landmark_registration
comparison landmark_registration.py @ 4:aee73493bf53 draft
planemo upload for repository https://github.com/BMCV/galaxy-image-analysis/tools/landmark_registration/ commit ba383a4f617791d0c84737a179e5b4b66cecc334
author | imgteam |
---|---|
date | Mon, 18 Jul 2022 18:41:19 +0000 |
parents | 4e089a0983b1 |
children | 12997d4c5b00 |
comparison
equal
deleted
inserted
replaced
3:9ccd642e7ae2 | 4:aee73493bf53 |
---|---|
8 | 8 |
9 import argparse | 9 import argparse |
10 | 10 |
11 import numpy as np | 11 import numpy as np |
12 import pandas as pd | 12 import pandas as pd |
13 from scipy import spatial | |
13 from scipy.linalg import lstsq | 14 from scipy.linalg import lstsq |
14 from skimage.measure import ransac | 15 from skimage.measure import ransac |
15 from skimage.transform import AffineTransform | 16 from skimage.transform import AffineTransform |
16 | 17 |
17 | 18 |
18 def landmark_registration(pts_f1, pts_f2, out_f, res_th=None, max_ite=None, delimiter="\t"): | 19 class pwlTransform(object): |
20 | |
21 def __init__(self): | |
22 self.triangulation = None | |
23 self.affines = None | |
24 | |
25 def estimate(self, src, dst): | |
26 self.triangulation = spatial.Delaunay(src) | |
27 success = True | |
28 self.affines = [] | |
29 for tri in self.triangulation.simplices: | |
30 affine = AffineTransform() | |
31 success &= affine.estimate(src[tri, :], dst[tri, :]) | |
32 self.affines.append(affine) | |
33 return success | |
34 | |
35 def __call__(self, coords): | |
36 simplex = self.triangulation.find_simplex(coords) | |
37 simplex[simplex == -1] = 0 # todo: dealing with points outside the triangulation | |
38 out = np.empty_like(coords, np.float64) | |
39 for i in range(len(self.triangulation.simplices)): | |
40 idx = simplex == i | |
41 out[idx, :] = self.affines[i](coords[idx, :]) | |
42 return out | |
43 | |
44 | |
45 def landmark_registration(pts_f1, pts_f2, out_f, pts_f=None, res_th=None, max_ite=None, delimiter="\t"): | |
19 | 46 |
20 points1 = pd.read_csv(pts_f1, delimiter=delimiter) | 47 points1 = pd.read_csv(pts_f1, delimiter=delimiter) |
21 points2 = pd.read_csv(pts_f2, delimiter=delimiter) | 48 points2 = pd.read_csv(pts_f2, delimiter=delimiter) |
22 | 49 |
23 src = np.concatenate([np.array(points1['x']).reshape([-1, 1]), | 50 src = np.concatenate([np.array(points1['x']).reshape([-1, 1]), |
28 axis=-1) | 55 axis=-1) |
29 | 56 |
30 if res_th is not None and max_ite is not None: | 57 if res_th is not None and max_ite is not None: |
31 model_robust, inliers = ransac((src, dst), AffineTransform, min_samples=3, residual_threshold=res_th, max_trials=max_ite) | 58 model_robust, inliers = ransac((src, dst), AffineTransform, min_samples=3, residual_threshold=res_th, max_trials=max_ite) |
32 pd.DataFrame(model_robust.params).to_csv(out_f, header=None, index=False, sep="\t") | 59 pd.DataFrame(model_robust.params).to_csv(out_f, header=None, index=False, sep="\t") |
60 | |
61 elif pts_f is not None: | |
62 pwlt = pwlTransform() | |
63 pwlt.estimate(src, dst) | |
64 | |
65 pts_df = pd.read_csv(pts_f, delimiter=delimiter) | |
66 pts = np.concatenate([np.array(pts_df['x']).reshape([-1, 1]), | |
67 np.array(pts_df['y']).reshape([-1, 1])], | |
68 axis=-1) | |
69 pts_pwlt = pwlt(pts) | |
70 | |
71 df = pd.DataFrame() | |
72 df['x'] = pts_pwlt[:, 0] | |
73 df['y'] = pts_pwlt[:, 1] | |
74 df.to_csv(out_f, index=False, sep="\t", float_format='%.1f') | |
33 | 75 |
34 else: | 76 else: |
35 A = np.zeros((src.size, 6)) | 77 A = np.zeros((src.size, 6)) |
36 A[0:src.shape[0], [0, 1]] = src | 78 A[0:src.shape[0], [0, 1]] = src |
37 A[0:src.shape[0], 2] = 1 | 79 A[0:src.shape[0], 2] = 1 |
45 tmat[1, :] = x[0].take([3, 4, 5]) | 87 tmat[1, :] = x[0].take([3, 4, 5]) |
46 pd.DataFrame(tmat).to_csv(out_f, header=None, index=False, sep="\t") | 88 pd.DataFrame(tmat).to_csv(out_f, header=None, index=False, sep="\t") |
47 | 89 |
48 | 90 |
49 if __name__ == "__main__": | 91 if __name__ == "__main__": |
50 parser = argparse.ArgumentParser(description="Estimate affine transformation matrix based on landmark coordinates") | 92 parser = argparse.ArgumentParser(description="estimates the affine transformation matrix or performs piecewiese affine transformation based on landmarks") |
51 parser.add_argument("fn_pts1", help="Coordinates of SRC landmarks (tsv file)") | 93 parser.add_argument("fn_lmkmov", help="Coordinates of moving landmarks (tsv file)") |
52 parser.add_argument("fn_pts2", help="Coordinates of DST landmarks (tsv file)") | 94 parser.add_argument("fn_lmkfix", help="Coordinates of fixed landmarks (tsv file)") |
53 parser.add_argument("fn_tmat", help="Path the output (transformation matrix)") | 95 parser.add_argument("fn_out", help="Path to the output") |
96 parser.add_argument("--pwlt", dest="fn_ptsmov", help="Coordinates of points to be transformed (tsv file)") | |
54 parser.add_argument("--res_th", dest="res_th", type=float, help="Maximum distance for a data point to be classified as an inlier") | 97 parser.add_argument("--res_th", dest="res_th", type=float, help="Maximum distance for a data point to be classified as an inlier") |
55 parser.add_argument("--max_ite", dest="max_ite", type=int, help="Maximum number of iterations for random sample selection") | 98 parser.add_argument("--max_ite", dest="max_ite", type=int, help="Maximum number of iterations for random sample selection") |
56 args = parser.parse_args() | 99 args = parser.parse_args() |
57 | 100 |
101 fn_ptsmov = None | |
102 if args.fn_ptsmov: | |
103 fn_ptsmov = args.fn_ptsmov | |
58 res_th = None | 104 res_th = None |
59 if args.res_th: | 105 if args.res_th: |
60 res_th = args.res_th | 106 res_th = args.res_th |
61 max_ite = None | 107 max_ite = None |
62 if args.max_ite: | 108 if args.max_ite: |
63 max_ite = args.max_ite | 109 max_ite = args.max_ite |
64 | 110 |
65 landmark_registration(args.fn_pts1, args.fn_pts2, args.fn_tmat, res_th=res_th, max_ite=max_ite) | 111 landmark_registration(args.fn_lmkmov, args.fn_lmkfix, args.fn_out, pts_f=fn_ptsmov, res_th=res_th, max_ite=max_ite) |