from __future__ import annotations
import pickle
from typing import Self, TypedDict, cast, overload
import h5py
import numpy as np
import polars as pl
from neuralib.typing import PathLike
from neuralib.util.dataframe import DataFrameWrapper
from neuralib.util.utils import uglob
__all__ = [
'read_facemap',
'KeyPoint',
'FaceMapResult',
'KeyPointDataFrame',
'SVDVariables',
'KeyPointsMeta',
'PupilDict',
'RoiDict'
]
[docs]
def read_facemap(directory: PathLike) -> FaceMapResult:
"""loading facemap result
:param directory: facemap output directory
"""
return FaceMapResult.from_directory(directory)
KeyPoint = str
"""keypoint name"""
[docs]
class FaceMapResult:
"""Facemap result container"""
[docs]
def __init__(self, svd: SVDVariables | None,
meta: KeyPointsMeta | None,
data: h5py.Group | None,
with_keypoints: bool):
"""
:param svd: attr:`~neuralib.tracking.facemap.core.SVDVariables`
:param meta: attr:`~neuralib.tracking.facemap.core.KeyPointsMeta`
:param data: facemap data result
:param with_keypoints: whether it has keypoint tracking
"""
self.svd = svd
self.meta = meta
self.data = data
self._with_keypoints = with_keypoints
[docs]
@classmethod
def from_directory(cls, directory: PathLike) -> Self:
"""
init class loading from a directory
:param directory: Facemap output directory
"""
# svd
try:
svd_path = uglob(directory, '*.npy')
except FileNotFoundError:
svd = None
else:
svd = np.load(svd_path, allow_pickle=True).item()
# meta
try:
meta_path = uglob(directory, '*.pkl')
except FileNotFoundError:
meta = None
data = None
keypoints = False
else:
with open(meta_path, 'rb') as f:
meta = pickle.load(f)
data_path = uglob(directory, '*.h5')
data = cast(h5py.Group, h5py.File(data_path)['Facemap'])
keypoints = True
return cls(svd, meta, data, keypoints)
@property
def with_keypoint(self) -> bool:
return self._with_keypoints
# ============== #
# Pupil Tracking #
# ============== #
[docs]
def get_pupil(self) -> PupilDict:
"""pupil tracking result
:raises RuntimeError: If no pupil data is available.
"""
if self.svd is None:
raise RuntimeError('no SVD data found')
pupil = self.svd.get('pupil')
if pupil is None:
raise RuntimeError('no pupil data found')
return pupil[0]
[docs]
def get_pupil_area(self) -> np.ndarray:
"""pupil area. `Array[float, F]`"""
return self.get_pupil()['area_smooth']
[docs]
def get_pupil_center_of_mass(self) -> np.ndarray:
"""center of mass of pupil tracking. `Array[float, [F, 2]]`"""
return self.get_pupil()['com_smooth']
[docs]
def get_pupil_location_movement(self) -> np.ndarray:
"""Calculate the Euclidean distance from the origin for each point in a 2D array. `Array[float, F]`"""
com = self.get_pupil_center_of_mass()
return np.sqrt(np.sum(com ** 2, axis=1))
[docs]
def get_blink(self) -> np.ndarray:
"""eye blinking array. `Array[float, F]`
:raises RuntimeError: If no blink data is available.
"""
if self.svd is None:
raise RuntimeError('no SVD data found')
ret = self.svd.get('blink')
if ret is None:
raise RuntimeError('no blink data found')
return ret[0]
# ========= #
# Keypoints #
# ========= #
@property
def keypoints(self) -> list[KeyPoint]:
"""list of all keypoint name"""
if self.data is None:
raise RuntimeError('no keypoint data found')
return list(self.data.keys())
[docs]
def get(self, *keypoint) -> KeyPointDataFrame:
"""get keypoint(s) dataframe"""
if len(keypoint) == 1:
return self._get(keypoint[0])
else:
ret = [self._get(k).dataframe() for k in keypoint]
return KeyPointDataFrame(pl.concat(ret))
def _get(self, keypoint: KeyPoint) -> KeyPointDataFrame:
if self.data is None:
raise RuntimeError('no keypoint data found')
group = cast(h5py.Group, self.data[keypoint])
x = np.array(group['x'])
y = np.array(group['y'])
llh = np.array(group['likelihood'])
df = pl.DataFrame({'x': x, 'y': y, 'likelihood': llh}).with_columns(pl.lit(keypoint).alias('keypoint'))
return KeyPointDataFrame(df)
[docs]
class KeyPointDataFrame(DataFrameWrapper):
"""
Dataframe with ``x``, ``y``, ``likelihood`` and ``keypoint`` columns ::
┌────────────┬────────────┬────────────┬───────────┐
│ x ┆ y ┆ likelihood ┆ keypoint │
│ --- ┆ --- ┆ --- ┆ --- │
│ f32 ┆ f32 ┆ f32 ┆ str │
╞════════════╪════════════╪════════════╪═══════════╡
│ 374.102081 ┆ 199.159668 ┆ 0.777443 ┆ eye(back) │
│ 373.785919 ┆ 199.425873 ┆ 0.787424 ┆ eye(back) │
│ 374.075867 ┆ 199.507111 ┆ 0.779713 ┆ eye(back) │
│ 374.028473 ┆ 199.359955 ┆ 0.761724 ┆ eye(back) │
│ 374.222382 ┆ 199.777466 ┆ 0.770329 ┆ eye(back) │
│ … ┆ … ┆ … ┆ … │
│ 317.318756 ┆ 285.396912 ┆ 0.596486 ┆ mouth │
│ 318.163696 ┆ 285.492676 ┆ 0.589684 ┆ mouth │
│ 317.758606 ┆ 285.560425 ┆ 0.603126 ┆ mouth │
│ 317.453491 ┆ 285.572235 ┆ 0.573179 ┆ mouth │
│ 317.976196 ┆ 285.477051 ┆ 0.58359 ┆ mouth │
└────────────┴────────────┴────────────┴───────────┘
"""
[docs]
def __init__(self, df: pl.DataFrame):
self._df = df
def __repr__(self):
return repr(self.dataframe())
@overload
def dataframe(self) -> pl.DataFrame:
...
@overload
def dataframe(self, dataframe: pl.DataFrame, may_inplace: bool = True) -> Self:
...
[docs]
def dataframe(self, dataframe: pl.DataFrame | None = None, may_inplace: bool = True) -> pl.DataFrame | Self:
if dataframe is None:
return self._df
else:
return type(self)(dataframe)
[docs]
def to_zscore(self) -> Self:
"""
xy to zscore
:return:
"""
return self.with_columns([
((pl.col('x') - pl.col('x').mean()) / pl.col('x').std()).alias('x'),
((pl.col('y') - pl.col('y').mean()) / pl.col('y').std()).alias('y'),
])
[docs]
def with_outlier_filter(self, filter_window: int = 15,
baseline_window: int = 50,
max_spike: int = 25,
max_diff: int = 25) -> Self:
"""
with outlier filter
:param filter_window:
:param baseline_window:
:param max_spike:
:param max_diff:
:return:
"""
from .util import filter_outliers
x, y = filter_outliers(np.array(self['x']), np.array(self['y']), filter_window, baseline_window, max_spike, max_diff)
return self.dataframe(pl.DataFrame({
'x': x,
'y': y,
'likelihood': self['likelihood'],
'keypoint': self['keypoint']
}))
[docs]
class PupilDict(TypedDict):
"""
Pupil data dict
`Dimension parameters`:
F: number pf frames
"""
area: np.ndarray
"""`Array[float, F]`"""
com: np.ndarray
"""center of maze in XY. `Array[float, [F, 2]]`"""
axdir: np.ndarray
"""`Array[float, [F, 2, 2]]`"""
axlen: np.ndarray
"""`Array[float, [F, 2]]`"""
area_smooth: np.ndarray
"""`Array[float, F]`"""
com_smooth: np.ndarray
"""`Array[float, [F, 2]]`"""
[docs]
class RoiDict(TypedDict, total=False):
"""Roi Dict"""
rind: int
rtype: str
iROI: int
ivid: int
color: tuple[float, float, float]
yrange: np.ndarray
xrange: np.ndarray
saturation: float
pupil_sigma: float
ellipse: np.ndarray
yrange_bin: np.ndarray
xrange_bin: np.ndarray
[docs]
class SVDVariables(TypedDict, total=False):
"""SVD output from facemap
.. seealso:: `<http://facemap.readthedocs.io/en/stable/outputs.html#roi-and-svd-processing>`_"""
filenames: list[str]
save_path: str
Ly: list[int]
Lx: list[int]
sbin: int
fullSVD: bool
save_mat: bool
Lybin: np.ndarray
Lxbin: np.ndarray
sybin: np.ndarray
sxbin: np.ndarray
LYbin: int
LXbin: int
avgframe: list[np.ndarray]
avgmotion: list[np.ndarray]
avgframe_reshape: np.ndarray
avgmotion_reshape: np.ndarray
motion: list[np.ndarray]
motSv: list[np.ndarray]
movSv: list[np.ndarray]
motMask: list[int]
movMask: list[int]
motMask_reshape: list[int]
movMask_reshape: list[int]
motSVD: list[np.ndarray]
movSVD: list[np.ndarray]
pupil: list[PupilDict]
running: list[np.ndarray]
blink: list[np.ndarray]
rois: list[RoiDict]
sy: np.ndarray
sx: np.ndarray