"""
Created on Sat Aug 24 16:01:26 2019.
@author: tageldim
"""
import numpy as np
from shapely.geometry.polygon import Polygon
from shapely.ops import unary_union
from histomicstk.annotations_and_masks.masks_to_annotations_handler import \
_parse_annot_coords
from histomicstk.annotations_and_masks.pyrtree.rtree import Rect, RTree
from histomicstk.utils.general_utils import Base_HTK_Class
class Polygon_merger_v2(Base_HTK_Class):
    """Methods to merge contiguous polygons from whole-slide image.

    Contours are handled one annotation group at a time. For each group the
    contour bounding boxes are inserted into an R-tree, the tree is flattened
    into per-level node lists, and polygons are then unioned bottom-up
    (deepest level first), so that each union only involves the children of
    a single tree node. Merged polygons are appended as rows to
    ``self.new_contours``.
    """

    def __init__(self, contours_df, **kwargs):
        """Init Polygon_merger object.

        Arguments:
        ---------
        contours_df : pandas DataFrame
            The following columns are needed.

            group : str
                annotation group (ground truth label).
            ymin : int
                minimum y coordinate
            ymax : int
                maximum y coordinate
            xmin : int
                minimum x coordinate
            xmax : int
                maximum x coordinate
            coords_x : str
                vertex x coordinates comma-separated values
            coords_y : str
                vertex y coordinates comma-separated values
        merge_thresh : int
            how close do the polygons need to be (in pixels) to be merged
        verbose : int
            0 - Do not print to screen
            1 - Print only key messages
            2 - Print everything to screen
        monitorPrefix : str
            text to prepend to printed statements

        """
        from pandas import DataFrame

        # see: https://stackoverflow.com/questions/8187082/how-can-you-set-...
        # class-attributes-from-variable-arguments-kwargs-in-python
        default_attr = {
            'verbose': 1,
            'monitorPrefix': '',
            'merge_thresh': 3,
        }
        default_attr.update(kwargs)
        # NOTE(review): Base_HTK_Class is expected to expose every key of
        # default_attr as an instance attribute (self.merge_thresh is read
        # below) -- confirm against the base class.
        super().__init__(default_attr=default_attr)

        # This is where contours will be stored.
        # NOTE: the index of the dataframe passed in is reset in place.
        self.contours_df = contours_df
        self.contours_df.reset_index(inplace=True, drop=True)
        self.new_contours = DataFrame(columns=self.contours_df.columns)

        # prepwork
        # Polygons are dilated by buffer_size before being unioned and the
        # union is eroded by the same amount (see _merge_polygons).
        self.buffer_size = self.merge_thresh + 3
        self.unique_groups = set(self.contours_df.loc[:, 'group'])

    def set_contours_slice(self, group):
        """Slice a single group from self.contours_df.

        Arguments:
        ---------
        group : str
            value of the 'group' column to keep. The matching rows are
            stored in self.contours_slice.

        """
        in_group = self.contours_df.loc[:, 'group'] == group
        self.contours_slice = self.contours_df.loc[in_group, :]

    def create_rtree(self):
        """Add contour bounding boxes to R-tree.

        Builds a fresh tree in self.rtree from self.contours_slice. Each
        leaf object is the string 'polygon-<row index>' so that the row can
        be recovered later by _merge_leafs.
        """
        self.rtree = RTree()
        for cidx, cont in self.contours_slice.iterrows():
            bbox = Rect(
                minx=cont['xmin'], miny=cont['ymin'],
                maxx=cont['xmax'], maxy=cont['ymax'])
            self.rtree.insert('polygon-%d' % cidx, bbox)

    def set_tree_dict(self):
        """Get tree in convenience dict format (dicts inside dicts).

        Sets self.tree_dict, keyed by node index. A non-leaf node maps to a
        dict of its own children; a leaf node maps to its own index.
        """
        def _traverse(node):
            """Recursively traverse tree till you get to leafs."""
            if node.is_leaf():
                return node.index
            node_dict = {}
            # NOTE(review): kept as an explicit loop in the original
            # statement form (the recursive call is evaluated before the
            # key is read), in case children() hands out a shared cursor
            # -- verify against pyrtree before restructuring.
            for c in node.children():
                node_dict[c.index] = _traverse(c)
            return node_dict

        self.tree_dict = _traverse(self.rtree.cursor)

    def set_hierarchy(self):
        """Get hierarchy of node indices.

        Sets self.hierarchy, a dict mapping 'level-<depth>' to a list of
        dicts with keys 'nidx' (node index), 'parent_idx' (index of the
        parent node) and 'is_leaf'. Entries of self.tree_dict itself form
        'level-0' and are recorded with parent_idx 0.
        """
        self.hierarchy = {}

        def _add_hierarchy_level(node_dict, level, parent_idx):
            """Recursively add hierarchy levels."""
            child_nodes = [
                {'nidx': k, 'parent_idx': parent_idx,
                 'is_leaf': not isinstance(v, dict)}
                for k, v in node_dict.items()]
            if not child_nodes:
                return

            # add to current level
            lk = 'level-%d' % (level)
            self.hierarchy.setdefault(lk, []).extend(child_nodes)

            # add next level
            for nidx, ndict in node_dict.items():
                if isinstance(ndict, dict):
                    _add_hierarchy_level(
                        ndict, level=level + 1, parent_idx=nidx)

        _add_hierarchy_level(self.tree_dict, level=0, parent_idx=0)

    def _merge_polygons(self, poly_list):
        """Union a list of shapely geometries (Internal).

        When self.buffer_size is positive, every geometry is first dilated
        by buffer_size so that nearby polygons overlap, and the union is
        then eroded by the same amount.

        Arguments:
        ---------
        poly_list : list
            shapely geometries to merge.

        Returns
        -------
        shapely geometry
            result of the union (after erosion when buffering is used).

        """
        if self.buffer_size > 0:
            dilated = [j.buffer(self.buffer_size) for j in poly_list]
            return unary_union(dilated).buffer(-self.buffer_size)
        return unary_union(poly_list)

    def _merge_leafs(self, leafs):
        """Build polygons for the given leaf objects and merge (Internal).

        Arguments:
        ---------
        leafs : list of str
            leaf objects of the form 'polygon-<row index>' as inserted by
            create_rtree; the index refers to self.contours_slice.

        Returns
        -------
        shapely geometry
            output of _merge_polygons for the corresponding polygons.

        """
        nest_polygons = []
        for leaf in leafs:
            leafidx = int(leaf.split('polygon-')[1])
            nest = dict(self.contours_slice.loc[leafidx, :])
            coords = _parse_annot_coords(nest)
            nest_polygons.append(Polygon(coords))
        return self._merge_polygons(nest_polygons)

    def _get_merged_polygon(self, nidx):
        """Merge all leaf children of the node with index nidx (Internal).

        Note that this moves self.rtree.cursor to node nidx.
        """
        self.rtree.cursor._become(nidx)
        leafs = [c.leaf_obj() for c in self.rtree.cursor.children()]
        return self._merge_leafs(leafs)

    def get_merged_multipolygon(self):
        """Get final merged shapely multipolygon by hierarchical merger.

        Walks self.hierarchy from the deepest level up to level 0. At each
        level, nodes are grouped by parent: results already computed for
        non-leaf nodes (from the level below) are unioned per parent, and
        leaf nodes are turned into polygons and unioned per parent.

        Returns
        -------
        shapely geometry
            the merged geometry stored for parent index 0 at level 0.

        """
        merged_polygons_all = {}

        for level in range(len(self.hierarchy) - 1, -1, -1):
            this_lk = 'level-%d' % level
            child_lk = 'level-%d' % (level + 1)

            child_results = {}  # parent_idx -> merged geometries from below
            leaf_objs = {}  # parent_idx -> leaf objects at this level
            for node in self.hierarchy[this_lk]:
                parent_idx = node['parent_idx']
                if node['is_leaf']:
                    self.rtree.cursor._become(node['nidx'])
                    leaf_objs.setdefault(parent_idx, []).append(
                        self.rtree.cursor.leaf_obj())
                else:
                    # pop() also releases the intermediate result, which
                    # is not needed once it is queued for merging
                    child_results.setdefault(parent_idx, []).append(
                        merged_polygons_all[child_lk].pop(node['nidx']))

            merged_polygons = {}

            # merge polygons from previous level
            for parent_idx, polygon_list in child_results.items():
                merged_polygons[parent_idx] = self._merge_polygons(
                    polygon_list)

            # merge polygons from this level
            for parent_idx, leafs in leaf_objs.items():
                merged_leafs = self._merge_leafs(leafs)
                if parent_idx in merged_polygons:
                    # Defensive: a parent with both leaf and non-leaf
                    # children keeps both results instead of having the
                    # first one overwritten.
                    merged_leafs = self._merge_polygons(
                        [merged_polygons[parent_idx], merged_leafs])
                merged_polygons[parent_idx] = merged_leafs

            # assign to persistent dict
            merged_polygons_all[this_lk] = merged_polygons

        return merged_polygons_all['level-0'][0]

    def _get_coord_str_from_polygon(self, polygon):
        """Parse shapely polygon coordinates into string form (Internal).

        Only the exterior ring is used. Coordinates are cast to int32, so
        fractional parts are dropped rather than rounded.

        Returns
        -------
        tuple
            (coords_x, coords_y, coords): two comma-separated strings of
            x and y vertex values, and the transposed integer array with
            one (x, y) row per vertex.

        """
        coords = np.int32(polygon.exterior.coords.xy)
        coords_x = ','.join([str(j) for j in coords[0, :]])
        coords_y = ','.join([str(j) for j in coords[1, :]])
        return coords_x, coords_y, coords.T

    def _add_single_merged_edge_contour(self, polygon, group):
        """Add single contour to self.new_contours (Internal).

        A new row is appended holding the contour type, group, a has_holes
        flag, the exterior vertex strings, the bounding box and its area.
        Interior rings are only flagged through has_holes; their
        coordinates are not stored.
        """
        idx = self.new_contours.shape[0]
        self.new_contours.loc[idx, 'type'] = 'polyline'
        self.new_contours.loc[idx, 'group'] = group
        # the boundary of a polygon with interior rings is multi-part
        self.new_contours.loc[idx, 'has_holes'] = int(
            polygon.boundary.geom_type == 'MultiLineString')

        coords_x, coords_y, coords = self._get_coord_str_from_polygon(
            polygon)
        self.new_contours.loc[idx, 'coords_x'] = coords_x
        self.new_contours.loc[idx, 'coords_y'] = coords_y

        xmin, ymin = np.min(coords, axis=0)
        xmax, ymax = np.max(coords, axis=0)
        self.new_contours.loc[idx, 'xmin'] = xmin
        self.new_contours.loc[idx, 'ymin'] = ymin
        self.new_contours.loc[idx, 'xmax'] = xmax
        self.new_contours.loc[idx, 'ymax'] = ymax
        self.new_contours.loc[idx, 'bbox_area'] = int(
            (ymax - ymin) * (xmax - xmin))

    def _add_merged_multipolygon_contours(
            self, merged_multipolygon, group, monitorPrefix=''):
        """Add merged polygons to self.new_contours df (Internal).

        Arguments:
        ---------
        merged_multipolygon : shapely geometry
            a Polygon or a multi-part geometry, e.g. the output of
            get_merged_multipolygon.
        group : str
            group label written to every added row.
        monitorPrefix : str
            text to prepend to printed statements

        """
        if merged_multipolygon.geom_type == 'Polygon':
            polygons = [merged_multipolygon]
        elif hasattr(merged_multipolygon, 'geoms'):
            polygons = list(merged_multipolygon.geoms)
        else:
            # fall back to direct iteration, as the original code did
            polygons = list(merged_multipolygon)

        # Empty parts have no exterior vertices to take a bounding box
        # from, and non-polygon parts have no exterior ring at all.
        polygons = [
            p for p in polygons
            if p.geom_type == 'Polygon' and not p.is_empty]

        n_polygons = len(polygons)
        for pno, polygon in enumerate(polygons):
            self._print2('%s: contour %d of %d' % (
                monitorPrefix, pno + 1, n_polygons))
            self._add_single_merged_edge_contour(polygon, group=group)

    def run_for_single_group(self, group, monitorPrefix=''):
        """Run sequence for merging polygons & adding contours (one group).

        Arguments:
        ---------
        group : str
            value of the 'group' column whose contours are merged.
        monitorPrefix : str
            text to prepend to printed statements

        """
        # Prep to get polygons
        self._print1('%s: set_contours_slice' % monitorPrefix)
        self.set_contours_slice(group)
        self._print1('%s: create_rtree' % monitorPrefix)
        self.create_rtree()
        self._print1('%s: set_tree_dict' % monitorPrefix)
        self.set_tree_dict()
        self._print1('%s: set_hierarchy' % monitorPrefix)
        self.set_hierarchy()

        # get shapely multipolygon object with merged adjacent contours
        self._print1('%s: get_merged_multipolygon' % monitorPrefix)
        merged_multipolygon = self.get_merged_multipolygon()

        # add contours to new dataframe
        self._print1('%s: _add_merged_multipolygon_contours' % monitorPrefix)
        # forward the prefix so per-contour messages stay attributable
        self._add_merged_multipolygon_contours(
            merged_multipolygon, group=group, monitorPrefix=monitorPrefix)

    def run(self):
        """Run sequence for merging polygons & adding contours.

        Every group in self.unique_groups is processed in turn; results
        accumulate in self.new_contours.
        """
        for group in self.unique_groups:
            monitorPrefix = '%s: %s' % (self.monitorPrefix, group)
            self.run_for_single_group(group, monitorPrefix=monitorPrefix)