#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# Copyright (c) 2022 Univ. Lyon, ENS de Lyon, UCB Lyon 1, CNRS, INRAe, Inria
# All rights reserved.
# This file is part of the TimageTK library, and is released under the "GPLv3"
# license. Please see the LICENSE.md file that should have been included as
# part of this package.
# ------------------------------------------------------------------------------
"""This module implements a class dedicated to time series analysis.
Since a time series can be made of a great number of images, which may be
too big to all fit in memory at once, we access them from disk files instead
of loading everything in memory.
"""
import logging
from os.path import exists
from os.path import join
from os.path import split
from timagetk.components.multi_channel import MultiChannelImage
from timagetk.components.spatial_image import SpatialImage
from timagetk.components.trsf import Trsf
from timagetk.io import imread
from timagetk.io import imsave
from timagetk.io.util import all_in_same_folder
# Filename template for registered images written to disk. The five
# placeholders are filled, in order, with: the (optional) series name prefix,
# the floating index, the reference index, the transformation type and the
# file extension (without the leading dot, which is part of the template).
RES_IMG_FNAME = "{}t{}_on_t{}-{}.{}" # name, t0, t1, trsf_type, extension
# Filename template for transformations written to disk. The four placeholders
# are filled, in order, with: the (optional) series name prefix, the floating
# index, the reference index and the transformation type.
TRSF_FNAME = "{}t{}_on_t{}-{}.trsf" # name, t0, t1, trsf_type
# Default extension of saved images, used when no ``ext`` keyword argument is
# given to the ``TimeSeries`` constructor.
DEF_IMG_EXT = 'mha'
[docs]
class TimeSeries(object):
    """Implementation of a time series as a temporal succession of SpatialImages.

    Images are referenced by their file location and only read from disk when
    needed, instead of being all kept in memory.

    Attributes
    ----------
    name : str
        Name of the time series, used as prefix of the generated file names.
    images : list of str
        File location of the input images, in temporal order.
    time_points : list of int
        Acquisition time of each image.
    nb_time_points : int
        Number of time points.
    time_unit : str
        Unit of the time points.
    trsf : dict
        ``{method: {(t_float, t_ref): Trsf or str}}``, indexed by temporal index.
    trsf_unit : dict
        ``{method: unit}``, initialized to ``None`` for each method.
    reg_images : dict
        ``{method: {(t_float, t_ref): str}}``, registered image file locations.
    lineages_path : dict
        Lineage file locations, empty at construction.
    lineages : dict or list
        Empty dict at construction, or the given ``list_lineages``.

    TODO:
      - use saved 'acquisition_time' to compute time intervals between successive images
    """

    def __init__(self, list_img, time_points, list_lineages=None, time_unit='h', name="", **kwargs):
        """TimeSeries constructor.

        Parameters
        ----------
        list_img : list of str or list of timagetk.components.spatial_image.SpatialImage
            List of images to use
        time_points : list of int
            Acquisition time steps, starting from t0 = 0, *e.g.* [0, 5, 10] for acquisitions every 5h.
        list_lineages : list of str, optional
            If you already have cell lineages, provide their file locations here.
            There should be one less lineage than images.
        time_unit : str, optional
            Indicate the time unit of the time-points, *e.g.* ``'h'`` for hours (default), ``'m'`` for minutes, etc.
        name : str, optional
            Name to give to this TimeSeries

        Other Parameters
        ----------------
        ext : str
            Extension to use to save images, default is ``DEF_IMG_EXT``

        Raises
        ------
        NotImplementedError
            If an element of `list_img` is neither a ``str`` nor an image object.
        IOError
            If some image or lineage files could not be found.
        ValueError
            If the number of time points does not match the number of images,
            or if the number of lineages is not the number of time points minus one.

        Notes
        -----
        If a list of ``SpatialImage`` is given as input, we use the filename and filepath attributes and discard the rest of the object.
        The given `list_img` is left untouched, the file locations are stored in a new list.

        Examples
        --------
        >>> from timagetk.components.time_series import TimeSeries
        >>> from timagetk.io.util import shared_data
        >>> # Using shared data as example, images were taken every 12 hours:
        >>> fname = 'p58-t{}_INT_down_interp_2x.inr.gz'
        >>> list_img = [shared_data(fname.format(t), "p58") for t in range(3)]
        >>> ts = TimeSeries(list_img, [0, 12, 24], name="p58")
        >>> from timagetk.io.image import imread
        >>> list_img = [imread(img) for img in list_img]
        >>> ts = TimeSeries(list_img, [0, 12, 24], name="p58")

        """
        # - Name & time unit of the TimeSeries:
        self.name = name
        self.time_unit = time_unit
        # - Output path, empty until ``set_output_path`` is called:
        self._out_path = ""
        # - Input images (as file locations) and time steps, set after validation:
        self.images = []
        self.time_points = []
        self.nb_time_points = 0

        # - Transformations:
        # -- {(t_start, t_stop): Trsf}, with
        #   * ``t_start`` & ``t_stop`` as temporal index (not time-point!)
        #   * ``Trsf`` the transformation (linear) or its file location (non-linear)
        self.trsf = {'rigid': {},
                     'affine': {},
                     'vectorfield': {}}
        # -- Transformation unit of each method, unknown (``None``) at construction:
        self.trsf_unit = {'rigid': None,
                          'affine': None,
                          'vectorfield': None}
        # -- {(t_start, t_stop): location}, with
        #   * ``t_start`` & ``t_stop`` as temporal index (not time-point!)
        #   * ``location`` the corresponding registered image file location
        self.reg_images = {'rigid': {},
                           'affine': {},
                           'vectorfield': {}}

        # - Lineages:
        # -- {(t_start, t_stop): location}, with
        #   * ``t_start`` & ``t_stop`` as temporal index (not time-point!)
        #   * ``location`` the corresponding lineage file location
        self.lineages_path = {}
        # -- {(t_start, t_stop): Lineage}, with
        #   * ``t_start`` & ``t_stop`` as temporal index (not time-point!)
        #   * ``Lineage`` the corresponding Lineage instance
        self.lineages = {}

        # - List of images, converted to file locations in a NEW list so the
        #   caller's `list_img` is not modified:
        image_paths = []
        all_found = True
        for img in list_img:
            if isinstance(img, str):
                img_path = img
            elif isinstance(img, (SpatialImage, MultiChannelImage)):
                img_path = join(img.filepath, img.filename)
            else:
                # Explicit check (not an ``assert``) so it survives ``python -O``:
                raise NotImplementedError(
                    "Images should be given as 'str', 'SpatialImage' or 'MultiChannelImage', got '{}'.".format(
                        type(img).__name__))
            if not exists(img_path):
                # Keep going to report ALL missing files before raising:
                all_found = False
                logging.error("Can not find image: '{}'.".format(img_path))
            image_paths.append(img_path)
        if not all_found:
            raise IOError("Some images could not be found!")
        self.images = image_paths
        logging.info("List of image files:\n -" + "\n -".join(self.images))

        # - List of time steps:
        if len(image_paths) != len(time_points):
            raise ValueError("Need the same number of images and time points.")
        self.time_points = time_points
        self.nb_time_points = len(self.time_points)

        # - List of lineages:
        if list_lineages is None:
            logging.debug("No lineage given!")
        else:
            if len(list_lineages) != self.nb_time_points - 1:
                raise ValueError(
                    "Need one less lineage than number of images or time points!")
            missing_lineages = [lin for lin in list_lineages if not exists(lin)]
            for lin in missing_lineages:
                logging.error("Can not find lineage: '{}'.".format(lin))
            if missing_lineages:
                raise IOError("Some lineages could not be found!")
            # NOTE(review): this stores the given list (of file locations) in
            # ``lineages``, documented above as a dict of Lineage instances;
            # ``lineages_path`` looks like the intended target - confirm before changing.
            self.lineages = list_lineages

        # - Set some attributes from kwargs:
        self._ext = kwargs.get("ext", DEF_IMG_EXT)  # output image file extension

    def get_trsf(self, method, t_float, t_ref):
        """Return a ``Trsf`` instance for given transformation index and method.

        Parameters
        ----------
        method : str
            Used method, a key of ``self.trsf``
        t_float : int
            Floating image index
        t_ref : int
            Reference image index

        Returns
        -------
        Trsf
            Corresponding transformation object

        Raises
        ------
        KeyError
            If `method` is not a key of ``self.trsf``.

        Notes
        -----
        If nothing is stored for ``(t_float, t_ref)``, ``Trsf`` is called with ``None``.

        """
        # The stored value is either a ``Trsf`` or a transformation file location.
        # NOTE(review): assumes ``Trsf`` accepts a ``Trsf``, a file name or ``None`` - confirm.
        stored = self.trsf[method].get((t_float, t_ref))
        return Trsf(stored)

    def get_registered_image(self, method, t_float, t_ref):
        """Return an image instance for given transformation index and method.

        The floating image is read from disk and the stored transformation is applied to it.
        The result is saved to disk and its location registered in ``self.reg_images``.

        Parameters
        ----------
        method : str
            Used method
        t_float : int
            Floating image index of transformation
        t_ref : int
            Reference image index of transformation

        Returns
        -------
        SpatialImage
            Corresponding image object

        Examples
        --------
        >>> from timagetk.components.time_series import TimeSeries
        >>> from timagetk.io.util import shared_data
        >>> # Using shared data as example, images were taken every 12 hours:
        >>> fname = 'p58-t{}_INT_down_interp_2x.inr.gz'
        >>> list_img = [shared_data(fname.format(t), "p58") for t in range(3)]
        >>> ts = TimeSeries(list_img, [0, 12, 24], name="p58")
        >>> # - Apply the 'rigid' transformation stored for (0, 2) to the first image and return it:
        >>> ts.get_registered_image('rigid', 0, 2)

        """
        trsf = self.get_trsf(method, t_float, t_ref)
        floating_img = imread(self.images[t_float])
        res_img = floating_img.apply_trsf(trsf)
        # Save the transformed image & keep track of its location:
        self._handle_img(res_img, method, t_float, t_ref)
        return res_img

    def set_output_path(self, path=""):
        """Set the output path to use.

        Parameters
        ----------
        path : str
            Path of the directory where to save transformations and images.
            If empty (default), use the directory of the images, if they share the same one.

        Raises
        ------
        ValueError
            If `path` is empty and the images have no common directory (or there are no images),
            or if `path` is not an existing directory.

        """
        # Local import: the module only imports `exists`, `join` & `split` from `os.path`.
        from os.path import isdir

        if path == "":
            # No images means no common directory; this also guards ``self.images[0]``:
            if not self.images or not all_in_same_folder(self.images):
                msg = "Could not find a common root directory for the given images!"
                raise ValueError(msg)
            self._out_path = split(self.images[0])[0]
        elif isdir(path):
            self._out_path = path
        else:
            # An existing regular file is rejected too, files are later joined to this path:
            msg = "Given output path is not valid!"
            raise ValueError(msg)

    def _name_prefix(self):
        """Return the file name prefix, ``"<name>-"`` if the series is named, else an empty string."""
        return (self.name + "-") if self.name != "" else ""

    def _get_trsf_fname(self, t0, t1, trsf_type):
        """Formatter for transformation file names.

        Parameters
        ----------
        t0 : int
            Floating (registered) index
        t1 : int
            Reference index
        trsf_type : str
            Type of transformation used

        Returns
        -------
        str
            Formatted filename for a transformation, joined to the output path.
            Without output path set, this is the bare file name.

        """
        fname = TRSF_FNAME.format(self._name_prefix(), t0, t1, trsf_type)
        return join(self._out_path, fname)

    def _get_img_fname(self, t0, t1, trsf_type):
        """Formatter for registered image file names.

        Parameters
        ----------
        t0 : int
            Floating (registered) index
        t1 : int
            Reference index
        trsf_type : str
            Type of transformation used, *e.g.* 'rigid'

        Returns
        -------
        str
            Formatted filename for an image, joined to the output path.
            Without output path set, this is the bare file name.

        """
        fname = RES_IMG_FNAME.format(self._name_prefix(), t0, t1, trsf_type, self._ext)
        return join(self._out_path, fname)

    def _handle_trsf(self, trsf, method, t_float, t_ref):
        """Generic function to know what to do when you get a ``Trsf`` instance.

        Keep the object if the Trsf is linear, else write a file and save its location in ``self.trsf``.

        Parameters
        ----------
        trsf : Trsf or list of Trsf
            Transformation(s) to handle
        method : str
            Used method
        t_float : int
            Floating image index
        t_ref : int
            Reference image index

        Returns
        -------
        None or list of None
            A list of ``None`` if a list was given, else ``None``.

        Notes
        -----
        When given a list, all its elements are handled with the same
        ``(t_float, t_ref)`` key, so only the last one remains registered.

        """
        if isinstance(trsf, list):
            return [self._handle_trsf(t, method, t_float, t_ref) for t in trsf]

        if trsf.is_linear():
            # - In case of linear transformation, keep the object itself
            self.trsf[method][(t_float, t_ref)] = trsf
        else:
            # - In case of a non-linear transformation, write the file and save its location
            # Imported only here, it is not needed for linear transformations:
            from timagetk.third_party import save_trsf
            fname = self._get_trsf_fname(t_float, t_ref, method)
            save_trsf(trsf, fname)
            self.trsf[method][(t_float, t_ref)] = fname
        return None

    def _handle_img(self, image, method, t_float, t_ref):
        """Generic function to know what to do when you get a transformed image.

        Write a file and save its location in ``self.reg_images``.

        Parameters
        ----------
        image : SpatialImage or list of SpatialImage
            Image(s) to handle
        method : str
            Used method
        t_float : int
            Floating image index
        t_ref : int
            Reference image index

        Returns
        -------
        None or list of None
            A list of ``None`` if a list was given, else ``None``.

        Notes
        -----
        When given a list, all its elements are written to the same file name
        and registered with the same key, so only the last one remains.

        """
        if isinstance(image, list):
            return [self._handle_img(img, method, t_float, t_ref) for img in image]

        fname = self._get_img_fname(t_float, t_ref, method)
        imsave(fname, image)
        self.reg_images[method][(t_float, t_ref)] = fname
        return None