Source code for neuralib.suite2p.signals

from typing import Literal, NamedTuple

import numpy as np
from scipy.ndimage import gaussian_filter, gaussian_filter1d, maximum_filter1d, minimum_filter1d

from .core import SIGNAL_TYPE, Suite2PResult

__all__ = [
    'get_neuron_signal',
    'normalize_signal',
    'normalize_signal_factor',
    #
    'DFFSignal',
    'dff_signal',
    'calc_signal_baseline',
    #
    'sync_s2p_rigevent',
]

BASELINE_METHOD = Literal['maximin', 'constant', 'constant_prctile']


[docs] def get_neuron_signal(s2p: Suite2PResult, n: int | np.ndarray | list[int] | None = None, *, signal_type: SIGNAL_TYPE = 'df_f', normalize: bool = True, dff: bool = True, correct_neuropil: bool = True, method: BASELINE_METHOD = 'maximin') -> tuple[np.ndarray, np.ndarray]: """ Select neuronal signals for analysis. For single cell (F,) OR multiple cells (N, F) :param s2p: suite 2p result :param n: neuron index (`int`) or index arraylike (`Array[int, N]`). If None, then use all neurons :param signal_type: signal type. :data:`~neuralib.imaging.suite2p.core.SIGNAL_TYPE` {'df_f', 'spks'} :param normalize: 01 normalization for each neuron :param dff: normalize to the baseline fluorescence changed (dF/F) :param correct_neuropil: do the neuropil correction :param method: baseline calculation method {'maximin', 'constant', 'constant_prctile'} :return: tuple with (signal, baseline signal). `Array[float, F|[N,F]]` """ if n is None: n = list(range(s2p.n_neurons)) f = s2p.f_raw[n] fneu = s2p.f_neu[n] if signal_type == 'df_f': if dff: s1 = dff_signal(f, fneu, s2p, correct_neuropil, method).dff s2 = np.full_like(s1, 0, dtype=int) if normalize: o, f = normalize_signal_factor(s1) s1 = (s1 - o) / f else: fcorr = f - 0.7 * fneu sig_baseline = s2p.signal_baseline window = int(s2p.window_baseline * s2p.fs) s1 = fcorr # baseline not a constant value, due to calcium decay or motion drift s2 = _maximin_filter(fcorr, sig_baseline, window) if normalize: o, f = normalize_signal_factor(s1) s1 = (s1 - o) / f ob, fb = normalize_signal_factor(s2) s2 = (s2 - ob) / fb elif signal_type == 'spks': s1 = s2p.spks[n] s2 = np.full_like(s1, 0, dtype=int) if normalize: s1 = normalize_signal(s1) else: raise ValueError('specify signal type either in df_f or spks') return s1, s2
[docs] def normalize_signal(s: np.ndarray, axis: int = -1) -> np.ndarray: """ do 0 1 normalization :param s: signal :param axis: axis :return: normalized signal """ o, f = normalize_signal_factor(s, axis) return (s - o) / f
[docs] def normalize_signal_factor(s: np.ndarray, axis: int = -1) -> tuple[np.ndarray, np.ndarray]: return np.min(s, axis=axis, keepdims=True), \ np.max(s, axis=axis, keepdims=True) - np.min(s, axis=axis, keepdims=True)
# ============ # # DF/F Compute # # ============ #
[docs] class DFFSignal(NamedTuple): """Container for dF/F signal processing. `Dimension parameters`: N = number of neurons F = number of frame For single cell (F,) OR multiple cells (N, F) """ s2p: Suite2PResult """Suite2PResult""" f: np.ndarray """Fluorescence intensity. `Array[float, F | [N,F]]`""" fneu: np.ndarray """Neuropil intensity. `Array[float, F | [N,F]]`""" fcorr: np.ndarray """Fluorescence corrected by fneu that used for dff calculation. `Array[float, F | [N,F]]`""" f0: np.ndarray """Background Fluorescence. `Array[float, F | [N,F]]`""" dff: np.ndarray """dff after f0 normalization. `Array[float, F | [N,F]]`""" @property def dff_baseline(self) -> np.ndarray: """baseline of dff, supposed to be 0""" return np.full_like(self.dff, 0) @property def baseline_fluctuation(self) -> np.ndarray: """get the fluctuation of the fneu signal. `Array[float, F | [N,F]]` Perhaps not fully corrected with physiological reason. **used to get the baseline std (i.e., trial reliability metric)** """ fneu_corr = 0.3 * self.fneu fneu_bas = calc_signal_baseline(fneu_corr, self.s2p, method='maximin') return 100 * ((fneu_corr - fneu_bas) / fneu_bas)
[docs] def dff_signal(f: np.ndarray, fneu: np.ndarray, s2p: Suite2PResult, correct_neuropil: bool = True, method: BASELINE_METHOD = 'maximin') -> DFFSignal: """ df_f signal normalization container :param f: Fluorescence intensity. `Array[float, F | [N,F]]` :param fneu: Neuropil intensity. `Array[float, F | [N,F]]` :param s2p: ``Suite2PResult`` :param correct_neuropil: whether do the subtraction using neuropil :param method: {'maximin', 'constant', 'constant_prctile'} :return: DFFSignal """ fcorr = f - 0.7 * fneu if correct_neuropil else f f0 = calc_signal_baseline(fcorr, s2p, method) dff = 100 * ((fcorr - f0) / f0) return DFFSignal(s2p, f, fneu, fcorr, f0, dff)
[docs] def calc_signal_baseline(signal: np.ndarray, s2p: Suite2PResult, method: BASELINE_METHOD = 'maximin') -> np.ndarray: """ f0 calculation .. seealso:: source code from suite2p: ``suite2p.extraction.dcnv.preprocess`` :param signal: signal activity. i.e., fcorr :param s2p: :class:`~neuralib.imaging.suite2p.core.Suite2PResult` :param method: BASELINE_METHOD :return: f0 baseline for the df/f calculation """ if method == 'maximin': sig_baseline = s2p.signal_baseline window = int(s2p.window_baseline * s2p.fs) f0 = _maximin_filter(signal, sig_baseline, window) elif method == 'constant': signal = gaussian_filter(signal, [0., s2p.signal_baseline]) f0 = np.min(signal, axis=-1) elif method == 'constant_prctile': sig_prc = np.percentile(signal, s2p.prctile_baseline, axis=-1) f0 = np.expand_dims(sig_prc, axis=-1) else: raise TypeError(f'unknown method{method}') return f0
def _maximin_filter(signal: np.ndarray, kernel: float, size: int) -> np.ndarray: """ max/min method to calculate the signal baseline. **Note that this method is sensitive to the kernel size :param signal: :param kernel: standard deviation for Gaussian kernel :param size: length along which to calculate 1D minimum :return: """ signal = gaussian_filter1d(signal, kernel, axis=-1) signal = minimum_filter1d(signal, size, axis=-1) baseline = maximum_filter1d(signal, size, axis=-1) return baseline # ================= # # Sync Pulse Number # # ================= #
[docs] def sync_s2p_rigevent(image_time: np.ndarray, s2p: Suite2PResult, plane: int | None = 0) -> np.ndarray: r""" Do the alignment to make the shape the same in hardware pulses `(P,)` & suite2p output files `(F,)` \* ASSUME both recordings are sync \* ASSUME sequential scanning pattern. i.e., 0, 1, 2, 3 ETL scanning order :param image_time: imaging time hardware pulses. `Array[float, P]` :param s2p: :class:`~neuralib.imaging.suite2p.core.Suite2PResult` :param plane: number of optical plane :return: image_time: with same len as s2p results `Array[float, F]` """ n_plane = s2p.n_plane image_fr = s2p.fs if plane is not None: image_time = image_time[plane::n_plane] else: # TODO used npy in combined file raise NotImplementedError('') n_image = len(image_time) n_frame = s2p.n_frame n_diff = abs(n_image - n_frame) if n_diff >= n_frame / 100: raise RuntimeError('too large difference between riglog imaging header and calcium data') if n_image > n_frame: # drop latest, mostly this case image_time = image_time[:n_frame] elif n_image < n_frame: # padding at the tail image_time = np.hstack((image_time, [image_time[-1] + (i + 1) / image_fr for i in range(n_diff)])) return image_time