Source code for neuralib.suite2p.core

from __future__ import annotations

import datetime
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal, Required, Self, TypedDict, cast, final

import numpy as np
import polars as pl
from neuralib.typing import PathLike
from neuralib.util.verbose import fprint

try:
    from neuralib.registration import CellularCoordinates, get_cellular_coordinate
except ModuleNotFoundError as e:
    fprint('registration module not found, please install neuralib-imaging', vtype='warning')
    raise e


__all__ = [
    'SIGNAL_TYPE',
    'CALCIUM_TYPE',
    'read_suite2p',
    #
    'Suite2pGUIOptions',
    'Suite2pRoiStat',
    'Suite2PResult',
    #
    'get_s2p_coords'
]

SIGNAL_TYPE = Literal['df_f', 'spks']
"""imaging fluorescence signal type"""

CALCIUM_TYPE = Literal['baseline', 'transient']


[docs] def read_suite2p(directory: PathLike, *, cell_prob_thres: float | None = 0.5, red_cell_threshold: float = 0.65, channel: int = 0, runtime_check_frame_rate: float | None = 30.0) -> Suite2PResult: r""" Load suite2p result from directory :param directory: Directory contain all the s2p output files. e.g., \*/suite2p/plane[P] :param cell_prob_thres: Cell probability. If float type, mask for the value in ``iscell[:, 1]``. If None, use the binary criteria in GUI output :param red_cell_threshold: Red cell threshold :param channel: channel (PMT) Number for the functional channel. i.e., 0 if GCaMP, 1 if jRGECO in scanbox setting :param runtime_check_frame_rate: if not None, check frame rate lower-bound to make sure the s2p runconfig :return: :class:`Suite2PResult` """ return Suite2PResult.load(directory, cell_prob_thres, red_cell_threshold, channel, runtime_check_frame_rate)
[docs] @final @dataclass(frozen=True) class Suite2PResult: """suite2p result container `Dimension parameters`: N: number of neurons F: number pf frames W: image width H: image height """ directory: Path """Directory contain all the s2p output files""" f_raw: np.ndarray """Fluorescence traces 2D array. `Array[float, [N, F]]`""" f_neu: np.ndarray """Neuropil fluorescence traces 2D array. `Array[float, [N, F]]`""" spks: np.ndarray """Deconvolved activity 2D array. `Array[float, [N, F]]`""" stat: np.ndarray """GUI imaging after registration, i.e., x, ypixel., etc. `Array[Suite2pRoiStat, N]`""" ops: Suite2pGUIOptions """GUI options""" iscell: np.ndarray """Cell probability for each ROI. `Array[float, [N, 2]]`""" cell_prob_thres: float | None """Cell probability threshold for loading the data""" redcell: np.ndarray | None = field(default=None) """Red cell probability 2D array. `Array[float, [N, 2]]`""" redcell_threshold: float | None = field(default=None) """Red cell probability threshold""" runtime_frate_check: float | None = field(default=None) """If not None, check frame rate lower bound""" def __post_init__(self): if self.runtime_frate_check is not None: self._check_frame_rate() def _check_frame_rate(self): """User specific check suite2p config is correct""" runtime_frate_check = self.runtime_frate_check if runtime_frate_check is not None and self.fs * self.n_plane > runtime_frate_check: fprint(f'the fr: {self.fs} and n_etl: {self.n_plane} might not set properly in suite2p,' f'check output ops.json', vtype='error') raise RuntimeError('fs and n_plane are not set properly in suite2p')
[docs] @classmethod def load(cls, directory: PathLike, cell_prob_thres: float | None = 0.5, red_cell_threshold: float = 0.65, channel: int = 0, runtime_check_frame_rate: float | None = 30.0) -> Self: r""" Load suite2p result from directory :param directory: Directory contain all the s2p output files. e.g., \*/suite2p/plane[P] :param cell_prob_thres: Cell probability. If float type, mask for the value in ``iscell[:, 1]``. If None, use the binary criteria in GUI output :param red_cell_threshold: Red cell threshold :param channel: channel (PMT) Number for the functional channel. i.e., 0 if GCaMP, 1 if jRGECO in scanbox setting :param runtime_check_frame_rate: if not None, check frame rate lower-bound to make sure the s2p runconfig :return: :class:`Suite2PResult` """ if not isinstance(directory, Path): directory = Path(directory) redcell_file = directory / 'redcell.npy' redcell = np.load(redcell_file, allow_pickle=True) if redcell_file.exists() else None if channel == 0: F = np.load(directory / 'F.npy', allow_pickle=True) FNeu = np.load(directory / 'Fneu.npy', allow_pickle=True) spks = np.load(directory / 'spks.npy') stat = np.load(directory / 'stat.npy', allow_pickle=True) ops = np.load(directory / 'ops.npy', allow_pickle=True).tolist() iscell = np.load(directory / 'iscell.npy', allow_pickle=True) elif channel == 1: # second channel F = np.load(directory / 'F_chan2.npy', allow_pickle=True) FNeu = np.load(directory / 'Fneu_chan2.npy', allow_pickle=True) spks = np.load(directory / 'spks.npy') stat = np.load(directory / 'stat.npy', allow_pickle=True) ops = np.load(directory / 'ops.npy', allow_pickle=True).tolist() iscell = np.load(directory / 'iscell.npy', allow_pickle=True) else: raise IndexError(f'{channel} unknown') # if cell_prob_thres is None: x = iscell[:, 0] == 1 elif isinstance(cell_prob_thres, float): x = iscell[:, 1] >= cell_prob_thres else: raise TypeError(f'invalid type: {type(cell_prob_thres)}') F = F[x] FNeu = FNeu[x] spks = spks[x] stat = stat[x] iscell = iscell[x] if redcell is not None: redcell = redcell[x] return cls( directory, F, FNeu, spks, stat, ops, iscell, cell_prob_thres, redcell, red_cell_threshold if redcell is not None else None, runtime_check_frame_rate )
@property def has_chan2(self) -> bool: """if has a second channel""" if self.ops['nchannels'] == 2: return True return False @property def n_neurons(self) -> int: """number of neurons after load. could be less than GUI ROI number if use higher cell_prob in :meth:`~neuralib.imaging.suite2p.core.Suite2PResult.load()`""" return self.f_raw.shape[0] @property def n_frame(self) -> int: """number of frame number""" return self.f_raw.shape[1] @property def cell_prob(self) -> np.ndarray: """probability that the ROI is a cell based on the default classifier. `Array[float, N]`""" return self.iscell[:, 1] @property def n_red_neuron(self) -> int: """number of identified neuron based on red cell threshold""" if not self.has_chan2 or self.red_cell_prob is None or self.redcell_threshold is None: raise RuntimeError('no channel 2') return int(np.count_nonzero(self.red_cell_prob >= self.redcell_threshold)) @property def red_cell_prob(self) -> np.ndarray | None: """red cell probability, `Array[float, N]`""" if self.redcell is None: return None return self.redcell[:, 1] @property def signal_baseline(self) -> float: """Gaussian filter width in seconds""" return self.ops['sig_baseline'] @property def window_baseline(self) -> float: """window for max/min filter in seconds""" return self.ops['win_baseline'] @property def fs(self) -> float: """suite2p approximate frame rate per plane, exact value should be checked in .sbx or .mat""" return self.ops['fs'] @property def neucoeff(self) -> float: """neuropil coefficient, normally should be ~0.7""" return self.ops['neucoeff'] @property def prctile_baseline(self) -> float: """percentile of trace to use as baseline if ops['baseline'] = constant_percentile""" return self.ops['prctile_baseline'] @property def n_plane(self) -> int: """number of optical plane""" return self.ops['nplanes'] @property def image_width(self) -> int: """image width (in pixel)""" return self.ops['Lx'] @property def image_height(self) -> int: """image height (in pixel)""" return self.ops['Ly'] @property def image_mean(self) -> np.ndarray: """mean image for chan0(1st). `Array[float, [H, W]]`""" return self.ops['meanImg'].T @property def image_mean_ch2(self) -> np.ndarray: """mean image for chan1(2nd). `Array[float, [H, W]]`""" return self.ops['meanImg_chan2'].T @property def indicator_tau(self) -> float: """The timescale of the sensor (in seconds)""" return self.ops['tau'] @property def rigid_x_offsets(self) -> np.ndarray: """x-shifts of recording at each timepoint. `Array[int, F]`""" return self.ops['xoff'] @property def rigid_y_offsets(self) -> np.ndarray: """y-shifts of recording at each timepoint. `Array[int, F]`""" return self.ops['yoff'] @property def rigid_xy_offset(self) -> np.ndarray: """peak of phase correlation between frame and reference image at each timepoint. `Array[float, F]`""" return self.ops['corrXY'] @property def nonrigid_x_offsets(self) -> np.ndarray: """(frames, block_size). `Array[float, F]`""" return self.ops['xoff1'] @property def nonrigid_y_offsets(self) -> np.ndarray: """`Array[float, F]`""" return self.ops['yoff1'] @property def nonrigid_xy_offsets(self) -> np.ndarray: """`Array[float, F]`""" return self.ops['corrXY1']
[docs] @classmethod def load_total_neuron_number(cls, directory: Path, cell_prob: float | None = 0.5) -> int: """ Load number of neuron based on iscell.npy :param directory: directory contains the iscell.npy :param cell_prob: cell probability, bool type: use the binary criteria in GUI output float type: value in ``iscell[:, 1]`` :return: Number of neurons """ iscell = np.load(directory / 'iscell.npy', allow_pickle=True) if cell_prob is None: return int(np.count_nonzero(iscell[:, 0] == 1)) else: return int(np.count_nonzero(iscell[:, 1] >= cell_prob))
[docs] def get_rois_pixels(self) -> np.ndarray: """ROIs pixel (N, 2)""" ret = np.zeros((self.n_neurons, 2)) for i in range(self.n_neurons): x = np.mean(self.stat[i]['xpix']) y = np.mean(self.stat[i]['ypix']) ret[i] = np.array([x, y]) return ret
[docs] def get_neuron_id_mapping(self) -> pl.DataFrame: """ Retrieves a mapping between neuron IDs and their corresponding raw indices based on whether the cell detection probabilities meet a specified threshold. If no cell detection probabilities are provided, the mapping assumes all indices are valid neurons. :return: A Polars DataFrame containing two columns: `neuron_id` and `raw_index`. """ n = np.arange(len(self.f_raw)) if self.cell_prob_thres is None: return pl.DataFrame([n, n], schema=['neuron_id', 'raw_index'], orient='col') else: iscell = np.load(self.directory / 'iscell.npy', allow_pickle=True) mx = np.nonzero(iscell[:, 1] >= self.cell_prob_thres)[0] return pl.DataFrame([n, mx], schema=['neuron_id', 'raw_index'], orient='col')
[docs] class Suite2pGUIOptions(TypedDict, total=False): """ Suite2p GUI setting. .. seealso:: `<https://suite2p.readthedocs.io/en/latest/settings.html>`_""" look_one_level_down: float fast_disk: str delete_bin: bool mesoscan: bool bruker: bool h5py: list h5py_key: str save_path0: str save_folder: str subfolders: list move_bin: bool nplanes: Required[int] nchannels: Required[int] functional_chan: int tau: Required[float] fs: Required[float] force_sktiff: bool frames_include: int multiplane_parallel: float preclassify: float save_mat: bool save_NWB: float combined: float aspect: float do_bidiphase: bool bidiphase: float bidi_corrected: bool do_registration: int two_step_registration: float keep_movie_raw: bool nimg_init: int batch_size: int maxregshift: float align_by_chan: int reg_tif: bool reg_tif_chan2: bool subpixel: int smooth_sigma_time: float smooth_sigma: float th_badframes: float norm_frames: bool force_refImg: bool pad_fft: bool nonrigid: bool block_size: tuple[int, int] snr_thresh: float maxregshiftNR: float oneP_reg: bool # 1Preg spatial_hp: int spatial_hp_reg: float spatial_hp_detect: int pre_smooth: float spatial_taper: float roidetect: bool spikedetect: bool anatomical_only: float sparse_mode: bool diameter: float spatial_scale: float connected: bool nbinned: int max_iterations: int threshold_scaling: float max_overlap: float high_pass: float denoise: bool soma_crop: bool neuropil_extract: bool inner_neuropil_radius: float min_neuropil_pixels: int lam_percentile: float allow_overlap: bool use_builtin_classifier: bool classifier_path: int chan2_thres: float baseline: str win_baseline: Required[float] sig_baseline: Required[float] prctile_baseline: Required[float] neucoeff: Required[int] suite2p_version: str data_path: list[str] sbx_ndeadcols: int input_format: str save_path: str ops_path: str reg_file: str filelist: list[str] nframes_per_folder: np.ndarray sbx_ndeadrows: int meanImg: Required[np.ndarray] meanImg_chan2: Required[np.ndarray] # if chan_2 nframes: int Ly: Required[int] Lx: Required[int] date_proc: datetime.datetime refImg: np.ndarray rmin: int rmax: int yblock: list[np.ndarray] xblock: list[np.ndarray] nblocks: list[int] NRsm: np.ndarray yoff: Required[np.ndarray] xoff: Required[np.ndarray] corrXY: Required[np.ndarray] yoff1: Required[np.ndarray] xoff1: Required[np.ndarray] corrXY1: Required[np.ndarray] badframes: np.ndarray yrange: list[int] xrange: list[int] tPC: np.ndarray regPC: np.ndarray regDX: np.ndarray Lyc: int Lxc: int max_proj: np.ndarray Vmax: np.ndarray ihop: np.ndarray Vsplit: np.ndarray Vcorr: np.ndarray Vmap: list[np.ndarray] spatscale_pix: np.ndarray meanImgE: np.ndarray timing: dict[str, float]
[docs] class Suite2pRoiStat(TypedDict, total=False): """Suite2p GUI imaging. .. seealso:: `<https://suite2p.readthedocs.io/en/latest/outputs.html#stat-npy-fields>`_ """ ypix: Required[np.ndarray] xpix: Required[np.ndarray] lam: np.ndarray med: tuple[int, int] footprint: float mrs: float mrs0: float compact: float solidity: float npix: int npix_soma: int soma_crop: np.ndarray overlap: Required[np.ndarray] radius: float aspect_ratio: float npix_norm_no_crop: float npix_norm: float skew: float std: float
[docs] def get_s2p_coords(s2p: Suite2PResult, neuron_list: int | list[int] | slice | np.ndarray | None, plane_index: int, factor: float) -> CellularCoordinates: """ Get the suite2p coordinates of all cells. :param s2p: ``Suite2PResult`` :param neuron_list: neuron index or index list/arr. If None, then load all neurons :param plane_index: optic plane index :param factor: pixel to um factor :return: :class:`~neuralib.imaging.cellular_cords.CellularCoordinates` """ if neuron_list is None: neuron_list = np.arange(s2p.n_neurons) if isinstance(neuron_list, int): neuron_indices = np.array([neuron_list]) src_plane_index = np.array([plane_index]) elif isinstance(neuron_list, slice): neuron_indices = np.arange(s2p.n_neurons)[neuron_list] src_plane_index = np.full(len(neuron_indices), plane_index) else: neuron_indices = np.asarray(neuron_list) src_plane_index = np.full(len(neuron_indices), plane_index) n_neurons = len(neuron_indices) xpix = np.zeros(n_neurons) ypix = np.zeros(n_neurons) for i, n in enumerate(neuron_indices): stat = cast(Suite2pRoiStat, s2p.stat[int(n)]) xpix[i] = np.mean(stat['xpix']) ypix[i] = np.mean(stat['ypix']) xcord = xpix * factor / 1000 # ap ycord = ypix * factor / 1000 # ml return get_cellular_coordinate( neuron_indices, xcord, ycord, unit='mm', plane_index=src_plane_index )