#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# Copyright (c) 2022 Univ. Lyon, ENS de Lyon, UCB Lyon 1, CNRS, INRAe, Inria
# All rights reserved.
# This file is part of the TimageTK library, and is released under the "GPLv3"
# license. Please see the LICENSE.md file that should have been included as
# part of this package.
# ------------------------------------------------------------------------------
import numpy as np
from scipy.ndimage import binary_fill_holes
from timagetk import TissueImage2D
from timagetk import TissueImage3D
from timagetk.algorithms.connexe import connected_components
from timagetk.algorithms.exposure import equalize_adapthist
from timagetk.algorithms.exposure import global_contrast_stretch
from timagetk.algorithms.linearfilter import gaussian_filter
from timagetk.algorithms.morphology import morphology_oc_alternate_sequential_filter
from timagetk.algorithms.regionalext import regional_extrema
from timagetk.algorithms.trsf import apply_trsf
from timagetk.algorithms.watershed import watershed
from timagetk.array_util import guess_intensity_threshold
from timagetk.bin.logger import get_logger
from timagetk.tasks.decorators import singlechannel_wrapper
log = get_logger(__name__)
[docs]
@singlechannel_wrapper
def watershed_preprocessing(image, sigma=None, equalize_hist=True, contrast_stretch=False, **kwargs):
"""Performs the pre-processing step prior to watershed segmentation of an intensity image.
A Gaussian smoothing is applied prior to local minima detection to avoid detecting to many small local minima due to noise.
Parameters
----------
image : timagetk.SpatialImage or timagetk.MultiChannelImage
The intensity image to segment.
sigma : float, optional
Sigma value, in real units, used to smooth the input image, use `0` to skip.
If ``None`` (default), automatically set to ``1.5 * max(image.voxelsize)``.
equalize_hist : bool, optional
If ``True`` (default), performs *adaptive histogram equalization* to image prior to any other step.
contrast_stretch : bool, optional
If ``True`` (default is ``False``), performs *contrast stretching* to image prior to any other step.
Other Parameters
----------------
channel : str
If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm.
process : dict
A dictionary keeping track of the algorithms and their parameters.
Returns
-------
timagetk.SpatialImage
The segmented image
dict
Dictionary summarizing the process with used parameters
Notes
-----
We recommend performing automatic global contrast stretching OR adaptive histogram equalization to improve overall segmentation quality.
It is done prior to the Gaussian smoothing step.
See Also
--------
timagetk.algorithms.exposure.equalize_adapthist
timagetk.algorithms.exposure.global_contrast_stretch
timagetk.algorithms.linearfilter.gaussian_filter
Examples
--------
>>> from timagetk.io import imread
>>> from timagetk.io.util import shared_dataset
>>> from timagetk.tasks.segmentation import watershed_preprocessing
>>> from timagetk.visu.stack import stack_browser
>>> img_path = shared_dataset("p58")[0]
>>> image = imread(img_path)
>>> proc_image, _ = watershed_preprocessing(image, 1.0, equalize_hist=True)
>>> stack_browser(proc_image, cmap='gray')
>>> from timagetk import SpatialImage
>>> from skimage.morphology import white_tophat, ball
>>> noise_img = SpatialImage(white_tophat(image.get_array(), ball(3)), voxelsize=image.voxelsize)
>>> stack_browser(noise_img, cmap='gray')
>>> denoise_img = SpatialImage(image.get_array() - noise_img, voxelsize=image.voxelsize)
>>> stack_browser(denoise_img, cmap='gray')
>>> proc_image, _ = watershed_preprocessing(denoise_img, 1.0, equalize_hist=True)
>>> stack_browser(proc_image, cmap='gray')
>>> from timagetk import MultiChannelImage
>>> im_url = "https://zenodo.org/record/3737795/files/qDII-CLV3-PIN1-PI-E35-LD-SAM4.czi"
>>> ch_names = ['DII-VENUS-N7', 'pPIN1:PIN1-GFP', 'Propidium Iodide', 'pRPS5a:TagBFP-SV40', 'pCLV3:mCherry-N7']
>>> colors = ['yellow', 'green', 'red', 'blue', 'purple']
>>> image = imread(im_url, channel_names=ch_names)
>>> proc_image, _ = watershed_preprocessing(image, 1.0, equalize_hist=True, channel='Propidium Iodide')
"""
process = kwargs.get('process', {})
# - Performs histogram equalization of intensity image if required
if equalize_hist:
log.info("Pre-processing: Adaptive Histogram Equalisation...")
image = equalize_adapthist(image)
process["equalized_hist"] = True
# - Performs contrasts stretching of intensity images if required
if contrast_stretch:
log.info("Pre-processing: Global Contrast Stretching...")
image = global_contrast_stretch(image)
process["contrast_stretch"] = True
# - Closing Opening alternate sequential filter on intensity image if required
oc_asf = kwargs.get("oc_asf", 0)
if oc_asf >= 1:
log.info("Pre-processing: Opening-Closing alternate sequential filter...")
image = morphology_oc_alternate_sequential_filter(image, max_radius=oc_asf)
# Defines the minimal acceptable sigma value
min_sigma_val = max(image.get_voxelsize())
# If no sigma was given, compute one
if sigma is None:
sigma = 1.5 * min_sigma_val
else:
if sigma < min_sigma_val:
log.error(f"Given `sigma` ({sigma}) is inferior to image max voxel-size ({min_sigma_val}!")
sigma = 1.5 * min_sigma_val
log.info(f"Changed `sigma` value to {sigma} (1.5 * max(voxel-size)!")
log.info(f"Gaussian smoothing with sigma={sigma}...")
image = gaussian_filter(image, sigma=sigma, real=True)
process["sigma"] = sigma
return image, process
[docs]
@singlechannel_wrapper
def seeded_watershed(intensity_image, h_min, **kwargs):
"""Performs a seeded watershed of an intensity image.
Parameters
----------
intensity_image : timagetk.SpatialImage or timagetk.MultiChannelImage
The intensity image to segment.
h_min: int
Height-minima value to use for seed detection.
Other Parameters
----------------
channel : str
If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm.
labelchoice : {"first", "min", "most"}
How to deal with "labels conflicts", *i.e.* where several labels meet.
process : dict
A dictionary keeping track of the algorithms and their parameters.
Returns
-------
LabelledImage
The segmented image
LabelledImage
The seeds image
dict
Dictionary summarizing the process with used parameters
Notes
-----
Explanations of available ``labelchoice`` keyword argument:
- **first**: the first label wins (default);
- **min**: the less represented label wins;
- **most**: the most represented label wins;
Examples
--------
>>> from timagetk import TissueImage3D
>>> from timagetk.io import imread
>>> from timagetk.io.util import shared_dataset
>>> from timagetk.tasks.segmentation import seeded_watershed
>>> from timagetk.visu.stack import stack_browser
>>> img_path = shared_dataset("p58")[0]
>>> image = imread(img_path)
>>> # Example 1 - Basic example:
>>> seg_img, _, _ = seeded_watershed(image, 15)
>>> b = stack_browser(TissueImage3D(seg_img, background=1), cmap='glasbey')
>>> # Example 2 - Add a pre-processing step:
>>> from timagetk.tasks.segmentation import watershed_preprocessing
>>> proc_image, _ = watershed_preprocessing(image,0.5,equalize_hist=True)
>>> seg_img, _, _ = seeded_watershed(proc_image, 25)
>>> b = stack_browser(TissueImage3D(seg_img, background=1), cmap='glasbey')
"""
process = kwargs.get('process', {})
# - Local minima detection:
log.info("Local minima detection...")
ext_img = regional_extrema(intensity_image, height=h_min, method='minima')
process["regionalext"] = {'method': 'minima', 'height': h_min}
# - Connexe component labelling of detected local minima:
log.info("Connexe local minima labelling...")
seeds_image = connected_components(ext_img, connectivity=18, low_threshold=1, high_threshold=h_min)
process["connexe"] = {'connectivity': 18, 'low_threshold': 1, 'high_threshold': h_min}
# Print some stuff about seed image
n_seeds = len(np.unique(seeds_image)) - 1 # '0' is in the list!
log.info("Detected {} seeds!".format(n_seeds))
# - Watershed segmentation:
log.info("Watershed segmentation...")
segmented_image = watershed(intensity_image, seeds_image, labelchoice=kwargs.get("labelchoice", None))
return segmented_image, seeds_image, process
[docs]
@singlechannel_wrapper
def seed_detection(intensity_image, h_min, **kwargs):
"""Performs a seed detection an intensity image.
Parameters
----------
intensity_image : timagetk.SpatialImage or timagetk.MultiChannelImage
The intensity image to segment
h_min : int
Height-minima value to use for seed detection
Other Parameters
----------------
channel : str
If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm.
process : dict
A dictionary keeping track of the algorithms and their parameters.
Returns
-------
LabelledImage
The seeds image.
dict
Dictionary summarizing the process with used parameters
"""
process = kwargs.get('process', {})
# - Local minima detection:
log.info("Local minima detection...")
ext_img = regional_extrema(intensity_image, method='minima', height=h_min)
process["regionalext"] = {'method': 'minima', 'height': h_min}
# - Connexe component labelling of detected local minima:
log.info("Connexe local minima labelling...")
seeds_image = connected_components(ext_img, connectivity=18, low_threshold=1, high_threshold=h_min)
process["connexe"] = {'connectivity': 18, 'low_threshold': 1, 'high_threshold': h_min}
# Print some stuff about seed image
n_seeds = len(np.unique(seeds_image)) - 1 # '0' is in the list!
log.info("Detected {} seeds!".format(n_seeds))
return seeds_image, process
[docs]
def watershed_postprocessing(intensity_image, segmented_image, seeds_image, min_size=None, max_size=None, cell_sigma=1,
**kwargs):
"""Performs the post-processing step after watershed segmentation of an intensity image.
Parameters
----------
intensity_image : timagetk.components.spatial_image.SpatialImage
The intensity image to segment
segmented_image : timagetk.LabelledImage
The segmented image from the seeded watershed step
seeds_image: timagetk.LabelledImage
The seeds image from the seeded watershed step
min_size : float, optional
A minimal area (if 2D) or volume (if 3D), in real units, to accept a label, ``None`` by default.
If equal or lower, remove it from the seed image and re-run the watershed.
max_size : float, optional
A maximal area (if 2D) or volume (if 3D), in real units, to accept a label, ``None`` by default.
If equal or superior, remove it from the seed image and re-run the watershed.
cell_sigma : int, optional
Apply a "cell smoothing" with a sigma in voxel units, ``1`` by default.
Set to ``0`` to avoid this step.
Warnings, even small values (like 2) can drastically change the topology!
Other Parameters
----------------
sizes_data : dict
Dictionary of size data to use.
labelchoice : {"first", "min", "most"}
How to deal with "labels conflicts", *i.e.* where several labels meet.
process : dict
A dictionary keeping track of the algorithms and their parameters.
Returns
-------
LabelledImage
The segmented image
LabelledImage
The seeds image
dict
Dictionary summarizing the process with used parameters
Notes
-----
We recommend performing this step to remove too small cells (un-realistic) and smooth the cells interfaces.
Explanations of available ``labelchoice`` keyword argument:
- **first**: the first label wins (default);
- **min**: the less represented label wins;
- **most**: the most represented label wins;
Examples
--------
>>> from timagetk.io import imread
>>> from timagetk.io.util import shared_dataset
>>> from timagetk.tasks.segmentation import watershed_preprocessing
>>> from timagetk.tasks.segmentation import seeded_watershed
>>> from timagetk.tasks.segmentation import watershed_postprocessing
>>> from timagetk.visu.stack import stack_browser
>>> img_path = shared_dataset("p58")[0]
>>> image = imread(img_path)
>>> proc_image, _ = watershed_preprocessing(image,0.5,equalize_hist=True)
>>> seg_img, seeds, _ = seeded_watershed(proc_image, 25)
>>> seg_img, _, _ = watershed_postprocessing(proc_image,seg_img,seeds,min_size=50,cell_sigma=1)
>>> b = stack_browser(seg_img, cmap='glasbey')
"""
not_a_label = kwargs.get('not_a_label', 0)
background = kwargs.get('background', 1)
process = kwargs.get('process', {})
sizes = kwargs.get('sizes_data', {})
if sizes == {} and (min_size is not None or max_size is not None):
# - Select labels based on their volume (if image is 3D):
if segmented_image.is3D():
tissue = TissueImage3D(segmented_image, background=background, not_a_label=not_a_label)
sizes = tissue.cells.volume(real=True)
# - Select labels based on their area (if image is 2D):
elif segmented_image.is2D():
tissue = TissueImage2D(segmented_image, background=background, not_a_label=not_a_label)
sizes = tissue.cells.area(real=True)
if background in sizes:
sizes.pop(background)
if not_a_label in sizes:
sizes.pop(not_a_label)
small_cells = []
if min_size is not None:
small_cells = [cid for cid, vol in sizes.items() if vol <= min_size]
big_cells = []
if max_size is not None:
big_cells = [cid for cid, vol in sizes.items() if vol >= max_size]
if small_cells != []:
process["min_cell_volume"] = min_size
# Remove labels corresponding to small cells from seed image and re-run watershed
log.info(f"A list of {len(small_cells)} small cells (volume<={min_size}µm³) has been detected!")
seeds_image.remove_labels_from_image(small_cells)
if small_cells != []:
log.info("Filtered watershed segmentation...")
segmented_image = watershed(intensity_image, seeds_image, labelchoice=kwargs.get("labelchoice", None))
# - Cell smoothing step:
if cell_sigma >= 1:
process["cell_sigma"] = cell_sigma
segmented_image = apply_trsf(segmented_image, template_img=segmented_image, interpolation='cellbased',
cell_based_sigma=cell_sigma)
if big_cells != []:
from timagetk.components.labelled_image import relabel_from_mapping
process["max_cell_volume"] = max_size
# Remove labels corresponding to big cells from seed image and re-run watershed
log.info(f"A list of {len(big_cells)} big cells (volume>={max_size}µm³) has been detected!")
segmented_image = relabel_from_mapping(segmented_image, {cid: not_a_label for cid in big_cells},
clear_unmapped=False)
label_co_radius = kwargs.get('label_co_asf', 0)
if label_co_radius >= 1:
from timagetk.algorithms.morphology import label_filtering_coc_alternate_sequential_filter
segmented_image = label_filtering_coc_alternate_sequential_filter(segmented_image, max_radius=label_co_radius)
return segmented_image, seeds_image, process
[docs]
@singlechannel_wrapper
def watershed_segmentation(intensity_image, h_min, sigma=None, equalize_hist=True, contrast_stretch=False,
min_size=50, max_size=None, cell_sigma=1):
"""Performs watershed segmentation of an intensity image after detection of connexe local minima.
A Gaussian smoothing is applied prior to local minima detection to avoid detecting to many small local minima due to noise.
The (stretched/equalized) smoothed intensity image is used for seed detection (connexe local minima) AND watershed.
Parameters
----------
intensity_image : timagetk.SpatialImage or timagetk.MultiChannelImage
The intensity image to segment.
h_min : int
Height-minima value to use for seed detection
sigma : float, optional
Sigma value used with Gaussian smoothing
equalize_hist : bool, optional
If ``True`` (default), performs *adaptive histogram equalization* to image prior to any other step.
contrast_stretch : bool, optional
If ``True`` (default is ``False``), performs *contrast stretching* to image prior to any other step.
min_size : float, optional
A minimal volume or area, in real units, to accept a label, ``50`` µm³ by default.
If equal or lower, remove it from the seed image and re-run the watershed.
max_size : float, optional
A maximal volume or area, in real units, to accept a label, ``None`` by default.
If equal or superior, remove it from the seed image and re-run the watershed.
cell_sigma : int, optional
Apply a "cell smoothing" with a sigma in voxel units, ``1`` by default.
Set to ``0`` to avoid this step.
Warnings, even small values (like 2) can drastically change the topology!
Other Parameters
----------------
channel : str
If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm.
Returns
-------
LabelledImage
The segmented image.
LabelledImage
The seeds image.
dict
Dictionary summarizing the process with used parameters.
Notes
-----
We recommend performing automatic global contrast stretching OR adaptive histogram equalization to improve overall segmentation quality.
It is done prior to the Gaussian smoothing step.
See Also
--------
timagetk.algorithms.exposure.equalize_adapthist
timagetk.algorithms.exposure.global_contrast_stretch
timagetk.algorithms.linearfilter.gaussian_filter
Examples
--------
>>> from timagetk.io import imread
>>> from timagetk.io.util import shared_dataset
>>> from timagetk.tasks.segmentation import watershed_segmentation
>>> from timagetk.visu.stack import stack_browser
>>> img_path = shared_dataset("p58")[0]
>>> image = imread(img_path)
>>> # Example 1 - Basic example:
>>> seg_img, seeds, _ = watershed_segmentation(image, 5)
>>> from timagetk.visu.util import greedy_colormap
>>> b = stack_browser(seg_img, cmap=greedy_colormap(seg_img))
>>> # Example 2 - Effect of cell sigma smoothing:
>>> from timagetk.algorithms.reconstruction import project_segmentation
>>> from timagetk.visu.mplt import grayscale_imshow
>>> seg_img_0, _, _ = watershed_segmentation(image, 30, cell_sigma=0)
>>> seg_img_1, _, _ = watershed_segmentation(image, 30, cell_sigma=1)
>>> seg_img_2, _, _ = watershed_segmentation(image, 30, cell_sigma=2)
>>> seg_img_3, _, _ = watershed_segmentation(image, 30, cell_sigma=3)
>>> z_proj = [project_segmentation(seg_img_0)]
>>> z_proj += [project_segmentation(seg_img_1)]
>>> z_proj += [project_segmentation(seg_img_2)]
>>> z_proj += [project_segmentation(seg_img_3)]
>>> grayscale_imshow(z_proj, title=["cell_sigma=0", "cell_sigma=1", "cell_sigma=2", "cell_sigma=3"], cmap='glasbey', val_range='auto')
>>> # Example 3 - Show volumes distribution (in real units)
>>> from timagetk import TissueImage3D
>>> seg_img, _, _ = watershed_segmentation(image, 15, min_vol=0)
>>> tissue = TissueImage3D(seg_img, background=1, not_a_label=0)
>>> volumes = tissue.cells.volume(real=True)
>>> volumes.pop(1)
>>> import matplotlib.pyplot as plt
>>> fig = plt.figure()
>>> plt.hist(volumes.values(), range=(0, 500), bins=100) # crop the range for better output
>>> plt.xlabel("Volumes (µm³)")
>>> plt.ylabel("Frequency")
>>> plt.show()
"""
intensity_image, process = watershed_preprocessing(intensity_image, sigma, equalize_hist, contrast_stretch)
segmented_image, seeds_image, process = seeded_watershed(intensity_image, h_min, process=process)
segmented_image, seeds_image, process = watershed_postprocessing(intensity_image, segmented_image, seeds_image,
min_size=min_size, max_size=max_size,
cell_sigma=cell_sigma, process=process)
return segmented_image, seeds_image, process
[docs]
@singlechannel_wrapper
def background_detection(image, threshold=None, sigma=None, equalize_hist=True, contrast_stretch=False, **kwargs):
"""Detect the background position, *i.e.* not part of the object.
Parameters
----------
image : timagetk.SpatialImage or timagetk.MultiChannelImage
The intensity image to segment.
threshold : int, optional
The threshold to use for bacground detection, if not defined use the ``li_threshold`` method to estimate it.
sigma : float, optional
Sigma value used with Gaussian smoothing
equalize_hist : bool, optional
If ``True`` (default), performs *adaptive histogram equalization* to image prior to any other step.
contrast_stretch : bool, optional
If ``True`` (default is ``False``), performs *contrast stretching* to image prior to any other step.
sigma : float, optional
Sigma value, in real units, used to smooth the input image, use `0` to skip.
If ``None`` (default), automatically set to ``1.5 * max(image.voxelsize)``.
Other Parameters
----------------
channel : str
If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm.
Returns
-------
timagetk.components.spatial_image.SpatialImage
Mask image (binary) where ``1`` indicate background position.
See Also
--------
timagetk.array_util.guess_intensity_threshold
scipy.ndimage.binary_fill_holes
Notes
-----
This strategy assumes the background is where "there is no signal".
We fill holes in the detected structure, slice by slice, moving along the z-axis.
Example
-------
>>> from timagetk.io import imread
>>> from timagetk.io.util import shared_dataset
>>> from timagetk.tasks.segmentation import background_detection
>>> img_path = shared_dataset("p58")[0]
>>> image = imread(img_path)
>>> bkgd_img, _ = background_detection(image)
>>> from timagetk.visu.stack import stack_browser
>>> b = stack_browser(bkgd_img, val_range=[0,1])
"""
from timagetk import SpatialImage
from timagetk.components.image import get_image_attributes
attr = get_image_attributes(image, exclude=["dtype"])
image, process = watershed_preprocessing(image, sigma, equalize_hist, contrast_stretch)
if threshold is None:
# Defines the threshold value if none was given
threshold = guess_intensity_threshold(image)
process["threshold"] = threshold
th_image = connected_components(image, low_threshold=threshold)
th_arr = th_image.get_array() != 0
th_arr = binary_fill_holes(th_arr)
# Fill holes by z-slices:
n_sl = th_arr.shape[0]
for sl in range(n_sl):
th_arr[sl, :, :] = binary_fill_holes(th_arr[sl, :, :])
# Invert image to get background "mask":
th_arr = th_arr == 0
return SpatialImage(th_arr.astype('uint8'), **attr), process