"""Typed simulator output - ``Recording`` / ``GroundTruth`` - and ``finalize()``.
Where :mod:`minisim.scene` is the mutable working state a pipeline of
steps fills in, this module is the *frozen, typed result* that state is distilled
into once the pipeline is exhausted. :func:`finalize` is the transform: it turns a
run-out ``Scene`` into a ``Recording`` carrying the observed movie, per-stage
snapshots, and the numpydantic-typed ``GroundTruth`` that tests, metrics, and the
training notebooks consume.
Two things that were deliberately deferred from earlier steps land here:
* **FOV cropping.** Under motion the tissue steps render on a canvas larger than
the sensor (see :mod:`minisim.steps.motion`); cells carry canvas-sized
footprints and canvas-frame positions. ``finalize`` crops them to the sensor
FOV at the reference (zero-shift) frame - the template motion correction aligns
back to - and drops cells whose reference footprint falls entirely in the
margin (real tissue, but background that only flickers in transiently, not a
recoverable unit).
* **Detectability.** ``detectable`` is not an optics-only property: a cell's peak
signal (``optical_brightness``) is further dimmed by the illumination/vignette
field at its position and then judged against a sensor-derived noise floor.
``finalize`` is the first point all three exist, so it is where the flag is set.
"""
from __future__ import annotations
import os
import shutil
from pathlib import Path
import numpy as np
import xarray as xr
import zarr
from numpydantic import NDArray, Shape
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr
from minisim.footprint import Footprint, FootprintStack, degrade_footprint
from minisim.scene import MOVIE_DIMS, Cell, Scene
from minisim.spec import Acquisition, Spec
# Minimum realized peak SNR (signal electrons over the sensor noise floor) for a
# cell to count as detectable. A provisional value: a future threshold
# calibration revisits it against observed metric distributions. Kept here, named,
# rather than buried as a literal so that calibration is a one-line change.
DETECT_SNR_THRESHOLD = 3.0
# On-disk layout for save()/load() (zarr group + sibling spec.json). The format
# version is stamped in the group attrs so a future layout change can be detected
# rather than silently misread.
# v2: footprints stored sparse (planted patches; observed regenerated on load).
_FORMAT_VERSION = 2
# Plain GroundTruth array fields saved as datasets, split by whether they are always
# present or optional (None when their producing step is absent). The sparse planted
# footprints (and the fov crop) are handled separately; A_observed is not stored.
_GT_REQUIRED = (
"C", "S", "centers_um", "amplitude_per_cell", "in_focus", "detectable",
)
_GT_OPTIONAL = (
"observed_sigma_px", "observed_gain",
"shifts", "illumination", "vignette", "leakage", "bleaching",
"neuropil_temporal", "neuropil_spatial", "neuropil_population",
"vasculature_mask", "vessel_overlap_fraction",
)
[docs]
class GroundTruth(BaseModel):
"""The per-recording truth: structural targets + per-cell and per-effect fields.
The **planted vs observed footprint split is load-bearing**: ``A_observed`` is
what CNMF can actually recover (tests match against it), while ``A_planted`` is
the ideal, optics-free target that quantifies the irreducible limit. Both are
exposed as dense ``(unit, height, width)`` arrays via properties, but neither is
*stored* dense:
* Footprints are stored sparse, as canvas-coordinate patches in :attr:`planted`
(a :class:`~minisim.footprint.FootprintStack`); :attr:`fov_offset` /
:attr:`fov_shape` crop them to the sensor FOV.
* The observed footprint is **not stored at all**: it is the deterministic blur
``gain · (planted ⊛ Gaussian(sigma_px))`` of the planted one, so it is
regenerated on demand from the per-unit :attr:`observed_sigma_px` /
:attr:`observed_gain` scalars. Deep cells' observed footprints are
near-full-canvas, so storing them dominated memory and disk; regenerating is
bit-identical and far cheaper to keep.
Per-effect fields are ``None`` when their step is absent from the recording.
"""
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
# structural truth (sparse) --------------------------------------------
# Canvas-coordinate planted footprints + the crop to the sensor FOV. Stored in
# canvas coords (not pre-cropped) so the regenerated observed footprint exactly
# matches blur-then-crop even for cells straddling the FOV edge.
planted: FootprintStack
fov_offset: tuple[int, int] # (top, left) crop from canvas to sensor FOV
fov_shape: tuple[int, int] # (height, width) of the sensor FOV
# Per-unit optics scalars defining the observed footprint; both None when the
# optics step did not run (then A_observed falls back to A_planted).
observed_sigma_px: NDArray[Shape["* unit"], float] | None = None
observed_gain: NDArray[Shape["* unit"], float] | None = None
C: NDArray[Shape["* unit, * frame"], float]
S: NDArray[Shape["* unit, * frame"], float]
# per-cell physical attributes -----------------------------------------
centers_um: NDArray[Shape["* unit, 3"], float]
# Per-cell brightness/expression gain (the clean input). NaN for a cell whose
# ``cell_activity`` step has not run. SNR is deliberately absent: noise is a
# downstream physical effect, so any SNR is computed later, not stored here.
amplitude_per_cell: NDArray[Shape["* unit"], float]
in_focus: NDArray[Shape["* unit"], bool]
detectable: NDArray[Shape["* unit"], bool]
# per-effect ground truth (None when that step is absent) ---------------
shifts: NDArray[Shape["* frame, 2"], float] | None = None
illumination: NDArray[Shape["* height, * width"], float] | None = None
vignette: NDArray[Shape["* height, * width"], float] | None = None
leakage: NDArray[Shape["* height, * width"], float] | None = None
bleaching: NDArray[Shape["* unit, * frame"], float] | None = None
neuropil_temporal: NDArray[Shape["* component, * frame"], float] | None = None
neuropil_spatial: NDArray[Shape["* component, * height, * width"], float] | None = None
neuropil_population: NDArray[Shape["* frame"], float] | None = None
# The static vessel transmission mask (height, width) in (0, 1] from the
# vasculature step (cropped to the FOV); None when the step is off / layer-less.
vasculature_mask: NDArray[Shape["* height, * width"], float] | None = None
# Per-cell fraction of footprint-integrated light absorbed by vessels, in [0, 1)
# (0 = clear, ->1 = a vessel sits over the whole footprint). The scoreable
# confound axis: stratify recall / footprint-correlation by vessel burden. None
# when the vasculature step is off. The footprints themselves stay vessel-free
# (A_observed is the single-cell optical truth); occlusion lives here, not in A.
vessel_overlap_fraction: NDArray[Shape["* unit"], float] | None = None
# The concrete focal depth (µm) the optics step resolved "auto" to - the plane
# that maximized recoverable yield. A scalar, so persisted as a group attr (not
# a dataset) by save/load; None when the optics step did not run.
focal_depth_um: float | None = None
# Memoizes the regenerated dense A_observed (one blur pass over all units), so
# repeated reads on the same object are free. A private attr, so it does not
# affect equality, serialization, or the frozen field set.
_observed_cache: np.ndarray | None = PrivateAttr(default=None)
@property
def n_units(self) -> int:
"""Number of ground-truth cells (units) in the recording."""
return len(self.planted)
@property
def A_planted(self) -> np.ndarray:
"""The sharp, pre-optics footprints, dense ``(unit, height, width)`` over the FOV."""
top, left = self.fov_offset
h, w = self.fov_shape
return self.planted.crop(top, left, h, w).to_dense(dtype=float)
@property
def A_observed(self) -> np.ndarray:
"""The optically degraded footprints CNMF could recover, dense ``(unit, H, W)``.
Regenerated (and memoized) from the planted footprints and the per-unit
``observed_sigma_px`` / ``observed_gain`` scalars, then cropped to the FOV -
bit-identical to what the optics step produced. Falls back to
:attr:`A_planted` when the optics step did not run.
"""
if self._observed_cache is None:
self._observed_cache = self._regenerate_observed()
return self._observed_cache
def _regenerate_observed(self) -> np.ndarray:
top, left = self.fov_offset
h, w = self.fov_shape
if self.observed_sigma_px is None:
return self.A_planted # no optics -> observed == planted
observed = []
for i, fp in enumerate(self.planted):
sigma = float(self.observed_sigma_px[i])
if np.isnan(sigma): # a cell without optics params -> sharp footprint
observed.append(fp)
else:
observed.append(degrade_footprint(fp, sigma, float(self.observed_gain[i])))
return FootprintStack(tuple(observed), self.planted.canvas_shape).crop(
top, left, h, w
).to_dense(dtype=float)
@property
def depth_um(self) -> np.ndarray:
"""Per-cell depth ``z`` (µm) - the first column of ``centers_um``.
Exposed as a derived view rather than stored, to avoid drift. Lateral
pixel coordinates are likewise ``centers_um[:, 1:] / pixel_size_um`` using
the owning ``Recording.spec.acquisition``.
"""
return self.centers_um[:, 0]
def detectable_subset(self) -> GroundTruth:
"""Subset to detectable cells - the fair denominator for recall metrics.
Slices the per-unit fields by the ``detectable`` mask (the planted stack,
the optics scalars, and ``bleaching`` are all per-unit); the per-effect
fields (shifts, vignette, neuropil, …) are not per-unit and are carried
unchanged.
"""
m = self.detectable
return GroundTruth(
planted=self.planted[m],
fov_offset=self.fov_offset,
fov_shape=self.fov_shape,
observed_sigma_px=self.observed_sigma_px[m] if self.observed_sigma_px is not None else None,
observed_gain=self.observed_gain[m] if self.observed_gain is not None else None,
C=self.C[m],
S=self.S[m],
centers_um=self.centers_um[m],
amplitude_per_cell=self.amplitude_per_cell[m],
in_focus=self.in_focus[m],
detectable=self.detectable[m],
vessel_overlap_fraction=(
self.vessel_overlap_fraction[m]
if self.vessel_overlap_fraction is not None else None
),
bleaching=self.bleaching[m] if self.bleaching is not None else None,
shifts=self.shifts,
illumination=self.illumination,
vignette=self.vignette,
leakage=self.leakage,
neuropil_temporal=self.neuropil_temporal,
neuropil_spatial=self.neuropil_spatial,
neuropil_population=self.neuropil_population,
vasculature_mask=self.vasculature_mask,
focal_depth_um=self.focal_depth_um,
)
[docs]
class Recording(BaseModel):
"""A complete simulated recording: the spec, the observed movie, and the truth.
``observed`` holds the integer-valued sensor counts in a float container (per
``Output.store_dtype``). ``snapshots`` is populated only when
``Output.save_intermediates`` is set, keyed by each step's stage ``name``;
``stage()`` reads them.
"""
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
spec: Spec
observed: NDArray[Shape["* frame, * height, * width"], float]
ground_truth: GroundTruth
snapshots: dict[str, xr.DataArray] = Field(default_factory=dict)
def stage(self, name: str) -> xr.DataArray:
"""Return the snapshot taken after the named stage."""
if name not in self.snapshots:
raise KeyError(
f"Stage '{name}' unavailable. Set Output.save_intermediates=True, "
f"or pick from {sorted(self.snapshots)}."
)
return self.snapshots[name]
def save(self, path: str | Path) -> None:
"""Persist this recording to a self-contained zarr directory at ``path``.
Layout (one portable directory; ``path`` is conventionally
``{spec.cache_key()}.zarr`` but any path works)::
{path}/
spec.json human-readable, diffable spec
(group attrs) format_version, spec_cache_key, gt_present,
snapshot_names
observed (frame, height, width) in store_dtype
ground_truth/ the GroundTruth arrays (optional ones only
when not None; listed in the gt_present attr)
planted/ sparse footprints: offsets, shapes, data
(+ canvas_shape/fov_offset/fov_shape attrs)
snapshots/ per-stage movie values, only when non-empty
Footprints are stored sparse (the ``planted`` subgroup holds the ragged
patch arrays); the observed footprints are not stored -- they regenerate
from the per-unit ``observed_sigma_px`` / ``observed_gain`` scalars.
Snapshot coordinates are not stored - they are the trivial ``arange`` grid
over ``MOVIE_DIMS`` and are rebuilt on :meth:`load`. The write is atomic: it
builds a sibling ``{path}.tmp`` and renames it into place, so a crash never
leaves a half-written directory that :meth:`load` would trust.
"""
path = Path(path)
tmp = path.with_name(path.name + ".tmp")
if tmp.exists():
shutil.rmtree(tmp)
gt = self.ground_truth
root = zarr.open_group(str(tmp), mode="w")
root.create_dataset("observed", data=np.asarray(self.observed))
gt_group = root.create_group("ground_truth")
for name in _GT_REQUIRED:
gt_group.create_dataset(name, data=np.asarray(getattr(gt, name)))
present = []
for name in _GT_OPTIONAL:
value = getattr(gt, name)
if value is not None:
gt_group.create_dataset(name, data=np.asarray(value))
present.append(name)
# Sparse planted footprints: ragged patches flattened to three datasets.
offsets, shapes, data = gt.planted.to_arrays()
planted_group = gt_group.create_group("planted")
planted_group.create_dataset("offsets", data=offsets)
planted_group.create_dataset("shapes", data=shapes)
planted_group.create_dataset("data", data=data)
planted_group.attrs["canvas_shape"] = list(gt.planted.canvas_shape)
planted_group.attrs["fov_offset"] = list(gt.fov_offset)
planted_group.attrs["fov_shape"] = list(gt.fov_shape)
snapshot_names = sorted(self.snapshots)
if snapshot_names:
snap_group = root.create_group("snapshots")
for name in snapshot_names:
snap_group.create_dataset(name, data=np.asarray(self.snapshots[name].values))
# focal_depth_um is a scalar, not an array: stash it as a group attr.
gt_group.attrs["focal_depth_um"] = gt.focal_depth_um
root.attrs["format_version"] = _FORMAT_VERSION
root.attrs["spec_cache_key"] = self.spec.cache_key()
root.attrs["gt_present"] = present
root.attrs["snapshot_names"] = snapshot_names
(tmp / "spec.json").write_text(self.spec.model_dump_json(indent=2))
if path.exists():
shutil.rmtree(path)
os.replace(tmp, path)
@classmethod
def load(cls, path: str | Path) -> Recording:
"""Load a recording written by :meth:`save`, verifying its spec hash.
Reads back ``spec.json``, re-validates it, and checks that its
:attr:`Spec.cache_key` matches the one stamped at save time - guarding
against a stale cache or a hand-edited ``spec.json``. Snapshots are
rebuilt as ``DataArray``s over ``MOVIE_DIMS`` with ``arange`` coordinates.
"""
path = Path(path)
root = zarr.open_group(str(path), mode="r")
spec = Spec.model_validate_json((path / "spec.json").read_text())
stored_key = root.attrs.get("spec_cache_key")
if stored_key != spec.cache_key():
raise ValueError(
f"Spec hash mismatch loading {path}: stored {stored_key!r} != "
f"recomputed {spec.cache_key()!r}. The cache is stale or spec.json "
f"was edited; delete it and re-simulate."
)
gt_group = root["ground_truth"]
fields = {name: np.asarray(gt_group[name]) for name in _GT_REQUIRED}
for name in root.attrs.get("gt_present", []):
fields[name] = np.asarray(gt_group[name])
focal = gt_group.attrs.get("focal_depth_um")
if focal is not None:
fields["focal_depth_um"] = float(focal)
# Rebuild the sparse planted footprints and the FOV crop.
planted_group = gt_group["planted"]
canvas_shape = tuple(int(v) for v in planted_group.attrs["canvas_shape"])
fields["planted"] = FootprintStack.from_arrays(
np.asarray(planted_group["offsets"]),
np.asarray(planted_group["shapes"]),
np.asarray(planted_group["data"]),
canvas_shape,
)
fields["fov_offset"] = tuple(int(v) for v in planted_group.attrs["fov_offset"])
fields["fov_shape"] = tuple(int(v) for v in planted_group.attrs["fov_shape"])
ground_truth = GroundTruth(**fields)
snapshots = {
name: _movie_dataarray(np.asarray(root["snapshots"][name]))
for name in root.attrs.get("snapshot_names", [])
}
return cls(
spec=spec,
observed=np.asarray(root["observed"]),
ground_truth=ground_truth,
snapshots=snapshots,
)
def write_video(
self,
path: str | Path,
*,
fps: float | None = None,
vmin: float = 0.0,
vmax: float | None = None,
codec: str = "Y800",
progress: bool = True,
) -> Path:
"""Write the in-memory ``observed`` movie to a grayscale video at ``path``.
Maps counts to 8-bit gray over ``[vmin, vmax]`` (``vmax`` defaults to the
sensor's full ADC range, ``2**bit_depth - 1``, so saturation reads as white)
and encodes with ``cv2.VideoWriter`` + the ``codec`` fourcc (default
``"Y800"``: uncompressed 8-bit gray -- exact counts, no artifacts, but large;
pass ``"MJPG"`` for a small lossy file). Uses opencv's bundled ffmpeg, so no
system ffmpeg or ``mediapy`` extra is needed. Use this when you already hold a
``Recording``; to stream a long recording to disk **without** building the
whole movie in memory, use :func:`minisim.video.simulate_video` instead.
Returns ``path``.
"""
from minisim.video import _default_vmax, _write_gray_video
n_frames = self.observed.shape[0]
if n_frames == 0:
raise ValueError("recording has no frames to write (observed is empty).")
fps = float(fps if fps is not None else self.spec.acquisition.fps)
fov = (self.observed.shape[1], self.observed.shape[2])
if vmax is None:
vmax = _default_vmax(self.spec)
frames = (self.observed[f] for f in range(n_frames))
return _write_gray_video(
frames, n_frames, Path(path), fov, fps, vmin, vmax, codec, progress
)
# ---------------------------------------------------------------------------
# finalize: Scene -> Recording
# ---------------------------------------------------------------------------
[docs]
def finalize(scene: Scene, spec: Spec) -> Recording:
"""Distill an exhausted ``Scene`` into a frozen, typed ``Recording``.
Keeps each cell's canvas-coordinate planted footprint (sparse) plus its
canvas-frame position rebased to the sensor FOV, drops cells left entirely in
the motion margin, records the per-unit optics scalars (so ``A_observed`` can be
regenerated rather than stored), assembles the per-cell structural truth, sets
``detectable`` from the realized optical × illumination peak versus the sensor
noise floor (folding in vessel transmission, and recording each cell's
vessel-occlusion burden), reads the per-effect fields off ``scene.truth``, and
downcasts the working movie to ``Output.store_dtype`` for ``observed``.
"""
acq = scene.acq
fov_h = acq.image_sensor.n_px_height
fov_w = acq.image_sensor.n_px_width
n_frames = acq.n_frames
# Footprints were stamped on the canvas (sensor FOV + any motion margin); read
# the canvas size off them, since a brain_motion step crops scene.movie down to
# the FOV after stamping. Falls back to the bare FOV when there are no cells.
canvas_h, canvas_w = next(
(c.footprint_planted.canvas_shape for c in scene.cells if c.footprint_planted is not None),
(fov_h, fov_w),
)
# Centered crop from the canvas (sensor FOV + any motion margin) down to the FOV.
margin_h = (canvas_h - fov_h) // 2
margin_w = (canvas_w - fov_w) // 2
sensor_spec = next((s for s in spec.steps if s.kind == "sensor"), None)
# Both falloff fields are FOV-sized (built post-motion-crop), or None. Their
# product is the per-pixel photon budget a cell's signal is dimmed by.
illumination = scene.truth.illumination
vignette = scene.truth.vignette
photon_field = _combine_fields(illumination, vignette)
# The vessel transmission mask, if the vasculature step ran. The canvas-sized
# version aligns with the (canvas-coordinate) footprints for the per-cell
# overlap integral; the FOV crop is sampled at each cell's position for
# detectability, the same frame as photon_field.
vasc_mask_canvas = scene.truth.vasculature_mask
vasc_mask_fov = _crop_field(vasc_mask_canvas, fov_h, fov_w)
planted_fps, traces, spikes, bleaches = [], [], [], []
centers, amplitudes, in_focus, detectable, overlaps = [], [], [], [], []
sigmas, gains = [], []
for cell in scene.cells:
if cell.footprint_planted is None:
continue
# Drop a cell whose planted footprint, cropped to the FOV, is empty: it sits
# entirely in the motion margin -- real tissue, but background that only
# flickers in transiently, not a recoverable unit.
if cell.footprint_planted.crop(margin_h, margin_w, fov_h, fov_w).is_empty:
continue
z, y_um, x_um = cell.center_um
y_fov_um = y_um - margin_h * acq.pixel_size_um
x_fov_um = x_um - margin_w * acq.pixel_size_um
trace = cell.trace if cell.trace is not None else np.zeros(n_frames)
spike = cell.spikes if cell.spikes is not None else np.zeros(n_frames)
ifocus = cell.in_focus if cell.in_focus is not None else True
# Stored in canvas coords; A_planted/A_observed crop to the FOV on access.
planted_fps.append(cell.footprint_planted)
sigmas.append(cell.observed_sigma_px if cell.observed_sigma_px is not None else np.nan)
gains.append(cell.observed_gain if cell.observed_gain is not None else np.nan)
traces.append(trace)
spikes.append(spike)
bleaches.append(cell.bleach)
centers.append((z, y_fov_um, x_fov_um))
amplitudes.append(cell.amplitude if cell.amplitude is not None else float("nan"))
in_focus.append(ifocus)
# A vessel over the cell absorbs part of its light: dim the peak by the
# transmission at the cell's position (detectability is a peak test) and
# record the footprint-weighted occlusion as the scoreable confound axis.
vessel_t = sample_field_at(vasc_mask_fov, y_fov_um, x_fov_um, acq.pixel_size_um)
detectable.append(
_is_detectable(
cell, ifocus, y_fov_um, x_fov_um, photon_field, sensor_spec, acq, vessel_t
)
)
overlaps.append(_vessel_overlap(cell.observed_footprint(), vasc_mask_canvas))
# Optics ran iff any surviving cell carries a sigma; then keep the per-unit
# scalars so A_observed regenerates. Otherwise None -> A_observed == A_planted.
optics_ran = any(not np.isnan(s) for s in sigmas)
gt = GroundTruth(
planted=FootprintStack.from_footprints(planted_fps, (canvas_h, canvas_w)),
fov_offset=(margin_h, margin_w),
fov_shape=(fov_h, fov_w),
observed_sigma_px=np.array(sigmas, dtype=float) if optics_ran else None,
observed_gain=np.array(gains, dtype=float) if optics_ran else None,
C=_stack(traces, (0, n_frames)),
S=_stack(spikes, (0, n_frames)),
centers_um=np.array(centers, dtype=float).reshape(-1, 3),
amplitude_per_cell=np.array(amplitudes, dtype=float),
in_focus=np.array(in_focus, dtype=bool),
detectable=np.array(detectable, dtype=bool),
# Per-cell vessel occlusion, present only when the vasculature step ran.
vessel_overlap_fraction=(
np.array(overlaps, dtype=float) if vasc_mask_canvas is not None else None
),
# Per-cell bleaching envelopes (unit, frame), present only if the bleaching
# step ran; any cell without one (e.g. added afterward) gets a no-fade row.
bleaching=(
_stack([b if b is not None else np.ones(n_frames) for b in bleaches], (0, n_frames))
if any(b is not None for b in bleaches)
else None
),
shifts=scene.truth.shifts,
illumination=illumination,
vignette=vignette,
leakage=scene.truth.leakage,
neuropil_temporal=scene.truth.neuropil_temporal,
neuropil_spatial=_crop_components(scene.truth.neuropil_spatial, fov_h, fov_w),
neuropil_population=scene.truth.neuropil_population,
vasculature_mask=_crop_field(scene.truth.vasculature_mask, fov_h, fov_w),
focal_depth_um=scene.truth.focal_depth_um,
)
# observed is always the sensor FOV: brain_motion already crops the movie,
# but a partial build (until= before motion) can leave it canvas-sized, so
# crop the centered FOV here too for consistency with the cropped footprints.
# A build that ran only cell-domain steps (e.g. until="optics") never wrote a
# pixel, so no movie was materialized: return an empty (0, H, W) stack rather
# than allocating a full zero buffer only to discard it. The per-cell ground
# truth (C, S, A, bleaching, ...) is fully populated either way.
if scene.has_movie:
movie = _center_crop_hw(scene.movie.values, fov_h, fov_w)
observed_movie = movie.astype(spec.output.store_dtype)
else:
observed_movie = np.zeros((0, fov_h, fov_w), dtype=spec.output.store_dtype)
return Recording(
spec=spec, observed=observed_movie, ground_truth=gt, snapshots=scene.snapshots
)
def _movie_dataarray(values: np.ndarray) -> xr.DataArray:
"""Rebuild a movie ``DataArray`` from stored values (the saved snapshot form).
Snapshots are persisted as bare arrays; their coordinates are the trivial
``arange`` grid over ``MOVIE_DIMS``, reconstructed here so :meth:`Recording.load`
returns the same ``(frame, height, width)`` labelling the pipeline produced.
"""
return xr.DataArray(
values,
dims=list(MOVIE_DIMS),
coords={dim: np.arange(size) for dim, size in zip(MOVIE_DIMS, values.shape, strict=True)},
name="movie",
)
def _center_crop_hw(arr: np.ndarray, h: int, w: int) -> np.ndarray:
"""Slice the trailing ``(H, W)`` axes of ``arr`` to a centered ``(h, w)`` window."""
top = (arr.shape[-2] - h) // 2
left = (arr.shape[-1] - w) // 2
return arr[..., top : top + h, left : left + w]
def _crop_components(stack: np.ndarray | None, h: int, w: int) -> np.ndarray | None:
"""Crop each ``(component, H, W)`` field to the reference FOV (or pass None)."""
return None if stack is None else _center_crop_hw(stack, h, w)
def _crop_field(field: np.ndarray | None, h: int, w: int) -> np.ndarray | None:
"""Crop a single ``(H, W)`` field to the centered reference FOV (or pass None)."""
return None if field is None else _center_crop_hw(field, h, w)
def _stack(arrays: list[np.ndarray], empty_shape: tuple[int, ...]) -> np.ndarray:
"""Stack a per-unit list, or an empty array of ``empty_shape`` when there are none."""
return np.stack(arrays) if arrays else np.zeros(empty_shape)
def _combine_fields(
a: np.ndarray | None, b: np.ndarray | None
) -> np.ndarray | None:
"""Element-wise product of two optional FOV fields (each absent → identity)."""
if a is None:
return b
if b is None:
return a
return a * b
def sample_field_at(
field: np.ndarray | None, y_um: float, x_um: float, pixel_size_um: float
) -> float:
"""Value of a static FOV field at a µm position, with edge-clamped indexing.
Converts a ``(y_um, x_um)`` FOV-frame position to the nearest pixel (rounded
and clipped to the field bounds) and returns ``field`` there. ``field`` is a
sensor-FOV-sized array such as the combined illumination × vignette photon
budget; an absent field (``None``) returns ``1.0``, the multiplicative
identity, so callers can treat "no field" and "uniform field" alike. The one
sampler shared by detectability (:func:`_is_detectable`) and the teaching
notebook's per-cell SNR panels, so both read the field the same way.
"""
if field is None:
return 1.0
h, w = field.shape
iy = int(np.clip(round(y_um / pixel_size_um), 0, h - 1))
ix = int(np.clip(round(x_um / pixel_size_um), 0, w - 1))
return float(field[iy, ix])
def detection_snr(peak_dF, baseline, gain, read_e):
"""Transient SNR in detected electrons - the single detectability formula.
``gain`` converts a cell's ΔF units to detected electrons
(``optical_brightness · illumination · photons_per_unit · QE``); ``peak_dF``
and ``baseline`` are the transient height and the (non-negative) resting level
in those same ΔF units. The noise floor is shot noise on the baseline
electrons plus the sensor read noise::
SNR = peak_dF·gain / sqrt(max(baseline,0)·gain + read_e²)
Works on scalars or numpy arrays (so both ``finalize`` and the auto-focus
yield scan share one definition). Where the noise floor is exactly zero the
SNR is ``inf`` if there is any signal, else ``0``.
"""
signal_e = peak_dF * gain
noise_e = np.sqrt(np.maximum(baseline, 0.0) * gain + read_e * read_e)
with np.errstate(divide="ignore", invalid="ignore"):
snr = signal_e / noise_e
return np.where(noise_e > 0, snr, np.where(signal_e > 0, np.inf, 0.0))
def _is_detectable(
cell: Cell,
in_focus: bool,
y_fov_um: float,
x_fov_um: float,
photon_field: np.ndarray | None,
sensor_spec,
acq: Acquisition,
vessel_transmission: float = 1.0,
) -> bool:
"""Whether a cell's realized peak clears the sensor noise floor (and is in focus).
The cell's peak ΔF is dimmed by its optical brightness (depth defocus +
scatter), the illumination/vignette field at its position, and any vessel
transmission there (``vessel_transmission`` in (0, 1], 1.0 = no vessel), scaled
to detected electrons by the exposure and QE, then compared to the shot + read
noise floor riding on its steady baseline (see :func:`detection_snr`). The
vessel term is sampled at the cell's position because detectability is a peak
test; the footprint-weighted occlusion is recorded separately as
``vessel_overlap_fraction``.
``detectable`` requires ``in_focus`` and ``SNR ≥ DETECT_SNR_THRESHOLD``. With
no activity (no trace) a cell emits no transient and is not detectable; with
no ``sensor`` step there is no noise floor to test against, so detectability
falls back to the geometric ``in_focus`` flag.
"""
if cell.trace is None:
return False
if sensor_spec is None:
return in_focus
if not in_focus:
return False
brightness = cell.optical_brightness if cell.optical_brightness is not None else 1.0
illum = sample_field_at(photon_field, y_fov_um, x_fov_um, acq.pixel_size_um)
gain = (
brightness * illum * vessel_transmission
* sensor_spec.photons_per_unit * acq.image_sensor.quantum_efficiency
)
peak_dF = float(cell.trace.max() - cell.trace.min())
baseline = float(cell.trace.min())
snr = detection_snr(peak_dF, baseline, gain, acq.image_sensor.read_noise_e)
return bool(snr >= DETECT_SNR_THRESHOLD)
def _vessel_overlap(footprint: Footprint | None, mask_canvas: np.ndarray | None) -> float:
"""Footprint-weighted fraction of a cell's light absorbed by vessels, in [0, 1).
``mask_canvas`` is the canvas-coordinate transmission mask M in (0, 1]; the
occlusion is ``1 − Σ(A·M)/Σ(A)`` over the cell's (canvas-coordinate) footprint
patch - 0 where no vessel touches the cell, approaching 1 as a vessel covers
the whole footprint. Returns 0.0 when there is no mask or no footprint, so a
vessel-free recording reports zero burden for every cell.
"""
if mask_canvas is None or footprint is None or footprint.is_empty:
return 0.0
y0, x0 = footprint.offset
ph, pw = footprint.patch.shape
sub = mask_canvas[y0 : y0 + ph, x0 : x0 + pw]
total = float(footprint.patch.sum())
if total <= 0.0:
return 0.0
return float(1.0 - (footprint.patch * sub).sum() / total)