Source code for timagetk.tasks.segmentation

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#  Copyright (c) 2022 Univ. Lyon, ENS de Lyon, UCB Lyon 1, CNRS, INRAe, Inria
#  All rights reserved.
#  This file is part of the TimageTK library, and is released under the "GPLv3"
#  license. Please see the LICENSE.md file that should have been included as
#  part of this package.
# ------------------------------------------------------------------------------

import numpy as np
from scipy.ndimage import binary_fill_holes

from timagetk import TissueImage2D
from timagetk import TissueImage3D
from timagetk.algorithms.connexe import connected_components
from timagetk.algorithms.exposure import equalize_adapthist
from timagetk.algorithms.exposure import global_contrast_stretch
from timagetk.algorithms.linearfilter import gaussian_filter
from timagetk.algorithms.morphology import morphology_oc_alternate_sequential_filter
from timagetk.algorithms.regionalext import regional_extrema
from timagetk.algorithms.trsf import apply_trsf
from timagetk.algorithms.watershed import watershed
from timagetk.array_util import guess_intensity_threshold
from timagetk.bin.logger import get_logger
from timagetk.tasks.decorators import singlechannel_wrapper

log = get_logger(__name__)


[docs] @singlechannel_wrapper def watershed_preprocessing(image, sigma=None, equalize_hist=True, contrast_stretch=False, **kwargs): """Performs the pre-processing step prior to watershed segmentation of an intensity image. A Gaussian smoothing is applied prior to local minima detection to avoid detecting to many small local minima due to noise. Parameters ---------- image : timagetk.SpatialImage or timagetk.MultiChannelImage The intensity image to segment. sigma : float, optional Sigma value, in real units, used to smooth the input image, use `0` to skip. If ``None`` (default), automatically set to ``1.5 * max(image.voxelsize)``. equalize_hist : bool, optional If ``True`` (default), performs *adaptive histogram equalization* to image prior to any other step. contrast_stretch : bool, optional If ``True`` (default is ``False``), performs *contrast stretching* to image prior to any other step. Other Parameters ---------------- channel : str If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm. process : dict A dictionary keeping track of the algorithms and their parameters. Returns ------- timagetk.SpatialImage The segmented image dict Dictionary summarizing the process with used parameters Notes ----- We recommend performing automatic global contrast stretching OR adaptive histogram equalization to improve overall segmentation quality. It is done prior to the Gaussian smoothing step. See Also -------- timagetk.algorithms.exposure.equalize_adapthist timagetk.algorithms.exposure.global_contrast_stretch timagetk.algorithms.linearfilter.gaussian_filter Examples -------- >>> from timagetk.io import imread >>> from timagetk.io.util import shared_dataset >>> from timagetk.tasks.segmentation import watershed_preprocessing >>> from timagetk.visu.stack import stack_browser >>> img_path = shared_dataset("p58")[0] >>> image = imread(img_path) >>> proc_image, _ = watershed_preprocessing(image, 1.0, equalize_hist=True) >>> stack_browser(proc_image, cmap='gray') >>> from timagetk import SpatialImage >>> from skimage.morphology import white_tophat, ball >>> noise_img = SpatialImage(white_tophat(image.get_array(), ball(3)), voxelsize=image.voxelsize) >>> stack_browser(noise_img, cmap='gray') >>> denoise_img = SpatialImage(image.get_array() - noise_img, voxelsize=image.voxelsize) >>> stack_browser(denoise_img, cmap='gray') >>> proc_image, _ = watershed_preprocessing(denoise_img, 1.0, equalize_hist=True) >>> stack_browser(proc_image, cmap='gray') >>> from timagetk import MultiChannelImage >>> im_url = "https://zenodo.org/record/3737795/files/qDII-CLV3-PIN1-PI-E35-LD-SAM4.czi" >>> ch_names = ['DII-VENUS-N7', 'pPIN1:PIN1-GFP', 'Propidium Iodide', 'pRPS5a:TagBFP-SV40', 'pCLV3:mCherry-N7'] >>> colors = ['yellow', 'green', 'red', 'blue', 'purple'] >>> image = imread(im_url, channel_names=ch_names) >>> proc_image, _ = watershed_preprocessing(image, 1.0, equalize_hist=True, channel='Propidium Iodide') """ process = kwargs.get('process', {}) # - Performs histogram equalization of intensity image if required if equalize_hist: log.info("Pre-processing: Adaptive Histogram Equalisation...") image = equalize_adapthist(image) process["equalized_hist"] = True # - Performs contrasts stretching of intensity images if required if contrast_stretch: log.info("Pre-processing: Global Contrast Stretching...") image = global_contrast_stretch(image) process["contrast_stretch"] = True # - Closing Opening alternate sequential filter on intensity image if required oc_asf = kwargs.get("oc_asf", 0) if oc_asf >= 1: log.info("Pre-processing: Opening-Closing alternate sequential filter...") image = morphology_oc_alternate_sequential_filter(image, max_radius=oc_asf) # Defines the minimal acceptable sigma value min_sigma_val = max(image.get_voxelsize()) # If no sigma was given, compute one if sigma is None: sigma = 1.5 * min_sigma_val else: if sigma < min_sigma_val: log.error(f"Given `sigma` ({sigma}) is inferior to image max voxel-size ({min_sigma_val}!") sigma = 1.5 * min_sigma_val log.info(f"Changed `sigma` value to {sigma} (1.5 * max(voxel-size)!") log.info(f"Gaussian smoothing with sigma={sigma}...") image = gaussian_filter(image, sigma=sigma, real=True) process["sigma"] = sigma return image, process
[docs] @singlechannel_wrapper def seeded_watershed(intensity_image, h_min, **kwargs): """Performs a seeded watershed of an intensity image. Parameters ---------- intensity_image : timagetk.SpatialImage or timagetk.MultiChannelImage The intensity image to segment. h_min: int Height-minima value to use for seed detection. Other Parameters ---------------- channel : str If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm. labelchoice : {"first", "min", "most"} How to deal with "labels conflicts", *i.e.* where several labels meet. process : dict A dictionary keeping track of the algorithms and their parameters. Returns ------- LabelledImage The segmented image LabelledImage The seeds image dict Dictionary summarizing the process with used parameters Notes ----- Explanations of available ``labelchoice`` keyword argument: - **first**: the first label wins (default); - **min**: the less represented label wins; - **most**: the most represented label wins; Examples -------- >>> from timagetk import TissueImage3D >>> from timagetk.io import imread >>> from timagetk.io.util import shared_dataset >>> from timagetk.tasks.segmentation import seeded_watershed >>> from timagetk.visu.stack import stack_browser >>> img_path = shared_dataset("p58")[0] >>> image = imread(img_path) >>> # Example 1 - Basic example: >>> seg_img, _, _ = seeded_watershed(image, 15) >>> b = stack_browser(TissueImage3D(seg_img, background=1), cmap='glasbey') >>> # Example 2 - Add a pre-processing step: >>> from timagetk.tasks.segmentation import watershed_preprocessing >>> proc_image, _ = watershed_preprocessing(image,0.5,equalize_hist=True) >>> seg_img, _, _ = seeded_watershed(proc_image, 25) >>> b = stack_browser(TissueImage3D(seg_img, background=1), cmap='glasbey') """ process = kwargs.get('process', {}) # - Local minima detection: log.info("Local minima detection...") ext_img = regional_extrema(intensity_image, height=h_min, method='minima') process["regionalext"] = {'method': 'minima', 'height': h_min} # - Connexe component labelling of detected local minima: log.info("Connexe local minima labelling...") seeds_image = connected_components(ext_img, connectivity=18, low_threshold=1, high_threshold=h_min) process["connexe"] = {'connectivity': 18, 'low_threshold': 1, 'high_threshold': h_min} # Print some stuff about seed image n_seeds = len(np.unique(seeds_image)) - 1 # '0' is in the list! log.info("Detected {} seeds!".format(n_seeds)) # - Watershed segmentation: log.info("Watershed segmentation...") segmented_image = watershed(intensity_image, seeds_image, labelchoice=kwargs.get("labelchoice", None)) return segmented_image, seeds_image, process
[docs] @singlechannel_wrapper def seed_detection(intensity_image, h_min, **kwargs): """Performs a seed detection an intensity image. Parameters ---------- intensity_image : timagetk.SpatialImage or timagetk.MultiChannelImage The intensity image to segment h_min : int Height-minima value to use for seed detection Other Parameters ---------------- channel : str If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm. process : dict A dictionary keeping track of the algorithms and their parameters. Returns ------- LabelledImage The seeds image. dict Dictionary summarizing the process with used parameters """ process = kwargs.get('process', {}) # - Local minima detection: log.info("Local minima detection...") ext_img = regional_extrema(intensity_image, method='minima', height=h_min) process["regionalext"] = {'method': 'minima', 'height': h_min} # - Connexe component labelling of detected local minima: log.info("Connexe local minima labelling...") seeds_image = connected_components(ext_img, connectivity=18, low_threshold=1, high_threshold=h_min) process["connexe"] = {'connectivity': 18, 'low_threshold': 1, 'high_threshold': h_min} # Print some stuff about seed image n_seeds = len(np.unique(seeds_image)) - 1 # '0' is in the list! log.info("Detected {} seeds!".format(n_seeds)) return seeds_image, process
[docs] def watershed_postprocessing(intensity_image, segmented_image, seeds_image, min_size=None, max_size=None, cell_sigma=1, **kwargs): """Performs the post-processing step after watershed segmentation of an intensity image. Parameters ---------- intensity_image : timagetk.components.spatial_image.SpatialImage The intensity image to segment segmented_image : timagetk.LabelledImage The segmented image from the seeded watershed step seeds_image: timagetk.LabelledImage The seeds image from the seeded watershed step min_size : float, optional A minimal area (if 2D) or volume (if 3D), in real units, to accept a label, ``None`` by default. If equal or lower, remove it from the seed image and re-run the watershed. max_size : float, optional A maximal area (if 2D) or volume (if 3D), in real units, to accept a label, ``None`` by default. If equal or superior, remove it from the seed image and re-run the watershed. cell_sigma : int, optional Apply a "cell smoothing" with a sigma in voxel units, ``1`` by default. Set to ``0`` to avoid this step. Warnings, even small values (like 2) can drastically change the topology! Other Parameters ---------------- sizes_data : dict Dictionary of size data to use. labelchoice : {"first", "min", "most"} How to deal with "labels conflicts", *i.e.* where several labels meet. process : dict A dictionary keeping track of the algorithms and their parameters. Returns ------- LabelledImage The segmented image LabelledImage The seeds image dict Dictionary summarizing the process with used parameters Notes ----- We recommend performing this step to remove too small cells (un-realistic) and smooth the cells interfaces. Explanations of available ``labelchoice`` keyword argument: - **first**: the first label wins (default); - **min**: the less represented label wins; - **most**: the most represented label wins; Examples -------- >>> from timagetk.io import imread >>> from timagetk.io.util import shared_dataset >>> from timagetk.tasks.segmentation import watershed_preprocessing >>> from timagetk.tasks.segmentation import seeded_watershed >>> from timagetk.tasks.segmentation import watershed_postprocessing >>> from timagetk.visu.stack import stack_browser >>> img_path = shared_dataset("p58")[0] >>> image = imread(img_path) >>> proc_image, _ = watershed_preprocessing(image,0.5,equalize_hist=True) >>> seg_img, seeds, _ = seeded_watershed(proc_image, 25) >>> seg_img, _, _ = watershed_postprocessing(proc_image,seg_img,seeds,min_size=50,cell_sigma=1) >>> b = stack_browser(seg_img, cmap='glasbey') """ not_a_label = kwargs.get('not_a_label', 0) background = kwargs.get('background', 1) process = kwargs.get('process', {}) sizes = kwargs.get('sizes_data', {}) if sizes == {} and (min_size is not None or max_size is not None): # - Select labels based on their volume (if image is 3D): if segmented_image.is3D(): tissue = TissueImage3D(segmented_image, background=background, not_a_label=not_a_label) sizes = tissue.cells.volume(real=True) # - Select labels based on their area (if image is 2D): elif segmented_image.is2D(): tissue = TissueImage2D(segmented_image, background=background, not_a_label=not_a_label) sizes = tissue.cells.area(real=True) if background in sizes: sizes.pop(background) if not_a_label in sizes: sizes.pop(not_a_label) small_cells = [] if min_size is not None: small_cells = [cid for cid, vol in sizes.items() if vol <= min_size] big_cells = [] if max_size is not None: big_cells = [cid for cid, vol in sizes.items() if vol >= max_size] if small_cells != []: process["min_cell_volume"] = min_size # Remove labels corresponding to small cells from seed image and re-run watershed log.info(f"A list of {len(small_cells)} small cells (volume<={min_size}µm³) has been detected!") seeds_image.remove_labels_from_image(small_cells) if small_cells != []: log.info("Filtered watershed segmentation...") segmented_image = watershed(intensity_image, seeds_image, labelchoice=kwargs.get("labelchoice", None)) # - Cell smoothing step: if cell_sigma >= 1: process["cell_sigma"] = cell_sigma segmented_image = apply_trsf(segmented_image, template_img=segmented_image, interpolation='cellbased', cell_based_sigma=cell_sigma) if big_cells != []: from timagetk.components.labelled_image import relabel_from_mapping process["max_cell_volume"] = max_size # Remove labels corresponding to big cells from seed image and re-run watershed log.info(f"A list of {len(big_cells)} big cells (volume>={max_size}µm³) has been detected!") segmented_image = relabel_from_mapping(segmented_image, {cid: not_a_label for cid in big_cells}, clear_unmapped=False) label_co_radius = kwargs.get('label_co_asf', 0) if label_co_radius >= 1: from timagetk.algorithms.morphology import label_filtering_coc_alternate_sequential_filter segmented_image = label_filtering_coc_alternate_sequential_filter(segmented_image, max_radius=label_co_radius) return segmented_image, seeds_image, process
[docs] @singlechannel_wrapper def watershed_segmentation(intensity_image, h_min, sigma=None, equalize_hist=True, contrast_stretch=False, min_size=50, max_size=None, cell_sigma=1): """Performs watershed segmentation of an intensity image after detection of connexe local minima. A Gaussian smoothing is applied prior to local minima detection to avoid detecting to many small local minima due to noise. The (stretched/equalized) smoothed intensity image is used for seed detection (connexe local minima) AND watershed. Parameters ---------- intensity_image : timagetk.SpatialImage or timagetk.MultiChannelImage The intensity image to segment. h_min : int Height-minima value to use for seed detection sigma : float, optional Sigma value used with Gaussian smoothing equalize_hist : bool, optional If ``True`` (default), performs *adaptive histogram equalization* to image prior to any other step. contrast_stretch : bool, optional If ``True`` (default is ``False``), performs *contrast stretching* to image prior to any other step. min_size : float, optional A minimal volume or area, in real units, to accept a label, ``50`` µm³ by default. If equal or lower, remove it from the seed image and re-run the watershed. max_size : float, optional A maximal volume or area, in real units, to accept a label, ``None`` by default. If equal or superior, remove it from the seed image and re-run the watershed. cell_sigma : int, optional Apply a "cell smoothing" with a sigma in voxel units, ``1`` by default. Set to ``0`` to avoid this step. Warnings, even small values (like 2) can drastically change the topology! Other Parameters ---------------- channel : str If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm. Returns ------- LabelledImage The segmented image. LabelledImage The seeds image. dict Dictionary summarizing the process with used parameters. Notes ----- We recommend performing automatic global contrast stretching OR adaptive histogram equalization to improve overall segmentation quality. It is done prior to the Gaussian smoothing step. See Also -------- timagetk.algorithms.exposure.equalize_adapthist timagetk.algorithms.exposure.global_contrast_stretch timagetk.algorithms.linearfilter.gaussian_filter Examples -------- >>> from timagetk.io import imread >>> from timagetk.io.util import shared_dataset >>> from timagetk.tasks.segmentation import watershed_segmentation >>> from timagetk.visu.stack import stack_browser >>> img_path = shared_dataset("p58")[0] >>> image = imread(img_path) >>> # Example 1 - Basic example: >>> seg_img, seeds, _ = watershed_segmentation(image, 5) >>> from timagetk.visu.util import greedy_colormap >>> b = stack_browser(seg_img, cmap=greedy_colormap(seg_img)) >>> # Example 2 - Effect of cell sigma smoothing: >>> from timagetk.algorithms.reconstruction import project_segmentation >>> from timagetk.visu.mplt import grayscale_imshow >>> seg_img_0, _, _ = watershed_segmentation(image, 30, cell_sigma=0) >>> seg_img_1, _, _ = watershed_segmentation(image, 30, cell_sigma=1) >>> seg_img_2, _, _ = watershed_segmentation(image, 30, cell_sigma=2) >>> seg_img_3, _, _ = watershed_segmentation(image, 30, cell_sigma=3) >>> z_proj = [project_segmentation(seg_img_0)] >>> z_proj += [project_segmentation(seg_img_1)] >>> z_proj += [project_segmentation(seg_img_2)] >>> z_proj += [project_segmentation(seg_img_3)] >>> grayscale_imshow(z_proj, title=["cell_sigma=0", "cell_sigma=1", "cell_sigma=2", "cell_sigma=3"], cmap='glasbey', val_range='auto') >>> # Example 3 - Show volumes distribution (in real units) >>> from timagetk import TissueImage3D >>> seg_img, _, _ = watershed_segmentation(image, 15, min_vol=0) >>> tissue = TissueImage3D(seg_img, background=1, not_a_label=0) >>> volumes = tissue.cells.volume(real=True) >>> volumes.pop(1) >>> import matplotlib.pyplot as plt >>> fig = plt.figure() >>> plt.hist(volumes.values(), range=(0, 500), bins=100) # crop the range for better output >>> plt.xlabel("Volumes (µm³)") >>> plt.ylabel("Frequency") >>> plt.show() """ intensity_image, process = watershed_preprocessing(intensity_image, sigma, equalize_hist, contrast_stretch) segmented_image, seeds_image, process = seeded_watershed(intensity_image, h_min, process=process) segmented_image, seeds_image, process = watershed_postprocessing(intensity_image, segmented_image, seeds_image, min_size=min_size, max_size=max_size, cell_sigma=cell_sigma, process=process) return segmented_image, seeds_image, process
[docs] @singlechannel_wrapper def background_detection(image, threshold=None, sigma=None, equalize_hist=True, contrast_stretch=False, **kwargs): """Detect the background position, *i.e.* not part of the object. Parameters ---------- image : timagetk.SpatialImage or timagetk.MultiChannelImage The intensity image to segment. threshold : int, optional The threshold to use for bacground detection, if not defined use the ``li_threshold`` method to estimate it. sigma : float, optional Sigma value used with Gaussian smoothing equalize_hist : bool, optional If ``True`` (default), performs *adaptive histogram equalization* to image prior to any other step. contrast_stretch : bool, optional If ``True`` (default is ``False``), performs *contrast stretching* to image prior to any other step. sigma : float, optional Sigma value, in real units, used to smooth the input image, use `0` to skip. If ``None`` (default), automatically set to ``1.5 * max(image.voxelsize)``. Other Parameters ---------------- channel : str If a ``MultiChannelImage`` is used as input `image`, select the channel to use with this algorithm. Returns ------- timagetk.components.spatial_image.SpatialImage Mask image (binary) where ``1`` indicate background position. See Also -------- timagetk.array_util.guess_intensity_threshold scipy.ndimage.binary_fill_holes Notes ----- This strategy assumes the background is where "there is no signal". We fill holes in the detected structure, slice by slice, moving along the z-axis. Example ------- >>> from timagetk.io import imread >>> from timagetk.io.util import shared_dataset >>> from timagetk.tasks.segmentation import background_detection >>> img_path = shared_dataset("p58")[0] >>> image = imread(img_path) >>> bkgd_img, _ = background_detection(image) >>> from timagetk.visu.stack import stack_browser >>> b = stack_browser(bkgd_img, val_range=[0,1]) """ from timagetk import SpatialImage from timagetk.components.image import get_image_attributes attr = get_image_attributes(image, exclude=["dtype"]) image, process = watershed_preprocessing(image, sigma, equalize_hist, contrast_stretch) if threshold is None: # Defines the threshold value if none was given threshold = guess_intensity_threshold(image) process["threshold"] = threshold th_image = connected_components(image, low_threshold=threshold) th_arr = th_image.get_array() != 0 th_arr = binary_fill_holes(th_arr) # Fill holes by z-slices: n_sl = th_arr.shape[0] for sl in range(n_sl): th_arr[sl, :, :] = binary_fill_holes(th_arr[sl, :, :]) # Invert image to get background "mask": th_arr = th_arr == 0 return SpatialImage(th_arr.astype('uint8'), **attr), process