Skip to content

cylindra.components

CylSpline

Bases: Spline

A spline object with cylindrical structure.

Source code in cylindra/components/spline/_cyl_spline.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class CylSpline(Spline):
    """A spline object with cylindrical structure."""

    @property
    def radius(self) -> nm | None:
        """Average radius of the cylinder."""
        return self.props.get_glob(H.radius, None)

    @radius.setter
    def radius(self, value: nm | None):
        if value is None:
            if H.radius in self.props.glob.columns:
                self.props.drop_glob(H.radius)
            return None
        value = float(value)
        if value <= 0:
            raise ValueError("Radius must be positive.")
        elif value != value:
            raise ValueError("Radius got NaN value.")
        self.props.update_glob([pl.Series(H.radius, [value], dtype=pl.Float32)])
        return None

    def radius_range(self, rc: nm | None = None) -> tuple[nm, nm]:
        """Return the range of the radius used for the cylindric coordinate."""
        if rc is None:
            if self.radius is None:
                raise ValueError("Radius is not set.")
            rc = self.radius
        cfg = self.config
        return (max(rc - cfg.thickness_inner, 0.0), rc + cfg.thickness_outer)

    @property
    def orientation(self) -> Ori:
        """Orientation of the spline."""
        return Ori(str(self.props.get_glob(H.orientation, "none")))

    @orientation.setter
    def orientation(self, value: Ori | str | None):
        if value is None:
            value = Ori.none
        else:
            value = Ori(value)
        self.props.update_glob([pl.Series(H.orientation, [str(value)])])
        return None

    def invert(self) -> CylSpline:
        """
        Invert the direction of spline. Also invert orientation if exists.

        Returns
        -------
        CylSpline
            Inverted object
        """
        # NOTE: invert() calls clip() internally.
        # We don't have to invert the orientation here.
        new = super().invert()
        new.props.update_loc(self.props.loc[::-1], self.props.window_size)
        return new

    def clip(self, start: float, stop: float) -> CylSpline:
        """
        Clip spline and generate a new one.

        This method does not convert spline bases. ``_lims`` is updated instead.
        For instance, if you want to clip spline at 20% to 80% position, call
        ``spl.clip(0.2, 0.8)``. If ``stop < start``, the orientation of spline
        will be inverted, thus the ``orientation`` attribute will also be inverted.

        Parameters
        ----------
        start : float
            New starting position.
        stop : float
            New stopping position.

        Returns
        -------
        CylSpline
            Clipped spline.
        """
        clipped = super().clip(start, stop)

        clipped.props.glob = self.props.glob.clone()
        clipped.props._binsize_glob = self.props._binsize_glob.copy()
        if start > stop:
            clipped.orientation = Ori.invert(self.orientation)
        else:
            clipped.orientation = self.orientation
        return clipped

    def restore(self) -> CylSpline:
        """
        Restore the original, not-clipped spline.

        Returns
        -------
        Spline
            Copy of the original spline.
        """
        original = super().restore()
        start, stop = self._lims
        if start > stop:
            original.orientation = Ori.invert(self.orientation)
        else:
            original.orientation = self.orientation
        return original

    def nrise(self, **kwargs) -> int:
        """Raw start number (minus for microtubule)"""
        if H.start in kwargs:
            start = kwargs[H.start]
        elif H.start in self.props.glob.columns:
            start = self.props.get_glob(H.start)
        else:
            start = self.cylinder_params(**kwargs).start
        return start * self.config.rise_sign

    def cylinder_params(self, **kwargs) -> CylinderParameters:
        """Get the cylinder parameters of the spline."""
        radius = _get_globalprops(self, kwargs, H.radius)
        if radius is None:
            raise ValueError("Radius is not known.")
        radius += (self.config.thickness_outer - self.config.thickness_inner) / 2
        return CylinderParameters.solve(
            spacing=_get_globalprops(self, kwargs, H.spacing),
            pitch=_get_globalprops(self, kwargs, H.pitch),
            twist=_get_globalprops(self, kwargs, H.twist),
            skew=_get_globalprops(self, kwargs, H.skew),
            rise_angle=_get_globalprops(self, kwargs, H.rise),
            radius=radius,
            npf=_get_globalprops(self, kwargs, H.npf),
            start=_get_globalprops(self, kwargs, H.start),
            allow_duplicate=True,
            rise_sign=self.config.rise_sign,
        )

    def cylinder_model(
        self,
        offsets: tuple[float, float] = (0.0, 0.0),
        **kwargs,
    ) -> CylinderModel:
        """
        Return the cylinder model of the spline.

        Parameters
        ----------
        offsets : tuple of float, optional
            Offset of the model. See :meth:`map_monomers` for details.

        Returns
        -------
        CylinderModel
            The cylinder model.
        """
        length = self.length()
        cp = self.cylinder_params(**kwargs)
        ly = cp.spacing_proj
        la = cp.lat_spacing_proj
        factor = ly / la
        ny = roundint(length / ly) + 1  # number of monomers in y-direction

        if offsets is None:
            offsets = (0.0, 0.0)

        return CylinderModel(
            shape=(ny, cp.npf),
            tilts=(
                cp.tan_skew * factor,
                cp.tan_rise_raw / factor,
            ),
            intervals=(ly, la / cp.perimeter * 2 * np.pi),
            radius=cp.radius,
            offsets=offsets,
        )

    def update_props(
        self,
        *,
        npf: int | None = None,
        start: int | None = None,
        orientation: Ori | str | None = None,
    ) -> CylSpline:
        """Update the npf or orientation parameters in place."""
        loc = list[pl.Expr]()
        glob = list[pl.Series]()
        if npf is not None:
            loc.append(pl.repeat(npf, pl.len()).cast(pl.UInt8).alias(H.npf))
            glob.append(pl.Series([npf]).cast(pl.UInt8).alias(H.npf))
        if start is not None:
            loc.append(pl.repeat(start, pl.len()).cast(pl.Int8).alias(H.start))
            glob.append(pl.Series([start]).cast(pl.Int8).alias(H.start))
        if orientation is not None:
            glob.append(
                pl.Series([str(orientation)]).cast(pl.String).alias(H.orientation)
            )

        self.props.loc = self.props.loc.with_columns(loc)
        self.props.glob = self.props.glob.with_columns(glob)

        return self

    def update_glob_by_cylinder_params(self, cparams: CylinderParameters) -> CylSpline:
        """Update the global properties using a CylinderParameters object."""
        props = {
            H.rise: cparams.rise_angle,
            H.rise_length: cparams.rise_length,
            H.pitch: cparams.pitch,
            H.spacing: cparams.spacing,
            H.skew: cparams.skew,
            H.twist: cparams.twist,
            H.npf: cparams.npf,
            H.start: cparams.start,
            H.radius: cparams.radius,
        }
        self.props.update_glob(props)
        return self

    def _need_rotation(self, orientation: Ori | str | None) -> bool:
        if orientation is not None:
            orientation = Ori(orientation)
            if orientation is Ori.none or self.orientation is Ori.none:
                raise ValueError(
                    "Either molecules' orientation or the input orientation should "
                    "not be none."
                )
            if orientation is not self.orientation:
                return True
        return False
orientation: Ori property writable

Orientation of the spline.

radius: nm | None property writable

Average radius of the cylinder.

clip(start, stop)

Clip spline and generate a new one.

This method does not convert spline bases. _lims is updated instead. For instance, if you want to clip spline at 20% to 80% position, call spl.clip(0.2, 0.8). If stop < start, the orientation of spline will be inverted, thus the orientation attribute will also be inverted.

Parameters:

Name Type Description Default
start float

New starting position.

required
stop float

New stopping position.

required

Returns:

Type Description
CylSpline

Clipped spline.

Source code in cylindra/components/spline/_cyl_spline.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def clip(self, start: float, stop: float) -> CylSpline:
    """
    Clip spline and generate a new one.

    This method does not convert spline bases. ``_lims`` is updated instead.
    For instance, if you want to clip spline at 20% to 80% position, call
    ``spl.clip(0.2, 0.8)``. If ``stop < start``, the orientation of spline
    will be inverted, thus the ``orientation`` attribute will also be inverted.

    Parameters
    ----------
    start : float
        New starting position.
    stop : float
        New stopping position.

    Returns
    -------
    CylSpline
        Clipped spline.
    """
    clipped = super().clip(start, stop)

    clipped.props.glob = self.props.glob.clone()
    clipped.props._binsize_glob = self.props._binsize_glob.copy()
    if start > stop:
        clipped.orientation = Ori.invert(self.orientation)
    else:
        clipped.orientation = self.orientation
    return clipped
cylinder_model(offsets=(0.0, 0.0), **kwargs)

Return the cylinder model of the spline.

Parameters:

Name Type Description Default
offsets tuple of float

Offset of the model. See :meth:map_monomers for details.

(0.0, 0.0)

Returns:

Type Description
CylinderModel

The cylinder model.

Source code in cylindra/components/spline/_cyl_spline.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def cylinder_model(
    self,
    offsets: tuple[float, float] = (0.0, 0.0),
    **kwargs,
) -> CylinderModel:
    """
    Return the cylinder model of the spline.

    Parameters
    ----------
    offsets : tuple of float, optional
        Offset of the model. See :meth:`map_monomers` for details.

    Returns
    -------
    CylinderModel
        The cylinder model.
    """
    length = self.length()
    cp = self.cylinder_params(**kwargs)
    ly = cp.spacing_proj
    la = cp.lat_spacing_proj
    factor = ly / la
    ny = roundint(length / ly) + 1  # number of monomers in y-direction

    if offsets is None:
        offsets = (0.0, 0.0)

    return CylinderModel(
        shape=(ny, cp.npf),
        tilts=(
            cp.tan_skew * factor,
            cp.tan_rise_raw / factor,
        ),
        intervals=(ly, la / cp.perimeter * 2 * np.pi),
        radius=cp.radius,
        offsets=offsets,
    )
cylinder_params(**kwargs)

Get the cylinder parameters of the spline.

Source code in cylindra/components/spline/_cyl_spline.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def cylinder_params(self, **kwargs) -> CylinderParameters:
    """Get the cylinder parameters of the spline."""
    radius = _get_globalprops(self, kwargs, H.radius)
    if radius is None:
        raise ValueError("Radius is not known.")
    radius += (self.config.thickness_outer - self.config.thickness_inner) / 2
    return CylinderParameters.solve(
        spacing=_get_globalprops(self, kwargs, H.spacing),
        pitch=_get_globalprops(self, kwargs, H.pitch),
        twist=_get_globalprops(self, kwargs, H.twist),
        skew=_get_globalprops(self, kwargs, H.skew),
        rise_angle=_get_globalprops(self, kwargs, H.rise),
        radius=radius,
        npf=_get_globalprops(self, kwargs, H.npf),
        start=_get_globalprops(self, kwargs, H.start),
        allow_duplicate=True,
        rise_sign=self.config.rise_sign,
    )
invert()

Invert the direction of spline. Also invert orientation if exists.

Returns:

Type Description
CylSpline

Inverted object

Source code in cylindra/components/spline/_cyl_spline.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def invert(self) -> CylSpline:
    """
    Invert the direction of spline. Also invert orientation if exists.

    Returns
    -------
    CylSpline
        Inverted object
    """
    # NOTE: invert() calls clip() internally.
    # We don't have to invert the orientation here.
    new = super().invert()
    new.props.update_loc(self.props.loc[::-1], self.props.window_size)
    return new
nrise(**kwargs)

Raw start number (minus for microtubule)

Source code in cylindra/components/spline/_cyl_spline.py
124
125
126
127
128
129
130
131
132
def nrise(self, **kwargs) -> int:
    """Raw start number (minus for microtubule)"""
    if H.start in kwargs:
        start = kwargs[H.start]
    elif H.start in self.props.glob.columns:
        start = self.props.get_glob(H.start)
    else:
        start = self.cylinder_params(**kwargs).start
    return start * self.config.rise_sign
radius_range(rc=None)

Return the range of the radius used for the cylindric coordinate.

Source code in cylindra/components/spline/_cyl_spline.py
38
39
40
41
42
43
44
45
def radius_range(self, rc: nm | None = None) -> tuple[nm, nm]:
    """Return the range of the radius used for the cylindric coordinate."""
    if rc is None:
        if self.radius is None:
            raise ValueError("Radius is not set.")
        rc = self.radius
    cfg = self.config
    return (max(rc - cfg.thickness_inner, 0.0), rc + cfg.thickness_outer)
restore()

Restore the original, not-clipped spline.

Returns:

Type Description
Spline

Copy of the original spline.

Source code in cylindra/components/spline/_cyl_spline.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def restore(self) -> CylSpline:
    """
    Restore the original, not-clipped spline.

    Returns
    -------
    Spline
        Copy of the original spline.
    """
    original = super().restore()
    start, stop = self._lims
    if start > stop:
        original.orientation = Ori.invert(self.orientation)
    else:
        original.orientation = self.orientation
    return original
update_glob_by_cylinder_params(cparams)

Update the global properties using a CylinderParameters object.

Source code in cylindra/components/spline/_cyl_spline.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def update_glob_by_cylinder_params(self, cparams: CylinderParameters) -> CylSpline:
    """Update the global properties using a CylinderParameters object."""
    props = {
        H.rise: cparams.rise_angle,
        H.rise_length: cparams.rise_length,
        H.pitch: cparams.pitch,
        H.spacing: cparams.spacing,
        H.skew: cparams.skew,
        H.twist: cparams.twist,
        H.npf: cparams.npf,
        H.start: cparams.start,
        H.radius: cparams.radius,
    }
    self.props.update_glob(props)
    return self
update_props(*, npf=None, start=None, orientation=None)

Update the npf or orientation parameters in place.

Source code in cylindra/components/spline/_cyl_spline.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def update_props(
    self,
    *,
    npf: int | None = None,
    start: int | None = None,
    orientation: Ori | str | None = None,
) -> CylSpline:
    """Update the npf or orientation parameters in place."""
    loc = list[pl.Expr]()
    glob = list[pl.Series]()
    if npf is not None:
        loc.append(pl.repeat(npf, pl.len()).cast(pl.UInt8).alias(H.npf))
        glob.append(pl.Series([npf]).cast(pl.UInt8).alias(H.npf))
    if start is not None:
        loc.append(pl.repeat(start, pl.len()).cast(pl.Int8).alias(H.start))
        glob.append(pl.Series([start]).cast(pl.Int8).alias(H.start))
    if orientation is not None:
        glob.append(
            pl.Series([str(orientation)]).cast(pl.String).alias(H.orientation)
        )

    self.props.loc = self.props.loc.with_columns(loc)
    self.props.glob = self.props.glob.with_columns(glob)

    return self

Spline

Bases: BaseComponent

3D spline curve model with coordinate system.

Anchor points can be set via anchor property. A spline object is semi-immutable. Different spline curves are always of different objects, but the anchors and properties can be dynamically changed.

References
  • Scipy document https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.splprep.html
Source code in cylindra/components/spline/_spline_base.py
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
class Spline(BaseComponent):
    """
    3D spline curve model with coordinate system.

    Anchor points can be set via `anchor` property. A spline object is semi-immutable.
    Different spline curves are always of different objects, but the anchors and
    properties can be dynamically changed.

    References
    ----------
    - Scipy document
      https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.splprep.html
    """

    def __init__(
        self,
        order: int = 3,
        *,
        lims: tuple[float, float] = (0.0, 1.0),
        extrapolate: ExtrapolationMode | str = ExtrapolationMode.linear,
        config: dict[str, Any] | SplineConfig = {},
    ):
        self._tck: TCKType = (None, None, order)
        self._u: NDArray[np.float32] | None = None
        self._anchors = None
        self._extrapolate = ExtrapolationMode(extrapolate)

        self._lims = lims
        self._props = SplineProps()
        if isinstance(config, SplineConfig):
            self._config = config
        else:
            self._config = SplineConfig.construct(**config)

    @property
    def props(self) -> SplineProps:
        """Return the spline properties"""
        return self._props

    @property
    def config(self) -> SplineConfig:
        """Return the spline configuration"""
        return self._config

    def has_props(self) -> bool:
        """True if there are any properties."""
        return len(self.props.loc) > 0 or len(self.props.glob) > 0

    def copy(self, copy_props: bool = True, copy_config: bool = True) -> Self:
        """
        Copy Spline object.

        Parameters
        ----------
        copy_props : bool, default True
            Also copy local/global properties if true.

        Returns
        -------
        Spline
            Copied object.
        """
        new = self.__class__(
            order=self.order, lims=self._lims, extrapolate=self.extrapolate
        )
        new._tck = self._tck
        new._u = self._u
        new._anchors = self._anchors

        if copy_props:
            new._props = self.props.copy()
        if copy_config:
            new._config = self.config.copy()
        return new

    __copy__ = copy

    def with_extrapolation(self, extrapolate: ExtrapolationMode | str) -> Self:
        """Return a copy of the spline with a new extrapolation mode."""
        new = self.copy()
        new._extrapolate = ExtrapolationMode(extrapolate)
        return new

    def with_config(
        self,
        config: dict[str, Any] | SplineConfig,
        copy_props: bool = False,
    ) -> Self:
        """Return a copy of the spline with a new config."""
        new = self.copy(copy_props=copy_props, copy_config=False)
        if not isinstance(config, SplineConfig):
            config = SplineConfig.construct(**config)
        new._config = config
        return new

    @property
    def knots(self) -> np.ndarray:
        """Spline knots."""
        return self._tck[0]

    @property
    def coeff(self) -> list[np.ndarray]:
        """Spline coefficient."""
        return self._tck[1]

    @property
    def order(self) -> int:
        """Spline order."""
        return self._tck[2]

    @property
    def extrapolate(self) -> ExtrapolationMode:
        """Extrapolation mode of the spline."""
        return self._extrapolate

    @property
    def params(self) -> np.ndarray:
        """Spline parameters."""
        return self._u

    @classmethod
    def line(cls, start: ArrayLike, end: ArrayLike) -> Self:
        """
        Create a line spline.

        Parameters
        ----------
        start : array_like
            Start point of the line.
        end : array_like
            End point of the line.

        Returns
        -------
        Spline
            Line spline.
        """
        spl = cls()
        coords = np.stack([start, end], axis=0)
        return spl.fit(coords, err_max=0.0)

    def translate(self, shift: tuple[nm, nm, nm]):
        """Translate the spline by given shift vectors."""
        new = self.copy()
        c = [x + s for x, s in zip(self.coeff, shift, strict=True)]
        new._tck = (self.knots, c, self.order)
        return new

    @property
    def has_anchors(self) -> bool:
        """True if there are any anchors."""
        return self._anchors is not None

    @property
    def anchors(self) -> NDArray[np.float32]:
        """Local anchors along spline."""
        if self._anchors is None:
            raise ValueError("Anchor has not been set yet.")
        return self._anchors

    @anchors.setter
    def anchors(self, positions: float | Sequence[float]) -> None:
        _anc = np.atleast_1d(np.asarray(positions, dtype=np.float32))
        if _anc.ndim != 1:
            raise TypeError("Could not convert positions into 1D array.")
        elif _anc.min() < 0 or _anc.max() > 1:
            msg = (
                "Anchor positions should be set between 0 and 1. Otherwise spline "
                "curve does not fit well."
            )
            warnings.warn(msg, UserWarning, stacklevel=2)
        _old = self._anchors
        if (
            _old is None
            or _anc.size != _old.size
            or not np.allclose(_anc, _old, rtol=1e-4, atol=1e-4)
        ):
            self.props.clear_loc()
        self._anchors = _anc
        return None

    @anchors.deleter
    def anchors(self) -> None:
        self._anchors = None
        self.props.clear_loc()
        return None

    def is_inverted(self) -> bool:
        """Return true if spline is inverted."""
        return self._lims[0] > self._lims[1]

    @property
    def lims(self) -> tuple[float, float]:
        """Return spline limit positions."""
        return self._lims

    def _set_params(self, tck, u) -> Self:
        self._tck = tck
        self._u = u
        return self

    def prep_anchor_positions(
        self,
        interval: nm | None = None,
        n: int | None = None,
        max_interval: nm | None = None,
    ) -> NDArray[np.float32]:
        length = self.length()
        if interval is not None:
            stop, n_segs = interval_divmod(length, interval)
            n = n_segs + 1
        elif n is not None:
            stop = length
        elif max_interval is not None:
            n = max(ceilint(length / max_interval), self.order) + 1
            stop = length
        else:
            raise ValueError("Either 'interval' or 'n' must be specified.")
        y = np.linspace(0, stop, n)
        return self.y_to_position(y)

    def make_anchors(
        self,
        interval: nm | None = None,
        n: int | None = None,
        max_interval: nm | None = None,
    ) -> Self:
        """
        Make anchor points at constant intervals. Either interval, number of anchor or
        the maximum interval between anchors can be specified.

        Parameters
        ----------
        interval : nm, optional
            Interval between anchor points.
        n : int, optional
            Number of anchor points, including both ends.
        max_interval: nm, optional
            Spline will be split by as little anchors as possible but interval between
            anchors will not be larger than this. The number of anchors are also
            guaranteed to be larger than spline order.
        """
        self.anchors = self.prep_anchor_positions(interval, n, max_interval)
        return self

    def __repr__(self) -> str:
        """Use start/end points to describe a spline."""
        start, end = self.map(self._lims)
        start = "({:.1f}, {:.1f}, {:.1f})".format(*start)
        end = "({:.1f}, {:.1f}, {:.1f})".format(*end)
        return f"Spline[{start}:{end}]"

    def close_to(self: Self, other: Self) -> bool:
        """True if two objects draws the same curve."""
        if not isinstance(other, self.__class__):
            return False
        t0, c0, k0 = self._tck
        t1, c1, k1 = other._tck
        return (
            np.allclose(t0, t1)
            and all(np.allclose(x, y) for x, y in zip(c0, c1, strict=True))
            and k0 == k1
            and np.allclose(self._u, other._u)
            and np.allclose(self._lims, other._lims)
        )

    def clip(self, start: float, stop: float) -> Self:
        """
        Clip spline and generate a new one.

        This method does not convert spline bases. `_lims` is updated instead.

        Parameters
        ----------
        start : float
            New starting position.
        stop : float
            New stopping position.

        Returns
        -------
        Spline
            Clipped spline.
        """
        u0 = _linear_conversion(start, *self._lims)
        u1 = _linear_conversion(stop, *self._lims)
        return self.__class__(
            order=self.order,
            lims=(u0, u1),
            extrapolate=self.extrapolate,
            config=self.config,
        )._set_params(self._tck, self._u)

    def restore(self) -> Self:
        """
        Restore the original, not-clipped spline.

        Returns
        -------
        Spline
            Copy of the original spline.
        """
        return self.__class__(
            order=self.order,
            lims=(0, 1),
            extrapolate=self.extrapolate,
            config=self.config,
        )._set_params(self._tck, self._u)

    def resample(self, max_interval: nm = 1.0, err_max: nm = 0.1) -> Self:
        """
        Resample a new spline along the original spline.

        Parameters
        ----------
        max_interval : nm, default 1.0
            Maximum interval between resampling points.
        err_max : float, default 0.1
            Spline fitting maximum error.

        Returns
        -------
        Spline
            Resampled spline object.
        """
        l = self.length()
        points = self.map(np.linspace(0, 1, ceilint(l / max_interval)))
        return self.fit(points, err_max=err_max)

    def fit(
        self,
        coords: ArrayLike,
        *,
        err_max: nm = 1.0,
    ) -> Self:
        """
        Fit spline model to coordinates.

        This method uses `scipy.interpolate.splprep` to fit given coordinates to a
        spline.

        Parameters
        ----------
        coords : np.ndarray
            Coordinates. Must be (N, 3).
        err_max : float, default 1.0
            Error allowed for fitting. Several upper limit of residual values will be
            used and the fit that results in error lower than this value and minimize
            the maximum curvature will be chosen.

        Returns
        -------
        Spline
            New spline fit to given coordinates.
        """
        crds = np.asarray(coords)
        npoints = crds.shape[0]
        if npoints < 2:
            raise ValueError("Number of input coordinates must be > 1.")
        if npoints <= self.order:
            k = npoints - 1
        else:
            k = self.order

        if self.is_inverted():
            crds = crds[::-1]

        if err_max > 4.0:
            raise ValueError("std_max must be smaller than 4.0.")
        if err_max < 1e-3:
            std_list = [err_max]
        else:
            ntrial = max(int(err_max / 0.02), 2)
            std_list = np.linspace(0, err_max, ntrial)[1:]

        fit_results = list[SplineFitResult]()
        new = self.__class__(order=k, extrapolate=self.extrapolate, config=self.config)
        with warnings.catch_warnings():
            # fitting may fail for some std
            warnings.simplefilter("ignore", RuntimeWarning)
            for std in std_list:
                _tck, _u = splprep(crds.T, k=k, s=std**2 * npoints)
                new._set_params(_tck, _u)
                _crds_at_u = new.map(_u)
                res: NDArray[np.float32] = np.sqrt(
                    np.sum((_crds_at_u - crds) ** 2, axis=1)
                )
                _knots = _tck[0][new.order : -new.order]
                nedge = _knots.size - 1
                assert nedge > 0
                nanc = nedge * 20 + 1
                anc = np.interp(
                    np.linspace(0, 1, nanc), np.linspace(0, 1, nedge + 1), _knots
                )
                max_curvature = new.curvature(anc).max()
                success = res.max() <= err_max
                fit_results.append(
                    SplineFitResult((_tck, _u), max_curvature, res, success)
                )

        fit_results_filt = list(filter(lambda x: x.success, fit_results))
        if len(fit_results_filt) == 0:
            fit_results_filt = fit_results

        reult_opt = min(fit_results_filt, key=lambda x: x.curvature)
        return new._set_params(*reult_opt.params)

    def shift(
        self,
        positions: Sequence[float] | None = None,
        shifts: NDArray[np.floating] | None = None,
        *,
        err_max: nm = 1.0,
    ) -> Self:
        """
        Fit spline model using a list of shifts in XZ-plane.

        Parameters
        ----------
        positions : sequence of float, optional
            Positions. Between 0 and 1. If not given, anchors are used instead.
        shifts : np.ndarray
            Shift from center in nm. Must be (N, 2).
        err_max : float, default 1.0
            Error allowed for fitting. See `Spline.fit`.

        Returns
        -------
        Spline
            Spline shifted by fitting to given coordinates.
        """
        if shifts is None:
            raise ValueError("Shifts must be given.")
        coords = self.map(positions)
        rot = self.get_rotator(positions)
        # insert 0 in y coordinates.
        shifts = np.stack([shifts[:, 0], np.zeros(len(rot)), shifts[:, 1]], axis=1)
        coords += rot.apply(shifts)
        return self.fit(coords, err_max=err_max)

    def distances(
        self,
        positions: Sequence[float] | None = None,
        nknots: int = 512,
    ) -> NDArray[np.float32]:
        """
        Get the distances from u=0.

        Parameters
        ----------
        positions : sequence of float, optional
            Positions. Between 0 and 1. If not given, anchors are used instead.

        Returns
        -------
        np.ndarray
            Distances for each `u`.
        """
        if positions is None:
            _u = self.anchors
        else:
            _u = np.asarray(positions, dtype=np.float32)
            if _u.ndim != 1:
                raise ValueError("Positions must be 1D array.")
        u = np.linspace(0, 1, nknots)
        u_tr = _linear_conversion(u, *self._lims)
        dz, dy, dx = map(np.diff, splev(u_tr, self._tck, der=0))
        dist = np.concatenate([[0], np.sqrt(dx**2 + dy**2 + dz**2)]).cumsum()
        tck = splrep(u, dist, k=1)
        out = splev(_u, tck)
        return out

    def map(
        self,
        positions: float | NDArray[np.number] | None = None,
        der: int = 0,
    ) -> NDArray[np.float32]:
        """
        Calculate coordinates (or n-th derivative) at points on the spline.

        Parameters
        ----------
        positions : np.ndarray or float, optional
            Positions. Between 0 and 1. If not given, anchors are used instead.
        der : int, default 0
            `der`-th derivative will be calculated.

        Returns
        -------
        np.ndarray
            Positions or vectors in (3,) or (N, 3) shape.
        """
        _assert_fitted(self)
        if positions is None:
            positions = self.anchors
        u0, u1 = self._lims
        if np.isscalar(positions):
            if self.order < der:
                return np.zeros(3, dtype=np.float32)
            u_tr = _linear_conversion(float(positions), u0, u1)
            if 0 <= u_tr <= 1 or self.extrapolate is ExtrapolationMode.default:
                coord = splev([u_tr], self._tck, der=der)
            elif self.extrapolate is ExtrapolationMode.linear:
                if der == 0:
                    if u_tr < 0:
                        der0 = splev([0], self._tck, der=0)
                        der1 = splev([0], self._tck, der=1)
                        dr = u_tr
                    else:
                        der0 = splev([1], self._tck, der=0)
                        der1 = splev([1], self._tck, der=1)
                        dr = u_tr - 1
                    coord = [a0 + a1 * dr for a0, a1 in zip(der0, der1, strict=True)]
                elif der == 1:
                    if u_tr < 0:
                        coord = splev([0], self._tck, der=1)
                    else:
                        coord = splev([1], self._tck, der=1)
                else:
                    coord = [[0], [0], [0]]
            else:
                raise ValueError(f"Invalid extrapolation mode: {self.extrapolate!r}.")
            out = np.concatenate(coord).astype(np.float32)

        else:
            u_tr = _linear_conversion(np.asarray(positions, dtype=np.float32), u0, u1)
            if self.order < der:
                return np.zeros((u_tr.size, 3), dtype=np.float32)
            if self.extrapolate is ExtrapolationMode.default:
                out = np.stack(splev(u_tr, self._tck, der=der), axis=1).astype(
                    np.float32
                )
            elif self.extrapolate is ExtrapolationMode.linear:
                sl_small = u_tr < 0
                sl_large = u_tr > 1
                n_small = np.count_nonzero(sl_small)
                n_large = np.count_nonzero(sl_large)
                out = np.stack(splev(u_tr, self._tck, der=der), axis=1).astype(
                    np.float32
                )
                if der == 0:
                    if n_small > 0:
                        der0 = np.array(splev(0, self._tck, der=0), dtype=np.float32)
                        der1 = np.array(splev(0, self._tck, der=1), dtype=np.float32)
                        dr = u_tr[sl_small]
                        coords_new = der0 + der1 * dr[:, np.newaxis]
                        out[sl_small] = coords_new

                    if n_large > 0:
                        der0 = splev(1, self._tck, der=0)
                        der1 = splev(1, self._tck, der=1)
                        dr = u_tr[sl_large] - 1
                        coords_new = der0 + der1 * dr[:, np.newaxis]
                        out[sl_large] = coords_new

                elif der == 1:
                    if n_small > 0:
                        out[sl_small] = splev(0, self._tck, der=1)
                    if n_large > 0:
                        out[sl_large] = splev(1, self._tck, der=1)
                else:
                    if n_small > 0:
                        out[sl_small] = 0
                    if n_large > 0:
                        out[sl_large] = 0
            else:
                raise ValueError(f"Invalid extrapolation mode: {self.extrapolate!r}.")

        if u0 > u1 and der % 2 == 1:
            out = -out
        return out

    __call__ = map  # scipy-style alias

    def partition(self, n: int, der: int = 0) -> NDArray[np.float32]:
        """Return the n-partitioning coordinates of the spline."""
        u = np.linspace(0, 1, n)
        return self.map(u, der)

    def length(self, start: float = 0, stop: float = 1, nknots: int = 512) -> nm:
        """
        Approximate the length of B-spline between [start, stop] by partitioning
        the spline with 'nknots' knots. nknots=256 is large enough for most cases.
        """
        _assert_fitted(self)
        u = np.linspace(start, stop, nknots)
        u_tr = _linear_conversion(u, *self._lims)
        dz, dy, dx = map(np.diff, splev(u_tr, self._tck, der=0))
        return np.sum(np.sqrt(dx**2 + dy**2 + dz**2))

    def invert(self) -> Self:
        """
        Invert the direction of spline.

        Returns
        -------
        Spline
            Inverted object
        """
        anchors = self._anchors
        inverted = self.clip(1.0, 0.0)
        if anchors is not None:
            inverted.anchors = 1 - anchors[::-1]
        return inverted

    def curvature(
        self,
        positions: Sequence[float] | None = None,
    ) -> NDArray[np.float32]:
        """
        Calculate curvature of spline curve.

        Parameters
        ----------
        positions : sequence of float, optional
            Positions. Between 0 and 1. If not given, anchors are used instead.

        Returns
        -------
        np.ndarray
            Array of curvature.

        References
        ----------
        - https://en.wikipedia.org/wiki/Curvature#Space_curves
        """

        if positions is None:
            positions = self.anchors

        dz, dy, dx = self.map(positions, der=1).T
        ddz, ddy, ddx = self.map(positions, der=2).T
        a = (
            (ddz * dy - ddy * dz) ** 2
            + (ddx * dz - ddz * dx) ** 2
            + (ddy * dx - ddx * dy) ** 2
        )
        return np.sqrt(a) / (dx**2 + dy**2 + dz**2) ** 1.5

    def curvature_radii(self, positions: Sequence[float] = None) -> NDArray[np.float32]:
        """Inverse of curvature."""
        return 1.0 / self.curvature(positions)

    def to_dict(self) -> SplineInfo:
        """Convert spline info into a dict."""
        t, c, k = self._tck
        u = self._u
        return {
            "t": t.tolist(),
            "c": {"z": c[0].tolist(), "y": c[1].tolist(), "x": c[2].tolist()},
            "k": k,
            "u": u.tolist(),
            "lims": self._lims,
            "localprops_window_size": dict(self.props.window_size),
            "binsize_loc": dict(self.props.binsize_loc),
            "binsize_glob": dict(self.props.binsize_glob),
            "extrapolate": self._extrapolate.name,
            "config": self.config.asdict(),
        }

    @classmethod
    def from_dict(cls: type[Self], d: SplineInfo) -> Self:
        """
        Construct a spline model from a dictionary.

        Parameters
        ----------
        d: dict
            Dictionary with keys "t", "c", "k", "u" and "lims".

        Returns
        -------
        Spline
            Spline object constructed from the dictionary.
        """
        self = cls(
            order=d.get("k", 3),
            lims=d.get("lims", (0, 1)),
            extrapolate=d.get("extrapolate", "linear"),
        )
        t = np.asarray(d["t"])
        c = [np.asarray(d["c"][k]) for k in "zyx"]
        k = roundint(d["k"])
        self._tck = (t, c, k)
        self._u = np.asarray(d["u"])
        self.props._window_size = d.get("localprops_window_size", {})
        self.props._binsize_loc = d.get("binsize_loc", {})
        self.props._binsize_glob = d.get("binsize_glob", {})
        if cfg := d.get("config", None):
            self._config = SplineConfig.from_dict(cfg)
        return self

    def get_rotator(
        self,
        positions: Sequence[float] | None = None,
        inverse: bool = False,
    ) -> Rotation:
        """
        Calculate list of Affine transformation matrix along spline, which correspond to
        the orientation of spline curve.

        Parameters
        ----------
        positions : array-like, (N,)
            Positions. Between 0 and 1.
        inverse : bool, default False
            If True, rotation matrix will be inversed.

        Returns
        -------
        Rotation
            Rotation object at each anchor.
        """
        if positions is None:
            positions = self.anchors
        ds = self.map(positions, der=1)
        out = axes_to_rotator(None, -ds)

        if inverse:
            out = out.inv()

        return out

    def local_cartesian(
        self,
        shape: tuple[nm, nm],
        depth: nm,
        u: float | Sequence[float] = None,
        scale: nm = 1.0,
    ) -> NDArray[np.float32]:
        """
        Generate local Cartesian coordinate systems.

        The generated array can be used for `ndi.map_coordinates`. The result coordinate
        systems are flat, i.e., not distorted by the curvature of spline.

        Parameters
        ----------
        shape : (float, float)
            Vertical and horizontal length of Cartesian coordinates. Corresponds to zx
            axes.
        depth : float
            Length of y axis in nm.
        u : float, optional
            Position on the spline at which local Cartesian coordinates will be built.
        scale: nm, default 1.0
            Scale of coordinates, i.e. spacing of the grid.

        Returns
        -------
        np.ndarray
            (D, V, S, H) shape. Each cooresponds to dimensional vertical, longitudinal
            and horizontal axis, which is ready to be used in `ndi.map_coordinates`.
        """

        mole = self.anchors_to_molecules(u)
        nz = roundint(shape[0] / scale)
        ny = roundint(depth / scale)
        nx = roundint(shape[1] / scale)
        return mole.local_coordinates(shape=(nz, ny, nx), scale=scale)

    def local_cylindrical(
        self,
        r_range: tuple[nm, nm],
        depth: nm,
        u: float | None = None,
        scale: nm = 1.0,
    ) -> NDArray[np.float32]:
        """
        Generate local cylindrical coordinate systems.

        The generated array can be used for `ndi.map_coordinates`. The result coordinate
        systems are flat, i.e., not distorted by the curvature of spline.

        Parameters
        ----------
        r_range : (float, float)
            Lower and upper bound of radius in nm.
        depth : nm
            Length of y axis in nm.
        u : float
            Position on the spline at which local cylindrical coordinates will be built.
        scale: nm, default 1.0
            Scale of coordinates, i.e. spacing of the grid.

        Returns
        -------
        np.ndarray
            (D, V, S, H) shape. Each cooresponds to dimensional, radius, longitudinal
            and angle axis, which is ready to be used in `ndi.map_coordinates`.
        """
        if u is None:
            u = self.anchors
        rmin, rmax = r_range
        ds = self.map(u, der=1)
        ds_norm: NDArray[np.float32] = ds.reshape(-1, 1) / np.sqrt(sum(ds**2))
        depth_px = roundint(depth / scale)
        grid = np.linspace(-depth_px / 2 + 0.5, depth_px / 2 - 0.5, depth_px)
        dy = ds_norm * grid
        y_ax_coords = (self.map(u) / scale).reshape(1, -1) + dy.T
        dslist = np.stack([ds] * depth_px, axis=0)
        map_ = polar_coords_2d(rmin / scale, rmax / scale)
        map_slice = _stack_coords(map_)
        out = _rot_with_vector(map_slice, y_ax_coords, dslist)
        return np.moveaxis(out, -1, 0)

    def cartesian(
        self,
        shape: tuple[nm, nm],
        s_range: tuple[float, float] = (0, 1),
        scale: nm = 1.0,
    ) -> NDArray[np.float32]:
        """
        Generate a Cartesian coordinate system along spline that can be used for
        `ndi.map_coordinate`. Note that this coordinate system is distorted, thus
        does not reflect real geometry (such as distance and derivatives).

        Parameters
        ----------
        shape : (float, float)
            The ZX-shape of output coordinate system. Center of the array will be
            spline curve itself after coodinate transformation.
        s_range : tuple[float, float], default (0, 1)
            Range of spline. Spline coordinate system will be built between
            `spl[s_range[0]]` and `spl[s_range[1]]`.
        scale: nm, default 1.0
            Scale of coordinates, i.e. spacing of the grid.

        Returns
        -------
        np.ndarray
            (V, S, H, D) shape. Each cooresponds to vertical, longitudinal, horizontal
            and dimensional axis.
        """
        dz = roundint(shape[0] / scale)
        dx = roundint(shape[1] / scale)
        return self._get_coords(_cartesian_coords_2d, (dz, dx), s_range, scale)

    def cylindrical(
        self,
        r_range: tuple[nm, nm],
        s_range: tuple[float, float] = (0, 1),
        scale: nm = 1.0,
    ) -> NDArray[np.float32]:
        """
        Generate a cylindrical coordinate system along spline that can be used for
        `ndi.map_coordinate`. Note that this coordinate system is distorted, thus
        does not reflect real geometry (such as distance and derivatives).

        Parameters
        ----------
        r_range : (nm, nm)
            Range of radius in nm. r=0 will be spline curve itself after coodinate
            transformation.
        s_range : tuple[float, float], default (0, 1)
            Range of spline. Spline coordinate system will be built between
            `spl[s_range[0]]` and `spl[s_range[1]]`.
        scale: nm, default 1.0
            Scale of coordinates, i.e. spacing of the grid.

        Returns
        -------
        np.ndarray
            (V, S, H, D) shape. Each cooresponds to radius, longitudinal, angle and
            dimensional axis.
        """
        rmin = r_range[0] / scale
        rmax = r_range[1] / scale
        return self._get_coords(polar_coords_2d, (rmin, rmax), s_range, scale)

    def y_to_position(
        self, y: NDArray[np.float32], nknots: int = 512
    ) -> NDArray[np.float32]:
        """
        Convert y-coordinate to spline position parameter.

        Parameters
        ----------
        y : array-like
            Y coordinates.
        nknots : int, optional
            Number of knots. Increasing the number of knots will increase the accuracy.
        """
        # almost equal to y / self.length()
        u = np.linspace(0, 1, nknots)
        u_tr = _linear_conversion(u, *self._lims)
        dz, dy, dx = map(np.diff, splev(u_tr, self._tck, der=0))
        dist = np.concatenate([[0], np.sqrt(dx**2 + dy**2 + dz**2)]).cumsum()
        tck = splrep(dist, u, k=1)
        out = splev(y, tck)
        return out

    def cartesian_to_world(
        self,
        coords: NDArray[np.float32],
        nknots: int = 512,
    ) -> NDArray[np.float32]:
        """
        Inverse Cartesian coordinate mapping, (z', y', x') to world coordinate.

        Parameters
        ----------
        coords : np.ndarray
            Spline Cartesian coordinates. All the coordinates must be in nm unit.

        Returns
        -------
        np.ndarray
            World coordinates.
        """
        ncoords = coords.shape[0]
        _zs = coords[:, 0]
        _us = self.y_to_position(coords[:, 1], nknots=nknots)
        _xs = coords[:, 2]
        _zeros = np.zeros(ncoords, dtype=np.float32)
        coords_ext = np.stack([_zs, _zeros, _xs], axis=1)
        rot = self.get_rotator(_us)
        out = rot.apply(coords_ext) + self.map(_us)

        return out

    def cylindrical_to_world(self, coords: NDArray[np.float32]) -> NDArray[np.float32]:
        """
        Inverse cylindrical coordinate mapping, (r, y, angle) to world coordinate.

        Parameters
        ----------
        coords : np.ndarray
            Cylindrical coordinates. "r" and "y" must be in scale of "nm", while angle
            must be in radian.

        Returns
        -------
        np.ndarray
            World coordinates.
        """
        radius = coords[:, 0]
        y = coords[:, 1]
        theta = coords[:, 2]
        cart_coords = np.stack(
            [radius * np.sin(theta), y, radius * np.cos(theta)], axis=1
        )

        return self.cartesian_to_world(cart_coords)

    def anchors_to_molecules(
        self,
        positions: float | Sequence[float] | None = None,
        rotation: Sequence[float] | None = None,
    ) -> Molecules:
        """
        Convert coordinates of anchors to `Molecules` instance.

        Coordinates of anchors must be in range from 0 to 1. The y-direction of
        `Molecules` always points at the direction of spline and the z- direction always
        in the plane orthogonal to YX-plane.

        Parameters
        ----------
        positions : iterable of float, optional
            Positions. Between 0 and 1. If not given, anchors are used instead.

        Returns
        -------
        Molecules
            Molecules object of points.
        """
        if positions is None:
            positions = self.anchors
        elif np.isscalar(positions):
            positions = [positions]
        pos = self.map(positions)
        yvec = self.map(positions, der=1)
        rot = axes_to_rotator(None, yvec)
        if rotation is not None:
            rotvec = np.zeros((len(rot), 3), dtype=np.float32)
            rotvec[:, 1] = rotation
            rot = rot * Rotation.from_rotvec(rotvec)
        return Molecules(pos=pos, rot=rot, features={Mole.nth: np.arange(len(pos))})

    def cylindrical_to_molecules(
        self,
        coords: NDArray[np.float32],
    ) -> Molecules:
        """
        Convert coordinates of points near the spline to `Molecules` instance.

        Coordinates of points must be those in spline cylindrical coordinate system.

        Parameters
        ----------
        coords : (N, 3) array
            Spline cylindrical coordinates of points.

        Returns
        -------
        Molecules
            Molecules object of points.
        """
        world_coords = self.cylindrical_to_world(coords)

        # world coordinates of the projection point of coords onto the spline
        u = self.y_to_position(coords[:, 1])
        ycoords = self.map(u)
        zvec = world_coords - ycoords
        yvec = self.map(u, der=1)
        return Molecules.from_axes(pos=world_coords, z=zvec, y=yvec)

    def _get_coords(
        self,
        map_func: Callable[[tuple], NDArray[np.float32]],
        map_params: tuple,
        s_range: tuple[float, float],
        scale: nm,
    ):
        """
        Make coordinate system using function `map_func` and stack the same point cloud
        in the direction of the spline, in the range of `s_range`.
        """
        u, y_ax_coords = self._get_y_ax_coords(s_range, scale)
        dslist = self.map(u, 1).astype(np.float32)
        map_ = map_func(*map_params)
        map_slice = _stack_coords(map_)
        out = _rot_with_vector(map_slice, y_ax_coords, dslist)
        return np.moveaxis(out, -1, 0)

    def _get_y_ax_coords(self, s_range: tuple[float, float], scale: nm):
        s0, s1 = s_range
        length = self.length(start=s0, stop=s1)
        stop_length, n_segs = interval_divmod(length, scale)
        n_pixels = n_segs + 1
        s2 = (s1 - s0) * stop_length / length + s0
        if n_pixels < 2:
            raise ValueError("Too short. Change 's_range'.")
        d0, d2 = self.distances([s0, s2])
        u = self.y_to_position(np.linspace(d0, d2, n_pixels))
        y = self.map(u) / scale  # world coordinates of y-axis in spline coords system
        return u, y
anchors: NDArray[np.float32] deletable property writable

Local anchors along spline.

coeff: list[np.ndarray] property

Spline coefficient.

config: SplineConfig property

Return the spline configuration

extrapolate: ExtrapolationMode property

Extrapolation mode of the spline.

has_anchors: bool property

True if there are any anchors.

knots: np.ndarray property

Spline knots.

lims: tuple[float, float] property

Return spline limit positions.

order: int property

Spline order.

params: np.ndarray property

Spline parameters.

props: SplineProps property

Return the spline properties

__repr__()

Use start/end points to describe a spline.

Source code in cylindra/components/spline/_spline_base.py
282
283
284
285
286
287
def __repr__(self) -> str:
    """Use start/end points to describe a spline."""
    start, end = self.map(self._lims)
    start = "({:.1f}, {:.1f}, {:.1f})".format(*start)
    end = "({:.1f}, {:.1f}, {:.1f})".format(*end)
    return f"Spline[{start}:{end}]"
anchors_to_molecules(positions=None, rotation=None)

Convert coordinates of anchors to Molecules instance.

Coordinates of anchors must be in range from 0 to 1. The y-direction of Molecules always points at the direction of spline and the z- direction always in the plane orthogonal to YX-plane.

Parameters:

Name Type Description Default
positions iterable of float

Positions. Between 0 and 1. If not given, anchors are used instead.

None

Returns:

Type Description
Molecules

Molecules object of points.

Source code in cylindra/components/spline/_spline_base.py
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
def anchors_to_molecules(
    self,
    positions: float | Sequence[float] | None = None,
    rotation: Sequence[float] | None = None,
) -> Molecules:
    """
    Convert coordinates of anchors to `Molecules` instance.

    Coordinates of anchors must be in range from 0 to 1. The y-direction of
    `Molecules` always points at the direction of spline and the z- direction always
    in the plane orthogonal to YX-plane.

    Parameters
    ----------
    positions : iterable of float, optional
        Positions. Between 0 and 1. If not given, anchors are used instead.

    Returns
    -------
    Molecules
        Molecules object of points.
    """
    if positions is None:
        positions = self.anchors
    elif np.isscalar(positions):
        positions = [positions]
    pos = self.map(positions)
    yvec = self.map(positions, der=1)
    rot = axes_to_rotator(None, yvec)
    if rotation is not None:
        rotvec = np.zeros((len(rot), 3), dtype=np.float32)
        rotvec[:, 1] = rotation
        rot = rot * Rotation.from_rotvec(rotvec)
    return Molecules(pos=pos, rot=rot, features={Mole.nth: np.arange(len(pos))})
cartesian(shape, s_range=(0, 1), scale=1.0)

Generate a Cartesian coordinate system along spline that can be used for ndi.map_coordinate. Note that this coordinate system is distorted, thus does not reflect real geometry (such as distance and derivatives).

Parameters:

Name Type Description Default
shape (float, float)

The ZX-shape of output coordinate system. Center of the array will be spline curve itself after coodinate transformation.

required
s_range tuple[float, float]

Range of spline. Spline coordinate system will be built between spl[s_range[0]] and spl[s_range[1]].

(0, 1)
scale nm

Scale of coordinates, i.e. spacing of the grid.

1.0

Returns:

Type Description
ndarray

(V, S, H, D) shape. Each cooresponds to vertical, longitudinal, horizontal and dimensional axis.

Source code in cylindra/components/spline/_spline_base.py
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
def cartesian(
    self,
    shape: tuple[nm, nm],
    s_range: tuple[float, float] = (0, 1),
    scale: nm = 1.0,
) -> NDArray[np.float32]:
    """
    Generate a Cartesian coordinate system along spline that can be used for
    `ndi.map_coordinate`. Note that this coordinate system is distorted, thus
    does not reflect real geometry (such as distance and derivatives).

    Parameters
    ----------
    shape : (float, float)
        The ZX-shape of output coordinate system. Center of the array will be
        spline curve itself after coodinate transformation.
    s_range : tuple[float, float], default (0, 1)
        Range of spline. Spline coordinate system will be built between
        `spl[s_range[0]]` and `spl[s_range[1]]`.
    scale: nm, default 1.0
        Scale of coordinates, i.e. spacing of the grid.

    Returns
    -------
    np.ndarray
        (V, S, H, D) shape. Each cooresponds to vertical, longitudinal, horizontal
        and dimensional axis.
    """
    dz = roundint(shape[0] / scale)
    dx = roundint(shape[1] / scale)
    return self._get_coords(_cartesian_coords_2d, (dz, dx), s_range, scale)
cartesian_to_world(coords, nknots=512)

Inverse Cartesian coordinate mapping, (z', y', x') to world coordinate.

Parameters:

Name Type Description Default
coords ndarray

Spline Cartesian coordinates. All the coordinates must be in nm unit.

required

Returns:

Type Description
ndarray

World coordinates.

Source code in cylindra/components/spline/_spline_base.py
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
def cartesian_to_world(
    self,
    coords: NDArray[np.float32],
    nknots: int = 512,
) -> NDArray[np.float32]:
    """
    Inverse Cartesian coordinate mapping, (z', y', x') to world coordinate.

    Parameters
    ----------
    coords : np.ndarray
        Spline Cartesian coordinates. All the coordinates must be in nm unit.

    Returns
    -------
    np.ndarray
        World coordinates.
    """
    ncoords = coords.shape[0]
    _zs = coords[:, 0]
    _us = self.y_to_position(coords[:, 1], nknots=nknots)
    _xs = coords[:, 2]
    _zeros = np.zeros(ncoords, dtype=np.float32)
    coords_ext = np.stack([_zs, _zeros, _xs], axis=1)
    rot = self.get_rotator(_us)
    out = rot.apply(coords_ext) + self.map(_us)

    return out
clip(start, stop)

Clip spline and generate a new one.

This method does not convert spline bases. _lims is updated instead.

Parameters:

Name Type Description Default
start float

New starting position.

required
stop float

New stopping position.

required

Returns:

Type Description
Spline

Clipped spline.

Source code in cylindra/components/spline/_spline_base.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def clip(self, start: float, stop: float) -> Self:
    """
    Clip spline and generate a new one.

    This method does not convert spline bases. `_lims` is updated instead.

    Parameters
    ----------
    start : float
        New starting position.
    stop : float
        New stopping position.

    Returns
    -------
    Spline
        Clipped spline.
    """
    u0 = _linear_conversion(start, *self._lims)
    u1 = _linear_conversion(stop, *self._lims)
    return self.__class__(
        order=self.order,
        lims=(u0, u1),
        extrapolate=self.extrapolate,
        config=self.config,
    )._set_params(self._tck, self._u)
close_to(other)

True if two objects draws the same curve.

Source code in cylindra/components/spline/_spline_base.py
289
290
291
292
293
294
295
296
297
298
299
300
301
def close_to(self: Self, other: Self) -> bool:
    """True if two objects draws the same curve."""
    if not isinstance(other, self.__class__):
        return False
    t0, c0, k0 = self._tck
    t1, c1, k1 = other._tck
    return (
        np.allclose(t0, t1)
        and all(np.allclose(x, y) for x, y in zip(c0, c1, strict=True))
        and k0 == k1
        and np.allclose(self._u, other._u)
        and np.allclose(self._lims, other._lims)
    )
copy(copy_props=True, copy_config=True)

Copy Spline object.

Parameters:

Name Type Description Default
copy_props bool

Also copy local/global properties if true.

True

Returns:

Type Description
Spline

Copied object.

Source code in cylindra/components/spline/_spline_base.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def copy(self, copy_props: bool = True, copy_config: bool = True) -> Self:
    """
    Copy Spline object.

    Parameters
    ----------
    copy_props : bool, default True
        Also copy local/global properties if true.

    Returns
    -------
    Spline
        Copied object.
    """
    new = self.__class__(
        order=self.order, lims=self._lims, extrapolate=self.extrapolate
    )
    new._tck = self._tck
    new._u = self._u
    new._anchors = self._anchors

    if copy_props:
        new._props = self.props.copy()
    if copy_config:
        new._config = self.config.copy()
    return new
curvature(positions=None)

Calculate curvature of spline curve.

Parameters:

Name Type Description Default
positions sequence of float

Positions. Between 0 and 1. If not given, anchors are used instead.

None

Returns:

Type Description
ndarray

Array of curvature.

References
  • https://en.wikipedia.org/wiki/Curvature#Space_curves
Source code in cylindra/components/spline/_spline_base.py
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
def curvature(
    self,
    positions: Sequence[float] | None = None,
) -> NDArray[np.float32]:
    """
    Calculate curvature of spline curve.

    Parameters
    ----------
    positions : sequence of float, optional
        Positions. Between 0 and 1. If not given, anchors are used instead.

    Returns
    -------
    np.ndarray
        Array of curvature.

    References
    ----------
    - https://en.wikipedia.org/wiki/Curvature#Space_curves
    """

    if positions is None:
        positions = self.anchors

    dz, dy, dx = self.map(positions, der=1).T
    ddz, ddy, ddx = self.map(positions, der=2).T
    a = (
        (ddz * dy - ddy * dz) ** 2
        + (ddx * dz - ddz * dx) ** 2
        + (ddy * dx - ddx * dy) ** 2
    )
    return np.sqrt(a) / (dx**2 + dy**2 + dz**2) ** 1.5
curvature_radii(positions=None)

Inverse of curvature.

Source code in cylindra/components/spline/_spline_base.py
676
677
678
def curvature_radii(self, positions: Sequence[float] = None) -> NDArray[np.float32]:
    """Inverse of curvature."""
    return 1.0 / self.curvature(positions)
cylindrical(r_range, s_range=(0, 1), scale=1.0)

Generate a cylindrical coordinate system along spline that can be used for ndi.map_coordinate. Note that this coordinate system is distorted, thus does not reflect real geometry (such as distance and derivatives).

Parameters:

Name Type Description Default
r_range (nm, nm)

Range of radius in nm. r=0 will be spline curve itself after coodinate transformation.

required
s_range tuple[float, float]

Range of spline. Spline coordinate system will be built between spl[s_range[0]] and spl[s_range[1]].

(0, 1)
scale nm

Scale of coordinates, i.e. spacing of the grid.

1.0

Returns:

Type Description
ndarray

(V, S, H, D) shape. Each cooresponds to radius, longitudinal, angle and dimensional axis.

Source code in cylindra/components/spline/_spline_base.py
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
def cylindrical(
    self,
    r_range: tuple[nm, nm],
    s_range: tuple[float, float] = (0, 1),
    scale: nm = 1.0,
) -> NDArray[np.float32]:
    """
    Generate a cylindrical coordinate system along spline that can be used for
    `ndi.map_coordinate`. Note that this coordinate system is distorted, thus
    does not reflect real geometry (such as distance and derivatives).

    Parameters
    ----------
    r_range : (nm, nm)
        Range of radius in nm. r=0 will be spline curve itself after coodinate
        transformation.
    s_range : tuple[float, float], default (0, 1)
        Range of spline. Spline coordinate system will be built between
        `spl[s_range[0]]` and `spl[s_range[1]]`.
    scale: nm, default 1.0
        Scale of coordinates, i.e. spacing of the grid.

    Returns
    -------
    np.ndarray
        (V, S, H, D) shape. Each cooresponds to radius, longitudinal, angle and
        dimensional axis.
    """
    rmin = r_range[0] / scale
    rmax = r_range[1] / scale
    return self._get_coords(polar_coords_2d, (rmin, rmax), s_range, scale)
cylindrical_to_molecules(coords)

Convert coordinates of points near the spline to Molecules instance.

Coordinates of points must be those in spline cylindrical coordinate system.

Parameters:

Name Type Description Default
coords (N, 3) array

Spline cylindrical coordinates of points.

required

Returns:

Type Description
Molecules

Molecules object of points.

Source code in cylindra/components/spline/_spline_base.py
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
def cylindrical_to_molecules(
    self,
    coords: NDArray[np.float32],
) -> Molecules:
    """
    Convert coordinates of points near the spline to `Molecules` instance.

    Coordinates of points must be those in spline cylindrical coordinate system.

    Parameters
    ----------
    coords : (N, 3) array
        Spline cylindrical coordinates of points.

    Returns
    -------
    Molecules
        Molecules object of points.
    """
    world_coords = self.cylindrical_to_world(coords)

    # world coordinates of the projection point of coords onto the spline
    u = self.y_to_position(coords[:, 1])
    ycoords = self.map(u)
    zvec = world_coords - ycoords
    yvec = self.map(u, der=1)
    return Molecules.from_axes(pos=world_coords, z=zvec, y=yvec)
cylindrical_to_world(coords)

Inverse cylindrical coordinate mapping, (r, y, angle) to world coordinate.

Parameters:

Name Type Description Default
coords ndarray

Cylindrical coordinates. "r" and "y" must be in scale of "nm", while angle must be in radian.

required

Returns:

Type Description
ndarray

World coordinates.

Source code in cylindra/components/spline/_spline_base.py
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
def cylindrical_to_world(self, coords: NDArray[np.float32]) -> NDArray[np.float32]:
    """
    Inverse cylindrical coordinate mapping, (r, y, angle) to world coordinate.

    Parameters
    ----------
    coords : np.ndarray
        Cylindrical coordinates. "r" and "y" must be in scale of "nm", while angle
        must be in radian.

    Returns
    -------
    np.ndarray
        World coordinates.
    """
    radius = coords[:, 0]
    y = coords[:, 1]
    theta = coords[:, 2]
    cart_coords = np.stack(
        [radius * np.sin(theta), y, radius * np.cos(theta)], axis=1
    )

    return self.cartesian_to_world(cart_coords)
distances(positions=None, nknots=512)

Get the distances from u=0.

Parameters:

Name Type Description Default
positions sequence of float

Positions. Between 0 and 1. If not given, anchors are used instead.

None

Returns:

Type Description
ndarray

Distances for each u.

Source code in cylindra/components/spline/_spline_base.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
def distances(
    self,
    positions: Sequence[float] | None = None,
    nknots: int = 512,
) -> NDArray[np.float32]:
    """
    Get the distances from u=0.

    Parameters
    ----------
    positions : sequence of float, optional
        Positions. Between 0 and 1. If not given, anchors are used instead.

    Returns
    -------
    np.ndarray
        Distances for each `u`.
    """
    if positions is None:
        _u = self.anchors
    else:
        _u = np.asarray(positions, dtype=np.float32)
        if _u.ndim != 1:
            raise ValueError("Positions must be 1D array.")
    u = np.linspace(0, 1, nknots)
    u_tr = _linear_conversion(u, *self._lims)
    dz, dy, dx = map(np.diff, splev(u_tr, self._tck, der=0))
    dist = np.concatenate([[0], np.sqrt(dx**2 + dy**2 + dz**2)]).cumsum()
    tck = splrep(u, dist, k=1)
    out = splev(_u, tck)
    return out
fit(coords, *, err_max=1.0)

Fit spline model to coordinates.

This method uses scipy.interpolate.splprep to fit given coordinates to a spline.

Parameters:

Name Type Description Default
coords ndarray

Coordinates. Must be (N, 3).

required
err_max float

Error allowed for fitting. Several upper limit of residual values will be used and the fit that results in error lower than this value and minimize the maximum curvature will be chosen.

1.0

Returns:

Type Description
Spline

New spline fit to given coordinates.

Source code in cylindra/components/spline/_spline_base.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def fit(
    self,
    coords: ArrayLike,
    *,
    err_max: nm = 1.0,
) -> Self:
    """
    Fit spline model to coordinates.

    This method uses `scipy.interpolate.splprep` to fit given coordinates to a
    spline.

    Parameters
    ----------
    coords : np.ndarray
        Coordinates. Must be (N, 3).
    err_max : float, default 1.0
        Error allowed for fitting. Several upper limit of residual values will be
        used and the fit that results in error lower than this value and minimize
        the maximum curvature will be chosen.

    Returns
    -------
    Spline
        New spline fit to given coordinates.
    """
    crds = np.asarray(coords)
    npoints = crds.shape[0]
    if npoints < 2:
        raise ValueError("Number of input coordinates must be > 1.")
    if npoints <= self.order:
        k = npoints - 1
    else:
        k = self.order

    if self.is_inverted():
        crds = crds[::-1]

    if err_max > 4.0:
        raise ValueError("std_max must be smaller than 4.0.")
    if err_max < 1e-3:
        std_list = [err_max]
    else:
        ntrial = max(int(err_max / 0.02), 2)
        std_list = np.linspace(0, err_max, ntrial)[1:]

    fit_results = list[SplineFitResult]()
    new = self.__class__(order=k, extrapolate=self.extrapolate, config=self.config)
    with warnings.catch_warnings():
        # fitting may fail for some std
        warnings.simplefilter("ignore", RuntimeWarning)
        for std in std_list:
            _tck, _u = splprep(crds.T, k=k, s=std**2 * npoints)
            new._set_params(_tck, _u)
            _crds_at_u = new.map(_u)
            res: NDArray[np.float32] = np.sqrt(
                np.sum((_crds_at_u - crds) ** 2, axis=1)
            )
            _knots = _tck[0][new.order : -new.order]
            nedge = _knots.size - 1
            assert nedge > 0
            nanc = nedge * 20 + 1
            anc = np.interp(
                np.linspace(0, 1, nanc), np.linspace(0, 1, nedge + 1), _knots
            )
            max_curvature = new.curvature(anc).max()
            success = res.max() <= err_max
            fit_results.append(
                SplineFitResult((_tck, _u), max_curvature, res, success)
            )

    fit_results_filt = list(filter(lambda x: x.success, fit_results))
    if len(fit_results_filt) == 0:
        fit_results_filt = fit_results

    reult_opt = min(fit_results_filt, key=lambda x: x.curvature)
    return new._set_params(*reult_opt.params)
from_dict(d) classmethod

Construct a spline model from a dictionary.

Parameters:

Name Type Description Default
d SplineInfo

Dictionary with keys "t", "c", "k", "u" and "lims".

required

Returns:

Type Description
Spline

Spline object constructed from the dictionary.

Source code in cylindra/components/spline/_spline_base.py
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
@classmethod
def from_dict(cls: type[Self], d: SplineInfo) -> Self:
    """
    Construct a spline model from a dictionary.

    Parameters
    ----------
    d: dict
        Dictionary with keys "t", "c", "k", "u" and "lims".

    Returns
    -------
    Spline
        Spline object constructed from the dictionary.
    """
    self = cls(
        order=d.get("k", 3),
        lims=d.get("lims", (0, 1)),
        extrapolate=d.get("extrapolate", "linear"),
    )
    t = np.asarray(d["t"])
    c = [np.asarray(d["c"][k]) for k in "zyx"]
    k = roundint(d["k"])
    self._tck = (t, c, k)
    self._u = np.asarray(d["u"])
    self.props._window_size = d.get("localprops_window_size", {})
    self.props._binsize_loc = d.get("binsize_loc", {})
    self.props._binsize_glob = d.get("binsize_glob", {})
    if cfg := d.get("config", None):
        self._config = SplineConfig.from_dict(cfg)
    return self
get_rotator(positions=None, inverse=False)

Calculate list of Affine transformation matrix along spline, which correspond to the orientation of spline curve.

Parameters:

Name Type Description Default
positions (array - like, (N))

Positions. Between 0 and 1.

None
inverse bool

If True, rotation matrix will be inversed.

False

Returns:

Type Description
Rotation

Rotation object at each anchor.

Source code in cylindra/components/spline/_spline_base.py
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
def get_rotator(
    self,
    positions: Sequence[float] | None = None,
    inverse: bool = False,
) -> Rotation:
    """
    Calculate list of Affine transformation matrix along spline, which correspond to
    the orientation of spline curve.

    Parameters
    ----------
    positions : array-like, (N,)
        Positions. Between 0 and 1.
    inverse : bool, default False
        If True, rotation matrix will be inversed.

    Returns
    -------
    Rotation
        Rotation object at each anchor.
    """
    if positions is None:
        positions = self.anchors
    ds = self.map(positions, der=1)
    out = axes_to_rotator(None, -ds)

    if inverse:
        out = out.inv()

    return out
has_props()

True if there are any properties.

Source code in cylindra/components/spline/_spline_base.py
81
82
83
def has_props(self) -> bool:
    """True if there are any properties."""
    return len(self.props.loc) > 0 or len(self.props.glob) > 0
invert()

Invert the direction of spline.

Returns:

Type Description
Spline

Inverted object

Source code in cylindra/components/spline/_spline_base.py
627
628
629
630
631
632
633
634
635
636
637
638
639
640
def invert(self) -> Self:
    """
    Invert the direction of spline.

    Returns
    -------
    Spline
        Inverted object
    """
    anchors = self._anchors
    inverted = self.clip(1.0, 0.0)
    if anchors is not None:
        inverted.anchors = 1 - anchors[::-1]
    return inverted
is_inverted()

Return true if spline is inverted.

Source code in cylindra/components/spline/_spline_base.py
224
225
226
def is_inverted(self) -> bool:
    """Return true if spline is inverted."""
    return self._lims[0] > self._lims[1]
length(start=0, stop=1, nknots=512)

Approximate the length of B-spline between [start, stop] by partitioning the spline with 'nknots' knots. nknots=256 is large enough for most cases.

Source code in cylindra/components/spline/_spline_base.py
616
617
618
619
620
621
622
623
624
625
def length(self, start: float = 0, stop: float = 1, nknots: int = 512) -> nm:
    """
    Approximate the length of B-spline between [start, stop] by partitioning
    the spline with 'nknots' knots. nknots=256 is large enough for most cases.
    """
    _assert_fitted(self)
    u = np.linspace(start, stop, nknots)
    u_tr = _linear_conversion(u, *self._lims)
    dz, dy, dx = map(np.diff, splev(u_tr, self._tck, der=0))
    return np.sum(np.sqrt(dx**2 + dy**2 + dz**2))
line(start, end) classmethod

Create a line spline.

Parameters:

Name Type Description Default
start array_like

Start point of the line.

required
end array_like

End point of the line.

required

Returns:

Type Description
Spline

Line spline.

Source code in cylindra/components/spline/_spline_base.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@classmethod
def line(cls, start: ArrayLike, end: ArrayLike) -> Self:
    """
    Create a line spline.

    Parameters
    ----------
    start : array_like
        Start point of the line.
    end : array_like
        End point of the line.

    Returns
    -------
    Spline
        Line spline.
    """
    spl = cls()
    coords = np.stack([start, end], axis=0)
    return spl.fit(coords, err_max=0.0)
local_cartesian(shape, depth, u=None, scale=1.0)

Generate local Cartesian coordinate systems.

The generated array can be used for ndi.map_coordinates. The result coordinate systems are flat, i.e., not distorted by the curvature of spline.

Parameters:

Name Type Description Default
shape (float, float)

Vertical and horizontal length of Cartesian coordinates. Corresponds to zx axes.

required
depth float

Length of y axis in nm.

required
u float

Position on the spline at which local Cartesian coordinates will be built.

None
scale nm

Scale of coordinates, i.e. spacing of the grid.

1.0

Returns:

Type Description
ndarray

(D, V, S, H) shape. Each cooresponds to dimensional vertical, longitudinal and horizontal axis, which is ready to be used in ndi.map_coordinates.

Source code in cylindra/components/spline/_spline_base.py
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
def local_cartesian(
    self,
    shape: tuple[nm, nm],
    depth: nm,
    u: float | Sequence[float] = None,
    scale: nm = 1.0,
) -> NDArray[np.float32]:
    """
    Generate local Cartesian coordinate systems.

    The generated array can be used for `ndi.map_coordinates`. The result coordinate
    systems are flat, i.e., not distorted by the curvature of spline.

    Parameters
    ----------
    shape : (float, float)
        Vertical and horizontal length of Cartesian coordinates. Corresponds to zx
        axes.
    depth : float
        Length of y axis in nm.
    u : float, optional
        Position on the spline at which local Cartesian coordinates will be built.
    scale: nm, default 1.0
        Scale of coordinates, i.e. spacing of the grid.

    Returns
    -------
    np.ndarray
        (D, V, S, H) shape. Each cooresponds to dimensional vertical, longitudinal
        and horizontal axis, which is ready to be used in `ndi.map_coordinates`.
    """

    mole = self.anchors_to_molecules(u)
    nz = roundint(shape[0] / scale)
    ny = roundint(depth / scale)
    nx = roundint(shape[1] / scale)
    return mole.local_coordinates(shape=(nz, ny, nx), scale=scale)
local_cylindrical(r_range, depth, u=None, scale=1.0)

Generate local cylindrical coordinate systems.

The generated array can be used for ndi.map_coordinates. The result coordinate systems are flat, i.e., not distorted by the curvature of spline.

Parameters:

Name Type Description Default
r_range (float, float)

Lower and upper bound of radius in nm.

required
depth nm

Length of y axis in nm.

required
u float

Position on the spline at which local cylindrical coordinates will be built.

None
scale nm

Scale of coordinates, i.e. spacing of the grid.

1.0

Returns:

Type Description
ndarray

(D, V, S, H) shape. Each cooresponds to dimensional, radius, longitudinal and angle axis, which is ready to be used in ndi.map_coordinates.

Source code in cylindra/components/spline/_spline_base.py
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
def local_cylindrical(
    self,
    r_range: tuple[nm, nm],
    depth: nm,
    u: float | None = None,
    scale: nm = 1.0,
) -> NDArray[np.float32]:
    """
    Generate local cylindrical coordinate systems.

    The generated array can be used for `ndi.map_coordinates`. The result coordinate
    systems are flat, i.e., not distorted by the curvature of spline.

    Parameters
    ----------
    r_range : (float, float)
        Lower and upper bound of radius in nm.
    depth : nm
        Length of y axis in nm.
    u : float
        Position on the spline at which local cylindrical coordinates will be built.
    scale: nm, default 1.0
        Scale of coordinates, i.e. spacing of the grid.

    Returns
    -------
    np.ndarray
        (D, V, S, H) shape. Each cooresponds to dimensional, radius, longitudinal
        and angle axis, which is ready to be used in `ndi.map_coordinates`.
    """
    if u is None:
        u = self.anchors
    rmin, rmax = r_range
    ds = self.map(u, der=1)
    ds_norm: NDArray[np.float32] = ds.reshape(-1, 1) / np.sqrt(sum(ds**2))
    depth_px = roundint(depth / scale)
    grid = np.linspace(-depth_px / 2 + 0.5, depth_px / 2 - 0.5, depth_px)
    dy = ds_norm * grid
    y_ax_coords = (self.map(u) / scale).reshape(1, -1) + dy.T
    dslist = np.stack([ds] * depth_px, axis=0)
    map_ = polar_coords_2d(rmin / scale, rmax / scale)
    map_slice = _stack_coords(map_)
    out = _rot_with_vector(map_slice, y_ax_coords, dslist)
    return np.moveaxis(out, -1, 0)
make_anchors(interval=None, n=None, max_interval=None)

Make anchor points at constant intervals. Either interval, number of anchor or the maximum interval between anchors can be specified.

Parameters:

Name Type Description Default
interval nm

Interval between anchor points.

None
n int

Number of anchor points, including both ends.

None
max_interval nm | None

Spline will be split by as little anchors as possible but interval between anchors will not be larger than this. The number of anchors are also guaranteed to be larger than spline order.

None
Source code in cylindra/components/spline/_spline_base.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def make_anchors(
    self,
    interval: nm | None = None,
    n: int | None = None,
    max_interval: nm | None = None,
) -> Self:
    """
    Make anchor points at constant intervals. Either interval, number of anchor or
    the maximum interval between anchors can be specified.

    Parameters
    ----------
    interval : nm, optional
        Interval between anchor points.
    n : int, optional
        Number of anchor points, including both ends.
    max_interval: nm, optional
        Spline will be split by as little anchors as possible but interval between
        anchors will not be larger than this. The number of anchors are also
        guaranteed to be larger than spline order.
    """
    self.anchors = self.prep_anchor_positions(interval, n, max_interval)
    return self
map(positions=None, der=0)

Calculate coordinates (or n-th derivative) at points on the spline.

Parameters:

Name Type Description Default
positions ndarray or float

Positions. Between 0 and 1. If not given, anchors are used instead.

None
der int

der-th derivative will be calculated.

0

Returns:

Type Description
ndarray

Positions or vectors in (3,) or (N, 3) shape.

Source code in cylindra/components/spline/_spline_base.py
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
def map(
    self,
    positions: float | NDArray[np.number] | None = None,
    der: int = 0,
) -> NDArray[np.float32]:
    """
    Calculate coordinates (or n-th derivative) at points on the spline.

    Parameters
    ----------
    positions : np.ndarray or float, optional
        Positions. Between 0 and 1. If not given, anchors are used instead.
    der : int, default 0
        `der`-th derivative will be calculated.

    Returns
    -------
    np.ndarray
        Positions or vectors in (3,) or (N, 3) shape.
    """
    _assert_fitted(self)
    if positions is None:
        positions = self.anchors
    u0, u1 = self._lims
    if np.isscalar(positions):
        if self.order < der:
            return np.zeros(3, dtype=np.float32)
        u_tr = _linear_conversion(float(positions), u0, u1)
        if 0 <= u_tr <= 1 or self.extrapolate is ExtrapolationMode.default:
            coord = splev([u_tr], self._tck, der=der)
        elif self.extrapolate is ExtrapolationMode.linear:
            if der == 0:
                if u_tr < 0:
                    der0 = splev([0], self._tck, der=0)
                    der1 = splev([0], self._tck, der=1)
                    dr = u_tr
                else:
                    der0 = splev([1], self._tck, der=0)
                    der1 = splev([1], self._tck, der=1)
                    dr = u_tr - 1
                coord = [a0 + a1 * dr for a0, a1 in zip(der0, der1, strict=True)]
            elif der == 1:
                if u_tr < 0:
                    coord = splev([0], self._tck, der=1)
                else:
                    coord = splev([1], self._tck, der=1)
            else:
                coord = [[0], [0], [0]]
        else:
            raise ValueError(f"Invalid extrapolation mode: {self.extrapolate!r}.")
        out = np.concatenate(coord).astype(np.float32)

    else:
        u_tr = _linear_conversion(np.asarray(positions, dtype=np.float32), u0, u1)
        if self.order < der:
            return np.zeros((u_tr.size, 3), dtype=np.float32)
        if self.extrapolate is ExtrapolationMode.default:
            out = np.stack(splev(u_tr, self._tck, der=der), axis=1).astype(
                np.float32
            )
        elif self.extrapolate is ExtrapolationMode.linear:
            sl_small = u_tr < 0
            sl_large = u_tr > 1
            n_small = np.count_nonzero(sl_small)
            n_large = np.count_nonzero(sl_large)
            out = np.stack(splev(u_tr, self._tck, der=der), axis=1).astype(
                np.float32
            )
            if der == 0:
                if n_small > 0:
                    der0 = np.array(splev(0, self._tck, der=0), dtype=np.float32)
                    der1 = np.array(splev(0, self._tck, der=1), dtype=np.float32)
                    dr = u_tr[sl_small]
                    coords_new = der0 + der1 * dr[:, np.newaxis]
                    out[sl_small] = coords_new

                if n_large > 0:
                    der0 = splev(1, self._tck, der=0)
                    der1 = splev(1, self._tck, der=1)
                    dr = u_tr[sl_large] - 1
                    coords_new = der0 + der1 * dr[:, np.newaxis]
                    out[sl_large] = coords_new

            elif der == 1:
                if n_small > 0:
                    out[sl_small] = splev(0, self._tck, der=1)
                if n_large > 0:
                    out[sl_large] = splev(1, self._tck, der=1)
            else:
                if n_small > 0:
                    out[sl_small] = 0
                if n_large > 0:
                    out[sl_large] = 0
        else:
            raise ValueError(f"Invalid extrapolation mode: {self.extrapolate!r}.")

    if u0 > u1 and der % 2 == 1:
        out = -out
    return out
partition(n, der=0)

Return the n-partitioning coordinates of the spline.

Source code in cylindra/components/spline/_spline_base.py
611
612
613
614
def partition(self, n: int, der: int = 0) -> NDArray[np.float32]:
    """Return the n-partitioning coordinates of the spline."""
    u = np.linspace(0, 1, n)
    return self.map(u, der)
resample(max_interval=1.0, err_max=0.1)

Resample a new spline along the original spline.

Parameters:

Name Type Description Default
max_interval nm

Maximum interval between resampling points.

1.0
err_max float

Spline fitting maximum error.

0.1

Returns:

Type Description
Spline

Resampled spline object.

Source code in cylindra/components/spline/_spline_base.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def resample(self, max_interval: nm = 1.0, err_max: nm = 0.1) -> Self:
    """
    Resample a new spline along the original spline.

    Parameters
    ----------
    max_interval : nm, default 1.0
        Maximum interval between resampling points.
    err_max : float, default 0.1
        Spline fitting maximum error.

    Returns
    -------
    Spline
        Resampled spline object.
    """
    l = self.length()
    points = self.map(np.linspace(0, 1, ceilint(l / max_interval)))
    return self.fit(points, err_max=err_max)
restore()

Restore the original, not-clipped spline.

Returns:

Type Description
Spline

Copy of the original spline.

Source code in cylindra/components/spline/_spline_base.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def restore(self) -> Self:
    """
    Restore the original, not-clipped spline.

    Returns
    -------
    Spline
        Copy of the original spline.
    """
    return self.__class__(
        order=self.order,
        lims=(0, 1),
        extrapolate=self.extrapolate,
        config=self.config,
    )._set_params(self._tck, self._u)
shift(positions=None, shifts=None, *, err_max=1.0)

Fit spline model using a list of shifts in XZ-plane.

Parameters:

Name Type Description Default
positions sequence of float

Positions. Between 0 and 1. If not given, anchors are used instead.

None
shifts ndarray

Shift from center in nm. Must be (N, 2).

None
err_max float

Error allowed for fitting. See Spline.fit.

1.0

Returns:

Type Description
Spline

Spline shifted by fitting to given coordinates.

Source code in cylindra/components/spline/_spline_base.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
def shift(
    self,
    positions: Sequence[float] | None = None,
    shifts: NDArray[np.floating] | None = None,
    *,
    err_max: nm = 1.0,
) -> Self:
    """
    Fit spline model using a list of shifts in XZ-plane.

    Parameters
    ----------
    positions : sequence of float, optional
        Positions. Between 0 and 1. If not given, anchors are used instead.
    shifts : np.ndarray
        Shift from center in nm. Must be (N, 2).
    err_max : float, default 1.0
        Error allowed for fitting. See `Spline.fit`.

    Returns
    -------
    Spline
        Spline shifted by fitting to given coordinates.
    """
    if shifts is None:
        raise ValueError("Shifts must be given.")
    coords = self.map(positions)
    rot = self.get_rotator(positions)
    # insert 0 in y coordinates.
    shifts = np.stack([shifts[:, 0], np.zeros(len(rot)), shifts[:, 1]], axis=1)
    coords += rot.apply(shifts)
    return self.fit(coords, err_max=err_max)
to_dict()

Convert spline info into a dict.

Source code in cylindra/components/spline/_spline_base.py
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
def to_dict(self) -> SplineInfo:
    """Convert spline info into a dict."""
    t, c, k = self._tck
    u = self._u
    return {
        "t": t.tolist(),
        "c": {"z": c[0].tolist(), "y": c[1].tolist(), "x": c[2].tolist()},
        "k": k,
        "u": u.tolist(),
        "lims": self._lims,
        "localprops_window_size": dict(self.props.window_size),
        "binsize_loc": dict(self.props.binsize_loc),
        "binsize_glob": dict(self.props.binsize_glob),
        "extrapolate": self._extrapolate.name,
        "config": self.config.asdict(),
    }
translate(shift)

Translate the spline by given shift vectors.

Source code in cylindra/components/spline/_spline_base.py
178
179
180
181
182
183
def translate(self, shift: tuple[nm, nm, nm]):
    """Translate the spline by given shift vectors."""
    new = self.copy()
    c = [x + s for x, s in zip(self.coeff, shift, strict=True)]
    new._tck = (self.knots, c, self.order)
    return new
with_config(config, copy_props=False)

Return a copy of the spline with a new config.

Source code in cylindra/components/spline/_spline_base.py
120
121
122
123
124
125
126
127
128
129
130
def with_config(
    self,
    config: dict[str, Any] | SplineConfig,
    copy_props: bool = False,
) -> Self:
    """Return a copy of the spline with a new config."""
    new = self.copy(copy_props=copy_props, copy_config=False)
    if not isinstance(config, SplineConfig):
        config = SplineConfig.construct(**config)
    new._config = config
    return new
with_extrapolation(extrapolate)

Return a copy of the spline with a new extrapolation mode.

Source code in cylindra/components/spline/_spline_base.py
114
115
116
117
118
def with_extrapolation(self, extrapolate: ExtrapolationMode | str) -> Self:
    """Return a copy of the spline with a new extrapolation mode."""
    new = self.copy()
    new._extrapolate = ExtrapolationMode(extrapolate)
    return new
y_to_position(y, nknots=512)

Convert y-coordinate to spline position parameter.

Parameters:

Name Type Description Default
y array - like

Y coordinates.

required
nknots int

Number of knots. Increasing the number of knots will increase the accuracy.

512
Source code in cylindra/components/spline/_spline_base.py
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
def y_to_position(
    self, y: NDArray[np.float32], nknots: int = 512
) -> NDArray[np.float32]:
    """
    Convert y-coordinate to spline position parameter.

    Parameters
    ----------
    y : array-like
        Y coordinates.
    nknots : int, optional
        Number of knots. Increasing the number of knots will increase the accuracy.
    """
    # almost equal to y / self.length()
    u = np.linspace(0, 1, nknots)
    u_tr = _linear_conversion(u, *self._lims)
    dz, dy, dx = map(np.diff, splev(u_tr, self._tck, der=0))
    dist = np.concatenate([[0], np.sqrt(dx**2 + dy**2 + dz**2)]).cumsum()
    tck = splrep(dist, u, k=1)
    out = splev(y, tck)
    return out

SplineConfig dataclass

Class for spline configuration.

Source code in cylindra/components/spline/_config.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@dataclass(frozen=True)
class SplineConfig:
    """Class for spline configuration."""

    npf_range: Range[int] = Range(11, 17)
    spacing_range: Range[nm] = Range(3.9, 4.3)
    twist_range: Range[float] = Range(-1.0, 1.0)
    rise_range: Range[float] = Range(0.0, 45.0)
    rise_sign: Literal[-1, 1] = -1
    clockwise: Literal["PlusToMinus", "MinusToPlus"] = "MinusToPlus"
    thickness_inner: nm = 2.0
    thickness_outer: nm = 3.0
    fit_depth: nm = 48.0
    fit_width: nm = 44.0

    def _repr_pretty_(self, p, cycle: bool):
        if cycle:
            p.text(repr(self))
        parts = list[str]()
        for k in self.__dataclass_fields__:
            v = getattr(self, k)
            parts.append(f"{k}={v!r}")
        cont = ",\n\t".join(parts)
        p.text(f"SplineConfig(\n\t{cont}\n)")

    def copy(self) -> SplineConfig:
        return SplineConfig(
            npf_range=self.npf_range.copy(),
            spacing_range=self.spacing_range.copy(),
            twist_range=self.twist_range.copy(),
            rise_range=self.rise_range.copy(),
            rise_sign=self.rise_sign,
            clockwise=self.clockwise,
            thickness_inner=self.thickness_inner,
            thickness_outer=self.thickness_outer,
            fit_depth=self.fit_depth,
            fit_width=self.fit_width,
        )

    def asdict(self) -> dict[str, Any]:
        return {
            "npf_range": self.npf_range.astuple(),
            "spacing_range": self.spacing_range.astuple_rounded(),
            "twist_range": self.twist_range.astuple_rounded(),
            "rise_range": self.rise_range.astuple_rounded(),
            "rise_sign": self.rise_sign,
            "clockwise": self.clockwise,
            "thickness_inner": self.thickness_inner,
            "thickness_outer": self.thickness_outer,
            "fit_depth": self.fit_depth,
            "fit_width": self.fit_width,
        }

    def json_dumps(self) -> str:
        strings = list[str]()
        for k, v in self.asdict().items():
            if isinstance(v, tuple):
                strings.append(f'"{k}": {list(v)!r}'.replace("'", '"'))
            else:
                strings.append(f'"{k}": {v!r}'.replace("'", '"'))
        return "{\n    " + ",\n    ".join(strings) + "\n}"

    @classmethod
    def construct(cls, **kwargs) -> SplineConfig:
        """Construct a SplineConfig with argument check."""
        return SplineConfig().updated(**kwargs)

    @classmethod
    def from_dict(
        cls,
        cfg: dict[str, Any],
        unknown: Literal["warn", "error", "ignore"] = "warn",
    ) -> SplineConfig:
        # for version compatibility
        _undef = {}
        cfg_input = {}
        for k, v in cfg.items():
            if k not in SplineConfig.__annotations__:
                _undef[k] = v
            else:
                cfg_input[k] = v

        if _undef:
            msg = f"Unknown keys, maybe due to version incompatibility: {_undef!r}"
            match unknown:
                case "error":
                    raise ValueError(msg)
                case "warn":
                    warnings.warn(msg, RuntimeWarning, stacklevel=2)
                case "ignore":
                    pass
                case other:  # pragma: no cover
                    raise ValueError(f"Got invalid case {other!r}")
        return SplineConfig().updated(**cfg_input)

    @classmethod
    def from_file(
        cls,
        path: str,
        unknown: Literal["warn", "error", "ignore"] = "warn",
    ) -> SplineConfig:
        with open(path) as f:
            cfg: dict = json.load(f)
        return SplineConfig.from_dict(cfg, unknown=unknown)

    def to_file(self, path: str):
        with open(path, mode="w") as f:
            json.dump(self.asdict(), f)
        return None

    def updated(
        self,
        npf_range: tuple[int, int] | None = None,
        spacing_range: tuple[nm, nm] | None = None,
        twist_range: tuple[float, float] | None = None,
        rise_range: tuple[float, float] | None = None,
        rise_sign: Literal[-1, 1] | None = None,
        clockwise: Literal["PlusToMinus", "MinusToPlus"] | None = None,
        thickness_inner: nm | None = None,
        thickness_outer: nm | None = None,
        fit_depth: nm | None = None,
        fit_width: nm | None = None,
    ) -> SplineConfig:
        kwargs = locals()
        kwargs.pop("self")
        for k, v in kwargs.items():
            if v is None:
                kwargs[k] = getattr(self, k)
        for rng in ["npf_range", "spacing_range", "twist_range", "rise_range"]:
            kwargs[rng] = _norm_range(kwargs[rng])
        if kwargs["rise_sign"] not in [-1, 1]:
            raise ValueError("rise_sign must be -1 or 1")
        if kwargs["clockwise"] not in ["PlusToMinus", "MinusToPlus"]:
            raise ValueError("clockwise must be PlusToMinus or MinusToPlus")
        for n in ["thickness_inner", "thickness_outer", "fit_depth", "fit_width"]:
            if kwargs[n] < 0:
                raise ValueError(f"{n} must be non-negative")
        return SplineConfig(**kwargs)
construct(**kwargs) classmethod

Construct a SplineConfig with argument check.

Source code in cylindra/components/spline/_config.py
120
121
122
123
@classmethod
def construct(cls, **kwargs) -> SplineConfig:
    """Construct a SplineConfig with argument check."""
    return SplineConfig().updated(**kwargs)

CylTomogram

Bases: Tomogram

Tomogram with cylindrical splines.

Source code in cylindra/components/tomogram/_cyl_tomo.py
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
class CylTomogram(Tomogram):
    """Tomogram with cylindrical splines."""

    def __init__(self):
        super().__init__()
        self._splines = SplineList()

    @property
    def splines(self) -> SplineList:
        """List of splines."""
        return self._splines

    def add_spline(
        self,
        coords: ArrayLike,
        *,
        order: int = 3,
        err_max: nm = 0.5,
        extrapolate: ExtrapolationMode | str = ExtrapolationMode.linear,
        config: SplineConfig | dict[str, Any] = {},
    ) -> None:
        """
        Add spline path to tomogram.

        Parameters
        ----------
        coords : array-like
            (N, 3) array of coordinates. A spline curve that fit it well is added.
        order : int, optional
            Order of spline curve.
        extrapolate : str, optional
            Extrapolation mode of the spline.
        config : SplineConfig or dict, optional
            Configuration for spline fitting.
        """
        _coords = np.asarray(coords)
        ncoords = _coords.shape[0]
        spl = CylSpline(
            order=order,
            config=config,
            extrapolate=extrapolate,
        ).fit(_coords, err_max=err_max)
        interval: nm = 30.0
        length = spl.length()

        n = int(length / interval) + 1
        fit = spl.map(np.linspace(0, 1, n))
        if ncoords <= spl.order and ncoords < fit.shape[0]:
            return self.add_spline(
                fit,
                order=order,
                err_max=err_max,
                extrapolate=extrapolate,
                config=config,
            )

        self.splines.append(spl)
        return None

    @_misc.batch_process
    def make_anchors(
        self,
        i: int = None,
        *,
        interval: nm | None = None,
        n: int | None = None,
        max_interval: nm | None = None,
    ):
        """
        Make anchors on spline object(s).

        Parameters
        ----------
        interval : nm, optional
            Anchor intervals.
        n : int, optional
            Number of anchors
        max_interval : nm, optional
            Maximum interval between anchors.

        """
        self.splines[i].make_anchors(interval=interval, n=n, max_interval=max_interval)
        return None

    def align_to_polarity(self, orientation: Ori | str = Ori.MinusToPlus) -> Self:
        """
        Align all the splines in the direction parallel to the given polarity.

        Parameters
        ----------
        orientation : Ori or str, default Ori.MinusToPlus
            To which direction splines will be aligned.

        Returns
        -------
        Tomogram object
            Same object with updated splines.
        """
        orientation = Ori(orientation)
        if orientation is Ori.none:
            raise ValueError("Must be PlusToMinus or MinusToPlus.")
        for i, spl in enumerate(self.splines):
            if spl.orientation is Ori.none:
                raise ValueError(f"Spline-{i} has no orientation.")
            if spl.orientation != orientation:
                try:
                    self.splines[i] = spl.invert()
                except Exception as e:
                    raise type(e)(f"Cannot invert spline-{i}: {e}")
        return self

    @_misc.batch_process
    def fit(
        self,
        i: int = None,
        *,
        max_interval: nm = 30.0,
        degree_precision: float = 0.5,
        binsize: int = 1,
        err_max: nm = 1.0,
        edge_sigma: nm = 2.0,
        max_shift: nm = 5.0,
        n_rotations: int = 5,
    ) -> _misc.FitResult:
        """
        Roughly fit splines to cylindrical structures.

        Subtomograms will be sampled at every ``max_interval`` nm. In dense mode,
        Subtomograms will be masked relative to XY-plane, using sigmoid function.
        Sharpness of the sigmoid function is determined by ``dense_mode_sigma``
        (``dense_mode_sigma=0`` corresponds to a step function).

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to fit.
        max_interval : nm, default 30.0
            Maximum interval of sampling points in nm unit.
        degree_precision : float, default 0.5
            Precision of xy-tilt degree in angular correlation.
        binsize : int, default 1
            Multiscale bin size used for fitting.
        edge_sigma : nm, default 2.0
            Sharpness of mask at the edges. If not None, fitting will be executed after regions
            outside the cylinder are masked. Soft mask is important for precision because sharp
            changes in intensity cause strong correlation at the edges.
        max_shift: nm, default 5.0
            Maximum shift from the true center of the cylinder. This parameter is used in phase
            cross correlation.
        n_rotations : int, default 5
            Number of rotations to be tested during finding the cylinder center.

        Returns
        -------
        FitResult
            Result of fitting.
        """
        LOGGER.info(f"Running: {self.__class__.__name__}.fit, i={i}")
        spl = self.splines[i]
        anc = spl.prep_anchor_positions(max_interval=max_interval)
        subtomograms, interval, scale = self._prep_fit_spline(spl, anc, binsize)

        with set_gpu():
            subtomograms = _misc.soft_mask_edges(
                subtomograms, spl, anc, scale, edge_sigma
            )
            ds = spl.map(anc, der=1)
            yx_tilt = np.rad2deg(np.arctan2(-ds[:, 2], ds[:, 1]))
            degree_max = 14.0
            nrots = roundint(degree_max / degree_precision) + 1

            # Angular correlation
            out = _misc.dask_angle_corr(subtomograms, yx_tilt, nrots=nrots)
            refined_tilt_deg = np.array(out)
            refined_tilt_rad = np.deg2rad(refined_tilt_deg)

            # If subtomograms are sampled at short intervals, angles should be smoothened to
            # avoid overfitting.
            size = 2 * roundint(48.0 / interval) + 1
            if size > 1:
                # Mirror-mode padding is "a b c d | c b a".
                refined_tilt_rad = _misc.angle_uniform_filter(
                    refined_tilt_rad, size=size, mode=Mode.mirror
                )
                refined_tilt_deg = np.rad2deg(refined_tilt_rad)

            # Rotate subtomograms in YX plane
            for _j, img in enumerate(subtomograms):
                img: ip.ImgArray
                angle = refined_tilt_deg[_j]
                img.rotate(-angle, cval=0, update=True)

            # zx-shift correction by self-ZNCC
            subtomo_proj = subtomograms.mean(axis="y")

            if edge_sigma is not None:
                # Regions outside the mask don't need to be considered.
                xc = int(subtomo_proj.shape.x / 2)
                w = int(spl.config.fit_width / scale / 2)
                subtomo_proj = subtomo_proj[ip.slicer.x[xc - w : xc + w + 1]]

            max_shift_px = max_shift / scale * 2
            pf_ang = 360 / spl.config.npf_range.center
            degrees = np.linspace(-pf_ang / 2, pf_ang / 2, n_rotations) + 180
            shifts = _misc.multi_rotated_auto_zncc(subtomo_proj, degrees, max_shift_px)

        # Update spline coordinates.
        # Because centers of subtomogram are on lattice points of pixel coordinate,
        # coordinates that will be shifted should be converted to integers.
        coords_px = self.nm2pixel(spl.map(anc), binsize=binsize).astype(np.float32)
        coords_px_new = _misc.shift_coords(coords_px, shifts, refined_tilt_rad)
        coords = coords_px_new * scale + self.multiscale_translation(binsize)

        # Update spline parameters
        self.splines[i] = spl.fit(coords, err_max=err_max)
        result = _misc.FitResult(shifts * scale)
        LOGGER.info(f" >> Shift RMSD = {result.rmsd:.3f} nm")
        return result

    @_misc.batch_process
    def fit_centroid(
        self,
        i: int = None,
        *,
        max_interval: nm = 30.0,
        binsize: int = 1,
        err_max: nm = 1.0,
        max_shift: nm = 5.0,
    ) -> _misc.FitResult:
        LOGGER.info(f"Running: {self.__class__.__name__}.fit_centroid, i={i}")
        spl = self.splines[i]
        anc = spl.prep_anchor_positions(max_interval=max_interval)
        scale = self.scale * binsize

        # sample subtomograms
        loader = _misc.prep_loader_for_refine(self, spl, anc, binsize)
        subtomograms = ip.asarray(loader.asnumpy(), axes="pzyx").mean(axis="y")[
            ip.slicer.x[::-1]
        ]
        num, lz, lx = subtomograms.shape
        dpx = ceilint(max_shift / scale)
        sl_z = slice(max((lz - 1) // 2 - dpx, 0), min(lz // 2 + dpx + 1, lz))
        sl_x = slice(max((lx - 1) // 2 - dpx, 0), min(lx // 2 + dpx + 1, lx))
        centers = np.stack(
            [centroid_2d(patch, sl_z, sl_x) for patch in subtomograms], axis=0
        )
        shifts = centers - np.column_stack(
            [
                np.full(num, (sl_z.start + sl_z.stop - 1) / 2),
                np.full(num, (sl_x.start + sl_x.stop - 1) / 2),
            ]
        )
        self.splines[i] = spl.shift(anc, shifts=shifts * scale, err_max=err_max)
        result = _misc.FitResult(shifts * scale)
        LOGGER.info(f" >> Shift RMSD = {result.rmsd:.3f} nm")
        return result

    @_misc.batch_process
    def refine(
        self,
        i: int = None,
        *,
        max_interval: nm = 30.0,
        binsize: int = 1,
        err_max: nm = 1.0,
        corr_allowed: float = 0.9,
        max_shift: nm = 2.0,
        n_rotations: int = 3,
    ) -> _misc.FitResult:
        """
        Spline refinement using global lattice structural parameters.

        Refine spline using the result of previous fit and the global structural parameters.
        During refinement, Y-projection of XZ cross section of cylinder is rotated with the
        twist angles, thus is much more precise than the coarse fitting.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to fit.
        max_interval : nm, default 24.0
            Maximum interval of sampling points in nm unit.
        binsize : int, default 1
            Multiscale bin size used for refining.
        corr_allowed : float, defaul is 0.9
            How many images will be used to make template for alignment. If 0.9, then top
            90% will be used.
        max_shift: nm, default 2.0
            Maximum shift from the true center of the cylinder. This parameter is used in
            phase cross correlation.
        n_rotations : int, default 3
            Number of rotations to be tested during finding the cylinder center.

        Returns
        -------
        FitResult
            Result of fitting.
        """
        LOGGER.info(f"Running: {self.__class__.__name__}.refine, i={i}")
        spl = self.splines[i]
        _required = [H.spacing, H.twist, H.npf]
        if not spl.props.has_glob(_required):
            if (radius := spl.radius) is None:
                radius = self.measure_radius(
                    i=i,
                    binsize=binsize,
                    positions="auto",
                    update=False,
                )
            with spl.props.temp_glob(radius=radius):
                gprops = self.global_cft_params(
                    i=i, binsize=binsize, nsamples=1, update=False
                )
        else:
            gprops = spl.props.glob.select(_required)
        gdict = {k: float(gprops[k][0]) for k in _required}
        ancs = spl.prep_anchor_positions(max_interval=max_interval)

        # Calculate Fourier parameters by cylindrical transformation along spline.
        # Skew angles are divided by the angle of single protofilament and the residual
        # angles are used, considering missing wedge effect.
        space = gdict[H.spacing]
        twist = gdict[H.twist]
        npf = roundint(gdict[H.npf])

        LOGGER.info(f" >> Parameters: spacing = {space:.2f} nm, twist = {twist:.3f} deg, PF = {npf}")  # fmt: skip

        # complement twisting
        pf_ang = 360 / npf
        twists = _misc.get_twists(spl.length(), ancs.size, space, twist, npf)
        scale = self.scale * binsize
        loader = _misc.prep_loader_for_refine(self, spl, ancs, binsize, twists)
        subtomograms = ip.asarray(loader.asnumpy(), axes="pzyx", dtype=np.float32)
        subtomograms[:] -= subtomograms.mean()  # normalize
        subtomograms.set_scale(zyx=scale)

        degrees = np.linspace(-pf_ang / 2, pf_ang / 2, n_rotations) + 180
        max_shift_px = max_shift / scale
        with set_gpu():
            inputs = subtomograms.mean(axis="y")[ip.slicer.x[::-1]]

            # Align twist-corrected images
            shifts_loc = _misc.multi_rotated_auto_zncc(inputs, degrees, max_shift_px)
            tasks = [
                _misc.delayed_translate(inputs[_j], shifts_loc[_j])
                for _j in range(ancs.size)
            ]
            imgs_aligned = _filter_by_corr(
                np.stack(compute(*tasks), axis="p"),
                corr_allowed,
            )

            # Make 2D template using coarse aligned images.
            imgcory = imgs_aligned.mean(axis="p")
            shift = rotated_auto_zncc(
                imgcory, degrees=degrees, max_shifts=max_shift_px * 2
            )
            template = imgcory.affine(translation=shift, mode=Mode.constant, cval=0.0)

            # Align twist-corrected images to the template
            quat = loader.molecules.quaternion()
            tasks = [
                _misc.delayed_zncc_maximum(
                    inputs[_j],
                    _misc.mask_missing_wedge(template, self.tilt_model, quat[_j]),
                    max_shift_px,
                    twists[_j],
                )
                for _j in range(ancs.size)
            ]
            shifts = np.stack(compute(*tasks), axis=0)

        # Update spline parameters
        self.splines[i] = spl.shift(ancs, shifts=shifts * scale, err_max=err_max)
        result = _misc.FitResult(shifts * scale)
        LOGGER.info(f" >> Shift RMSD = {result.rmsd:.3f} nm")
        return result

    @_misc.batch_process
    def measure_radius(
        self,
        i: int = None,
        *,
        binsize: int = 1,
        positions: NDArray[np.float32] | Literal["auto", "anchor"] = "auto",
        min_radius: nm = 1.0,
        max_radius: nm = 100.0,
        update: bool = True,
    ) -> nm:
        """
        Measure radius using radial profile from the center.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to measure.
        binsize : int, default 1
            Multiscale bin size used for radius calculation.
        positions : array-like or "auto" or "anchor", default "auto"
            Sampling positions (between 0 and 1) to calculate radius. If "anchor"
            is given, anchors of the spline will be used. If "auto" is given,
            three positions along the spline will be used.
        min_radius : nm, default 1.0
            Minimum radius of the cylinder.
        max_radius : nm, default 100.0
            Maximum radius of the cylinder.
        update : bool, default True
            If True, global radius property will be updated.

        Returns
        -------
        float (nm)
            Cylinder radius.
        """
        LOGGER.info(f"Running: {self.__class__.__name__}.measure_radius, i={i}")
        spl = self.splines[i]

        if isinstance(positions, str) and positions == "auto":
            nanchor = 3
            pos = 1 / nanchor * np.arange(nanchor) + 0.5 / nanchor
        elif isinstance(positions, str) and positions == "anchor":
            pos = spl.anchors
        else:
            pos = np.asarray(positions, dtype=np.float32)

        input_img = self._get_multiscale_or_original(binsize)

        depth = spl.config.fit_depth
        _scale = input_img.scale.x
        min_radius_px = min_radius / _scale
        max_radius = min(max_radius, spl.config.fit_width / 2)
        max_radius_px = max_radius / _scale
        spl_trans = spl.translate([-self.multiscale_translation(binsize)] * 3)
        tasks = [
            _misc.get_radial_prof(
                input_img, spl_trans, anc, (min_radius, max_radius), depth
            )
            for anc in pos
        ]
        profs: list[NDArray[np.float32]] = compute(*tasks)
        prof = np.stack(profs, axis=0).mean(axis=0)
        imax_sub = find_centroid_peak(prof, *_misc.get_thickness(spl, _scale))
        offset_px = _misc.get_radius_offset(min_radius_px, max_radius_px)
        radius = (imax_sub + offset_px) * _scale
        if update:
            spl.props.update_glob(
                [pl.Series(H.radius, [radius], dtype=pl.Float32)],
                bin_size=binsize,
            )
        LOGGER.info(f" >> Radius = {radius:.3f} nm")
        return radius

    @_misc.batch_process
    def local_radii(
        self,
        *,
        i: int = None,
        depth: nm = 50.0,
        binsize: int = 1,
        min_radius: nm = 1.0,
        max_radius: nm = 100.0,
        update: bool = True,
        update_glob: bool = True,
    ) -> pl.Series:
        """
        Measure the local radii along the splines.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to analyze.
        depth : nm, default 50.0
            Longitudinal length of subtomograms for calculation.
        binsize : int, default 1
            Multiscale binsize to be used.
        min_radius : nm, default 1.0
            Minimum radius of the cylinder.
        max_radius : nm, default 100.0
            Maximum radius of the cylinder.
        update : bool, default True
            If True, spline properties will be updated.
        update_glob : bool, default True
            If True, global properties will be updated using the mean of the local
            radii.

        Returns
        -------
        pl.Series
            Radii along the spline.
        """
        LOGGER.info(f"Running: {self.__class__.__name__}.local_radii, i={i}")
        spl = self.splines[i]

        input_img = self._get_multiscale_or_original(binsize)

        depth = spl.config.fit_depth
        _scale = input_img.scale.x
        thickness = _misc.get_thickness(spl, _scale)
        min_radius_px = min_radius / _scale
        max_radius = min(max_radius, spl.config.fit_width / 2)
        max_radius_px = max_radius / _scale
        offset_px = _misc.get_radius_offset(min_radius_px, max_radius_px)
        spl_trans = spl.translate([-self.multiscale_translation(binsize)] * 3)
        tasks = []
        for anc in spl_trans.anchors:
            task = _misc.get_radial_prof(
                input_img, spl_trans, anc, (min_radius, max_radius), depth
            )
            tasks.append(task)
        profs: list[NDArray[np.float32]] = compute(*tasks)
        radii = list[float]()
        for prof in profs:
            imax_sub = find_centroid_peak(prof, *thickness)
            radii.append((imax_sub + offset_px) * _scale)

        out = pl.Series(H.radius, radii, dtype=pl.Float32)
        if update:
            spl.props.update_loc([out], depth, bin_size=binsize)
        if update_glob:
            spl.props.update_glob([pl.Series(H.radius, [out.mean()])], bin_size=binsize)
        return out

    @_misc.batch_process
    def local_cft_params(
        self,
        *,
        i: int = None,
        depth: nm = 50.0,
        binsize: int = 1,
        radius: nm | Literal["local", "global"] = "global",
        nsamples: int = 8,
        update: bool = True,
        update_glob: bool = False,
    ) -> pl.DataFrame:
        """
        Calculate local lattice parameters from cylindrical Fourier space.

        To determine the peaks upsampled discrete Fourier transformation is used
        for every subtomogram.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to analyze.
        depth : nm, default 50.0
            Length of subtomogram for calculation of local parameters.
        binsize : int, default 1
            Multiscale bin size used for calculation.
        radius : str, default "global"
            If "local", use the local radius for the analysis. If "global", use the
            global radius. If a float, use the given radius.
        nsamples : int, default 8
            Number of cylindrical coordinate samplings for Fourier transformation. Multiple
            samplings are needed because up-sampled discrete Fourier transformation does not
            return exactly the same power spectra with shifted inputs, unlike FFT. Larger
            ``nsamples`` reduces the error but is slower.
        update : bool, default True
            If True, spline properties will be updated.
        update_glob : bool, default False
            If True, global properties will be updated using the mean or mode of the local
            properties.

        Returns
        -------
        polars.DataFrame
            Local properties.
        """
        LOGGER.info(f"Running: {self.__class__.__name__}.local_cft_params, i={i}")
        spl = self.splines[i]
        radii = _misc.prepare_radii(spl, radius)
        input_img = self._get_multiscale_or_original(binsize)
        _scale = input_img.scale.x
        tasks: list[Delayed[LatticeParams]] = []
        spl_trans = spl.translate([-self.multiscale_translation(binsize)] * 3)
        _analyze_fn = LatticeAnalyzer(spl.config).estimate_lattice_params_task
        for anc, r0 in zip(spl_trans.anchors, radii, strict=True):
            rmin, rmax = spl.radius_range(r0)
            rc = (rmin + rmax) / 2
            coords = spl_trans.local_cylindrical((rmin, rmax), depth, anc, scale=_scale)
            tasks.append(_analyze_fn(input_img, coords, rc, nsamples=nsamples))

        lprops = pl.DataFrame(
            compute(*tasks),
            schema=LatticeParams.polars_schema(),
        )
        if update:
            spl.props.update_loc(lprops, depth, bin_size=binsize)
        if update_glob:
            gprops = lprops.select(
                pl.col(H.spacing).mean(),
                pl.col(H.pitch).mean(),
                pl.col(H.twist).mean(),
                pl.col(H.skew).mean(),
                pl.col(H.rise).mean(),
                pl.col(H.rise_length).mean(),
                pl.col(H.npf).mode().first(),
                pl.col(H.start).mode().first(),
            )
            spl.props.update_glob(gprops, bin_size=binsize)

        return lprops

    def iter_local_image(
        self,
        i: int,
        depth: nm = 50.0,
        pos: int | None = None,
        binsize: int = 1,
    ) -> Iterable[ip.ImgArray]:
        spl = self.splines[i]
        if spl.radius is None:
            raise ValueError("Radius has not been determined yet.")

        input_img = self._get_multiscale_or_original(binsize)
        _scale = input_img.scale.x
        rmin, rmax = spl.radius_range()
        rc = (rmin + rmax) / 2
        if pos is None:
            anchors = spl.anchors
        else:
            anchors = [spl.anchors[pos]]
        spl_trans = spl.translate([-self.multiscale_translation(binsize)] * 3)
        for anc in anchors:
            coords = spl_trans.local_cylindrical((rmin, rmax), depth, anc, scale=_scale)
            polar = get_polar_image(input_img, coords, rc)
            polar[:] -= np.mean(polar)
            yield polar

    @_misc.batch_process
    def local_cft(
        self,
        *,
        i: int = None,
        depth: nm = 50.0,
        pos: int | None = None,
        binsize: int = 1,
    ) -> ip.ImgArray:
        """
        Calculate non-upsampled local cylindric Fourier transormation along spline.

        This function does not up-sample.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to analyze.
        depth : nm, default 50.0
            Length of subtomogram for calculation of local parameters.
        pos : int, optional
            Only calculate at ``pos``-th anchor if given.
        binsize : int, default 1
            Multiscale bin size used for calculation.

        Returns
        -------
        ip.ImgArray
            FT images stacked along "p" axis.
        """
        out = list[ip.ImgArray]()
        with set_gpu():
            for polar in self.iter_local_image(i, depth, pos, binsize):
                out.append(polar.fft(dims="rya"))
        return np.stack(out, axis="p")

    @_misc.batch_process
    def local_cps(
        self,
        *,
        i: int = None,
        depth: nm = 50.0,
        pos: int | None = None,
        binsize: int = 1,
    ) -> ip.ImgArray:
        """
        Calculate non-upsampled local cylindric power spectra along spline.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to analyze.
        depth : nm, default 50.0
            Length of subtomogram for calculation of local parameters.
        pos : int, optional
            Only calculate at ``pos``-th anchor if given.
        binsize : int, default 1
            Multiscale bin size used for calculation.

        Returns
        -------
        ip.ImgArray
            FT images stacked along "p" axis.
        """
        cft = self.local_cft(i=i, depth=depth, pos=pos, binsize=binsize)
        return cft.real**2 + cft.imag**2

    @_misc.batch_process
    def local_image_with_peaks(
        self,
        *,
        i: int = None,
        binsize: int | None = None,
    ) -> list[_misc.ImageWithPeak]:
        spl = self.splines[i]
        depth = spl.props.window_size[H.twist]
        if binsize is None:
            binsize = spl.props.binsize_loc[H.twist]
        df_loc = spl.props.loc
        out = list[_misc.ImageWithPeak]()
        for j, polar_img in enumerate(self.iter_local_image(i, depth, binsize=binsize)):
            cparams = spl.cylinder_params(
                spacing=_misc.get_component(df_loc, H.spacing, j),
                pitch=_misc.get_component(df_loc, H.pitch, j),
                twist=_misc.get_component(df_loc, H.twist, j),
                skew=_misc.get_component(df_loc, H.skew, j),
                rise_angle=_misc.get_component(df_loc, H.rise, j),
                start=_misc.get_component(df_loc, H.start, j),
                npf=_misc.get_component(df_loc, H.npf, j),
            )
            analyzer = LatticeAnalyzer(spl.config)
            cft = polar_img.fft(dims="rya")
            cps_proj = (cft.real**2 + cft.imag**2).mean(axis="r")
            peakv, peakh = analyzer.params_to_peaks(cps_proj, cparams)
            peakv = peakv.shift_to_center()
            peakh = peakh.shift_to_center()
            out.append(_misc.ImageWithPeak(polar_img, cps_proj, [peakv, peakh]))
        return out

    @_misc.batch_process
    def global_cps(
        self,
        *,
        i: int = None,
        binsize: int = 1,
    ) -> ip.ImgArray:
        """
        Calculate global cylindrical power spectra.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to analyze.
        binsize : int, default 1
            Multiscale bin size used for calculation.

        Returns
        -------
        ip.ImgArray
            Complex image.
        """
        cft = self.global_cft(i=i, binsize=binsize)
        return cft.real**2 + cft.imag**2

    @_misc.batch_process
    def global_image_with_peaks(
        self,
        *,
        i: int = None,
        binsize: int | None = None,
    ) -> _misc.ImageWithPeak:
        spl = self.splines[i]
        if binsize is None:
            binsize = spl.props.binsize_glob[H.twist]
        img_st = self.straighten_cylindric(i, binsize=binsize)
        img_st -= np.mean(img_st)
        cft = img_st.fft(dims="rya")
        cps = cft.real**2 + cft.imag**2
        cps_proj = cps.mean(axis="r")
        cparams = spl.cylinder_params(
            spacing=spl.props.get_glob(H.spacing, default=None),
            pitch=spl.props.get_glob(H.pitch, default=None),
            twist=spl.props.get_glob(H.twist, default=None),
            skew=spl.props.get_glob(H.skew, default=None),
            rise_angle=spl.props.get_glob(H.rise, default=None),
            start=spl.props.get_glob(H.start, default=None),
            npf=spl.props.get_glob(H.npf, default=None),
        )
        analyzer = LatticeAnalyzer(spl.config)
        peakv, peakh = analyzer.params_to_peaks(cps_proj, cparams)
        peakv = peakv.shift_to_center()
        peakh = peakh.shift_to_center()
        return _misc.ImageWithPeak(img_st, cps_proj, [peakv, peakh])

    @_misc.batch_process
    def global_cft_params(
        self,
        *,
        i: int = None,
        binsize: int = 1,
        nsamples: int = 8,
        update: bool = True,
    ) -> pl.DataFrame:
        """
        Calculate global lattice parameters.

        This function transforms tomogram using cylindrical coordinate system along
        spline. This function calls ``straighten`` beforehand, so that Fourier space is
        distorted if the cylindrical structure is curved.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to analyze.
        binsize : int, default 1
            Multiscale bin size used for calculation.
        nsamples : int, default 8
            Number of cylindrical coordinate samplings for Fourier transformation.
            Multiple samplings are needed because up-sampled discrete Fourier
            transformation does not return exactly the same power spectra with shifted
            inputs, unlike FFT. Larger ``nsamples`` reduces the error but is slower.
        update : bool, default True
            If True, spline properties will be updated.

        Returns
        -------
        pl.DataFrame
            Global properties.
        """
        LOGGER.info(f"Running: {self.__class__.__name__}.global_cft_params, i={i}")
        spl = self.splines[i]
        rmin, rmax = spl.radius_range()
        img_st = self.straighten_cylindric(i, radii=(rmin, rmax), binsize=binsize)
        rc = (rmin + rmax) / 2
        analyzer = LatticeAnalyzer(spl.config)
        lparams = analyzer.estimate_lattice_params_polar(img_st, rc, nsamples=nsamples)
        out = lparams.to_polars()
        if update:
            spl.props.update_glob(spl.props.glob.with_columns(out), bin_size=binsize)
        return out

    @_misc.batch_process
    def global_cft(self, i: int = None, binsize: int = 1) -> ip.ImgArray:
        """
        Calculate global cylindrical fast Fourier tranformation.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to analyze.
        binsize : int, default 1
            Multiscale bin size used for calculation.

        Returns
        -------
        ip.ImgArray
            Complex image.
        """
        img_st = self.straighten_cylindric(i, binsize=binsize)
        img_st -= np.mean(img_st)
        return img_st.fft(dims="rya")

    @_misc.batch_process
    def infer_polarity(
        self,
        i: int = None,
        *,
        binsize: int = 1,
        depth: nm = 40,
        mask_freq: bool = True,
        update: bool = True,
    ) -> Ori:
        """
        Infer spline polarities using polar 2D image.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to analyze.
        binsize : int, default 1
            Multiscale bin size used for calculation.
        depth : nm, default 40.0
            Depth of images used to infer polarities.

        Returns
        -------
        Ori
            Orientation of corresponding splines.
        """
        LOGGER.info(f"Running: {self.__class__.__name__}.infer_polarity, i={i}")
        current_scale = self.scale * binsize

        if binsize > 1:
            imgb = self.get_multiscale(binsize)
        else:
            try:
                imgb = self.get_multiscale(1)
            except ValueError:
                imgb = self.image

        spl = self.splines[i]
        cfg = spl.config
        ori_clockwise = Ori(cfg.clockwise)
        ori_counterclockwise = Ori.invert(ori_clockwise, allow_none=False)
        if spl.radius is None:
            r_range = 0.5 * current_scale, cfg.fit_width / 2
        else:
            r_range = spl.radius_range()
        point = 0.5  # the sampling point
        coords = spl.local_cylindrical(r_range, depth, point, scale=current_scale)
        polar = get_polar_image(imgb, coords, spl.radius, order=1)
        if mask_freq:
            polar = LatticeAnalyzer(cfg).mask_spectra(polar)
        img_flat = polar.mean(axis="y")

        if (npf := spl.props.get_glob(H.npf, None)) is None:
            # if the global properties are already calculated, use it
            # otherwise, calculate the number of PFs from the power spectrum
            ft = img_flat.fft(shift=False, dims="ra")
            pw = ft.real**2 + ft.imag**2
            img_pw = np.mean(pw, axis=0)
            npf = np.argmax(img_pw[cfg.npf_range.asslice()]) + cfg.npf_range.min

        pw_peak = img_flat.local_power_spectra(
            key=ip.slicer.a[npf - 1 : npf + 2],
            upsample_factor=20,
            dims="ra",
        ).mean(axis="a")
        r_argmax = np.argmax(pw_peak)
        clkwise = r_argmax - (pw_peak.size + 1) // 2 <= 0
        ori = ori_clockwise if clkwise else ori_counterclockwise

        # logging
        _val = pw_peak[r_argmax]
        pw_non_peak = np.delete(pw_peak, r_argmax)
        _ave, _std = np.mean(pw_non_peak), np.std(pw_non_peak, ddof=1)
        LOGGER.info(f" >> polarity = {ori.name} (peak intensity={_val:.2g} compared to {_ave:.2g} ± {_std:.2g})")  # fmt: skip
        if update:
            spl.orientation = ori
        return ori

    @_misc.batch_process
    def straighten(
        self,
        i: int = None,
        *,
        size: nm | tuple[nm, nm] = None,
        range_: tuple[float, float] = (0.0, 1.0),
        chunk_length: nm | None = None,
        binsize: int = 1,
    ) -> ip.ImgArray:
        """
        Straightening by building curved coordinate system around splines. Currently
        Cartesian coordinate system and cylindrical coordinate system are supported.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to straighten.
        size : float (nm), optional
            Vertical/horizontal box size.
        range_ : tuple[float, float], default (0.0, 1.0)
            Range of spline domain.
        chunk_length : nm, optional
            If spline is longer than this, it will be first split into chunks,
            straightened respectively and all the straightened images are concatenated
            afterward, to avoid loading entire image into memory.
        binsize : int, default 1
            Multiscale bin size used for calculation.

        Returns
        -------
        ip.ImgArray
            Straightened image. If Cartesian coordinate system is used, it will have "zyx".
        """
        spl = self.splines[i]
        input_img = self._get_multiscale_or_original(binsize)
        chunk_length = _misc.normalize_chunk_length(input_img, chunk_length)
        return _straighten.straighten(input_img, spl, range_, size)

    @_misc.batch_process
    def straighten_cylindric(
        self,
        i: int = None,
        *,
        radii: tuple[nm, nm] | None = None,
        range_: tuple[float, float] = (0.0, 1.0),
        chunk_length: nm | None = None,
        binsize: int = 1,
    ) -> ip.ImgArray:
        """
        Straightening by building curved coordinate system around splines. Currently
        Cartesian coordinate system and cylindrical coordinate system are supported.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that you want to straighten.
        radii : tuple of float (nm), optional
            Lower/upper limit of radius.
        range_ : tuple[float, float], default (0.0, 1.0)
            Range of spline domain.
        chunk_length : nm, optional
            If spline is longer than this, it will be first split into chunks,
            straightened respectively and all the straightened images are concatenated
            afterward, to avoid loading entire image into memory.
        binsize : int, default 1
            Multiscale bin size used for calculation.

        Returns
        -------
        ip.ImgArray
            Straightened image. If Cartesian coordinate system is used, it will have "zyx".
        """
        spl = self.splines[i]
        input_img = self._get_multiscale_or_original(binsize)
        chunk_length = _misc.normalize_chunk_length(input_img, chunk_length)
        return _straighten.straighten_cylindric(input_img, spl, range_, radii)

    @_misc.batch_process
    def map_centers(
        self,
        i: int = None,
        *,
        interval: nm = 1.0,
        orientation: Ori | str | None = None,
        rotate_molecules: bool = True,
    ) -> Molecules:
        """
        Mapping molecules along the center of a cylinder.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that mapping will be calculated.
        interval : float (nm), optional
            Interval of molecules.
        rotate_molecules : bool, default True
            If True, twist the molecule orientations according to the spline twist.

        Returns
        -------
        Molecules
            Molecules object with mapped coordinates and angles.
        """
        spl = self.splines[i]
        u = spl.prep_anchor_positions(interval=interval)
        if rotate_molecules:
            spacing = spl.props.get_glob(H.spacing)
            twist = spl.props.get_glob(H.twist) / 2
            rotation = np.deg2rad(spl.distances(u) / spacing * twist)
        else:
            rotation = None
        mole = spl.anchors_to_molecules(u, rotation=rotation)
        if spl._need_rotation(orientation):
            mole = mole.rotate_by_rotvec_internal([np.pi, 0, 0])
        return mole

    def get_cylinder_model(
        self,
        i: int,
        offsets: tuple[float, float] = (0.0, 0.0),
        **kwargs,
    ) -> CylinderModel:  # fmt: skip
        """
        Return the cylinder model at the given spline ID.

        Parameters
        ----------
        i : int
            Spline ID from which model will be created.
        offsets : tuple of float, optional
            Offset of the model. See :meth:`map_monomers` for details.

        Returns
        -------
        CylinderModel
            The cylinder model.
        """
        return self.splines[i].cylinder_model(offsets=offsets, **kwargs)

    @_misc.batch_process
    def map_monomers(
        self,
        i: int = None,
        *,
        offsets: tuple[nm, float] | None = None,
        orientation: Ori | str | None = None,
        extensions: tuple[int, int] = (0, 0),
        **kwargs,
    ) -> Molecules:
        """
        Map monomers in a regular cylinder shape.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that mapping will be calculated.
        offsets : tuple of float, optional
            The offset of origin of oblique coordinate system to map monomers.
        orientation : Ori or str, optional
            Orientation of the y-axis of each molecule.

        Returns
        -------
        Molecules
            Object that represents monomer positions and angles.
        """
        model = self.get_cylinder_model(i, offsets=offsets, **kwargs)
        ny, na = model.shape
        ext0, ext1 = extensions
        if ny + ext0 + ext1 < 0:
            raise ValueError("The number of monomers is negative.")
        yy, aa = np.indices((ny + ext0 + ext1, na), dtype=np.int32)
        yy -= ext0
        coords = np.stack([yy.ravel(), aa.ravel()], axis=1)
        spl = self.splines[i]
        mole = model.locate_molecules(spl, coords)
        if spl._need_rotation(orientation):
            mole = mole.rotate_by_rotvec_internal([np.pi, 0, 0])
        return mole

    @_misc.batch_process
    def map_on_grid(
        self,
        i: int = None,
        coords: NDArray[np.int32] = (),
        *,
        offsets: tuple[nm, float] | None = None,
        orientation: Ori | str | None = None,
        **kwargs,
    ) -> Molecules:
        """
        Map monomers in a regular cylinder shape.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that mapping will be calculated.
        coords : ndarray
            Integer coordinates on the cylinder surface.
        offsets : tuple of float, optional
            The offset of origin of oblique coordinate system to map monomers.
        orientation : Ori or str, optional
            Orientation of the y-axis of each molecule.

        Returns
        -------
        Molecules
            Object that represents monomer positions and angles.
        """
        model = self.get_cylinder_model(i, offsets=offsets, **kwargs)
        coords = np.asarray(coords, dtype=np.int32)
        spl = self.splines[i]
        mole = model.locate_molecules(spl, coords)
        if spl._need_rotation(orientation):
            mole = mole.rotate_by_rotvec_internal([np.pi, 0, 0])
        return mole

    @_misc.batch_process
    def map_pf_line(
        self,
        i: int = None,
        *,
        interval: nm = 1.0,
        offsets: tuple[nm, float] = (0.0, 0.0),
        orientation: Ori | str | None = None,
    ) -> Molecules:
        """
        Mapping molecules along a protofilament line.

        This method is useful when you want to visualize seam or protofilament, or
        assign molecules for subtomogram averaging of seam binding protein or doublet
        microtubule.

        Parameters
        ----------
        i : int or iterable of int, optional
            Spline ID that mapping will be calculated.
        offsets : (float, float), default (0.0, 0.0)
            Axial offset in nm and angular offset in degree.

        Returns
        -------
        Molecules
            Object that represents protofilament positions and angles.
        """
        spl = self.splines[i]
        spacing = spl.props.get_glob(H.spacing)
        twist = spl.props.get_glob(H.twist) / 2

        ny = roundint(spl.length() / interval)
        skew_rad = np.deg2rad(twist) * interval / spacing

        yoffset, aoffset = offsets
        rcoords = np.full(ny, spl.radius)
        ycoords = np.arange(ny) * interval + yoffset
        acoords = np.arange(ny) * skew_rad + np.deg2rad(aoffset)
        coords = np.stack([rcoords, ycoords, acoords], axis=1)
        mole = spl.cylindrical_to_molecules(coords)
        if spl._need_rotation(orientation):
            mole = mole.rotate_by_rotvec_internal([np.pi, 0, 0])
        return mole

    def _prep_fit_spline(self, spl: CylSpline, anc: NDArray[np.float32], binsize: int):
        npoints = anc.size
        interval = spl.length() / (npoints - 1)
        depth_px = self.nm2pixel(spl.config.fit_depth, binsize=binsize)
        width_px = self.nm2pixel(spl.config.fit_width, binsize=binsize)

        # If subtomogram region is rotated by 45 degree, its XY-width will be
        # (length + width) / sqrt(2)
        if binsize > 1:
            centers = spl.map(anc) - self.multiscale_translation(binsize)
        else:
            centers = spl.map(anc)
        center_px = self.nm2pixel(centers, binsize=binsize)
        size_px = (width_px,) + (roundint((width_px + depth_px) / 1.414),) * 2
        input_img = self._get_multiscale_or_original(binsize)

        subtomograms = crop_tomograms(input_img, center_px, size_px)
        subtomograms[:] -= subtomograms.mean()
        scale = self.scale * binsize
        return subtomograms, interval, scale
splines: SplineList property

List of splines.

add_spline(coords, *, order=3, err_max=0.5, extrapolate=ExtrapolationMode.linear, config={})

Add spline path to tomogram.

Parameters:

Name Type Description Default
coords array - like

(N, 3) array of coordinates. A spline curve that fit it well is added.

required
order int

Order of spline curve.

3
extrapolate str

Extrapolation mode of the spline.

linear
config SplineConfig or dict

Configuration for spline fitting.

{}
Source code in cylindra/components/tomogram/_cyl_tomo.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def add_spline(
    self,
    coords: ArrayLike,
    *,
    order: int = 3,
    err_max: nm = 0.5,
    extrapolate: ExtrapolationMode | str = ExtrapolationMode.linear,
    config: SplineConfig | dict[str, Any] = {},
) -> None:
    """
    Add spline path to tomogram.

    Parameters
    ----------
    coords : array-like
        (N, 3) array of coordinates. A spline curve that fit it well is added.
    order : int, optional
        Order of spline curve.
    extrapolate : str, optional
        Extrapolation mode of the spline.
    config : SplineConfig or dict, optional
        Configuration for spline fitting.
    """
    _coords = np.asarray(coords)
    ncoords = _coords.shape[0]
    spl = CylSpline(
        order=order,
        config=config,
        extrapolate=extrapolate,
    ).fit(_coords, err_max=err_max)
    interval: nm = 30.0
    length = spl.length()

    n = int(length / interval) + 1
    fit = spl.map(np.linspace(0, 1, n))
    if ncoords <= spl.order and ncoords < fit.shape[0]:
        return self.add_spline(
            fit,
            order=order,
            err_max=err_max,
            extrapolate=extrapolate,
            config=config,
        )

    self.splines.append(spl)
    return None
align_to_polarity(orientation=Ori.MinusToPlus)

Align all the splines in the direction parallel to the given polarity.

Parameters:

Name Type Description Default
orientation Ori or str

To which direction splines will be aligned.

Ori.MinusToPlus

Returns:

Type Description
Tomogram object

Same object with updated splines.

Source code in cylindra/components/tomogram/_cyl_tomo.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def align_to_polarity(self, orientation: Ori | str = Ori.MinusToPlus) -> Self:
    """
    Align all the splines in the direction parallel to the given polarity.

    Parameters
    ----------
    orientation : Ori or str, default Ori.MinusToPlus
        To which direction splines will be aligned.

    Returns
    -------
    Tomogram object
        Same object with updated splines.
    """
    orientation = Ori(orientation)
    if orientation is Ori.none:
        raise ValueError("Must be PlusToMinus or MinusToPlus.")
    for i, spl in enumerate(self.splines):
        if spl.orientation is Ori.none:
            raise ValueError(f"Spline-{i} has no orientation.")
        if spl.orientation != orientation:
            try:
                self.splines[i] = spl.invert()
            except Exception as e:
                raise type(e)(f"Cannot invert spline-{i}: {e}")
    return self
fit(i=None, *, max_interval=30.0, degree_precision=0.5, binsize=1, err_max=1.0, edge_sigma=2.0, max_shift=5.0, n_rotations=5)

Roughly fit splines to cylindrical structures.

Subtomograms will be sampled at every max_interval nm. In dense mode, Subtomograms will be masked relative to XY-plane, using sigmoid function. Sharpness of the sigmoid function is determined by dense_mode_sigma (dense_mode_sigma=0 corresponds to a step function).

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to fit.

None
max_interval nm

Maximum interval of sampling points in nm unit.

30.0
degree_precision float

Precision of xy-tilt degree in angular correlation.

0.5
binsize int

Multiscale bin size used for fitting.

1
edge_sigma nm

Sharpness of mask at the edges. If not None, fitting will be executed after regions outside the cylinder are masked. Soft mask is important for precision because sharp changes in intensity cause strong correlation at the edges.

2.0
max_shift nm

Maximum shift from the true center of the cylinder. This parameter is used in phase cross correlation.

5.0
n_rotations int

Number of rotations to be tested during finding the cylinder center.

5

Returns:

Type Description
FitResult

Result of fitting.

Source code in cylindra/components/tomogram/_cyl_tomo.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
@_misc.batch_process
def fit(
    self,
    i: int = None,
    *,
    max_interval: nm = 30.0,
    degree_precision: float = 0.5,
    binsize: int = 1,
    err_max: nm = 1.0,
    edge_sigma: nm = 2.0,
    max_shift: nm = 5.0,
    n_rotations: int = 5,
) -> _misc.FitResult:
    """
    Roughly fit splines to cylindrical structures.

    Subtomograms will be sampled at every ``max_interval`` nm. In dense mode,
    Subtomograms will be masked relative to XY-plane, using sigmoid function.
    Sharpness of the sigmoid function is determined by ``dense_mode_sigma``
    (``dense_mode_sigma=0`` corresponds to a step function).

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to fit.
    max_interval : nm, default 30.0
        Maximum interval of sampling points in nm unit.
    degree_precision : float, default 0.5
        Precision of xy-tilt degree in angular correlation.
    binsize : int, default 1
        Multiscale bin size used for fitting.
    edge_sigma : nm, default 2.0
        Sharpness of mask at the edges. If not None, fitting will be executed after regions
        outside the cylinder are masked. Soft mask is important for precision because sharp
        changes in intensity cause strong correlation at the edges.
    max_shift: nm, default 5.0
        Maximum shift from the true center of the cylinder. This parameter is used in phase
        cross correlation.
    n_rotations : int, default 5
        Number of rotations to be tested during finding the cylinder center.

    Returns
    -------
    FitResult
        Result of fitting.
    """
    LOGGER.info(f"Running: {self.__class__.__name__}.fit, i={i}")
    spl = self.splines[i]
    anc = spl.prep_anchor_positions(max_interval=max_interval)
    subtomograms, interval, scale = self._prep_fit_spline(spl, anc, binsize)

    with set_gpu():
        subtomograms = _misc.soft_mask_edges(
            subtomograms, spl, anc, scale, edge_sigma
        )
        ds = spl.map(anc, der=1)
        yx_tilt = np.rad2deg(np.arctan2(-ds[:, 2], ds[:, 1]))
        degree_max = 14.0
        nrots = roundint(degree_max / degree_precision) + 1

        # Angular correlation
        out = _misc.dask_angle_corr(subtomograms, yx_tilt, nrots=nrots)
        refined_tilt_deg = np.array(out)
        refined_tilt_rad = np.deg2rad(refined_tilt_deg)

        # If subtomograms are sampled at short intervals, angles should be smoothened to
        # avoid overfitting.
        size = 2 * roundint(48.0 / interval) + 1
        if size > 1:
            # Mirror-mode padding is "a b c d | c b a".
            refined_tilt_rad = _misc.angle_uniform_filter(
                refined_tilt_rad, size=size, mode=Mode.mirror
            )
            refined_tilt_deg = np.rad2deg(refined_tilt_rad)

        # Rotate subtomograms in YX plane
        for _j, img in enumerate(subtomograms):
            img: ip.ImgArray
            angle = refined_tilt_deg[_j]
            img.rotate(-angle, cval=0, update=True)

        # zx-shift correction by self-ZNCC
        subtomo_proj = subtomograms.mean(axis="y")

        if edge_sigma is not None:
            # Regions outside the mask don't need to be considered.
            xc = int(subtomo_proj.shape.x / 2)
            w = int(spl.config.fit_width / scale / 2)
            subtomo_proj = subtomo_proj[ip.slicer.x[xc - w : xc + w + 1]]

        max_shift_px = max_shift / scale * 2
        pf_ang = 360 / spl.config.npf_range.center
        degrees = np.linspace(-pf_ang / 2, pf_ang / 2, n_rotations) + 180
        shifts = _misc.multi_rotated_auto_zncc(subtomo_proj, degrees, max_shift_px)

    # Update spline coordinates.
    # Because centers of subtomogram are on lattice points of pixel coordinate,
    # coordinates that will be shifted should be converted to integers.
    coords_px = self.nm2pixel(spl.map(anc), binsize=binsize).astype(np.float32)
    coords_px_new = _misc.shift_coords(coords_px, shifts, refined_tilt_rad)
    coords = coords_px_new * scale + self.multiscale_translation(binsize)

    # Update spline parameters
    self.splines[i] = spl.fit(coords, err_max=err_max)
    result = _misc.FitResult(shifts * scale)
    LOGGER.info(f" >> Shift RMSD = {result.rmsd:.3f} nm")
    return result
get_cylinder_model(i, offsets=(0.0, 0.0), **kwargs)

Return the cylinder model at the given spline ID.

Parameters:

Name Type Description Default
i int

Spline ID from which model will be created.

required
offsets tuple of float

Offset of the model. See :meth:map_monomers for details.

(0.0, 0.0)

Returns:

Type Description
CylinderModel

The cylinder model.

Source code in cylindra/components/tomogram/_cyl_tomo.py
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
def get_cylinder_model(
    self,
    i: int,
    offsets: tuple[float, float] = (0.0, 0.0),
    **kwargs,
) -> CylinderModel:  # fmt: skip
    """
    Return the cylinder model at the given spline ID.

    Parameters
    ----------
    i : int
        Spline ID from which model will be created.
    offsets : tuple of float, optional
        Offset of the model. See :meth:`map_monomers` for details.

    Returns
    -------
    CylinderModel
        The cylinder model.
    """
    return self.splines[i].cylinder_model(offsets=offsets, **kwargs)
global_cft(i=None, binsize=1)

Calculate global cylindrical fast Fourier tranformation.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to analyze.

None
binsize int

Multiscale bin size used for calculation.

1

Returns:

Type Description
ImgArray

Complex image.

Source code in cylindra/components/tomogram/_cyl_tomo.py
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
@_misc.batch_process
def global_cft(self, i: int = None, binsize: int = 1) -> ip.ImgArray:
    """
    Calculate global cylindrical fast Fourier tranformation.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to analyze.
    binsize : int, default 1
        Multiscale bin size used for calculation.

    Returns
    -------
    ip.ImgArray
        Complex image.
    """
    img_st = self.straighten_cylindric(i, binsize=binsize)
    img_st -= np.mean(img_st)
    return img_st.fft(dims="rya")
global_cft_params(*, i=None, binsize=1, nsamples=8, update=True)

Calculate global lattice parameters.

This function transforms tomogram using cylindrical coordinate system along spline. This function calls straighten beforehand, so that Fourier space is distorted if the cylindrical structure is curved.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to analyze.

None
binsize int

Multiscale bin size used for calculation.

1
nsamples int

Number of cylindrical coordinate samplings for Fourier transformation. Multiple samplings are needed because up-sampled discrete Fourier transformation does not return exactly the same power spectra with shifted inputs, unlike FFT. Larger nsamples reduces the error but is slower.

8
update bool

If True, spline properties will be updated.

True

Returns:

Type Description
DataFrame

Global properties.

Source code in cylindra/components/tomogram/_cyl_tomo.py
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
@_misc.batch_process
def global_cft_params(
    self,
    *,
    i: int = None,
    binsize: int = 1,
    nsamples: int = 8,
    update: bool = True,
) -> pl.DataFrame:
    """
    Calculate global lattice parameters.

    This function transforms tomogram using cylindrical coordinate system along
    spline. This function calls ``straighten`` beforehand, so that Fourier space is
    distorted if the cylindrical structure is curved.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to analyze.
    binsize : int, default 1
        Multiscale bin size used for calculation.
    nsamples : int, default 8
        Number of cylindrical coordinate samplings for Fourier transformation.
        Multiple samplings are needed because up-sampled discrete Fourier
        transformation does not return exactly the same power spectra with shifted
        inputs, unlike FFT. Larger ``nsamples`` reduces the error but is slower.
    update : bool, default True
        If True, spline properties will be updated.

    Returns
    -------
    pl.DataFrame
        Global properties.
    """
    LOGGER.info(f"Running: {self.__class__.__name__}.global_cft_params, i={i}")
    spl = self.splines[i]
    rmin, rmax = spl.radius_range()
    img_st = self.straighten_cylindric(i, radii=(rmin, rmax), binsize=binsize)
    rc = (rmin + rmax) / 2
    analyzer = LatticeAnalyzer(spl.config)
    lparams = analyzer.estimate_lattice_params_polar(img_st, rc, nsamples=nsamples)
    out = lparams.to_polars()
    if update:
        spl.props.update_glob(spl.props.glob.with_columns(out), bin_size=binsize)
    return out
global_cps(*, i=None, binsize=1)

Calculate global cylindrical power spectra.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to analyze.

None
binsize int

Multiscale bin size used for calculation.

1

Returns:

Type Description
ImgArray

Complex image.

Source code in cylindra/components/tomogram/_cyl_tomo.py
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
@_misc.batch_process
def global_cps(
    self,
    *,
    i: int = None,
    binsize: int = 1,
) -> ip.ImgArray:
    """
    Calculate global cylindrical power spectra.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to analyze.
    binsize : int, default 1
        Multiscale bin size used for calculation.

    Returns
    -------
    ip.ImgArray
        Complex image.
    """
    cft = self.global_cft(i=i, binsize=binsize)
    return cft.real**2 + cft.imag**2
infer_polarity(i=None, *, binsize=1, depth=40, mask_freq=True, update=True)

Infer spline polarities using polar 2D image.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to analyze.

None
binsize int

Multiscale bin size used for calculation.

1
depth nm

Depth of images used to infer polarities.

40.0

Returns:

Type Description
Ori

Orientation of corresponding splines.

Source code in cylindra/components/tomogram/_cyl_tomo.py
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
@_misc.batch_process
def infer_polarity(
    self,
    i: int = None,
    *,
    binsize: int = 1,
    depth: nm = 40,
    mask_freq: bool = True,
    update: bool = True,
) -> Ori:
    """
    Infer spline polarities using polar 2D image.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to analyze.
    binsize : int, default 1
        Multiscale bin size used for calculation.
    depth : nm, default 40.0
        Depth of images used to infer polarities.

    Returns
    -------
    Ori
        Orientation of corresponding splines.
    """
    LOGGER.info(f"Running: {self.__class__.__name__}.infer_polarity, i={i}")
    current_scale = self.scale * binsize

    if binsize > 1:
        imgb = self.get_multiscale(binsize)
    else:
        try:
            imgb = self.get_multiscale(1)
        except ValueError:
            imgb = self.image

    spl = self.splines[i]
    cfg = spl.config
    ori_clockwise = Ori(cfg.clockwise)
    ori_counterclockwise = Ori.invert(ori_clockwise, allow_none=False)
    if spl.radius is None:
        r_range = 0.5 * current_scale, cfg.fit_width / 2
    else:
        r_range = spl.radius_range()
    point = 0.5  # the sampling point
    coords = spl.local_cylindrical(r_range, depth, point, scale=current_scale)
    polar = get_polar_image(imgb, coords, spl.radius, order=1)
    if mask_freq:
        polar = LatticeAnalyzer(cfg).mask_spectra(polar)
    img_flat = polar.mean(axis="y")

    if (npf := spl.props.get_glob(H.npf, None)) is None:
        # if the global properties are already calculated, use it
        # otherwise, calculate the number of PFs from the power spectrum
        ft = img_flat.fft(shift=False, dims="ra")
        pw = ft.real**2 + ft.imag**2
        img_pw = np.mean(pw, axis=0)
        npf = np.argmax(img_pw[cfg.npf_range.asslice()]) + cfg.npf_range.min

    pw_peak = img_flat.local_power_spectra(
        key=ip.slicer.a[npf - 1 : npf + 2],
        upsample_factor=20,
        dims="ra",
    ).mean(axis="a")
    r_argmax = np.argmax(pw_peak)
    clkwise = r_argmax - (pw_peak.size + 1) // 2 <= 0
    ori = ori_clockwise if clkwise else ori_counterclockwise

    # logging
    _val = pw_peak[r_argmax]
    pw_non_peak = np.delete(pw_peak, r_argmax)
    _ave, _std = np.mean(pw_non_peak), np.std(pw_non_peak, ddof=1)
    LOGGER.info(f" >> polarity = {ori.name} (peak intensity={_val:.2g} compared to {_ave:.2g} ± {_std:.2g})")  # fmt: skip
    if update:
        spl.orientation = ori
    return ori
local_cft(*, i=None, depth=50.0, pos=None, binsize=1)

Calculate non-upsampled local cylindric Fourier transormation along spline.

This function does not up-sample.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to analyze.

None
depth nm

Length of subtomogram for calculation of local parameters.

50.0
pos int

Only calculate at pos-th anchor if given.

None
binsize int

Multiscale bin size used for calculation.

1

Returns:

Type Description
ImgArray

FT images stacked along "p" axis.

Source code in cylindra/components/tomogram/_cyl_tomo.py
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
@_misc.batch_process
def local_cft(
    self,
    *,
    i: int = None,
    depth: nm = 50.0,
    pos: int | None = None,
    binsize: int = 1,
) -> ip.ImgArray:
    """
    Calculate non-upsampled local cylindric Fourier transormation along spline.

    This function does not up-sample.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to analyze.
    depth : nm, default 50.0
        Length of subtomogram for calculation of local parameters.
    pos : int, optional
        Only calculate at ``pos``-th anchor if given.
    binsize : int, default 1
        Multiscale bin size used for calculation.

    Returns
    -------
    ip.ImgArray
        FT images stacked along "p" axis.
    """
    out = list[ip.ImgArray]()
    with set_gpu():
        for polar in self.iter_local_image(i, depth, pos, binsize):
            out.append(polar.fft(dims="rya"))
    return np.stack(out, axis="p")
local_cft_params(*, i=None, depth=50.0, binsize=1, radius='global', nsamples=8, update=True, update_glob=False)

Calculate local lattice parameters from cylindrical Fourier space.

To determine the peaks upsampled discrete Fourier transformation is used for every subtomogram.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to analyze.

None
depth nm

Length of subtomogram for calculation of local parameters.

50.0
binsize int

Multiscale bin size used for calculation.

1
radius str

If "local", use the local radius for the analysis. If "global", use the global radius. If a float, use the given radius.

"global"
nsamples int

Number of cylindrical coordinate samplings for Fourier transformation. Multiple samplings are needed because up-sampled discrete Fourier transformation does not return exactly the same power spectra with shifted inputs, unlike FFT. Larger nsamples reduces the error but is slower.

8
update bool

If True, spline properties will be updated.

True
update_glob bool

If True, global properties will be updated using the mean or mode of the local properties.

False

Returns:

Type Description
DataFrame

Local properties.

Source code in cylindra/components/tomogram/_cyl_tomo.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
@_misc.batch_process
def local_cft_params(
    self,
    *,
    i: int = None,
    depth: nm = 50.0,
    binsize: int = 1,
    radius: nm | Literal["local", "global"] = "global",
    nsamples: int = 8,
    update: bool = True,
    update_glob: bool = False,
) -> pl.DataFrame:
    """
    Calculate local lattice parameters from cylindrical Fourier space.

    To determine the peaks upsampled discrete Fourier transformation is used
    for every subtomogram.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to analyze.
    depth : nm, default 50.0
        Length of subtomogram for calculation of local parameters.
    binsize : int, default 1
        Multiscale bin size used for calculation.
    radius : str, default "global"
        If "local", use the local radius for the analysis. If "global", use the
        global radius. If a float, use the given radius.
    nsamples : int, default 8
        Number of cylindrical coordinate samplings for Fourier transformation. Multiple
        samplings are needed because up-sampled discrete Fourier transformation does not
        return exactly the same power spectra with shifted inputs, unlike FFT. Larger
        ``nsamples`` reduces the error but is slower.
    update : bool, default True
        If True, spline properties will be updated.
    update_glob : bool, default False
        If True, global properties will be updated using the mean or mode of the local
        properties.

    Returns
    -------
    polars.DataFrame
        Local properties.
    """
    LOGGER.info(f"Running: {self.__class__.__name__}.local_cft_params, i={i}")
    spl = self.splines[i]
    radii = _misc.prepare_radii(spl, radius)
    input_img = self._get_multiscale_or_original(binsize)
    _scale = input_img.scale.x
    tasks: list[Delayed[LatticeParams]] = []
    spl_trans = spl.translate([-self.multiscale_translation(binsize)] * 3)
    _analyze_fn = LatticeAnalyzer(spl.config).estimate_lattice_params_task
    for anc, r0 in zip(spl_trans.anchors, radii, strict=True):
        rmin, rmax = spl.radius_range(r0)
        rc = (rmin + rmax) / 2
        coords = spl_trans.local_cylindrical((rmin, rmax), depth, anc, scale=_scale)
        tasks.append(_analyze_fn(input_img, coords, rc, nsamples=nsamples))

    lprops = pl.DataFrame(
        compute(*tasks),
        schema=LatticeParams.polars_schema(),
    )
    if update:
        spl.props.update_loc(lprops, depth, bin_size=binsize)
    if update_glob:
        gprops = lprops.select(
            pl.col(H.spacing).mean(),
            pl.col(H.pitch).mean(),
            pl.col(H.twist).mean(),
            pl.col(H.skew).mean(),
            pl.col(H.rise).mean(),
            pl.col(H.rise_length).mean(),
            pl.col(H.npf).mode().first(),
            pl.col(H.start).mode().first(),
        )
        spl.props.update_glob(gprops, bin_size=binsize)

    return lprops
local_cps(*, i=None, depth=50.0, pos=None, binsize=1)

Calculate non-upsampled local cylindric power spectra along spline.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to analyze.

None
depth nm

Length of subtomogram for calculation of local parameters.

50.0
pos int

Only calculate at pos-th anchor if given.

None
binsize int

Multiscale bin size used for calculation.

1

Returns:

Type Description
ImgArray

FT images stacked along "p" axis.

Source code in cylindra/components/tomogram/_cyl_tomo.py
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
@_misc.batch_process
def local_cps(
    self,
    *,
    i: int = None,
    depth: nm = 50.0,
    pos: int | None = None,
    binsize: int = 1,
) -> ip.ImgArray:
    """
    Calculate non-upsampled local cylindric power spectra along spline.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to analyze.
    depth : nm, default 50.0
        Length of subtomogram for calculation of local parameters.
    pos : int, optional
        Only calculate at ``pos``-th anchor if given.
    binsize : int, default 1
        Multiscale bin size used for calculation.

    Returns
    -------
    ip.ImgArray
        FT images stacked along "p" axis.
    """
    cft = self.local_cft(i=i, depth=depth, pos=pos, binsize=binsize)
    return cft.real**2 + cft.imag**2
local_radii(*, i=None, depth=50.0, binsize=1, min_radius=1.0, max_radius=100.0, update=True, update_glob=True)

Measure the local radii along the splines.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to analyze.

None
depth nm

Longitudinal length of subtomograms for calculation.

50.0
binsize int

Multiscale binsize to be used.

1
min_radius nm

Minimum radius of the cylinder.

1.0
max_radius nm

Maximum radius of the cylinder.

100.0
update bool

If True, spline properties will be updated.

True
update_glob bool

If True, global properties will be updated using the mean of the local radii.

True

Returns:

Type Description
Series

Radii along the spline.

Source code in cylindra/components/tomogram/_cyl_tomo.py
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
@_misc.batch_process
def local_radii(
    self,
    *,
    i: int = None,
    depth: nm = 50.0,
    binsize: int = 1,
    min_radius: nm = 1.0,
    max_radius: nm = 100.0,
    update: bool = True,
    update_glob: bool = True,
) -> pl.Series:
    """
    Measure the local radii along the splines.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to analyze.
    depth : nm, default 50.0
        Longitudinal length of subtomograms for calculation.
    binsize : int, default 1
        Multiscale binsize to be used.
    min_radius : nm, default 1.0
        Minimum radius of the cylinder.
    max_radius : nm, default 100.0
        Maximum radius of the cylinder.
    update : bool, default True
        If True, spline properties will be updated.
    update_glob : bool, default True
        If True, global properties will be updated using the mean of the local
        radii.

    Returns
    -------
    pl.Series
        Radii along the spline.
    """
    LOGGER.info(f"Running: {self.__class__.__name__}.local_radii, i={i}")
    spl = self.splines[i]

    input_img = self._get_multiscale_or_original(binsize)

    depth = spl.config.fit_depth
    _scale = input_img.scale.x
    thickness = _misc.get_thickness(spl, _scale)
    min_radius_px = min_radius / _scale
    max_radius = min(max_radius, spl.config.fit_width / 2)
    max_radius_px = max_radius / _scale
    offset_px = _misc.get_radius_offset(min_radius_px, max_radius_px)
    spl_trans = spl.translate([-self.multiscale_translation(binsize)] * 3)
    tasks = []
    for anc in spl_trans.anchors:
        task = _misc.get_radial_prof(
            input_img, spl_trans, anc, (min_radius, max_radius), depth
        )
        tasks.append(task)
    profs: list[NDArray[np.float32]] = compute(*tasks)
    radii = list[float]()
    for prof in profs:
        imax_sub = find_centroid_peak(prof, *thickness)
        radii.append((imax_sub + offset_px) * _scale)

    out = pl.Series(H.radius, radii, dtype=pl.Float32)
    if update:
        spl.props.update_loc([out], depth, bin_size=binsize)
    if update_glob:
        spl.props.update_glob([pl.Series(H.radius, [out.mean()])], bin_size=binsize)
    return out
make_anchors(i=None, *, interval=None, n=None, max_interval=None)

Make anchors on spline object(s).

Parameters:

Name Type Description Default
interval nm

Anchor intervals.

None
n int

Number of anchors

None
max_interval nm

Maximum interval between anchors.

None
Source code in cylindra/components/tomogram/_cyl_tomo.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@_misc.batch_process
def make_anchors(
    self,
    i: int = None,
    *,
    interval: nm | None = None,
    n: int | None = None,
    max_interval: nm | None = None,
):
    """
    Make anchors on spline object(s).

    Parameters
    ----------
    interval : nm, optional
        Anchor intervals.
    n : int, optional
        Number of anchors
    max_interval : nm, optional
        Maximum interval between anchors.

    """
    self.splines[i].make_anchors(interval=interval, n=n, max_interval=max_interval)
    return None
map_centers(i=None, *, interval=1.0, orientation=None, rotate_molecules=True)

Mapping molecules along the center of a cylinder.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that mapping will be calculated.

None
interval float(nm)

Interval of molecules.

1.0
rotate_molecules bool

If True, twist the molecule orientations according to the spline twist.

True

Returns:

Type Description
Molecules

Molecules object with mapped coordinates and angles.

Source code in cylindra/components/tomogram/_cyl_tomo.py
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
@_misc.batch_process
def map_centers(
    self,
    i: int = None,
    *,
    interval: nm = 1.0,
    orientation: Ori | str | None = None,
    rotate_molecules: bool = True,
) -> Molecules:
    """
    Mapping molecules along the center of a cylinder.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that mapping will be calculated.
    interval : float (nm), optional
        Interval of molecules.
    rotate_molecules : bool, default True
        If True, twist the molecule orientations according to the spline twist.

    Returns
    -------
    Molecules
        Molecules object with mapped coordinates and angles.
    """
    spl = self.splines[i]
    u = spl.prep_anchor_positions(interval=interval)
    if rotate_molecules:
        spacing = spl.props.get_glob(H.spacing)
        twist = spl.props.get_glob(H.twist) / 2
        rotation = np.deg2rad(spl.distances(u) / spacing * twist)
    else:
        rotation = None
    mole = spl.anchors_to_molecules(u, rotation=rotation)
    if spl._need_rotation(orientation):
        mole = mole.rotate_by_rotvec_internal([np.pi, 0, 0])
    return mole
map_monomers(i=None, *, offsets=None, orientation=None, extensions=(0, 0), **kwargs)

Map monomers in a regular cylinder shape.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that mapping will be calculated.

None
offsets tuple of float

The offset of origin of oblique coordinate system to map monomers.

None
orientation Ori or str

Orientation of the y-axis of each molecule.

None

Returns:

Type Description
Molecules

Object that represents monomer positions and angles.

Source code in cylindra/components/tomogram/_cyl_tomo.py
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
@_misc.batch_process
def map_monomers(
    self,
    i: int = None,
    *,
    offsets: tuple[nm, float] | None = None,
    orientation: Ori | str | None = None,
    extensions: tuple[int, int] = (0, 0),
    **kwargs,
) -> Molecules:
    """
    Map monomers in a regular cylinder shape.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that mapping will be calculated.
    offsets : tuple of float, optional
        The offset of origin of oblique coordinate system to map monomers.
    orientation : Ori or str, optional
        Orientation of the y-axis of each molecule.

    Returns
    -------
    Molecules
        Object that represents monomer positions and angles.
    """
    model = self.get_cylinder_model(i, offsets=offsets, **kwargs)
    ny, na = model.shape
    ext0, ext1 = extensions
    if ny + ext0 + ext1 < 0:
        raise ValueError("The number of monomers is negative.")
    yy, aa = np.indices((ny + ext0 + ext1, na), dtype=np.int32)
    yy -= ext0
    coords = np.stack([yy.ravel(), aa.ravel()], axis=1)
    spl = self.splines[i]
    mole = model.locate_molecules(spl, coords)
    if spl._need_rotation(orientation):
        mole = mole.rotate_by_rotvec_internal([np.pi, 0, 0])
    return mole
map_on_grid(i=None, coords=(), *, offsets=None, orientation=None, **kwargs)

Map monomers in a regular cylinder shape.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that mapping will be calculated.

None
coords ndarray

Integer coordinates on the cylinder surface.

()
offsets tuple of float

The offset of origin of oblique coordinate system to map monomers.

None
orientation Ori or str

Orientation of the y-axis of each molecule.

None

Returns:

Type Description
Molecules

Object that represents monomer positions and angles.

Source code in cylindra/components/tomogram/_cyl_tomo.py
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
@_misc.batch_process
def map_on_grid(
    self,
    i: int = None,
    coords: NDArray[np.int32] = (),
    *,
    offsets: tuple[nm, float] | None = None,
    orientation: Ori | str | None = None,
    **kwargs,
) -> Molecules:
    """
    Map monomers in a regular cylinder shape.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that mapping will be calculated.
    coords : ndarray
        Integer coordinates on the cylinder surface.
    offsets : tuple of float, optional
        The offset of origin of oblique coordinate system to map monomers.
    orientation : Ori or str, optional
        Orientation of the y-axis of each molecule.

    Returns
    -------
    Molecules
        Object that represents monomer positions and angles.
    """
    model = self.get_cylinder_model(i, offsets=offsets, **kwargs)
    coords = np.asarray(coords, dtype=np.int32)
    spl = self.splines[i]
    mole = model.locate_molecules(spl, coords)
    if spl._need_rotation(orientation):
        mole = mole.rotate_by_rotvec_internal([np.pi, 0, 0])
    return mole
map_pf_line(i=None, *, interval=1.0, offsets=(0.0, 0.0), orientation=None)

Mapping molecules along a protofilament line.

This method is useful when you want to visualize seam or protofilament, or assign molecules for subtomogram averaging of seam binding protein or doublet microtubule.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that mapping will be calculated.

None
offsets (float, float)

Axial offset in nm and angular offset in degree.

(0.0, 0.0)

Returns:

Type Description
Molecules

Object that represents protofilament positions and angles.

Source code in cylindra/components/tomogram/_cyl_tomo.py
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
@_misc.batch_process
def map_pf_line(
    self,
    i: int = None,
    *,
    interval: nm = 1.0,
    offsets: tuple[nm, float] = (0.0, 0.0),
    orientation: Ori | str | None = None,
) -> Molecules:
    """
    Mapping molecules along a protofilament line.

    This method is useful when you want to visualize seam or protofilament, or
    assign molecules for subtomogram averaging of seam binding protein or doublet
    microtubule.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that mapping will be calculated.
    offsets : (float, float), default (0.0, 0.0)
        Axial offset in nm and angular offset in degree.

    Returns
    -------
    Molecules
        Object that represents protofilament positions and angles.
    """
    spl = self.splines[i]
    spacing = spl.props.get_glob(H.spacing)
    twist = spl.props.get_glob(H.twist) / 2

    ny = roundint(spl.length() / interval)
    skew_rad = np.deg2rad(twist) * interval / spacing

    yoffset, aoffset = offsets
    rcoords = np.full(ny, spl.radius)
    ycoords = np.arange(ny) * interval + yoffset
    acoords = np.arange(ny) * skew_rad + np.deg2rad(aoffset)
    coords = np.stack([rcoords, ycoords, acoords], axis=1)
    mole = spl.cylindrical_to_molecules(coords)
    if spl._need_rotation(orientation):
        mole = mole.rotate_by_rotvec_internal([np.pi, 0, 0])
    return mole
measure_radius(i=None, *, binsize=1, positions='auto', min_radius=1.0, max_radius=100.0, update=True)

Measure radius using radial profile from the center.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to measure.

None
binsize int

Multiscale bin size used for radius calculation.

1
positions array - like or 'auto' or 'anchor'

Sampling positions (between 0 and 1) to calculate radius. If "anchor" is given, anchors of the spline will be used. If "auto" is given, three positions along the spline will be used.

"auto"
min_radius nm

Minimum radius of the cylinder.

1.0
max_radius nm

Maximum radius of the cylinder.

100.0
update bool

If True, global radius property will be updated.

True

Returns:

Type Description
float(nm)

Cylinder radius.

Source code in cylindra/components/tomogram/_cyl_tomo.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
@_misc.batch_process
def measure_radius(
    self,
    i: int = None,
    *,
    binsize: int = 1,
    positions: NDArray[np.float32] | Literal["auto", "anchor"] = "auto",
    min_radius: nm = 1.0,
    max_radius: nm = 100.0,
    update: bool = True,
) -> nm:
    """
    Measure radius using radial profile from the center.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to measure.
    binsize : int, default 1
        Multiscale bin size used for radius calculation.
    positions : array-like or "auto" or "anchor", default "auto"
        Sampling positions (between 0 and 1) to calculate radius. If "anchor"
        is given, anchors of the spline will be used. If "auto" is given,
        three positions along the spline will be used.
    min_radius : nm, default 1.0
        Minimum radius of the cylinder.
    max_radius : nm, default 100.0
        Maximum radius of the cylinder.
    update : bool, default True
        If True, global radius property will be updated.

    Returns
    -------
    float (nm)
        Cylinder radius.
    """
    LOGGER.info(f"Running: {self.__class__.__name__}.measure_radius, i={i}")
    spl = self.splines[i]

    if isinstance(positions, str) and positions == "auto":
        nanchor = 3
        pos = 1 / nanchor * np.arange(nanchor) + 0.5 / nanchor
    elif isinstance(positions, str) and positions == "anchor":
        pos = spl.anchors
    else:
        pos = np.asarray(positions, dtype=np.float32)

    input_img = self._get_multiscale_or_original(binsize)

    depth = spl.config.fit_depth
    _scale = input_img.scale.x
    min_radius_px = min_radius / _scale
    max_radius = min(max_radius, spl.config.fit_width / 2)
    max_radius_px = max_radius / _scale
    spl_trans = spl.translate([-self.multiscale_translation(binsize)] * 3)
    tasks = [
        _misc.get_radial_prof(
            input_img, spl_trans, anc, (min_radius, max_radius), depth
        )
        for anc in pos
    ]
    profs: list[NDArray[np.float32]] = compute(*tasks)
    prof = np.stack(profs, axis=0).mean(axis=0)
    imax_sub = find_centroid_peak(prof, *_misc.get_thickness(spl, _scale))
    offset_px = _misc.get_radius_offset(min_radius_px, max_radius_px)
    radius = (imax_sub + offset_px) * _scale
    if update:
        spl.props.update_glob(
            [pl.Series(H.radius, [radius], dtype=pl.Float32)],
            bin_size=binsize,
        )
    LOGGER.info(f" >> Radius = {radius:.3f} nm")
    return radius
refine(i=None, *, max_interval=30.0, binsize=1, err_max=1.0, corr_allowed=0.9, max_shift=2.0, n_rotations=3)

Spline refinement using global lattice structural parameters.

Refine spline using the result of previous fit and the global structural parameters. During refinement, Y-projection of XZ cross section of cylinder is rotated with the twist angles, thus is much more precise than the coarse fitting.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to fit.

None
max_interval nm

Maximum interval of sampling points in nm unit.

24.0
binsize int

Multiscale bin size used for refining.

1
corr_allowed (float, defaul is 0.9)

How many images will be used to make template for alignment. If 0.9, then top 90% will be used.

0.9
max_shift nm

Maximum shift from the true center of the cylinder. This parameter is used in phase cross correlation.

2.0
n_rotations int

Number of rotations to be tested during finding the cylinder center.

3

Returns:

Type Description
FitResult

Result of fitting.

Source code in cylindra/components/tomogram/_cyl_tomo.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
@_misc.batch_process
def refine(
    self,
    i: int = None,
    *,
    max_interval: nm = 30.0,
    binsize: int = 1,
    err_max: nm = 1.0,
    corr_allowed: float = 0.9,
    max_shift: nm = 2.0,
    n_rotations: int = 3,
) -> _misc.FitResult:
    """
    Spline refinement using global lattice structural parameters.

    Refine spline using the result of previous fit and the global structural parameters.
    During refinement, Y-projection of XZ cross section of cylinder is rotated with the
    twist angles, thus is much more precise than the coarse fitting.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to fit.
    max_interval : nm, default 24.0
        Maximum interval of sampling points in nm unit.
    binsize : int, default 1
        Multiscale bin size used for refining.
    corr_allowed : float, defaul is 0.9
        How many images will be used to make template for alignment. If 0.9, then top
        90% will be used.
    max_shift: nm, default 2.0
        Maximum shift from the true center of the cylinder. This parameter is used in
        phase cross correlation.
    n_rotations : int, default 3
        Number of rotations to be tested during finding the cylinder center.

    Returns
    -------
    FitResult
        Result of fitting.
    """
    LOGGER.info(f"Running: {self.__class__.__name__}.refine, i={i}")
    spl = self.splines[i]
    _required = [H.spacing, H.twist, H.npf]
    if not spl.props.has_glob(_required):
        if (radius := spl.radius) is None:
            radius = self.measure_radius(
                i=i,
                binsize=binsize,
                positions="auto",
                update=False,
            )
        with spl.props.temp_glob(radius=radius):
            gprops = self.global_cft_params(
                i=i, binsize=binsize, nsamples=1, update=False
            )
    else:
        gprops = spl.props.glob.select(_required)
    gdict = {k: float(gprops[k][0]) for k in _required}
    ancs = spl.prep_anchor_positions(max_interval=max_interval)

    # Calculate Fourier parameters by cylindrical transformation along spline.
    # Skew angles are divided by the angle of single protofilament and the residual
    # angles are used, considering missing wedge effect.
    space = gdict[H.spacing]
    twist = gdict[H.twist]
    npf = roundint(gdict[H.npf])

    LOGGER.info(f" >> Parameters: spacing = {space:.2f} nm, twist = {twist:.3f} deg, PF = {npf}")  # fmt: skip

    # complement twisting
    pf_ang = 360 / npf
    twists = _misc.get_twists(spl.length(), ancs.size, space, twist, npf)
    scale = self.scale * binsize
    loader = _misc.prep_loader_for_refine(self, spl, ancs, binsize, twists)
    subtomograms = ip.asarray(loader.asnumpy(), axes="pzyx", dtype=np.float32)
    subtomograms[:] -= subtomograms.mean()  # normalize
    subtomograms.set_scale(zyx=scale)

    degrees = np.linspace(-pf_ang / 2, pf_ang / 2, n_rotations) + 180
    max_shift_px = max_shift / scale
    with set_gpu():
        inputs = subtomograms.mean(axis="y")[ip.slicer.x[::-1]]

        # Align twist-corrected images
        shifts_loc = _misc.multi_rotated_auto_zncc(inputs, degrees, max_shift_px)
        tasks = [
            _misc.delayed_translate(inputs[_j], shifts_loc[_j])
            for _j in range(ancs.size)
        ]
        imgs_aligned = _filter_by_corr(
            np.stack(compute(*tasks), axis="p"),
            corr_allowed,
        )

        # Make 2D template using coarse aligned images.
        imgcory = imgs_aligned.mean(axis="p")
        shift = rotated_auto_zncc(
            imgcory, degrees=degrees, max_shifts=max_shift_px * 2
        )
        template = imgcory.affine(translation=shift, mode=Mode.constant, cval=0.0)

        # Align twist-corrected images to the template
        quat = loader.molecules.quaternion()
        tasks = [
            _misc.delayed_zncc_maximum(
                inputs[_j],
                _misc.mask_missing_wedge(template, self.tilt_model, quat[_j]),
                max_shift_px,
                twists[_j],
            )
            for _j in range(ancs.size)
        ]
        shifts = np.stack(compute(*tasks), axis=0)

    # Update spline parameters
    self.splines[i] = spl.shift(ancs, shifts=shifts * scale, err_max=err_max)
    result = _misc.FitResult(shifts * scale)
    LOGGER.info(f" >> Shift RMSD = {result.rmsd:.3f} nm")
    return result
straighten(i=None, *, size=None, range_=(0.0, 1.0), chunk_length=None, binsize=1)

Straightening by building curved coordinate system around splines. Currently Cartesian coordinate system and cylindrical coordinate system are supported.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to straighten.

None
size float(nm)

Vertical/horizontal box size.

None
range_ tuple[float, float]

Range of spline domain.

(0.0, 1.0)
chunk_length nm

If spline is longer than this, it will be first split into chunks, straightened respectively and all the straightened images are concatenated afterward, to avoid loading entire image into memory.

None
binsize int

Multiscale bin size used for calculation.

1

Returns:

Type Description
ImgArray

Straightened image. If Cartesian coordinate system is used, it will have "zyx".

Source code in cylindra/components/tomogram/_cyl_tomo.py
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
@_misc.batch_process
def straighten(
    self,
    i: int = None,
    *,
    size: nm | tuple[nm, nm] = None,
    range_: tuple[float, float] = (0.0, 1.0),
    chunk_length: nm | None = None,
    binsize: int = 1,
) -> ip.ImgArray:
    """
    Straightening by building curved coordinate system around splines. Currently
    Cartesian coordinate system and cylindrical coordinate system are supported.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to straighten.
    size : float (nm), optional
        Vertical/horizontal box size.
    range_ : tuple[float, float], default (0.0, 1.0)
        Range of spline domain.
    chunk_length : nm, optional
        If spline is longer than this, it will be first split into chunks,
        straightened respectively and all the straightened images are concatenated
        afterward, to avoid loading entire image into memory.
    binsize : int, default 1
        Multiscale bin size used for calculation.

    Returns
    -------
    ip.ImgArray
        Straightened image. If Cartesian coordinate system is used, it will have "zyx".
    """
    spl = self.splines[i]
    input_img = self._get_multiscale_or_original(binsize)
    chunk_length = _misc.normalize_chunk_length(input_img, chunk_length)
    return _straighten.straighten(input_img, spl, range_, size)
straighten_cylindric(i=None, *, radii=None, range_=(0.0, 1.0), chunk_length=None, binsize=1)

Straightening by building curved coordinate system around splines. Currently Cartesian coordinate system and cylindrical coordinate system are supported.

Parameters:

Name Type Description Default
i int or iterable of int

Spline ID that you want to straighten.

None
radii tuple of float (nm)

Lower/upper limit of radius.

None
range_ tuple[float, float]

Range of spline domain.

(0.0, 1.0)
chunk_length nm

If spline is longer than this, it will be first split into chunks, straightened respectively and all the straightened images are concatenated afterward, to avoid loading entire image into memory.

None
binsize int

Multiscale bin size used for calculation.

1

Returns:

Type Description
ImgArray

Straightened image. If Cartesian coordinate system is used, it will have "zyx".

Source code in cylindra/components/tomogram/_cyl_tomo.py
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
@_misc.batch_process
def straighten_cylindric(
    self,
    i: int = None,
    *,
    radii: tuple[nm, nm] | None = None,
    range_: tuple[float, float] = (0.0, 1.0),
    chunk_length: nm | None = None,
    binsize: int = 1,
) -> ip.ImgArray:
    """
    Straightening by building curved coordinate system around splines. Currently
    Cartesian coordinate system and cylindrical coordinate system are supported.

    Parameters
    ----------
    i : int or iterable of int, optional
        Spline ID that you want to straighten.
    radii : tuple of float (nm), optional
        Lower/upper limit of radius.
    range_ : tuple[float, float], default (0.0, 1.0)
        Range of spline domain.
    chunk_length : nm, optional
        If spline is longer than this, it will be first split into chunks,
        straightened respectively and all the straightened images are concatenated
        afterward, to avoid loading entire image into memory.
    binsize : int, default 1
        Multiscale bin size used for calculation.

    Returns
    -------
    ip.ImgArray
        Straightened image. If Cartesian coordinate system is used, it will have "zyx".
    """
    spl = self.splines[i]
    input_img = self._get_multiscale_or_original(binsize)
    chunk_length = _misc.normalize_chunk_length(input_img, chunk_length)
    return _straighten.straighten_cylindric(input_img, spl, range_, radii)

Tomogram

Lazy-loading/multi-scale tomogram object.

It is always connected to a 3D image but processed lazily. Thus you can create a lot of Tomogram objects without MemoryError. Subtomograms are temporarily loaded into memory via cache map. Once memory usage exceed certain amount, the subtomogram cache will automatically deleted from the old ones.

Source code in cylindra/components/tomogram/_tomo_base.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
class Tomogram:
    """
    Lazy-loading/multi-scale tomogram object.

    It is always connected to a 3D image but processed lazily. Thus you can create
    a lot of Tomogram objects without MemoryError. Subtomograms are temporarily
    loaded into memory via cache map. Once memory usage exceed certain amount, the
    subtomogram cache will automatically deleted from the old ones.
    """

    def __init__(self):
        self._metadata: dict[str, Any] = {}
        self._image: ip.ImgArray | ip.LazyImgArray | None = None
        self._multiscaled = list[tuple[int, ip.ImgArray]]()
        self._tilt_model: TiltSeriesModel = single_axis(None)
        self._scale = 1.0

    def __hash__(self) -> int:
        """Use unsafe hash."""
        return id(self)

    def __repr__(self) -> str:
        return str(self)

    def __str__(self) -> str:
        shape = str(self._image.shape) if self._image is not None else "unknown"
        scale = f"{self.scale:.4f}" if self._image is not None else "unknown"
        if source := self.metadata.get("source", None):
            source = Path(source).as_posix()
        return (
            f"{self.__class__.__name__}(shape={shape}, scale={scale}, source={source})"
        )

    @property
    def scale(self) -> nm:
        """Scale of the tomogram."""
        return self._scale

    @property
    def metadata(self) -> dict[str, Any]:
        """Metadata relevant to the tomogram."""
        return self._metadata

    @metadata.setter
    def metadata(self, v: dict[str, Any]):
        if not isinstance(v, dict):
            raise TypeError(f"Cannot set type {type(v)} as a metadata.")
        self._metadata = v

    @property
    def source(self) -> Path:
        """Path to the source file."""
        source = self.metadata.get("source", None)
        if source is None:
            raise ValueError("Source file is unknown.")
        return Path(source)

    @property
    def tilt_model(self):
        """Tilt model of the tomogram."""
        return self._tilt_model

    @property
    def tilt(self) -> dict[str, Any]:
        """Tilt model as a dictionary."""
        if isinstance(self._tilt_model, SingleAxisX):
            return {"kind": "x", "range": self._tilt_model.tilt_range}
        elif isinstance(self._tilt_model, SingleAxisY):
            return {"kind": "y", "range": self._tilt_model.tilt_range}
        elif isinstance(self._tilt_model, NoWedge):
            return {"kind": "none"}
        elif isinstance(self._tilt_model, UnionAxes):
            return {
                "kind": "dual",
                "xrange": self._tilt_model._wedges[1].tilt_range,
                "yrange": self._tilt_model._wedges[0].tilt_range,
            }
        else:
            raise ValueError("Tilt model is not correctly set.")

    @property
    def is_inverted(self) -> bool:
        """True if this image is inverted"""
        return self.metadata.get("is_inverted", False)

    @classmethod
    def dummy(
        cls,
        *,
        scale: float = 1.0,
        tilt: tuple[float, float] | None = None,
        binsize: int | Iterable[int] = (),
        source: str | None = None,
        shape: tuple[int, int, int] = (24, 24, 24),
        name: str | None = None,
    ) -> Self:
        """Create a dummy tomogram."""
        dummy = ip.zeros(shape, name=name, dtype=np.float32, axes="zyx")
        dummy.source = source
        tomo = cls.from_image(
            dummy,
            scale=scale,
            tilt=tilt,
            binsize=binsize,
        )
        tomo.metadata["is_dummy"] = True
        return tomo

    @property
    def is_dummy(self) -> bool:
        """True if this tomogram object does not have a real image."""
        return self.metadata.get("is_dummy", False)

    @classmethod
    def from_image(
        cls,
        img: ip.ImgArray | ip.LazyImgArray,
        *,
        scale: float | None = None,
        tilt: tuple[float, float] | None = None,
        binsize: int | Iterable[int] = (),
        compute: bool = True,
    ):
        """
        Construct a Tomogram object from a image array.

        Parameters
        ----------
        img : array-like
            Input image.
        scale : float, optional
            Pixel size in nm. If not given, will try to read from image header.
        tilt : tuple of float, optional
            Tilt model.
        binsize : int or iterable of int, optional
            Binsize to generate multiscale images. If not given, will not generate.
        compute : bool, default True
            Whether to compute the binned images.

        Returns
        -------
        Tomogram
            Tomogram object with the image that has just been read and multi-scales.
        """
        self = cls()
        if type(img) is np.ndarray:
            img = ip.asarray(img, axes="zyx")

        if scale is not None:
            img.set_scale(xyz=scale)
        else:
            if (
                abs(img.scale.z - img.scale.x) > 1e-3
                or abs(img.scale.z - img.scale.y) > 1e-3
            ):
                raise ValueError(f"Uneven scale: {img.scale}.")

        self._set_image(img)
        if source := img.source:
            self._metadata["source"] = source.resolve()
        self._metadata["scale"] = scale
        # parse tilt model
        if not isinstance(tilt, TiltSeriesModel):
            if isinstance(tilt, dict):
                match tilt["kind"]:
                    case "none":
                        tilt = NoWedge()
                    case "x" | "y":
                        tilt = single_axis(tilt["range"], tilt["kind"])
                    case "dual":
                        tilt = dual_axis(tilt["yrange"], tilt["xrange"])
                    case _:
                        raise ValueError(
                            f"Tilt model {tilt!r} not in a correct format."
                        )
            else:
                tilt = single_axis(tilt)
        self._tilt_model = tilt

        if isinstance(binsize, int):
            binsize = [binsize]
        for b in sorted(binsize):
            self.add_multiscale(b, compute=compute)
        return self

    @classmethod
    def imread(
        cls,
        path: str | Path,
        *,
        scale: float = None,
        tilt: tuple[float, float] | None = None,
        binsize: int | Iterable[int] = (),
        eager: bool = False,
        compute: bool = True,
    ) -> Self:
        """
        Read a image as a dask array.

        Parameters
        ----------
        path : path-like
            Path to the image file.
        scale : float, optional
            Pixel size in nm. If not given, will try to read from image header.
        tilt : tuple of float, optional
            Tilt model.
        binsize : int or iterable of int, optional
            Binsize to generate multiscale images. If not given, will not generate.
        eager : bool, default False
            Whether to read the image lazily. If True, the entire image will be read
            into the memory.

        Returns
        -------
        Tomogram
            Tomogram object with the image that has just been read and multi-scales.
        """
        chunks = get_config().dask_chunk
        img = ip.lazy.imread(path, chunks=chunks)
        if eager:
            img = img.compute()
        return cls.from_image(
            _norm_dtype(img),
            scale=scale,
            tilt=tilt,
            binsize=binsize,
            compute=compute,
        )

    @property
    def image(self) -> ip.ImgArray | ip.LazyImgArray:
        """Tomogram image data."""
        if self._image is None:
            raise ValueError("Image is not set.")
        return self._image

    @property
    def multiscaled(self) -> list[tuple[int, ip.ImgArray]]:
        """Get all multi-scale factor and the corresponding multiscaled images."""
        return self._multiscaled

    def _set_image(self, img: ip.LazyImgArray | np.ndarray) -> None:
        if isinstance(img, ip.LazyImgArray):
            _img = _norm_dtype(img)
        elif isinstance(img, np.ndarray):
            if img.ndim != 3:
                raise ValueError("Can only set 3-D image.")
            _img = ip.lazy.asarray(
                img, dtype=np.float32, axes="zyx", chunks=get_config().dask_chunk
            )
            if isinstance(img, ip.ImgArray):
                _img.set_scale(img)
        else:
            raise TypeError(f"Cannot set type {type(img)} as an image.")
        if (
            abs(_img.scale.z - _img.scale.x) > 1e-4
            or abs(_img.scale.z - _img.scale.y) > 1e-4
        ):
            raise ValueError("Uneven scale.")
        self._scale = _img.scale.x
        self._image = _img
        return None

    @overload
    def nm2pixel(self, value: nm, binsize: int = 1) -> int:
        ...

    @overload
    def nm2pixel(
        self, value: Iterable[nm] | NDArray[np.number], binsize: int = 1
    ) -> NDArray[np.intp]:
        ...

    def nm2pixel(self, value, binsize: int = 1):
        """
        Convert nm float value into pixel value. Useful for conversion from
        coordinate to pixel position.

        Returns
        -------
        np.ndarray or int
            Pixel position.
        """
        pix = np.round(np.asarray(value) / self.scale / binsize).astype(np.int16)
        if np.isscalar(value):
            pix = int(pix)
        return pix

    def add_multiscale(self, binsize: int, compute: bool = True) -> ip.ImgArray:
        """Add new multiscaled image of given binsize."""
        # iterate from the larger bin size
        for _b, _img in reversed(self._multiscaled):
            if binsize == _b:
                warnings.warn(
                    f"Binsize {binsize} already exists in multiscale images. "
                    "Skip binning process.",
                    UserWarning,
                    stacklevel=2,
                )
                return _img
            if binsize % _b == 0:
                imgb = _img.binning(binsize // _b, check_edges=False)
                break
        else:
            imgb = self.image.binning(binsize, check_edges=False)
            if isinstance(imgb, ip.LazyImgArray) and compute:
                imgb = imgb.compute()
        self._multiscaled.append((binsize, imgb))
        self._multiscaled.sort(key=lambda x: x[0])
        return imgb

    def get_multiscale(self, binsize: int, add: bool = False) -> ip.ImgArray:
        """Get multiscaled image of given binsize."""
        for _b, _img in self._multiscaled:
            if _b == binsize:
                return _img
        if add:
            return self.add_multiscale(binsize)
        raise ValueError(f"Multiscale = {binsize} not found.")

    def _get_multiscale_or_original(
        self, binsize: int
    ) -> ip.ImgArray | ip.LazyImgArray:
        """Get multiscaled image of given binsize but use original one if needed."""
        if binsize > 1:
            out = self.get_multiscale(binsize)
        else:
            try:
                out = self.get_multiscale(1)
            except ValueError:
                out = self.image
        return out

    def multiscale_translation(self, binsize: int) -> nm:
        """Get lateral translation of binned image in nm."""
        return (binsize - 1) / 2 * self.scale

    def invert(self, cache: bool = False) -> Self:
        """Invert tomogram intensities **in-place**."""
        img_inv = -self.image
        if cache:
            img_inv.release()
        self._set_image(img_inv)
        self.metadata["is_inverted"] = True
        for i, (_b, _img) in enumerate(self._multiscaled):
            self._multiscaled[i] = (_b, -_img)
        return self

    def get_subtomogram_loader(
        self,
        mole: Molecules,
        output_shape: tuple[nm, nm, nm] | None = None,
        binsize: int = 1,
        order: int = 1,
    ) -> SubtomogramLoader:
        """Create a subtomogram loader from molecules."""
        from acryo import SubtomogramLoader

        if binsize == 1:
            try:
                img = self.get_multiscale(1)
            except ValueError:
                img = self.image
        else:
            tr = -self.multiscale_translation(binsize)
            mole = mole.translate([tr, tr, tr])
            img = self.get_multiscale(binsize)

        kwargs = {
            "order": order,
            "scale": self.scale * binsize,
        }
        if output_shape is not None:
            kwargs["output_shape"] = tuple(self.nm2pixel(output_shape, binsize=binsize))
        return SubtomogramLoader(
            img.value,
            mole,
            **kwargs,
        )
image: ip.ImgArray | ip.LazyImgArray property

Tomogram image data.

is_dummy: bool property

True if this tomogram object does not have a real image.

is_inverted: bool property

True if this image is inverted

metadata: dict[str, Any] property writable

Metadata relevant to the tomogram.

multiscaled: list[tuple[int, ip.ImgArray]] property

Get all multi-scale factor and the corresponding multiscaled images.

scale: nm property

Scale of the tomogram.

source: Path property

Path to the source file.

tilt: dict[str, Any] property

Tilt model as a dictionary.

tilt_model property

Tilt model of the tomogram.

__hash__()

Use unsafe hash.

Source code in cylindra/components/tomogram/_tomo_base.py
38
39
40
def __hash__(self) -> int:
    """Use unsafe hash."""
    return id(self)
add_multiscale(binsize, compute=True)

Add new multiscaled image of given binsize.

Source code in cylindra/components/tomogram/_tomo_base.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
def add_multiscale(self, binsize: int, compute: bool = True) -> ip.ImgArray:
    """Add new multiscaled image of given binsize."""
    # iterate from the larger bin size
    for _b, _img in reversed(self._multiscaled):
        if binsize == _b:
            warnings.warn(
                f"Binsize {binsize} already exists in multiscale images. "
                "Skip binning process.",
                UserWarning,
                stacklevel=2,
            )
            return _img
        if binsize % _b == 0:
            imgb = _img.binning(binsize // _b, check_edges=False)
            break
    else:
        imgb = self.image.binning(binsize, check_edges=False)
        if isinstance(imgb, ip.LazyImgArray) and compute:
            imgb = imgb.compute()
    self._multiscaled.append((binsize, imgb))
    self._multiscaled.sort(key=lambda x: x[0])
    return imgb
dummy(*, scale=1.0, tilt=None, binsize=(), source=None, shape=(24, 24, 24), name=None) classmethod

Create a dummy tomogram.

Source code in cylindra/components/tomogram/_tomo_base.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@classmethod
def dummy(
    cls,
    *,
    scale: float = 1.0,
    tilt: tuple[float, float] | None = None,
    binsize: int | Iterable[int] = (),
    source: str | None = None,
    shape: tuple[int, int, int] = (24, 24, 24),
    name: str | None = None,
) -> Self:
    """Create a dummy tomogram."""
    dummy = ip.zeros(shape, name=name, dtype=np.float32, axes="zyx")
    dummy.source = source
    tomo = cls.from_image(
        dummy,
        scale=scale,
        tilt=tilt,
        binsize=binsize,
    )
    tomo.metadata["is_dummy"] = True
    return tomo
from_image(img, *, scale=None, tilt=None, binsize=(), compute=True) classmethod

Construct a Tomogram object from a image array.

Parameters:

Name Type Description Default
img array - like

Input image.

required
scale float

Pixel size in nm. If not given, will try to read from image header.

None
tilt tuple of float

Tilt model.

None
binsize int or iterable of int

Binsize to generate multiscale images. If not given, will not generate.

()
compute bool

Whether to compute the binned images.

True

Returns:

Type Description
Tomogram

Tomogram object with the image that has just been read and multi-scales.

Source code in cylindra/components/tomogram/_tomo_base.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
@classmethod
def from_image(
    cls,
    img: ip.ImgArray | ip.LazyImgArray,
    *,
    scale: float | None = None,
    tilt: tuple[float, float] | None = None,
    binsize: int | Iterable[int] = (),
    compute: bool = True,
):
    """
    Construct a Tomogram object from a image array.

    Parameters
    ----------
    img : array-like
        Input image.
    scale : float, optional
        Pixel size in nm. If not given, will try to read from image header.
    tilt : tuple of float, optional
        Tilt model.
    binsize : int or iterable of int, optional
        Binsize to generate multiscale images. If not given, will not generate.
    compute : bool, default True
        Whether to compute the binned images.

    Returns
    -------
    Tomogram
        Tomogram object with the image that has just been read and multi-scales.
    """
    self = cls()
    if type(img) is np.ndarray:
        img = ip.asarray(img, axes="zyx")

    if scale is not None:
        img.set_scale(xyz=scale)
    else:
        if (
            abs(img.scale.z - img.scale.x) > 1e-3
            or abs(img.scale.z - img.scale.y) > 1e-3
        ):
            raise ValueError(f"Uneven scale: {img.scale}.")

    self._set_image(img)
    if source := img.source:
        self._metadata["source"] = source.resolve()
    self._metadata["scale"] = scale
    # parse tilt model
    if not isinstance(tilt, TiltSeriesModel):
        if isinstance(tilt, dict):
            match tilt["kind"]:
                case "none":
                    tilt = NoWedge()
                case "x" | "y":
                    tilt = single_axis(tilt["range"], tilt["kind"])
                case "dual":
                    tilt = dual_axis(tilt["yrange"], tilt["xrange"])
                case _:
                    raise ValueError(
                        f"Tilt model {tilt!r} not in a correct format."
                    )
        else:
            tilt = single_axis(tilt)
    self._tilt_model = tilt

    if isinstance(binsize, int):
        binsize = [binsize]
    for b in sorted(binsize):
        self.add_multiscale(b, compute=compute)
    return self
get_multiscale(binsize, add=False)

Get multiscaled image of given binsize.

Source code in cylindra/components/tomogram/_tomo_base.py
333
334
335
336
337
338
339
340
def get_multiscale(self, binsize: int, add: bool = False) -> ip.ImgArray:
    """Get multiscaled image of given binsize."""
    for _b, _img in self._multiscaled:
        if _b == binsize:
            return _img
    if add:
        return self.add_multiscale(binsize)
    raise ValueError(f"Multiscale = {binsize} not found.")
get_subtomogram_loader(mole, output_shape=None, binsize=1, order=1)

Create a subtomogram loader from molecules.

Source code in cylindra/components/tomogram/_tomo_base.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
def get_subtomogram_loader(
    self,
    mole: Molecules,
    output_shape: tuple[nm, nm, nm] | None = None,
    binsize: int = 1,
    order: int = 1,
) -> SubtomogramLoader:
    """Create a subtomogram loader from molecules."""
    from acryo import SubtomogramLoader

    if binsize == 1:
        try:
            img = self.get_multiscale(1)
        except ValueError:
            img = self.image
    else:
        tr = -self.multiscale_translation(binsize)
        mole = mole.translate([tr, tr, tr])
        img = self.get_multiscale(binsize)

    kwargs = {
        "order": order,
        "scale": self.scale * binsize,
    }
    if output_shape is not None:
        kwargs["output_shape"] = tuple(self.nm2pixel(output_shape, binsize=binsize))
    return SubtomogramLoader(
        img.value,
        mole,
        **kwargs,
    )
imread(path, *, scale=None, tilt=None, binsize=(), eager=False, compute=True) classmethod

Read a image as a dask array.

Parameters:

Name Type Description Default
path path - like

Path to the image file.

required
scale float

Pixel size in nm. If not given, will try to read from image header.

None
tilt tuple of float

Tilt model.

None
binsize int or iterable of int

Binsize to generate multiscale images. If not given, will not generate.

()
eager bool

Whether to read the image lazily. If True, the entire image will be read into the memory.

False

Returns:

Type Description
Tomogram

Tomogram object with the image that has just been read and multi-scales.

Source code in cylindra/components/tomogram/_tomo_base.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
@classmethod
def imread(
    cls,
    path: str | Path,
    *,
    scale: float = None,
    tilt: tuple[float, float] | None = None,
    binsize: int | Iterable[int] = (),
    eager: bool = False,
    compute: bool = True,
) -> Self:
    """
    Read a image as a dask array.

    Parameters
    ----------
    path : path-like
        Path to the image file.
    scale : float, optional
        Pixel size in nm. If not given, will try to read from image header.
    tilt : tuple of float, optional
        Tilt model.
    binsize : int or iterable of int, optional
        Binsize to generate multiscale images. If not given, will not generate.
    eager : bool, default False
        Whether to read the image lazily. If True, the entire image will be read
        into the memory.

    Returns
    -------
    Tomogram
        Tomogram object with the image that has just been read and multi-scales.
    """
    chunks = get_config().dask_chunk
    img = ip.lazy.imread(path, chunks=chunks)
    if eager:
        img = img.compute()
    return cls.from_image(
        _norm_dtype(img),
        scale=scale,
        tilt=tilt,
        binsize=binsize,
        compute=compute,
    )
invert(cache=False)

Invert tomogram intensities in-place.

Source code in cylindra/components/tomogram/_tomo_base.py
359
360
361
362
363
364
365
366
367
368
def invert(self, cache: bool = False) -> Self:
    """Invert tomogram intensities **in-place**."""
    img_inv = -self.image
    if cache:
        img_inv.release()
    self._set_image(img_inv)
    self.metadata["is_inverted"] = True
    for i, (_b, _img) in enumerate(self._multiscaled):
        self._multiscaled[i] = (_b, -_img)
    return self
multiscale_translation(binsize)

Get lateral translation of binned image in nm.

Source code in cylindra/components/tomogram/_tomo_base.py
355
356
357
def multiscale_translation(self, binsize: int) -> nm:
    """Get lateral translation of binned image in nm."""
    return (binsize - 1) / 2 * self.scale
nm2pixel(value, binsize=1)

Convert nm float value into pixel value. Useful for conversion from coordinate to pixel position.

Returns:

Type Description
ndarray or int

Pixel position.

Source code in cylindra/components/tomogram/_tomo_base.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def nm2pixel(self, value, binsize: int = 1):
    """
    Convert nm float value into pixel value. Useful for conversion from
    coordinate to pixel position.

    Returns
    -------
    np.ndarray or int
        Pixel position.
    """
    pix = np.round(np.asarray(value) / self.scale / binsize).astype(np.int16)
    if np.isscalar(value):
        pix = int(pix)
    return pix

BaseComponent

Base class for all tomographic components.

Source code in cylindra/components/_base.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class BaseComponent(ABC):
    """Base class for all tomographic components."""

    @abstractclassmethod
    def from_dict(cls, js: dict[str, Any]) -> Self:
        """Construct a component from a dictionary."""
        raise NotImplementedError

    @abstractmethod
    def to_dict(self) -> dict[str, Any]:
        """Convert a component to a dictionary."""
        raise NotImplementedError

    def to_json(self, file_path: str | Path | io.IOBase, *, cls=None) -> None:
        """
        Save the model in a json format.

        Parameters
        ----------
        file_path : str
            Path to the file.
        cls : JSONEncoder, optional
            Custom JSON encoder, by default None
        """
        if isinstance(file_path, io.IOBase):
            return self._dump(file_path, cls)
        with open(str(file_path).strip("'").strip('"'), mode="w") as f:
            self._dump(f, cls)
        return None

    def _dump(self, f: io.IOBase, cls) -> None:
        """Dump the project to a file."""
        return json.dump(self.to_dict(), f, indent=4, separators=(",", ": "), cls=cls)

    @classmethod
    def from_json(cls, file_path: str | Path | io.IOBase) -> Self:
        """
        Construct a spline model from a json file.

        Parameters
        ----------
        file_path : str
            Path to json file.

        Returns
        -------
        BaseComponent
            Object constructed from the json file.
        """
        if isinstance(file_path, io.IOBase):
            return cls.from_dict(json.load(file_path))
        with open(str(file_path).strip("'").strip('"')) as f:
            js = json.load(f)
        return cls.from_dict(js)
from_dict(js)

Construct a component from a dictionary.

Source code in cylindra/components/_base.py
15
16
17
18
@abstractclassmethod
def from_dict(cls, js: dict[str, Any]) -> Self:
    """Construct a component from a dictionary."""
    raise NotImplementedError
from_json(file_path) classmethod

Construct a spline model from a json file.

Parameters:

Name Type Description Default
file_path str

Path to json file.

required

Returns:

Type Description
BaseComponent

Object constructed from the json file.

Source code in cylindra/components/_base.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@classmethod
def from_json(cls, file_path: str | Path | io.IOBase) -> Self:
    """
    Construct a spline model from a json file.

    Parameters
    ----------
    file_path : str
        Path to json file.

    Returns
    -------
    BaseComponent
        Object constructed from the json file.
    """
    if isinstance(file_path, io.IOBase):
        return cls.from_dict(json.load(file_path))
    with open(str(file_path).strip("'").strip('"')) as f:
        js = json.load(f)
    return cls.from_dict(js)
to_dict() abstractmethod

Convert a component to a dictionary.

Source code in cylindra/components/_base.py
20
21
22
23
@abstractmethod
def to_dict(self) -> dict[str, Any]:
    """Convert a component to a dictionary."""
    raise NotImplementedError
to_json(file_path, *, cls=None)

Save the model in a json format.

Parameters:

Name Type Description Default
file_path str

Path to the file.

required
cls JSONEncoder

Custom JSON encoder, by default None

None
Source code in cylindra/components/_base.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def to_json(self, file_path: str | Path | io.IOBase, *, cls=None) -> None:
    """
    Save the model in a json format.

    Parameters
    ----------
    file_path : str
        Path to the file.
    cls : JSONEncoder, optional
        Custom JSON encoder, by default None
    """
    if isinstance(file_path, io.IOBase):
        return self._dump(file_path, cls)
    with open(str(file_path).strip("'").strip('"'), mode="w") as f:
        self._dump(f, cls)
    return None

AnnealingResult dataclass

Dataclass for storing the annealing results.

Parameters:

Name Type Description Default
energies ndarray

History of energies of the annealing process.

required
batch_size int

Batch size used in the annealing process.

required
time_const float

Time constant used for cooling.

required
indices ndarray

The optimized indices of the molecules.

required
niter int

Number of iterations.

required
state str

Optimization state.

required
Source code in cylindra/components/landscape.py
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
@dataclass
class AnnealingResult:
    """
    Dataclass for storing the annealing results.

    Parameters
    ----------
    energies : np.ndarray
        History of energies of the annealing process.
    batch_size : int
        Batch size used in the annealing process.
    time_const : float
        Time constant used for cooling.
    indices : np.ndarray
        The optimized indices of the molecules.
    niter : int
        Number of iterations.
    state : str
        Optimization state.
    """

    energies: NDArray[np.float32]
    batch_size: int
    time_const: float
    indices: NDArray[np.int32]
    niter: int
    state: str

Landscape dataclass

Energy landscape array.

Parameters:

Name Type Description Default
energies NDArray[float32]

4D array of energy values.

required
molecules Molecules

Molecules object.

required
argmax NDArray[int32]

Argmax indices to track which rotation resulted in the best alignment.

required
quaternions NDArray[float32]

Quaternions used for template rotation.

required
scale_factor float

Scale factor to convert from pixels to nanometers (upsampling considered). scale / upsample_factor will be passed to this parameter from the GUI.

required
Source code in cylindra/components/landscape.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
@dataclass
class Landscape:
    """
    Energy landscape array.

    Parameters
    ----------
    energies : NDArray[np.float32]
        4D array of energy values.
    molecules : Molecules
        Molecules object.
    argmax : NDArray[np.int32], optional
        Argmax indices to track which rotation resulted in the best alignment.
    quaternions : NDArray[np.float32]
        Quaternions used for template rotation.
    scale_factor : float
        Scale factor to convert from pixels to nanometers (upsampling considered).
        ``scale / upsample_factor`` will be passed to this parameter from the GUI.
    """

    energies: NDArray[np.float32]
    molecules: Molecules
    argmax: NDArray[np.int32] | None
    quaternions: NDArray[np.float32]
    scale_factor: float

    def __getitem__(
        self, key: slice | list[SupportsIndex] | NDArray[np.integer]
    ) -> Landscape:
        """Subset of the landscape."""
        if not isinstance(key, (slice, list, np.ndarray)):
            raise TypeError(f"Invalid type of key: {type(key)}")
        energy = self.energies[key]
        mole = self.molecules.subset(key)
        argmax = self.argmax[key] if self.argmax is not None else None
        return Landscape(energy, mole, argmax, self.quaternions, self.scale_factor)

    def __repr__(self) -> str:
        eng_repr = f"<{self.energies.shape!r} array>"
        mole_repr = f"<{self.molecules.count()} molecules>"
        argmax_repr = (
            f"<{self.argmax.shape!r} array>" if self.argmax is not None else None
        )
        return (
            f"Landscape(energies={eng_repr}, molecules={mole_repr}, "
            f"argmax={argmax_repr}, quaternion={self.quaternions!r}, "
            f"scale_factor={self.scale_factor:.3g})"
        )

    @property
    def offset(self) -> NDArray[np.int32]:
        """Shift from the corner (0, 0, 0) to the center."""
        shift = (np.array(self.energies.shape[1:], dtype=np.float32) - 1) / 2
        return shift.astype(np.int32)

    @property
    def offset_nm(self) -> NDArray[np.float32]:
        """Offset in nm."""
        return self.offset * self.scale_factor

    @classmethod
    def from_loader(
        cls,
        loader: LoaderBase,
        template: TemplateInputType,
        mask: MaskInputType = None,
        max_shifts: tuple[nm, nm, nm] = (0.8, 0.8, 0.8),
        upsample_factor: int = 5,
        alignment_model: alignment.TomographyInput = alignment.ZNCCAlignment,
    ) -> Landscape:
        """
        Construct a landscape from a loader object.

        Parameters
        ----------
        loader : LoaderBase
            Any loader object from ``acryo``.
        template : template input type
            Template image or a list of template images to be used.
        mask : mask input type, optional
            Mask image to be used, by default None
        max_shifts : (float, float, float), optional
            Maximum shifts in nm, in (Z, Y, X) order.
        upsample_factor : int
            Upsampling factor for landscape construction.
        alignment_model : alignment model object
            Alignment model to be used to evaluate correlation score.
        """
        if isinstance(template, (str, Path)):
            template = pipe.from_file(template)
            multi = False
        elif isinstance(template, (list, tuple)) and isinstance(
            next(iter(template), None), (str, Path)
        ):
            template = pipe.from_files(template)
            multi = True
        elif isinstance(template, np.ndarray):
            multi = template.ndim == 4
        else:
            raise TypeError(f"Invalid type of template: {type(template)}")

        score_dsk = loader.construct_landscape(
            template,
            mask=mask,
            max_shifts=max_shifts,
            upsample=upsample_factor,
            alignment_model=alignment_model,
        )
        score, argmax = _calc_landscape(
            alignment_model, score_dsk, multi_templates=multi
        )
        mole = loader.molecules
        to_drop = set(mole.features.columns) - {Mole.nth, Mole.pf, Mole.position}
        if to_drop:
            mole = mole.drop_features(*to_drop)
        return cls(
            energies=-np.ascontiguousarray(score),
            molecules=mole,
            argmax=argmax,
            quaternions=alignment_model.quaternions,
            scale_factor=loader.scale / upsample_factor,
        )

    def transform_molecules(
        self,
        molecules: Molecules,
        indices: NDArray[np.int32],
        detect_peak: bool = False,
    ) -> Molecules:
        """
        Transform the input molecules based on the landscape.

        Parameters
        ----------
        molecules : Molecules
            Molecules object to be transformed.
        indices : integer array
            Indices in the landscape to be used for transformation.
        detect_peak : bool, default False
            If True, landscape will be sub-sampled to detect the peak in higher
            precision. This should be false for constrained alignment, as the detected
            peak usually does not represent the optimal result.

        Returns
        -------
        Molecules
            Transformed molecules.
        """
        offset = self.offset
        indices_sub = indices.astype(np.float32)
        nmole = self.energies.shape[0]
        opt_energy = np.zeros(nmole, dtype=np.float32)
        nrepeat = 3 if detect_peak else 0
        for i in range(nmole):
            eng = self.energies[i]
            indices_sub[i], opt_energy[i] = find_peak(eng, indices[i], nrepeat=nrepeat)
        opt_score = -opt_energy
        shifts = ((indices_sub - offset) * self.scale_factor).astype(np.float32)
        molecules_opt = molecules.translate_internal(shifts)
        shift_feat = _as_n_series("align-d{}", shifts)
        if (nrots := self.quaternions.shape[0]) > 1:
            quats = np.stack(
                [
                    self.quaternions[self.argmax[(i, *ind)] % nrots]
                    for i, ind in enumerate(indices)
                ],
                axis=0,
            )
            rotator = Rotation.from_quat(quats).inv()
            molecules_opt = molecules_opt.rotate_by(rotator)
            rotvec = rotator.as_rotvec().astype(np.float32)
            rotvec_feat = _as_n_series("align-d{}rot", rotvec)
            molecules_opt = molecules_opt.with_features(*rotvec_feat)

        return molecules_opt.with_features(
            *shift_feat, pl.Series(Mole.score, opt_score)
        )

    def run_min_energy(
        self, spl: CylSpline | None = None
    ) -> tuple[Molecules, MinEnergyResult]:
        """Minimize energy for each local landscape independently."""
        shape = self.energies.shape[1:]
        indices = list[NDArray[np.int32]]()
        engs = list[float]()
        for i in range(self.energies.shape[0]):
            eng = self.energies[i]
            pos = np.unravel_index(np.argmin(eng), shape)
            indices.append(np.array(pos, dtype=np.int32))
            engs.append(eng[pos])
        indices = np.stack(indices, axis=0)
        engs = np.array(engs, dtype=np.float32)
        result = MinEnergyResult(indices, engs)

        mole_opt = self.transform_molecules(
            self.molecules, result.indices, detect_peak=True
        )
        if spl is not None:
            mole_opt = _update_mole_pos(mole_opt, self.molecules, spl)
        return mole_opt, result

    def run_viterbi(
        self, dist_range: tuple[_DistLike, _DistLike], angle_max: float | None = None
    ):
        """Run Viterbi alignment."""
        dist_min, dist_max = self._norm_dist(dist_range)
        if angle_max is not None:
            angle_max = np.deg2rad(angle_max)
        grid = self._prep_viterbi_grid()
        return ViterbiResult(*grid.viterbi(dist_min, dist_max, angle_max))

    def run_viterbi_along_spline(
        self,
        spl: CylSpline | None,
        range_long: tuple[nm, nm] = (4.0, 4.28),
        angle_max: float | None = 5.0,
    ):
        mole = self.molecules
        if Mole.pf in mole.features.columns:
            npfs: Sequence[int] = mole.features[Mole.pf].unique(maintain_order=True)
            slices = [(mole.features[Mole.pf] == i).to_numpy() for i in npfs]
            viterbi_tasks = [
                delayed(self[sl].run_viterbi)(range_long, angle_max) for sl in slices
            ]
        else:
            slices = [slice(None)]
            viterbi_tasks = [delayed(self.run_viterbi)(range_long, angle_max)]
        _Logger.info("Running Viterbi alignment")
        _Logger.info(f"  shape: {self.energies.shape[1:]!r}")
        _Logger.info(f"  tasks: {len(viterbi_tasks)}")
        vit_out = compute(*viterbi_tasks)

        inds = np.empty((mole.count(), 3), dtype=np.int32)
        max_shifts_px = (np.array(self.energies.shape[1:]) - 1) // 2
        for i, result in enumerate(vit_out):
            inds[slices[i], :] = _check_viterbi_shift(result.indices, max_shifts_px, i)
        molecules_opt = self.transform_molecules(mole, inds)
        if spl is not None:
            molecules_opt = _update_mole_pos(molecules_opt, mole, spl)
        return molecules_opt

    def run_viterbi_fixed_start(
        self,
        first: NDArray[np.float32],
        range_long: tuple[_DistLike, _DistLike] = (4.0, 4.28),
        angle_max: float | None = 5.0,
    ):
        """Run Viterbi alignment with a fixed start edge."""
        mole = self.molecules
        dist_min, dist_max = self._norm_dist(range_long)
        if angle_max is not None:
            angle_max = np.deg2rad(angle_max)
        grid = self._prep_viterbi_grid()
        mole0 = Molecules(first, mole.rotator[:1]).translate_internal(-self.offset_nm)
        origin = (mole0.pos[0] / self.scale_factor).astype(np.float32)
        res = ViterbiResult(
            *grid.viterbi_fixed_start(
                dist_min, dist_max, first / self.scale_factor, origin, angle_max
            )
        )
        max_shifts_px = (np.array(self.energies.shape[1:]) - 1) // 2
        inds = _check_viterbi_shift(res.indices, max_shifts_px, 0)
        molecules_opt = self.transform_molecules(mole, inds)
        return molecules_opt

    def _prep_annealing_model(
        self,
        cls: type[_ANN],
        spl: CylSpline,
        distance_range_long: tuple[_DistLike, _DistLike],
        distance_range_lat: tuple[_DistLike, _DistLike],
        angle_max: float | None = None,
        temperature_time_const: float = 1.0,
        temperature: float | None = None,
        cooling_rate: float | None = None,
        reject_limit: int | None = None,
    ) -> _ANN:
        cyl = spl.cylinder_model()
        _nrise, _npf = cyl.nrise, cyl.shape[1]
        molecules = self.molecules
        mole = molecules.translate_internal(-self.offset_nm)
        if angle_max is None:
            angle_max = 90.0

        time_const, temperature, cooling_rate, reject_limit = self._normalize_args(
            temperature_time_const, temperature, cooling_rate, reject_limit
        )
        ind = molecules.features.select([Mole.nth, Mole.pf]).to_numpy().astype(np.int32)
        ind[:, 0] -= ind[:, 0].min()
        ind[:, 1] -= ind[:, 1].min()
        model = cls().construct_graph(indices=ind, npf=_npf, nrise=_nrise)
        return (
            model.set_graph_coordinates(
                origin=mole.pos,
                zvec=(mole.z * self.scale_factor).astype(np.float32),
                yvec=(mole.y * self.scale_factor).astype(np.float32),
                xvec=(mole.x * self.scale_factor).astype(np.float32),
            )
            .set_energy_landscape(self.energies)
            .set_reservoir(
                temperature=temperature,
                time_constant=time_const,
            )
            .set_box_potential(
                *self._norm_distance_range_long(distance_range_long, model),
                *self._norm_distance_range_lat(distance_range_lat, model),
                float(np.deg2rad(angle_max)),
                cooling_rate=cooling_rate,
            )
            .with_reject_limit(reject_limit)
        )

    def annealing_model(
        self,
        spl: CylSpline,
        distance_range_long: tuple[_DistLike, _DistLike],
        distance_range_lat: tuple[_DistLike, _DistLike],
        angle_max: float | None = None,
        temperature_time_const: float = 1.0,
        temperature: float | None = None,
        cooling_rate: float | None = None,
        reject_limit: int | None = None,
    ) -> CylindricAnnealingModel:
        """Get an annealing model using the landscape."""
        from cylindra._cylindra_ext import CylindricAnnealingModel

        return self._prep_annealing_model(
            CylindricAnnealingModel,
            spl, distance_range_long, distance_range_lat, angle_max,
            temperature_time_const, temperature, cooling_rate, reject_limit,
        )  # fmt: skip

    def run_annealing(
        self,
        spl: CylSpline,
        distance_range_long: tuple[_DistLike, _DistLike],
        distance_range_lat: tuple[_DistLike, _DistLike],
        angle_max: float | None = None,
        temperature_time_const: float = 1.0,
        temperature: float | None = None,
        cooling_rate: float | None = None,
        reject_limit: int | None = None,
        random_seeds: list[int] = [0],
    ) -> list[AnnealingResult]:
        """Run simulated mesh annealing."""

        if angle_max is None:
            angle_max = 90.0
        random_seeds = _normalize_random_seeds(random_seeds)
        annealing = self.annealing_model(
            spl,
            distance_range_long=distance_range_long,
            distance_range_lat=distance_range_lat,
            angle_max=angle_max,
            temperature_time_const=temperature_time_const,
            temperature=temperature,
            cooling_rate=cooling_rate,
            reject_limit=reject_limit,
        )

        batch_size = _to_batch_size(annealing.time_constant())
        temp0 = annealing.temperature()
        _Logger.info("Running annealing")
        _Logger.info(f"  shape: {self.energies.shape[1:]!r}")
        tasks = [
            _run_annealing(annealing.with_seed(s), batch_size, temp0)
            for s in random_seeds
        ]
        return compute(*tasks)

    def run_annealing_along_spline(
        self,
        spl: CylSpline,
        range_long: tuple[_DistLike, _DistLike],
        range_lat: tuple[_DistLike, _DistLike],
        angle_max: float,
        temperature_time_const: float = 1.0,
        random_seeds: Sequence[int] = (0, 1, 2, 3, 4),
    ):
        mole = self.molecules
        results = self.run_annealing(
            spl,
            range_long,
            range_lat,
            angle_max,
            temperature_time_const=temperature_time_const,
            random_seeds=random_seeds,
        )
        if all(result.state == "failed" for result in results):
            raise RuntimeError(
                "Failed to optimize for all trials. You may check the distance range."
            )
        elif not any(result.state == "converged" for result in results):
            _Logger.print("Optimization did not converge for any trial.")

        _Logger.print_table(
            {
                "Iteration": [r.niter for r in results],
                "Score": [f"{-float(r.energies[-1]):.3g}" for r in results],
                "State": [r.state for r in results],
            }
        )
        results = sorted(results, key=lambda r: r.energies[-1])
        mole_opt = self.transform_molecules(mole, results[0].indices)
        mole_opt = _update_mole_pos(mole_opt, mole, spl)
        return mole_opt, results

    def normed(self, sd: bool = True) -> Landscape:
        """Return a landscape with normalized mean energy."""
        each_mean: NDArray[np.float32] = self.energies.mean(axis=(1, 2, 3))
        all_mean = each_mean.mean()
        sl = (slice(None), np.newaxis, np.newaxis, np.newaxis)
        if sd:
            each_sd: NDArray[np.float32] = self.energies.std(axis=(1, 2, 3))
            all_sd = each_sd.mean()
            dif = self.energies - each_mean[sl]
            new_array = dif * all_sd / each_sd[sl] + all_mean
        else:
            new_array = self.energies + all_mean - each_mean[sl]
        return self.replace_energies(new_array)

    def clip_energies(
        self,
        low: float | None = None,
        high: float | None = None,
    ) -> Landscape:
        low = np.float32(low) if low is not None else None
        high = np.float32(high) if high is not None else None
        energy = np.clip(self.energies, a_min=low, a_max=high)
        return self.replace_energies(energy)

    def replace_energies(self, energies: NDArray[np.float32]) -> Landscape:
        if energies.dtype != self.energies.dtype:
            raise ValueError(
                f"The dtype of energies must be float32, got {energies.dtype}"
            )
        if energies.shape != self.energies.shape:
            raise ValueError(
                f"The shape of energies must be {self.energies.shape}, got "
                f"{energies.shape}"
            )
        return Landscape(
            energies,
            self.molecules,
            self.argmax,
            self.quaternions,
            self.scale_factor,
        )

    def create_surface(
        self,
        level: float | None = None,
        resolution: nm | None = None,
        show_min: bool = True,
    ) -> SurfaceData:
        """Create a isosurface data from the landscape"""
        if level is None:
            level = self.energies.mean()
        if show_min:
            intensity = -self.energies
            level = -level
        else:
            intensity = self.energies
        if resolution is None:
            resolution = np.min(self.energies.shape[1:]) * self.scale_factor / 4

        step_size = max(int(resolution / self.scale_factor), 1)
        spacing = (self.scale_factor,) * 3
        center = np.array(intensity.shape[1:]) / 2 + 0.5
        offset = center * spacing
        n_verts = 0
        tasks = list[Delayed[SurfaceData]]()
        for i in range(intensity.shape[0]):
            arr: NDArray[np.float32] = intensity[i]
            tasks.append(delayed_isosurface(arr, level, spacing, step_size=step_size))
        surfs = compute(*tasks)
        for i in range(intensity.shape[0]):
            mole = self.molecules[i]
            surf = surfs[i]
            surf = SurfaceData(
                mole.rotator.apply(surf.vertices - offset) + mole.pos,
                surf.faces + n_verts,
                surf.values,
            )
            surfs[i] = surf  # update
            n_verts += len(surf.vertices)
        vertices = np.concatenate([s.vertices for s in surfs], axis=0)
        faces = np.concatenate([s.faces for s in surfs], axis=0)
        values = np.concatenate([s.values for s in surfs], axis=0)
        return SurfaceData(vertices, faces, values)

    ### IO ###

    @classmethod
    def from_dir(cls, path: str | Path) -> Landscape:
        """Load a landscape from a directory."""
        path = Path(path)
        if path.suffix != "":
            raise ValueError(f"Must be a directory, got {path}")
        energies = ip.imread(path / "landscape.tif")
        molecules = Molecules.from_parquet(path / "molecules.parquet")
        argmax = None
        if (fp := path / "argmax.parquet").exists():
            argmax = pl.read_parquet(fp).to_series().to_numpy()
        quaternions = np.atleast_2d(
            np.loadtxt(path / "quaternions.txt", delimiter=",", dtype=np.float32)
        )
        scale_factor = energies.scale["x"]
        return cls(energies.value, molecules, argmax, quaternions, scale_factor)

    def save(self, path: str | Path) -> None:
        """Save the landscape to a directory."""
        path = Path(path)
        if path.suffix != "":
            raise ValueError(f"Must be a directory, got {path}")
        path.mkdir(exist_ok=False)
        arr = ip.asarray(self.energies, axes="tzyx").set_scale(
            xyz=self.scale_factor, unit="nm"
        )
        arr.imsave(path / "landscape.tif")
        self.molecules.to_parquet(path / "molecules.parquet")
        if self.argmax is not None:
            pl.DataFrame({"argmax": self.argmax}).write_parquet(
                path / "argmax.parquet", compression_level=10
            )
        self.quaternions.tofile(path / "quaternions.txt", sep=",")
        return None

    def _norm_distance_range_long(
        self,
        rng: tuple[nm | str, nm | str],
        model: CylindricAnnealingModel,
    ) -> tuple[nm, nm]:
        rng0, rng1 = rng
        if isinstance(rng0, str) or isinstance(rng1, str):
            long_dist_arr = model.longitudinal_distances()
            if isinstance(rng0, str):
                rng0 = _norm_distance(rng0, long_dist_arr)
            if isinstance(rng1, str):
                rng1 = _norm_distance(rng1, long_dist_arr)
        if not rng0 < rng1:
            raise ValueError(f"Lower is larger than the upper: {(rng0, rng1)}")
        return rng0, rng1

    def _norm_distance_range_lat(
        self,
        rng: tuple[nm | str, nm | str],
        model: CylindricAnnealingModel,
    ) -> tuple[nm, nm]:
        rng0, rng1 = rng
        if isinstance(rng0, str) or isinstance(rng1, str):
            lat_dist_arr = model.lateral_distances()
            if isinstance(rng0, str):
                rng0 = _norm_distance(rng0, lat_dist_arr)
            if isinstance(rng1, str):
                rng1 = _norm_distance(rng1, lat_dist_arr)
        if not rng0 < rng1:
            raise ValueError(f"Lower is larger than the upper: {(rng0, rng1)}")
        return rng0, rng1

    def _normalize_args(
        self, temperature_time_const, temperature, cooling_rate, reject_limit
    ):
        nmole = self.molecules.count()
        time_const = nmole * np.prod(self.energies.shape[1:]) * temperature_time_const
        _energy_std = np.std(self.energies)
        if temperature is None:
            temperature = _energy_std * 2
        if cooling_rate is None:
            cooling_rate = _energy_std / time_const * 8
        if reject_limit is None:
            reject_limit = nmole * 50
        return time_const, temperature, cooling_rate, reject_limit

    def _norm_dist(self, dist_range: tuple[_DistLike, _DistLike]) -> tuple[nm, nm]:
        dist_min, dist_max = dist_range
        # normalize distance limits
        dist_arr = np.sqrt(np.sum(np.diff(self.molecules.pos, axis=0) ** 2, axis=1))
        dist_min = _norm_distance(dist_min, dist_arr)
        dist_max = _norm_distance(dist_max, dist_arr)
        return dist_min / self.scale_factor, dist_max / self.scale_factor

    def _prep_viterbi_grid(self):
        from cylindra._cylindra_ext import ViterbiGrid

        mole = self.molecules.translate_internal(-self.offset_nm)
        origin = (mole.pos / self.scale_factor).astype(np.float32)
        zvec = mole.z.astype(np.float32)
        yvec = mole.y.astype(np.float32)
        xvec = mole.x.astype(np.float32)
        return ViterbiGrid(-self.energies, origin, zvec, yvec, xvec)
offset: NDArray[np.int32] property

Shift from the corner (0, 0, 0) to the center.

offset_nm: NDArray[np.float32] property

Offset in nm.

__getitem__(key)

Subset of the landscape.

Source code in cylindra/components/landscape.py
72
73
74
75
76
77
78
79
80
81
def __getitem__(
    self, key: slice | list[SupportsIndex] | NDArray[np.integer]
) -> Landscape:
    """Subset of the landscape."""
    if not isinstance(key, (slice, list, np.ndarray)):
        raise TypeError(f"Invalid type of key: {type(key)}")
    energy = self.energies[key]
    mole = self.molecules.subset(key)
    argmax = self.argmax[key] if self.argmax is not None else None
    return Landscape(energy, mole, argmax, self.quaternions, self.scale_factor)
annealing_model(spl, distance_range_long, distance_range_lat, angle_max=None, temperature_time_const=1.0, temperature=None, cooling_rate=None, reject_limit=None)

Get an annealing model using the landscape.

Source code in cylindra/components/landscape.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def annealing_model(
    self,
    spl: CylSpline,
    distance_range_long: tuple[_DistLike, _DistLike],
    distance_range_lat: tuple[_DistLike, _DistLike],
    angle_max: float | None = None,
    temperature_time_const: float = 1.0,
    temperature: float | None = None,
    cooling_rate: float | None = None,
    reject_limit: int | None = None,
) -> CylindricAnnealingModel:
    """Get an annealing model using the landscape."""
    from cylindra._cylindra_ext import CylindricAnnealingModel

    return self._prep_annealing_model(
        CylindricAnnealingModel,
        spl, distance_range_long, distance_range_lat, angle_max,
        temperature_time_const, temperature, cooling_rate, reject_limit,
    )  # fmt: skip
create_surface(level=None, resolution=None, show_min=True)

Create a isosurface data from the landscape

Source code in cylindra/components/landscape.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
def create_surface(
    self,
    level: float | None = None,
    resolution: nm | None = None,
    show_min: bool = True,
) -> SurfaceData:
    """Create a isosurface data from the landscape"""
    if level is None:
        level = self.energies.mean()
    if show_min:
        intensity = -self.energies
        level = -level
    else:
        intensity = self.energies
    if resolution is None:
        resolution = np.min(self.energies.shape[1:]) * self.scale_factor / 4

    step_size = max(int(resolution / self.scale_factor), 1)
    spacing = (self.scale_factor,) * 3
    center = np.array(intensity.shape[1:]) / 2 + 0.5
    offset = center * spacing
    n_verts = 0
    tasks = list[Delayed[SurfaceData]]()
    for i in range(intensity.shape[0]):
        arr: NDArray[np.float32] = intensity[i]
        tasks.append(delayed_isosurface(arr, level, spacing, step_size=step_size))
    surfs = compute(*tasks)
    for i in range(intensity.shape[0]):
        mole = self.molecules[i]
        surf = surfs[i]
        surf = SurfaceData(
            mole.rotator.apply(surf.vertices - offset) + mole.pos,
            surf.faces + n_verts,
            surf.values,
        )
        surfs[i] = surf  # update
        n_verts += len(surf.vertices)
    vertices = np.concatenate([s.vertices for s in surfs], axis=0)
    faces = np.concatenate([s.faces for s in surfs], axis=0)
    values = np.concatenate([s.values for s in surfs], axis=0)
    return SurfaceData(vertices, faces, values)
from_dir(path) classmethod

Load a landscape from a directory.

Source code in cylindra/components/landscape.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
@classmethod
def from_dir(cls, path: str | Path) -> Landscape:
    """Load a landscape from a directory."""
    path = Path(path)
    if path.suffix != "":
        raise ValueError(f"Must be a directory, got {path}")
    energies = ip.imread(path / "landscape.tif")
    molecules = Molecules.from_parquet(path / "molecules.parquet")
    argmax = None
    if (fp := path / "argmax.parquet").exists():
        argmax = pl.read_parquet(fp).to_series().to_numpy()
    quaternions = np.atleast_2d(
        np.loadtxt(path / "quaternions.txt", delimiter=",", dtype=np.float32)
    )
    scale_factor = energies.scale["x"]
    return cls(energies.value, molecules, argmax, quaternions, scale_factor)
from_loader(loader, template, mask=None, max_shifts=(0.8, 0.8, 0.8), upsample_factor=5, alignment_model=alignment.ZNCCAlignment) classmethod

Construct a landscape from a loader object.

Parameters:

Name Type Description Default
loader LoaderBase

Any loader object from acryo.

required
template template input type

Template image or a list of template images to be used.

required
mask mask input type

Mask image to be used, by default None

None
max_shifts (float, float, float)

Maximum shifts in nm, in (Z, Y, X) order.

(0.8, 0.8, 0.8)
upsample_factor int

Upsampling factor for landscape construction.

5
alignment_model alignment model object

Alignment model to be used to evaluate correlation score.

ZNCCAlignment
Source code in cylindra/components/landscape.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
@classmethod
def from_loader(
    cls,
    loader: LoaderBase,
    template: TemplateInputType,
    mask: MaskInputType = None,
    max_shifts: tuple[nm, nm, nm] = (0.8, 0.8, 0.8),
    upsample_factor: int = 5,
    alignment_model: alignment.TomographyInput = alignment.ZNCCAlignment,
) -> Landscape:
    """
    Construct a landscape from a loader object.

    Parameters
    ----------
    loader : LoaderBase
        Any loader object from ``acryo``.
    template : template input type
        Template image or a list of template images to be used.
    mask : mask input type, optional
        Mask image to be used, by default None
    max_shifts : (float, float, float), optional
        Maximum shifts in nm, in (Z, Y, X) order.
    upsample_factor : int
        Upsampling factor for landscape construction.
    alignment_model : alignment model object
        Alignment model to be used to evaluate correlation score.
    """
    if isinstance(template, (str, Path)):
        template = pipe.from_file(template)
        multi = False
    elif isinstance(template, (list, tuple)) and isinstance(
        next(iter(template), None), (str, Path)
    ):
        template = pipe.from_files(template)
        multi = True
    elif isinstance(template, np.ndarray):
        multi = template.ndim == 4
    else:
        raise TypeError(f"Invalid type of template: {type(template)}")

    score_dsk = loader.construct_landscape(
        template,
        mask=mask,
        max_shifts=max_shifts,
        upsample=upsample_factor,
        alignment_model=alignment_model,
    )
    score, argmax = _calc_landscape(
        alignment_model, score_dsk, multi_templates=multi
    )
    mole = loader.molecules
    to_drop = set(mole.features.columns) - {Mole.nth, Mole.pf, Mole.position}
    if to_drop:
        mole = mole.drop_features(*to_drop)
    return cls(
        energies=-np.ascontiguousarray(score),
        molecules=mole,
        argmax=argmax,
        quaternions=alignment_model.quaternions,
        scale_factor=loader.scale / upsample_factor,
    )
normed(sd=True)

Return a landscape with normalized mean energy.

Source code in cylindra/components/landscape.py
453
454
455
456
457
458
459
460
461
462
463
464
465
def normed(self, sd: bool = True) -> Landscape:
    """Return a landscape with normalized mean energy."""
    each_mean: NDArray[np.float32] = self.energies.mean(axis=(1, 2, 3))
    all_mean = each_mean.mean()
    sl = (slice(None), np.newaxis, np.newaxis, np.newaxis)
    if sd:
        each_sd: NDArray[np.float32] = self.energies.std(axis=(1, 2, 3))
        all_sd = each_sd.mean()
        dif = self.energies - each_mean[sl]
        new_array = dif * all_sd / each_sd[sl] + all_mean
    else:
        new_array = self.energies + all_mean - each_mean[sl]
    return self.replace_energies(new_array)
run_annealing(spl, distance_range_long, distance_range_lat, angle_max=None, temperature_time_const=1.0, temperature=None, cooling_rate=None, reject_limit=None, random_seeds=[0])

Run simulated mesh annealing.

Source code in cylindra/components/landscape.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def run_annealing(
    self,
    spl: CylSpline,
    distance_range_long: tuple[_DistLike, _DistLike],
    distance_range_lat: tuple[_DistLike, _DistLike],
    angle_max: float | None = None,
    temperature_time_const: float = 1.0,
    temperature: float | None = None,
    cooling_rate: float | None = None,
    reject_limit: int | None = None,
    random_seeds: list[int] = [0],
) -> list[AnnealingResult]:
    """Run simulated mesh annealing."""

    if angle_max is None:
        angle_max = 90.0
    random_seeds = _normalize_random_seeds(random_seeds)
    annealing = self.annealing_model(
        spl,
        distance_range_long=distance_range_long,
        distance_range_lat=distance_range_lat,
        angle_max=angle_max,
        temperature_time_const=temperature_time_const,
        temperature=temperature,
        cooling_rate=cooling_rate,
        reject_limit=reject_limit,
    )

    batch_size = _to_batch_size(annealing.time_constant())
    temp0 = annealing.temperature()
    _Logger.info("Running annealing")
    _Logger.info(f"  shape: {self.energies.shape[1:]!r}")
    tasks = [
        _run_annealing(annealing.with_seed(s), batch_size, temp0)
        for s in random_seeds
    ]
    return compute(*tasks)
run_min_energy(spl=None)

Minimize energy for each local landscape independently.

Source code in cylindra/components/landscape.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def run_min_energy(
    self, spl: CylSpline | None = None
) -> tuple[Molecules, MinEnergyResult]:
    """Minimize energy for each local landscape independently."""
    shape = self.energies.shape[1:]
    indices = list[NDArray[np.int32]]()
    engs = list[float]()
    for i in range(self.energies.shape[0]):
        eng = self.energies[i]
        pos = np.unravel_index(np.argmin(eng), shape)
        indices.append(np.array(pos, dtype=np.int32))
        engs.append(eng[pos])
    indices = np.stack(indices, axis=0)
    engs = np.array(engs, dtype=np.float32)
    result = MinEnergyResult(indices, engs)

    mole_opt = self.transform_molecules(
        self.molecules, result.indices, detect_peak=True
    )
    if spl is not None:
        mole_opt = _update_mole_pos(mole_opt, self.molecules, spl)
    return mole_opt, result
run_viterbi(dist_range, angle_max=None)

Run Viterbi alignment.

Source code in cylindra/components/landscape.py
247
248
249
250
251
252
253
254
255
def run_viterbi(
    self, dist_range: tuple[_DistLike, _DistLike], angle_max: float | None = None
):
    """Run Viterbi alignment."""
    dist_min, dist_max = self._norm_dist(dist_range)
    if angle_max is not None:
        angle_max = np.deg2rad(angle_max)
    grid = self._prep_viterbi_grid()
    return ViterbiResult(*grid.viterbi(dist_min, dist_max, angle_max))
run_viterbi_fixed_start(first, range_long=(4.0, 4.28), angle_max=5.0)

Run Viterbi alignment with a fixed start edge.

Source code in cylindra/components/landscape.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def run_viterbi_fixed_start(
    self,
    first: NDArray[np.float32],
    range_long: tuple[_DistLike, _DistLike] = (4.0, 4.28),
    angle_max: float | None = 5.0,
):
    """Run Viterbi alignment with a fixed start edge."""
    mole = self.molecules
    dist_min, dist_max = self._norm_dist(range_long)
    if angle_max is not None:
        angle_max = np.deg2rad(angle_max)
    grid = self._prep_viterbi_grid()
    mole0 = Molecules(first, mole.rotator[:1]).translate_internal(-self.offset_nm)
    origin = (mole0.pos[0] / self.scale_factor).astype(np.float32)
    res = ViterbiResult(
        *grid.viterbi_fixed_start(
            dist_min, dist_max, first / self.scale_factor, origin, angle_max
        )
    )
    max_shifts_px = (np.array(self.energies.shape[1:]) - 1) // 2
    inds = _check_viterbi_shift(res.indices, max_shifts_px, 0)
    molecules_opt = self.transform_molecules(mole, inds)
    return molecules_opt
save(path)

Save the landscape to a directory.

Source code in cylindra/components/landscape.py
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
def save(self, path: str | Path) -> None:
    """Save the landscape to a directory."""
    path = Path(path)
    if path.suffix != "":
        raise ValueError(f"Must be a directory, got {path}")
    path.mkdir(exist_ok=False)
    arr = ip.asarray(self.energies, axes="tzyx").set_scale(
        xyz=self.scale_factor, unit="nm"
    )
    arr.imsave(path / "landscape.tif")
    self.molecules.to_parquet(path / "molecules.parquet")
    if self.argmax is not None:
        pl.DataFrame({"argmax": self.argmax}).write_parquet(
            path / "argmax.parquet", compression_level=10
        )
    self.quaternions.tofile(path / "quaternions.txt", sep=",")
    return None
transform_molecules(molecules, indices, detect_peak=False)

Transform the input molecules based on the landscape.

Parameters:

Name Type Description Default
molecules Molecules

Molecules object to be transformed.

required
indices integer array

Indices in the landscape to be used for transformation.

required
detect_peak bool

If True, landscape will be sub-sampled to detect the peak in higher precision. This should be false for constrained alignment, as the detected peak usually does not represent the optimal result.

False

Returns:

Type Description
Molecules

Transformed molecules.

Source code in cylindra/components/landscape.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def transform_molecules(
    self,
    molecules: Molecules,
    indices: NDArray[np.int32],
    detect_peak: bool = False,
) -> Molecules:
    """
    Transform the input molecules based on the landscape.

    Parameters
    ----------
    molecules : Molecules
        Molecules object to be transformed.
    indices : integer array
        Indices in the landscape to be used for transformation.
    detect_peak : bool, default False
        If True, landscape will be sub-sampled to detect the peak in higher
        precision. This should be false for constrained alignment, as the detected
        peak usually does not represent the optimal result.

    Returns
    -------
    Molecules
        Transformed molecules.
    """
    offset = self.offset
    indices_sub = indices.astype(np.float32)
    nmole = self.energies.shape[0]
    opt_energy = np.zeros(nmole, dtype=np.float32)
    nrepeat = 3 if detect_peak else 0
    for i in range(nmole):
        eng = self.energies[i]
        indices_sub[i], opt_energy[i] = find_peak(eng, indices[i], nrepeat=nrepeat)
    opt_score = -opt_energy
    shifts = ((indices_sub - offset) * self.scale_factor).astype(np.float32)
    molecules_opt = molecules.translate_internal(shifts)
    shift_feat = _as_n_series("align-d{}", shifts)
    if (nrots := self.quaternions.shape[0]) > 1:
        quats = np.stack(
            [
                self.quaternions[self.argmax[(i, *ind)] % nrots]
                for i, ind in enumerate(indices)
            ],
            axis=0,
        )
        rotator = Rotation.from_quat(quats).inv()
        molecules_opt = molecules_opt.rotate_by(rotator)
        rotvec = rotator.as_rotvec().astype(np.float32)
        rotvec_feat = _as_n_series("align-d{}rot", rotvec)
        molecules_opt = molecules_opt.with_features(*rotvec_feat)

    return molecules_opt.with_features(
        *shift_feat, pl.Series(Mole.score, opt_score)
    )

SurfaceData

Tuple for storing isosurface data for landscapes.

Source code in cylindra/components/landscape.py
715
716
717
718
719
720
class SurfaceData(NamedTuple):
    """Tuple for storing isosurface data for landscapes."""

    vertices: NDArray[np.float32]
    faces: NDArray[np.int32]
    values: NDArray[np.float32]

ViterbiResult dataclass

Dataclass for storing the Viterbi alignment results.

Parameters:

Name Type Description Default
indices ndarray

The optimized indices of the molecules.

required
score float

The score of the optimal alignment.

required
Source code in cylindra/components/landscape.py
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
@dataclass
class ViterbiResult:
    """
    Dataclass for storing the Viterbi alignment results.

    Parameters
    ----------
    indices : np.ndarray
        The optimized indices of the molecules.
    score : float
        The score of the optimal alignment.
    """

    indices: NDArray[np.int32]
    score: float

delayed_isosurface(arr, level, spacing, step_size=1)

Create an isosurface from a 3D array using marching cubes algorithm.

Source code in cylindra/components/landscape.py
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
@delayed
def delayed_isosurface(
    arr: NDArray[np.float32],
    level: float,
    spacing: tuple[float, float, float],
    step_size: int = 1,
) -> SurfaceData:
    """Create an isosurface from a 3D array using marching cubes algorithm."""
    arr_pad = np.pad(arr, step_size, mode="constant", constant_values=arr.min())
    try:
        verts, faces, _, vals = marching_cubes(
            arr_pad,
            level,
            spacing=spacing,
            gradient_direction="descent",
            step_size=step_size,
        )
        verts -= np.array(spacing)[np.newaxis] * step_size
    except (RuntimeError, ValueError):
        verts = np.zeros((0, 3), dtype=np.float32)
        faces = np.zeros((0, 3), dtype=np.int32)
        vals = np.zeros((0,), dtype=np.float32)
    return SurfaceData(verts, faces, vals)

CorrelationSeamSearcher

Seam searcher based on correlation with the template.

Source code in cylindra/components/seam_search.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class CorrelationSeamSearcher(SeamSearcher):
    """Seam searcher based on correlation with the template."""

    def search(
        self,
        loader: SubtomogramLoader,
        template: ip.ImgArray,
        anti_template: ip.ImgArray | None = None,
        mask: NDArray[np.float32] | None = None,
        cutoff: float = 0.5,
    ) -> CorrelationSeamSearchResult:
        corrs = list[float]()

        if mask is None:
            mask = 1

        masked_template = (template * mask).lowpass_filter(cutoff, dims="zyx")
        has_anti_template = anti_template is not None
        loader = loader.replace(output_shape=template.shape)
        averaged_images = self.calc_averages(loader)
        if has_anti_template:
            if anti_template.shape != template.shape:
                raise ValueError(
                    f"The shape of anti-template ({anti_template.shape}) must be the "
                    f"same as the shape of template ({template.shape})."
                )
            masked_anti_template = (anti_template * mask).lowpass_filter(
                cutoff, dims="zyx"
            )
            anti_corrs = np.empty(averaged_images.shape[0], dtype=np.float32)
        else:
            masked_anti_template = None
            anti_corrs = None

        corrs = np.empty(averaged_images.shape[0], dtype=np.float32)
        for _i, avg in enumerate(averaged_images):
            avg: ip.ImgArray
            masked_avg = (avg * mask).lowpass_filter(cutoff=cutoff, dims="zyx")
            corrs[_i] = ip.zncc(masked_avg, masked_template)
            if has_anti_template:
                anti_corrs[_i] = ip.zncc(masked_avg, masked_anti_template)

        if has_anti_template:
            score = corrs - anti_corrs
        else:
            corr1, corr2 = corrs[: self.npf], corrs[self.npf :]
            score = np.empty_like(corrs, dtype=np.float32)
            score[: self.npf] = corr1 - corr2
            score[self.npf :] = corr2 - corr1

        return CorrelationSeamSearchResult(score, averaged_images, corrs, anti_corrs)

SeamSearcher

Source code in cylindra/components/seam_search.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class SeamSearcher(ABC):
    def __init__(self, npf: int):
        self._npf = npf

    @property
    def npf(self) -> int:
        return self._npf

    @abstractmethod
    def search(self, *args, **kwargs) -> SeamSearchResult:
        """Search for the seam position."""

    def label_with_seam(self, size: int) -> NDArray[np.bool_]:
        labels = list[NDArray[np.bool_]]()  # list of boolean arrays
        _id = np.arange(size)
        assert _id.size % self.npf == 0
        for pf in range(2 * self.npf):
            res = (_id - pf) // self.npf
            sl = res % 2 == 0
            labels.append(sl)
        return np.stack(labels, axis=0)

    def calc_averages(self, loader: SubtomogramLoader) -> ip.ImgArray:
        # prepare all the labels in advance
        labels = self.label_with_seam(loader.molecules.count())

        # here, dask_array is (N, Z, Y, X) array where dask_array[i] is i-th subtomogram.
        dask_array = loader.construct_dask()
        averaged_images = da.compute(
            [da.mean(dask_array[sl], axis=0) for sl in labels]
        )[0]
        return ip.asarray(np.stack(averaged_images, axis=0), axes="pzyx").set_scale(
            zyx=loader.scale
        )
search(*args, **kwargs) abstractmethod

Search for the seam position.

Source code in cylindra/components/seam_search.py
24
25
26
@abstractmethod
def search(self, *args, **kwargs) -> SeamSearchResult:
    """Search for the seam position."""