Source code for histomicstk.saliency.cellularity_detection_superpixels

"""
Created on Mon Sep 23 21:17:43 2019.

@author: mtageld
"""
import numpy as np
from PIL import Image
from skimage.color import rgb2gray

from histomicstk.annotations_and_masks.annotation_and_mask_utils import \
    get_image_from_htk_response
from histomicstk.annotations_and_masks.masks_to_annotations_handler import (
    get_annotation_documents_from_contours, get_contours_from_mask)
from histomicstk.features.compute_haralick_features import \
    compute_haralick_features
from histomicstk.features.compute_intensity_features import \
    compute_intensity_features
from histomicstk.preprocessing.color_conversion import lab_mean_std
from histomicstk.preprocessing.color_normalization import reinhard
from histomicstk.saliency.tissue_detection import (
    _deconv_color, get_slide_thumbnail,
    get_tissue_boundary_annotation_documents, get_tissue_mask)
from histomicstk.utils.general_utils import Base_HTK_Class

Image.MAX_IMAGE_PIXELS = None


[docs] class CD_single_tissue_piece: """Detect cellular regions in a single tissue piece (internal).""" def __init__(self, cd, tissue_mask, monitorPrefix=''): """Detect cellularity in one tissue piece (Internal). Arguments: --------- cd : object Cellularity_detector_superpixels instance tissue_mask : np array (mxn) mask of the tissue piece at cd.MAG magnification monitorPrefix : str Text to prepend to printed statements """ self.cd = cd self.tissue_mask = 0 + tissue_mask self.monitorPrefix = monitorPrefix
[docs] def run(self): """Get cellularity and optionally visualize on DSA.""" # get cellularity self.restrict_mask_to_single_tissue_piece() self.cd._print2('%s: set_tissue_rgb()' % self.monitorPrefix) self.set_tissue_rgb() self.cd._print2('%s: set_superpixel_mask()' % self.monitorPrefix) self.set_superpixel_mask() self.cd._print2('%s: set_superpixel_features()' % self.monitorPrefix) self.set_superpixel_features() self.cd._print2('%s: set_superpixel_assignment()' % self.monitorPrefix) self.set_superpixel_assignment() self.cd._print2('%s: assign_cellularity_scores()' % self.monitorPrefix) self.assign_cellularity_scores() # visualize if self.cd.visualize_spixels or self.cd.visualize_contiguous: self.assign_colors_to_spixel_clusters() if self.cd.visualize_spixels: self.cd._print2( '%s: visualize_individual_superpixels()' % self.monitorPrefix) self.visualize_individual_superpixels() if self.cd.visualize_contiguous: self.cd._print2( '%s: visualize_contiguous_superpixels()' % self.monitorPrefix) self.visualize_contiguous_superpixels()
[docs] def restrict_mask_to_single_tissue_piece(self): """Only keep relevant part of slide mask.""" # find coordinates at scan magnification tloc = np.argwhere(self.tissue_mask) F = self.cd.slide_info['F_tissue'] self.ymin, self.xmin = (int(j) for j in np.min(tloc, axis=0) * F) self.ymax, self.xmax = (int(j) for j in np.max(tloc, axis=0) * F) self.tissue_mask = self.tissue_mask[ int(self.ymin / F): int(self.ymax / F), int(self.xmin / F): int(self.xmax / F)]
[docs] def set_tissue_rgb(self): """Load RGB from server for single tissue piece.""" # load RGB for this tissue piece at saliency magnification getStr = '/item/%s/tiles/region?left=%d&right=%d&top=%d&bottom=%d&encoding=PNG' % ( self.cd.slide_id, self.xmin, self.xmax, self.ymin, self.ymax, ) + '&magnification=%d' % self.cd.MAG resp = self.cd.gc.get(getStr, jsonResp=False) self.tissue_rgb = get_image_from_htk_response(resp) # color normalization if desired if 'main' in self.cd.cnorm_params.keys(): self.tissue_rgb = np.uint8(reinhard( im_src=self.tissue_rgb, target_mu=self.cd.cnorm_params['main']['mu'], target_sigma=self.cd.cnorm_params['main']['sigma']))
[docs] def set_superpixel_mask(self): """Use Simple Linear Iterative Clustering (SLIC) to get superpixels.""" from skimage.segmentation import slic from skimage.transform import resize # Get superpixel size and number spixel_size = self.cd.spixel_size_baseMag * ( self.cd.MAG / self.cd.slide_info['magnification']) n_spixels = int( self.tissue_rgb.shape[0] * self.tissue_rgb.shape[1] / spixel_size) # get superpixel mask # optionally use grayscale instead of RGB -- seems more robust to # color variations and sometimes gives better results if self.cd.use_grayscale: try: self.spixel_mask = slic( rgb2gray(self.tissue_rgb), n_segments=n_spixels, compactness=self.cd.compactness, channel_axis=None) except Exception: self.spixel_mask = slic( rgb2gray(self.tissue_rgb), n_segments=n_spixels, compactness=self.cd.compactness) else: self.spixel_mask = slic( self.tissue_rgb, n_segments=n_spixels, compactness=self.cd.compactness) # restrict to tissue mask tmask = resize( self.tissue_mask, output_shape=self.spixel_mask.shape, order=0, preserve_range=True, anti_aliasing=False) self.spixel_mask[tmask == 0] = 0
[docs] def set_superpixel_features(self): """Get superpixel features.""" from pandas import concat from skimage.measure import regionprops assert (self.cd.use_intensity or self.cd.use_texture) # Possibly deconvolvve to get hematoxylin channel (cellular areas) # hematoxylin channel return shows MINIMA so we invert if self.cd.deconvolve: Stains, channel = _deconv_color(self.tissue_rgb) tissue_htx = 255 - Stains[..., channel] else: tissue_htx = rgb2gray(self.tissue_rgb) # calculate features from superpixels rprops = regionprops(self.spixel_mask) fdata_list = [] if self.cd.use_intensity: fdata_list.append(compute_intensity_features( im_label=self.spixel_mask, im_intensity=tissue_htx, rprops=rprops)) if self.cd.use_texture: fdata_list.append(compute_haralick_features( im_label=self.spixel_mask, im_intensity=tissue_htx, rprops=rprops)) self.fdata = concat(fdata_list, axis=1) if self.cd.keep_feats is not None: self.fdata = self.fdata.loc[:, self.cd.keep_feats] # Index is corresponding pixel value in the superpixel mask # IMPORTANT: this assumes that regionprops output is sorted by unique # pixel values in label mask, which it is by default self.fdata.index = set(np.unique(self.spixel_mask)) - {0}
[docs] def set_superpixel_assignment(self): """Fit gaussian mixture model to features and get assignment.""" from sklearn.mixture import GaussianMixture mmodel = GaussianMixture(n_components=self.cd.n_gaussian_components) self.spixel_labels = mmodel.fit_predict(self.fdata.values) + 1
[docs] def assign_cellularity_scores(self): """Assign cellularity scores to spixel clusters.""" assert self.cd.use_intensity, 'We need intensity to rank cellularity.' assert self.cd.deconvolve, \ 'We must use hematoxyling channel to rank by cellularity.' self.cluster_props = {} self.fdata.loc[:, 'cluster'] = self.spixel_labels for clid in np.unique(self.spixel_labels): self.cluster_props[clid] = { 'cellularity': int(np.median(self.fdata.loc[ self.fdata.loc[:, 'cluster'] == clid, 'Intensity.Median']) / 255 * 100), }
[docs] def assign_colors_to_spixel_clusters(self): """Assign RGB color string to cellularity clusters.""" # normalize values by given value, else by max for each tissue piece if self.cd.max_cellularity is not None: max_cellularity = self.cd.max_cellularity else: max_cellularity = max( j['cellularity'] for _, j in self.cluster_props.items() ) # Assign rgb string for clid in np.unique(self.spixel_labels): cellularity = min( self.cluster_props[clid]['cellularity'], max_cellularity) rgb = self.cd.cMap(int(cellularity / max_cellularity * 255))[:-1] rgb = [int(255 * j) for j in rgb] self.cluster_props[clid]['color'] = 'rgb(%d,%d,%d)' % tuple(rgb)
[docs] def visualize_individual_superpixels(self): """Visualize individual spixels, color-coded by cellularity.""" # Define GTCodes dataframe from pandas import DataFrame GTCodes_df = DataFrame(columns=['group', 'GT_code', 'color']) for spval, sp in self.fdata.iterrows(): spstr = 'spixel-%d_cellularity-%d' % ( spval, self.cluster_props[sp['cluster']]['cellularity']) GTCodes_df.loc[spstr, 'group'] = spstr GTCodes_df.loc[spstr, 'GT_code'] = spval GTCodes_df.loc[spstr, 'color'] = \ self.cluster_props[sp['cluster']]['color'] # get contours df contours_df = get_contours_from_mask( MASK=self.spixel_mask, GTCodes_df=GTCodes_df, get_roi_contour=False, MIN_SIZE=0, MAX_SIZE=None, verbose=self.cd.verbose == 3, monitorPrefix=self.monitorPrefix) contours_df.loc[:, 'group'] = [ j.split('_')[-1] for j in contours_df.loc[:, 'group']] # get annotation docs annprops = { 'F': (self.ymax - self.ymin) / self.tissue_rgb.shape[0], 'X_OFFSET': self.xmin, 'Y_OFFSET': self.ymin, 'opacity': self.cd.opacity, 'lineWidth': self.cd.lineWidth, } annotation_docs = get_annotation_documents_from_contours( contours_df.copy(), docnamePrefix='spixel', annprops=annprops, annots_per_doc=1000, separate_docs_by_group=True, verbose=self.cd.verbose == 3, monitorPrefix=self.monitorPrefix) for didx, doc in enumerate(annotation_docs): self.cd._print2('%s: Posting doc %d of %d' % ( self.monitorPrefix, didx + 1, len(annotation_docs))) _ = self.cd.gc.post( '/annotation?itemId=' + self.cd.slide_id, json=doc)
[docs] def visualize_contiguous_superpixels(self): """Visualize contiguous spixels, color-coded by cellularity.""" from pandas import DataFrame # get cellularity cluster membership mask cellularity_mask = np.zeros(self.spixel_mask.shape) for spval, sp in self.fdata.iterrows(): cellularity_mask[self.spixel_mask == spval] = sp['cluster'] # Define GTCodes dataframe GTCodes_df = DataFrame(columns=['group', 'GT_code', 'color']) for spval, cp in self.cluster_props.items(): spstr = 'cellularity-%d' % (cp['cellularity']) GTCodes_df.loc[spstr, 'group'] = spstr GTCodes_df.loc[spstr, 'GT_code'] = spval GTCodes_df.loc[spstr, 'color'] = cp['color'] # get contours df contours_df = get_contours_from_mask( MASK=cellularity_mask, GTCodes_df=GTCodes_df, get_roi_contour=False, MIN_SIZE=0, MAX_SIZE=None, verbose=self.cd.verbose == 3, monitorPrefix=self.monitorPrefix) # get annotation docs annprops = { 'F': (self.ymax - self.ymin) / self.tissue_rgb.shape[0], 'X_OFFSET': self.xmin, 'Y_OFFSET': self.ymin, 'opacity': self.cd.opacity_contig, 'lineWidth': self.cd.lineWidth, } annotation_docs = get_annotation_documents_from_contours( contours_df.copy(), docnamePrefix='contig', annprops=annprops, annots_per_doc=1000, separate_docs_by_group=True, verbose=self.cd.verbose == 3, monitorPrefix=self.monitorPrefix) for didx, doc in enumerate(annotation_docs): self.cd._print2('%s: Posting doc %d of %d' % ( self.monitorPrefix, didx + 1, len(annotation_docs))) _ = self.cd.gc.post( '/annotation?itemId=' + self.cd.slide_id, json=doc)
[docs] class Cellularity_detector_superpixels (Base_HTK_Class): """Detect cellular regions in a slides by classifying superpixels. This uses Simple Linear Iterative Clustering (SLIC) to get superpixels at a low slide magnification to detect cellular regions. The first step of this pipeline detects tissue regions (i.e. individual tissue pieces) using the get_tissue_mask method of the histomicstk.saliency module. Then, each tissue piece is processed separately for accuracy and disk space efficiency. It is important to keep in mind that this does NOT rely on a tile iterator, but loads the entire tissue region (but NOT the whole slide) in memory and passes it on to skimage.segmentation.slic method. Once superpixels are segmented, the image is deconvolved and features are extracted from the hematoxylin channel. Features include intensity and possibly also texture features. Then, a mixed component Gaussian mixture model is fit to the features, and median intensity is used to rank superpixel clusters by 'cellularity' (since we are working with the hematoxylin channel). Additional functionality includes contour extraction to get the final segmentation boundaries of cellular regions and to visualize them in DSA using one's preferred colormap. """ def __init__(self, gc, slide_id, **kwargs): """Init Cellularity_Detector_Superpixels object. Arguments: --------- gc : object girder client object slide_id : str girder ID of slide verbose : int 0 - Do not print to screen 1 - Print only key messages 2 - Print everything to screen 3 - print everything including from inner functions monitorPrefix : str text to prepend to printed statements logging_savepath : str or None where to save run logs suppress_warnings : bool whether to suppress warnings cnorm_params : dict Reinhard color normalization parameters. Accepted keys: thumbnail and main (since thumbnail normalization is different from color normalization of tissue at target magnification. Each entry is a dict containing values for mu and sigma. This is either given here or can be set using self.set_color_normalization_values(). May be left unset if you do not want to normalize. get_tissue_mask_kwargs : dict kwargs for the get_tissue_mask() method. MAG : float magnification at which to detect cellularity spixel_size_baseMag : int approximate superpixel size at base (scan) magnification compactness : float compactness parameter for the SLIC method. Higher values result in more regular superpixels while smaller values are more likely to respect tissue boundaries. deconvolve : bool Whether to deconvolve and use hematoxylin channel for feature extraction. Must be True to ranks spixel clusters by cellularity. use_grayscale : bool If True, grayscale image is used with SLIC. May be more robust to color variations from slide to slide and more efficient. use_intensity : bool Whether to extract intensity features from the hematoxylin channel. This must be True to rank spuerpixel clusters by cellularity. use_texture : bool Whether to extract Haralick texture features from Htx channel. May not necessarily improve results when used in conjunction with intensity features. keep_feats : list Name of intensity features to use. See histomicstk.features.compute_intensity_features. Using fewer informative features may result in better gaussian mixture modeling results. n_gaussian_components : int no of gaussian mixture model components max_cellularity : int Range [0, 100] or None. If None, normalize visualization RGB values for each tissue piece separately, else normalize by given number. opacity : float opacity of superpixel polygons when posted to DSA. 0 (no opacity) is more efficient to render. opacity_contig : float opacity of contiguous region polygons when posted to DSA. 0 (no opacity) is more efficient to render. lineWidth : float width of line when displaying superpixel boundaries. cMap : object matplotlib color map to use when visualizing cellularity visualize_tissue_boundary : bool whether to visualize result from tissue detection component visualize_spixels : bool whether to visualize superpixels, color-coded by cellularity visualize_contiguous : bool whether to visualize contiguous cellular regions """ from matplotlib import cm default_attr = { # The following are already assigned defaults by Base_HTK_Class # 'verbose': 1, # 'monitorPrefix': "", # 'logging_savepath': None, # 'suppress_warnings': False, 'cnorm_params': {}, 'MAG': 3.0, 'get_tissue_mask_kwargs': { 'deconvolve_first': True, 'n_thresholding_steps': 1, 'sigma': 1.5, 'min_size': 500, }, 'spixel_size_baseMag': 256 * 256, 'compactness': 0.1, 'deconvolve': True, 'use_grayscale': True, 'use_intensity': True, 'use_texture': False, # 'keep_feats': None, # keep everything 'keep_feats': [ 'Intensity.Mean', 'Intensity.Median', 'Intensity.Std', 'Intensity.IQR', 'Intensity.HistEntropy', ], 'n_gaussian_components': 5, 'max_cellularity': None, 'opacity': 0, 'opacity_contig': 0.3, 'lineWidth': 3.0, 'cMap': cm.seismic, 'visualize_tissue_boundary': True, 'visualize_spixels': True, 'visualize_contiguous': True, } default_attr.update(kwargs) super().__init__( default_attr=default_attr) # set attribs self.gc = gc self.slide_id = slide_id
[docs] def run(self): """Run cellularity detection and optionally visualize result. This runs the cellularity detection +/- visualization pipeline and returns a list of CD_single_tissue_piece objects. Each object has the following attributes tissue_mask : np array mask of where tissue is at target magnification ymin : int min y coordinate at base (scan) magnification xmin : int min x coordinate at base (scan) magnification ymax : int max y coordinate at base (scan) magnification xmax : int max x coordinate at base (scan) magnification spixel_mask : np array np array where each unique value represents one superpixel fdata : pandas DataFrame features extracted for each superpixel. Index corresponds to values in the spixel_mask. This includes a 'cluster' column indicatign which cluster this superpixel belongs to. cluster_props : dict properties of each superpixel cluster, including its assigned cellularity score. """ if (len(self.cnorm_params) == 0) and (not self.suppress_warnings): input(""" %s: WARNING!! Consider running set_color_normalization_values() first, using what='thumbnail' and/or what='main' before running this method. Continue anyway?""" % self.monitorPrefix) # get mask, each unique value is a single tissue piece self._print1( '%s: set_slide_info_and_get_tissue_mask()' % self.monitorPrefix) labeled = self.set_slide_info_and_get_tissue_mask() # Go through tissue pieces and do run sequence unique_tvals = list(set(np.unique(labeled)) - {0}) tissue_pieces = [None for _ in range(len(unique_tvals))] for idx, tval in enumerate(unique_tvals): monitorPrefix = '%s: Tissue piece %d of %d' % ( self.monitorPrefix, idx + 1, len(unique_tvals)) self._print1(monitorPrefix) tissue_pieces[idx] = CD_single_tissue_piece( self, tissue_mask=labeled == tval, monitorPrefix=monitorPrefix) tissue_pieces[idx].run() del tissue_pieces[idx].tissue_rgb # too much space return tissue_pieces
[docs] def set_color_normalization_values( self, mu=None, sigma=None, ref_image_path=None, what='main'): """Set color normalization values for thumbnail or main image.""" assert ( all(j is not None for j in (mu, sigma)) or ref_image_path is not None ), 'You must provide mu & sigma values or ref. image to get them.' assert what in ('thumbnail', 'main') if ref_image_path is not None: from imageio import imread ref_im = np.array(imread(ref_image_path, pilmode='RGB')) mu, sigma = lab_mean_std(ref_im) self.cnorm_params[what] = {'mu': mu, 'sigma': sigma}
[docs] def set_slide_info_and_get_tissue_mask(self): """Set self.slide_info dict and self.labeled tissue mask.""" # This is a persistent dict to store information about slide self.slide_info = self.gc.get('item/%s/tiles' % self.slide_id) # get tissue mask thumbnail_rgb = get_slide_thumbnail(self.gc, self.slide_id) # color normalization if desired if 'thumbnail' in self.cnorm_params.keys(): thumbnail_rgb = np.uint8(reinhard( im_src=thumbnail_rgb, target_mu=self.cnorm_params['thumbnail']['mu'], target_sigma=self.cnorm_params['thumbnail']['sigma'])) # get labeled tissue mask -- each unique value is one tissue piece labeled, _ = get_tissue_mask( thumbnail_rgb, **self.get_tissue_mask_kwargs) if len(np.unique(labeled)) < 2: msg = 'No tissue detected!' raise ValueError(msg) if self.visualize_tissue_boundary: annotation_docs = get_tissue_boundary_annotation_documents( self.gc, slide_id=self.slide_id, labeled=labeled) for doc in annotation_docs: _ = self.gc.post( '/annotation?itemId=' + self.slide_id, json=doc) # Find size relative to WSI self.slide_info['F_tissue'] = self.slide_info[ 'sizeX'] / labeled.shape[1] return labeled