Source code for timagetk.components.time_series

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#  Copyright (c) 2022 Univ. Lyon, ENS de Lyon, UCB Lyon 1, CNRS, INRAe, Inria
#  All rights reserved.
#  This file is part of the TimageTK library, and is released under the "GPLv3"
#  license. Please see the LICENSE.md file that should have been included as
#  part of this package.
# ------------------------------------------------------------------------------

"""This module implement a class dedicated to time series analysis.

Since they can be made of a great number of images, and that these can be
too big to save them all in memory we use disk file access instead of loading
everything in memory.
"""

import logging
from os.path import exists
from os.path import join
from os.path import split

from timagetk.components.multi_channel import MultiChannelImage
from timagetk.components.spatial_image import SpatialImage
from timagetk.components.trsf import Trsf
from timagetk.io import imread
from timagetk.io import imsave
from timagetk.io.util import all_in_same_folder

# Filename template for registered (transformed) images.
# Positional fields, in order: name prefix, floating index (t0), reference
# index (t1), transformation type, file extension.
RES_IMG_FNAME = "{}t{}_on_t{}-{}.{}"  # name, t0, t1, trsf_type, extension
# Filename template for transformation files (always saved as '.trsf').
# Positional fields, in order: name prefix, floating index (t0), reference
# index (t1), transformation type.
TRSF_FNAME = "{}t{}_on_t{}-{}.trsf"  # name, t0, t1, trsf_type

# Default extension (without leading dot) used when saving registered images;
# can be overridden with the ``ext`` keyword argument of ``TimeSeries``.
DEF_IMG_EXT = 'mha'


class TimeSeries(object):
    """Implementation of a time series as a temporal succession of SpatialImages.

    Images are referenced by their file location and read from disk on demand
    instead of being all kept in memory.

    Attributes
    ----------
    name : str
        Name of the time series, used as prefix of the generated file names.
    images : list of str
        File location of the input images, one per time point.
    time_points : list of int
        Acquisition time steps, as given to the constructor.
    nb_time_points : int
        Number of time points.
    time_unit : str
        Unit of the time points.
    trsf : dict
        ``{method: {(t_float, t_ref): Trsf or str}}``, with ``t_float`` and
        ``t_ref`` as temporal indexes (not time-points!). The value is the
        ``Trsf`` instance for linear transformations, or the location of the
        transformation file otherwise (see ``_handle_trsf``).
    trsf_unit : dict
        ``{method: trsf_unit}``, initialised to ``None`` for every method.
    reg_images : dict
        ``{method: {(t_float, t_ref): str}}``, location of the registered
        image files (see ``_handle_img``).
    lineages_path : dict
        ``{(t_start, t_stop): location}`` of the lineage files.
    lineages : dict or list
        ``{(t_start, t_stop): Lineage}``; replaced by ``list_lineages`` when
        this is given to the constructor.

    TODO:
      - use saved 'acquisition_time' to compute time intervals between successive images
    """

    def __init__(self, list_img, time_points, list_lineages=None,
                 time_unit='h', name="", **kwargs):
        """TimeSeries constructor.

        Parameters
        ----------
        list_img : list of str or list of timagetk.components.spatial_image.SpatialImage
            List of images to use. The given list is not modified.
        time_points : list of int
            Acquisition time steps, starting from t0 = 0, *e.g.* [0, 5, 10] for
            acquisitions every 5h.
        list_lineages : list, optional
            If you already have cell lineages, provide them here. Each entry is
            checked with ``os.path.exists``, so file locations are expected.
        time_unit : str, optional
            Indicate the time unit of the time-points, *e.g.* ``'h'`` for hours
            (default), ``'m'`` for minutes, etc.
        name : str, optional
            Name to give to this TimeSeries

        Other Parameters
        ----------------
        ext : str
            Extension to use to save images, default is ``DEF_IMG_EXT``

        Raises
        ------
        NotImplementedError
            If an element of `list_img` is neither a ``str`` nor a
            ``SpatialImage`` / ``MultiChannelImage``.
        IOError
            If some image or lineage files can not be found.
        ValueError
            If the number of time points differs from the number of images, or
            if the number of lineages is not the number of time points minus one.

        Notes
        -----
        If a list of ``SpatialImage`` is given as input, we use the filename and
        filepath attributes and discard the rest of the object.

        Examples
        --------
        >>> from timagetk.components.time_series import TimeSeries
        >>> from timagetk.io.util import shared_data
        >>> # Using shared data as example, images where taken every 12 hours:
        >>> fname = 'p58-t{}_INT_down_interp_2x.inr.gz'
        >>> list_img = [shared_data(fname.format(t), "p58") for t in range(3)]
        >>> ts = TimeSeries(list_img, [0, 12, 24], name="p58")
        >>> from timagetk.io.image import imread
        >>> list_img = [imread(img) for img in list_img]
        >>> ts = TimeSeries(list_img, [0, 12, 24], name="p58")

        """
        # -- Name of the TimeSeries:
        self.name = name
        # -- Output path, an empty string until ``set_output_path`` is called:
        self._out_path = ""
        # -- List of input images as str:
        self.images = []
        # -- List of input time steps:
        self.time_points = []
        self.nb_time_points = 0
        self.time_unit = time_unit
        # -- Transformations:
        # --- {(t_start, t_stop): Trsf}, with
        #   * ``t_start`` & ``t_stop`` as temporal index (not time-point!)
        #   * ``Trsf`` the transformation or its file location
        self.trsf = {'rigid': {}, 'affine': {}, 'vectorfield': {}}
        # --- Transformation unit, per method:
        self.trsf_unit = {'rigid': None, 'affine': None, 'vectorfield': None}
        # --- {(t_start, t_stop): location}, with
        #   * ``t_start`` & ``t_stop`` as temporal index (not time-point!)
        #   * ``location`` the corresponding registered image file location
        self.reg_images = {'rigid': {}, 'affine': {}, 'vectorfield': {}}
        # -- Lineages:
        # --- {(t_start, t_stop): location}, with
        #   * ``t_start`` & ``t_stop`` as temporal index (not time-point!)
        #   * ``location`` the corresponding lineage file location
        self.lineages_path = {}
        # --- {(t_start, t_stop): Lineage}, with
        #   * ``t_start`` & ``t_stop`` as temporal index (not time-point!)
        #   * ``Lineage`` the corresponding Lineage instance
        self.lineages = {}

        # -- List of images:
        # Collect the paths in a new list so the caller's list is left untouched.
        image_paths = []
        all_found = True
        for img in list_img:
            if isinstance(img, str):
                path = img
            elif isinstance(img, (SpatialImage, MultiChannelImage)):
                path = join(img.filepath, img.filename)
            else:
                # Explicit check (not an ``assert``) so it survives ``python -O``.
                raise NotImplementedError(
                    "Expected a file path or an image object, got '{}'.".format(
                        type(img).__name__))
            if not exists(path):
                # Keep going to report every missing file before failing.
                all_found = False
                logging.error("Can not find image: '{}'.".format(path))
            image_paths.append(path)
        if not all_found:
            raise IOError("Some images could not be found!")
        self.images = image_paths
        logging.info("List of image files:\n -" + "\n -".join(self.images))

        # -- List of time steps:
        if len(image_paths) != len(time_points):
            raise ValueError("Need the same number of images and time points.")
        self.time_points = time_points
        self.nb_time_points = len(self.time_points)

        # -- List of lineages:
        if list_lineages is None:
            logging.debug("No lineage given!")
        else:
            if len(list_lineages) != self.nb_time_points - 1:
                raise ValueError(
                    "Need one less lineage than number of images or time points!")
            missing = [f for f in list_lineages if not exists(f)]
            for f in missing:
                logging.error("Can not find lineage: '{}'.".format(f))
            if missing:
                raise IOError("Some lineages could not be found!")
            # NOTE(review): this replaces the ``lineages`` dict by a list of
            # what looks like file locations; ``lineages_path`` may have been
            # the intended target -- confirm before changing.
            self.lineages = list_lineages

        # - Set some attributes from kwargs:
        self._ext = kwargs.get("ext", DEF_IMG_EXT)  # output image file extension

    def get_trsf(self, method, t_float, t_ref):
        """Return a ``Trsf`` instance for given transformation index and method.

        The value stored in ``self.trsf[method]`` for ``(t_float, t_ref)`` is
        passed to the ``Trsf`` constructor; ``None`` is passed when nothing is
        stored for this pair of indexes.

        Parameters
        ----------
        method : str
            Used method, a key of ``self.trsf``
        t_float : int
            Floating image index
        t_ref : int
            Reference image index

        Returns
        -------
        Trsf
            Corresponding transformation object

        Raises
        ------
        KeyError
            If `method` is not a key of ``self.trsf``.

        """
        # Load the corresponding transformation (object or file location):
        stored = self.trsf[method].get((t_float, t_ref))
        return Trsf(stored)

    def get_registered_image(self, method, t_float, t_ref):
        """Return the floating image transformed with the stored transformation.

        No registration is computed here: the transformation is obtained from
        ``get_trsf``, applied to the floating image read from disk, and the
        resulting image is saved and referenced in ``self.reg_images``.

        Parameters
        ----------
        method : str
            Used method
        t_float : int
            Floating image index of transformation
        t_ref : int
            Reference image index of transformation

        Returns
        -------
        SpatialImage
            Corresponding image object

        Examples
        --------
        >>> from timagetk.components.time_series import TimeSeries
        >>> from timagetk.io.util import shared_data
        >>> # Using shared data as example, images where taken every 12 hours:
        >>> fname = 'p58-t{}_INT_down_interp_2x.inr.gz'
        >>> list_img = [shared_data(fname.format(t), "p58") for t in range(3)]
        >>> ts = TimeSeries(list_img, [0, 12, 24], name="p58")
        >>> # - Apply the 'rigid' transformation of the first image on the last one:
        >>> ts.get_registered_image('rigid', 0, 2)

        """
        trsf = self.get_trsf(method, t_float, t_ref)
        floating_img = imread(self.images[t_float])
        res_img = floating_img.apply_trsf(trsf)
        # Write the result to disk and remember its location:
        self._handle_img(res_img, method, t_float, t_ref)
        return res_img

    def set_output_path(self, path=""):
        """Set the output path to use.

        Parameters
        ----------
        path : str
            Path where to save transformations and images. If empty (default),
            use the folder of the input images when they all share the same one.

        Raises
        ------
        ValueError
            If `path` is empty and the images are not all in the same folder,
            or if `path` is given but does not exist.

        """
        if path == "":
            if not all_in_same_folder(self.images):
                msg = "Could not find a common root directory for the given images!"
                raise ValueError(msg)
            self._out_path = split(self.images[0])[0]
        elif exists(path):
            self._out_path = path
        else:
            msg = "Given output path is not valid!"
            raise ValueError(msg)

    def _name_prefix(self):
        """Return the series name followed by a dash, or '' if it has no name."""
        return self.name + "-" if self.name != "" else ""

    def _get_trsf_fname(self, t0, t1, trsf_type):
        """Formatter for transformation file names.

        Parameters
        ----------
        t0 : int
            Floating (registered) time point
        t1 : int
            Reference time point
        trsf_type : str
            Type of transformation used

        Returns
        -------
        str
            Formatted filename for a transformation, joined to the output path

        """
        fname = TRSF_FNAME.format(self._name_prefix(), t0, t1, trsf_type)
        return join(self._out_path, fname)

    def _get_img_fname(self, t0, t1, trsf_type):
        """Formatter for registered image file names.

        Parameters
        ----------
        t0 : int
            Floating (registered) time point
        t1 : int
            Reference time point
        trsf_type : str
            Type of transformation used, *e.g.* 'rigid'

        Returns
        -------
        str
            Formatted filename for an image, joined to the output path

        """
        fname = RES_IMG_FNAME.format(self._name_prefix(), t0, t1, trsf_type,
                                     self._ext)
        return join(self._out_path, fname)

    def _handle_trsf(self, trsf, method, t_float, t_ref):
        """Generic function to know what to do when you get a ``Trsf`` instance.

        Keep the object if the Trsf is linear, else write a file and save its
        location in ``self.trsf``.

        Parameters
        ----------
        trsf : Trsf or list of Trsf
            Transformation(s) to handle. With a list, every item is handled
            with the same `method`, `t_float` and `t_ref`.
        method : str
            Used method
        t_float : int
            Floating image index
        t_ref : int
            Reference image index

        Returns
        -------
        None or list of None
            A list (one ``None`` per item) when `trsf` is a list.

        """
        if isinstance(trsf, list):
            return [self._handle_trsf(t, method, t_float, t_ref) for t in trsf]

        from timagetk.third_party import save_trsf
        key = (t_float, t_ref)
        if trsf.is_linear():
            # - In case of linear transformation, keep the object itself
            self.trsf[method][key] = trsf
        else:
            # - In case of a non-linear transformation, write the file and save its location
            fname = self._get_trsf_fname(t_float, t_ref, method)
            save_trsf(trsf, fname)
            self.trsf[method][key] = fname
        return None

    def _handle_img(self, image, method, t_float, t_ref):
        """Generic function to know what to do when you get a transformed image.

        Write a file and save its location in ``self.reg_images``.

        Parameters
        ----------
        image : SpatialImage or list of SpatialImage
            Image(s) to handle. With a list, every item is handled with the
            same `method`, `t_float` and `t_ref`.
        method : str
            Used method
        t_float : int
            Floating image index
        t_ref : int
            Reference image index

        Returns
        -------
        None or list of None
            A list (one ``None`` per item) when `image` is a list.

        """
        if isinstance(image, list):
            return [self._handle_img(im, method, t_float, t_ref) for im in image]

        fname = self._get_img_fname(t_float, t_ref, method)
        imsave(fname, image)
        self.reg_images[method][(t_float, t_ref)] = fname
        return None