Source code for satproc.masks

import logging
import os
from collections import defaultdict

import cv2
import fiona
import numpy as np
import rasterio
from rasterio.features import rasterize
from shapely.geometry import shape
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

__author__ = "Damián Silvani"
__copyright__ = "Dymaxion Labs"
__license__ = "Apache-2.0"

_logger = logging.getLogger(__name__)


[docs]def make_masks( rasters, *, output_dir, labels, label_property="class", classes=None, mask_type="class", masks={"extent"}, extent_no_border=False, ): if mask_type not in ("single", "class"): raise RuntimeError(f"mask type '{mask_type}' not implemented") # If mask type is single, there is a single class if mask_type == "single": classes = None if extent_no_border and "extent" not in masks: _logger.warn( ( "You specified `no_border` option but will be ignored " "(no `extent` mask specified)" ) ) polys_dict = classify_polygons(labels, label_property, classes) with logging_redirect_tqdm(): for raster in tqdm(rasters, ascii=True, desc="Rasters"): name, _ = os.path.splitext(os.path.basename(raster)) with rasterio.open(raster) as src: profile = src.profile.copy() transform = src.transform b = src.bounds # Calculate window from image bounds window = rasterio.windows.from_bounds( b.left, b.bottom, b.right, b.top, src.transform, src.height, src.width, ) paths = { kind: os.path.join(output_dir, kind, f"{name}.tif") for kind in masks } keys = classes if classes is not None else polys_dict.keys() masks = multiband_chip_mask_by_classes( classes=keys, transform=transform, window=window, polys_dict=polys_dict, label_property=label_property, extent_no_border=extent_no_border, extent_mask_path=paths.get("extent"), boundary_mask_path=paths.get("boundary"), distance_mask_path=paths.get("distance"), ) write_window_masks( masks, window=window, metadata=profile, transform=transform )
[docs]def multiband_chip_mask_by_classes( classes, transform, window, label_property, polys_dict=None, label_path=None, extent_mask_path=None, boundary_mask_path=None, distance_mask_path=None, extent_no_border=False, ): mb_extent, mb_boundary, mb_distance = [], [], [] if polys_dict is None and label_path is not None: polys_dict = classify_polygons(label_path, label_property, classes) for k in classes: polys = polys_dict.get(k, []) extent_mask, bound_mask, dist_mask = mask_from_polygons( polys, win=window, t=transform, extent_no_border=extent_no_border, boundary_mask=boundary_mask_path, distance_mask=distance_mask_path, ) mb_extent.append(extent_mask) mb_boundary.append(bound_mask) mb_distance.append(dist_mask) masks = zip( [mb_extent, mb_boundary, mb_distance], [extent_mask_path, boundary_mask_path, distance_mask_path], ) masks = [(path, mask) for mask, path in masks if path is not None] return masks
[docs]def all_masks_empty(masks): """Check if all masks are empty""" for _, mask in masks: for arr in mask: if np.any(arr): return False return True
[docs]def write_window_masks(masks, *, window, metadata, transform): """Write window masks to files""" for path, mask in masks: kwargs = metadata.copy() kwargs.update( driver="GTiff", dtype=rasterio.uint8, count=len(mask), nodata=0, transform=rasterio.windows.transform(window, transform), width=window.width, height=window.height, ) os.makedirs(os.path.dirname(path), exist_ok=True) with rasterio.open(path, "w", **kwargs) as dst: for i in range(len(mask)): dst.write(mask[i], i + 1)
[docs]def mask_from_polygons( polygons, *, win, t, extent_no_border=True, boundary_mask=None, distance_mask=None, ): """Generate a binary mask array from a set of polygon It can also generate a distance transform mask Parameters ---------- polygons : List[Union[Polygon, MultiPolygon]] list of polygon or multipolygon geometries win : rasterio.windows.Window window t : rasterio.transform.Affine affine transform extent_no_border : bool if True, the extent mask will not include the border of the polygon boundary_mask : bool whether to generate boundary (edges) mask distance_mask : bool whether to generate a distance mask Returns ------- numpy.ndarray """ transform = rasterio.windows.transform(win, t) win_shape = (int(win.height), int(win.width)) bound_mask, dist_mask = None, None if polygons is None or len(polygons) == 0: mask = np.zeros(win_shape, dtype=np.uint8) if boundary_mask: bound_mask = mask.copy() if distance_mask: dist_mask = mask.copy() return mask, bound_mask, dist_mask mask = rasterize( polygons, win_shape, default_value=255, transform=transform, ) if boundary_mask or distance_mask or extent_no_border: lines = _get_linestrings_from_polygons(polygons) bound_mask = rasterize(lines, win_shape, default_value=255, transform=transform) if extent_no_border or distance_mask: mask_no_bounds = mask.copy() mask_no_bounds[bound_mask != 0] = 0 if extent_no_border: mask = mask_no_bounds if distance_mask: dist_mask = cv2.distanceTransform( mask_no_bounds, cv2.DIST_L2, 3 ).astype(np.uint8) return mask, bound_mask, dist_mask
def _get_linestrings_from_polygons(polys): for poly in polys: boundary = poly.boundary if boundary.type == "MultiLineString": for line in boundary.geoms: yield line else: yield boundary
[docs]def prepare_label_shapes( labels, mask_type="class", label_property="class", classes=None ): if mask_type in ("class", "single"): polys_dict = classify_polygons(labels, label_property, classes) return polys_dict else: raise RuntimeError(f"mask type '{mask_type}' not supported")
[docs]def classify_polygons(labels, label_property, classes): with fiona.open(labels) as blocks: _logger.info("Found %d labels on label file %s", len(blocks), labels) if classes is not None and label_property: if label_property not in blocks.schema["properties"]: raise ValueError( f"label property '{label_property}' not present in labels schema" ) class_counts = defaultdict(int) missing_classes = set() polys = defaultdict(list) for block in blocks: if classes is None: c = "_any" else: c = str(block["properties"][label_property]) class_counts[c] += 1 if c not in classes: missing_classes.add(c) continue try: geom = shape(block["geometry"]) except RuntimeError: _logger.warning( "Failed to get geometry shape for feature: %s", block ) continue polys[c].append(geom) if missing_classes: _logger.warn( f"Some features with classes {missing_classes} were ignored as they are not in the specified classes list" ) for c in missing_classes: _logger.warn(f"Num. features of class '{c}': {class_counts[c]}") return polys